
Rate Limiter

Rate limiting policies and the RateLimitedEntity wrapper with built-in queuing.

Rate limiter components for controlling request throughput.

This module provides:

Policies (pure algorithms, not Entities):

- TokenBucketPolicy: Classic token bucket (allows bursting)
- LeakyBucketPolicy: Strict output rate (no bursting)
- SlidingWindowPolicy: Sliding window log algorithm
- FixedWindowPolicy: Fixed time window counter
- AdaptivePolicy: AIMD-based self-tuning rate limiter

Entity (simulation actor):

- RateLimitedEntity: Generic Entity that wraps any policy with a FIFO queue

Distributed (unchanged, uses generator yields for I/O):

- DistributedRateLimiter: Coordinated limiting across multiple instances

Example

```python
from happysimulator.components.rate_limiter import (
    RateLimitedEntity,
    TokenBucketPolicy,
    FixedWindowPolicy,
)

# Token bucket with queuing
limiter = RateLimitedEntity(
    name="api_limit",
    downstream=server,
    policy=TokenBucketPolicy(capacity=10.0, refill_rate=5.0),
)

# Fixed window with queuing
limiter = RateLimitedEntity(
    name="api_limit",
    downstream=server,
    policy=FixedWindowPolicy(requests_per_window=100, window_size=1.0),
)
```

DistributedRateLimiter

DistributedRateLimiter(
    name: str,
    downstream: Entity,
    backing_store: Entity,
    global_limit: int,
    window_size: float = 1.0,
    key_prefix: str = "ratelimit",
    local_threshold: float = 0.8,
)

Bases: Entity

A distributed rate limiter using a shared backing store.

This rate limiter coordinates across multiple instances by storing the request count in a shared backing store. It uses a fixed window approach with the window ID derived from the current time.

To reduce load on the backing store, instances maintain a local counter and only sync with the store periodically or when the local estimate approaches the limit.

Algorithm:

1. Calculate current window ID from time
2. Read current global count from store
3. If count < limit, increment and write back
4. Forward or reject based on result

Note: In a real system, this would use atomic operations (INCR in Redis). The simulation approximates this with read-modify-write.
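The four steps above can be sketched synchronously, with a plain dict standing in for the shared KVStore. This is a minimal illustration of the read-modify-write approximation, not the library class: the real DistributedRateLimiter yields delays while awaiting store I/O, which this sketch omits.

```python
import math

class DistributedRateLimiterSketch:
    """Synchronous sketch of the fixed-window distributed algorithm.

    The "store" is a shared dict standing in for a KVStore entity.
    """

    def __init__(self, store: dict, global_limit: int,
                 window_size: float = 1.0, key_prefix: str = "ratelimit"):
        self.store = store
        self.global_limit = global_limit
        self.window_size = window_size
        self.key_prefix = key_prefix

    def check_and_increment(self, now: float) -> bool:
        # 1. Calculate current window ID from time
        window_id = math.floor(now / self.window_size)
        key = f"{self.key_prefix}:{window_id}"
        # 2. Read current global count from store
        count = self.store.get(key, 0)
        # 3. If count < limit, increment and write back (read-modify-write)
        if count < self.global_limit:
            self.store[key] = count + 1
            return True   # 4. forward
        return False      # 4. reject

# Two instances coordinating through the same store.
store = {}
a = DistributedRateLimiterSketch(store, global_limit=3)
b = DistributedRateLimiterSketch(store, global_limit=3)
results = [a.check_and_increment(0.1), b.check_and_increment(0.2),
           a.check_and_increment(0.3), b.check_and_increment(0.4)]
```

Because both instances share one counter per window, the fourth request is rejected globally even though each instance saw only two requests locally.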

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| global_limit | int | Maximum requests across all instances per window. |
| window_size | float | Size of each window in seconds. |
| downstream | Entity | The entity to forward accepted requests to. |
| backing_store | Entity | Shared storage for coordination. |
| stats | DistributedRateLimiterStats | Request and store operation counts. |

Initialize the distributed rate limiter.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Entity name for identification. | required |
| downstream | Entity | Entity to forward accepted requests to. | required |
| backing_store | Entity | Shared KVStore for coordination. | required |
| global_limit | int | Maximum requests across all instances per window. | required |
| window_size | float | Size of each window in seconds. | 1.0 |
| key_prefix | str | Prefix for keys in the backing store. | 'ratelimit' |
| local_threshold | float | Fraction of limit before syncing with store. | 0.8 |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If parameters are invalid. |

downstream property

downstream: Entity

The entity receiving forwarded requests.

backing_store property

backing_store: Entity

The shared backing store.

global_limit property

global_limit: int

Maximum requests across all instances per window.

window_size property

window_size: float

Size of each window in seconds.

local_count property

local_count: int

Local request count for current window.

stats property

stats: DistributedRateLimiterStats

Frozen snapshot of distributed rate limiter statistics.

set_clock

set_clock(clock: Clock) -> None

Inject clock and propagate to downstream and backing store.

check_and_increment

check_and_increment(
    now: Instant,
) -> Generator[float, None, bool]

Check rate limit and increment counter if allowed.

This is a generator that yields while accessing the backing store.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| now | Instant | Current time. | required |

Yields:

| Type | Description |
| --- | --- |
| float | Delays for store operations. |

Returns:

| Type | Description |
| --- | --- |
| bool | True if request is allowed, False if rate limited. |

handle_event

handle_event(
    event: Event,
) -> Generator[float, None, list[Event]]

Handle an incoming request event.

This is a generator that yields while coordinating with the store.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| event | Event | The incoming request event. | required |

Yields:

| Type | Description |
| --- | --- |
| float | Delays for store operations. |

Returns:

| Type | Description |
| --- | --- |
| list[Event] | List containing a forwarded event to downstream, or empty if dropped. |

DistributedRateLimiterStats dataclass

DistributedRateLimiterStats(
    requests_received: int = 0,
    requests_forwarded: int = 0,
    requests_dropped: int = 0,
    store_reads: int = 0,
    store_writes: int = 0,
    local_rejections: int = 0,
    global_rejections: int = 0,
)

Frozen snapshot of DistributedRateLimiter statistics.

Inductor

Inductor(
    name: str,
    downstream: Entity,
    time_constant: float,
    queue_capacity: int = 10000,
)

Bases: Entity

Entity that smooths bursty traffic using EWMA rate estimation.

The Inductor has no throughput cap. It resists rapid rate changes, letting sustained rates pass through while dampening spikes.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Entity name for identification. | required |
| downstream | Entity | Entity to forward accepted events to. | required |
| time_constant | float | τ in seconds; higher means more smoothing. | required |
| queue_capacity | int | Maximum events buffered before dropping. | 10000 |
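For intuition, EWMA rate estimation can be sketched standalone. This is an assumed formulation (the blend weight alpha is derived from the inter-arrival gap and τ), not the Inductor's actual internals:

```python
import math

class EwmaRateEstimator:
    """Standalone sketch of EWMA rate estimation (not the library's internals).

    tau controls smoothing: after a step change in input rate, the
    estimate closes roughly 63% of the gap within tau seconds.
    """

    def __init__(self, time_constant: float):
        self.tau = time_constant
        self.rate = 0.0        # estimated events per second
        self.last_time = None  # timestamp of the previous event

    def observe(self, now: float) -> float:
        if self.last_time is not None:
            dt = now - self.last_time
            instantaneous = 1.0 / dt if dt > 0 else self.rate
            # Weight recent samples by elapsed time relative to tau.
            alpha = 1.0 - math.exp(-dt / self.tau)
            self.rate += alpha * (instantaneous - self.rate)
        self.last_time = now
        return self.rate

est = EwmaRateEstimator(time_constant=5.0)
for i in range(1, 51):
    est.observe(i * 0.1)  # steady 10 events/s for ~5 seconds
```

A sudden burst moves the estimate only slightly, while a sustained rate eventually dominates it, which is the smoothing behavior described above.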

estimated_rate property

estimated_rate: float

Current estimated event rate (events/s) from the EWMA.

stats property

stats: InductorStats

Frozen snapshot of inductor statistics.

set_clock

set_clock(clock: Clock) -> None

Inject clock and propagate to downstream.

handle_event

handle_event(event: Event) -> list[Event]

Dispatch arrivals vs internal poll events.

InductorStats dataclass

InductorStats(
    received: int = 0,
    forwarded: int = 0,
    queued: int = 0,
    dropped: int = 0,
)

Frozen snapshot of Inductor statistics.

NullRateLimiter

NullRateLimiter(name: str, downstream: Entity)

Bases: Entity

Pass-through rate limiter that forwards all events immediately.

This is a no-op rate limiter useful for:

- Providing a default when rate limiting is optional
- Baseline comparisons in experiments
- Simplifying pipeline construction

All events are forwarded to the downstream entity without delay.

Initialize the pass-through rate limiter.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Name of this entity. | required |
| downstream | Entity | Entity to forward events to. | required |

downstream property

downstream: Entity

The downstream entity receiving forwarded events.

set_clock

set_clock(clock: Clock) -> None

Inject clock and propagate to downstream.

handle_event

handle_event(event: Event) -> list[Event]

Forward the event to downstream immediately.

AdaptivePolicy

AdaptivePolicy(
    initial_rate: float = 100.0,
    min_rate: float = 1.0,
    max_rate: float = 10000.0,
    increase_step: float | None = None,
    decrease_factor: float = 0.5,
    window_size: float = 1.0,
)

Adaptive rate limiter policy using AIMD algorithm.

Uses a token bucket internally with an adaptive refill rate. On success the rate increases additively; on failure it decreases multiplicatively. Exposes record_success / record_failure for feedback.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| initial_rate | float | Starting rate (requests per second). | 100.0 |
| min_rate | float | Minimum allowed rate. | 1.0 |
| max_rate | float | Maximum allowed rate. | 10000.0 |
| increase_step | float \| None | Additive increase on success (default: initial_rate * 0.1). | None |
| decrease_factor | float | Multiplicative factor on failure (0 < f < 1). | 0.5 |
| window_size | float | Token bucket window size in seconds. | 1.0 |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If parameters are invalid. |

record_success

record_success(now: Instant) -> None

Record a successful request, potentially increasing rate.

record_failure

record_failure(
    now: Instant,
    reason: RateAdjustmentReason = RateAdjustmentReason.FAILURE,
) -> None

Record a failed request, decreasing rate.

FixedWindowPolicy

FixedWindowPolicy(
    requests_per_window: int, window_size: float = 1.0
)

Fixed window rate limiter policy.

Divides time into discrete windows and limits requests per window. Simple and O(1) space, but susceptible to boundary bursts.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| requests_per_window | int | Maximum requests allowed per window. | required |
| window_size | float | Size of each window in seconds. | 1.0 |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If parameters are invalid. |
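A minimal fixed-window counter can be sketched as follows (illustrative, not the library class). Note how a burst straddling a window boundary can briefly exceed the nominal rate:

```python
import math

class FixedWindowSketch:
    """Minimal fixed-window counter: O(1) space, boundary bursts possible."""

    def __init__(self, requests_per_window: int, window_size: float = 1.0):
        self.limit = requests_per_window
        self.window_size = window_size
        self.current_window = None
        self.count = 0

    def try_acquire(self, now: float) -> bool:
        window = math.floor(now / self.window_size)
        if window != self.current_window:  # entered a new window: reset
            self.current_window = window
            self.count = 0
        if self.count < self.limit:
            self.count += 1
            return True
        return False
```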

LeakyBucketPolicy

LeakyBucketPolicy(leak_rate: float = 1.0)

Leaky bucket rate limiter policy.

Enforces a strict fixed output rate with no bursting. Tracks whether sufficient time has elapsed since the last leak to allow a new request.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| leak_rate | float | Requests per second leak rate. | 1.0 |
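The elapsed-time check described above can be sketched directly (a minimal illustration, not the library class): a request is admitted only if at least 1/leak_rate seconds have passed since the previous one.

```python
class LeakyBucketSketch:
    """Minimal leaky bucket: at most one request per 1/leak_rate seconds."""

    def __init__(self, leak_rate: float = 1.0):
        self.interval = 1.0 / leak_rate  # minimum gap between requests
        self.last_leak = None

    def try_acquire(self, now: float) -> bool:
        if self.last_leak is None or now - self.last_leak >= self.interval:
            self.last_leak = now
            return True
        return False
```

Unlike a token bucket, there is no accumulated credit: an idle period buys nothing, so output never bursts above the leak rate.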

RateAdjustmentReason

Bases: Enum

Reason for adaptive rate adjustment.

RateLimiterPolicy

Bases: Protocol

Protocol for rate limiter algorithms.

Implementations decide whether a request should be allowed at a given point in time and can report how long until capacity is next available.

try_acquire

try_acquire(now: Instant) -> bool

Attempt to acquire one unit of capacity.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| now | Instant | Current simulation time. | required |

Returns:

| Type | Description |
| --- | --- |
| bool | True if the request is granted, False if denied. |

time_until_available

time_until_available(now: Instant) -> Duration

Duration until next capacity is available.

Invariant: if this returns Duration.ZERO, a subsequent call to try_acquire(now) MUST succeed. Implementations must return at least Duration(1) (1 ns) when the computed wait truncates to zero due to floating-point precision but capacity is not yet available.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| now | Instant | Current simulation time. | required |

Returns:

| Type | Description |
| --- | --- |
| Duration | Duration.ZERO if capacity is available now, otherwise the duration until the next unit of capacity becomes available. |
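The 1 ns floor from the invariant above can be illustrated with a token-bucket-style wait calculation. Durations are modeled here as integer nanoseconds, an assumption for the sketch (the library's Instant/Duration types are not shown):

```python
import math

NANOS_PER_SEC = 1_000_000_000

def time_until_available_ns(tokens: float, refill_rate: float) -> int:
    """Nanoseconds until the next token, honoring the documented invariant."""
    if tokens >= 1.0:
        return 0  # capacity available now: try_acquire must succeed
    deficit_seconds = (1.0 - tokens) / refill_rate
    wait_ns = math.ceil(deficit_seconds * NANOS_PER_SEC)
    # Floor at 1 ns: a non-zero wait must never truncate to zero, or a
    # caller that sleeps "zero" and retries would spin without progress.
    return max(wait_ns, 1)
```

Rounding up plus the 1 ns floor is what makes the invariant hold: a returned zero always means a retry will succeed.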

RateSnapshot dataclass

RateSnapshot(
    time: Instant,
    rate: float,
    reason: RateAdjustmentReason | None = None,
)

A snapshot of the adaptive rate at a point in time.

SlidingWindowPolicy

SlidingWindowPolicy(
    window_size_seconds: float = 1.0, max_requests: int = 10
)

Sliding window log rate limiter policy.

Maintains a log of request timestamps and limits the number of requests within a rolling time window. Avoids the boundary burst problem of fixed windows.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| window_size_seconds | float | Size of the sliding window in seconds. | 1.0 |
| max_requests | int | Maximum requests allowed within the window. | 10 |
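The timestamp-log approach can be sketched with a deque (illustrative, not the library class): expired timestamps are evicted on each call, so the count always reflects the rolling window.

```python
from collections import deque

class SlidingWindowSketch:
    """Minimal sliding window log: O(max_requests) space, no boundary bursts."""

    def __init__(self, window_size_seconds: float = 1.0, max_requests: int = 10):
        self.window = window_size_seconds
        self.max_requests = max_requests
        self.log = deque()  # timestamps of granted requests

    def try_acquire(self, now: float) -> bool:
        # Evict timestamps that have slid out of the window.
        while self.log and now - self.log[0] >= self.window:
            self.log.popleft()
        if len(self.log) < self.max_requests:
            self.log.append(now)
            return True
        return False
```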

TokenBucketPolicy

TokenBucketPolicy(
    capacity: float = 10.0,
    refill_rate: float = 1.0,
    initial_tokens: float | None = None,
)

Token bucket rate limiter policy.

Tokens refill at a constant rate up to a maximum capacity. Each request consumes one token. Allows controlled bursting up to the bucket capacity.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| capacity | float | Maximum tokens the bucket can hold. | 10.0 |
| refill_rate | float | Tokens added per second. | 1.0 |
| initial_tokens | float \| None | Starting token count (defaults to capacity). | None |

RateLimitedEntity

RateLimitedEntity(
    name: str,
    downstream: Entity,
    policy: RateLimiterPolicy,
    queue_capacity: int = 1000,
)

Bases: Entity

Entity that rate-limits incoming events using a pluggable policy.

Incoming events are immediately forwarded if the policy allows. Otherwise they are enqueued in a bounded FIFO buffer and drained via self-scheduling poll events that fire at the exact time the policy reports capacity will be available.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Entity name for identification. | required |
| downstream | Entity | Entity to forward accepted events to. | required |
| policy | RateLimiterPolicy | The rate limiter algorithm to use. | required |
| queue_capacity | int | Maximum events that can be buffered (drops on overflow). | 1000 |
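The forward-or-enqueue-or-drop behavior can be sketched synchronously. This is an illustration only: the real entity drains its queue via self-scheduled poll events, while here drain() is called manually, and EveryOtherPolicy is a toy policy invented for the demo.

```python
from collections import deque

class EveryOtherPolicy:
    """Toy policy that grants every other request (assumption for the demo)."""
    def __init__(self):
        self.n = 0
    def try_acquire(self, now: float) -> bool:
        self.n += 1
        return self.n % 2 == 1

class RateLimitedQueueSketch:
    """Sketch of policy-plus-FIFO-queue behavior (no simulation events)."""

    def __init__(self, policy, queue_capacity: int = 1000):
        self.policy = policy
        self.queue = deque()
        self.queue_capacity = queue_capacity
        self.forwarded = []
        self.dropped = 0

    def handle(self, item, now: float) -> None:
        if self.policy.try_acquire(now):
            self.forwarded.append(item)    # forward immediately
        elif len(self.queue) < self.queue_capacity:
            self.queue.append(item)        # buffer for later
        else:
            self.dropped += 1              # bounded queue: drop on overflow

    def drain(self, now: float) -> None:
        # Stand-in for the self-scheduled poll: forward while capacity allows.
        while self.queue and self.policy.try_acquire(now):
            self.forwarded.append(self.queue.popleft())

rl = RateLimitedQueueSketch(EveryOtherPolicy(), queue_capacity=1)
for i in range(4):
    rl.handle(i, now=0.0)
```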

stats property

stats: RateLimitedEntityStats

Frozen snapshot of rate limited entity statistics.

set_clock

set_clock(clock: Clock) -> None

Inject clock and propagate to downstream.

handle_event

handle_event(event: Event) -> list[Event]

Handle an incoming event or internal poll event.

RateLimitedEntityStats dataclass

RateLimitedEntityStats(
    received: int = 0,
    forwarded: int = 0,
    queued: int = 0,
    dropped: int = 0,
)

Frozen snapshot of RateLimitedEntity statistics.