
Instrumentation

Instrumentation, tracing, and measurement components: data collection, probes, latency/throughput trackers, and simulation summaries.

LatencyTracker

LatencyTracker(name: str = 'LatencyTracker')

Bases: Entity

Records end-to-end latency from event context['created_at'].

Drop-in replacement for the custom LatencyTrackingSink that every example reimplements. Uses the 'created_at' field that Event.__post_init__ sets automatically.

Stores (completion_time_s, latency_s) pairs in self.data.

Note: Memory usage grows linearly with the number of events processed (~72 bytes per event). For simulations with millions of events, consider using TDigest for approximate percentiles or periodic aggregation to bound memory usage.

p50

p50() -> float

50th percentile latency in seconds.

p99

p99() -> float

99th percentile latency in seconds.

mean_latency

mean_latency() -> float

Mean latency in seconds.

summary

summary(window_s: float = 1.0) -> BucketedData

Bucket latencies by time window.
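The tracker's core behavior can be sketched in plain Python. This is an illustrative standalone class (SimpleLatencyTracker and its handle method are not the library's actual API), showing how (completion_time_s, latency_s) pairs are derived from an event's creation time:

```python
import statistics

class SimpleLatencyTracker:
    """Minimal sketch: record (completion_time_s, latency_s) pairs."""

    def __init__(self):
        self.data = []  # (completion_time_s, latency_s) pairs

    def handle(self, now_s, created_at_s):
        # Latency is completion time minus the event's creation time.
        self.data.append((now_s, now_s - created_at_s))

    def p50(self):
        return statistics.median(lat for _, lat in self.data)

tracker = SimpleLatencyTracker()
tracker.handle(now_s=1.5, created_at_s=1.0)   # latency 0.5
tracker.handle(now_s=2.0, created_at_s=1.8)   # latency 0.2
tracker.handle(now_s=3.0, created_at_s=2.7)   # latency 0.3
print(round(tracker.p50(), 3))  # → 0.3
```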

ThroughputTracker

ThroughputTracker(name: str = 'ThroughputTracker')

Bases: Entity

Counts events per time window for throughput analysis.

Records one sample per event received. Use .throughput() to get events-per-window bucketed by time.

throughput

throughput(window_s: float = 1.0) -> BucketedData

Returns events-per-window bucketed by time.

The 'sum' field in each bucket equals the event count for that window.
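The counting semantics can be sketched with a standalone function (this mirrors the described behavior, not the library's implementation): each event timestamp is assigned to a fixed-width window, and the per-window count is what the 'sum' field reports.

```python
from collections import Counter

def throughput(event_times_s, window_s=1.0):
    """Sketch: count events per fixed-width time window.

    Returns {bucket_start_s: count}; each count corresponds to the
    'sum' field described above.
    """
    counts = Counter((t // window_s) * window_s for t in event_times_s)
    return dict(sorted(counts.items()))

# Five events across three seconds:
print(throughput([0.1, 0.4, 1.2, 1.9, 2.5]))
# → {0.0: 2, 1.0: 2, 2.0: 1}
```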

BucketedData

BucketedData()

Time-windowed aggregation result from Data.bucket().

times

times() -> list[float]

Bucket start times in seconds.

means

means() -> list[float]

Mean value per bucket.

counts

counts() -> list[int]

Number of samples per bucket.

maxes

maxes() -> list[float]

Maximum value per bucket.

sums

sums() -> list[float]

Sum of values per bucket.

p50s

p50s() -> list[float]

50th percentile (median) per bucket.

p99s

p99s() -> list[float]

99th percentile per bucket.

to_dict

to_dict() -> dict[str, list]

Return dict with keys: time_s, mean, p50, p99, max, count, sum.

Data

Data()

Container for timestamped metric samples with analysis utilities.

Stores (time, value) pairs for post-simulation analysis. Values should be numeric (int or float) for aggregation methods to work.

Samples are stored in append order. For time-ordered data, ensure add_stat is called with non-decreasing times.

values property

values: list[tuple[float, Any]]

All recorded samples as (time_seconds, value) tuples.

add_stat

add_stat(value: Any, time: Instant) -> None

Record a data point at the given simulation time.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| value | Any | The metric value to record. | required |
| time | Instant | The simulation time of this sample. | required |

clear

clear() -> None

Remove all recorded samples.

between

between(start_s: float, end_s: float) -> Data

Return a new Data with samples in [start, end).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| start_s | float | Start time in seconds (inclusive). | required |
| end_s | float | End time in seconds (exclusive). | required |

mean

mean() -> float

Mean of sample values. Returns 0.0 if empty.

min

min() -> float

Minimum sample value. Returns 0.0 if empty.

max

max() -> float

Maximum sample value. Returns 0.0 if empty.

percentile

percentile(p: float) -> float

Calculate percentile from sample values.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| p | float | Percentile in [0, 1]. E.g., 0.99 for p99. | required |

Returns:

| Type | Description |
| --- | --- |
| float | Interpolated percentile value, or 0.0 if empty. |
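The exact interpolation scheme isn't specified here; the sketch below uses linear interpolation between closest ranks, one common convention, and matches the documented edge case of returning 0.0 for an empty sample set.

```python
def percentile(values, p):
    """Linearly interpolated percentile; p in [0, 1] (e.g. 0.99 for p99)."""
    if not values:
        return 0.0
    xs = sorted(values)
    # Fractional rank into the sorted list.
    rank = p * (len(xs) - 1)
    lo = int(rank)
    hi = min(lo + 1, len(xs) - 1)
    frac = rank - lo
    return xs[lo] + (xs[hi] - xs[lo]) * frac

print(percentile([10, 20, 30, 40], 0.5))  # midway between 20 and 30 → 25.0
```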

count

count() -> int

Number of recorded samples.

sum

sum() -> float

Sum of sample values. Returns 0.0 if empty.

std

std() -> float

Population standard deviation of sample values. Returns 0.0 if fewer than 2 samples.
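"Population" here means the variance divides by N rather than N - 1 (the sample convention). A minimal sketch of that formula, with the documented small-sample guard:

```python
import math

def population_std(values):
    """Population standard deviation; 0.0 for fewer than 2 samples."""
    if len(values) < 2:
        return 0.0
    mean = sum(values) / len(values)
    # Population (not sample) variance divides by N, not N - 1.
    variance = sum((v - mean) ** 2 for v in values) / len(values)
    return math.sqrt(variance)

print(population_std([2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]))  # → 2.0
```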

bucket

bucket(window_s: float = 1.0) -> BucketedData

Group samples into fixed-width time windows.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| window_s | float | Width of each bucket in seconds. | 1.0 |

Returns:

| Type | Description |
| --- | --- |
| BucketedData | BucketedData with per-bucket aggregations. |
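The bucketing logic can be sketched in pure Python. This standalone function (not the library's implementation) groups samples by window start and returns a dict shaped like BucketedData.to_dict():

```python
from collections import defaultdict

def bucket(samples, window_s=1.0):
    """Sketch of fixed-width time bucketing.

    samples: list of (time_s, value) pairs. Returns per-bucket
    aggregations keyed like BucketedData.to_dict().
    """
    groups = defaultdict(list)
    for t, v in samples:
        # Each sample lands in the window containing its timestamp.
        groups[(t // window_s) * window_s].append(v)
    starts = sorted(groups)
    return {
        "time_s": starts,
        "mean": [sum(groups[s]) / len(groups[s]) for s in starts],
        "max": [max(groups[s]) for s in starts],
        "count": [len(groups[s]) for s in starts],
        "sum": [sum(groups[s]) for s in starts],
    }

result = bucket([(0.2, 10.0), (0.8, 30.0), (1.5, 5.0)])
print(result["mean"])  # → [20.0, 5.0]
```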

times

times() -> list[float]

Just the timestamps from all samples.

raw_values

raw_values() -> list[float]

Just the values from all samples.

rate

rate(window_s: float = 1.0) -> Data

Compute rate of change (count per window) over time windows.

Useful for throughput data where each sample represents one event.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| window_s | float | Width of each time window in seconds. | 1.0 |

Returns:

| Type | Description |
| --- | --- |
| Data | New Data with (bucket_start, count/window_s) pairs. |
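The count/window_s scaling means the result is always in events per second regardless of window width. A standalone sketch of that calculation (mirroring the described semantics, not the library's code):

```python
from collections import Counter

def rate(sample_times_s, window_s=1.0):
    """Sketch: count samples per window, divided by window width.

    Returns sorted (bucket_start_s, count / window_s) pairs.
    """
    counts = Counter((t // window_s) * window_s for t in sample_times_s)
    return sorted((start, n / window_s) for start, n in counts.items())

# Four events counted in half-second windows, scaled to per-second rates:
print(rate([0.1, 0.2, 0.3, 0.6], window_s=0.5))
# → [(0.0, 6.0), (0.5, 2.0)]
```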

Probe

Probe(
    target: Entity,
    metric: str,
    data: Data,
    interval: float = 1.0,
    start_time: Instant | None = None,
)

Bases: Source

Periodic metric sampler for monitoring entity state over time.

Extends Source to poll a metric from a target entity at fixed intervals. The sampled values are stored in a Data container for post-simulation analysis or visualization.

Probes run as daemon events, meaning they do not prevent the simulation from auto-terminating when all primary events are processed.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| target | Entity | The entity to measure. | required |
| metric | str | Attribute or property name to sample (accessed via getattr). | required |
| data | Data | Data container to store samples. | required |
| interval | float | Seconds between measurements. Defaults to 1.0. | 1.0 |
| start_time | Instant \| None | When to begin probing. Defaults to Instant.Epoch. | None |
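Stripped of the simulation engine, each probe tick amounts to a getattr read appended to the data container. The sketch below illustrates that (QueueEntity and sample are hypothetical names, not part of the library):

```python
class QueueEntity:
    """Hypothetical target entity exposing a numeric metric."""
    def __init__(self):
        self.depth = 0

def sample(target, metric, times_s):
    """Sketch of a Probe's per-tick work: read `metric` via getattr
    and append a (time, value) pair."""
    data = []
    for t in times_s:
        data.append((t, getattr(target, metric)))
    return data

q = QueueEntity()
samples = sample(q, "depth", [0.0, 1.0])
q.depth = 5  # metric changes between ticks
samples += sample(q, "depth", [2.0])
print(samples)  # → [(0.0, 0), (1.0, 0), (2.0, 5)]
```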

on classmethod

on(
    target: Entity, metric: str, interval: float = 1.0
) -> tuple[Probe, Data]

Create a Probe and its Data container in one call.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| target | Entity | The entity to measure. | required |
| metric | str | Attribute or property name to sample. | required |
| interval | float | Seconds between measurements. Defaults to 1.0. | 1.0 |

Returns:

| Type | Description |
| --- | --- |
| tuple[Probe, Data] | (probe, data) tuple. Pass probe to Simulation(probes=[...]); use data for post-simulation analysis. |

on_many classmethod

on_many(
    target: Entity,
    metrics: list[str],
    interval: float = 1.0,
) -> tuple[list[Probe], dict[str, Data]]

Create Probes for multiple metrics on the same target.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| target | Entity | The entity to measure. | required |
| metrics | list[str] | List of attribute/property names to sample. | required |
| interval | float | Seconds between measurements. Defaults to 1.0. | 1.0 |

Returns:

| Type | Description |
| --- | --- |
| tuple[list[Probe], dict[str, Data]] | (probes_list, data_dict) where data_dict is keyed by metric name. |

InMemoryTraceRecorder dataclass

InMemoryTraceRecorder(spans: list[dict[str, Any]] = list())

Trace recorder that stores spans in memory.

Useful for debugging, testing, and post-simulation analysis. Provides filtering methods to query specific span types or events.

Attributes:

Name Type Description
spans list[dict[str, Any]]

List of recorded spans as dictionaries.

clear

clear() -> None

Clear all recorded spans.

filter_by_kind

filter_by_kind(kind: str) -> list[dict[str, Any]]

Return spans matching the given kind.

filter_by_event

filter_by_event(event_id: str) -> list[dict[str, Any]]

Return spans for a specific event ID.
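Since spans are plain dictionaries, the filtering methods reduce to list comprehensions over a key. A minimal sketch (MemoryRecorder is illustrative, not the library's class):

```python
from dataclasses import dataclass, field
from typing import Any

@dataclass
class MemoryRecorder:
    """Sketch of an in-memory recorder with a kind filter."""
    spans: list[dict[str, Any]] = field(default_factory=list)

    def record(self, **span: Any) -> None:
        self.spans.append(span)

    def filter_by_kind(self, kind: str) -> list[dict[str, Any]]:
        return [s for s in self.spans if s.get("kind") == kind]

rec = MemoryRecorder()
rec.record(time=0.1, kind="heap.push", event_id="e1")
rec.record(time=0.2, kind="heap.pop", event_id="e1")
rec.record(time=0.3, kind="heap.push", event_id="e2")
print(len(rec.filter_by_kind("heap.push")))  # → 2
```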

NullTraceRecorder dataclass

NullTraceRecorder()

No-op recorder that discards all traces.

Use when tracing is disabled for performance.

TraceRecorder

Bases: Protocol

Protocol defining the trace recording interface.

Implementations can store traces in memory, write to files, send to external monitoring systems, or discard them entirely.

record

record(
    *,
    time: Instant,
    kind: str,
    event_id: str | None = None,
    event_type: str | None = None,
    **data: Any,
) -> None

Record an engine-level trace span.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| time | Instant | Simulation time when the span occurred. | required |
| kind | str | Category (e.g., "heap.push", "heap.pop", "simulation.dequeue"). | required |
| event_id | str \| None | ID of the associated event (from event.context["id"]). | None |
| event_type | str \| None | Type of the associated event. | None |
| `**data` | Any | Additional structured data for the span. | {} |

EntitySummary dataclass

EntitySummary(
    name: str,
    entity_type: str,
    events_handled: int,
    queue_stats: QueueStats | None = None,
)

Per-entity statistics from a simulation run.

QueueStats dataclass

QueueStats(
    peak_depth: int, total_accepted: int, total_dropped: int
)

Queue-specific statistics for QueuedResource entities.

SimulationSummary dataclass

SimulationSummary(
    duration_s: float,
    total_events_processed: int,
    events_cancelled: int = 0,
    events_per_second: float = 0.0,
    wall_clock_seconds: float = 0.0,
    entities: dict[str, EntitySummary] = dict(),
)

Auto-generated summary of a simulation run.

Returned by Simulation.run() and also accessible via Simulation.summary.