Server¶
Server components for request processing simulation. This package provides server abstractions with configurable concurrency, service time distributions, and queue management.
AsyncServer ¶
```python
AsyncServer(
    name: str,
    max_connections: int = 10000,
    cpu_work_distribution: LatencyDistribution | None = None,
    io_handler: Callable[
        [Event],
        Generator[float, None, list[Event] | Event | None]
        | list[Event]
        | Event
        | None,
    ]
    | None = None,
)
```
Bases: Entity
Non-blocking server that multiplexes many connections on single thread.
Models event-loop style servers where:

- Many connections can be active simultaneously (up to max_connections)
- CPU-bound work blocks the event loop (serialized processing)
- I/O-bound work (waiting for responses) is non-blocking

The server processes requests in two phases:

1. CPU phase: Serialized, one request at a time (blocks event loop)
2. I/O phase: Concurrent, requests wait for external responses
Attributes:
| Name | Type | Description |
|---|---|---|
| name | str | Server identifier for logging. |
| max_connections | int | Maximum simultaneous connections. |
| cpu_work_distribution | LatencyDistribution \| None | Distribution for CPU-bound work time. |
Initialize the async server.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Server identifier. | required |
| max_connections | int | Maximum concurrent connections. | 10000 |
| cpu_work_distribution | LatencyDistribution \| None | Distribution for CPU work time per request. Default is 0 (no CPU work, pure I/O). | None |
| io_handler | Callable[[Event], Generator[float, None, list[Event] \| Event \| None] \| list[Event] \| Event \| None] \| None | Optional handler for I/O phase. If provided, called after CPU work completes. Can return events or yield delays for I/O waits. | None |
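To make the io_handler contract concrete, here is a minimal sketch of a delay-yielding handler. The `drive` helper and the handler body are illustrative (not part of the library); a plain `object()` stands in for the library's Event type.

```python
def io_handler(event):
    """Illustrative I/O-phase handler: yield simulated wait times (seconds)."""
    yield 0.005  # e.g. wait for a downstream database reply
    yield 0.002  # e.g. wait for a cache write acknowledgement
    return None  # no follow-up events to schedule


def drive(gen):
    """Drain a delay-yielding generator, summing the simulated waits."""
    total = 0.0
    while True:
        try:
            total += next(gen)
        except StopIteration as stop:
            return total, stop.value


waited, events = drive(io_handler(object()))
print(round(waited, 3))
```

Inside the simulator, each yielded float is interpreted as a non-blocking wait, so many such handlers can be in flight concurrently while CPU work stays serialized.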
has_capacity ¶
Check if server can accept another connection.
Returns:
| Type | Description |
|---|---|
| bool | True if under max_connections limit. |
handle_event ¶
```python
handle_event(
    event: Event,
) -> (
    Generator[float, None, list[Event] | Event | None]
    | list[Event]
    | Event
    | None
)
```
Handle an incoming request.
Requests go through:

1. Connection acceptance (checked against max_connections)
2. CPU work phase (serialized via queue)
3. Optional I/O phase (concurrent)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | Event | The incoming request event. | required |
Returns:
| Type | Description |
|---|---|
| Generator[float, None, list[Event] \| Event \| None] \| list[Event] \| Event \| None | Events or generator for async processing. |
get_cpu_time_percentile ¶
Calculate a percentile of observed CPU times.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| percentile | float | Percentile value between 0 and 1. | required |
Returns:
| Type | Description |
|---|---|
| float | CPU time at the given percentile, or 0 if no data. |
AsyncServerStats dataclass ¶
```python
AsyncServerStats(
    requests_completed: int = 0,
    requests_rejected: int = 0,
    total_cpu_time: float = 0.0,
    total_io_time: float = 0.0,
)
```
Statistics tracked by AsyncServer.
ConcurrencyModel ¶
Bases: Protocol
Protocol for concurrency control strategies.
Implementations manage a pool of processing slots that requests must acquire before processing and release when complete.
acquire ¶
Attempt to acquire processing capacity.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| weight | int | Amount of capacity to acquire. | 1 |
Returns:
| Type | Description |
|---|---|
| bool | True if capacity was acquired, False if unavailable. |
release ¶
Release previously acquired processing capacity.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| weight | int | Amount of capacity to release. | 1 |
has_capacity ¶
Check if capacity is available without acquiring.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| weight | int | Amount of capacity needed. | 1 |
Returns:
| Type | Description |
|---|---|
| bool | True if the requested capacity is available. |
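A minimal sketch of a class satisfying this protocol, assuming only the documented acquire/release/has_capacity semantics (the class name `SlotPool` and its internals are illustrative, not library API):

```python
class SlotPool:
    """Illustrative ConcurrencyModel implementation: N fixed slots."""

    def __init__(self, slots: int) -> None:
        self.slots = slots
        self.active = 0

    def has_capacity(self, weight: int = 1) -> bool:
        # Check availability without acquiring.
        return self.active + weight <= self.slots

    def acquire(self, weight: int = 1) -> bool:
        # Acquire capacity if available; report success or failure.
        if not self.has_capacity(weight):
            return False
        self.active += weight
        return True

    def release(self, weight: int = 1) -> None:
        # Return previously acquired capacity to the pool.
        self.active = max(0, self.active - weight)


pool = SlotPool(2)
print(pool.acquire(), pool.acquire(), pool.acquire())  # True True False
```

Because the protocol is structural, any class with these three methods can be plugged into a Server without subclassing.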
DynamicConcurrency ¶
Adjustable concurrency limit for autoscaling simulation.
Allows the concurrency limit to be changed at runtime within configured bounds. Useful for simulating autoscaling behavior where capacity adjusts based on load.
Attributes:
| Name | Type | Description |
|---|---|---|
| current_limit | int | Current concurrency limit. |
| min_limit | int | Minimum allowed limit. |
| max_limit | int \| None | Maximum allowed limit. |
Initialize with adjustable concurrency bounds.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| initial | int | Starting concurrency limit. | required |
| min_limit | int | Minimum concurrency. | 1 |
| max_limit | int \| None | Maximum concurrency (None = unlimited). | None |
Raises:
| Type | Description |
|---|---|
| ValueError | If initial < min_limit or initial > max_limit. |
set_limit ¶
Adjust the concurrency limit.
The new limit is clamped to [min_limit, max_limit]. If active requests exceed the new limit, they continue processing but no new requests are admitted.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| new_limit | int | Desired new concurrency limit. | required |
scale_up ¶
Increase the concurrency limit.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| amount | int | How much to increase. | 1 |
scale_down ¶
Decrease the concurrency limit.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| amount | int | How much to decrease. | 1 |
acquire ¶
Acquire a processing slot.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| weight | int | Ignored for DynamicConcurrency (always 1). | 1 |
Returns:
| Type | Description |
|---|---|
| bool | True if a slot was acquired, False if at capacity. |
release ¶
Release a processing slot.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| weight | int | Ignored for DynamicConcurrency (always 1). | 1 |
has_capacity ¶
Check if a slot is available.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| weight | int | Ignored for DynamicConcurrency. | 1 |
Returns:
| Type | Description |
|---|---|
| bool | True if active < current_limit. |
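The clamping and admission rules above can be sketched in a few lines. This is a self-contained illustration of the documented semantics, not the library's implementation (class name `DynamicPool` is hypothetical):

```python
class DynamicPool:
    """Sketch of the documented DynamicConcurrency behavior."""

    def __init__(self, initial: int, min_limit: int = 1, max_limit=None):
        if initial < min_limit or (max_limit is not None and initial > max_limit):
            raise ValueError("initial must lie within [min_limit, max_limit]")
        self.min_limit = min_limit
        self.max_limit = max_limit
        self.current_limit = initial
        self.active = 0

    def set_limit(self, new_limit: int) -> None:
        # Clamp to [min_limit, max_limit]. In-flight requests above the new
        # limit keep running, but no new requests are admitted.
        new_limit = max(self.min_limit, new_limit)
        if self.max_limit is not None:
            new_limit = min(self.max_limit, new_limit)
        self.current_limit = new_limit

    def acquire(self, weight: int = 1) -> bool:
        # weight is ignored, as documented; each request takes one slot.
        if self.active >= self.current_limit:
            return False
        self.active += 1
        return True


pool = DynamicPool(initial=2, max_limit=4)
pool.acquire(); pool.acquire()
pool.set_limit(1)          # scale down below the 2 active requests
print(pool.acquire())      # False: no new admissions until active drops
pool.set_limit(100)        # clamped to max_limit
print(pool.current_limit)  # 4
```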
FixedConcurrency ¶
Fixed number of concurrent slots.
The simplest concurrency model with a static limit that cannot change during simulation. Each request consumes exactly one slot.
Attributes:
| Name | Type | Description |
|---|---|---|
| max_concurrent | int | Maximum number of simultaneous requests. |
Initialize with a fixed concurrency limit.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| max_concurrent | int | Maximum simultaneous requests. | required |
Raises:
| Type | Description |
|---|---|
| ValueError | If max_concurrent < 1. |
acquire ¶
Acquire a processing slot.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| weight | int | Ignored for FixedConcurrency (always 1). | 1 |
Returns:
| Type | Description |
|---|---|
| bool | True if a slot was acquired, False if at capacity. |
release ¶
Release a processing slot.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| weight | int | Ignored for FixedConcurrency (always 1). | 1 |
has_capacity ¶
Check if a slot is available.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| weight | int | Ignored for FixedConcurrency. | 1 |
Returns:
| Type | Description |
|---|---|
| bool | True if active < max_concurrent. |
WeightedConcurrency ¶
Requests consume variable weight from a capacity pool.
Instead of fixed slots, this model provides a capacity pool where different requests can consume different amounts. Useful for modeling resources like memory, CPU cores, or database connections where operations have varying resource requirements.
Example
```python
pool = WeightedConcurrency(total_capacity=100)
pool.acquire(weight=10)  # Heavy query
pool.acquire(weight=1)   # Light query
pool.available           # Returns 89
```
Attributes:
| Name | Type | Description |
|---|---|---|
| total_capacity | int | Total capacity units available. |
Initialize with a total capacity pool.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| total_capacity | int | Total capacity units available. | required |
Raises:
| Type | Description |
|---|---|
| ValueError | If total_capacity < 1. |
acquire ¶
Acquire capacity from the pool.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| weight | int | Amount of capacity to acquire. | 1 |
Returns:
| Type | Description |
|---|---|
| bool | True if capacity was acquired, False if insufficient. |
release ¶
Release capacity back to the pool.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| weight | int | Amount of capacity to release. | 1 |
has_capacity ¶
Check if capacity is available.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| weight | int | Amount of capacity needed. | 1 |
Returns:
| Type | Description |
|---|---|
| bool | True if the requested capacity is available. |
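Since WeightedConcurrency itself lives in the simulation package, here is a self-contained sketch of the pool arithmetic its example relies on (class name `WeightedPool` is illustrative):

```python
class WeightedPool:
    """Sketch of the documented WeightedConcurrency semantics."""

    def __init__(self, total_capacity: int) -> None:
        if total_capacity < 1:
            raise ValueError("total_capacity must be >= 1")
        self.total_capacity = total_capacity
        self.used = 0

    @property
    def available(self) -> int:
        return self.total_capacity - self.used

    def acquire(self, weight: int = 1) -> bool:
        # All-or-nothing: fail if the full weight cannot be granted.
        if weight > self.available:
            return False
        self.used += weight
        return True

    def release(self, weight: int = 1) -> None:
        self.used = max(0, self.used - weight)


pool = WeightedPool(total_capacity=100)
pool.acquire(weight=10)  # heavy query
pool.acquire(weight=1)   # light query
print(pool.available)    # 89
```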
Server ¶
```python
Server(
    name: str,
    concurrency: int | ConcurrencyModel = 1,
    service_time: LatencyDistribution | None = None,
    queue_policy: QueuePolicy | None = None,
    queue_capacity: int | None = None,
    downstream: Entity | None = None,
)
```
Bases: QueuedResource
Server with configurable concurrency and service time distribution.
Processes requests with simulated service time drawn from a distribution. Supports multiple concurrent requests up to the configured limit. Excess requests are queued according to the configured policy.
Concurrency can be specified as:

- An integer (wrapped in FixedConcurrency)
- A ConcurrencyModel instance for advanced control (Dynamic, Weighted, etc.)
The server uses completion hooks internally to signal when a request slot becomes available, allowing the queue driver to dispatch the next waiting request.
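The int-wrapping rule can be sketched as a small normalization step. This is illustrative only: `FixedLike` stands in for the library's FixedConcurrency, and `normalize_concurrency` is a hypothetical helper, not documented API.

```python
class FixedLike:
    """Stand-in for FixedConcurrency in this sketch."""

    def __init__(self, max_concurrent: int) -> None:
        self.max_concurrent = max_concurrent


def normalize_concurrency(concurrency):
    """Wrap a bare int in a fixed model; pass model instances through."""
    if isinstance(concurrency, int):
        return FixedLike(concurrency)
    return concurrency


print(type(normalize_concurrency(4)).__name__)  # FixedLike
```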
Attributes:
| Name | Type | Description |
|---|---|---|
| name | str | Identifier for logging and debugging. |
| concurrency_model | ConcurrencyModel | The concurrency control strategy. |
| service_time | LatencyDistribution | Distribution for request processing time. |
Initialize the server.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Server identifier. | required |
| concurrency | int \| ConcurrencyModel | Maximum concurrent requests as int, or a ConcurrencyModel instance for advanced control. | 1 |
| service_time | LatencyDistribution \| None | Service time distribution (default 10ms constant). | None |
| queue_policy | QueuePolicy \| None | Queue ordering policy (default FIFO). | None |
| queue_capacity | int \| None | Maximum queue size (default unlimited). | None |
| downstream | Entity \| None | Optional entity to forward completed requests to. When set, completed events are automatically sent to this entity with the original context preserved. | None |
downstream property writable ¶
Optional downstream entity for event forwarding.
utilization property ¶
Current utilization as fraction of concurrency.
Returns:
| Type | Description |
|---|---|
| float | Value between 0.0 and 1.0 representing active/concurrency. |
average_service_time property ¶
Average observed service time in seconds.
has_capacity ¶
Check if server can accept another request.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| weight | int | Capacity weight for weighted concurrency models. | 1 |
Returns:
| Type | Description |
|---|---|
| bool | True if sufficient capacity is available. |
handle_queued_event ¶
Process a request from the queue.
Simulates request processing by yielding the service time. Updates statistics on completion.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | Event | The request event to process. | required |
Yields:
| Type | Description |
|---|---|
| float | Service time delay. |
Returns:
| Type | Description |
|---|---|
| list[Event] \| Event \| None | Any completion events (typically None). |
get_service_time_percentile ¶
Calculate a percentile of observed service times.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| percentile | float | Percentile value between 0 and 1 (e.g., 0.99 for p99). | required |
Returns:
| Type | Description |
|---|---|
| float | Service time at the given percentile, or 0 if no data. |
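The percentile lookup can be sketched as follows. The library's exact interpolation method is not specified in this reference, so this sketch uses the nearest-rank method on the sorted samples; the function name is illustrative.

```python
def service_time_percentile(samples: list[float], percentile: float) -> float:
    """Nearest-rank percentile over observed service times.

    Returns 0 when no data has been observed, as documented.
    """
    if not samples:
        return 0.0
    if not 0 <= percentile <= 1:
        raise ValueError("percentile must be between 0 and 1")
    ordered = sorted(samples)
    # Map the fraction to an index into the sorted samples.
    index = min(len(ordered) - 1, int(percentile * len(ordered)))
    return ordered[index]


times = [0.010, 0.012, 0.015, 0.020, 0.250]  # seconds
print(service_time_percentile(times, 0.99))  # 0.25 (the tail sample)
```

Note how a single slow outlier dominates p99 while leaving the median (p50) untouched, which is why percentile metrics are preferred over averages for latency.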
ServerStats dataclass ¶
```python
ServerStats(
    requests_completed: int = 0,
    requests_rejected: int = 0,
    total_service_time: float = 0.0,
)
```
Statistics tracked by Server.
ThreadPool ¶
```python
ThreadPool(
    name: str,
    num_workers: int,
    queue_policy: QueuePolicy | None = None,
    queue_capacity: int | None = None,
    processing_time_extractor: Callable[[Event], float] | None = None,
    default_processing_time: float = 0.01,
)
```
Bases: QueuedResource
Simulates a pool of worker threads processing tasks.
Tasks are events with processing time specified in their context metadata. Workers pick tasks from the queue when idle and process them for the specified duration.
Processing time can be specified in task context as:

- task.context["metadata"]["processing_time"] = 0.05 # 50ms
- Or via a custom extractor function
Attributes:
| Name | Type | Description |
|---|---|---|
| name | str | Identifier for logging and debugging. |
| num_workers | int | Number of worker threads in the pool. |
Initialize the thread pool.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Pool identifier. | required |
| num_workers | int | Number of worker threads (must be >= 1). | required |
| queue_policy | QueuePolicy \| None | Queue ordering policy (default FIFO). | None |
| queue_capacity | int \| None | Maximum queue size (default unlimited). | None |
| processing_time_extractor | Callable[[Event], float] \| None | Optional function to extract processing time from a task event. If None, looks for task.context["metadata"]["processing_time"]. | None |
| default_processing_time | float | Default processing time in seconds when not specified in task context. | 0.01 |
Raises:
| Type | Description |
|---|---|
| ValueError | If num_workers < 1. |
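The documented lookup order for a task's processing time can be sketched as below. The dict-shaped task and the `resolve_processing_time` helper are illustrative stand-ins, not the library's Event type or API.

```python
def resolve_processing_time(task, extractor=None, default=0.01):
    """Sketch of the documented lookup order.

    A custom extractor wins; otherwise the task's context metadata is
    consulted; otherwise the pool default applies.
    """
    if extractor is not None:
        return extractor(task)
    metadata = task.get("context", {}).get("metadata", {})
    return metadata.get("processing_time", default)


task = {"context": {"metadata": {"processing_time": 0.05}}}
print(resolve_processing_time(task))             # 0.05 (from metadata)
print(resolve_processing_time({"context": {}}))  # 0.01 (pool default)
```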
worker_utilization property ¶
Current worker utilization as fraction of total workers.
Returns:
| Type | Description |
|---|---|
| float | Value between 0.0 and 1.0 representing active/total workers. |
average_processing_time property ¶
Average observed processing time in seconds.
has_capacity ¶
Check if the pool has an idle worker available.
Returns:
| Type | Description |
|---|---|
bool
|
True if at least one worker is idle. |
submit ¶
Submit a task to the pool for processing.
This is a convenience method that creates a properly formatted task event and returns it for scheduling.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| task | Event | The task event to process. Processing time should be in task.context["metadata"]["processing_time"] or will use the default. | required |
Returns:
| Type | Description |
|---|---|
| Event \| None | The task event targeted at this pool, ready for scheduling. |
handle_queued_event ¶
Process a task from the queue.
Acquires a worker, processes for the task's specified time, then releases the worker.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | Event | The task event to process. | required |
Yields:
| Type | Description |
|---|---|
| float | Processing time delay. |
Returns:
| Type | Description |
|---|---|
| list[Event] \| Event \| None | Any completion events (typically None). |
get_processing_time_percentile ¶
Calculate a percentile of observed processing times.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| percentile | float | Percentile value between 0 and 1 (e.g., 0.99 for p99). | required |
Returns:
| Type | Description |
|---|---|
| float | Processing time at the given percentile, or 0 if no data. |
ThreadPoolStats dataclass ¶
```python
ThreadPoolStats(
    tasks_completed: int = 0,
    tasks_rejected: int = 0,
    total_processing_time: float = 0.0,
)
```