Data Store

Key-value stores and database abstractions for distributed data simulation.

Data store components for simulating storage systems.

This module provides key-value stores, caching layers, and related storage infrastructure for simulating realistic data access patterns.

Example

```python
from happysimulator.components.datastore import KVStore, CachedStore, LRUEviction

# Simple key-value store
store = KVStore(name="db")

# Cached store with LRU eviction
cache = CachedStore(
    name="cached_db",
    backing_store=store,
    cache_capacity=1000,
    eviction_policy=LRUEviction(),
)
```

CacheWarmer

CacheWarmer(
    name: str,
    cache: Entity,
    keys_to_warm: list[str] | Callable[[], list[str]],
    warmup_rate: float = 100.0,
    warmup_latency: float = 0.001,
)

Bases: Entity

Pre-populates cache during cold start.

Fetches specified keys from the backing store and loads them into the cache before normal traffic begins. This reduces cache misses during the initial warm-up period.

The warmer processes keys at a configurable rate to avoid overwhelming the backing store.
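The pacing model can be sketched as a generator that yields simulated delays: one latency per key fetch, plus an inter-key gap of `1 / warmup_rate` seconds. This is an illustrative sketch with hypothetical names, not the library's internals.

```python
from typing import Generator, Iterable

def warm_keys(keys: Iterable[str], warmup_rate: float = 100.0,
              warmup_latency: float = 0.001) -> Generator[float, None, int]:
    """Yield simulated delays while warming keys; return the count warmed."""
    warmed = 0
    for _ in keys:
        yield warmup_latency     # simulated fetch latency for this key
        yield 1.0 / warmup_rate  # pacing gap so the backing store is not overwhelmed
        warmed += 1
    return warmed
```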

Attributes:

Name Type Description
name

Entity name for identification.

progress float

Fraction of warming complete (0.0 to 1.0).

is_complete bool

Whether warming has finished.

Initialize the cache warmer.

Parameters:

Name Type Description Default
name str

Name for this warmer entity.

required
cache Entity

The cache to warm (must have a get method).

required
keys_to_warm list[str] | Callable[[], list[str]]

List of keys or callable that returns keys.

required
warmup_rate float

Keys to warm per second.

100.0
warmup_latency float

Simulated latency per key fetch in seconds.

0.001

Raises:

Type Description
ValueError

If parameters are invalid.

stats property

stats: CacheWarmerStats

Frozen snapshot of cache warmer statistics.

progress property

progress: float

Fraction of warming complete (0.0 to 1.0).

is_complete property

is_complete: bool

Whether warming has finished.

is_started property

is_started: bool

Whether warming has started.

warmup_rate property

warmup_rate: float

Keys warmed per second.

get_keys_to_warm

get_keys_to_warm() -> list[str]

Get the list of keys to warm.

Resolves the keys provider if it's a callable.

Returns:

Type Description
list[str]

List of keys to warm.

start_warming

start_warming() -> Event

Start the cache warming process.

Returns:

Type Description
Event

Initial event to begin warming.

warm_keys

warm_keys(now: Instant) -> Generator[float]

Warm all keys.

This is a generator that yields delays while warming.

Parameters:

Name Type Description Default
now Instant

Current simulation time.

required

Yields:

Type Description
Generator[float]

Delays between key fetches.

handle_event

handle_event(
    event: Event,
) -> Generator[float, None, list[Event]]

Handle warming events.

Parameters:

Name Type Description Default
event Event

The warming event.

required

Yields:

Type Description
float

Delays for key fetches.

Returns:

Type Description
list[Event]

Empty list when warming is complete.

CacheWarmerStats dataclass

CacheWarmerStats(
    keys_to_warm: int = 0,
    keys_warmed: int = 0,
    keys_failed: int = 0,
    warmup_time_seconds: float = 0.0,
)

Statistics tracked by CacheWarmer.

CachedStore

CachedStore(
    name: str,
    backing_store: KVStore,
    cache_capacity: int,
    eviction_policy: CacheEvictionPolicy,
    cache_read_latency: float = 0.0001,
    write_through: bool = True,
)

Bases: Entity

Cache layer in front of a backing store.

Provides fast access to frequently-used data by caching it in memory. Supports various eviction policies and write strategies.

Attributes:

Name Type Description
name

Entity name for identification.

cache_capacity int

Maximum number of cached entries.

hit_rate float

Ratio of cache hits to total reads.

miss_rate float

Ratio of cache misses to total reads.

Initialize the cached store.

Parameters:

Name Type Description Default
name str

Name for this cache entity.

required
backing_store KVStore

The underlying storage to cache.

required
cache_capacity int

Maximum number of entries to cache.

required
eviction_policy CacheEvictionPolicy

Policy for selecting entries to evict.

required
cache_read_latency float

Latency for cache hits in seconds.

0.0001
write_through bool

If True, writes go to both cache and backing store. If False, writes only go to cache (write-back).

True

Raises:

Type Description
ValueError

If parameters are invalid.

stats property

stats: CachedStoreStats

Frozen snapshot of cached store statistics.

backing_store property

backing_store: KVStore

The underlying backing store.

cache_capacity property

cache_capacity: int

Maximum number of cached entries.

cache_size property

cache_size: int

Current number of cached entries.

hit_rate property

hit_rate: float

Ratio of cache hits to total reads.

miss_rate property

miss_rate: float

Ratio of cache misses to total reads.

get

get(key: str) -> Generator[float, None, Any | None]

Get a value, checking cache first.

Parameters:

Name Type Description Default
key str

The key to look up.

required

Yields:

Type Description
float

Cache or backing store latency.

Returns:

Type Description
Any | None

The value if found, None otherwise.

put

put(key: str, value: Any) -> Generator[float]

Store a value.

With write-through, writes to both cache and backing store. With write-back, writes only to cache (must flush later).

Parameters:

Name Type Description Default
key str

The key to store under.

required
value Any

The value to store.

required

Yields:

Type Description
Generator[float]

Write latency.
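The two write strategies can be sketched as follows (illustrative only; the `dirty` set mirrors what `flush` and `get_dirty_keys` operate on):

```python
class WritePolicyCache:
    """Sketch of write-through vs write-back semantics."""
    def __init__(self, backing: dict, write_through: bool = True):
        self.backing = backing
        self.write_through = write_through
        self.cache: dict = {}
        self.dirty: set[str] = set()

    def put(self, key: str, value) -> None:
        self.cache[key] = value
        if self.write_through:
            self.backing[key] = value  # write reaches the backing store immediately
        else:
            self.dirty.add(key)        # deferred: must be flushed later

    def flush(self) -> int:
        """Write all dirty entries to the backing store; return the number flushed."""
        flushed = 0
        for key in sorted(self.dirty):
            self.backing[key] = self.cache[key]
            flushed += 1
        self.dirty.clear()
        return flushed
```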

delete

delete(key: str) -> Generator[float, None, bool]

Delete a key from cache and backing store.

Parameters:

Name Type Description Default
key str

The key to delete.

required

Yields:

Type Description
float

Delete latency.

Returns:

Type Description
bool

True if key existed in either cache or backing store.

invalidate

invalidate(key: str) -> None

Remove a key from cache only (not backing store).

Parameters:

Name Type Description Default
key str

The key to invalidate.

required

invalidate_all

invalidate_all() -> None

Clear the entire cache.

flush

flush() -> Generator[float, None, int]

Write all dirty entries to backing store.

Only relevant for write-back mode.

Yields:

Type Description
float

Write latencies.

Returns:

Type Description
int

Number of entries flushed.

contains_cached

contains_cached(key: str) -> bool

Check if a key is in the cache.

Parameters:

Name Type Description Default
key str

The key to check.

required

Returns:

Type Description
bool

True if key is cached.

get_cached_keys

get_cached_keys() -> list[str]

Get all cached keys.

Returns:

Type Description
list[str]

List of cached keys.

get_dirty_keys

get_dirty_keys() -> list[str]

Get keys pending writeback.

Returns:

Type Description
list[str]

List of dirty keys.

handle_event

handle_event(event: Event) -> None

CachedStore can handle events for cache operations.

CachedStoreStats dataclass

CachedStoreStats(
    reads: int = 0,
    writes: int = 0,
    hits: int = 0,
    misses: int = 0,
    evictions: int = 0,
    writebacks: int = 0,
)

Statistics tracked by CachedStore.

Database

Database(
    name: str,
    max_connections: int = 100,
    query_latency: float | Callable[[str], float] = 0.005,
    connection_latency: float = 0.01,
    commit_latency: float = 0.01,
    rollback_latency: float = 0.005,
)

Bases: Entity

Relational database with connection pool and transactions.

Simulates a database with connection pooling, query execution, and transaction support. Query latency can be fixed or depend on the query type.
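Because `query_latency` accepts a callable, latency can depend on the query text. A hypothetical latency model (the thresholds and query classification here are made up for illustration):

```python
def query_latency(query: str) -> float:
    """Hypothetical latency model: plain reads fast, joins and writes slower."""
    q = query.lstrip().upper()
    if q.startswith("SELECT") and " JOIN " not in q:
        return 0.002  # simple read
    if q.startswith("SELECT"):
        return 0.010  # joins cost more
    return 0.008      # INSERT / UPDATE / DELETE
```

Such a function could then be passed as `Database(name="db", query_latency=query_latency)`.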

Attributes:

Name Type Description
name

Entity name for identification.

max_connections int

Maximum concurrent connections.

active_connections int

Currently active connections.

available_connections int

Connections available for use.

Initialize the database.

Parameters:

Name Type Description Default
name str

Name for this database entity.

required
max_connections int

Maximum number of concurrent connections.

100
query_latency float | Callable[[str], float]

Latency per query (fixed or function of query).

0.005
connection_latency float

Latency to acquire a connection.

0.01
commit_latency float

Latency for transaction commit.

0.01
rollback_latency float

Latency for transaction rollback.

0.005

Raises:

Type Description
ValueError

If parameters are invalid.

stats property

stats: DatabaseStats

Frozen snapshot of database statistics.

max_connections property

max_connections: int

Maximum concurrent connections.

active_connections property

active_connections: int

Number of currently active connections.

available_connections property

available_connections: int

Number of available connections.

pending_waiters property

pending_waiters: int

Number of waiters for connections.

execute

execute(query: str) -> Generator[float, None, Any]

Execute a query using a temporary connection.

Acquires a connection, executes the query, and releases.

Parameters:

Name Type Description Default
query str

The SQL query to execute.

required

Yields:

Type Description
float

Connection and query latency.

Returns:

Type Description
Any

Query result.

begin_transaction

begin_transaction() -> Generator[float, None, Transaction]

Begin a new transaction.

Acquires a connection and starts a transaction.

Yields:

Type Description
float

Connection latency.

Returns:

Type Description
Transaction

The new transaction.

create_table

create_table(name: str) -> None

Create a table (for simulation).

Parameters:

Name Type Description Default
name str

Table name.

required

get_table_names

get_table_names() -> list[str]

Get all table names.

Returns:

Type Description
list[str]

List of table names.

handle_event

handle_event(event: Event) -> None

Database can handle events for query execution.

DatabaseStats dataclass

DatabaseStats(
    queries_executed: int = 0,
    transactions_started: int = 0,
    transactions_committed: int = 0,
    transactions_rolled_back: int = 0,
    connections_created: int = 0,
    connections_closed: int = 0,
    connection_wait_count: int = 0,
    connection_wait_time_total: float = 0.0,
    query_latencies: list[float] = list(),
)

Statistics tracked by Database.

avg_query_latency property

avg_query_latency: float

Average query latency.

query_latency_p95 property

query_latency_p95: float

95th percentile query latency.

Transaction

Transaction(
    transaction_id: int,
    database: Database,
    connection: Connection,
)

Represents a database transaction.

Initialize the transaction.

Parameters:

Name Type Description Default
transaction_id int

Unique transaction identifier.

required
database Database

The database this transaction belongs to.

required
connection Connection

The connection used by this transaction.

required

id property

id: int

Transaction identifier.

state property

state: TransactionState

Current transaction state.

is_active property

is_active: bool

Whether the transaction is still active.

execute

execute(query: str) -> Generator[float, None, Any]

Execute a query within this transaction.

Parameters:

Name Type Description Default
query str

The SQL query to execute.

required

Yields:

Type Description
float

Query latency.

Returns:

Type Description
Any

Query result.

Raises:

Type Description
RuntimeError

If transaction is not active.

commit

commit() -> Generator[float]

Commit the transaction.

Yields:

Type Description
Generator[float]

Commit latency.

Raises:

Type Description
RuntimeError

If transaction is not active.

rollback

rollback() -> Generator[float]

Roll back the transaction.

Yields:

Type Description
Generator[float]

Rollback latency.

Raises:

Type Description
RuntimeError

If transaction is not active.

TransactionState

Bases: Enum

State of a database transaction.

CacheEvictionPolicy

Bases: ABC

Abstract base class for cache eviction policies.

Eviction policies track key access patterns and decide which key to evict when the cache is full.

on_access abstractmethod

on_access(key: str) -> None

Called when a key is accessed (read or write).

Parameters:

Name Type Description Default
key str

The accessed key.

required

on_insert abstractmethod

on_insert(key: str) -> None

Called when a new key is inserted.

Parameters:

Name Type Description Default
key str

The inserted key.

required

on_remove abstractmethod

on_remove(key: str) -> None

Called when a key is explicitly removed.

Parameters:

Name Type Description Default
key str

The removed key.

required

evict abstractmethod

evict() -> str | None

Select a key to evict.

Returns:

Type Description
str | None

The key to evict, or None if no keys to evict.

clear abstractmethod

clear() -> None

Clear all tracking state.

ClockEviction

ClockEviction()

Bases: CacheEvictionPolicy

Clock (Second-Chance) eviction policy.

Approximates LRU using a circular buffer with reference bits. Each key has a reference bit that is set on access.

On eviction:

- Scan keys in circular order
- If the reference bit is set, clear it and move on (second chance)
- If the reference bit is clear, evict that key

This provides O(1) access updates and amortized O(1) eviction, making it very efficient for large caches.

Also known as: Second-Chance algorithm, Not Recently Used (NRU).

Initialize Clock eviction policy.

size property

size: int

Number of tracked keys.

on_access

on_access(key: str) -> None

Set reference bit for accessed key.

on_insert

on_insert(key: str) -> None

Insert key with reference bit set.

on_remove

on_remove(key: str) -> None

Remove key from tracking.

evict

evict() -> str | None

Evict using clock algorithm.

clear

clear() -> None

Clear tracking state.

FIFOEviction

FIFOEviction()

Bases: CacheEvictionPolicy

First-In-First-Out eviction policy.

Evicts the key that was inserted earliest. Simple and predictable, but doesn't consider access patterns.

Initialize FIFO eviction policy.

on_access

on_access(key: str) -> None

FIFO doesn't update on access.

on_insert

on_insert(key: str) -> None

Add key to end of queue.

on_remove

on_remove(key: str) -> None

Remove key from queue.

evict

evict() -> str | None

Return first inserted key.

clear

clear() -> None

Clear tracking state.

LFUEviction

LFUEviction()

Bases: CacheEvictionPolicy

Least Frequently Used eviction policy.

Evicts the key with the lowest access count. Good for workloads where popular items should stay cached.

Initialize LFU eviction policy.

on_access

on_access(key: str) -> None

Increment access count for key.

on_insert

on_insert(key: str) -> None

Initialize count for new key.

on_remove

on_remove(key: str) -> None

Remove key from tracking.

evict

evict() -> str | None

Return least frequently used key.

clear

clear() -> None

Clear tracking state.

LRUEviction

LRUEviction()

Bases: CacheEvictionPolicy

Least Recently Used eviction policy.

Evicts the key that hasn't been accessed for the longest time. Good for workloads with temporal locality.

Initialize LRU eviction policy.

on_access

on_access(key: str) -> None

Move key to end (most recently used).

on_insert

on_insert(key: str) -> None

Add key as most recently used.

on_remove

on_remove(key: str) -> None

Remove key from tracking.

evict

evict() -> str | None

Return least recently used key.

clear

clear() -> None

Clear tracking state.

RandomEviction

RandomEviction(seed: int | None = None)

Bases: CacheEvictionPolicy

Random eviction policy.

Evicts a random key. Useful as a baseline for comparison.

Initialize random eviction policy.

Parameters:

Name Type Description Default
seed int | None

Random seed for reproducibility.

None

on_access

on_access(key: str) -> None

Random doesn't track access.

on_insert

on_insert(key: str) -> None

Track key.

on_remove

on_remove(key: str) -> None

Remove key from tracking.

evict

evict() -> str | None

Return a random key.

clear

clear() -> None

Clear tracking state.

SampledLRUEviction

SampledLRUEviction(
    sample_size: int = 5, seed: int | None = None
)

Bases: CacheEvictionPolicy

Sampled LRU (Probabilistic LRU) eviction policy.

Instead of tracking exact LRU order for all keys, this policy:

1. Samples N random keys from the cache
2. Evicts the least recently used among the sample

This provides O(1) access updates (just a timestamp write) and O(N) eviction, where N is the sample size. For large caches this is much cheaper than maintaining an exact recency ordering over every key.

Used by Redis for its approximated LRU implementation.

Attributes:

Name Type Description
sample_size int

Number of keys to sample during eviction.

Initialize Sampled LRU eviction policy.

Parameters:

Name Type Description Default
sample_size int

Number of keys to sample for eviction decision. Higher = more accurate but slower. Redis uses 5.

5
seed int | None

Random seed for reproducibility.

None

Raises:

Type Description
ValueError

If sample_size < 1.

sample_size property

sample_size: int

Number of keys sampled during eviction.

on_access

on_access(key: str) -> None

Update access time for key.

on_insert

on_insert(key: str) -> None

Record insertion time as access time.

on_remove

on_remove(key: str) -> None

Remove key from tracking.

evict

evict() -> str | None

Sample keys and evict the least recently accessed.

clear

clear() -> None

Clear tracking state.

SLRUEviction

SLRUEviction(protected_ratio: float = 0.8)

Bases: CacheEvictionPolicy

Segmented LRU (SLRU) eviction policy.

Divides the cache into two segments:

- Probationary segment: new items enter here
- Protected segment: items that are accessed again move here

Eviction order: probationary first (LRU), then protected (LRU). This provides scan resistance - a single scan won't evict hot items.

Attributes:

Name Type Description
protected_ratio float

Fraction of capacity for protected segment (0.0-1.0).

Initialize SLRU eviction policy.

Parameters:

Name Type Description Default
protected_ratio float

Fraction of total capacity for protected segment. Default 0.8 means 80% protected, 20% probationary.

0.8

Raises:

Type Description
ValueError

If protected_ratio not in (0, 1).

protected_ratio property

protected_ratio: float

Fraction of capacity for protected segment.

probationary_size property

probationary_size: int

Current size of probationary segment.

protected_size property

protected_size: int

Current size of protected segment.

on_access

on_access(key: str) -> None

Promote key from probationary to protected on re-access.

on_insert

on_insert(key: str) -> None

Insert new key into probationary segment.

on_remove

on_remove(key: str) -> None

Remove key from whichever segment it's in.

evict

evict() -> str | None

Evict from probationary first, then protected.

clear

clear() -> None

Clear both segments.

TTLEviction

TTLEviction(
    ttl: float,
    clock_func: Callable[[], float] | None = None,
)

Bases: CacheEvictionPolicy

Time-To-Live based eviction policy.

Evicts keys that have exceeded their TTL. If no expired keys, evicts the oldest key.

Attributes:

Name Type Description
ttl float

Time-to-live in seconds.

Initialize TTL eviction policy.

Parameters:

Name Type Description Default
ttl float

Time-to-live in seconds.

required
clock_func Callable[[], float] | None

Function returning current time. Defaults to time.time().

None

Raises:

Type Description
ValueError

If ttl <= 0.

ttl property

ttl: float

Time-to-live in seconds.

on_access

on_access(key: str) -> None

TTL doesn't update on access (only insertion time matters).

on_insert

on_insert(key: str) -> None

Record insertion time.

on_remove

on_remove(key: str) -> None

Remove key from tracking.

evict

evict() -> str | None

Return an expired key, or oldest key if none expired.

is_expired

is_expired(key: str) -> bool

Check if a key has expired.

Parameters:

Name Type Description Default
key str

The key to check.

required

Returns:

Type Description
bool

True if expired or not tracked.

get_expired_keys

get_expired_keys() -> list[str]

Get all expired keys.

Returns:

Type Description
list[str]

List of expired keys.

clear

clear() -> None

Clear tracking state.

TwoQueueEviction

TwoQueueEviction(kin_ratio: float = 0.25)

Bases: CacheEvictionPolicy

2Q (Two Queue) eviction policy.

Similar to SLRU but uses FIFO for the first queue (A1). Provides excellent scan resistance.

Structure:

- A1in: FIFO queue for first-time accessed items
- A1out: ghost queue tracking items recently evicted from A1in (keys only)
- Am: LRU queue for items accessed more than once

- On first access: item goes to A1in
- On access while in A1in: nothing (FIFO, no promotion yet)
- On access while in Am: move to MRU position in Am
- On access of an item in A1out (ghost): add to Am (it's a valuable item)
- On eviction: remove from A1in first, then Am

Attributes:

Name Type Description
kin_ratio float

Fraction of cache for A1in queue (default 0.25).

Initialize 2Q eviction policy.

Parameters:

Name Type Description Default
kin_ratio float

Fraction of total cache for A1in queue.

0.25

Raises:

Type Description
ValueError

If kin_ratio not in (0, 1).

kin_ratio property

kin_ratio: float

Fraction of cache for A1in queue.

on_access

on_access(key: str) -> None

Handle access - promote if in ghost queue or Am.

on_insert

on_insert(key: str) -> None

Insert new key, checking ghost queue first.

on_remove

on_remove(key: str) -> None

Remove key from all queues.

evict

evict() -> str | None

Evict from A1in first, then Am.

clear

clear() -> None

Clear all queues.

KVStore

KVStore(
    name: str,
    read_latency: float = 0.001,
    write_latency: float = 0.005,
    delete_latency: float | None = None,
    capacity: int | None = None,
)

Bases: Entity

In-memory key-value store with latency simulation.

Stores key-value pairs with configurable read and write latencies. Optionally limits capacity and evicts oldest entries when full.

Attributes:

Name Type Description
name

Entity name for identification.

read_latency float

Time in seconds for read operations.

write_latency float

Time in seconds for write operations.

capacity int | None

Maximum number of keys (None = unlimited).

Initialize the key-value store.

Parameters:

Name Type Description Default
name str

Name for this store entity.

required
read_latency float

Latency for get operations in seconds.

0.001
write_latency float

Latency for put operations in seconds.

0.005
delete_latency float | None

Latency for delete operations (defaults to write_latency).

None
capacity int | None

Maximum number of keys (None = unlimited).

None

Raises:

Type Description
ValueError

If parameters are invalid.

stats property

stats: KVStoreStats

Frozen snapshot of KVStore statistics.

read_latency property

read_latency: float

Latency for read operations in seconds.

write_latency property

write_latency: float

Latency for write operations in seconds.

capacity property

capacity: int | None

Maximum number of keys.

size property

size: int

Current number of stored keys.

get

get(key: str) -> Generator[float, None, Any | None]

Get a value by key.

Parameters:

Name Type Description Default
key str

The key to look up.

required

Yields:

Type Description
float

Read latency delay.

Returns:

Type Description
Any | None

The value if found, None otherwise.

get_sync

get_sync(key: str) -> Any | None

Get a value synchronously (no latency, for internal use).

Parameters:

Name Type Description Default
key str

The key to look up.

required

Returns:

Type Description
Any | None

The value if found, None otherwise.

put

put(key: str, value: Any) -> Generator[float]

Store a value.

Parameters:

Name Type Description Default
key str

The key to store under.

required
value Any

The value to store.

required

Yields:

Type Description
Generator[float]

Write latency delay.

put_sync

put_sync(key: str, value: Any) -> None

Store a value synchronously (no latency, for internal use).

Parameters:

Name Type Description Default
key str

The key to store under.

required
value Any

The value to store.

required

delete

delete(key: str) -> Generator[float, None, bool]

Delete a key.

Parameters:

Name Type Description Default
key str

The key to delete.

required

Yields:

Type Description
float

Delete latency delay.

Returns:

Type Description
bool

True if key existed, False otherwise.

delete_sync

delete_sync(key: str) -> bool

Delete a key synchronously (no latency, for internal use).

Parameters:

Name Type Description Default
key str

The key to delete.

required

Returns:

Type Description
bool

True if key existed, False otherwise.

contains

contains(key: str) -> bool

Check if a key exists (no latency).

Parameters:

Name Type Description Default
key str

The key to check.

required

Returns:

Type Description
bool

True if key exists.

keys

keys() -> list[str]

Get all keys (no latency).

Returns:

Type Description
list[str]

List of all keys.

clear

clear() -> None

Clear all data (no latency).

handle_event

handle_event(event: Event) -> None

KVStore can handle events for get/put operations.

KVStoreStats dataclass

KVStoreStats(
    reads: int = 0,
    writes: int = 0,
    deletes: int = 0,
    hits: int = 0,
    misses: int = 0,
    evictions: int = 0,
)

Statistics tracked by KVStore.

MultiTierCache

MultiTierCache(
    name: str,
    tiers: list[Entity],
    backing_store: Entity,
    promotion_policy: PromotionPolicy
    | str = PromotionPolicy.ALWAYS,
)

Bases: Entity

Hierarchical multi-tier cache.

Checks caches from fastest (L1) to slowest (Ln), then backing store. Supports promotion of items to faster tiers on access.

Tiers should be ordered from fastest to slowest, with L1 typically being smallest/fastest and Ln being largest/slowest.

Attributes:

Name Type Description
name

Entity name for identification.

num_tiers int

Number of cache tiers.

hit_rate float

Overall cache hit rate across all tiers.

Initialize the multi-tier cache.

Parameters:

Name Type Description Default
name str

Name for this cache entity.

required
tiers list[Entity]

List of cache tiers from fastest (L1) to slowest.

required
backing_store Entity

The underlying storage behind all caches.

required
promotion_policy PromotionPolicy | str

Policy for promoting items to higher tiers.

ALWAYS

Raises:

Type Description
ValueError

If no tiers provided.

stats property

stats: MultiTierCacheStats

Frozen snapshot of multi-tier cache statistics.

num_tiers property

num_tiers: int

Number of cache tiers.

tiers property

tiers: list[Entity]

The cache tiers (L1 to Ln).

backing_store property

backing_store: Entity

The underlying backing store.

promotion_policy property

promotion_policy: PromotionPolicy

Policy for promoting items between tiers.

hit_rate property

hit_rate: float

Overall cache hit rate across all tiers.

get

get(key: str) -> Generator[float, None, Any | None]

Get a value, checking tiers from fastest to slowest.

Parameters:

Name Type Description Default
key str

The key to look up.

required

Yields:

Type Description
float

Cache or backing store latency.

Returns:

Type Description
Any | None

The value if found, None otherwise.

put

put(key: str, value: Any) -> Generator[float]

Store a value in cache and backing store.

Writes to backing store and invalidates/updates all cache tiers.

Parameters:

Name Type Description Default
key str

The key to store under.

required
value Any

The value to store.

required

Yields:

Type Description
Generator[float]

Write latency.

delete

delete(key: str) -> Generator[float, None, bool]

Delete a key from all tiers and backing store.

Parameters:

Name Type Description Default
key str

The key to delete.

required

Yields:

Type Description
float

Delete latency.

Returns:

Type Description
bool

True if key existed anywhere.

invalidate

invalidate(key: str) -> None

Remove a key from all cache tiers (not backing store).

Parameters:

Name Type Description Default
key str

The key to invalidate.

required

invalidate_all

invalidate_all() -> None

Clear all cache tiers.

get_tier_stats

get_tier_stats() -> dict[int, dict]

Get statistics for each tier.

Returns:

Type Description
dict[int, dict]

Dictionary mapping tier index to tier stats.

handle_event

handle_event(event: Event) -> None

MultiTierCache can handle events for cache operations.

MultiTierCacheStats dataclass

MultiTierCacheStats(
    reads: int = 0,
    writes: int = 0,
    tier_hits: dict[int, int] = dict(),
    backing_store_hits: int = 0,
    misses: int = 0,
    promotions: int = 0,
)

Statistics tracked by MultiTierCache.

PromotionPolicy

Bases: Enum

Policy for promoting items to higher cache tiers.

ConsistencyLevel

Bases: Enum

Consistency level for distributed operations.

ReplicatedStore

ReplicatedStore(
    name: str,
    replicas: list[Entity],
    read_consistency: ConsistencyLevel = ConsistencyLevel.QUORUM,
    write_consistency: ConsistencyLevel = ConsistencyLevel.QUORUM,
    read_timeout: float = 1.0,
    write_timeout: float = 2.0,
)

Bases: Entity

Distributed key-value store with replication.

Replicates data across multiple nodes and uses configurable consistency levels for read and write operations.

Quorum = floor(n/2) + 1, where n is the number of replicas.

For strong consistency, use QUORUM for both reads and writes (R + W > N guarantees seeing most recent write).
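These two rules translate directly into code:

```python
def quorum_size(num_replicas: int) -> int:
    """Quorum = floor(n/2) + 1."""
    return num_replicas // 2 + 1

def is_strongly_consistent(r: int, w: int, n: int) -> bool:
    """R + W > N guarantees the read and write quorums overlap on >= 1 replica."""
    return r + w > n
```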

Attributes:

Name Type Description
name

Entity name for identification.

num_replicas int

Number of replica nodes.

read_consistency ConsistencyLevel

Consistency level for reads.

write_consistency ConsistencyLevel

Consistency level for writes.

Initialize the replicated store.

Parameters:

Name Type Description Default
name str

Name for this store entity.

required
replicas list[Entity]

List of replica KVStore nodes.

required
read_consistency ConsistencyLevel

Consistency level for read operations.

QUORUM
write_consistency ConsistencyLevel

Consistency level for write operations.

QUORUM
read_timeout float

Timeout for read operations in seconds.

1.0
write_timeout float

Timeout for write operations in seconds.

2.0

Raises:

Type Description
ValueError

If insufficient replicas for consistency level.

stats property

stats: ReplicatedStoreStats

Frozen snapshot of replicated store statistics.

num_replicas property

num_replicas: int

Number of replica nodes.

replicas property

replicas: list[Entity]

The replica nodes.

read_consistency property

read_consistency: ConsistencyLevel

Consistency level for reads.

write_consistency property

write_consistency: ConsistencyLevel

Consistency level for writes.

quorum_size property

quorum_size: int

Number of replicas needed for quorum.

get

get(key: str) -> Generator[float, None, Any | None]

Get a value with configured read consistency.

Reads from multiple replicas in parallel and returns the value once enough responses are received.

Parameters:

Name Type Description Default
key str

The key to look up.

required

Yields:

Type Description
float

Read latency delays.

Returns:

Type Description
Any | None

The value if found, None otherwise.

put

put(key: str, value: Any) -> Generator[float, None, bool]

Store a value with configured write consistency.

Writes to multiple replicas in parallel and returns success once enough acknowledgments are received.

Parameters:

Name Type Description Default
key str

The key to store under.

required
value Any

The value to store.

required

Yields:

Type Description
float

Write latency delays.

Returns:

Type Description
bool

True if write met consistency requirement.

delete

delete(key: str) -> Generator[float, None, bool]

Delete a key from all replicas.

Requires write consistency for deletion.

Parameters:

Name Type Description Default
key str

The key to delete.

required

Yields:

Type Description
float

Delete latency delays.

Returns:

Type Description
bool

True if delete met consistency requirement.

get_replica_status

get_replica_status() -> list[dict]

Get status of all replicas.

Returns:

Type Description
list[dict]

List of dictionaries with replica status.

handle_event

handle_event(event: Event) -> None

ReplicatedStore can handle events for store operations.

ReplicatedStoreStats dataclass

ReplicatedStoreStats(
    reads: int = 0,
    writes: int = 0,
    read_successes: int = 0,
    read_failures: int = 0,
    write_successes: int = 0,
    write_failures: int = 0,
    replica_timeouts: int = 0,
    read_latencies: list[float] = list(),
    write_latencies: list[float] = list(),
)

Statistics tracked by ReplicatedStore.

read_latency_p50 property

read_latency_p50: float

50th percentile read latency.

read_latency_p99 property

read_latency_p99: float

99th percentile read latency.

write_latency_p50 property

write_latency_p50: float

50th percentile write latency.

write_latency_p99 property

write_latency_p99: float

99th percentile write latency.

ConsistentHashSharding

ConsistentHashSharding(
    virtual_nodes: int = 100, seed: int | None = None
)

Consistent hash sharding.

Uses consistent hashing with virtual nodes to minimize key redistribution when shards are added or removed.

Initialize consistent hash sharding.

Parameters:

Name Type Description Default
virtual_nodes int

Number of virtual nodes per shard.

100
seed int | None

Optional random seed for reproducibility.

None

get_shard

get_shard(key: str, num_shards: int) -> int

Get shard index using consistent hashing.
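The ring construction can be sketched in plain Python (illustrative only; `build_ring` and the virtual-node label format are assumptions, not the library's internals). Each shard contributes many points to a sorted hash ring, and a key maps to the first point clockwise from its own hash.

```python
import hashlib
from bisect import bisect_right

def _hash(s: str) -> int:
    return int(hashlib.md5(s.encode()).hexdigest(), 16)

def build_ring(num_shards: int, virtual_nodes: int = 100) -> list[tuple[int, int]]:
    """Place virtual_nodes points per shard on a sorted hash ring."""
    return sorted(
        (_hash(f"shard-{s}-vn-{v}"), s)
        for s in range(num_shards)
        for v in range(virtual_nodes)
    )

def get_shard(key: str, ring: list[tuple[int, int]]) -> int:
    """Walk clockwise to the first virtual node at or after the key's hash."""
    points = [p for p, _ in ring]
    i = bisect_right(points, _hash(key)) % len(ring)
    return ring[i][1]
```

Because a new shard only claims the ring segments just before its own virtual nodes, adding or removing a shard moves roughly 1/num_shards of the keys instead of nearly all of them.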

HashSharding

Simple hash-based sharding.

Uses the MD5 hash of the key modulo the number of shards. Fast and uniform, but changing the shard count remaps nearly all keys.

get_shard

get_shard(key: str, num_shards: int) -> int

Get shard index using hash modulo.
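A minimal sketch of the hash-modulo approach (standalone function, not the library's implementation):

```python
import hashlib

def get_shard(key: str, num_shards: int) -> int:
    # MD5 yields a uniformly distributed 128-bit digest;
    # the modulo maps it onto a shard index.
    digest = hashlib.md5(key.encode()).hexdigest()
    return int(digest, 16) % num_shards
```

The same key always lands on the same shard for a fixed shard count, but the modulo ties every key's placement to `num_shards`, which is why resizing reshuffles almost everything.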

RangeSharding

RangeSharding(boundaries: list[str] | None = None)

Range-based sharding.

Assigns key ranges to shards based on alphabetical ordering. Good for range queries, but can lead to hot spots.

Initialize range sharding.

Parameters:

Name Type Description Default
boundaries list[str] | None

Optional list of key boundaries for shards. If None, keys are distributed alphabetically.

None

get_shard

get_shard(key: str, num_shards: int) -> int

Get shard index using range boundaries.
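Range lookup reduces to a binary search over the boundary list; a sketch under the assumption that `boundaries[i]` is the smallest key of shard i+1 (the exact boundary semantics of the library may differ):

```python
from bisect import bisect_right

def get_shard(key: str, boundaries: list[str]) -> int:
    """Shard 0 holds keys below boundaries[0]; boundaries[i] is the
    smallest key of shard i+1, so num_shards == len(boundaries) + 1."""
    return bisect_right(boundaries, key)

# Three boundaries split the keyspace into four alphabetical shards.
boundaries = ["g", "n", "t"]
```

Contiguous key ranges stay on one shard, which is what makes range scans cheap and what concentrates load when key popularity is skewed.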

ShardedStore

ShardedStore(
    name: str,
    shards: list[Entity],
    sharding_strategy: ShardingStrategy | None = None,
)

Bases: Entity

Horizontally partitioned key-value store.

Distributes data across multiple shards using a configurable sharding strategy. Each key maps to exactly one shard.

Attributes:

Name Type Description
name

Entity name for identification.

num_shards int

Number of shard nodes.

sharding_strategy ShardingStrategy

Strategy used for key distribution.

Initialize the sharded store.

Parameters:

Name Type Description Default
name str

Name for this store entity.

required
shards list[Entity]

List of shard nodes (KVStore or similar).

required
sharding_strategy ShardingStrategy | None

Strategy for distributing keys. Defaults to HashSharding.

None

Raises:

Type Description
ValueError

If no shards are provided.

stats property

stats: ShardedStoreStats

Frozen snapshot of sharded store statistics.

num_shards property

num_shards: int

Number of shard nodes.

shards property

shards: list[Entity]

The shard nodes.

sharding_strategy property

sharding_strategy: ShardingStrategy

Strategy used for key distribution.

get

get(key: str) -> Generator[float, None, Any | None]

Get a value from the appropriate shard.

Parameters:

Name Type Description Default
key str

The key to look up.

required

Yields:

Type Description
float

Read latency delay.

Returns:

Type Description
Any | None

The value if found, None otherwise.

put

put(key: str, value: Any) -> Generator[float]

Store a value in the appropriate shard.

Parameters:

Name Type Description Default
key str

The key to store under.

required
value Any

The value to store.

required

Yields:

Type Description
float

Write latency delay.

delete

delete(key: str) -> Generator[float, None, bool]

Delete a key from the appropriate shard.

Parameters:

Name Type Description Default
key str

The key to delete.

required

Yields:

Type Description
float

Delete latency delay.

Returns:

Type Description
bool

True if key existed.

get_shard_for_key

get_shard_for_key(key: str) -> int

Get the shard index for a key without accessing the shard.

Parameters:

Name Type Description Default
key str

The key to check.

required

Returns:

Type Description
int

Shard index (0 to num_shards-1).

get_shard_sizes

get_shard_sizes() -> dict[int, int]

Get the size of each shard.

Returns:

Type Description
dict[int, int]

Dictionary mapping shard index to key count.

get_all_keys

get_all_keys() -> list[str]

Get all keys across all shards.

Returns:

Type Description
list[str]

List of all keys.

scatter_gather

scatter_gather(
    keys: list[str],
) -> Generator[float, None, dict[str, Any]]

Get multiple keys, optimizing for parallel shard access.

Groups keys by shard and fetches from each shard.

Parameters:

Name Type Description Default
keys list[str]

List of keys to fetch.

required

Yields:

Type Description
float

Latency delays.

Returns:

Type Description
dict[str, Any]

Dictionary mapping keys to values (missing keys omitted).
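The grouping step can be sketched as follows (helper names are hypothetical, not the library's API): bucket the requested keys by their shard index so each shard is visited once with its whole batch.

```python
from collections import defaultdict
from typing import Callable

def group_keys_by_shard(
    keys: list[str], shard_for: Callable[[str], int]
) -> dict[int, list[str]]:
    """Bucket keys so each shard is queried once with its batch."""
    groups: dict[int, list[str]] = defaultdict(list)
    for key in keys:
        groups[shard_for(key)].append(key)
    return dict(groups)
```

With the keys grouped, the overall latency is bounded by the slowest shard batch rather than the sum of per-key round trips.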

handle_event

handle_event(event: Event) -> None

ShardedStore can handle events for store operations.

ShardedStoreStats dataclass

ShardedStoreStats(
    reads: int = 0,
    writes: int = 0,
    deletes: int = 0,
    shard_reads: dict[int, int] = dict(),
    shard_writes: dict[int, int] = dict(),
)

Statistics tracked by ShardedStore.

get_shard_distribution

get_shard_distribution() -> dict[int, float]

Get read distribution across shards as percentages.

ShardingStrategy

Bases: Protocol

Protocol for sharding strategies.

get_shard abstractmethod

get_shard(key: str, num_shards: int) -> int

Determine which shard a key belongs to.

Parameters:

Name Type Description Default
key str

The key to shard.

required
num_shards int

Total number of shards.

required

Returns:

Type Description
int

Shard index (0 to num_shards-1).

CacheEntry dataclass

CacheEntry(value: Any, cached_at: Instant)

A cached value with its timestamp.

Attributes:

Name Type Description
value Any

The cached data.

cached_at Instant

Simulation time when the entry was cached.

is_fresh

is_fresh(now: Instant, soft_ttl: Duration) -> bool

Check if entry is within soft TTL (fresh zone).

Parameters:

Name Type Description Default
now Instant

Current simulation time.

required
soft_ttl Duration

The soft TTL duration.

required

Returns:

Type Description
bool

True if age < soft_ttl.

is_valid

is_valid(now: Instant, hard_ttl: Duration) -> bool

Check if entry is within hard TTL (still usable).

Parameters:

Name Type Description Default
now Instant

Current simulation time.

required
hard_ttl Duration

The hard TTL duration.

required

Returns:

Type Description
bool

True if age < hard_ttl.
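Both checks amount to comparing the entry's age against a TTL. A float-based sketch (the real class uses Instant and Duration rather than raw seconds):

```python
from dataclasses import dataclass
from typing import Any

@dataclass
class CacheEntry:
    value: Any
    cached_at: float  # simulation time in seconds

    def is_fresh(self, now: float, soft_ttl: float) -> bool:
        # Fresh zone: age strictly below the soft TTL.
        return now - self.cached_at < soft_ttl

    def is_valid(self, now: float, hard_ttl: float) -> bool:
        # Usable zone: age strictly below the hard TTL.
        return now - self.cached_at < hard_ttl
```

An entry that is valid but not fresh sits in the stale zone, where SoftTTLCache serves it while refreshing in the background.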

SoftTTLCache

SoftTTLCache(
    name: str,
    backing_store: KVStore,
    soft_ttl: float | Duration,
    hard_ttl: float | Duration,
    cache_capacity: int | None = None,
    cache_read_latency: float = 0.0001,
)

Bases: Entity

Cache with stale-while-revalidate semantics.

Implements the soft TTL / hard TTL pattern where entries transition through three zones:

  1. Fresh (age < soft_ttl): Serve immediately from cache
  2. Stale (soft_ttl <= age < hard_ttl): Serve stale + background refresh
  3. Expired (age >= hard_ttl): Block until fresh data is fetched

Features:

  * Request coalescing: multiple requests during a refresh share one fetch
  * LRU eviction when at capacity
  * Statistics for monitoring cache effectiveness
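The three zones drive a small dispatch inside get(). A sketch with a hypothetical `classify` helper and a minimal entry stand-in (not the cache's actual code path):

```python
class Entry:
    """Minimal stand-in for a cached entry with a timestamp."""
    def __init__(self, cached_at: float):
        self.cached_at = cached_at

    def age(self, now: float) -> float:
        return now - self.cached_at

def classify(entry: "Entry | None", now: float,
             soft_ttl: float, hard_ttl: float) -> str:
    """Decide how a get() behaves for a given cache state."""
    if entry is None or entry.age(now) >= hard_ttl:
        return "hard_miss"   # block until fetched from the backing store
    if entry.age(now) < soft_ttl:
        return "fresh_hit"   # serve from cache immediately
    return "stale_hit"       # serve stale and trigger a background refresh
```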

Attributes:

Name Type Description
name

Entity name for identification.

stats SoftTTLCacheStats

SoftTTLCacheStats with operational metrics.

Initialize the soft TTL cache.

Parameters:

Name Type Description Default
name str

Name for this cache entity.

required
backing_store KVStore

The underlying storage to cache.

required
soft_ttl float | Duration

Time in seconds (or Duration) before entries become stale.

required
hard_ttl float | Duration

Time in seconds (or Duration) before entries expire.

required
cache_capacity int | None

Maximum entries (None = unlimited).

None
cache_read_latency float

Latency for cache reads in seconds.

0.0001

Raises:

Type Description
ValueError

If parameters are invalid (e.g., soft_ttl > hard_ttl).

stats property

stats: SoftTTLCacheStats

Frozen snapshot of soft TTL cache statistics.

backing_store property

backing_store: KVStore

The underlying backing store.

soft_ttl property

soft_ttl: Duration

Duration before entries become stale.

hard_ttl property

hard_ttl: Duration

Duration before entries expire completely.

cache_capacity property

cache_capacity: int | None

Maximum number of cached entries (None = unlimited).

cache_size property

cache_size: int

Current number of cached entries.

get

get(key: str) -> Generator[float, None, Any | None]

Get a value with soft TTL semantics.

Behavior depends on cache state:

  * Fresh hit: return immediately from cache
  * Stale hit: return from cache AND trigger a background refresh
  * Hard miss: block until fresh data is fetched

Parameters:

Name Type Description Default
key str

The key to look up.

required

Yields:

Type Description
float

Latency delays (cache read or backing store fetch).

Returns:

Type Description
Any | None

The value if found, None otherwise.

put

put(key: str, value: Any) -> Generator[float]

Store a value in cache and backing store.

Updates the cache timestamp and writes through to backing store.

Parameters:

Name Type Description Default
key str

The key to store under.

required
value Any

The value to store.

required

Yields:

Type Description
float

Write latency.

invalidate

invalidate(key: str) -> None

Remove a key from cache only (not backing store).

Parameters:

Name Type Description Default
key str

The key to invalidate.

required

invalidate_all

invalidate_all() -> None

Clear the entire cache.

contains_cached

contains_cached(key: str) -> bool

Check if a key is in the cache (regardless of freshness).

Parameters:

Name Type Description Default
key str

The key to check.

required

Returns:

Type Description
bool

True if key is cached.

is_refreshing

is_refreshing(key: str) -> bool

Check if a background refresh is in progress for a key.

Parameters:

Name Type Description Default
key str

The key to check.

required

Returns:

Type Description
bool

True if refresh is in progress.

get_cached_keys

get_cached_keys() -> list[str]

Get all cached keys.

Returns:

Type Description
list[str]

List of cached keys.

handle_event

handle_event(event: Event) -> Generator[float]

Handle internal cache events (background refresh).

Parameters:

Name Type Description Default
event Event

The event to process.

required

Yields:

Type Description
float

Processing delays.

Returns:

Type Description
Generator[float]

None (background refresh doesn't produce follow-up events).

SoftTTLCacheStats dataclass

SoftTTLCacheStats(
    reads: int = 0,
    fresh_hits: int = 0,
    stale_hits: int = 0,
    hard_misses: int = 0,
    background_refreshes: int = 0,
    refresh_successes: int = 0,
    coalesced_requests: int = 0,
    evictions: int = 0,
)

Statistics tracked by SoftTTLCache.

Attributes:

Name Type Description
reads int

Total read operations.

fresh_hits int

Reads served from cache within soft TTL.

stale_hits int

Reads served from cache that triggered background refresh.

hard_misses int

Reads requiring blocking fetch (expired or uncached).

background_refreshes int

Background refresh operations started.

refresh_successes int

Background refreshes that completed successfully.

coalesced_requests int

Requests that joined an in-flight refresh.

evictions int

Entries removed due to capacity limits.

fresh_hit_rate property

fresh_hit_rate: float

Ratio of fresh hits to total reads.

stale_hit_rate property

stale_hit_rate: float

Ratio of stale hits to total reads.

total_hit_rate property

total_hit_rate: float

Ratio of all cache hits (fresh + stale) to total reads.

miss_rate property

miss_rate: float

Ratio of hard misses to total reads.

WriteAround

WriteAround()

Bases: WritePolicy

Write-around policy.

Writes go directly to backing store, bypassing cache. Good for write-heavy workloads where written data isn't immediately re-read.

On write, the key is invalidated from cache if present.

Initialize write-around policy.

should_write_through

should_write_through() -> bool

Write directly to backing store.

on_write

on_write(key: str, value: Any) -> None

Track key for cache invalidation.

should_flush

should_flush() -> bool

No buffered writes.

get_keys_to_flush

get_keys_to_flush() -> list[str]

No pending flushes.

on_flush

on_flush(keys: list[str]) -> None

No action needed.

get_keys_to_invalidate

get_keys_to_invalidate() -> list[str]

Get keys to remove from cache.

Returns:

Type Description
list[str]

Keys that should be invalidated from cache.

WriteBack

WriteBack(
    flush_interval: float = 1.0, max_dirty: int = 100
)

Bases: WritePolicy

Write-back (write-behind) policy.

Writes go to cache only, flushed to backing store later. Provides better performance but eventual consistency.

Flush is triggered by:

  * the number of dirty entries exceeding max_dirty
  * the time since the last flush exceeding flush_interval
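The dirty-tracking behavior can be sketched as follows (class and method names are illustrative, not the library's implementation):

```python
class WriteBackSketch:
    """Buffer writes; flush when the dirty count or elapsed time exceeds a limit."""

    def __init__(self, flush_interval: float = 1.0, max_dirty: int = 100):
        self.flush_interval = flush_interval
        self.max_dirty = max_dirty
        self._dirty: set[str] = set()
        self._last_flush = 0.0

    def on_write(self, key: str) -> None:
        # Writing the same key twice before a flush costs one flush entry.
        self._dirty.add(key)

    def should_flush(self, now: float) -> bool:
        return (len(self._dirty) >= self.max_dirty
                or (bool(self._dirty)
                    and now - self._last_flush >= self.flush_interval))

    def flush(self, now: float) -> list[str]:
        """Drain the dirty set; the caller writes these keys to the store."""
        keys, self._dirty = sorted(self._dirty), set()
        self._last_flush = now
        return keys
```

Coalescing repeated writes to one key into a single flush entry is where write-back earns its performance, and losing the dirty set before a flush is where its eventual consistency shows.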

Attributes:

Name Type Description
flush_interval float

Maximum time between flushes in seconds.

max_dirty int

Maximum dirty entries before forced flush.

Initialize write-back policy.

Parameters:

Name Type Description Default
flush_interval float

Maximum seconds between flushes.

1.0
max_dirty int

Maximum dirty entries before flush.

100

Raises:

Type Description
ValueError

If parameters are invalid.

flush_interval property

flush_interval: float

Maximum time between flushes.

max_dirty property

max_dirty: int

Maximum dirty entries before flush.

dirty_count property

dirty_count: int

Number of dirty entries.

should_write_through

should_write_through() -> bool

Don't write through - buffer in cache.

on_write

on_write(key: str, value: Any) -> None

Track dirty key.

should_flush

should_flush() -> bool

Check if flush is needed based on dirty count.

get_keys_to_flush

get_keys_to_flush() -> list[str]

Get all dirty keys.

on_flush

on_flush(keys: list[str]) -> None

Remove flushed keys from dirty set.

WritePolicy

Bases: ABC

Abstract base class for write policies.

Write policies determine how writes to a cached store propagate to the backing store.

should_write_through abstractmethod

should_write_through() -> bool

Whether to write to backing store immediately.

Returns:

Type Description
bool

True if writes should go to backing store immediately.

on_write abstractmethod

on_write(key: str, value: Any) -> None

Called when a key is written.

Parameters:

Name Type Description Default
key str

The written key.

required
value Any

The written value.

required

should_flush abstractmethod

should_flush() -> bool

Whether pending writes should be flushed.

Returns:

Type Description
bool

True if flush should be triggered.

get_keys_to_flush abstractmethod

get_keys_to_flush() -> list[str]

Get keys that need to be flushed.

Returns:

Type Description
list[str]

List of keys to write to backing store.

on_flush abstractmethod

on_flush(keys: list[str]) -> None

Called after keys have been flushed.

Parameters:

Name Type Description Default
keys list[str]

Keys that were flushed.

required

WriteThrough dataclass

WriteThrough()

Bases: WritePolicy

Write-through policy.

Writes go to both cache and backing store synchronously. Provides strong consistency but higher latency.

should_write_through

should_write_through() -> bool

Always write through.

on_write

on_write(key: str, value: Any) -> None

No tracking needed for write-through.

should_flush

should_flush() -> bool

Never needs flush - writes are immediate.

get_keys_to_flush

get_keys_to_flush() -> list[str]

No pending flushes.

on_flush

on_flush(keys: list[str]) -> None

No action needed.