
Queue Policies

Pluggable queue ordering strategies including FIFO, LIFO, and priority.

Advanced queue policies for sophisticated queue management.

This module provides active queue management (AQM) algorithms and specialized queuing strategies beyond basic FIFO/LIFO.

Policies

- CoDelQueue: Controlled Delay - manages queue delay, not size
- REDQueue: Random Early Detection - probabilistic early dropping
- FairQueue: Per-flow fair queuing with round-robin
- WeightedFairQueue: Weighted fair queuing with priority classes
- DeadlineQueue: Priority by deadline with expiration
- AdaptiveLIFO: LIFO under congestion, FIFO otherwise

Example

from happysimulator.components.queue_policies import (
    CoDelQueue,
    REDQueue,
    FairQueue,
    DeadlineQueue,
)

# CoDel for controlling latency
codel = CoDelQueue(target_delay=0.005, interval=0.100)

# RED for congestion control
red = REDQueue(min_threshold=10, max_threshold=30)

# Fair queuing for multi-tenant systems
fair = FairQueue(get_flow_id=lambda e: e.context["tenant_id"])

AdaptiveLIFO

AdaptiveLIFO(
    congestion_threshold: int, capacity: int | None = None
)

Bases: QueuePolicy[T]

Adaptive queue that switches between FIFO and LIFO based on congestion.

When queue depth is below the congestion threshold, items are dequeued in FIFO order (fair to all requests). When queue depth exceeds the threshold, the queue switches to LIFO to prioritize recent requests.

The rationale is that under congestion, older requests have likely already timed out from the client's perspective, so processing newer requests provides better overall user experience.
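The mode switch can be sketched in a few lines. The class below is a hypothetical stand-in written for illustration, not the library's AdaptiveLIFO. It assumes that "congested" means the depth is strictly greater than the threshold, which the description above does not spell out.

```python
from collections import deque


class AdaptiveLIFOSketch:
    """Toy illustration only; not the library's AdaptiveLIFO."""

    def __init__(self, congestion_threshold, capacity=None):
        if congestion_threshold < 1:
            raise ValueError("congestion_threshold must be >= 1")
        self.congestion_threshold = congestion_threshold
        # Assumption: None means unlimited, represented here as infinity.
        self.capacity = float("inf") if capacity is None else capacity
        self._items = deque()

    def push(self, item):
        if len(self._items) >= self.capacity:
            return False  # rejected: queue is full
        self._items.append(item)
        return True

    def pop(self):
        if not self._items:
            return None
        # Assumption: congested means depth strictly above the threshold.
        if len(self._items) > self.congestion_threshold:
            return self._items.pop()  # LIFO: newest item first
        return self._items.popleft()  # FIFO: oldest item first


q = AdaptiveLIFOSketch(congestion_threshold=2)
for request in ["r1", "r2", "r3", "r4"]:
    q.push(request)

order = [q.pop() for _ in range(4)]
print(order)  # ['r4', 'r3', 'r1', 'r2']
```

The two newest requests are served first while the depth is above 2; once the backlog drains to the threshold, the remaining requests come out oldest first.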

Attributes:

- congestion_threshold (int): Queue depth at which the queue switches to LIFO mode.
- capacity (float): Maximum queue capacity.

Initialize the adaptive queue.

Parameters:

- congestion_threshold (int): Queue depth at which to switch to LIFO. Required.
- capacity (int | None): Maximum queue capacity (None = unlimited). Default: None.

Raises:

- ValueError: If parameters are invalid.

stats property

stats: AdaptiveLIFOStats

Return a frozen snapshot of queue statistics.

congestion_threshold property

congestion_threshold: int

Queue depth at which to switch to LIFO.

capacity property

capacity: float

Maximum queue capacity.

is_congested property

is_congested: bool

Whether queue is currently in congested (LIFO) mode.

mode property

mode: str

Current queue mode: 'FIFO' or 'LIFO'.

push

push(item: T) -> bool

Add item to queue.

Parameters:

- item (T): The item to enqueue. Required.

Returns:

- bool: True if accepted, False if capacity exceeded.

pop

pop() -> T | None

Remove and return the next item.

Uses FIFO when below congestion threshold, LIFO when above.

Returns:

- T | None: The next item, or None if empty.

peek

peek() -> T | None

Return the next item without removing it.

Returns item based on current mode (FIFO or LIFO).

is_empty

is_empty() -> bool

Return True if no items in queue.

__len__

__len__() -> int

Return number of items in queue.

AdaptiveLIFOStats dataclass

AdaptiveLIFOStats(
    enqueued: int = 0,
    dequeued_fifo: int = 0,
    dequeued_lifo: int = 0,
    capacity_rejected: int = 0,
    mode_switches: int = 0,
)

Statistics tracked by AdaptiveLIFO.

CoDelQueue

CoDelQueue(
    target_delay: float = 0.005,
    interval: float = 0.1,
    capacity: int | None = None,
    clock_func: Callable[[], Instant] | None = None,
)

Bases: QueuePolicy[T]

CoDel (Controlled Delay) active queue management.

CoDel monitors the minimum queue delay over an interval. When this minimum delay exceeds the target, it begins dropping packets. The drop rate increases if congestion persists.

Key properties:

- Controls delay, not queue length
- Self-tuning (no configuration based on bandwidth/queue size)
- Fair to bursty traffic
- Distinguishes between good queue (temporary burst) and bad queue (persistent congestion)
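A minimal sketch of the two decisions described above, modeled on the published CoDel algorithm (RFC 8289) rather than on this module's source. The names DropStateSketch and next_drop_time are hypothetical, times are plain floats in seconds instead of Instant, and the interval / sqrt(count) spacing is the RFC's control law; whether CoDelQueue uses exactly that law is an assumption.

```python
import math


class DropStateSketch:
    """Hypothetical helper: decides when persistent delay justifies dropping."""

    def __init__(self, target_delay=0.005, interval=0.100):
        self.target_delay = target_delay
        self.interval = interval
        self.first_above_time = None  # earliest time dropping may begin

    def should_drop(self, now, sojourn_time):
        if sojourn_time < self.target_delay:
            # Delay dipped below target: a "good" (temporary) queue.
            self.first_above_time = None
            return False
        if self.first_above_time is None:
            # First sample above target: start a one-interval grace period.
            self.first_above_time = now + self.interval
            return False
        # Delay stayed above target for a full interval: a "bad" queue.
        return now >= self.first_above_time


def next_drop_time(now, interval, drop_count):
    """RFC 8289 control law: drops get closer together as drop_count grows."""
    return now + interval / math.sqrt(drop_count)


state = DropStateSketch()
decisions = [
    state.should_drop(0.00, 0.010),  # above target, grace period starts
    state.should_drop(0.05, 0.010),  # still inside the grace period
    state.should_drop(0.10, 0.010),  # above target for a full interval
    state.should_drop(0.11, 0.001),  # delay recovered, state resets
]
print(decisions)  # [False, False, True, False]

# Gap before the 1st, 2nd, 3rd and 4th consecutive drop, in seconds.
gaps = [round(next_drop_time(0.0, 0.100, n), 4) for n in (1, 2, 3, 4)]
print(gaps)  # [0.1, 0.0707, 0.0577, 0.05]
```

A short burst never trips the first check because a single sample below target resets the grace period, which is how the good queue and bad queue cases are told apart.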

Attributes:

- target_delay (float): Target minimum delay in seconds.
- interval (float): Measurement interval in seconds.
- capacity (float): Maximum queue size.

Initialize the CoDel queue.

Parameters:

- target_delay (float): Target delay threshold in seconds (default 5ms). Default: 0.005.
- interval (float): Measurement interval in seconds (default 100ms). Default: 0.1.
- capacity (int | None): Maximum queue capacity (None = unlimited). Default: None.
- clock_func (Callable[[], Instant] | None): Function returning current time as Instant. Required for time-based operations. Default: None.

Raises:

- ValueError: If parameters are invalid.

stats property

stats: CoDelStats

Return a frozen snapshot of queue statistics.

target_delay property

target_delay: float

Target delay threshold in seconds.

interval property

interval: float

Measurement interval in seconds.

capacity property

capacity: float

Maximum queue capacity.

dropping property

dropping: bool

Whether queue is currently in dropping state.

set_clock

set_clock(clock_func: Callable[[], Instant]) -> None

Set the clock function for time-based operations.

push

push(item: T) -> bool

Add item to queue with timestamp.

Parameters:

- item (T): The item to enqueue. Required.

Returns:

- bool: True if accepted, False if capacity exceeded.

pop

pop() -> T | None

Remove and return the next item, applying CoDel algorithm.

May drop items if delay exceeds target for too long.

Returns:

- T | None: The next item, or None if empty.

peek

peek() -> T | None

Return the next item without removing it.

is_empty

is_empty() -> bool

Return True if no items in queue.

__len__

__len__() -> int

Return number of items in queue.

CoDelStats dataclass

CoDelStats(
    enqueued: int = 0,
    dequeued: int = 0,
    dropped: int = 0,
    capacity_rejected: int = 0,
    drop_intervals: int = 0,
)

Statistics tracked by CoDelQueue.

DeadlineQueue

DeadlineQueue(
    get_deadline: Callable[[T], Instant],
    capacity: int | None = None,
    clock_func: Callable[[], Instant] | None = None,
)

Bases: QueuePolicy[T]

Priority queue by deadline that drops expired items.

Items are dequeued in deadline order (earliest first). Expired items (deadline < current time) are automatically dropped.

This is useful for request queues where stale requests should be discarded rather than processed late.
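The earliest-deadline-first ordering with expiry can be sketched with a binary heap. This is an illustrative stand-in, not the library's DeadlineQueue: the class name is hypothetical, deadlines and the clock are plain floats rather than Instant, and the clock is a mutable list cell so the example can advance time by hand.

```python
import heapq
import itertools


class DeadlineQueueSketch:
    """Toy illustration only; not the library's DeadlineQueue."""

    def __init__(self, get_deadline, clock_func):
        self._get_deadline = get_deadline
        self._clock = clock_func
        self._heap = []
        # Tie-breaker so equal deadlines keep arrival order and the
        # items themselves are never compared.
        self._tie = itertools.count()
        self.expired = 0

    def push(self, item):
        entry = (self._get_deadline(item), next(self._tie), item)
        heapq.heappush(self._heap, entry)
        return True

    def pop(self):
        now = self._clock()
        while self._heap:
            deadline, _, item = heapq.heappop(self._heap)
            if deadline < now:
                self.expired += 1  # stale: drop and keep looking
                continue
            return item
        return None


now = [0.0]  # hand-driven clock for the example
q = DeadlineQueueSketch(
    get_deadline=lambda request: request["deadline"],
    clock_func=lambda: now[0],
)
q.push({"id": "a", "deadline": 5.0})
q.push({"id": "b", "deadline": 1.0})
q.push({"id": "c", "deadline": 3.0})

now[0] = 2.0  # request "b" is now past its deadline
first = q.pop()
second = q.pop()
third = q.pop()
print(first["id"], second["id"], third, q.expired)  # c a None 1
```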

Attributes:

- get_deadline: Function to extract deadline from items.
- capacity (float): Maximum queue capacity.

Initialize the deadline queue.

Parameters:

- get_deadline (Callable[[T], Instant]): Function that extracts deadline from an item. Required.
- capacity (int | None): Maximum queue capacity (None = unlimited). Default: None.
- clock_func (Callable[[], Instant] | None): Function returning current time as Instant. Required for expiration checking. Default: None.

Raises:

- ValueError: If parameters are invalid.

stats property

stats: DeadlineQueueStats

Return a frozen snapshot of queue statistics.

capacity property

capacity: float

Maximum queue capacity.

set_clock

set_clock(clock_func: Callable[[], Instant]) -> None

Set the clock function for expiration checking.

push

push(item: T) -> bool

Add item to queue with its deadline.

Parameters:

- item (T): The item to enqueue. Required.

Returns:

- bool: True if accepted, False if capacity exceeded.

pop

pop() -> T | None

Remove and return the item with earliest deadline.

Automatically drops expired items until finding a valid one.

Returns:

- T | None: The next non-expired item, or None if all expired/empty.

peek

peek() -> T | None

Return the next non-expired item without removing it.

Note: Does not remove expired items (use pop for that).

purge_expired

purge_expired() -> int

Remove all expired items from the queue.

Returns:

- int: Number of items removed.

is_empty

is_empty() -> bool

Return True if no items in queue.

Note: May include expired items. Use pop() to skip expired.

__len__

__len__() -> int

Return number of items in queue (including potentially expired).

count_expired

count_expired() -> int

Count currently expired items without removing them.

count_valid

count_valid() -> int

Count non-expired items.

DeadlineQueueStats dataclass

DeadlineQueueStats(
    enqueued: int = 0,
    dequeued: int = 0,
    expired: int = 0,
    capacity_rejected: int = 0,
)

Statistics tracked by DeadlineQueue.

FairQueue

FairQueue(
    get_flow_id: Callable[[T], str],
    max_flows: int | None = None,
    per_flow_capacity: int | None = None,
)

Bases: QueuePolicy[T]

Per-flow fair queuing.

Maintains separate queues for each flow and dequeues in round-robin fashion, ensuring fair bandwidth allocation across flows.

This prevents a single aggressive flow from monopolizing the queue and starving other flows.
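Round-robin service across per-flow queues can be sketched as follows. The class is a hypothetical illustration, not the library's FairQueue: it omits max_flows, per_flow_capacity and statistics, and it assumes an empty flow is removed as soon as its last item is dequeued.

```python
from collections import OrderedDict, deque


class FairQueueSketch:
    """Toy illustration only; not the library's FairQueue."""

    def __init__(self, get_flow_id):
        self._get_flow_id = get_flow_id
        self._flows = OrderedDict()  # flow_id -> deque, in service order

    def push(self, item):
        flow_id = self._get_flow_id(item)
        self._flows.setdefault(flow_id, deque()).append(item)
        return True

    def pop(self):
        if not self._flows:
            return None
        # Serve the flow at the head of the rotation.
        flow_id, items = next(iter(self._flows.items()))
        item = items.popleft()
        if items:
            self._flows.move_to_end(flow_id)  # back of the rotation
        else:
            del self._flows[flow_id]  # flow drained: forget it
        return item


q = FairQueueSketch(get_flow_id=lambda entry: entry[0])
for entry in [("A", "a1"), ("A", "a2"), ("A", "a3"), ("B", "b1"), ("C", "c1")]:
    q.push(entry)

order = [q.pop()[1] for _ in range(5)]
print(order)  # ['a1', 'b1', 'c1', 'a2', 'a3']
```

Tenant A enqueued three items before B and C arrived, yet B and C are each served after a single item from A.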

Attributes:

- get_flow_id: Function to extract flow ID from items.
- max_flows (int | None): Maximum number of concurrent flows.
- per_flow_capacity (float): Maximum items per flow queue.

Initialize the fair queue.

Parameters:

- get_flow_id (Callable[[T], str]): Function that extracts flow ID from an item. Required.
- max_flows (int | None): Maximum number of concurrent flows (None = unlimited). Default: None.
- per_flow_capacity (int | None): Maximum items per flow (None = unlimited). Default: None.

Raises:

- ValueError: If parameters are invalid.

stats property

stats: FairQueueStats

Return a frozen snapshot of queue statistics.

capacity property

capacity: float

Total capacity (flows * per_flow_capacity).

flow_count property

flow_count: int

Number of active flows.

max_flows property

max_flows: int | None

Maximum number of concurrent flows.

per_flow_capacity property

per_flow_capacity: float

Maximum items per flow.

get_flow_depth

get_flow_depth(flow_id: str) -> int

Get current queue depth for a specific flow.

push

push(item: T) -> bool

Add item to its flow's queue.

Parameters:

- item (T): The item to enqueue. Required.

Returns:

- bool: True if accepted, False if rejected.

pop

pop() -> T | None

Remove and return the next item using round-robin.

Returns:

- T | None: The next item, or None if empty.

peek

peek() -> T | None

Return the next item without removing it.

is_empty

is_empty() -> bool

Return True if no items in queue.

__len__

__len__() -> int

Return total number of items across all flows.

FairQueueStats dataclass

FairQueueStats(
    enqueued: int = 0,
    dequeued: int = 0,
    rejected_flow_capacity: int = 0,
    rejected_max_flows: int = 0,
    flows_created: int = 0,
    flows_removed: int = 0,
)

Statistics tracked by FairQueue.

REDQueue

REDQueue(
    min_threshold: int,
    max_threshold: int,
    max_probability: float = 0.1,
    capacity: int | None = None,
    weight: float = 0.002,
)

Bases: QueuePolicy[T]

Random Early Detection queue management.

RED maintains an exponentially-weighted moving average of queue size and drops packets probabilistically when average queue exceeds thresholds.

Drop probability:

- avg_queue < min_threshold: 0% (no drops)
- min_threshold <= avg_queue < max_threshold: linear from 0% to max_probability
- avg_queue >= max_threshold: 100% (drop all)

The averaging smooths out burst traffic, preventing unnecessary drops during temporary spikes.
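The averaging step and the piecewise drop probability above can be written out directly. The function names are hypothetical and this is a sketch of the stated rules, not the module's code; in particular, when the library updates the average (on every push, or otherwise) is not specified here.

```python
def update_average(avg, current_length, weight=0.002):
    """Exponentially weighted moving average of the queue length."""
    return (1.0 - weight) * avg + weight * current_length


def drop_probability(avg, min_threshold, max_threshold, max_probability=0.1):
    """Piecewise drop probability as listed above."""
    if avg < min_threshold:
        return 0.0
    if avg >= max_threshold:
        return 1.0
    span = max_threshold - min_threshold
    return max_probability * (avg - min_threshold) / span


# Halfway between the thresholds gives half of max_probability.
midpoint = drop_probability(20, min_threshold=10, max_threshold=30)

# A short burst: 50 consecutive samples at depth 100 barely move the
# average, so it stays under min_threshold and nothing is dropped.
avg = 0.0
for _ in range(50):
    avg = update_average(avg, 100)
burst_probability = drop_probability(avg, min_threshold=10, max_threshold=30)
print(midpoint, avg < 10, burst_probability)
```

With the default weight of 0.002, an instantaneous depth of 100 has to persist for more than 50 samples before the average crosses a min_threshold of 10, which is the smoothing effect described above.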

Attributes:

- min_threshold (int): Queue length to start probabilistic drops.
- max_threshold (int): Queue length for 100% drop rate.
- max_probability (float): Maximum drop probability (at max_threshold).
- capacity (float): Hard queue capacity limit.

Initialize the RED queue.

Parameters:

- min_threshold (int): Minimum average queue length to start dropping. Required.
- max_threshold (int): Average queue length for 100% drop rate. Required.
- max_probability (float): Maximum drop probability (default 0.1 = 10%). Default: 0.1.
- capacity (int | None): Hard capacity limit; None means 2 * max_threshold. Default: None.
- weight (float): Weight for exponential moving average (default 0.002). Default: 0.002.

Raises:

- ValueError: If parameters are invalid.

stats property

stats: REDStats

Return a frozen snapshot of queue statistics.

min_threshold property

min_threshold: int

Minimum queue length to start dropping.

max_threshold property

max_threshold: int

Queue length for 100% drop rate.

max_probability property

max_probability: float

Maximum drop probability.

capacity property

capacity: float

Hard queue capacity limit.

avg_queue_length property

avg_queue_length: float

Current exponential moving average of queue length.

push

push(item: T) -> bool

Add item to queue, applying RED algorithm.

Parameters:

- item (T): The item to enqueue. Required.

Returns:

- bool: True if accepted, False if dropped.

pop

pop() -> T | None

Remove and return the next item.

Returns:

- T | None: The next item, or None if empty.

peek

peek() -> T | None

Return the next item without removing it.

is_empty

is_empty() -> bool

Return True if no items in queue.

__len__

__len__() -> int

Return number of items in queue.

REDStats dataclass

REDStats(
    enqueued: int = 0,
    dequeued: int = 0,
    dropped_probabilistic: int = 0,
    dropped_forced: int = 0,
    capacity_rejected: int = 0,
)

Statistics tracked by REDQueue.

WeightedFairQueue

WeightedFairQueue(
    get_flow_id: Callable[[T], str],
    get_weight: Callable[[str], int],
    capacity: int | None = None,
    per_flow_capacity: int | None = None,
)

Bases: QueuePolicy[T]

Weighted fair queuing with priority classes.

Similar to FairQueue but flows are assigned weights that determine how many items they can dequeue per round-robin cycle.

A flow with weight 3 gets to dequeue 3 items for every 1 item dequeued from a flow with weight 1.

This implements a deficit round-robin (DRR) style algorithm where each flow accumulates credits based on its weight.
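A simplified credit-based rotation illustrates the weighting. It is a hypothetical sketch, not the library's WeightedFairQueue: every item costs one credit (a full DRR also accounts for item size), weights are assumed to be integers of at least 1, and capacity limits and statistics are left out.

```python
from collections import OrderedDict, deque


class WeightedFairQueueSketch:
    """Toy illustration only; not the library's WeightedFairQueue."""

    def __init__(self, get_flow_id, get_weight):
        self._get_flow_id = get_flow_id
        self._get_weight = get_weight
        self._flows = OrderedDict()  # flow_id -> deque, in service order
        self._credits = {}           # flow_id -> items left in this turn

    def push(self, item):
        flow_id = self._get_flow_id(item)
        if flow_id not in self._flows:
            self._flows[flow_id] = deque()
            self._credits[flow_id] = self._get_weight(flow_id)
        self._flows[flow_id].append(item)
        return True

    def pop(self):
        if not self._flows:
            return None
        flow_id, items = next(iter(self._flows.items()))
        item = items.popleft()
        self._credits[flow_id] -= 1
        if not items:
            # Flow drained: forget it and its remaining credits.
            del self._flows[flow_id]
            del self._credits[flow_id]
        elif self._credits[flow_id] <= 0:
            # Turn used up: refill credits and go to the back.
            self._credits[flow_id] = self._get_weight(flow_id)
            self._flows.move_to_end(flow_id)
        return item


weights = {"gold": 3, "free": 1}
q = WeightedFairQueueSketch(
    get_flow_id=lambda entry: entry[0],
    get_weight=lambda flow_id: weights[flow_id],
)
for name in ["g1", "g2", "g3", "g4"]:
    q.push(("gold", name))
for name in ["f1", "f2"]:
    q.push(("free", name))

order = [q.pop()[1] for _ in range(6)]
print(order)  # ['g1', 'g2', 'g3', 'f1', 'g4', 'f2']
```

The weight-3 flow dequeues three items per turn and the weight-1 flow dequeues one, matching the 3:1 ratio described above.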

Attributes:

- get_flow_id: Function to extract flow ID from items.
- get_weight: Function to get weight for a flow ID.
- capacity (float): Total queue capacity.

Initialize the weighted fair queue.

Parameters:

- get_flow_id (Callable[[T], str]): Function that extracts flow ID from an item. Required.
- get_weight (Callable[[str], int]): Function that returns weight for a flow ID. Higher weight = more bandwidth share. Required.
- capacity (int | None): Total queue capacity (None = unlimited). Default: None.
- per_flow_capacity (int | None): Maximum items per flow (None = unlimited). Default: None.

Raises:

- ValueError: If parameters are invalid.

stats property

stats: WeightedFairQueueStats

Return a frozen snapshot of queue statistics.

capacity property

capacity: float

Total queue capacity.

flow_count property

flow_count: int

Number of active flows.

get_flow_depth

get_flow_depth(flow_id: str) -> int

Get current queue depth for a specific flow.

get_flow_weight

get_flow_weight(flow_id: str) -> int

Get weight for a specific flow.

push

push(item: T) -> bool

Add item to its flow's queue.

Parameters:

- item (T): The item to enqueue. Required.

Returns:

- bool: True if accepted, False if rejected.

pop

pop() -> T | None

Remove and return the next item using weighted round-robin.

Returns:

- T | None: The next item, or None if empty.

peek

peek() -> T | None

Return the next item without removing it.

is_empty

is_empty() -> bool

Return True if no items in queue.

__len__

__len__() -> int

Return total number of items across all flows.

WeightedFairQueueStats dataclass

WeightedFairQueueStats(
    enqueued: int = 0,
    dequeued: int = 0,
    rejected_capacity: int = 0,
    flows_created: int = 0,
    flows_removed: int = 0,
)

Statistics tracked by WeightedFairQueue.