Messaging
Message queues, pub/sub, and point-to-point messaging patterns.
This module provides:
- MessageQueue: Persistent queue with acknowledgment and redelivery
- Topic: Pub/sub topic for broadcasting to multiple subscribers
- DeadLetterQueue: Storage for messages that failed processing
Example

    from happysimulator.components.messaging import (
        MessageQueue,
        Topic,
        DeadLetterQueue,
    )

    # Create a message queue with dead letter support
    dlq = DeadLetterQueue(name="orders_dlq")
    orders = MessageQueue(
        name="orders",
        max_redeliveries=3,
        dead_letter_queue=dlq,
    )

    # Create a pub/sub topic
    notifications = Topic(name="notifications")
    # email_service and sms_service are Entity instances defined elsewhere
    notifications.subscribe(email_service)
    notifications.subscribe(sms_service)

DeadLetterQueue
Bases: Entity
Stores messages that failed processing.
Dead letter queues collect messages that could not be processed after exhausting all retry attempts. They can be inspected for debugging or reprocessed later.
Attributes:
| Name | Type | Description |
|---|---|---|
| name | | Entity name for identification. |
| message_count | int | Number of dead-lettered messages. |
Initialize the dead letter queue.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name for this DLQ entity. | required |
| capacity | int \| None | Maximum messages to store (None for unlimited). | None |
| retention_period | float \| None | Time in seconds to retain messages (None for forever). | None |
add_message
Add a message to the dead letter queue.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message | Message | The failed message to store. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if added, False if at capacity. |
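The boolean return value lets a caller detect a full dead letter queue instead of silently losing a message. The following is a minimal stand-alone sketch of that contract. `BoundedStore` is a hypothetical stand-in written for illustration, not the library's implementation.

```python
from collections import deque


class BoundedStore:
    """Hypothetical stand-in that mimics the documented add_message contract."""

    def __init__(self, capacity=None):
        self.capacity = capacity  # None means unlimited, as documented
        self._messages = deque()

    def add_message(self, message):
        # Refuse the message when the store is already full.
        if self.capacity is not None and len(self._messages) >= self.capacity:
            return False
        self._messages.append(message)
        return True


store = BoundedStore(capacity=2)
results = [store.add_message(m) for m in ("a", "b", "c")]
print(results)
```

A caller would typically check the result and log or count the rejected message when it is False.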
get_message
Get a message by index.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| index | int | Index of the message (0 = oldest). | required |
Returns:
| Type | Description |
|---|---|
| Message \| None | The message, or None if index out of range. |
peek
Peek at the oldest message.
Returns:
| Type | Description |
|---|---|
| Message \| None | The oldest message, or None if empty. |
pop
Remove and return the oldest message.
Returns:
| Type | Description |
|---|---|
| Message \| None | The oldest message, or None if empty. |
clear
Clear all messages.
Returns:
| Type | Description |
|---|---|
| int | Number of messages cleared. |
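The three accessors above describe oldest-first (FIFO) access. The sketch below reproduces the documented return values on a plain `collections.deque`. The free functions are hypothetical helpers for illustration, and the real class may store its messages differently.

```python
from collections import deque

messages = deque(["first", "second", "third"])  # index 0 is the oldest


def peek(q):
    # Look at the oldest item without removing it.
    return q[0] if q else None


def pop(q):
    # Remove and return the oldest item.
    return q.popleft() if q else None


def clear(q):
    # Drop everything and report how many items were removed.
    cleared = len(q)
    q.clear()
    return cleared


oldest = peek(messages)
removed = pop(messages)
cleared = clear(messages)
after = peek(messages)
```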
reprocess
Reprocess a single message by publishing to a queue.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message | Message | The message to reprocess. | required |
| target_queue | MessageQueue | The queue to publish to. | required |
Returns:
| Type | Description |
|---|---|
| Event \| None | The reprocessed message event, or None if failed. |
reprocess_all
Reprocess all messages in the DLQ.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| target_queue | MessageQueue | The queue to publish to. | required |
Returns:
| Type | Description |
|---|---|
| list[Event] | List of republish events. |
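One way to picture bulk reprocessing is a drain loop that republishes each dead letter, oldest first, and collects the resulting events. The sketch below uses plain Python containers. The `publish` callable and the choice to drop messages whose republish fails are assumptions made for illustration, not documented library behaviour.

```python
from collections import deque


def reprocess_all(dead_letters, publish):
    """Drain dead_letters oldest-first, republishing each one.

    `publish` is a hypothetical callable that returns an event object,
    or None when the target cannot accept the message.
    """
    events = []
    while dead_letters:
        message = dead_letters.popleft()
        event = publish(message)
        if event is not None:
            events.append(event)
        # Assumption: a message whose republish fails is simply dropped here.
    return events


target = []


def publish(message):
    target.append(message)
    return ("republished", message)


dlq = deque(["m1", "m2"])
events = reprocess_all(dlq, publish)
```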
get_messages_by_age
Get messages younger than max_age seconds.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| max_age | float | Maximum age in seconds. | required |
Returns:
| Type | Description |
|---|---|
| list[Message] | List of messages within age limit. |
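An age filter of this kind can be sketched as a comparison between the current time and a per-message timestamp. In the example below, `StoredMessage`, the float timestamps, and measuring age from `created_at` are all assumptions. The library uses `Instant` values and may measure age from the moment a message was dead-lettered.

```python
from dataclasses import dataclass


@dataclass
class StoredMessage:  # hypothetical record, not the library's Message
    id: str
    created_at: float  # seconds on the simulation clock


def messages_younger_than(messages, now, max_age):
    # Keep only messages whose age does not exceed max_age.
    return [m for m in messages if now - m.created_at <= max_age]


stored = [
    StoredMessage("old", 0.0),
    StoredMessage("mid", 50.0),
    StoredMessage("new", 95.0),
]
recent = messages_younger_than(stored, now=100.0, max_age=60.0)
recent_ids = [m.id for m in recent]
```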
DeadLetterStats dataclass
DeadLetterStats(
    messages_received: int = 0,
    messages_reprocessed: int = 0,
    messages_discarded: int = 0,
)
Statistics tracked by DeadLetterQueue.
Message dataclass
Message(
    id: str,
    payload: Event,
    created_at: Instant,
    state: MessageState = MessageState.PENDING,
    delivery_count: int = 0,
    last_delivered_at: Instant | None = None,
    consumer: Entity | None = None,
)
Represents a message in the queue.
MessageQueue
MessageQueue(
    name: str,
    delivery_latency: float = 0.001,
    redelivery_delay: float = 30.0,
    max_redeliveries: int = 3,
    capacity: int | None = None,
    dead_letter_queue: DeadLetterQueue | None = None,
)
Bases: Entity
Persistent message queue with acknowledgment.
Provides reliable message delivery with at-least-once semantics. A message that is not acknowledged is redelivered after redelivery_delay, up to max_redeliveries times, and is then dead-lettered.
Attributes:
| Name | Type | Description |
|---|---|---|
| name | | Entity name for identification. |
| pending_count | int | Number of messages waiting to be delivered. |
| in_flight_count | int | Number of messages delivered but not yet acknowledged. |
Initialize the message queue.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name for this queue entity. | required |
| delivery_latency | float | Latency to deliver message to consumer. | 0.001 |
| redelivery_delay | float | Delay before redelivering unacknowledged message. | 30.0 |
| max_redeliveries | int | Maximum redelivery attempts before dead-lettering. | 3 |
| capacity | int \| None | Maximum queue size (None for unlimited). | None |
| dead_letter_queue | DeadLetterQueue \| None | Optional DLQ for failed messages. | None |
Raises:
| Type | Description |
|---|---|
| ValueError | If parameters are invalid. |
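To see how `redelivery_delay` and `max_redeliveries` interact, it can help to trace one message through its delivery attempts. The sketch below is a simplified stand-alone model, not the library's scheduler. It assumes the first delivery does not count as a redelivery and it ignores `delivery_latency`.

```python
def delivery_timeline(ack_on_attempt, redelivery_delay=30.0, max_redeliveries=3):
    """Return (delivery_times, outcome) for one message.

    ack_on_attempt: 1-based delivery attempt the consumer acknowledges,
    or None if it never acknowledges.
    """
    times = []
    now = 0.0
    # One initial delivery plus up to max_redeliveries redeliveries.
    for attempt in range(1, max_redeliveries + 2):
        times.append(now)
        if ack_on_attempt == attempt:
            return times, "acknowledged"
        now += redelivery_delay
    return times, "dead-lettered"


acked = delivery_timeline(ack_on_attempt=2)
failed = delivery_timeline(ack_on_attempt=None)
```

Under these assumptions, a consumer that never acknowledges sees the message four times (at 0, 30, 60, and 90 time units) before it is dead-lettered.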
subscribe
Subscribe a consumer to receive messages.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| consumer | Entity | The entity to receive messages. | required |
unsubscribe
Unsubscribe a consumer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| consumer | Entity | The entity to remove. | required |
publish
Publish a message to the queue.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message | Event | The event to publish. | required |
Yields:
| Type | Description |
|---|---|
| float | Publishing latency. |
Returns:
| Type | Description |
|---|---|
| str | The message ID. |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If queue is at capacity. |
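Because `publish` both yields a latency and returns a message ID, it has the shape of a Python generator whose return value travels in `StopIteration.value`. The sketch below shows that mechanism in isolation. The `publish` and `drive` functions and the `msg-N` ID format are hypothetical, and in a real simulation the framework (or a `yield from` in the calling generator) would drive the generator.

```python
import itertools

_ids = itertools.count(1)


def publish(queue, event, latency=0.001):
    """Hypothetical generator with the documented yield/return shape."""
    queue.append(event)
    yield latency                 # the caller waits this long
    return f"msg-{next(_ids)}"    # becomes StopIteration.value


def drive(gen):
    """Run a generator to completion; return (yielded values, return value)."""
    yielded = []
    while True:
        try:
            yielded.append(next(gen))
        except StopIteration as stop:
            return yielded, stop.value


queue = []
delays, message_id = drive(publish(queue, "order-created"))
```

Inside another generator, `message_id = yield from publish(queue, event)` would obtain the same return value without an explicit driver loop.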
acknowledge
Acknowledge successful processing of a message.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message_id | str | ID of the message to acknowledge. | required |
reject
Reject a message.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message_id | str | ID of the message to reject. | required |
| requeue | bool | If True, requeue for redelivery. If False, discard or DLQ. | True |
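The `requeue` flag selects between two outcomes for a rejected message. The routing sketch below makes one plausible reading explicit: with `requeue=False`, the message goes to the dead letter store if one is configured and is discarded otherwise. That reading, and the helper itself, are assumptions made for illustration.

```python
def reject(message, pending, dead_letters=None, requeue=True):
    """Hypothetical routing for a rejected message; returns where it went."""
    if requeue:
        pending.append(message)       # back in line for redelivery
        return "requeued"
    if dead_letters is not None:
        dead_letters.append(message)  # kept for inspection or reprocessing
        return "dead-lettered"
    return "discarded"


pending, dlq = [], []
first = reject("m1", pending, dlq)
second = reject("m2", pending, dlq, requeue=False)
third = reject("m3", pending, None, requeue=False)
```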
poll
Poll for the next message to deliver.
Yields:
| Type | Description |
|---|---|
| float | Polling/delivery latency. |
Returns:
| Type | Description |
|---|---|
| Event \| None | Delivery event or None if queue empty or no consumers. |
schedule_redelivery
Schedule redelivery of an unacknowledged message.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message_id | str | ID of message to redeliver. | required |
Returns:
| Type | Description |
|---|---|
| Event \| None | Redelivery event, or None if message not found. |
get_message
Get a message by ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message_id | str | The message ID. | required |
Returns:
| Type | Description |
|---|---|
| Message \| None | The message, or None if not found. |
handle_event
Handle queue events.
MessageQueueStats dataclass
MessageState
Bases: Enum
State of a message in the queue.
Subscription dataclass
Subscription(
    subscriber: Entity,
    subscribed_at: Instant,
    messages_received: int = 0,
    active: bool = True,
)
Represents a subscription to a topic.
Topic
Bases: Entity
Pub/sub topic with multiple subscribers.
Broadcasts published messages to all active subscribers. Each subscriber receives a copy of every message.
Attributes:
| Name | Type | Description |
|---|---|---|
| name | | Entity name for identification. |
| subscriber_count | int | Number of active subscribers. |
Initialize the topic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name for this topic entity. | required |
| delivery_latency | float | Latency to deliver message to each subscriber. | 0.001 |
| max_subscribers | int \| None | Maximum number of subscribers (None for unlimited). | None |
Raises:
| Type | Description |
|---|---|
| ValueError | If parameters are invalid. |
subscribe
Subscribe an entity to the topic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| subscriber | Entity | The entity to subscribe. | required |
| replay_history | bool | If True, deliver historical messages. | False |
Returns:
| Type | Description |
|---|---|
| list[Event] | List of historical message events (if replay_history=True). |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If max subscribers reached. |
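The interplay of fan-out, `replay_history`, and `max_subscribers` is easier to follow in a small in-memory model. `SketchTopic` below is a hypothetical stand-in that uses strings for subscribers and messages. It is not the library class, and it omits delivery latency and events entirely.

```python
class SketchTopic:
    """Hypothetical in-memory model of the documented Topic behaviour."""

    def __init__(self, max_subscribers=None):
        self.max_subscribers = max_subscribers
        self.inboxes = {}   # subscriber name -> list of received messages
        self.history = []   # every message published so far

    def subscribe(self, name, replay_history=False):
        if self.max_subscribers is not None and len(self.inboxes) >= self.max_subscribers:
            raise RuntimeError("max subscribers reached")
        # Late subscribers can optionally catch up on earlier messages.
        replayed = list(self.history) if replay_history else []
        self.inboxes[name] = list(replayed)
        return replayed

    def publish(self, message):
        self.history.append(message)
        for inbox in self.inboxes.values():
            inbox.append(message)  # every current subscriber receives it


topic = SketchTopic(max_subscribers=2)
topic.subscribe("email")
topic.publish("welcome")
replayed = topic.subscribe("sms", replay_history=True)
topic.publish("reminder")
try:
    topic.subscribe("push")
    limit_hit = False
except RuntimeError:
    limit_hit = True
```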
unsubscribe
Unsubscribe an entity from the topic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| subscriber | Entity | The entity to unsubscribe. | required |
publish
publish_sync
set_retain_messages
Configure message retention.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| retain | bool | Whether to retain messages. | required |
| max_history | int | Maximum messages to retain. | 100 |
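A bounded history such as the one `max_history` describes is commonly modelled with a fixed-length deque. The sketch below assumes the oldest message is evicted first once the limit is reached. The documentation does not state the eviction order, so treat that as an assumption.

```python
from collections import deque

# A deque with maxlen silently drops the oldest item when it is full.
history = deque(maxlen=3)
for n in range(1, 6):
    history.append(f"msg-{n}")

retained = list(history)
```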
get_subscription
Get subscription details for a subscriber.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| subscriber | Entity | The subscriber entity. | required |
Returns:
| Type | Description |
|---|---|
| Subscription \| None | Subscription details, or None if not subscribed. |
handle_event
Handle topic events.
TopicStats dataclass
TopicStats(
    messages_published: int = 0,
    messages_delivered: int = 0,
    subscribers_added: int = 0,
    subscribers_removed: int = 0,
    delivery_latencies: tuple[float, ...] = (),
)
Statistics tracked by Topic.
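As an illustration of how these counters might be summarized after a run, the sketch below redeclares the documented fields in a local dataclass and derives an average latency and a fan-out ratio. The local `TopicStats` copy and the sample numbers are made up for the example. How a live `Topic` exposes its stats object is not shown in this reference.

```python
from dataclasses import dataclass
from statistics import mean


@dataclass
class TopicStats:  # local copy of the documented fields, for illustration only
    messages_published: int = 0
    messages_delivered: int = 0
    subscribers_added: int = 0
    subscribers_removed: int = 0
    delivery_latencies: tuple[float, ...] = ()


stats = TopicStats(
    messages_published=2,
    messages_delivered=4,
    subscribers_added=2,
    delivery_latencies=(0.001, 0.003, 0.002, 0.002),
)

# Guard against empty data before averaging or dividing.
average_latency = mean(stats.delivery_latencies) if stats.delivery_latencies else 0.0
fan_out = (
    stats.messages_delivered / stats.messages_published
    if stats.messages_published
    else 0.0
)
```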