Skip to content

Messaging

Message queues, pub/sub, and point-to-point messaging patterns.

Messaging components for pub/sub and message queue patterns.

This module provides: - MessageQueue: Persistent queue with acknowledgment and redelivery - Topic: Pub/sub topic for broadcasting to multiple subscribers - DeadLetterQueue: Storage for messages that failed processing

Example

from happysimulator.components.messaging import ( MessageQueue, Topic, DeadLetterQueue, )

Create a message queue with dead letter support

dlq = DeadLetterQueue(name="orders_dlq") orders = MessageQueue( name="orders", max_redeliveries=3, dead_letter_queue=dlq, )

Create a pub/sub topic

notifications = Topic(name="notifications") notifications.subscribe(email_service) notifications.subscribe(sms_service)

DeadLetterQueue

DeadLetterQueue(
    name: str,
    capacity: int | None = None,
    retention_period: float | None = None,
)

Bases: Entity

Stores messages that failed processing.

Dead letter queues collect messages that could not be processed after exhausting all retry attempts. They can be inspected for debugging or reprocessed later.

Attributes:

Name Type Description
name

Entity name for identification.

message_count int

Number of dead-lettered messages.

Initialize the dead letter queue.

Parameters:

Name Type Description Default
name str

Name for this DLQ entity.

required
capacity int | None

Maximum messages to store (None for unlimited).

None
retention_period float | None

Time in seconds to retain messages (None for forever).

None

stats property

stats: DeadLetterStats

Return a frozen snapshot of current statistics.

message_count property

message_count: int

Number of messages in the DLQ.

messages property

messages: list[Message]

List of all dead-lettered messages.

capacity property

capacity: int | None

Maximum capacity.

is_full property

is_full: bool

Whether the DLQ is at capacity.

add_message

add_message(message: Message) -> bool

Add a message to the dead letter queue.

Parameters:

Name Type Description Default
message Message

The failed message to store.

required

Returns:

Type Description
bool

True if added, False if at capacity.

get_message

get_message(index: int) -> Message | None

Get a message by index.

Parameters:

Name Type Description Default
index int

Index of the message (0 = oldest).

required

Returns:

Type Description
Message | None

The message, or None if index out of range.

peek

peek() -> Message | None

Peek at the oldest message.

Returns:

Type Description
Message | None

The oldest message, or None if empty.

pop

pop() -> Message | None

Remove and return the oldest message.

Returns:

Type Description
Message | None

The oldest message, or None if empty.

clear

clear() -> int

Clear all messages.

Returns:

Type Description
int

Number of messages cleared.

reprocess

reprocess(
    message: Message, target_queue: MessageQueue
) -> Event | None

Reprocess a single message by publishing to a queue.

Parameters:

Name Type Description Default
message Message

The message to reprocess.

required
target_queue MessageQueue

The queue to publish to.

required

Returns:

Type Description
Event | None

The reprocessed message event, or None if failed.

reprocess_all

reprocess_all(target_queue: MessageQueue) -> list[Event]

Reprocess all messages in the DLQ.

Parameters:

Name Type Description Default
target_queue MessageQueue

The queue to publish to.

required

Returns:

Type Description
list[Event]

List of republish events.

get_messages_by_age

get_messages_by_age(max_age: float) -> list[Message]

Get messages younger than max_age seconds.

Parameters:

Name Type Description Default
max_age float

Maximum age in seconds.

required

Returns:

Type Description
list[Message]

List of messages within age limit.

get_messages_by_delivery_count

get_messages_by_delivery_count(
    min_count: int,
) -> list[Message]

Get messages with at least min_count delivery attempts.

Parameters:

Name Type Description Default
min_count int

Minimum delivery count.

required

Returns:

Type Description
list[Message]

List of messages meeting criteria.

handle_event

handle_event(event: Event) -> list[Event]

Handle DLQ events.

DeadLetterStats dataclass

DeadLetterStats(
    messages_received: int = 0,
    messages_reprocessed: int = 0,
    messages_discarded: int = 0,
)

Statistics tracked by DeadLetterQueue.

Message dataclass

Message(
    id: str,
    payload: Event,
    created_at: Instant,
    state: MessageState = MessageState.PENDING,
    delivery_count: int = 0,
    last_delivered_at: Instant | None = None,
    consumer: Entity | None = None,
)

Represents a message in the queue.

MessageQueue

MessageQueue(
    name: str,
    delivery_latency: float = 0.001,
    redelivery_delay: float = 30.0,
    max_redeliveries: int = 3,
    capacity: int | None = None,
    dead_letter_queue: DeadLetterQueue | None = None,
)

Bases: Entity

Persistent message queue with acknowledgment.

Provides reliable message delivery with at-least-once semantics. Messages are redelivered if not acknowledged within timeout.

Attributes:

Name Type Description
name

Entity name for identification.

pending_count int

Number of messages waiting to be delivered.

in_flight_count int

Number of messages delivered but not yet acknowledged.

Initialize the message queue.

Parameters:

Name Type Description Default
name str

Name for this queue entity.

required
delivery_latency float

Latency to deliver message to consumer.

0.001
redelivery_delay float

Delay before redelivering unacknowledged message.

30.0
max_redeliveries int

Maximum redelivery attempts before dead-lettering.

3
capacity int | None

Maximum queue size (None for unlimited).

None
dead_letter_queue DeadLetterQueue | None

Optional DLQ for failed messages.

None

Raises:

Type Description
ValueError

If parameters are invalid.

stats property

stats: MessageQueueStats

Return a frozen snapshot of current statistics.

pending_count property

pending_count: int

Number of messages waiting to be delivered.

in_flight_count property

in_flight_count: int

Number of messages delivered but not acknowledged.

consumer_count property

consumer_count: int

Number of subscribed consumers.

capacity property

capacity: int | None

Maximum queue capacity.

is_full property

is_full: bool

Whether the queue is at capacity.

subscribe

subscribe(consumer: Entity) -> None

Subscribe a consumer to receive messages.

Parameters:

Name Type Description Default
consumer Entity

The entity to receive messages.

required

unsubscribe

unsubscribe(consumer: Entity) -> None

Unsubscribe a consumer.

Parameters:

Name Type Description Default
consumer Entity

The entity to remove.

required

publish

publish(message: Event) -> Generator[float, None, str]

Publish a message to the queue.

Parameters:

Name Type Description Default
message Event

The event to publish.

required

Yields:

Type Description
float

Publishing latency.

Returns:

Type Description
str

The message ID.

Raises:

Type Description
RuntimeError

If queue is at capacity.

acknowledge

acknowledge(message_id: str) -> None

Acknowledge successful processing of a message.

Parameters:

Name Type Description Default
message_id str

ID of the message to acknowledge.

required

reject

reject(message_id: str, requeue: bool = True) -> None

Reject a message.

Parameters:

Name Type Description Default
message_id str

ID of the message to reject.

required
requeue bool

If True, requeue for redelivery. If False, discard or DLQ.

True

poll

poll() -> Generator[float, None, Event | None]

Poll for the next message to deliver.

Yields:

Type Description
float

Polling/delivery latency.

Returns:

Type Description
Event | None

Delivery event or None if queue empty or no consumers.

schedule_redelivery

schedule_redelivery(message_id: str) -> Event | None

Schedule redelivery of an unacknowledged message.

Parameters:

Name Type Description Default
message_id str

ID of message to redeliver.

required

Returns:

Type Description
Event | None

Redelivery event, or None if message not found.

get_message

get_message(message_id: str) -> Message | None

Get a message by ID.

Parameters:

Name Type Description Default
message_id str

The message ID.

required

Returns:

Type Description
Message | None

The message, or None if not found.

handle_event

handle_event(
    event: Event,
) -> Generator[float, None, list[Event]] | list[Event]

Handle queue events.

MessageQueueStats dataclass

MessageQueueStats(
    messages_published: int = 0,
    messages_delivered: int = 0,
    messages_acknowledged: int = 0,
    messages_rejected: int = 0,
    messages_redelivered: int = 0,
    messages_dead_lettered: int = 0,
    delivery_latencies: tuple[float, ...] = (),
)

Statistics tracked by MessageQueue.

avg_delivery_latency property

avg_delivery_latency: float

Average delivery latency.

ack_rate property

ack_rate: float

Acknowledgment rate.

MessageState

Bases: Enum

State of a message in the queue.

Subscription dataclass

Subscription(
    subscriber: Entity,
    subscribed_at: Instant,
    messages_received: int = 0,
    active: bool = True,
)

Represents a subscription to a topic.

Topic

Topic(
    name: str,
    delivery_latency: float = 0.001,
    max_subscribers: int | None = None,
)

Bases: Entity

Pub/sub topic with multiple subscribers.

Broadcasts published messages to all active subscribers. Each subscriber receives a copy of every message.

Attributes:

Name Type Description
name

Entity name for identification.

subscriber_count int

Number of active subscribers.

Initialize the topic.

Parameters:

Name Type Description Default
name str

Name for this topic entity.

required
delivery_latency float

Latency to deliver message to each subscriber.

0.001
max_subscribers int | None

Maximum number of subscribers (None for unlimited).

None

Raises:

Type Description
ValueError

If parameters are invalid.

stats property

stats: TopicStats

Return a frozen snapshot of current statistics.

subscriber_count property

subscriber_count: int

Number of active subscribers.

subscribers property

subscribers: list[Entity]

List of active subscribers.

max_subscribers property

max_subscribers: int | None

Maximum number of subscribers.

subscribe

subscribe(
    subscriber: Entity, replay_history: bool = False
) -> list[Event]

Subscribe an entity to the topic.

Parameters:

Name Type Description Default
subscriber Entity

The entity to subscribe.

required
replay_history bool

If True, deliver historical messages.

False

Returns:

Type Description
list[Event]

List of historical message events (if replay_history=True).

Raises:

Type Description
RuntimeError

If max subscribers reached.

unsubscribe

unsubscribe(subscriber: Entity) -> None

Unsubscribe an entity from the topic.

Parameters:

Name Type Description Default
subscriber Entity

The entity to unsubscribe.

required

publish

publish(
    message: Event,
) -> Generator[float, None, list[Event]]

Publish a message to all subscribers.

Parameters:

Name Type Description Default
message Event

The event to publish.

required

Yields:

Type Description
float

Publishing latency (per subscriber delivery).

Returns:

Type Description
list[Event]

List of delivery events for each subscriber.

publish_sync

publish_sync(message: Event) -> list[Event]

Publish a message synchronously (no delay simulation).

Parameters:

Name Type Description Default
message Event

The event to publish.

required

Returns:

Type Description
list[Event]

List of delivery events for each subscriber.

set_retain_messages

set_retain_messages(
    retain: bool, max_history: int = 100
) -> None

Configure message retention.

Parameters:

Name Type Description Default
retain bool

Whether to retain messages.

required
max_history int

Maximum messages to retain.

100

get_subscription

get_subscription(subscriber: Entity) -> Subscription | None

Get subscription details for a subscriber.

Parameters:

Name Type Description Default
subscriber Entity

The subscriber entity.

required

Returns:

Type Description
Subscription | None

Subscription details, or None if not subscribed.

handle_event

handle_event(
    event: Event,
) -> Generator[float, None, list[Event]] | list[Event]

Handle topic events.

TopicStats dataclass

TopicStats(
    messages_published: int = 0,
    messages_delivered: int = 0,
    subscribers_added: int = 0,
    subscribers_removed: int = 0,
    delivery_latencies: tuple[float, ...] = (),
)

Statistics tracked by Topic.

avg_delivery_latency property

avg_delivery_latency: float

Average delivery latency.