Queue Policies¶
Pluggable queue ordering strategies including FIFO, LIFO, and priority.
Advanced queue policies for sophisticated queue management.
This module provides active queue management (AQM) algorithms and specialized queuing strategies beyond basic FIFO/LIFO.
Policies
CoDelQueue: Controlled Delay - manages queue delay, not size REDQueue: Random Early Detection - probabilistic early dropping FairQueue: Per-flow fair queuing with round-robin WeightedFairQueue: Weighted fair queuing with priority classes DeadlineQueue: Priority by deadline with expiration AdaptiveLIFO: LIFO under congestion, FIFO otherwise
Example
from happysimulator.components.queue_policies import ( CoDelQueue, REDQueue, FairQueue, DeadlineQueue, )
CoDel for controlling latency¶
codel = CoDelQueue(target_delay=0.005, interval=0.100)
RED for congestion control¶
red = REDQueue(min_threshold=10, max_threshold=30)
Fair queuing for multi-tenant systems¶
fair = FairQueue(get_flow_id=lambda e: e.context["tenant_id"])
AdaptiveLIFO ¶
Bases: QueuePolicy[T]
Adaptive queue that switches between FIFO and LIFO based on congestion.
When queue depth is below the congestion threshold, items are dequeued in FIFO order (fair to all requests). When congestion exceeds the threshold, switches to LIFO to prioritize recent requests.
The rationale is that under congestion, older requests have likely already timed out from the client's perspective, so processing newer requests provides better overall user experience.
Attributes:
| Name | Type | Description |
|---|---|---|
congestion_threshold |
int
|
Queue depth to switch to LIFO mode. |
capacity |
float
|
Maximum queue capacity. |
Initialize the adaptive queue.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
congestion_threshold
|
int
|
Queue depth at which to switch to LIFO. |
required |
capacity
|
int | None
|
Maximum queue capacity (None = unlimited). |
None
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If parameters are invalid. |
push ¶
Add item to queue.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
item
|
T
|
The item to enqueue. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if accepted, False if capacity exceeded. |
pop ¶
Remove and return the next item.
Uses FIFO when below congestion threshold, LIFO when above.
Returns:
| Type | Description |
|---|---|
T | None
|
The next item, or None if empty. |
peek ¶
Return the next item without removing it.
Returns item based on current mode (FIFO or LIFO).
AdaptiveLIFOStats
dataclass
¶
AdaptiveLIFOStats(
enqueued: int = 0,
dequeued_fifo: int = 0,
dequeued_lifo: int = 0,
capacity_rejected: int = 0,
mode_switches: int = 0,
)
Statistics tracked by AdaptiveLIFO.
CoDelQueue ¶
CoDelQueue(
target_delay: float = 0.005,
interval: float = 0.1,
capacity: int | None = None,
clock_func: Callable[[], Instant] | None = None,
)
Bases: QueuePolicy[T]
CoDel (Controlled Delay) active queue management.
CoDel monitors the minimum queue delay over an interval. When this minimum delay exceeds the target, it begins dropping packets. The drop rate increases if congestion persists.
Key properties: - Controls delay, not queue length - Self-tuning (no configuration based on bandwidth/queue size) - Fair to bursty traffic - Distinguishes between good queue (temporary burst) and bad queue (persistent congestion)
Attributes:
| Name | Type | Description |
|---|---|---|
target_delay |
float
|
Target minimum delay in seconds. |
interval |
float
|
Measurement interval in seconds. |
capacity |
float
|
Maximum queue size. |
Initialize the CoDel queue.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
target_delay
|
float
|
Target delay threshold in seconds (default 5ms). |
0.005
|
interval
|
float
|
Measurement interval in seconds (default 100ms). |
0.1
|
capacity
|
int | None
|
Maximum queue capacity (None = unlimited). |
None
|
clock_func
|
Callable[[], Instant] | None
|
Function returning current time as Instant. Required for time-based operations. |
None
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If parameters are invalid. |
set_clock ¶
Set the clock function for time-based operations.
push ¶
Add item to queue with timestamp.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
item
|
T
|
The item to enqueue. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if accepted, False if capacity exceeded. |
pop ¶
Remove and return the next item, applying CoDel algorithm.
May drop items if delay exceeds target for too long.
Returns:
| Type | Description |
|---|---|
T | None
|
The next item, or None if empty. |
CoDelStats
dataclass
¶
CoDelStats(
enqueued: int = 0,
dequeued: int = 0,
dropped: int = 0,
capacity_rejected: int = 0,
drop_intervals: int = 0,
)
Statistics tracked by CoDelQueue.
DeadlineQueue ¶
DeadlineQueue(
get_deadline: Callable[[T], Instant],
capacity: int | None = None,
clock_func: Callable[[], Instant] | None = None,
)
Bases: QueuePolicy[T]
Priority queue by deadline that drops expired items.
Items are dequeued in deadline order (earliest first). Expired items (deadline < current time) are automatically dropped.
This is useful for request queues where stale requests should be discarded rather than processed late.
Attributes:
| Name | Type | Description |
|---|---|---|
get_deadline |
Function to extract deadline from items. |
|
capacity |
float
|
Maximum queue capacity. |
Initialize the deadline queue.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
get_deadline
|
Callable[[T], Instant]
|
Function that extracts deadline from an item. |
required |
capacity
|
int | None
|
Maximum queue capacity (None = unlimited). |
None
|
clock_func
|
Callable[[], Instant] | None
|
Function returning current time as Instant. Required for expiration checking. |
None
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If parameters are invalid. |
set_clock ¶
Set the clock function for expiration checking.
push ¶
Add item to queue with its deadline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
item
|
T
|
The item to enqueue. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if accepted, False if capacity exceeded. |
pop ¶
Remove and return the item with earliest deadline.
Automatically drops expired items until finding a valid one.
Returns:
| Type | Description |
|---|---|
T | None
|
The next non-expired item, or None if all expired/empty. |
peek ¶
Return the next non-expired item without removing it.
Note: Does not remove expired items (use pop for that).
purge_expired ¶
Remove all expired items from the queue.
Returns:
| Type | Description |
|---|---|
int
|
Number of items removed. |
is_empty ¶
Return True if no items in queue.
Note: May include expired items. Use pop() to skip expired.
DeadlineQueueStats
dataclass
¶
DeadlineQueueStats(
enqueued: int = 0,
dequeued: int = 0,
expired: int = 0,
capacity_rejected: int = 0,
)
Statistics tracked by DeadlineQueue.
FairQueue ¶
FairQueue(
get_flow_id: Callable[[T], str],
max_flows: int | None = None,
per_flow_capacity: int | None = None,
)
Bases: QueuePolicy[T]
Per-flow fair queuing.
Maintains separate queues for each flow and dequeues in round-robin fashion, ensuring fair bandwidth allocation across flows.
This prevents a single aggressive flow from monopolizing the queue and starving other flows.
Attributes:
| Name | Type | Description |
|---|---|---|
get_flow_id |
Function to extract flow ID from items. |
|
max_flows |
int | None
|
Maximum number of concurrent flows. |
per_flow_capacity |
float
|
Maximum items per flow queue. |
Initialize the fair queue.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
get_flow_id
|
Callable[[T], str]
|
Function that extracts flow ID from an item. |
required |
max_flows
|
int | None
|
Maximum number of concurrent flows (None = unlimited). |
None
|
per_flow_capacity
|
int | None
|
Maximum items per flow (None = unlimited). |
None
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If parameters are invalid. |
push ¶
Add item to its flow's queue.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
item
|
T
|
The item to enqueue. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if accepted, False if rejected. |
pop ¶
Remove and return the next item using round-robin.
Returns:
| Type | Description |
|---|---|
T | None
|
The next item, or None if empty. |
FairQueueStats
dataclass
¶
FairQueueStats(
enqueued: int = 0,
dequeued: int = 0,
rejected_flow_capacity: int = 0,
rejected_max_flows: int = 0,
flows_created: int = 0,
flows_removed: int = 0,
)
Statistics tracked by FairQueue.
REDQueue ¶
REDQueue(
min_threshold: int,
max_threshold: int,
max_probability: float = 0.1,
capacity: int | None = None,
weight: float = 0.002,
)
Bases: QueuePolicy[T]
Random Early Detection queue management.
RED maintains an exponentially-weighted moving average of queue size and drops packets probabilistically when average queue exceeds thresholds.
Drop probability: - avg_queue < min_threshold: 0% (no drops) - min_threshold <= avg_queue < max_threshold: linear 0% to max_probability - avg_queue >= max_threshold: 100% (drop all)
The averaging smooths out burst traffic, preventing unnecessary drops during temporary spikes.
Attributes:
| Name | Type | Description |
|---|---|---|
min_threshold |
int
|
Queue length to start probabilistic drops. |
max_threshold |
int
|
Queue length for 100% drop rate. |
max_probability |
float
|
Maximum drop probability (at max_threshold). |
capacity |
float
|
Hard queue capacity limit. |
Initialize the RED queue.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
min_threshold
|
int
|
Minimum average queue length to start dropping. |
required |
max_threshold
|
int
|
Average queue length for 100% drop rate. |
required |
max_probability
|
float
|
Maximum drop probability (default 0.1 = 10%). |
0.1
|
capacity
|
int | None
|
Hard capacity limit. Defaults to 2 * max_threshold. |
None
|
weight
|
float
|
Weight for exponential moving average (default 0.002). |
0.002
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If parameters are invalid. |
avg_queue_length
property
¶
Current exponential moving average of queue length.
push ¶
Add item to queue, applying RED algorithm.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
item
|
T
|
The item to enqueue. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if accepted, False if dropped. |
pop ¶
Remove and return the next item.
Returns:
| Type | Description |
|---|---|
T | None
|
The next item, or None if empty. |
REDStats
dataclass
¶
REDStats(
enqueued: int = 0,
dequeued: int = 0,
dropped_probabilistic: int = 0,
dropped_forced: int = 0,
capacity_rejected: int = 0,
)
Statistics tracked by REDQueue.
WeightedFairQueue ¶
WeightedFairQueue(
get_flow_id: Callable[[T], str],
get_weight: Callable[[str], int],
capacity: int | None = None,
per_flow_capacity: int | None = None,
)
Bases: QueuePolicy[T]
Weighted fair queuing with priority classes.
Similar to FairQueue but flows are assigned weights that determine how many items they can dequeue per round-robin cycle.
A flow with weight 3 gets to dequeue 3 items for every 1 item dequeued from a flow with weight 1.
This implements a deficit round-robin (DRR) style algorithm where each flow accumulates credits based on its weight.
Attributes:
| Name | Type | Description |
|---|---|---|
get_flow_id |
Function to extract flow ID from items. |
|
get_weight |
Function to get weight for a flow ID. |
|
capacity |
float
|
Total queue capacity. |
Initialize the weighted fair queue.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
get_flow_id
|
Callable[[T], str]
|
Function that extracts flow ID from an item. |
required |
get_weight
|
Callable[[str], int]
|
Function that returns weight for a flow ID. Higher weight = more bandwidth share. |
required |
capacity
|
int | None
|
Total queue capacity (None = unlimited). |
None
|
per_flow_capacity
|
int | None
|
Maximum items per flow (None = unlimited). |
None
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If parameters are invalid. |
push ¶
Add item to its flow's queue.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
item
|
T
|
The item to enqueue. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if accepted, False if rejected. |
pop ¶
Remove and return the next item using weighted round-robin.
Returns:
| Type | Description |
|---|---|
T | None
|
The next item, or None if empty. |
WeightedFairQueueStats
dataclass
¶
WeightedFairQueueStats(
enqueued: int = 0,
dequeued: int = 0,
rejected_capacity: int = 0,
flows_created: int = 0,
flows_removed: int = 0,
)
Statistics tracked by WeightedFairQueue.