Server

Configurable server with concurrency, service time distributions, and queue policies.

Server components for request processing simulation.

This package provides server abstractions with configurable concurrency, service time distributions, and queue management.

AsyncServer

AsyncServer(
    name: str,
    max_connections: int = 10000,
    cpu_work_distribution: LatencyDistribution
    | None = None,
    io_handler: Callable[
        [Event],
        Generator[float, None, list[Event] | Event | None]
        | list[Event]
        | Event
        | None,
    ]
    | None = None,
)

Bases: Entity

Non-blocking server that multiplexes many connections on a single thread.

Models event-loop style servers where:

- Many connections can be active simultaneously (up to max_connections)
- CPU-bound work blocks the event loop (serialized processing)
- I/O-bound work (waiting for responses) is non-blocking

The server processes requests in two phases:

1. CPU phase: Serialized, one request at a time (blocks event loop)
2. I/O phase: Concurrent, requests wait for external responses

Attributes:

- name (str): Server identifier for logging.
- max_connections (int): Maximum simultaneous connections.
- cpu_work_distribution (LatencyDistribution): Distribution for CPU-bound work time.

Initialize the async server.

Parameters:

- name (str, required): Server identifier.
- max_connections (int, default 10000): Maximum concurrent connections.
- cpu_work_distribution (LatencyDistribution | None, default None): Distribution for CPU work time per request. Default is 0 (no CPU work, pure I/O).
- io_handler (Callable[[Event], Generator[float, None, list[Event] | Event | None] | list[Event] | Event | None] | None, default None): Optional handler for the I/O phase. If provided, called after CPU work completes. Can return events or yield delays for I/O waits.

max_connections property

max_connections: int

Maximum allowed concurrent connections.

active_connections property

active_connections: int

Number of currently active connections.

peak_connections property

peak_connections: int

Peak number of concurrent connections observed.

cpu_queue_depth property

cpu_queue_depth: int

Number of requests waiting for CPU time.

is_cpu_busy property

is_cpu_busy: bool

Whether CPU is currently processing a request.

stats property

stats: AsyncServerStats

Frozen snapshot of current statistics.

utilization property

utilization: float

Connection utilization as fraction of max_connections.

average_cpu_time property

average_cpu_time: float

Average observed CPU time per request.

has_capacity

has_capacity() -> bool

Check if server can accept another connection.

Returns:

Type Description
bool

True if under max_connections limit.

handle_event

handle_event(
    event: Event,
) -> (
    Generator[float, None, list[Event] | Event | None]
    | list[Event]
    | Event
    | None
)

Handle an incoming request.

Requests go through:

1. Connection acceptance (checked against max_connections)
2. CPU work phase (serialized via queue)
3. Optional I/O phase (concurrent)

Parameters:

- event (Event, required): The incoming request event.

Returns:

Type Description
Generator[float, None, list[Event] | Event | None] | list[Event] | Event | None

Events or generator for async processing.

get_cpu_time_percentile

get_cpu_time_percentile(percentile: float) -> float

Calculate a percentile of observed CPU times.

Parameters:

- percentile (float, required): Percentile value between 0 and 1.

Returns:

Type Description
float

CPU time at the given percentile, or 0 if no data.

AsyncServerStats dataclass

AsyncServerStats(
    requests_completed: int = 0,
    requests_rejected: int = 0,
    total_cpu_time: float = 0.0,
    total_io_time: float = 0.0,
)

Statistics tracked by AsyncServer.

ConcurrencyModel

Bases: Protocol

Protocol for concurrency control strategies.

Implementations manage a pool of processing slots that requests must acquire before processing and release when complete.
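A minimal in-memory implementation satisfying this acquire/release/has_capacity contract might look like the following sketch (our own illustration, not the library's code):

```python
class SimpleSlots:
    """Minimal sketch of the ConcurrencyModel contract described above."""

    def __init__(self, limit: int):
        self._limit = limit
        self._active = 0

    @property
    def available(self) -> int:
        return self._limit - self._active

    @property
    def active(self) -> int:
        return self._active

    @property
    def limit(self) -> int:
        return self._limit

    def acquire(self, weight: int = 1) -> bool:
        # All-or-nothing: either the full weight fits, or nothing is taken.
        if self._active + weight > self._limit:
            return False
        self._active += weight
        return True

    def release(self, weight: int = 1) -> None:
        # Never let the active count go negative on a double release.
        self._active = max(0, self._active - weight)

    def has_capacity(self, weight: int = 1) -> bool:
        return self._active + weight <= self._limit
```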

available property

available: int

Number of available slots/capacity units.

active property

active: int

Number of currently used slots/capacity units.

limit property

limit: int

Total capacity limit.

acquire

acquire(weight: int = 1) -> bool

Attempt to acquire processing capacity.

Parameters:

- weight (int, default 1): Amount of capacity to acquire.

Returns:

Type Description
bool

True if capacity was acquired, False if unavailable.

release

release(weight: int = 1) -> None

Release previously acquired processing capacity.

Parameters:

- weight (int, default 1): Amount of capacity to release.

has_capacity

has_capacity(weight: int = 1) -> bool

Check if capacity is available without acquiring.

Parameters:

- weight (int, default 1): Amount of capacity needed.

Returns:

Type Description
bool

True if the requested capacity is available.

DynamicConcurrency

DynamicConcurrency(
    initial: int,
    min_limit: int = 1,
    max_limit: int | None = None,
)

Adjustable concurrency limit for autoscaling simulation.

Allows the concurrency limit to be changed at runtime within configured bounds. Useful for simulating autoscaling behavior where capacity adjusts based on load.

Attributes:

- current_limit (int): Current concurrency limit.
- min_limit (int): Minimum allowed limit.
- max_limit (int | None): Maximum allowed limit.

Initialize with adjustable concurrency bounds.

Parameters:

- initial (int, required): Starting concurrency limit.
- min_limit (int, default 1): Minimum concurrency.
- max_limit (int | None, default None): Maximum concurrency (None = unlimited).

Raises:

Type Description
ValueError

If initial < min_limit or initial > max_limit.

current_limit property

current_limit: int

Current concurrency limit.

min_limit property

min_limit: int

Minimum allowed limit.

max_limit property

max_limit: int | None

Maximum allowed limit (None = unlimited).

available property

available: int

Number of available slots.

active property

active: int

Number of currently used slots.

limit property

limit: int

Current total slot limit.

set_limit

set_limit(new_limit: int) -> None

Adjust the concurrency limit.

The new limit is clamped to [min_limit, max_limit]. If active requests exceed the new limit, they continue processing but no new requests are admitted.

Parameters:

- new_limit (int, required): Desired new concurrency limit.

scale_up

scale_up(amount: int = 1) -> None

Increase the concurrency limit.

Parameters:

- amount (int, default 1): How much to increase.

scale_down

scale_down(amount: int = 1) -> None

Decrease the concurrency limit.

Parameters:

- amount (int, default 1): How much to decrease.

acquire

acquire(weight: int = 1) -> bool

Acquire a processing slot.

Parameters:

- weight (int, default 1): Ignored for DynamicConcurrency (always 1).

Returns:

Type Description
bool

True if a slot was acquired, False if at capacity.

release

release(weight: int = 1) -> None

Release a processing slot.

Parameters:

- weight (int, default 1): Ignored for DynamicConcurrency (always 1).

has_capacity

has_capacity(weight: int = 1) -> bool

Check if a slot is available.

Parameters:

- weight (int, default 1): Ignored for DynamicConcurrency.

Returns:

Type Description
bool

True if active < current_limit.

FixedConcurrency

FixedConcurrency(max_concurrent: int)

Fixed number of concurrent slots.

The simplest concurrency model with a static limit that cannot change during simulation. Each request consumes exactly one slot.

Attributes:

- max_concurrent (int): Maximum number of simultaneous requests.

Initialize with a fixed concurrency limit.

Parameters:

- max_concurrent (int, required): Maximum simultaneous requests.

Raises:

Type Description
ValueError

If max_concurrent < 1.

available property

available: int

Number of available slots.

active property

active: int

Number of currently used slots.

limit property

limit: int

Total slot limit.

acquire

acquire(weight: int = 1) -> bool

Acquire a processing slot.

Parameters:

- weight (int, default 1): Ignored for FixedConcurrency (always 1).

Returns:

Type Description
bool

True if a slot was acquired, False if at capacity.

release

release(weight: int = 1) -> None

Release a processing slot.

Parameters:

- weight (int, default 1): Ignored for FixedConcurrency (always 1).

has_capacity

has_capacity(weight: int = 1) -> bool

Check if a slot is available.

Parameters:

- weight (int, default 1): Ignored for FixedConcurrency.

Returns:

Type Description
bool

True if active < max_concurrent.

WeightedConcurrency

WeightedConcurrency(total_capacity: int)

Requests consume variable weight from a capacity pool.

Instead of fixed slots, this model provides a capacity pool where different requests can consume different amounts. Useful for modeling resources like memory, CPU cores, or database connections where operations have varying resource requirements.

Example

pool = WeightedConcurrency(total_capacity=100)
pool.acquire(weight=10)  # Heavy query
pool.acquire(weight=1)   # Light query
pool.available  # Returns 89

Attributes:

- total_capacity (int): Total capacity units available.

Initialize with a total capacity pool.

Parameters:

- total_capacity (int, required): Total capacity units available.

Raises:

Type Description
ValueError

If total_capacity < 1.

total_capacity property

total_capacity: int

Total capacity units available.

available property

available: int

Available capacity units.

active property

active: int

Currently used capacity units.

limit property

limit: int

Total capacity limit.

utilization property

utilization: float

Current utilization as fraction of total capacity.

acquire

acquire(weight: int = 1) -> bool

Acquire capacity from the pool.

Parameters:

- weight (int, default 1): Amount of capacity to acquire.

Returns:

Type Description
bool

True if capacity was acquired, False if insufficient.

release

release(weight: int = 1) -> None

Release capacity back to the pool.

Parameters:

- weight (int, default 1): Amount of capacity to release.

has_capacity

has_capacity(weight: int = 1) -> bool

Check if capacity is available.

Parameters:

- weight (int, default 1): Amount of capacity needed.

Returns:

Type Description
bool

True if the requested capacity is available.

Server

Server(
    name: str,
    concurrency: int | ConcurrencyModel = 1,
    service_time: LatencyDistribution | None = None,
    queue_policy: QueuePolicy | None = None,
    queue_capacity: int | None = None,
    downstream: Entity | None = None,
)

Bases: QueuedResource

Server with configurable concurrency and service time distribution.

Processes requests with simulated service time drawn from a distribution. Supports multiple concurrent requests up to the configured limit. Excess requests are queued according to the configured policy.

Concurrency can be specified as:

- An integer (wrapped in FixedConcurrency)
- A ConcurrencyModel instance for advanced control (Dynamic, Weighted, etc.)

The server uses completion hooks internally to signal when a request slot becomes available, allowing the queue driver to dispatch the next waiting request.
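The int-or-ConcurrencyModel convention for the concurrency parameter can be sketched as below. FixedConcurrency here is a stand-in stub mirroring the documented behavior, and normalize_concurrency is a hypothetical helper name, not the library's API:

```python
class FixedConcurrency:
    """Stand-in stub mirroring the documented constructor check."""

    def __init__(self, max_concurrent: int):
        if max_concurrent < 1:
            raise ValueError("max_concurrent must be >= 1")
        self.max_concurrent = max_concurrent

def normalize_concurrency(value):
    """Wrap a bare int in FixedConcurrency; pass model instances through unchanged."""
    if isinstance(value, int):
        return FixedConcurrency(value)
    return value
```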

Attributes:

- name (str): Identifier for logging and debugging.
- concurrency_model (ConcurrencyModel): The concurrency control strategy.
- service_time (LatencyDistribution): Distribution for request processing time.

Initialize the server.

Parameters:

- name (str, required): Server identifier.
- concurrency (int | ConcurrencyModel, default 1): Maximum concurrent requests as an int, or a ConcurrencyModel instance for advanced control.
- service_time (LatencyDistribution | None, default None): Service time distribution (default 10ms constant).
- queue_policy (QueuePolicy | None, default None): Queue ordering policy (default FIFO).
- queue_capacity (int | None, default None): Maximum queue size (default unlimited).
- downstream (Entity | None, default None): Optional entity to forward completed requests to. When set, completed events are automatically sent to this entity with the original context preserved.

downstream property writable

downstream: Entity | None

Optional downstream entity for event forwarding.

concurrency_model property

concurrency_model: ConcurrencyModel

The concurrency control model.

concurrency property

concurrency: int

Maximum concurrent requests (current limit).

service_time property

service_time: LatencyDistribution

Service time distribution.

active_requests property

active_requests: int

Number of requests currently being processed.

available_capacity property

available_capacity: int

Number of available processing slots.

utilization property

utilization: float

Current utilization as fraction of concurrency.

Returns:

Type Description
float

Value between 0.0 and 1.0 representing active/concurrency.

average_service_time property

average_service_time: float

Average observed service time in seconds.

stats property

stats: ServerStats

Frozen snapshot of current statistics.

has_capacity

has_capacity(weight: int = 1) -> bool

Check if server can accept another request.

Parameters:

- weight (int, default 1): Capacity weight for weighted concurrency models.

Returns:

Type Description
bool

True if sufficient capacity is available.

handle_queued_event

handle_queued_event(
    event: Event,
) -> Generator[float, None, list[Event] | Event | None]

Process a request from the queue.

Simulates request processing by yielding the service time. Updates statistics on completion.

Parameters:

- event (Event, required): The request event to process.

Yields:

Type Description
float

Service time delay.

Returns:

Type Description
list[Event] | Event | None

Any completion events (typically None).

get_service_time_percentile

get_service_time_percentile(percentile: float) -> float

Calculate a percentile of observed service times.

Parameters:

- percentile (float, required): Percentile value between 0 and 1 (e.g., 0.99 for p99).

Returns:

Type Description
float

Service time at the given percentile, or 0 if no data.

ServerStats dataclass

ServerStats(
    requests_completed: int = 0,
    requests_rejected: int = 0,
    total_service_time: float = 0.0,
)

Statistics tracked by Server.

ThreadPool

ThreadPool(
    name: str,
    num_workers: int,
    queue_policy: QueuePolicy | None = None,
    queue_capacity: int | None = None,
    processing_time_extractor: Callable[[Event], float]
    | None = None,
    default_processing_time: float = 0.01,
)

Bases: QueuedResource

Simulates a pool of worker threads processing tasks.

Tasks are events with processing time specified in their context metadata. Workers pick tasks from the queue when idle and process them for the specified duration.

Processing time can be specified in task context as:

- task.context["metadata"]["processing_time"] = 0.05  # 50ms
- Or via a custom extractor function
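The default lookup described above can be sketched with a plain dict standing in for Event.context. The key names follow the docs; everything else here is our illustration:

```python
DEFAULT_PROCESSING_TIME = 0.01  # matches the documented 10 ms default

def extract_processing_time(context: dict) -> float:
    """Return metadata["processing_time"] if present, else the default."""
    return context.get("metadata", {}).get("processing_time", DEFAULT_PROCESSING_TIME)
```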

Attributes:

- name (str): Identifier for logging and debugging.
- num_workers (int): Number of worker threads in the pool.

Initialize the thread pool.

Parameters:

- name (str, required): Pool identifier.
- num_workers (int, required): Number of worker threads (must be >= 1).
- queue_policy (QueuePolicy | None, default None): Queue ordering policy (default FIFO).
- queue_capacity (int | None, default None): Maximum queue size (default unlimited).
- processing_time_extractor (Callable[[Event], float] | None, default None): Optional function to extract processing time from a task event. If None, looks for task.context["metadata"]["processing_time"].
- default_processing_time (float, default 0.01): Default processing time in seconds when not specified in task context (default 10ms).

Raises:

Type Description
ValueError

If num_workers < 1.

num_workers property

num_workers: int

Total number of worker threads.

active_workers property

active_workers: int

Number of workers currently processing tasks.

idle_workers property

idle_workers: int

Number of workers available to process tasks.

queued_tasks property

queued_tasks: int

Number of tasks waiting in the queue.

worker_utilization property

worker_utilization: float

Current worker utilization as fraction of total workers.

Returns:

Type Description
float

Value between 0.0 and 1.0 representing active/total workers.

average_processing_time property

average_processing_time: float

Average observed processing time in seconds.

stats property

stats: ThreadPoolStats

Frozen snapshot of current statistics.

has_capacity

has_capacity() -> bool

Check if the pool has an idle worker available.

Returns:

Type Description
bool

True if at least one worker is idle.

submit

submit(task: Event) -> Event | None

Submit a task to the pool for processing.

This is a convenience method that creates a properly formatted task event and returns it for scheduling.

Parameters:

- task (Event, required): The task event to process. Processing time should be in task.context["metadata"]["processing_time"] or will use the default.

Returns:

Type Description
Event | None

The task event targeted at this pool, ready for scheduling.

handle_queued_event

handle_queued_event(
    event: Event,
) -> Generator[float, None, list[Event] | Event | None]

Process a task from the queue.

Acquires a worker, processes for the task's specified time, then releases the worker.

Parameters:

- event (Event, required): The task event to process.

Yields:

Type Description
float

Processing time delay.

Returns:

Type Description
list[Event] | Event | None

Any completion events (typically None).

get_processing_time_percentile

get_processing_time_percentile(percentile: float) -> float

Calculate a percentile of observed processing times.

Parameters:

- percentile (float, required): Percentile value between 0 and 1 (e.g., 0.99 for p99).

Returns:

Type Description
float

Processing time at the given percentile, or 0 if no data.

ThreadPoolStats dataclass

ThreadPoolStats(
    tasks_completed: int = 0,
    tasks_rejected: int = 0,
    total_processing_time: float = 0.0,
)

Statistics tracked by ThreadPool.