Skip to content

Industrial

Components for industrial and operations research simulation: conveyor belts, inspection stations, batch processors, shift schedules, inventory management, and more.

Industrial simulation components.

Reusable building blocks for operations research / industrial engineering simulations: bank tellers, manufacturing lines, warehouses, etc.

AppointmentScheduler

AppointmentScheduler(
    name: str,
    target: Entity,
    appointments: list[float],
    no_show_rate: float = 0.0,
    event_type: str = "Appointment",
)

Bases: Entity

Source-like entity that generates arrivals at fixed appointment times.

Appointments are defined as a list of times (in seconds). At each appointment time, the entity generates an arrival event with probability (1 - no_show_rate).

Parameters:

Name Type Description Default
name str

Identifier for logging.

required
target Entity

Entity to receive appointment arrivals.

required
appointments list[float]

List of appointment times in seconds.

required
no_show_rate float

Probability of a no-show (0.0-1.0).

0.0
event_type str

Event type string for generated events.

'Appointment'

start_events

start_events() -> list[Event]

Create events for all appointments.

Schedule these into the simulation::

for e in scheduler.start_events():
    sim.schedule(e)

AppointmentStats dataclass

AppointmentStats(
    total_scheduled: int = 0,
    arrivals: int = 0,
    no_shows: int = 0,
)

Snapshot of appointment scheduler statistics.

BalkingQueue

BalkingQueue(
    inner: QueuePolicy[T],
    balk_threshold: int = 5,
    balk_probability: float = 1.0,
)

Bases: QueuePolicy[T]

Queue policy that probabilistically rejects items when depth exceeds a threshold.

When the inner queue's depth is at or above balk_threshold, new items are rejected with probability balk_probability. Items below the threshold are always accepted (subject to inner policy capacity).

Parameters:

Name Type Description Default
inner QueuePolicy[T]

The underlying queue policy to delegate to.

required
balk_threshold int

Queue depth at which balking begins.

5
balk_probability float

Probability of balking when at/above threshold (0.0-1.0).

1.0

BatchProcessor

BatchProcessor(
    name: str,
    downstream: Entity,
    batch_size: int = 10,
    process_time: float = 1.0,
    timeout_s: float = 0.0,
)

Bases: Entity

Entity that accumulates items into batches before processing.

Items are buffered until batch_size is reached or timeout_s elapses since the first item in the current batch arrived. The batch is then processed (yielding process_time) and all items forwarded to downstream.

Parameters:

Name Type Description Default
name str

Identifier for logging.

required
downstream Entity

Entity to receive processed batch items.

required
batch_size int

Number of items per batch.

10
process_time float

Seconds to process one complete batch.

1.0
timeout_s float

Maximum wait time before flushing a partial batch. Use 0 to disable timeout (only flush on full batch).

0.0

BatchProcessorStats dataclass

BatchProcessorStats(
    batches_processed: int = 0,
    items_processed: int = 0,
    timeouts: int = 0,
)

Snapshot of batch processor statistics.

Breakable

Bases: Protocol

Protocol for entities that can be broken down.

BreakdownScheduler

BreakdownScheduler(
    name: str,
    target: Entity,
    mean_time_to_failure: float = 100.0,
    mean_repair_time: float = 5.0,
)

Bases: Entity

Entity that schedules random breakdowns for a target.

Alternates between UP (exponential time-to-failure) and DOWN (repair time) states. During DOWN, sets target._broken = True so that has_capacity() implementations can check it.

The scheduler must be registered as an entity in the simulation and needs to be started by scheduling an initial event to it (or by calling start_event()).

Parameters:

Name Type Description Default
name str

Identifier for logging.

required
target Entity

The entity subject to breakdowns.

required
mean_time_to_failure float

Mean time between breakdowns (seconds).

100.0
mean_repair_time float

Mean time to repair (seconds).

5.0

start_event

start_event() -> Event

Create the initial event that starts the breakdown cycle.

Schedule this event in the simulation to activate breakdowns::

sim.schedule(breakdown_scheduler.start_event())

BreakdownStats dataclass

BreakdownStats(
    breakdown_count: int = 0,
    total_downtime_s: float = 0.0,
    total_uptime_s: float = 0.0,
)

Snapshot of breakdown statistics.

availability property

availability: float

Fraction of time the machine was operational (0.0-1.0).

ConditionalRouter

ConditionalRouter(
    name: str,
    routes: list[tuple[Callable[[Event], bool], Entity]],
    default: Entity | None = None,
    drop_unmatched: bool = False,
)

Bases: Entity

Entity that routes events based on ordered predicate matching.

Each incoming event is tested against routes in order. The first (predicate, target) pair whose predicate returns True forwards the event to that target. If no predicate matches, the event goes to default (if set) or is dropped.

Parameters:

Name Type Description Default
name str

Identifier for logging.

required
routes list[tuple[Callable[[Event], bool], Entity]]

Ordered list of (predicate, target) pairs.

required
default Entity | None

Fallback target when no predicate matches.

None
drop_unmatched bool

If True (and no default), silently drop unmatched events. If False, log a warning on drop.

False

by_context_field classmethod

by_context_field(
    name: str,
    field: str,
    mapping: dict[str, Entity],
    default: Entity | None = None,
) -> ConditionalRouter

Create a router that dispatches based on a context field value.

Parameters:

Name Type Description Default
name str

Identifier for logging.

required
field str

The context key to inspect.

required
mapping dict[str, Entity]

Dict of {field_value: target_entity}.

required
default Entity | None

Fallback target for unmapped values.

None

RouterStats dataclass

RouterStats(
    total_routed: int = 0,
    dropped: int = 0,
    by_target: dict[str, int] = dict(),
)

Snapshot of router statistics.

ConveyorBelt

ConveyorBelt(
    name: str,
    downstream: Entity,
    transit_time: float,
    capacity: int = 0,
)

Bases: Entity

Entity that models fixed transit time between stations.

Receives events, holds them for transit_time seconds, then forwards to downstream. An optional capacity limits how many items can be in transit simultaneously.

Parameters:

Name Type Description Default
name str

Identifier for logging.

required
downstream Entity

Entity to forward events to after transit.

required
transit_time float

Seconds each item spends on the conveyor.

required
capacity int

Maximum items in transit at once (default unlimited).

0

ConveyorStats dataclass

ConveyorStats(
    items_transported: int = 0,
    items_in_transit: int = 0,
    items_rejected: int = 0,
)

Snapshot of conveyor belt statistics.

GateController

GateController(
    name: str,
    downstream: Entity,
    schedule: list[tuple[float, float]] | None = None,
    initially_open: bool = True,
    queue_capacity: int = 0,
)

Bases: Entity

Entity that opens/closes on schedule or programmatically.

When open, events pass through immediately. When closed, events are queued. On opening, the queue is flushed to downstream.

Parameters:

Name Type Description Default
name str

Identifier for logging.

required
downstream Entity

Entity to forward events to.

required
schedule list[tuple[float, float]] | None

List of (open_at_s, close_at_s) intervals.

None
initially_open bool

Whether the gate starts open.

True
queue_capacity int

Maximum queue size (0 = unlimited).

0

start_events

start_events() -> list[Event]

Create schedule-based open/close events.

open

open() -> list[Event]

Programmatically open the gate and flush queued events.

close

close() -> list[Event]

Programmatically close the gate.

GateStats dataclass

GateStats(
    passed_through: int = 0,
    queued_while_closed: int = 0,
    rejected: int = 0,
    open_cycles: int = 0,
    is_open: bool = True,
)

Snapshot of gate controller statistics.

InspectionStation

InspectionStation(
    name: str,
    pass_target: Entity,
    fail_target: Entity,
    inspection_time: float = 0.1,
    pass_rate: float = 0.95,
    policy: QueuePolicy | None = None,
)

Bases: QueuedResource

QueuedResource that inspects items and routes by pass/fail outcome.

Parameters:

Name Type Description Default
name str

Identifier for logging.

required
pass_target Entity

Entity to receive items that pass inspection.

required
fail_target Entity

Entity to receive items that fail inspection.

required
inspection_time float

Seconds per inspection.

0.1
pass_rate float

Probability of passing (0.0-1.0).

0.95
policy QueuePolicy | None

Queue ordering policy (default FIFO).

None

InspectionStats dataclass

InspectionStats(
    inspected: int = 0, passed: int = 0, failed: int = 0
)

Snapshot of inspection station statistics.

InventoryBuffer

InventoryBuffer(
    name: str,
    initial_stock: int = 100,
    reorder_point: int = 20,
    order_quantity: int = 50,
    lead_time: float = 5.0,
    supplier: Entity | None = None,
    downstream: Entity | None = None,
    stockout_target: Entity | None = None,
)

Bases: Entity

Entity managing stock with (s, Q) reorder policy.

When stock falls to or below reorder_point, a replenishment order of order_quantity is placed with the supplier, arriving after lead_time seconds.

Consume events are forwarded to downstream if provided and stock is available. If stock is zero, the event is counted as a stockout and optionally forwarded to stockout_target.

Parameters:

Name Type Description Default
name str

Identifier for logging.

required
initial_stock int

Starting inventory level.

100
reorder_point int

Stock level that triggers reorder (s).

20
order_quantity int

Amount to order on each reorder (Q).

50
lead_time float

Seconds for replenishment to arrive.

5.0
supplier Entity | None

Entity to receive replenishment orders (optional).

None
downstream Entity | None

Entity to forward satisfied demand to (optional).

None
stockout_target Entity | None

Entity to forward stockout events to (optional).

None

InventoryStats dataclass

InventoryStats(
    current_stock: int = 0,
    stockouts: int = 0,
    reorders: int = 0,
    items_consumed: int = 0,
    items_replenished: int = 0,
)

Snapshot of inventory statistics.

fill_rate property

fill_rate: float

Fraction of demand satisfied from stock (0.0-1.0).

PerishableInventory

PerishableInventory(
    name: str,
    initial_stock: int = 100,
    shelf_life_s: float = 3600.0,
    spoilage_check_interval_s: float = 60.0,
    reorder_point: int = 20,
    order_quantity: int = 50,
    lead_time: float = 5.0,
    downstream: Entity | None = None,
    waste_target: Entity | None = None,
    initial_stock_time: float | None = None,
)

Bases: Entity

Entity managing perishable stock with periodic spoilage checks.

Items are stored as (arrival_time, quantity) batches in a FIFO deque. A self-perpetuating spoilage check event periodically sweeps expired batches. The (s, Q) reorder policy triggers restocking when stock drops below reorder_point.

Parameters:

Name Type Description Default
name str

Identifier for logging.

required
initial_stock int

Starting inventory level.

100
shelf_life_s float

Seconds before items expire.

3600.0
spoilage_check_interval_s float

Seconds between spoilage sweeps.

60.0
reorder_point int

Stock level that triggers reorder.

20
order_quantity int

Amount to order on each reorder.

50
lead_time float

Seconds for replenishment to arrive.

5.0
downstream Entity | None

Entity to forward fulfilled demand to (optional).

None
waste_target Entity | None

Entity to notify of spoilage events (optional).

None

start_event

start_event() -> Event

Create the initial spoilage check event.

PerishableInventoryStats dataclass

PerishableInventoryStats(
    current_stock: int = 0,
    total_consumed: int = 0,
    total_spoiled: int = 0,
    stockouts: int = 0,
    reorders: int = 0,
)

Snapshot of perishable inventory statistics.

waste_rate property

waste_rate: float

Fraction of items spoiled vs total received (0.0-1.0).

PooledCycleResource

PooledCycleResource(
    name: str,
    pool_size: int,
    cycle_time: float,
    downstream: Entity | None = None,
    queue_capacity: int = 0,
)

Bases: Entity

Entity modeling a pool of identical units with fixed cycle times.

Each unit is discrete: when an event arrives, if a unit is available it begins a fixed-duration cycle. After cycle_time seconds the unit is released back to the pool and the event is forwarded downstream. If no unit is available, the event is queued (or rejected if at capacity).

Parameters:

Name Type Description Default
name str

Identifier for logging.

required
pool_size int

Number of units in the pool.

required
cycle_time float

Duration of each use cycle in seconds.

required
downstream Entity | None

Entity to forward completed items to (optional).

None
queue_capacity int

Maximum queue size (0 = unlimited).

0

PooledCycleStats dataclass

PooledCycleStats(
    pool_size: int = 0,
    available: int = 0,
    active: int = 0,
    queued: int = 0,
    completed: int = 0,
    rejected: int = 0,
    utilization: float = 0.0,
)

Snapshot of pooled cycle resource statistics.

PreemptibleGrant

PreemptibleGrant(
    resource: PreemptibleResource,
    amount: int,
    priority: float,
    on_preempt: Callable[[], None] | None = None,
)

Handle to acquired preemptible resource capacity.

Like Grant but with preemption support. When preempted, the on_preempt callback fires and preempted becomes True.

Attributes:

Name Type Description
amount int

The amount of capacity held.

priority float

The priority of this grant (lower = higher priority).

released bool

Whether this grant has been released.

preempted bool

Whether this grant was preempted.

release

release() -> None

Return capacity to the resource. Idempotent.

PreemptibleResource

PreemptibleResource(name: str, capacity: int)

Bases: Entity

Resource where higher-priority requests can preempt lower-priority holders.

Capacity is allocated by priority (lower value = higher priority). When capacity is insufficient, a higher-priority request can evict the lowest-priority holder if preempt=True.

Parameters:

Name Type Description Default
name str

Identifier for logging.

required
capacity int

Total capacity of the resource (integer units).

required

acquire

acquire(
    amount: int = 1,
    priority: float = 0.0,
    preempt: bool = True,
    on_preempt: Callable[[], None] | None = None,
) -> SimFuture

Acquire capacity, returning a SimFuture resolving with a PreemptibleGrant.

Parameters:

Name Type Description Default
amount int

Amount of capacity to acquire.

1
priority float

Priority level (lower = higher priority).

0.0
preempt bool

If True, try to preempt lower-priority holders.

True
on_preempt Callable[[], None] | None

Callback fired if this grant is later preempted.

None

Returns:

Type Description
SimFuture

SimFuture resolving with a PreemptibleGrant.

handle_event

handle_event(event: Event) -> None

PreemptibleResource does not process events directly.

PreemptibleResourceStats dataclass

PreemptibleResourceStats(
    capacity: int = 0,
    available: int = 0,
    acquisitions: int = 0,
    releases: int = 0,
    preemptions: int = 0,
    contentions: int = 0,
)

Snapshot of preemptible resource statistics.

RenegingQueuedResource

RenegingQueuedResource(
    name: str,
    reneged_target: Entity | None = None,
    default_patience_s: float = float("inf"),
    policy: QueuePolicy | None = None,
)

Bases: QueuedResource

Abstract QueuedResource where items can renege (leave) if they wait too long.

When an item is dequeued, the resource checks whether (now - created_at) > patience. If so, the item is routed to reneged_target instead of being processed.

Patience is determined from event.context["patience_s"] if present, otherwise default_patience_s is used.

Subclasses implement _handle_served_event() for items that are still within their patience window.

Parameters:

Name Type Description Default
name str

Identifier for logging.

required
reneged_target Entity | None

Entity to receive reneged items (or None to discard).

None
default_patience_s float

Default patience in seconds.

float('inf')
policy QueuePolicy | None

Queue ordering policy (default FIFO).

None

RenegingStats dataclass

RenegingStats(served: int = 0, reneged: int = 0)

Snapshot of reneging statistics.

Shift dataclass

Shift(start_s: float, end_s: float, capacity: int)

A single shift defining capacity over a time window.

Parameters:

Name Type Description Default
start_s float

Start time in seconds.

required
end_s float

End time in seconds.

required
capacity int

Number of concurrent workers during this shift.

required

ShiftedServer

ShiftedServer(
    name: str,
    schedule: ShiftSchedule,
    service_time: float = 0.1,
    downstream: Entity | None = None,
    policy: QueuePolicy | None = None,
)

Bases: QueuedResource

QueuedResource whose concurrency varies according to a ShiftSchedule.

At each shift boundary, the server adjusts its concurrency to match the schedule. Uses a self-perpetuating pattern: each shift change event schedules the next one.

Parameters:

Name Type Description Default
name str

Identifier for logging.

required
schedule ShiftSchedule

ShiftSchedule defining capacity over time.

required
service_time float

Seconds per item processed.

0.1
downstream Entity | None

Entity to forward processed items to (or None).

None
policy QueuePolicy | None

Queue ordering policy (default FIFO).

None

ShiftSchedule

ShiftSchedule(
    shifts: list[Shift], default_capacity: int = 0
)

Collection of shifts that defines time-varying capacity.

Shifts should be non-overlapping and sorted by start time. Gaps between shifts default to default_capacity.

Parameters:

Name Type Description Default
shifts list[Shift]

List of Shift definitions.

required
default_capacity int

Capacity outside of any defined shift.

0

capacity_at

capacity_at(time_s: float) -> int

Return the capacity at the given time.

next_transition_after

next_transition_after(time_s: float) -> float | None

Return the next transition time strictly after time_s, or None.

transition_times

transition_times() -> list[float]

Return all shift boundary times (sorted, deduplicated).

SplitMerge

SplitMerge(
    name: str,
    targets: list[Entity],
    downstream: Entity,
    split_event_type: str = "SubTask",
    merge_event_type: str = "Merged",
)

Bases: Entity

Entity that fans out events to parallel targets and merges results.

For each incoming event, creates N sub-events (one per target) with a reply_future in context. Uses all_of to wait for all targets to resolve their futures, then forwards the merged result downstream with context["sub_results"].

Targets must call event.context["reply_future"].resolve(value) when their work is complete.

Parameters:

Name Type Description Default
name str

Identifier for logging.

required
targets list[Entity]

List of entities to fan out to.

required
downstream Entity

Entity to forward merged results to.

required
split_event_type str

Event type for sub-task events.

'SubTask'
merge_event_type str

Event type for the merged result event.

'Merged'

SplitMergeStats dataclass

SplitMergeStats(
    splits_initiated: int = 0,
    merges_completed: int = 0,
    fan_out: int = 0,
)

Snapshot of split-merge statistics.