Skip to content

Queuing & Resources

Queue and QueuedResource

QueuedResource combines a queue with processing logic. Events are buffered when the entity is busy and dequeued when capacity is available.

from happysimulator import QueuedResource, FIFOQueue, Event

class MyServer(QueuedResource):
    def __init__(self, name, downstream, concurrency=1):
        super().__init__(name, policy=FIFOQueue())
        self.downstream = downstream
        self.concurrency = concurrency
        self._in_flight = 0

    def has_capacity(self) -> bool:
        return self._in_flight < self.concurrency

    def handle_queued_event(self, event):
        self._in_flight += 1
        try:
            yield 0.1  # processing time
        finally:
            self._in_flight -= 1
        return [Event(time=self.now, event_type="Done", target=self.downstream)]

Queue Policies

Policy Behavior
FIFOQueue() First in, first out
LIFOQueue() Last in, first out (stack)
PriorityQueue(key=...) Ordered by priority key

Resource (Shared Capacity)

Resource models contended capacity (CPU cores, connections, permits) with acquire/release semantics.

from happysimulator import Resource

cpu = Resource("cpu_cores", capacity=8)

# In an entity's handle_event():
grant = yield cpu.acquire(amount=2)   # blocks via SimFuture if unavailable
yield 0.1                              # do work
grant.release()                        # idempotent, wakes FIFO waiters

# Non-blocking alternative
grant = cpu.try_acquire(amount=2)      # Returns Grant | None

Properties: cpu.available, cpu.utilization, cpu.waiters, cpu.stats

Note

Register Resource in Simulation(entities=[cpu, ...]) so it receives clock injection.

Rate Limiter

Rate limiting uses a Policy (pure algorithm) wrapped in a RateLimitedEntity (Entity with queue).

Policies

Policy Algorithm
TokenBucketPolicy(rate, capacity) Refill tokens at fixed rate
LeakyBucketPolicy(rate) Process at constant rate
SlidingWindowPolicy(rate, window_s) Count in sliding window
FixedWindowPolicy(rate, window_s) Count in fixed windows
AdaptivePolicy(initial_rate, ...) Adjusts based on feedback

Usage

from happysimulator import RateLimitedEntity, TokenBucketPolicy

limiter = RateLimitedEntity(
    name="RateLimiter",
    policy=TokenBucketPolicy(rate=100, capacity=10),
    downstream=server,
)

Inductor (Burst Suppression)

EWMA-based smoothing that resists rate changes without capping throughput.

from happysimulator import Inductor

inductor = Inductor(
    name="Smoother",
    downstream=server,
    time_constant=1.0,       # τ in seconds — higher = more smoothing
    queue_capacity=10000,
)

Properties: inductor.stats, inductor.estimated_rate, inductor.queue_depth

Next Steps