Streaming
Event logs, consumer groups, stream processors, and windowing strategies.
Streaming infrastructure components for event-driven simulation.
Provides Kafka-inspired streaming primitives:
- EventLog: Append-only partitioned log with retention policies
- ConsumerGroup: Coordinated consumer management with partition assignment
- StreamProcessor: Stateful windowed stream processing
ConsumerGroup
ConsumerGroup(
name: str,
event_log: EventLog,
assignment_strategy: PartitionAssignment | None = None,
rebalance_delay: float = 0.5,
poll_latency: float = 0.001,
session_timeout: float | None = None,
)
Bases: Entity
Coordinated consumer group over an EventLog.
Manages partition assignment, offset tracking, and rebalancing when consumers join or leave.
Attributes:
| Name | Type | Description |
|---|---|---|
| name | str | Entity name for identification. |
Initialize the consumer group.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name for this group entity. | required |
| event_log | EventLog | The EventLog to consume from. | required |
| assignment_strategy | PartitionAssignment \| None | Strategy for assigning partitions. Defaults to RangeAssignment. | None |
| rebalance_delay | float | Simulated rebalance latency in seconds. | 0.5 |
| poll_latency | float | Simulated poll latency in seconds. | 0.001 |
| session_timeout | float \| None | Optional session timeout (not enforced in this implementation, reserved for future use). | None |
consumer_lag
Per-partition lag for a consumer.
Lag is the partition's high watermark in the EventLog minus the consumer's committed offset for that partition.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| consumer_name | str | The consumer to check. | required |
Returns:
| Type | Description |
|---|---|
| dict[int, int] | Mapping of partition ID to lag. |
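The lag formula can be illustrated with plain dictionaries. This is a sketch, not the ConsumerGroup implementation: the helper name, its arguments, and the assumption that a partition with no commit yet counts from offset 0 are all hypothetical.

```python
def consumer_lag(
    high_watermarks: dict[int, int],
    committed: dict[int, int],
    assigned: list[int],
) -> dict[int, int]:
    """Hypothetical helper: lag = high watermark - committed offset, per partition.

    Assumption: a partition with no committed offset is treated as offset 0.
    """
    return {p: high_watermarks[p] - committed.get(p, 0) for p in assigned}


lag = consumer_lag({0: 120, 1: 80}, {0: 100}, assigned=[0, 1])
print(lag)  # {0: 20, 1: 80}
```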
join
join(
consumer_name: str, consumer_entity: Entity
) -> Generator[
float | SimFuture | tuple[float, list[Event]],
None,
list[int],
]
Join a consumer to the group.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| consumer_name | str | Unique name for the consumer. | required |
| consumer_entity | Entity | The entity to receive polled records. | required |
Yields:
| Type | Description |
|---|---|
| float \| SimFuture \| tuple[float, list[Event]] | Rebalance delay. |
Returns:
| Type | Description |
|---|---|
| list[int] | List of assigned partition IDs. |
leave
poll
poll(
consumer_name: str, max_records: int = 100
) -> Generator[
float | SimFuture | tuple[float, list[Event]],
None,
list[Record],
]
Poll for records from assigned partitions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| consumer_name | str | The polling consumer. | required |
| max_records | int | Maximum records to return. | 100 |
Yields:
| Type | Description |
|---|---|
| float \| SimFuture \| tuple[float, list[Event]] | Poll latency. |
Returns:
| Type | Description |
|---|---|
| list[Record] | List of records from assigned partitions. |
commit
commit(
consumer_name: str, offsets: dict[int, int]
) -> Generator[float | tuple[float, list[Event]]]
Commit offsets for a consumer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| consumer_name | str | The committing consumer. | required |
| offsets | dict[int, int] | Mapping of partition ID to committed offset. | required |
Yields:
| Type | Description |
|---|---|
| float \| tuple[float, list[Event]] | Minimal latency. |
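The poll/commit cycle is essentially per-partition offset bookkeeping. The sketch below models it with plain lists and drops the simulated latencies; Rec, poll, and commit here are hypothetical stand-ins, not the ConsumerGroup API. It assumes, following the Kafka convention, that the committed offset is the next offset to read, which is consistent with the lag formula given for consumer_lag.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Rec:
    # Hypothetical stand-in for Record with only the fields used here.
    partition: int
    offset: int
    value: str


def poll(partitions: dict[int, list[Rec]], committed: dict[int, int],
         assigned: list[int], max_records: int = 100) -> list[Rec]:
    """Return up to max_records records at or after each committed offset."""
    out: list[Rec] = []
    for p in assigned:
        start = committed.get(p, 0)  # assumed: no commit yet means offset 0
        for rec in partitions[p]:
            if len(out) >= max_records:
                return out
            if rec.offset >= start:
                out.append(rec)
    return out


def commit(committed: dict[int, int], batch: list[Rec]) -> None:
    """Commit the offset after the last processed record in each partition."""
    for rec in batch:
        committed[rec.partition] = max(committed.get(rec.partition, 0), rec.offset + 1)


log = {0: [Rec(0, i, f"a{i}") for i in range(3)],
       1: [Rec(1, i, f"b{i}") for i in range(2)]}
committed: dict[int, int] = {}
batch = poll(log, committed, assigned=[0, 1], max_records=4)
commit(committed, batch)
print(committed)  # {0: 3, 1: 1}
second = poll(log, committed, assigned=[0, 1])
print([r.value for r in second])  # ['b1']
```

Because the commit records the next offset to read, a second poll resumes exactly where the first batch stopped.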
handle_event
Handle consumer group events.
ConsumerGroupStats (dataclass)
ConsumerGroupStats(
joins: int = 0,
leaves: int = 0,
rebalances: int = 0,
polls: int = 0,
commits: int = 0,
records_polled: int = 0,
)
Statistics tracked by ConsumerGroup.
Attributes:
| Name | Type | Description |
|---|---|---|
| joins | int | Number of consumer joins. |
| leaves | int | Number of consumer leaves. |
| rebalances | int | Number of rebalance events. |
| polls | int | Number of poll operations. |
| commits | int | Number of commit operations. |
| records_polled | int | Total records returned by polls. |
ConsumerState
Bases: Enum
State of a consumer within the group.
PartitionAssignment
Bases: Protocol
Protocol for partition assignment strategies.
assign (abstractmethod)
Assign partitions to consumers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| partitions | list[int] | Available partition IDs. | required |
| consumers | list[str] | Consumer names (sorted). | required |
Returns:
| Type | Description |
|---|---|
| dict[str, list[int]] | Mapping of consumer name to assigned partition IDs. |
RangeAssignment
Contiguous range assignment.
Divides partitions into contiguous ranges. Remainder partitions go to the earliest consumers.
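A minimal sketch of contiguous range assignment with the remainder going to the earliest consumers. The function name range_assign is hypothetical, and it assumes consumers are ordered by name, as the assign protocol's "sorted" consumer list suggests.

```python
def range_assign(partitions: list[int], consumers: list[str]) -> dict[str, list[int]]:
    """Hypothetical range assignment: contiguous blocks, remainder to earliest consumers."""
    consumers = sorted(consumers)
    if not consumers:
        return {}
    ordered = sorted(partitions)
    per, extra = divmod(len(ordered), len(consumers))
    result: dict[str, list[int]] = {}
    start = 0
    for i, name in enumerate(consumers):
        # The first `extra` consumers each take one of the remainder partitions.
        count = per + (1 if i < extra else 0)
        result[name] = ordered[start:start + count]
        start += count
    return result


print(range_assign([0, 1, 2, 3, 4], ["c1", "c2"]))
# {'c1': [0, 1, 2], 'c2': [3, 4]}
```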
RoundRobinAssignment
Round-robin partition assignment.
Distributes partitions in a round-robin fashion across consumers.
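Round-robin assignment deals partitions out one at a time, cycling through the consumers. The sketch below is illustrative only; round_robin_assign is a hypothetical name and it assumes name-sorted consumers.

```python
def round_robin_assign(partitions: list[int], consumers: list[str]) -> dict[str, list[int]]:
    """Hypothetical round-robin assignment: partition i goes to consumer i mod k."""
    consumers = sorted(consumers)
    result: dict[str, list[int]] = {name: [] for name in consumers}
    if not consumers:
        return result
    for i, p in enumerate(sorted(partitions)):
        result[consumers[i % len(consumers)]].append(p)
    return result


print(round_robin_assign([0, 1, 2, 3, 4], ["c1", "c2"]))
# {'c1': [0, 2, 4], 'c2': [1, 3]}
```

Unlike range assignment, each consumer's partitions are interleaved rather than contiguous, but the counts per consumer still differ by at most one.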
StickyAssignment
Sticky partition assignment.
Tracks the previous assignment and minimizes partition movement when consumers join or leave.
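One way to minimize movement is to let each surviving consumer keep its previous partitions up to a balanced quota and hand out only the leftovers. The sketch below is a hypothetical illustration of that idea, not StickyAssignment's actual algorithm; the function name, the quota rule, and the tie-breaking are all assumptions.

```python
def sticky_assign(
    partitions: list[int],
    consumers: list[str],
    previous: dict[str, list[int]],
) -> dict[str, list[int]]:
    """Hypothetical sticky assignment: balanced quotas, prior ownership kept first."""
    consumers = sorted(consumers)
    if not consumers:
        return {}
    valid = set(partitions)

    def kept(c: str) -> int:
        # How many of this consumer's previous partitions still exist.
        return sum(1 for p in previous.get(c, []) if p in valid)

    # Give the larger (+1) quotas to consumers that already own the most
    # partitions, so fewer partitions have to move.
    base, extra = divmod(len(partitions), len(consumers))
    order = sorted(consumers, key=lambda c: (-kept(c), c))
    quota = {c: base + (1 if i < extra else 0) for i, c in enumerate(order)}

    result: dict[str, list[int]] = {c: [] for c in consumers}
    taken: set[int] = set()
    # Pass 1: surviving consumers keep previous partitions up to their quota.
    for c in consumers:
        for p in previous.get(c, []):
            if p in valid and p not in taken and len(result[c]) < quota[c]:
                result[c].append(p)
                taken.add(p)
    # Pass 2: distribute the remaining partitions to fill every quota.
    free = [p for p in sorted(partitions) if p not in taken]
    for c in order:
        while len(result[c]) < quota[c]:
            result[c].append(free.pop(0))
    return {c: sorted(ps) for c, ps in result.items()}


previous = {"a": [0, 1, 2], "b": [3, 4, 5]}
new = sticky_assign([0, 1, 2, 3, 4, 5], ["a", "b", "c"], previous)
print(new)  # {'a': [0, 1], 'b': [3, 4], 'c': [2, 5]}
```

When "c" joins, only partitions 2 and 5 move. A fresh range assignment of the same six partitions would move three (2, 4, and 5).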
EventLog
EventLog(
name: str,
num_partitions: int = 4,
sharding_strategy: Any | None = None,
retention_policy: RetentionPolicy | None = None,
append_latency: float = 0.001,
read_latency: float = 0.0005,
retention_check_interval: float = 60.0,
)
Bases: Entity
Append-only partitioned event log.
Records are routed to partitions via a sharding strategy (default: hash-based). Each partition maintains an ordered list of records and a monotonically increasing high-watermark offset.
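The routing and offset rules can be sketched in a few lines. EventLogSketch, LogRecord, and their methods are hypothetical and are not the EventLog API: the sketch omits the simulated latencies and the generator protocol. It also uses zlib.crc32 as the hash because Python's built-in hash() for str is randomized per process, and the library's HashSharding may hash keys differently.

```python
import zlib
from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class LogRecord:
    # Mirrors the documented Record fields.
    offset: int
    key: str
    value: Any
    timestamp: float
    partition: int


@dataclass
class PartitionSketch:
    records: list[LogRecord] = field(default_factory=list)
    high_watermark: int = 0  # offset the next append will receive


class EventLogSketch:
    def __init__(self, num_partitions: int = 4) -> None:
        if num_partitions < 1:
            raise ValueError("num_partitions must be >= 1")
        self.partitions = [PartitionSketch() for _ in range(num_partitions)]

    def partition_for(self, key: str) -> int:
        # Stable hash, so the same key always lands on the same partition.
        return zlib.crc32(key.encode()) % len(self.partitions)

    def append(self, key: str, value: Any, now: float) -> LogRecord:
        pid = self.partition_for(key)
        part = self.partitions[pid]
        rec = LogRecord(part.high_watermark, key, value, now, pid)
        part.records.append(rec)
        part.high_watermark += 1  # monotonically increasing
        return rec

    def read(self, pid: int, offset: int = 0, max_records: int = 100) -> list[LogRecord]:
        # Match on offset rather than list index, so reads stay correct
        # even after retention has trimmed the head of the partition.
        matching = [r for r in self.partitions[pid].records if r.offset >= offset]
        return matching[:max_records]


log = EventLogSketch(num_partitions=4)
recs = [log.append("user-42", i, now=float(i)) for i in range(3)]
print([r.offset for r in recs])  # [0, 1, 2]
```

Because routing depends only on the key, all records for "user-42" share one partition and are totally ordered within it.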
Attributes:
| Name | Type | Description |
|---|---|---|
| name | str | Entity name for identification. |
Initialize the event log.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name for this log entity. | required |
| num_partitions | int | Number of partitions. | 4 |
| sharding_strategy | Any \| None | Strategy for routing keys to partitions. Defaults to HashSharding. | None |
| retention_policy | RetentionPolicy \| None | Optional policy for expiring old records. | None |
| append_latency | float | Simulated append latency in seconds. | 0.001 |
| read_latency | float | Simulated read latency in seconds. | 0.0005 |
| retention_check_interval | float | Seconds between retention sweeps. | 60.0 |
Raises:
| Type | Description |
|---|---|
| ValueError | If num_partitions < 1. |
high_watermark
High watermark for a partition.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| partition_id | int | Partition index. | required |
Returns:
| Type | Description |
|---|---|
| int | The next offset that will be assigned. |
append
append(
key: str, value: Any
) -> Generator[
float | SimFuture | tuple[float, list[Event]],
None,
Record,
]
Append a record to the log.
Convenience generator for use with `yield from` in entity handlers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Routing key. | required |
| value | Any | Record payload. | required |
Yields:
| Type | Description |
|---|---|
| float \| SimFuture \| tuple[float, list[Event]] | Append latency. |
Returns:
| Type | Description |
|---|---|
| Record | The appended Record. |
read
read(
partition_id: int,
offset: int = 0,
max_records: int = 100,
) -> Generator[
float | SimFuture | tuple[float, list[Event]],
None,
list[Record],
]
Read records from a partition.
Convenience generator for use with `yield from` in entity handlers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| partition_id | int | Partition to read from. | required |
| offset | int | Starting offset. | 0 |
| max_records | int | Maximum records to return. | 100 |
Yields:
| Type | Description |
|---|---|
| float \| SimFuture \| tuple[float, list[Event]] | Read latency. |
Returns:
| Type | Description |
|---|---|
| list[Record] | List of records. |
handle_event
Handle log events.
EventLogStats (dataclass)
EventLogStats(
records_appended: int = 0,
records_read: int = 0,
records_expired: int = 0,
per_partition_appends: dict[int, int] = dict(),
append_latencies: tuple[float, ...] = (),
)
Statistics tracked by EventLog.
Attributes:
| Name | Type | Description |
|---|---|---|
| records_appended | int | Total records appended across all partitions. |
| records_read | int | Total records returned by read operations. |
| records_expired | int | Total records removed by retention. |
| per_partition_appends | dict[int, int] | Append count per partition. |
| append_latencies | tuple[float, ...] | Tuple of individual append latencies. |
Partition (dataclass)
Internal state of a single partition.
Attributes:
| Name | Type | Description |
|---|---|---|
| id | int | Partition identifier. |
| records | list[Record] | Ordered list of records. |
| high_watermark | int | Offset of next record to be written. |
Record (dataclass)
An immutable record in the event log.
Attributes:
| Name | Type | Description |
|---|---|---|
| offset | int | Position within the partition. |
| key | str | Routing key used for partition assignment. |
| value | Any | The record payload. |
| timestamp | float | Simulation time when the record was appended. |
| partition | int | Partition ID this record belongs to. |
RetentionPolicy
Bases: Protocol
Protocol for log retention policies.
SizeRetention
Retain at most max_records per partition.
When a partition exceeds this limit, the oldest records are expired during the retention check cycle.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| max_records | int | Maximum number of records per partition. | required |
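A retention sweep under a size limit reduces to trimming the oldest records from the front of a partition. The sketch below is hypothetical (apply_size_retention and LogRecord are illustrative names, not the SizeRetention interface); note that surviving records keep their original offsets, so the partition's high watermark is unaffected.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LogRecord:
    # Hypothetical stand-in carrying only the fields retention looks at.
    offset: int
    timestamp: float


def apply_size_retention(records: list[LogRecord],
                         max_records: int) -> tuple[list[LogRecord], int]:
    """Keep the newest max_records records of one partition; return (kept, expired_count)."""
    if max_records < 0:
        raise ValueError("max_records must be non-negative")
    expired = max(0, len(records) - max_records)
    # Records are stored in offset order, so the oldest sit at the front.
    return records[expired:], expired


records = [LogRecord(offset=i, timestamp=float(i)) for i in range(10)]
kept, expired = apply_size_retention(records, max_records=3)
print([r.offset for r in kept], expired)  # [7, 8, 9] 7
```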
TimeRetention
Retain records younger than a maximum age.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| max_age_s | float | Maximum record age in seconds. | required |
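Age-based retention compares each record's timestamp with the current simulation time. In this hypothetical sketch a record is kept only while its age is strictly below max_age_s; whether TimeRetention treats the boundary as inclusive or exclusive is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LogRecord:
    # Hypothetical stand-in carrying only the fields retention looks at.
    offset: int
    timestamp: float


def apply_time_retention(records: list[LogRecord], max_age_s: float,
                         now_s: float) -> tuple[list[LogRecord], int]:
    """Keep records strictly younger than max_age_s at now_s; return (kept, expired_count)."""
    kept = [r for r in records if now_s - r.timestamp < max_age_s]
    return kept, len(records) - len(kept)


# Timestamps 0, 10, 20, 30, 40; at now_s=45 their ages are 45, 35, 25, 15, 5.
records = [LogRecord(offset=i, timestamp=10.0 * i) for i in range(5)]
kept, expired = apply_time_retention(records, max_age_s=25.0, now_s=45.0)
print([r.offset for r in kept], expired)  # [3, 4] 3
```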
LateEventPolicy
Bases: Enum
How to handle events that arrive after their window has closed.
SessionWindow
Dynamic windows based on activity gaps.
Events within the gap threshold are merged into the same session.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| gap_s | float | Maximum inactivity gap in seconds. | required |
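Session merging can be shown on a batch of event times. This is a hypothetical sketch: it assumes each event opens a half-open window [t, t + gap_s) and that overlapping windows merge, which is a common convention (for example in Flink) but not confirmed for SessionWindow. A streaming processor would merge incrementally as events arrive rather than sorting a batch.

```python
def session_windows(event_times: list[float], gap_s: float) -> list[tuple[float, float]]:
    """Merge events into sessions; each event contributes [t, t + gap_s)."""
    sessions: list[tuple[float, float]] = []
    for t in sorted(event_times):
        if sessions and t < sessions[-1][1]:
            # Event falls inside the current session: extend its end.
            start, end = sessions[-1]
            sessions[-1] = (start, max(end, t + gap_s))
        else:
            # Gap exceeded: start a new session.
            sessions.append((t, t + gap_s))
    return sessions


print(session_windows([1.0, 2.0, 10.0, 11.5], gap_s=3.0))
# [(1.0, 5.0), (10.0, 14.5)]
```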
SlidingWindow
Overlapping windows with fixed size and slide interval.
Each event may belong to multiple windows.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| size_s | float | Window size in seconds. | required |
| slide_s | float | Slide interval in seconds. | required |
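Assigning an event to every overlapping window means finding all window starts within size_s of the event. This sketch is hypothetical and assumes half-open windows [start, start + size_s) with starts aligned to multiples of slide_s; when size_s is a multiple of slide_s, each event lands in size_s / slide_s windows.

```python
import math


def sliding_windows(event_time_s: float, size_s: float,
                    slide_s: float) -> list[tuple[float, float]]:
    """Return every aligned [start, start + size_s) window containing event_time_s."""
    if size_s <= 0 or slide_s <= 0:
        raise ValueError("size_s and slide_s must be positive")
    # Latest aligned window start at or before the event.
    start = math.floor(event_time_s / slide_s) * slide_s
    windows = []
    # Walk backwards while the window still covers the event.
    while start > event_time_s - size_s:
        windows.append((start, start + size_s))
        start -= slide_s
    return sorted(windows)


print(sliding_windows(12.0, size_s=10.0, slide_s=5.0))
# [(5.0, 15.0), (10.0, 20.0)]
```

With non-representable float intervals such as 0.1, the floor division can land one step off; integer milliseconds avoid that if exact alignment matters.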
StreamProcessor
StreamProcessor(
name: str,
window_type: WindowType,
aggregate_fn: Callable[[list[Any]], Any],
downstream: Entity,
allowed_lateness_s: float = 0.0,
late_event_policy: LateEventPolicy = LateEventPolicy.DROP,
side_output: Entity | None = None,
watermark_interval_s: float = 1.0,
)
Bases: Entity
Stateful windowed stream processor.
Receives events with event times, assigns them to windows, and emits aggregated results when windows close based on watermark progression.
Attributes:
| Name | Type | Description |
|---|---|---|
| name | str | Entity name for identification. |
Initialize the stream processor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name for this processor entity. | required |
| window_type | WindowType | Window assignment strategy. | required |
| aggregate_fn | Callable[[list[Any]], Any] | Function to aggregate records in a window. | required |
| downstream | Entity | Entity to receive WindowResult events. | required |
| allowed_lateness_s | float | Grace period, in seconds, after a window closes for late events. | 0.0 |
| late_event_policy | LateEventPolicy | How to handle late events. | LateEventPolicy.DROP |
| side_output | Entity \| None | Entity to receive LateEvent events (for SIDE_OUTPUT policy). | None |
| watermark_interval_s | float | Interval, in seconds, at which the processor self-schedules watermark advancement. | 1.0 |
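The watermark-driven flow can be modeled without the simulation engine: events are buffered per window, and a window is emitted through aggregate_fn once the watermark reaches its end. MiniStreamProcessor, tumbling, on_event, and advance_watermark are hypothetical names, not the StreamProcessor API. The sketch implements only the DROP policy with zero allowed lateness, advances the watermark by explicit calls instead of self-scheduling every watermark_interval_s, and assumes a window closes when the watermark is at or past its end.

```python
import math
from collections import defaultdict
from typing import Any, Callable

Window = tuple[float, float]


def tumbling(size_s: float) -> Callable[[float], list[Window]]:
    """Window assigner for aligned, non-overlapping windows of size_s seconds."""
    def assign(event_time_s: float) -> list[Window]:
        start = math.floor(event_time_s / size_s) * size_s
        return [(start, start + size_s)]
    return assign


class MiniStreamProcessor:
    """Simplified watermark-driven windowing: DROP policy, no allowed lateness."""

    def __init__(self, assign: Callable[[float], list[Window]],
                 aggregate_fn: Callable[[list[Any]], Any]) -> None:
        self.assign = assign
        self.aggregate_fn = aggregate_fn
        self.windows: dict[Window, list[Any]] = defaultdict(list)  # open windows
        self.watermark = -math.inf
        self.emitted: list[tuple[Window, Any]] = []  # stands in for downstream
        self.late_dropped = 0  # counted per (event, window) assignment

    def on_event(self, event_time_s: float, value: Any) -> None:
        for window in self.assign(event_time_s):
            if self.watermark >= window[1]:
                # The window has already closed: the event is late and dropped.
                self.late_dropped += 1
            else:
                self.windows[window].append(value)

    def advance_watermark(self, watermark_s: float) -> None:
        self.watermark = max(self.watermark, watermark_s)  # never moves backwards
        for window in sorted(self.windows):  # snapshot of keys, safe to pop
            if self.watermark >= window[1]:
                records = self.windows.pop(window)
                self.emitted.append((window, self.aggregate_fn(records)))


proc = MiniStreamProcessor(tumbling(10.0), aggregate_fn=sum)
for t, v in [(1.0, 5), (4.0, 7), (12.0, 1)]:
    proc.on_event(t, v)
proc.advance_watermark(10.0)  # closes [0, 10)
proc.on_event(3.0, 100)       # late: its window already closed, so dropped
print(proc.emitted)       # [((0.0, 10.0), 12)]
print(proc.late_dropped)  # 1
```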
StreamProcessorStats (dataclass)
StreamProcessorStats(
events_processed: int = 0,
windows_emitted: int = 0,
late_events: int = 0,
late_events_dropped: int = 0,
late_events_updated: int = 0,
late_events_side_output: int = 0,
)
Statistics tracked by StreamProcessor.
Attributes:
| Name | Type | Description |
|---|---|---|
| events_processed | int | Total events received. |
| windows_emitted | int | Total windows emitted. |
| late_events | int | Total late events received. |
| late_events_dropped | int | Late events that were dropped. |
| late_events_updated | int | Late events applied to closed windows. |
| late_events_side_output | int | Late events sent to side output. |
TumblingWindow
Non-overlapping fixed-size windows.
Each event belongs to exactly one window.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| size_s | float | Window size in seconds. | required |
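A tumbling window type needs only integer division of the event time by the window size. The class below is a hypothetical sketch shaped like the WindowType protocol (assign_windows and should_close); the alignment to multiples of size_s and the "watermark at or past window end" close rule are assumptions, not confirmed TumblingWindow behavior.

```python
import math


class TumblingWindowSketch:
    """Hypothetical tumbling window: aligned, non-overlapping [start, start + size_s)."""

    def __init__(self, size_s: float) -> None:
        if size_s <= 0:
            raise ValueError("size_s must be positive")
        self.size_s = size_s

    def assign_windows(self, event_time_s: float) -> list[tuple[float, float]]:
        # Exactly one window per event.
        start = math.floor(event_time_s / self.size_s) * self.size_s
        return [(start, start + self.size_s)]

    def should_close(self, window_end: float, watermark_s: float) -> bool:
        # Assumed rule: close once the watermark reaches the window end.
        return watermark_s >= window_end


w = TumblingWindowSketch(size_s=10.0)
print(w.assign_windows(23.5))                  # [(20.0, 30.0)]
print(w.should_close(30.0, watermark_s=29.9))  # False
```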
WindowState (dataclass)
State of an active window.
Attributes:
| Name | Type | Description |
|---|---|---|
| start | float | Window start time. |
| end | float | Window end time. |
| records | list[Any] | Records accumulated in this window. |
| emitted | bool | Whether the window result has been emitted. |
WindowType
Bases: Protocol
Protocol for window assignment strategies.
assign_windows (abstractmethod)
Assign an event to windows based on its event time.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event_time_s | float | Event time in seconds. | required |
Returns:
| Type | Description |
|---|---|
| list[tuple[float, float]] | List of (window_start, window_end) tuples. |
should_close (abstractmethod)
Whether a window should close given the current watermark.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| window_end | float | End time of the window. | required |
| watermark_s | float | Current watermark in seconds. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the window should be closed and emitted. |