Streaming

Event logs, consumer groups, stream processors, and windowing strategies.

Streaming infrastructure components for event-driven simulation.

Provides Kafka-inspired streaming primitives:

  • EventLog: Append-only partitioned log with retention policies
  • ConsumerGroup: Coordinated consumer management with partition assignment
  • StreamProcessor: Stateful windowed stream processing

ConsumerGroup

ConsumerGroup(
    name: str,
    event_log: EventLog,
    assignment_strategy: PartitionAssignment | None = None,
    rebalance_delay: float = 0.5,
    poll_latency: float = 0.001,
    session_timeout: float | None = None,
)

Bases: Entity

Coordinated consumer group over an EventLog.

Manages partition assignment, offset tracking, and rebalancing when consumers join or leave.

Attributes:

Name Type Description
name

Entity name for identification.

Initialize the consumer group.

Parameters:

Name Type Description Default
name str

Name for this group entity.

required
event_log EventLog

The EventLog to consume from.

required
assignment_strategy PartitionAssignment | None

Strategy for assigning partitions. Defaults to RangeAssignment.

None
rebalance_delay float

Simulated rebalance latency in seconds.

0.5
poll_latency float

Simulated poll latency in seconds.

0.001
session_timeout float | None

Optional session timeout (not enforced in this implementation, reserved for future use).

None

stats property

stats: ConsumerGroupStats

Return a frozen snapshot of current statistics.

consumer_count property

consumer_count: int

Number of active consumers.

consumers property

consumers: list[str]

Names of active consumers.

assignments property

assignments: dict[str, list[int]]

Current partition assignments.

generation property

generation: int

Current group generation (incremented on each rebalance).

consumer_lag

consumer_lag(consumer_name: str) -> dict[int, int]

Per-partition lag for a consumer.

Lag = event_log high watermark - committed offset.

Parameters:

Name Type Description Default
consumer_name str

The consumer to check.

required

Returns:

Type Description
dict[int, int]

Mapping of partition ID to lag.

total_lag

total_lag() -> int

Total lag across all consumers and partitions.
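The lag arithmetic above is simple enough to sketch standalone. The helper below is illustrative (not the library's implementation): it treats a partition with no committed offset as fully unread, an assumption that may differ from the actual ConsumerGroup behavior.

```python
def partition_lag(
    high_watermarks: dict[int, int], committed: dict[int, int]
) -> dict[int, int]:
    """Per-partition lag: high watermark minus committed offset.

    Partitions absent from `committed` are assumed fully unread
    (illustrative assumption; the library may handle this differently).
    """
    return {p: hw - committed.get(p, 0) for p, hw in high_watermarks.items()}


# Total lag is just the sum over all partitions.
def total_lag(high_watermarks: dict[int, int], committed: dict[int, int]) -> int:
    return sum(partition_lag(high_watermarks, committed).values())
```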

join

join(
    consumer_name: str, consumer_entity: Entity
) -> Generator[
    float | SimFuture | tuple[float, list[Event]],
    None,
    list[int],
]

Join a consumer to the group.

Parameters:

Name Type Description Default
consumer_name str

Unique name for the consumer.

required
consumer_entity Entity

The entity to receive polled records.

required

Yields:

Type Description
float | SimFuture | tuple[float, list[Event]]

Rebalance delay.

Returns:

Type Description
list[int]

List of assigned partition IDs.

leave

leave(
    consumer_name: str,
) -> Generator[
    float | SimFuture | tuple[float, list[Event]]
]

Remove a consumer from the group.

Parameters:

Name Type Description Default
consumer_name str

Name of the consumer to remove.

required

Yields:

Type Description
float | SimFuture | tuple[float, list[Event]]

Rebalance delay.

poll

poll(
    consumer_name: str, max_records: int = 100
) -> Generator[
    float | SimFuture | tuple[float, list[Event]],
    None,
    list[Record],
]

Poll for records from assigned partitions.

Parameters:

Name Type Description Default
consumer_name str

The polling consumer.

required
max_records int

Maximum records to return.

100

Yields:

Type Description
float | SimFuture | tuple[float, list[Event]]

Poll latency.

Returns:

Type Description
list[Record]

List of records from assigned partitions.

commit

commit(
    consumer_name: str, offsets: dict[int, int]
) -> Generator[float | tuple[float, list[Event]]]

Commit offsets for a consumer.

Parameters:

Name Type Description Default
consumer_name str

The committing consumer.

required
offsets dict[int, int]

Mapping of partition ID to committed offset.

required

Yields:

Type Description
float | tuple[float, list[Event]]

Minimal latency.

handle_event

handle_event(
    event: Event,
) -> Generator[float, None, list[Event] | None]

Handle consumer group events.

ConsumerGroupStats dataclass

ConsumerGroupStats(
    joins: int = 0,
    leaves: int = 0,
    rebalances: int = 0,
    polls: int = 0,
    commits: int = 0,
    records_polled: int = 0,
)

Statistics tracked by ConsumerGroup.

Attributes:

Name Type Description
joins int

Number of consumer joins.

leaves int

Number of consumer leaves.

rebalances int

Number of rebalance events.

polls int

Number of poll operations.

commits int

Number of commit operations.

records_polled int

Total records returned by polls.

ConsumerState

Bases: Enum

State of a consumer within the group.

PartitionAssignment

Bases: Protocol

Protocol for partition assignment strategies.

assign abstractmethod

assign(
    partitions: list[int], consumers: list[str]
) -> dict[str, list[int]]

Assign partitions to consumers.

Parameters:

Name Type Description Default
partitions list[int]

Available partition IDs.

required
consumers list[str]

Consumer names (sorted).

required

Returns:

Type Description
dict[str, list[int]]

Mapping of consumer name to assigned partition IDs.

RangeAssignment

Contiguous range assignment.

Divides partitions into contiguous ranges. Remainder partitions go to the earliest consumers.
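A minimal sketch of this strategy, assuming the documented behavior (contiguous ranges, remainder to the earliest consumers); the function name is illustrative, not the library's:

```python
def range_assign(
    partitions: list[int], consumers: list[str]
) -> dict[str, list[int]]:
    """Split partitions into contiguous ranges, one range per consumer.

    The first (len(partitions) % len(consumers)) consumers each receive
    one extra partition, matching "remainder to the earliest consumers".
    """
    base, extra = divmod(len(partitions), len(consumers))
    result: dict[str, list[int]] = {}
    start = 0
    for i, consumer in enumerate(consumers):
        count = base + (1 if i < extra else 0)
        result[consumer] = partitions[start : start + count]
        start += count
    return result
```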

RoundRobinAssignment

Round-robin partition assignment.

Distributes partitions in a round-robin fashion across consumers.
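Round-robin distribution can be sketched in a few lines (illustrative helper, not the library's code):

```python
def round_robin_assign(
    partitions: list[int], consumers: list[str]
) -> dict[str, list[int]]:
    """Deal partitions to consumers one at a time, cycling through the
    consumer list, so loads differ by at most one partition."""
    result: dict[str, list[int]] = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        result[consumers[i % len(consumers)]].append(p)
    return result
```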

StickyAssignment

StickyAssignment()

Sticky partition assignment.

Tracks the previous assignment and minimizes partition movement when consumers join or leave.
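The core idea, keep prior ownership where possible and redistribute only orphaned partitions, can be sketched as below. This is a simplified illustration under assumed semantics; the library's StickyAssignment may balance more aggressively.

```python
def sticky_assign(
    partitions: list[int],
    consumers: list[str],
    previous: dict[str, list[int]],
) -> dict[str, list[int]]:
    """Two phases: (1) surviving consumers keep partitions they already
    owned; (2) remaining partitions go to the least-loaded consumer,
    with ties broken by name for determinism."""
    result: dict[str, list[int]] = {c: [] for c in consumers}
    unassigned = set(partitions)
    # Phase 1: preserve previous ownership for consumers still present.
    for consumer, owned in previous.items():
        if consumer in result:
            for p in owned:
                if p in unassigned:
                    result[consumer].append(p)
                    unassigned.discard(p)
    # Phase 2: place each orphaned partition on the least-loaded consumer.
    for p in sorted(unassigned):
        target = min(consumers, key=lambda c: (len(result[c]), c))
        result[target].append(p)
    return result
```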

EventLog

EventLog(
    name: str,
    num_partitions: int = 4,
    sharding_strategy: Any | None = None,
    retention_policy: RetentionPolicy | None = None,
    append_latency: float = 0.001,
    read_latency: float = 0.0005,
    retention_check_interval: float = 60.0,
)

Bases: Entity

Append-only partitioned event log.

Records are routed to partitions via a sharding strategy (default: hash-based). Each partition maintains an ordered list of records and a monotonically increasing high-watermark offset.

Attributes:

Name Type Description
name

Entity name for identification.

Initialize the event log.

Parameters:

Name Type Description Default
name str

Name for this log entity.

required
num_partitions int

Number of partitions.

4
sharding_strategy Any | None

Strategy for routing keys to partitions. Defaults to HashSharding.

None
retention_policy RetentionPolicy | None

Optional policy for expiring old records.

None
append_latency float

Simulated append latency in seconds.

0.001
read_latency float

Simulated read latency in seconds.

0.0005
retention_check_interval float

Seconds between retention sweeps.

60.0

Raises:

Type Description
ValueError

If num_partitions < 1.

stats property

stats: EventLogStats

Return a frozen snapshot of current statistics.

num_partitions property

num_partitions: int

Number of partitions.

partitions property

partitions: list[Partition]

The partition objects.

total_records property

total_records: int

Total records across all partitions.

high_watermark

high_watermark(partition_id: int) -> int

High watermark for a partition.

Parameters:

Name Type Description Default
partition_id int

Partition index.

required

Returns:

Type Description
int

The next offset that will be assigned.

high_watermarks

high_watermarks() -> dict[int, int]

High watermarks for all partitions.

append

append(
    key: str, value: Any
) -> Generator[
    float | SimFuture | tuple[float, list[Event]],
    None,
    Record,
]

Append a record to the log.

Convenience generator for yield from in entity handlers.

Parameters:

Name Type Description Default
key str

Routing key.

required
value Any

Record payload.

required

Yields:

Type Description
float | SimFuture | tuple[float, list[Event]]

Append latency.

Returns:

Type Description
Record

The appended Record.

read

read(
    partition_id: int,
    offset: int = 0,
    max_records: int = 100,
) -> Generator[
    float | SimFuture | tuple[float, list[Event]],
    None,
    list[Record],
]

Read records from a partition.

Convenience generator for yield from in entity handlers.

Parameters:

Name Type Description Default
partition_id int

Partition to read from.

required
offset int

Starting offset.

0
max_records int

Maximum records to return.

100

Yields:

Type Description
float | SimFuture | tuple[float, list[Event]]

Read latency.

Returns:

Type Description
list[Record]

List of records.

handle_event

handle_event(
    event: Event,
) -> Generator[float, None, list[Event] | None]

Handle log events.

EventLogStats dataclass

EventLogStats(
    records_appended: int = 0,
    records_read: int = 0,
    records_expired: int = 0,
    per_partition_appends: dict[int, int] = dict(),
    append_latencies: tuple[float, ...] = (),
)

Statistics tracked by EventLog.

Attributes:

Name Type Description
records_appended int

Total records appended across all partitions.

records_read int

Total records returned by read operations.

records_expired int

Total records removed by retention.

per_partition_appends dict[int, int]

Append count per partition.

append_latencies tuple[float, ...]

Tuple of individual append latencies.

Partition dataclass

Partition(
    id: int,
    records: list[Record] = list(),
    high_watermark: int = 0,
)

Internal state of a single partition.

Attributes:

Name Type Description
id int

Partition identifier.

records list[Record]

Ordered list of records.

high_watermark int

Offset of next record to be written.

Record dataclass

Record(
    offset: int,
    key: str,
    value: Any,
    timestamp: float,
    partition: int,
)

An immutable record in the event log.

Attributes:

Name Type Description
offset int

Position within the partition.

key str

Routing key used for partition assignment.

value Any

The record payload.

timestamp float

Simulation time when the record was appended.

partition int

Partition ID this record belongs to.

RetentionPolicy

Bases: Protocol

Protocol for log retention policies.

should_retain abstractmethod

should_retain(
    record: Record, current_time_s: float
) -> bool

Whether a record should be kept.

Parameters:

Name Type Description Default
record Record

The record to evaluate.

required
current_time_s float

Current simulation time in seconds.

required

Returns:

Type Description
bool

True if the record should be retained.

SizeRetention

SizeRetention(max_records: int)

Retain at most max_records per partition.

When a partition exceeds this limit, the oldest records are expired during the retention check cycle.

Parameters:

Name Type Description Default
max_records int

Maximum number of records per partition.

required

TimeRetention

TimeRetention(max_age_s: float)

Retain records younger than a maximum age.

Parameters:

Name Type Description Default
max_age_s float

Maximum record age in seconds.

required
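Both retention policies reduce to small predicates. The helpers below are illustrative sketches of the documented semantics, not the library's implementations:

```python
def time_should_retain(
    record_timestamp: float, current_time_s: float, max_age_s: float
) -> bool:
    """TimeRetention: keep a record while its age is within max_age_s."""
    return (current_time_s - record_timestamp) <= max_age_s


def size_retention_sweep(records: list, max_records: int) -> list:
    """SizeRetention sweep: drop the oldest records once a partition
    exceeds max_records, keeping the newest max_records entries."""
    return records[-max_records:] if len(records) > max_records else records
```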

LateEventPolicy

Bases: Enum

How to handle events that arrive after their window has closed.

SessionWindow

SessionWindow(gap_s: float)

Dynamic windows based on activity gaps.

Events within the gap threshold are merged into the same session.

Parameters:

Name Type Description Default
gap_s float

Maximum inactivity gap in seconds.

required
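Session merging can be sketched as a fold over sorted event times: an event within `gap_s` of the current session's end extends it, otherwise a new session opens. An illustrative helper, not the library's code:

```python
def session_windows(
    event_times: list[float], gap_s: float
) -> list[tuple[float, float]]:
    """Merge event times into (start, end) sessions; a new session
    starts whenever the gap since the previous event exceeds gap_s."""
    sessions: list[tuple[float, float]] = []
    for t in sorted(event_times):
        if sessions and t - sessions[-1][1] <= gap_s:
            start, _ = sessions[-1]
            sessions[-1] = (start, t)  # extend the current session
        else:
            sessions.append((t, t))  # open a new session
    return sessions
```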

SlidingWindow

SlidingWindow(size_s: float, slide_s: float)

Overlapping windows with fixed size and slide interval.

Each event may belong to multiple windows.

Parameters:

Name Type Description Default
size_s float

Window size in seconds.

required
slide_s float

Slide interval in seconds.

required
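The window membership rule can be sketched as follows, assuming window starts fall on multiples of `slide_s` and an event at time `t` belongs to every window with `start <= t < start + size_s` (negative starts are not clamped in this illustration):

```python
import math


def sliding_windows(
    event_time_s: float, size_s: float, slide_s: float
) -> list[tuple[float, float]]:
    """All (start, end) windows containing event_time_s."""
    # Latest window start at or before the event time.
    start = math.floor(event_time_s / slide_s) * slide_s
    windows: list[tuple[float, float]] = []
    # Walk backwards while the window still covers the event.
    while start > event_time_s - size_s:
        windows.append((start, start + size_s))
        start -= slide_s
    return sorted(windows)
```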

StreamProcessor

StreamProcessor(
    name: str,
    window_type: WindowType,
    aggregate_fn: Callable[[list[Any]], Any],
    downstream: Entity,
    allowed_lateness_s: float = 0.0,
    late_event_policy: LateEventPolicy = LateEventPolicy.DROP,
    side_output: Entity | None = None,
    watermark_interval_s: float = 1.0,
)

Bases: Entity

Stateful windowed stream processor.

Receives events with event times, assigns them to windows, and emits aggregated results when windows close based on watermark progression.

Attributes:

Name Type Description
name

Entity name for identification.

Initialize the stream processor.

Parameters:

Name Type Description Default
name str

Name for this processor entity.

required
window_type WindowType

Window assignment strategy.

required
aggregate_fn Callable[[list[Any]], Any]

Function to aggregate records in a window.

required
downstream Entity

Entity to receive WindowResult events.

required
allowed_lateness_s float

Grace period after window closes for late events.

0.0
late_event_policy LateEventPolicy

How to handle late events.

DROP
side_output Entity | None

Entity to receive LateEvent events (for SIDE_OUTPUT policy).

None
watermark_interval_s float

Interval between watermark self-scheduling.

1.0

stats property

stats: StreamProcessorStats

Return a frozen snapshot of current statistics.

watermark_s property

watermark_s: float

Current watermark in seconds.

active_windows property

active_windows: int

Number of active (non-emitted) windows across all keys.

total_windows_emitted property

total_windows_emitted: int

Total windows emitted.

handle_event

handle_event(
    event: Event,
) -> Generator[float, None, list[Event] | None]

Handle stream processor events.

StreamProcessorStats dataclass

StreamProcessorStats(
    events_processed: int = 0,
    windows_emitted: int = 0,
    late_events: int = 0,
    late_events_dropped: int = 0,
    late_events_updated: int = 0,
    late_events_side_output: int = 0,
)

Statistics tracked by StreamProcessor.

Attributes:

Name Type Description
events_processed int

Total events received.

windows_emitted int

Total windows emitted.

late_events int

Total late events received.

late_events_dropped int

Late events that were dropped.

late_events_updated int

Late events applied to closed windows.

late_events_side_output int

Late events sent to side output.

TumblingWindow

TumblingWindow(size_s: float)

Non-overlapping fixed-size windows.

Each event belongs to exactly one window.

Parameters:

Name Type Description Default
size_s float

Window size in seconds.

required
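Tumbling assignment is a single floor division: the window containing an event starts at the largest multiple of `size_s` not exceeding the event time. An illustrative sketch:

```python
import math


def tumbling_window(event_time_s: float, size_s: float) -> tuple[float, float]:
    """The one window containing the event (half-open [start, end))."""
    start = math.floor(event_time_s / size_s) * size_s
    return (start, start + size_s)
```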

WindowState dataclass

WindowState(
    start: float,
    end: float,
    records: list[Any] = list(),
    emitted: bool = False,
)

State of an active window.

Attributes:

Name Type Description
start float

Window start time.

end float

Window end time.

records list[Any]

Records accumulated in this window.

emitted bool

Whether the window result has been emitted.

WindowType

Bases: Protocol

Protocol for window assignment strategies.

assign_windows abstractmethod

assign_windows(
    event_time_s: float,
) -> list[tuple[float, float]]

Assign an event to windows based on its event time.

Parameters:

Name Type Description Default
event_time_s float

Event time in seconds.

required

Returns:

Type Description
list[tuple[float, float]]

List of (window_start, window_end) tuples.

should_close abstractmethod

should_close(window_end: float, watermark_s: float) -> bool

Whether a window should close given the current watermark.

Parameters:

Name Type Description Default
window_end float

End time of the window.

required
watermark_s float

Current watermark in seconds.

required

Returns:

Type Description
bool

True if the window should be closed and emitted.
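A minimal class satisfying this Protocol might look like the sketch below: tumbling assignment plus a watermark check. The Protocol is redeclared locally since the library module is not imported here, and `SimpleTumbling` is a hypothetical name; the library's TumblingWindow may differ in detail.

```python
import math
from typing import Protocol


class WindowType(Protocol):
    def assign_windows(self, event_time_s: float) -> list[tuple[float, float]]: ...
    def should_close(self, window_end: float, watermark_s: float) -> bool: ...


class SimpleTumbling:
    """Illustrative WindowType: non-overlapping fixed-size windows that
    close once the watermark reaches the window end."""

    def __init__(self, size_s: float) -> None:
        self.size_s = size_s

    def assign_windows(self, event_time_s: float) -> list[tuple[float, float]]:
        start = math.floor(event_time_s / self.size_s) * self.size_s
        return [(start, start + self.size_s)]

    def should_close(self, window_end: float, watermark_s: float) -> bool:
        return watermark_s >= window_end
```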