Queue

FIFO, LIFO, and priority queue policies for event buffering.

Bounded buffer queue with pluggable ordering policy.

Implements a queue entity that buffers incoming events and delivers them to a downstream driver on demand. Uses QueuePolicy implementations to control ordering (FIFO, LIFO, Priority).

The Queue/QueueDriver separation exists to decouple queue management from the target entity. The target entity does not need to know it is being fed by a queue.
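The three ordering policies named above can be illustrated with a small standalone sketch. The classes below (`FifoPolicy`, `LifoPolicy`, `PriorityPolicy`) and their `push`/`pop` methods are hypothetical stand-ins written for this example. They are not the library's QueuePolicy interface, whose actual method names are not shown on this page.

```python
import heapq
from collections import deque
from itertools import count


class FifoPolicy:
    """Hypothetical FIFO policy: the oldest item leaves first."""

    def __init__(self):
        self._items = deque()

    def push(self, item):
        self._items.append(item)

    def pop(self):
        return self._items.popleft() if self._items else None


class LifoPolicy:
    """Hypothetical LIFO policy: the newest item leaves first."""

    def __init__(self):
        self._items = []

    def push(self, item):
        self._items.append(item)

    def pop(self):
        return self._items.pop() if self._items else None


class PriorityPolicy:
    """Hypothetical priority policy: the lowest key leaves first.

    A running sequence number breaks ties so equal keys keep arrival order.
    """

    def __init__(self, key):
        self._key = key
        self._heap = []
        self._seq = count()

    def push(self, item):
        heapq.heappush(self._heap, (self._key(item), next(self._seq), item))

    def pop(self):
        return heapq.heappop(self._heap)[2] if self._heap else None


def drain(policy, items):
    """Push all items, then pop until the policy is empty."""
    for item in items:
        policy.push(item)
    out = []
    while (item := policy.pop()) is not None:
        out.append(item)
    return out
```

Because all three sketch classes expose the same two methods, the buffering code can stay identical while the ordering changes, which is the point of a pluggable policy.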

QueuePollEvent

QueuePollEvent(
    *,
    time: Instant,
    target,
    requestor: Entity = None,
    **kwargs,
)

Bases: Event

Request from driver to queue for the next work item.

Sent when the driver's target has capacity and is ready for work. The queue responds with QueueDeliverEvent if items are available.
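The poll/deliver exchange described above can be sketched with plain dataclasses. `Poll`, `Deliver`, and `ToyQueue` are hypothetical names for this illustration only. The sketch assumes a FIFO buffer and models the "no items available" case by returning `None`; how the real queue signals an empty poll is not stated here.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class Poll:
    """Hypothetical stand-in for a poll request from a driver."""
    time: float
    requestor: str


@dataclass
class Deliver:
    """Hypothetical stand-in for a delivery carrying one work item."""
    time: float
    target: str
    payload: object


@dataclass
class ToyQueue:
    """Toy FIFO buffer that answers polls (not the library's Queue)."""
    items: deque = field(default_factory=deque)

    def on_poll(self, poll):
        # Respond only if something is buffered; otherwise stay silent.
        if not self.items:
            return None
        return Deliver(
            time=poll.time,
            target=poll.requestor,
            payload=self.items.popleft(),
        )
```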

QueueNotifyEvent

QueueNotifyEvent(
    *,
    time: Instant,
    target,
    queue_entity: Entity = None,
    **kwargs,
)

Bases: Event

Notification from queue to driver that work is available.

Sent when an item is enqueued into a previously empty queue. The driver should check target capacity and poll if ready.
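The "previously empty" condition means a notification fires on the empty-to-non-empty transition, not on every enqueue. A minimal sketch of that rule follows; `NotifyingBuffer` and its counter are hypothetical and stand in for actually scheduling a QueueNotifyEvent.

```python
from collections import deque


class NotifyingBuffer:
    """Toy buffer that counts notifications instead of sending events."""

    def __init__(self):
        self._items = deque()
        self.notifications = 0  # hypothetical counter for illustration

    def enqueue(self, item):
        # Check emptiness before appending: only the first item after
        # an empty period should wake the driver.
        was_empty = not self._items
        self._items.append(item)
        if was_empty:
            self.notifications += 1
        return was_empty

    def dequeue(self):
        return self._items.popleft() if self._items else None
```

Under this rule a driver that is already draining the queue is not flooded with redundant notifications; it is woken again only after the queue has run dry.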

QueueDeliverEvent

QueueDeliverEvent(
    *,
    time: Instant,
    target,
    payload: Event | None = None,
    queue_entity: Entity | None = None,
    **kwargs,
)

Bases: Event

Delivery from queue to driver containing one work item.

The payload event is passed unmodified. The driver is responsible for retargeting it to the downstream entity before scheduling.
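Retargeting on the driver side might look like the sketch below. `ToyEvent` and `retarget` are hypothetical; the example assumes events can be copied with a new target, which may differ from how the library's Event type is actually updated.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class ToyEvent:
    """Hypothetical immutable event used only for this illustration."""
    time: float
    target: str
    data: str


def retarget(payload, downstream):
    """Return a copy of the payload aimed at the downstream entity."""
    # Only the target changes; the rest of the payload is preserved.
    return replace(payload, target=downstream)
```

The original payload is left untouched, so the queue's copy and the forwarded copy cannot interfere with each other.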

Queue dataclass

Queue(
    name: str = "Queue",
    egress: Entity = None,
    policy: QueuePolicy = None,
)

Bases: Entity

Bounded buffer that stores events and delivers them on demand.

Accepts incoming events and buffers them according to its QueuePolicy. Notifies the egress (typically a QueueDriver) when items are available. Responds to poll requests by delivering the next item.

The queue tracks acceptance and drop statistics for capacity analysis.

Attributes:

name (str): Identifier for logging.

egress (Entity): Downstream entity to notify (usually QueueDriver).

policy (QueuePolicy): Ordering strategy (FIFO, LIFO, Priority). Defaults to FIFO.

stats_dropped (int): Count of items rejected due to capacity.

stats_accepted (int): Count of items successfully enqueued.

has_capacity

has_capacity() -> bool

Return True if the queue can accept more items.
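How a capacity check can drive the acceptance and drop counters is sketched below. `BoundedBuffer`, its `capacity` argument, and the `offer` method are assumptions made for this example; the page does not show where the real Queue's capacity limit is configured.

```python
from collections import deque


class BoundedBuffer:
    """Toy bounded buffer with accept/drop counters (not the library's Queue)."""

    def __init__(self, capacity):
        self.capacity = capacity  # hypothetical: maximum buffered items
        self._items = deque()
        self.stats_accepted = 0
        self.stats_dropped = 0

    def has_capacity(self):
        return len(self._items) < self.capacity

    def offer(self, item):
        # Reject and count the drop when the buffer is full.
        if not self.has_capacity():
            self.stats_dropped += 1
            return False
        self._items.append(item)
        self.stats_accepted += 1
        return True
```

Comparing the two counters after a run gives a rough drop rate, which is the kind of capacity analysis the statistics are meant to support.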