Sketching

Probabilistic data structures: TopK, CountMinSketch, TDigest, HyperLogLog, BloomFilter, ReservoirSampler, MerkleTree.

Streaming/sketching algorithms for approximate statistics.

This module provides space-efficient algorithms for computing approximate statistics over data streams. These are useful in simulations for:

- Tracking heavy hitters (most frequent items)
- Estimating item frequencies
- Computing approximate quantiles/percentiles
- Estimating cardinality (distinct count)
- Testing set membership
- Uniform random sampling

All algorithms share common properties:

- Bounded memory usage (configurable)
- Single-pass processing (add items one at a time)
- Mergeable (combine sketches from parallel streams)
- Reproducible (optional seed for deterministic behavior)

Quick Reference

TopK: Heavy hitters (top-k most frequent items)
CountMinSketch: Frequency estimation for any item
TDigest: Quantile/percentile estimation (p50, p95, p99)
HyperLogLog: Cardinality (distinct count) estimation
BloomFilter: Probabilistic set membership testing
ReservoirSampler: Uniform random sampling from streams
MerkleTree: Detecting divergent key ranges between replicas

Example

from happysimulator.sketching import (
    TopK,
    CountMinSketch,
    TDigest,
    HyperLogLog,
    BloomFilter,
    ReservoirSampler,
)

Track top 100 customers by request volume

topk = TopK[int](k=100)
for customer_id in requests:
    topk.add(customer_id)
print(topk.top(10))  # Top 10 most active customers

Track latency percentiles

td = TDigest(compression=100)
for latency in latencies:
    td.add(latency)
print(f"p99: {td.percentile(99)}")

Count unique visitors

hll = HyperLogLog[str]()
for visitor_id in visitors:
    hll.add(visitor_id)
print(f"~{hll.cardinality()} unique visitors")
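Mergeability is what makes these sketches fit parallel streams: each worker builds its own sketch, and after merging the result answers queries as if a single pass had seen everything. The snippet below illustrates the pattern with exact Python sets (what a merged HyperLogLog would estimate); it uses no library calls so it runs standalone.

```python
# Two workers each process one shard of the visitor stream.
shard_a = {f"user-{i}" for i in range(0, 6_000)}
shard_b = {f"user-{i}" for i in range(4_000, 10_000)}  # overlaps shard_a

# Merging HyperLogLogs (hll_a.merge(hll_b); hll_a.cardinality()) estimates
# the cardinality of the union; with exact sets the union is direct.
merged = shard_a | shard_b
print(len(merged))  # 10000 distinct users, despite 12000 total adds
```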

CardinalitySketch

Bases: Sketch

Protocol for sketches that estimate cardinality (distinct count).

Used for estimating the number of unique items in a stream without storing all items.

Implementations: HyperLogLog

cardinality abstractmethod

cardinality() -> int

Estimate the number of distinct items.

Returns:

Type Description
int

Estimated count of unique items added.

FrequencyEstimate dataclass

FrequencyEstimate(item: T, count: int, error: int)

A frequency estimate for an item.

Attributes:

Name Type Description
item T

The item being estimated.

count int

Estimated frequency count.

error int

Upper bound on the estimation error.

FrequencySketch

Bases: Sketch

Protocol for sketches that estimate item frequencies.

Used for finding heavy hitters (most frequent items) and estimating how many times specific items appeared in the stream.

Implementations: TopK (Space-Saving), Count-Min Sketch

estimate abstractmethod

estimate(item: T) -> int

Estimate the frequency of an item.

Parameters:

Name Type Description Default
item T

The item to estimate frequency for.

required

Returns:

Type Description
int

Estimated count. May be an overestimate (never an underestimate for Count-Min) or approximate (for Space-Saving).

top abstractmethod

top(k: int) -> list[FrequencyEstimate[T]]

Return the top-k most frequent items.

Parameters:

Name Type Description Default
k int

Number of top items to return.

required

Returns:

Type Description
list[FrequencyEstimate[T]]

List of FrequencyEstimate objects sorted by count (descending). May return fewer than k items if fewer distinct items were seen.

MembershipSketch

Bases: Sketch

Protocol for sketches that test set membership.

Used for efficiently testing if an item was (probably) seen before. False positives are possible; false negatives are not.

Implementations: Bloom Filter

false_positive_rate abstractmethod property

false_positive_rate: float

Estimated false positive rate given current fill level.

Returns:

Type Description
float

Probability that contains() returns True for an item not in the set.

contains abstractmethod

contains(item: T) -> bool

Test if an item is (probably) in the set.

Parameters:

Name Type Description Default
item T

The item to test.

required

Returns:

Type Description
bool

True if the item might be in the set (possible false positive); False if the item is definitely not in the set (no false negatives).

QuantileSketch

Bases: Sketch

Protocol for sketches that estimate quantiles/percentiles.

Used for estimating latency percentiles (p50, p95, p99) without storing all values.

Implementations: T-Digest

quantile abstractmethod

quantile(q: float) -> float

Estimate the value at a given quantile.

Parameters:

Name Type Description Default
q float

Quantile to estimate (0.0 to 1.0).
- 0.5 = median (p50)
- 0.95 = 95th percentile (p95)
- 0.99 = 99th percentile (p99)

required

Returns:

Type Description
float

Estimated value at the quantile.

Raises:

Type Description
ValueError

If q is not in [0, 1].

cdf abstractmethod

cdf(value: float) -> float

Estimate the cumulative distribution function at a value.

Parameters:

Name Type Description Default
value float

The value to get CDF for.

required

Returns:

Type Description
float

Estimated probability that a random sample <= value (0.0 to 1.0).

percentile

percentile(p: float) -> float

Convenience method for percentile estimation.

Parameters:

Name Type Description Default
p float

Percentile (0 to 100).

required

Returns:

Type Description
float

Estimated value at the percentile.

Raises:

Type Description
ValueError

If p is not in [0, 100].

SamplingSketch

Bases: Sketch

Protocol for sketches that maintain a sample of stream items.

Used for maintaining a representative sample of a stream for later analysis.

Implementations: Reservoir Sampling

capacity abstractmethod property

capacity: int

Maximum number of items in the sample.

Returns:

Type Description
int

Sample size limit.

sample abstractmethod

sample() -> list[T]

Return the current sample.

Returns:

Type Description
list[T]

List of sampled items (may be smaller than capacity if fewer items seen).

__iter__ abstractmethod

__iter__() -> Iterator[T]

Iterate over sampled items.

Sketch

Bases: ABC

Base protocol for all streaming/sketching algorithms.

Sketches process a stream of items and provide approximate answers to queries about the stream. They support:

- Adding items (with optional counts)
- Merging two sketches of the same type
- Estimating memory usage
- Clearing state for reuse

All sketches should accept a seed parameter for reproducibility when the algorithm uses randomization.
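A minimal rendering of this protocol (illustrative only, not the library's source) shows the shape every sketch shares:

```python
from abc import ABC, abstractmethod
from typing import Generic, TypeVar

T = TypeVar("T")

class Sketch(ABC, Generic[T]):
    """Base protocol: single-pass add, merge, memory accounting, reset."""

    @property
    @abstractmethod
    def memory_bytes(self) -> int:
        """Approximate memory footprint in bytes."""

    @property
    @abstractmethod
    def item_count(self) -> int:
        """Sum of all counts added via add()."""

    @abstractmethod
    def add(self, item: T, count: int = 1) -> None:
        """Add count occurrences of item."""

    @abstractmethod
    def merge(self, other: "Sketch") -> None:
        """Fold another sketch of the same type into this one."""

    @abstractmethod
    def clear(self) -> None:
        """Reset to the initial empty state."""
```

Being abstract, the class cannot be instantiated directly; concrete sketches implement every method.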

memory_bytes abstractmethod property

memory_bytes: int

Estimated memory usage in bytes.

Returns:

Type Description
int

Approximate memory footprint of the sketch data structures.

item_count abstractmethod property

item_count: int

Total count of items added to the sketch.

Returns:

Type Description
int

Sum of all counts added via add().

add abstractmethod

add(item: T, count: int = 1) -> None

Add an item to the sketch.

Parameters:

Name Type Description Default
item T

The item to add.

required
count int

Number of occurrences to add (default 1).

1

merge abstractmethod

merge(other: Sketch) -> None

Merge another sketch of the same type into this one.

After merging, this sketch contains the combined information from both sketches, as if all items from both had been added to one sketch.

Parameters:

Name Type Description Default
other Sketch

Another sketch of the same type and configuration.

required

Raises:

Type Description
TypeError

If other is not the same sketch type.

ValueError

If other has incompatible configuration.

clear abstractmethod

clear() -> None

Reset the sketch to its initial empty state.

BloomFilter

BloomFilter(
    size_bits: int,
    num_hashes: int | None = None,
    seed: int | None = None,
)

Bases: MembershipSketch[T]

Bloom Filter for probabilistic set membership.

Space-efficient data structure for testing set membership. Can report false positives but never false negatives.

Parameters:

Name Type Description Default
size_bits int

Size of the bit array. Larger = lower false positive rate.

required
num_hashes int | None

Number of hash functions. If None, calculated optimally.

None
seed int | None

Random seed for hash function initialization.

None

Alternatively, create from expected usage: BloomFilter.from_expected_items(n=10000, fp_rate=0.01)

Example

Check if URLs have been visited

visited = BloomFilter.from_expected_items(n=1_000_000, fp_rate=0.01)

for url in crawl_history:
    visited.add(url)

if visited.contains(new_url):
    print("Probably visited before")
else:
    print("Definitely not visited")
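The textbook sizing math behind a constructor like from_expected_items: for n items at target false-positive rate p, the optimal bit count is m = -n·ln(p)/ln(2)² and the optimal hash count is k = (m/n)·ln(2). The library presumably uses the same formulas, though its exact rounding is an assumption here.

```python
import math

def bloom_dimensions(n: int, fp_rate: float) -> tuple[int, int]:
    """Optimal Bloom filter size (bits) and hash count for n items
    at the target false-positive rate."""
    m = math.ceil(-n * math.log(fp_rate) / math.log(2) ** 2)
    k = max(1, round(m / n * math.log(2)))
    return m, k

m, k = bloom_dimensions(10_000, 0.01)
print(m, k)  # 95851 bits, 7 hash functions
```

Note that 7 hashes here matches the default num_hashes documented below.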

Initialize Bloom Filter.

Parameters:

Name Type Description Default
size_bits int

Size of the bit array. Must be > 0.

required
num_hashes int | None

Number of hash functions. If None, defaults to 7.

None
seed int | None

Random seed for reproducibility.

None

Raises:

Type Description
ValueError

If size_bits <= 0 or num_hashes <= 0.

size_bits property

size_bits: int

Size of the bit array in bits.

num_hashes property

num_hashes: int

Number of hash functions used.

false_positive_rate property

false_positive_rate: float

Estimated current false positive rate.

Returns:

Type Description
float

Probability that contains() returns True for an item not in the set.

fill_ratio property

fill_ratio: float

Proportion of bits that are set.

memory_bytes property

memory_bytes: int

Estimated memory usage in bytes.

item_count property

item_count: int

Total count of items added.

from_expected_items classmethod

from_expected_items(
    n: int, fp_rate: float, seed: int | None = None
) -> BloomFilter[T]

Create a filter sized for expected items and false positive rate.

Parameters:

Name Type Description Default
n int

Expected number of items to insert.

required
fp_rate float

Desired false positive rate (e.g., 0.01 for 1%).

required
seed int | None

Random seed for reproducibility.

None

Returns:

Type Description
BloomFilter[T]

BloomFilter configured for the specified parameters.

Raises:

Type Description
ValueError

If n < 0 or fp_rate not in (0, 1).

add

add(item: T, count: int = 1) -> None

Add an item to the filter.

Note: count > 1 has the same effect as count = 1 for membership.

Parameters:

Name Type Description Default
item T

The item to add. Must be hashable.

required
count int

Number of times to add (only presence matters).

1

Raises:

Type Description
ValueError

If count is negative.

contains

contains(item: T) -> bool

Test if an item is (probably) in the set.

Parameters:

Name Type Description Default
item T

The item to test.

required

Returns:

Type Description
bool

True if the item might be in the set (possible false positive); False if the item is definitely not in the set.

__contains__

__contains__(item: T) -> bool

Support 'in' operator.

merge

merge(other: BloomFilter[T]) -> None

Merge another Bloom Filter into this one (OR operation).

After merging, this filter represents the union of both sets.

Parameters:

Name Type Description Default
other BloomFilter[T]

Another BloomFilter with same size and hash count.

required

Raises:

Type Description
TypeError

If other is not a BloomFilter.

ValueError

If other has different configuration.

clear

clear() -> None

Reset the filter to empty state.

CountMinSketch

CountMinSketch(
    width: int, depth: int, seed: int | None = None
)

Bases: FrequencySketch[T]

Count-Min Sketch for frequency estimation.

A probabilistic data structure that estimates item frequencies using multiple hash functions and a 2D counter array. Provides strong guarantees on estimation error.

Parameters:

Name Type Description Default
width int

Number of counters per row. Larger = more accurate.

required
depth int

Number of hash functions/rows. Larger = higher confidence.

required
seed int | None

Random seed for hash function initialization.

None

Alternatively, create from error bounds: CountMinSketch.from_error_rate(epsilon=0.01, delta=0.01)

Error guarantees
  • Query always returns count >= true count (never underestimates)
  • With probability ≥ 1-δ: estimate ≤ true_count + εN
  • Where ε = e/width, δ = e^(-depth), N = total item count
Example

Create sketch for 1% error with 99% confidence

cms = CountMinSketch.from_error_rate(epsilon=0.01, delta=0.01)

for item in data_stream:
    cms.add(item)

Query frequency (may overestimate)

freq = cms.estimate("hot_key")
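The error guarantees above translate directly into dimensions: width = ⌈e/ε⌉ and depth = ⌈ln(1/δ)⌉. This is the standard Count-Min sizing; from_error_rate presumably computes the same values, but the exact rounding is an assumption.

```python
import math

def cms_dimensions(epsilon: float, delta: float) -> tuple[int, int]:
    """Width/depth so that with probability >= 1 - delta the estimate
    exceeds the true count by at most epsilon * N (N = items added)."""
    width = math.ceil(math.e / epsilon)
    depth = math.ceil(math.log(1 / delta))
    return width, depth

print(cms_dimensions(0.01, 0.01))  # (272, 5)
```

So 1% error at 99% confidence costs only 272 × 5 counters, regardless of stream length.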

Initialize Count-Min Sketch.

Parameters:

Name Type Description Default
width int

Number of counters per row. Must be positive.

required
depth int

Number of hash functions/rows. Must be positive.

required
seed int | None

Random seed for reproducible hash functions.

None

Raises:

Type Description
ValueError

If width or depth <= 0.

width property

width: int

Number of counters per row.

depth property

depth: int

Number of hash functions/rows.

epsilon property

epsilon: float

Error rate parameter (e/width).

delta property

delta: float

Failure probability parameter (e^-depth).

memory_bytes property

memory_bytes: int

Estimated memory usage in bytes.

item_count property

item_count: int

Total count of items added.

from_error_rate classmethod

from_error_rate(
    epsilon: float, delta: float, seed: int | None = None
) -> CountMinSketch[T]

Create a sketch with specified error guarantees.

Parameters:

Name Type Description Default
epsilon float

Maximum relative error (e.g., 0.01 for 1%).

required
delta float

Failure probability (e.g., 0.01 for 99% confidence).

required
seed int | None

Random seed for reproducibility.

None

Returns:

Type Description
CountMinSketch[T]

CountMinSketch configured for the specified guarantees.

Raises:

Type Description
ValueError

If epsilon or delta not in (0, 1).

add

add(item: T, count: int = 1) -> None

Add an item occurrence to the sketch.

Parameters:

Name Type Description Default
item T

The item to add. Must be hashable.

required
count int

Number of occurrences to add (default 1).

1

Raises:

Type Description
ValueError

If count is negative.

estimate

estimate(item: T) -> int

Estimate the frequency of an item.

Parameters:

Name Type Description Default
item T

The item to estimate frequency for.

required

Returns:

Type Description
int

Estimated count. This is guaranteed to be >= true count, and with high probability <= true_count + epsilon * total_count.

estimate_with_error

estimate_with_error(item: T) -> FrequencyEstimate[T]

Get frequency estimate with error bounds.

Parameters:

Name Type Description Default
item T

The item to estimate.

required

Returns:

Type Description
FrequencyEstimate[T]

FrequencyEstimate with count and error bound.

top

top(k: int) -> list[FrequencyEstimate[T]]

Return top-k items.

Note: Count-Min Sketch doesn't track items, only counts. This method cannot efficiently find top-k. For top-k queries, use TopK instead.

Parameters:

Name Type Description Default
k int

Number of top items to return.

required

Returns:

Type Description
list[FrequencyEstimate[T]]

Never returns normally; declared only for protocol compatibility (Count-Min Sketch doesn't track item identities).

Raises:

Type Description
NotImplementedError

Always, as CMS cannot enumerate items.

inner_product

inner_product(other: CountMinSketch[T]) -> int

Estimate the inner product of two streams.

The inner product is sum(count_A[x] * count_B[x]) over all items x. Useful for comparing similarity of two streams.

Parameters:

Name Type Description Default
other CountMinSketch[T]

Another Count-Min Sketch with same dimensions.

required

Returns:

Type Description
int

Estimated inner product.

Raises:

Type Description
ValueError

If sketches have different dimensions.

merge

merge(other: CountMinSketch[T]) -> None

Merge another Count-Min Sketch into this one.

After merging, this sketch estimates frequencies for the combined stream.

Parameters:

Name Type Description Default
other CountMinSketch[T]

Another Count-Min Sketch with same dimensions and seed.

required

Raises:

Type Description
TypeError

If other is not a CountMinSketch.

ValueError

If other has different dimensions or seed.

clear

clear() -> None

Reset the sketch to empty state.

HyperLogLog

HyperLogLog(precision: int = 14, seed: int | None = None)

Bases: CardinalitySketch

HyperLogLog for streaming cardinality estimation.

Estimates the number of distinct elements in a stream using fixed memory. Uses multiple registers that track the maximum number of leading zeros in hashed values.

Parameters:

Name Type Description Default
precision int

Number of bits for register index (4-16).
- precision=4: 16 registers, ~26% error
- precision=10: 1024 registers, ~3.2% error
- precision=14: 16384 registers, ~0.8% error
Default is 14 for good accuracy.

14
seed int | None

Random seed for hash function initialization.

None
Example

Count unique visitors

hll = HyperLogLog[str]()

for visitor_id in visitor_stream:
    hll.add(visitor_id)

unique_count = hll.cardinality()
print(f"~{unique_count} unique visitors")
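The per-precision error figures above follow the standard HyperLogLog formula 1.04/√m, where m = 2^precision registers:

```python
import math

def hll_standard_error(precision: int) -> float:
    """Relative standard error of HyperLogLog with 2**precision registers."""
    m = 2 ** precision
    return 1.04 / math.sqrt(m)

for p in (4, 10, 14):
    print(p, hll_standard_error(p))  # ~26%, ~3.2%, ~0.8%
```

This is why doubling accuracy costs 4x the registers: error shrinks with the square root of memory.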

Initialize HyperLogLog.

Parameters:

Name Type Description Default
precision int

Bits for register index (4-16). Default 14.

14
seed int | None

Random seed for reproducibility.

None

Raises:

Type Description
ValueError

If precision not in [4, 16].

precision property

precision: int

Number of bits used for register indexing.

num_registers property

num_registers: int

Number of registers (2^precision).

memory_bytes property

memory_bytes: int

Estimated memory usage in bytes.

item_count property

item_count: int

Total count of items added (not distinct count).

add

add(item: T, count: int = 1) -> None

Add an item to the sketch.

Note: count parameter is accepted but only the presence of the item matters for cardinality estimation, not how many times it's added.

Parameters:

Name Type Description Default
item T

The item to add. Must be hashable.

required
count int

Ignored for cardinality (only presence matters).

1

cardinality

cardinality() -> int

Estimate the number of distinct items.

Returns:

Type Description
int

Estimated count of unique items.

standard_error

standard_error() -> float

Theoretical standard error of the estimate.

Returns:

Type Description
float

Expected relative error (e.g., 0.01 for 1% error).

merge

merge(other: HyperLogLog[T]) -> None

Merge another HyperLogLog into this one.

After merging, this sketch estimates the cardinality of the union of both streams.

Parameters:

Name Type Description Default
other HyperLogLog[T]

Another HyperLogLog with same precision.

required

Raises:

Type Description
TypeError

If other is not a HyperLogLog.

ValueError

If other has different precision.

clear

clear() -> None

Reset the sketch to empty state.

KeyRange dataclass

KeyRange(start: str, end: str)

An inclusive range of keys [start, end].

Attributes:

Name Type Description
start str

First key in the range (inclusive).

end str

Last key in the range (inclusive).

contains

contains(key: str) -> bool

Check if a key falls within this range.

MerkleNode dataclass

MerkleNode(
    hash: str,
    key_range: KeyRange,
    left: MerkleNode | None = None,
    right: MerkleNode | None = None,
)

A node in the Merkle tree.

Leaf nodes cover a single key; internal nodes cover the union of their children's key ranges.

Attributes:

Name Type Description
hash str

SHA-256 hex digest of the subtree contents.

key_range KeyRange

The range of keys covered by this subtree.

left MerkleNode | None

Left child (None for leaf nodes).

right MerkleNode | None

Right child (None for leaf nodes).

is_leaf property

is_leaf: bool

True if this is a leaf node.

MerkleTree

MerkleTree()

Hash tree for detecting divergent key ranges between replicas.

Binary tree over sorted keys where each internal node's hash is derived from its children's hashes. Comparing two trees top-down lets you skip matching subtrees and only transfer data for divergent ranges.

Attributes:

Name Type Description
root_hash str

SHA-256 hex digest of the root node (empty string if empty).

size int

Number of key-value pairs in the tree.

root_hash property

root_hash: str

SHA-256 hex digest of the root node, or empty string if empty.

root property

root: MerkleNode | None

The root node of the tree, or None if empty.

size property

size: int

Number of key-value pairs in the tree.

build classmethod

build(data: dict[str, Any]) -> MerkleTree

Build a Merkle tree from a dictionary of key-value pairs.

Parameters:

Name Type Description Default
data dict[str, Any]

Key-value pairs to build the tree from. Keys are sorted lexicographically to produce a deterministic tree structure.

required

Returns:

Type Description
MerkleTree

A new MerkleTree covering all provided keys.

update

update(key: str, value: Any) -> None

Update a single key and rebuild the tree.

Parameters:

Name Type Description Default
key str

The key to update or insert.

required
value Any

The new value.

required

remove

remove(key: str) -> bool

Remove a key and rebuild the tree.

Parameters:

Name Type Description Default
key str

The key to remove.

required

Returns:

Type Description
bool

True if the key existed, False otherwise.

get

get(key: str) -> Any | None

Get a value by key (for convenience during anti-entropy sync).

Parameters:

Name Type Description Default
key str

The key to look up.

required

Returns:

Type Description
Any | None

The value, or None if not present.

keys

keys() -> list[str]

Return all keys in sorted order.

items

items() -> list[tuple[str, Any]]

Return all key-value pairs in sorted order.

diff

diff(other: MerkleTree) -> list[KeyRange]

Find key ranges that differ between this tree and another.

Walks both trees top-down, skipping subtrees whose hashes match. Returns the key ranges of leaf-level divergences.

Parameters:

Name Type Description Default
other MerkleTree

The other MerkleTree to compare against.

required

Returns:

Type Description
list[KeyRange]

List of KeyRange objects covering keys that differ. Empty list if the trees are identical.
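A self-contained sketch of the diff idea, assuming for simplicity that both replicas hold the same key set (so the trees have identical shape; the library's MerkleTree handles the general case). Matching subtree hashes are skipped, so only divergent ranges are visited:

```python
import hashlib

def _sha(data: str) -> str:
    return hashlib.sha256(data.encode()).hexdigest()

def build(items: list) -> dict:
    """Binary hash tree over key-sorted, non-empty (key, value) pairs."""
    if len(items) == 1:
        key, value = items[0]
        return {"hash": _sha(f"{key}={value!r}"), "range": (key, key),
                "left": None, "right": None}
    mid = len(items) // 2
    left, right = build(items[:mid]), build(items[mid:])
    return {"hash": _sha(left["hash"] + right["hash"]),
            "range": (left["range"][0], right["range"][1]),
            "left": left, "right": right}

def diff(a: dict, b: dict) -> list:
    """Key ranges where the trees disagree; matching subtrees are skipped."""
    if a["hash"] == b["hash"]:
        return []
    if a["left"] is None or b["left"] is None:  # leaf-level divergence
        return [a["range"]]
    return diff(a["left"], b["left"]) + diff(a["right"], b["right"])

replica_a = build(sorted({"k1": 1, "k2": 2, "k3": 3}.items()))
replica_b = build(sorted({"k1": 1, "k2": 9, "k3": 3}.items()))
print(diff(replica_a, replica_b))  # [('k2', 'k2')]
```

Only the range containing the divergent key needs to be transferred during anti-entropy sync.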

ReservoirSampler

ReservoirSampler(size: int, seed: int | None = None)

Bases: SamplingSketch[T]

Reservoir Sampling for uniform stream sampling.

Maintains a uniform random sample of fixed size from a stream. Uses Algorithm R from Vitter's paper for O(1) update time.

Parameters:

Name Type Description Default
size int

Maximum number of items to keep in the sample.

required
seed int | None

Random seed for reproducibility.

None
Example

Sample 1000 requests for analysis

sampler = ReservoirSampler[Request](size=1000)

for request in request_stream:
    sampler.add(request)

Get uniform sample

sample = sampler.sample()
analyze(sample)
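Vitter's Algorithm R, which the class is documented to use, is short enough to show in full. This is an independent illustration under the stated assumptions, not the library's implementation:

```python
import random

def reservoir_sample(stream, size: int, seed=None) -> list:
    """Algorithm R: keep a uniform random sample of `size` items
    from a stream of unknown length, with O(1) work per item."""
    rng = random.Random(seed)
    sample: list = []
    for n, item in enumerate(stream, start=1):
        if n <= size:
            sample.append(item)      # fill the reservoir first
        else:
            j = rng.randrange(n)     # item survives with probability size/n
            if j < size:
                sample[j] = item
    return sample

picked = reservoir_sample(range(10_000), size=100, seed=42)
print(len(picked))  # 100
```

After item n arrives, every item seen so far sits in the reservoir with equal probability size/n, which is the uniformity guarantee.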

Initialize Reservoir Sampler.

Parameters:

Name Type Description Default
size int

Maximum sample size. Must be > 0.

required
seed int | None

Random seed for reproducibility.

None

Raises:

Type Description
ValueError

If size <= 0.

capacity property

capacity: int

Maximum number of items in the sample.

is_full property

is_full: bool

Whether the reservoir is at capacity.

memory_bytes property

memory_bytes: int

Estimated memory usage in bytes.

item_count property

item_count: int

Total count of items seen (not sample size).

sample_size property

sample_size: int

Current number of items in the sample.

add

add(item: T, count: int = 1) -> None

Add an item to the stream.

The item may or may not be kept in the reservoir, depending on random selection to maintain uniform sampling.

Parameters:

Name Type Description Default
item T

The item to add.

required
count int

Number of times to add this item. Each occurrence is considered independently for sampling.

1

Raises:

Type Description
ValueError

If count is negative.

sample

sample() -> list[T]

Return the current sample.

Returns:

Type Description
list[T]

List of sampled items. Length is min(capacity, items_seen).

__iter__

__iter__() -> Iterator[T]

Iterate over sampled items.

__len__

__len__() -> int

Number of items currently in the reservoir.

__getitem__

__getitem__(index: int) -> T

Get item at index in the reservoir.

merge

merge(other: ReservoirSampler[T]) -> None

Merge another reservoir into this one.

Maintains uniform sampling over the combined streams. Uses weighted random selection based on total counts.

Parameters:

Name Type Description Default
other ReservoirSampler[T]

Another ReservoirSampler with same capacity.

required

Raises:

Type Description
TypeError

If other is not a ReservoirSampler.

ValueError

If other has different capacity.

clear

clear() -> None

Reset the sampler to empty state.

TDigest

TDigest(
    compression: float = 100.0, seed: int | None = None
)

Bases: QuantileSketch

T-Digest for streaming quantile estimation.

Maintains a compressed representation of the data distribution using weighted centroids. Provides excellent accuracy at extreme quantiles (p1, p99, p999) while using bounded memory.

Parameters:

Name Type Description Default
compression float

Controls accuracy vs memory tradeoff (default 100). Higher values = more centroids = more accuracy = more memory. Typical values: 50-500.

100.0
seed int | None

Not used (algorithm is deterministic) but accepted for API consistency.

None
Example

Track latency distribution

td = TDigest(compression=100)

for latency in latencies:
    td.add(latency)

Get percentiles

p50 = td.percentile(50)
p95 = td.percentile(95)
p99 = td.percentile(99)
p999 = td.percentile(99.9)
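For intuition, the exact quantile a t-digest approximates can be computed from a fully materialized, sorted list; the digest's value is that it tracks this with bounded memory. A nearest-rank baseline (illustrative, not the library's interpolation):

```python
def exact_quantile(values: list, q: float) -> float:
    """Nearest-rank quantile over fully materialized data -- the
    baseline a t-digest approximates with bounded memory."""
    if not 0.0 <= q <= 1.0:
        raise ValueError("q must be in [0, 1]")
    ordered = sorted(values)
    index = min(len(ordered) - 1, int(q * len(ordered)))
    return ordered[index]

latencies = [float(ms) for ms in range(1, 1001)]  # 1..1000 ms
print(exact_quantile(latencies, 0.5))  # nearest-rank median -> 501.0
```

A t-digest additionally interpolates between centroids, so its answers at extreme quantiles (p99, p999) stay accurate even when few samples fall there.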

Initialize T-Digest.

Parameters:

Name Type Description Default
compression float

Controls number of centroids. Must be > 0.

100.0
seed int | None

Unused (algorithm is deterministic). Accepted for API consistency.

None

Raises:

Type Description
ValueError

If compression <= 0.

compression property

compression: float

Compression factor (higher = more accuracy).

memory_bytes property

memory_bytes: int

Estimated memory usage in bytes.

item_count property

item_count: int

Total count of items added.

centroid_count property

centroid_count: int

Number of centroids currently stored.

min property

min: float | None

Minimum value seen.

max property

max: float | None

Maximum value seen.

add

add(value: float, count: int = 1) -> None

Add a value to the digest.

Parameters:

Name Type Description Default
value float

The value to add.

required
count int

Number of times to add this value (default 1).

1

Raises:

Type Description
ValueError

If count is negative.

quantile

quantile(q: float) -> float

Estimate the value at a given quantile.

Parameters:

Name Type Description Default
q float

Quantile to estimate (0.0 to 1.0).

required

Returns:

Type Description
float

Estimated value at the quantile.

Raises:

Type Description
ValueError

If q is not in [0, 1] or digest is empty.

cdf

cdf(value: float) -> float

Estimate the cumulative distribution function at a value.

Parameters:

Name Type Description Default
value float

The value to get CDF for.

required

Returns:

Type Description
float

Estimated probability that a random sample <= value (0.0 to 1.0).

merge

merge(other: TDigest) -> None

Merge another T-Digest into this one.

Parameters:

Name Type Description Default
other TDigest

Another T-Digest to merge.

required

Raises:

Type Description
TypeError

If other is not a TDigest.

clear

clear() -> None

Reset the digest to empty state.

TopK

TopK(k: int, seed: int | None = None)

Bases: FrequencySketch[T]

Top-K heavy hitters using the Space-Saving algorithm.

Maintains k counters to track the approximately k most frequent items. When a new item arrives that isn't tracked, it replaces the item with the minimum count, inheriting that count as an error bound.

Parameters:

Name Type Description Default
k int

Number of items to track. Larger k = more accurate but more memory.

required
seed int | None

Not used (algorithm is deterministic) but accepted for API consistency.

None
Example

Track top 100 customers by request count

topk = TopK[int](k=100)

for customer_id in request_stream:
    topk.add(customer_id)

Get the 10 most frequent customers

for estimate in topk.top(10):
    print(f"Customer {estimate.item}: ~{estimate.count} requests "
          f"(error <= {estimate.error})")
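The Space-Saving update rule described above fits in a few lines. This is an illustrative rendering, not the library's implementation (which likely uses a more efficient min-tracking structure than a linear scan):

```python
class SpaceSaving:
    """Track ~k heaviest items: an untracked arrival evicts the current
    minimum counter and inherits its count as an error bound."""

    def __init__(self, k: int):
        self.k = k
        self.counts: dict = {}   # item -> estimated count
        self.errors: dict = {}   # item -> inherited overcount bound

    def add(self, item) -> None:
        if item in self.counts:
            self.counts[item] += 1
        elif len(self.counts) < self.k:
            self.counts[item] = 1
            self.errors[item] = 0
        else:
            victim = min(self.counts, key=self.counts.get)
            floor = self.counts.pop(victim)
            self.errors.pop(victim)
            self.counts[item] = floor + 1  # inherit the evicted count...
            self.errors[item] = floor      # ...which bounds the overcount

    def top(self, n: int) -> list:
        ranked = sorted(self.counts.items(), key=lambda kv: -kv[1])
        return [(item, c, self.errors[item]) for item, c in ranked[:n]]

ss = SpaceSaving(k=3)
for item in "aabbc":
    ss.add(item)
print(ss.top(1))  # [('a', 2, 0)]
```

A useful invariant: the counters always sum to the total number of items seen, which is what makes the N/k guaranteed_threshold bound below work.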

Initialize TopK sketch.

Parameters:

Name Type Description Default
k int

Number of items to track. Must be positive.

required
seed int | None

Unused (algorithm is deterministic). Accepted for API consistency.

None

Raises:

Type Description
ValueError

If k <= 0.

k property

k: int

Number of items being tracked.

memory_bytes property

memory_bytes: int

Estimated memory usage in bytes.

item_count property

item_count: int

Total count of items added.

tracked_count property

tracked_count: int

Number of distinct items currently tracked.

add

add(item: T, count: int = 1) -> None

Add an item occurrence to the sketch.

Parameters:

Name Type Description Default
item T

The item to add. Must be hashable.

required
count int

Number of occurrences to add (default 1).

1

Raises:

Type Description
ValueError

If count is negative.

estimate

estimate(item: T) -> int

Estimate the frequency of an item.

Parameters:

Name Type Description Default
item T

The item to estimate.

required

Returns:

Type Description
int

Estimated count. If item is tracked, returns its counter value. If not tracked, returns 0 (it may have been evicted, but its true count is bounded by max_error()).

estimate_with_error

estimate_with_error(item: T) -> FrequencyEstimate[T]

Get frequency estimate with error bounds.

Parameters:

Name Type Description Default
item T

The item to estimate.

required

Returns:

Type Description
FrequencyEstimate[T]

FrequencyEstimate with count and error bound.

top

top(n: int | None = None) -> list[FrequencyEstimate[T]]

Return the top-n most frequent items.

Parameters:

Name Type Description Default
n int | None

Number of top items to return. If None, returns all tracked items.

None

Returns:

Type Description
list[FrequencyEstimate[T]]

List of FrequencyEstimate objects sorted by count (descending).

__contains__

__contains__(item: T) -> bool

Check if an item is currently being tracked.

max_error

max_error() -> int

Maximum possible error for any estimate.

Returns:

Type Description
int

Upper bound on estimation error. For any item, its true count differs from estimate() by at most this amount.

guaranteed_threshold

guaranteed_threshold() -> int

Frequency threshold above which items are guaranteed to be tracked.

Returns:

Type Description
int

Threshold T such that any item with true frequency > T is guaranteed to be in the top-k. Equal to N/k where N is total count.

merge

merge(other: TopK[T]) -> None

Merge another TopK sketch into this one.

After merging, this sketch approximates the top-k of the combined stream. Note that merging TopK sketches can lose accuracy compared to a single sketch processing both streams; this is a fundamental limitation.

Parameters:

Name Type Description Default
other TopK[T]

Another TopK sketch with the same k value.

required

Raises:

Type Description
TypeError

If other is not a TopK.

ValueError

If other has different k value.

clear

clear() -> None

Reset the sketch to empty state.