Infrastructure

Low-level infrastructure primitives for distributed system simulation.

Provides components modeling OS and hardware-level behaviors that affect distributed system performance: disk I/O, page caching, CPU scheduling, garbage collection, TCP transport, and DNS resolution.

CPUScheduler

CPUScheduler(
    name: str,
    *,
    policy: SchedulingPolicy | None = None,
    context_switch_s: float = 5e-06,
)

Bases: Entity

CPU scheduler with time-slicing and context switch overhead.

Provides execute() to submit work that is scheduled according to the configured policy. Multiple concurrent execute() calls compete for CPU time through the scheduler.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Entity name. | required |
| policy | SchedulingPolicy \| None | Scheduling policy. Defaults to FairShare. | None |
| context_switch_s | float | Time cost per context switch (default 5us). | 5e-06 |

Example::

cpu = CPUScheduler("cpu", policy=PriorityPreemptive())
sim = Simulation(entities=[cpu, ...], ...)

# In another entity's handle_event:
yield from cpu.execute("task-1", cpu_time_s=0.05, priority=1)

ready_queue_depth property

ready_queue_depth: int

Number of tasks waiting in the ready queue.

stats property

stats: CPUSchedulerStats

Frozen snapshot of CPU scheduler statistics.

execute

execute(
    task_id: str, cpu_time_s: float, priority: int = 0
) -> Generator[float]

Submit a task for CPU execution, yielding until complete.

The task will be time-sliced according to the scheduling policy and may be preempted by higher-priority tasks.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| task_id | str | Unique task identifier. | required |
| cpu_time_s | float | Total CPU time required. | required |
| priority | int | Task priority (higher = more important). | 0 |

handle_event

handle_event(event: Event) -> None

CPUScheduler does not process events directly.

CPUSchedulerStats dataclass

CPUSchedulerStats(
    tasks_completed: int = 0,
    context_switches: int = 0,
    total_cpu_time_s: float = 0.0,
    total_context_switch_overhead_s: float = 0.0,
    total_wait_time_s: float = 0.0,
    ready_queue_depth: int = 0,
    peak_queue_depth: int = 0,
)

Frozen snapshot of CPU scheduler statistics.

Attributes:

| Name | Type | Description |
|---|---|---|
| tasks_completed | int | Total tasks that ran to completion. |
| context_switches | int | Total context switches performed. |
| total_cpu_time_s | float | Total CPU time consumed by tasks. |
| total_context_switch_overhead_s | float | Total time spent in context switches. |
| total_wait_time_s | float | Total time tasks spent waiting in the ready queue. |
| ready_queue_depth | int | Current number of tasks in the ready queue. |
| peak_queue_depth | int | Maximum concurrent tasks observed. |

overhead_fraction property

overhead_fraction: float

Fraction of total time spent in context switches.

CPUTask dataclass

CPUTask(
    task_id: str,
    priority: int = 0,
    remaining_s: float = 0.0,
    wait_time_s: float = 0.0,
)

A task registered with the CPU scheduler.

Attributes:

| Name | Type | Description |
|---|---|---|
| task_id | str | Unique identifier for the task. |
| priority | int | Priority level (higher = more important). |
| remaining_s | float | Remaining CPU time needed. |
| wait_time_s | float | Total time spent waiting for CPU. |

FairShare

FairShare(quantum_s: float = 0.01)

Bases: SchedulingPolicy

Equal time slices across all tasks (round-robin).

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| quantum_s | float | Time slice per task in seconds (default 10ms). | 0.01 |
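
Round-robin time slicing can be illustrated with a small standalone sketch. It is not this library's scheduler: the function name `round_robin_completion_times` is hypothetical, and it assumes that every task is ready at time 0, that the ready queue is plain FIFO, and that one context switch is charged before every slice.

```python
from __future__ import annotations

from collections import deque


def round_robin_completion_times(
    tasks: dict[str, float],
    quantum_s: float = 0.01,
    context_switch_s: float = 5e-6,
) -> dict[str, float]:
    """Return each task's completion time under round-robin scheduling.

    Hypothetical helper, not part of the library. Assumes every task is ready
    at t=0 and a context switch is charged before each slice.
    """
    ready = deque(tasks.items())  # FIFO ready queue of (task_id, remaining_s)
    clock = 0.0
    finished: dict[str, float] = {}
    while ready:
        task_id, remaining = ready.popleft()
        clock += context_switch_s  # switch to this task
        run = min(quantum_s, remaining)  # one quantum, or less if nearly done
        clock += run
        remaining -= run
        if remaining > 1e-12:  # tolerance for float rounding
            ready.append((task_id, remaining))  # preempted: back of the queue
        else:
            finished[task_id] = clock
    return finished


print(round_robin_completion_times({"a": 0.02, "b": 0.01}, context_switch_s=0.0))
```

Task `b` finishes before `a` even though `a` was dispatched first, because `a` is preempted after one 10 ms quantum.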

PriorityPreemptive

PriorityPreemptive(quantum_s: float = 0.01)

Bases: SchedulingPolicy

The highest-priority task runs first and preempts lower-priority tasks.

Tasks with higher priority values are selected first. Equal-priority tasks are served in FIFO order.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| quantum_s | float | Time slice in seconds (default 10ms). | 0.01 |
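
The selection rule (highest priority first, FIFO among equal priorities) maps onto Python's built-in `max`, which returns the first maximal element it encounters. A minimal sketch, assuming the ready list is kept in arrival order; `Task` and `select_highest_priority` are hypothetical stand-ins, not the library's `CPUTask` or `PriorityPreemptive`.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Task:
    """Hypothetical stand-in mirroring the documented CPUTask fields."""

    task_id: str
    priority: int = 0
    remaining_s: float = 0.0
    wait_time_s: float = 0.0


def select_highest_priority(ready: list[Task]) -> Task | None:
    """Pick the highest-priority task; ties go to the earliest arrival.

    Assumes `ready` is ordered by arrival time. max() returns the first
    maximal element, which gives FIFO order among equal priorities.
    """
    if not ready:
        return None
    return max(ready, key=lambda t: t.priority)


queue = [Task("batch", 0), Task("api-1", 5), Task("api-2", 5)]
print(select_highest_priority(queue).task_id)  # api-1
```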

SchedulingPolicy

Bases: ABC

Strategy for ordering tasks on the CPU.

select_next abstractmethod

select_next(tasks: list[CPUTask]) -> CPUTask | None

Select the next task to run from the ready queue.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| tasks | list[CPUTask] | List of ready tasks (non-empty). | required |

Returns:

| Type | Description |
|---|---|
| CPUTask \| None | The selected task, or None if no task is selected. |

time_quantum_s abstractmethod

time_quantum_s(task: CPUTask) -> float

Return the time slice for the selected task.
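
The two abstract methods above are enough to define a new policy. The sketch below uses a stand-in base class shaped like the documented `SchedulingPolicy` (the real import path is not shown on this page) and implements shortest-remaining-time-first, a policy this page does not provide; `Task`, `Policy`, and `ShortestRemainingFirst` are hypothetical names.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class Task:
    """Hypothetical stand-in mirroring the documented CPUTask fields."""

    task_id: str
    priority: int = 0
    remaining_s: float = 0.0
    wait_time_s: float = 0.0


class Policy(ABC):
    """Stand-in with the same two abstract methods as SchedulingPolicy."""

    @abstractmethod
    def select_next(self, tasks: list[Task]) -> Task | None: ...

    @abstractmethod
    def time_quantum_s(self, task: Task) -> float: ...


class ShortestRemainingFirst(Policy):
    """Run the task with the least remaining CPU time; ties keep list order."""

    def __init__(self, quantum_s: float = 0.01) -> None:
        self.quantum_s = quantum_s

    def select_next(self, tasks: list[Task]) -> Task | None:
        return min(tasks, key=lambda t: t.remaining_s) if tasks else None

    def time_quantum_s(self, task: Task) -> float:
        # Never hand out more time than the task still needs.
        return min(self.quantum_s, task.remaining_s)


policy = ShortestRemainingFirst()
ready = [Task("long", remaining_s=0.5), Task("short", remaining_s=0.002)]
chosen = policy.select_next(ready)
print(chosen.task_id, policy.time_quantum_s(chosen))  # short 0.002
```

Shortest-remaining-first minimizes average completion time but can starve long tasks, which is one reason the fair-share round-robin default is the safer baseline.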

HDD

HDD(
    *,
    seek_time_s: float = 0.008,
    rotational_latency_s: float = 0.004,
    transfer_rate_mbps: float = 150.0,
    queue_depth_penalty: float = 0.3,
)

Bases: DiskProfile

Spinning disk: seek time + rotational latency + transfer.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| seek_time_s | float | Average seek time (default 8ms). | 0.008 |
| rotational_latency_s | float | Average rotational latency (default 4ms for 7200rpm). | 0.004 |
| transfer_rate_mbps | float | Sequential transfer rate in MB/s (default 150). | 150.0 |
| queue_depth_penalty | float | Multiplier per additional queued I/O (default 0.3). | 0.3 |
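
The HDD parameters combine additively. A back-of-the-envelope sketch, assuming latency = seek + rotational latency + size / transfer rate, scaled by `1 + queue_depth_penalty * (queue_depth - 1)`, and taking 1 MB as 10^6 bytes; the library's exact formula may differ. It also checks the 7200 rpm default: average rotational latency is half a revolution, 0.5 × 60 / 7200 ≈ 4.17 ms.

```python
def avg_rotational_latency_s(rpm: float) -> float:
    """Average wait for the target sector: half of one revolution."""
    return 0.5 * 60.0 / rpm


def hdd_latency_s(
    size_bytes: int,
    queue_depth: int = 1,
    seek_time_s: float = 0.008,
    rotational_latency_s: float = 0.004,
    transfer_rate_mbps: float = 150.0,
    queue_depth_penalty: float = 0.3,
) -> float:
    """Hypothetical HDD latency model (assumed formula, not the library's)."""
    transfer_s = size_bytes / (transfer_rate_mbps * 1e6)  # assumes 1 MB = 1e6 bytes
    base = seek_time_s + rotational_latency_s + transfer_s
    # Assumed linear penalty for each I/O queued ahead of this one.
    return base * (1 + queue_depth_penalty * max(0, queue_depth - 1))


print(round(avg_rotational_latency_s(7200) * 1e3, 2))  # 4.17 (ms)
print(round(hdd_latency_s(4096) * 1e3, 3))             # 12.027 (ms)
```

Mechanical positioning dominates: a 4 KiB random read costs about 12 ms, of which the transfer itself is under 0.03 ms.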

SSD

SSD(
    *,
    base_read_latency_s: float = 2.5e-05,
    base_write_latency_s: float = 0.0001,
    transfer_rate_mbps: float = 550.0,
    queue_depth_factor: float = 0.15,
)

Bases: DiskProfile

NAND flash: uniform low latency with moderate queue scaling.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| base_read_latency_s | float | Base read latency (default 25us). | 2.5e-05 |
| base_write_latency_s | float | Base write latency (default 100us). | 0.0001 |
| transfer_rate_mbps | float | Transfer rate in MB/s (default 550). | 550.0 |
| queue_depth_factor | float | Logarithmic queue-depth scaling factor (default 0.15). | 0.15 |
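
Log-based scaling means the penalty grows with the logarithm of queue depth, so going from 1 to 2 in-flight I/Os raises the multiplier by the same amount as going from 16 to 32. A sketch assuming latency = base + size / rate, multiplied by `1 + queue_depth_factor * ln(queue_depth)`, with 1 MB taken as 10^6 bytes; the exact formula the library uses is an assumption here.

```python
import math


def ssd_latency_s(
    size_bytes: int,
    queue_depth: int = 1,
    *,
    write: bool = False,
    base_read_latency_s: float = 25e-6,
    base_write_latency_s: float = 100e-6,
    transfer_rate_mbps: float = 550.0,
    queue_depth_factor: float = 0.15,
) -> float:
    """Hypothetical SSD latency model with logarithmic queue scaling."""
    base = base_write_latency_s if write else base_read_latency_s
    transfer_s = size_bytes / (transfer_rate_mbps * 1e6)  # assumes 1 MB = 1e6 bytes
    # ln(1) == 0, so a lone I/O pays no queueing penalty.
    scale = 1 + queue_depth_factor * math.log(max(1, queue_depth))
    return (base + transfer_s) * scale


for qd in (1, 2, 16, 32):
    print(qd, round(ssd_latency_s(4096, qd) * 1e6, 1))  # microseconds
```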

DiskIO

DiskIO(name: str, *, profile: DiskProfile | None = None)

Bases: Entity

Disk I/O model with device profiles and queue depth effects.

Provides read() and write() generator methods that yield latency based on the configured disk profile and current queue depth.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Entity name. | required |
| profile | DiskProfile \| None | Disk profile (HDD, SSD, NVMe). Defaults to SSD. | None |

Example::

disk = DiskIO("ssd", profile=SSD())
sim = Simulation(entities=[disk, ...], ...)

# In another entity's handle_event:
yield from disk.read(4096)
yield from disk.write(8192)

queue_depth property

queue_depth: int

Current number of in-flight I/O operations.

stats property

stats: DiskIOStats

Frozen snapshot of disk I/O statistics.

read

read(size_bytes: int = 4096) -> Generator[float]

Read from disk, yielding I/O latency.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| size_bytes | int | Number of bytes to read. | 4096 |

write

write(size_bytes: int = 4096) -> Generator[float]

Write to disk, yielding I/O latency.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| size_bytes | int | Number of bytes to write. | 4096 |
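
`read()` and `write()` are generators: the caller's `yield from` hands each yielded latency to the simulation engine, and queue depth counts the I/Os that have started but not finished. A simplified standalone sketch of that pattern; `ToyDisk` is hypothetical, takes any `(size_bytes, queue_depth) -> seconds` function in place of a `DiskProfile`, and is driven by hand with `next()` instead of a `Simulation`.

```python
from __future__ import annotations

from collections.abc import Callable, Generator


class ToyDisk:
    """Hypothetical stand-in illustrating queue-depth tracking in a generator."""

    def __init__(self, latency_fn: Callable[[int, int], float]) -> None:
        self.latency_fn = latency_fn  # (size_bytes, queue_depth) -> seconds
        self.queue_depth = 0
        self.peak_queue_depth = 0
        self.reads = 0
        self.total_read_latency_s = 0.0

    def read(self, size_bytes: int = 4096) -> Generator[float, None, None]:
        self.queue_depth += 1  # this I/O is now in flight
        self.peak_queue_depth = max(self.peak_queue_depth, self.queue_depth)
        try:
            latency = self.latency_fn(size_bytes, self.queue_depth)
            yield latency  # the engine resumes us after `latency` seconds
            self.reads += 1
            self.total_read_latency_s += latency
        finally:
            self.queue_depth -= 1  # runs even if the generator is closed early


disk = ToyDisk(lambda size, depth: 1e-4 * depth)
first, second = disk.read(), disk.read()
print(next(first), next(second))  # second read sees queue depth 2
for gen in (first, second):
    next(gen, None)  # resume each generator so it runs to completion
print(disk.reads, disk.queue_depth, disk.peak_queue_depth)  # 2 0 2
```

Decrementing in `finally` keeps the in-flight count correct even when a caller abandons an I/O midway.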

handle_event

handle_event(event: Event) -> None

DiskIO does not process events directly.

DiskIOStats dataclass

DiskIOStats(
    reads: int = 0,
    writes: int = 0,
    bytes_read: int = 0,
    bytes_written: int = 0,
    total_read_latency_s: float = 0.0,
    total_write_latency_s: float = 0.0,
    current_queue_depth: int = 0,
    peak_queue_depth: int = 0,
)

Frozen snapshot of DiskIO statistics.

Attributes:

| Name | Type | Description |
|---|---|---|
| reads | int | Total read operations completed. |
| writes | int | Total write operations completed. |
| bytes_read | int | Total bytes read. |
| bytes_written | int | Total bytes written. |
| total_read_latency_s | float | Cumulative read latency. |
| total_write_latency_s | float | Cumulative write latency. |
| current_queue_depth | int | Current number of in-flight I/O operations. |
| peak_queue_depth | int | Maximum concurrent I/O observed. |

DiskProfile

Bases: ABC

Strategy defining latency characteristics of a storage device.

read_latency_s abstractmethod

read_latency_s(size_bytes: int, queue_depth: int) -> float

Return read latency in seconds for the given size and queue depth.

write_latency_s abstractmethod

write_latency_s(size_bytes: int, queue_depth: int) -> float

Return write latency in seconds for the given size and queue depth.

NVMe

NVMe(
    *,
    base_read_latency_s: float = 1e-05,
    base_write_latency_s: float = 2e-05,
    transfer_rate_mbps: float = 3500.0,
    native_queue_depth: int = 32,
    overflow_penalty: float = 0.05,
)

Bases: DiskProfile

NVMe SSD: very low latency with high native parallelism.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| base_read_latency_s | float | Base read latency (default 10us). | 1e-05 |
| base_write_latency_s | float | Base write latency (default 20us). | 2e-05 |
| transfer_rate_mbps | float | Transfer rate in MB/s (default 3500). | 3500.0 |
| native_queue_depth | int | Device parallelism before penalties apply (default 32). | 32 |
| overflow_penalty | float | Penalty per I/O beyond native queue depth (default 0.05). | 0.05 |
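
`native_queue_depth` models the device's internal parallelism: up to that many concurrent I/Os proceed without slowing each other, and only the overflow is penalized. A sketch assuming a linear multiplier `1 + overflow_penalty * (queue_depth - native_queue_depth)` above the native depth and 1 MB = 10^6 bytes; the library's exact formula is not documented here.

```python
def nvme_latency_s(
    size_bytes: int,
    queue_depth: int = 1,
    *,
    write: bool = False,
    base_read_latency_s: float = 10e-6,
    base_write_latency_s: float = 20e-6,
    transfer_rate_mbps: float = 3500.0,
    native_queue_depth: int = 32,
    overflow_penalty: float = 0.05,
) -> float:
    """Hypothetical NVMe latency model (assumed formula, not the library's)."""
    base = base_write_latency_s if write else base_read_latency_s
    transfer_s = size_bytes / (transfer_rate_mbps * 1e6)  # assumes 1 MB = 1e6 bytes
    overflow = max(0, queue_depth - native_queue_depth)  # I/Os beyond device parallelism
    return (base + transfer_s) * (1 + overflow_penalty * overflow)


flat = nvme_latency_s(4096, queue_depth=32)  # within native depth: no penalty
deep = nvme_latency_s(4096, queue_depth=52)  # 20 I/Os over the limit
print(round(deep / flat, 2))  # 2.0
```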

DNSRecord dataclass

DNSRecord(
    hostname: str, ip_address: str, ttl_s: float = 300.0
)

A DNS record mapping a hostname to an IP address.

Attributes:

| Name | Type | Description |
|---|---|---|
| hostname | str | The queried hostname. |
| ip_address | str | The resolved IP address. |
| ttl_s | float | Time-to-live in seconds. |

DNSResolver

DNSResolver(
    name: str,
    *,
    cache_capacity: int = 1000,
    root_latency_s: float = 0.02,
    tld_latency_s: float = 0.015,
    auth_latency_s: float = 0.01,
    records: dict[str, DNSRecord] | None = None,
)

Bases: Entity

DNS resolver with caching, TTL, and hierarchical lookup latency.

Provides resolve() to look up a hostname. Results are cached according to the record's TTL. Cache misses trigger a hierarchical lookup through root, TLD, and authoritative nameservers.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Entity name. | required |
| cache_capacity | int | Maximum number of cached records (default 1000). | 1000 |
| root_latency_s | float | Latency for root nameserver query (default 20ms). | 0.02 |
| tld_latency_s | float | Latency for TLD nameserver query (default 15ms). | 0.015 |
| auth_latency_s | float | Latency for authoritative nameserver query (default 10ms). | 0.01 |
| records | dict[str, DNSRecord] \| None | Pre-configured DNS records (hostname -> DNSRecord). | None |

Example::

dns = DNSResolver("dns", records={
    "api.example.com": DNSRecord("api.example.com", "10.0.0.1", ttl_s=60),
})
sim = Simulation(entities=[dns, ...], ...)

# In another entity's handle_event:
ip = yield from dns.resolve("api.example.com")

cache_size property

cache_size: int

Number of entries currently in the DNS cache.

stats property

stats: DNSStats

Frozen snapshot of DNS resolver statistics.

add_record

add_record(record: DNSRecord) -> None

Add or update an authoritative DNS record.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| record | DNSRecord | The DNS record to add. | required |

resolve

resolve(
    hostname: str,
) -> Generator[float, None, str | None]

Resolve a hostname to an IP address.

Checks cache first, then performs hierarchical DNS lookup (root -> TLD -> authoritative). Returns the IP address or None if the hostname is not found.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| hostname | str | The hostname to resolve. | required |
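
The cache behaviour described above (TTL expiry, capacity-bounded eviction, and the full root -> TLD -> authoritative latency on a miss) can be sketched with an `OrderedDict`. This is a standalone illustration, not the library's code: `ToyResolver` is hypothetical, simulated time is passed in explicitly as `now`, cache hits are assumed to cost zero latency, eviction is assumed to be least-recently-used, and unknown hostnames are assumed to pay the full lookup before returning `None`.

```python
from __future__ import annotations

from collections import OrderedDict


class ToyResolver:
    """Hypothetical TTL + LRU DNS cache with hierarchical miss latency."""

    def __init__(self, records: dict[str, tuple[str, float]], cache_capacity: int = 1000,
                 root_latency_s: float = 0.02, tld_latency_s: float = 0.015,
                 auth_latency_s: float = 0.01) -> None:
        self.records = records  # hostname -> (ip_address, ttl_s)
        self.capacity = cache_capacity
        self.miss_latency_s = root_latency_s + tld_latency_s + auth_latency_s
        self.cache: OrderedDict[str, tuple[str, float]] = OrderedDict()  # host -> (ip, expires_at)
        self.hits = self.misses = self.expirations = self.evictions = 0

    def resolve(self, hostname: str, now: float) -> tuple[str | None, float]:
        """Return (ip_or_None, latency_s) for a lookup at simulated time `now`."""
        entry = self.cache.get(hostname)
        if entry is not None:
            ip, expires_at = entry
            if now < expires_at:
                self.hits += 1
                self.cache.move_to_end(hostname)  # mark as most recently used
                return ip, 0.0
            del self.cache[hostname]  # TTL elapsed
            self.expirations += 1
        self.misses += 1
        record = self.records.get(hostname)
        if record is None:
            return None, self.miss_latency_s  # unknown host after the full walk
        ip, ttl_s = record
        if len(self.cache) >= self.capacity:
            self.cache.popitem(last=False)  # evict least recently used
            self.evictions += 1
        self.cache[hostname] = (ip, now + ttl_s)
        return ip, self.miss_latency_s


dns = ToyResolver({"api.example.com": ("10.0.0.1", 60.0)})
print(dns.resolve("api.example.com", now=0.0))   # miss: full hierarchical latency
print(dns.resolve("api.example.com", now=30.0))  # hit: ('10.0.0.1', 0.0)
print(dns.resolve("api.example.com", now=61.0))  # expired, so another miss
```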

handle_event

handle_event(event: Event) -> None

DNSResolver does not process events directly.

DNSStats dataclass

DNSStats(
    lookups: int = 0,
    cache_hits: int = 0,
    cache_misses: int = 0,
    cache_expirations: int = 0,
    cache_evictions: int = 0,
    cache_size: int = 0,
    total_resolution_latency_s: float = 0.0,
)

Frozen snapshot of DNS resolver statistics.

Attributes:

| Name | Type | Description |
|---|---|---|
| lookups | int | Total lookup requests. |
| cache_hits | int | Lookups served from cache. |
| cache_misses | int | Lookups requiring hierarchical resolution. |
| cache_expirations | int | Cache entries that expired because their TTL elapsed. |
| cache_evictions | int | Cache entries evicted due to capacity. |
| cache_size | int | Current number of entries in cache. |
| total_resolution_latency_s | float | Cumulative resolution latency. |

ConcurrentGC

ConcurrentGC(
    *, pause_s: float = 0.005, interval_s: float = 2.0
)

Bases: GCStrategy

Mostly concurrent collection with short pauses.

Most collection work runs concurrently with the application; only the marking and remarking phases pause it.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| pause_s | float | Pause duration per cycle (default 5ms). | 0.005 |
| interval_s | float | Seconds between collections (default 2s). | 2.0 |

GarbageCollector

GarbageCollector(
    name: str,
    *,
    strategy: GCStrategy | None = None,
    heap_pressure: float | None = None,
)

Bases: Entity

GC pause injection model.

Self-schedules periodic collection events and injects pauses that block processing of the target entity. The pause() generator method can be called from any entity's handle_event to inject a GC-aware delay.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Entity name. | required |
| strategy | GCStrategy \| None | GC strategy. Defaults to GenerationalGC. | None |
| heap_pressure | float \| None | Fixed heap pressure (0.0-1.0). If None, grows over time from 0.3 to 0.9 based on collection count. | None |

Example::

gc = GarbageCollector("jvm_gc", strategy=StopTheWorld())
sim = Simulation(entities=[gc, ...], ...)

# In another entity's handle_event:
yield from gc.pause()  # inject GC pause if due

collection_count property

collection_count: int

Total number of GC collections performed.

stats property

stats: GCStats

Frozen snapshot of GC statistics.

pause

pause() -> Generator[float, None, float]

Inject a GC pause at the current time.

Call from any entity's handle_event to simulate GC overhead. Returns the pause duration.

handle_event

handle_event(
    event: Event,
) -> Generator[float, None, list[Event]]

Handle scheduled GC collection events.

prime

prime() -> Event

Create the initial GC scheduling event.

Pass the returned event to sim.schedule() to start the GC cycle. The entity must have a clock, which means it must already be registered with a Simulation.

GCStats dataclass

GCStats(
    collections: int = 0,
    total_pause_s: float = 0.0,
    max_pause_s: float = 0.0,
    min_pause_s: float = 0.0,
    minor_collections: int = 0,
    major_collections: int = 0,
    strategy_name: str = "",
)

Frozen snapshot of garbage collector statistics.

Attributes:

| Name | Type | Description |
|---|---|---|
| collections | int | Total collection cycles. |
| total_pause_s | float | Cumulative pause time. |
| max_pause_s | float | Longest single pause. |
| min_pause_s | float | Shortest single pause. |
| minor_collections | int | Minor GC cycles (GenerationalGC only). |
| major_collections | int | Major GC cycles (GenerationalGC only). |
| strategy_name | str | Name of the active GC strategy. |

GCStrategy

Bases: ABC

Strategy defining GC pause characteristics.

name abstractmethod property

name: str

Human-readable name for this GC strategy.

pause_duration_s abstractmethod

pause_duration_s(heap_pressure: float) -> float

Return pause duration in seconds for current heap pressure.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| heap_pressure | float | Fraction of heap used (0.0 to 1.0). | required |

collection_interval_s abstractmethod

collection_interval_s() -> float

Return seconds between collections.

GenerationalGC

GenerationalGC(
    *,
    minor_pause_s: float = 0.002,
    major_pause_s: float = 0.03,
    minor_interval_s: float = 1.0,
    major_threshold: float = 0.75,
)

Bases: GCStrategy

Generational collection with minor and major cycles.

Minor collections are fast and frequent. Major collections are slower and are triggered when heap pressure crosses major_threshold.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| minor_pause_s | float | Minor GC pause duration (default 2ms). | 0.002 |
| major_pause_s | float | Major GC pause duration (default 30ms). | 0.03 |
| minor_interval_s | float | Seconds between minor collections (default 1s). | 1.0 |
| major_threshold | float | Heap pressure threshold for major GC (default 0.75). | 0.75 |
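
A strategy only has to answer the three questions posed by `GCStrategy`: its name, how long a pause lasts at a given heap pressure, and how often collections run. A sketch of generational behaviour against a stand-in base class; `Strategy` and `ToyGenerational` are hypothetical, and treating `heap_pressure >= major_threshold` as the trigger for a major collection is an assumption about how the library uses the threshold.

```python
from abc import ABC, abstractmethod


class Strategy(ABC):
    """Stand-in with the same abstract members as the documented GCStrategy."""

    @property
    @abstractmethod
    def name(self) -> str: ...

    @abstractmethod
    def pause_duration_s(self, heap_pressure: float) -> float: ...

    @abstractmethod
    def collection_interval_s(self) -> float: ...


class ToyGenerational(Strategy):
    def __init__(self, minor_pause_s=0.002, major_pause_s=0.03,
                 minor_interval_s=1.0, major_threshold=0.75):
        self.minor_pause_s = minor_pause_s
        self.major_pause_s = major_pause_s
        self.minor_interval_s = minor_interval_s
        self.major_threshold = major_threshold

    @property
    def name(self) -> str:
        return "toy-generational"

    def pause_duration_s(self, heap_pressure: float) -> float:
        # Below the threshold only the young generation is collected (cheap);
        # at or above it, a full (major) collection pauses much longer.
        if heap_pressure >= self.major_threshold:
            return self.major_pause_s
        return self.minor_pause_s

    def collection_interval_s(self) -> float:
        return self.minor_interval_s


strategy = ToyGenerational()
print([strategy.pause_duration_s(p) for p in (0.3, 0.6, 0.9)])  # [0.002, 0.002, 0.03]
```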

StopTheWorld

StopTheWorld(
    *,
    base_pause_s: float = 0.05,
    interval_s: float = 10.0,
    pressure_multiplier: float = 3.0,
)

Bases: GCStrategy

Full heap collection with long pauses.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| base_pause_s | float | Base pause duration (default 50ms). | 0.05 |
| interval_s | float | Seconds between collections (default 10s). | 10.0 |
| pressure_multiplier | float | Scales how much heap pressure lengthens each pause (default 3.0). | 3.0 |
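
A quick way to compare the strategies is the fraction of wall-clock time spent paused, pause / interval. The sketch below uses each strategy's documented defaults; for StopTheWorld it uses only `base_pause_s`, so the figure is a lower bound (heap pressure lengthens the pause), and for GenerationalGC it counts minor collections only. `pause_fraction` is a hypothetical helper.

```python
def pause_fraction(pause_s: float, interval_s: float) -> float:
    """Share of wall-clock time lost to GC pauses (one pause per interval)."""
    return pause_s / interval_s


defaults = {
    "ConcurrentGC": pause_fraction(0.005, 2.0),                # 5 ms every 2 s
    "GenerationalGC (minor only)": pause_fraction(0.002, 1.0),  # 2 ms every 1 s
    "StopTheWorld (base pause)": pause_fraction(0.05, 10.0),    # >= 50 ms every 10 s
}
for strategy, fraction in defaults.items():
    print(f"{strategy}: {fraction:.2%}")
```

Averages hide the tail: StopTheWorld loses only about 0.5% of wall-clock time, but each pause is at least 50 ms, ten times ConcurrentGC's 5 ms, and that is what shows up in request latency percentiles.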

PageCache

PageCache(
    name: str,
    *,
    capacity_pages: int = 1000,
    page_size_bytes: int = 4096,
    readahead_pages: int = 0,
    disk_read_latency_s: float = 0.0001,
    disk_write_latency_s: float = 0.0002,
)

Bases: Entity

OS page cache with LRU eviction, read-ahead, and writeback.

Provides read_page() and write_page() generator methods that check the cache first, falling back to disk I/O on miss.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Entity name. | required |
| capacity_pages | int | Maximum number of pages in cache. | 1000 |
| page_size_bytes | int | Size of each page in bytes (default 4096). | 4096 |
| readahead_pages | int | Number of pages to prefetch on miss (default 0). | 0 |
| disk_read_latency_s | float | Latency for reading a page from disk. | 0.0001 |
| disk_write_latency_s | float | Latency for writing a page to disk. | 0.0002 |

Example::

cache = PageCache("os_cache", capacity_pages=1000, readahead_pages=4)
sim = Simulation(entities=[cache, ...], ...)

# In another entity's handle_event:
yield from cache.read_page(page_id=42)
yield from cache.write_page(page_id=42)

pages_cached property

pages_cached: int

Number of pages currently in cache.

dirty_pages property

dirty_pages: int

Number of dirty pages in cache.

stats property

stats: PageCacheStats

Frozen snapshot of page cache statistics.

read_page

read_page(page_id: int) -> Generator[float]

Read a page, serving from cache if present.

On cache miss, loads from disk and optionally prefetches adjacent pages via read-ahead.

write_page

write_page(page_id: int) -> Generator[float]

Write a page to cache, marking it dirty.

If the page is already cached, it is updated in place. Otherwise, room is made for it (evicting the least recently used page if the cache is full) and a new dirty page is inserted.

flush

flush() -> Generator[float, None, int]

Flush all dirty pages to disk.

Returns the number of pages flushed.
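
The cache policy described above (LRU eviction, read-ahead on a miss, dirty pages written back on eviction or flush) fits in an `OrderedDict`. A standalone sketch, not the library's implementation: `ToyPageCache` is hypothetical, its methods return latency directly instead of yielding it, hits are assumed free, prefetched pages are assumed to add no extra read latency, and evicting a dirty page is assumed to cost one disk write.

```python
from __future__ import annotations

from collections import OrderedDict


class ToyPageCache:
    """Hypothetical LRU page cache with read-ahead and write-back."""

    def __init__(self, capacity_pages: int = 1000, readahead_pages: int = 0,
                 disk_read_latency_s: float = 1e-4,
                 disk_write_latency_s: float = 2e-4) -> None:
        self.capacity = capacity_pages
        self.readahead_pages = readahead_pages
        self.read_s = disk_read_latency_s
        self.write_s = disk_write_latency_s
        self.pages: OrderedDict[int, bool] = OrderedDict()  # page_id -> dirty, LRU first
        self.hits = self.misses = self.evictions = 0
        self.dirty_writebacks = self.readaheads = 0

    def _insert(self, page_id: int, dirty: bool) -> float:
        """Insert a page, evicting the LRU page if full; return write-back cost."""
        cost = 0.0
        if len(self.pages) >= self.capacity:
            _, victim_dirty = self.pages.popitem(last=False)
            self.evictions += 1
            if victim_dirty:  # dirty data must reach disk before the slot is reused
                self.dirty_writebacks += 1
                cost += self.write_s
        self.pages[page_id] = dirty
        return cost

    def read_page(self, page_id: int) -> float:
        if page_id in self.pages:
            self.hits += 1
            self.pages.move_to_end(page_id)  # now most recently used
            return 0.0
        self.misses += 1
        latency = self.read_s + self._insert(page_id, dirty=False)
        for ahead in range(page_id + 1, page_id + 1 + self.readahead_pages):
            if ahead not in self.pages:
                latency += self._insert(ahead, dirty=False)
                self.readaheads += 1
        return latency

    def write_page(self, page_id: int) -> float:
        if page_id in self.pages:
            self.pages[page_id] = True  # update in place, mark dirty
            self.pages.move_to_end(page_id)
            return 0.0
        return self._insert(page_id, dirty=True)

    def flush(self) -> tuple[float, int]:
        """Write back every dirty page; return (latency_s, pages_flushed)."""
        dirty = [pid for pid, is_dirty in self.pages.items() if is_dirty]
        for pid in dirty:
            self.pages[pid] = False
        self.dirty_writebacks += len(dirty)
        return len(dirty) * self.write_s, len(dirty)


cache = ToyPageCache(capacity_pages=8, readahead_pages=4)
cache.read_page(100)  # miss; pages 101-104 are prefetched
cache.read_page(101)  # hit thanks to read-ahead
print(cache.hits, cache.misses, cache.readaheads)  # 1 1 4
```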

handle_event

handle_event(event: Event) -> None

PageCache does not process events directly.

PageCacheStats dataclass

PageCacheStats(
    hits: int = 0,
    misses: int = 0,
    evictions: int = 0,
    dirty_writebacks: int = 0,
    readaheads: int = 0,
    pages_cached: int = 0,
    dirty_pages: int = 0,
)

Frozen snapshot of page cache statistics.

Attributes:

| Name | Type | Description |
|---|---|---|
| hits | int | Cache hit count. |
| misses | int | Cache miss count. |
| evictions | int | Pages evicted from cache. |
| dirty_writebacks | int | Dirty pages written back to disk. |
| readaheads | int | Pages prefetched via read-ahead. |
| pages_cached | int | Current number of cached pages. |
| dirty_pages | int | Current number of dirty pages. |

AIMD

AIMD(
    *,
    additive_increase: float = 1.0,
    multiplicative_decrease: float = 0.5,
)

Bases: CongestionControl

Additive Increase / Multiplicative Decrease (TCP Reno).

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| additive_increase | float | Segments added per RTT in congestion avoidance (default 1). | 1.0 |
| multiplicative_decrease | float | Factor to multiply cwnd on loss (default 0.5). | 0.5 |
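
The AIMD rules reduce to two small functions with the same shape as `on_ack` and `on_loss` on `CongestionControl`. A sketch of textbook Reno-style behaviour, not necessarily this library's exact update: in slow start each ACK adds one segment (doubling cwnd every RTT); in congestion avoidance each ACK adds `additive_increase / cwnd`, about `additive_increase` segments per RTT; a loss sets both ssthresh and cwnd to `cwnd * multiplicative_decrease`, floored at 2 segments (the floor is an assumption). The function names are hypothetical.

```python
from __future__ import annotations


def aimd_on_ack(cwnd: float, ssthresh: float, additive_increase: float = 1.0) -> float:
    """Return the new cwnd (in segments) after one ACK."""
    if cwnd < ssthresh:
        return cwnd + 1.0  # slow start: +1 per ACK, roughly doubles per RTT
    return cwnd + additive_increase / cwnd  # congestion avoidance: ~+1 per RTT


def aimd_on_loss(cwnd: float, multiplicative_decrease: float = 0.5) -> tuple[float, float]:
    """Return (new_cwnd, new_ssthresh) after a loss."""
    ssthresh = max(cwnd * multiplicative_decrease, 2.0)  # assumed floor of 2 segments
    return ssthresh, ssthresh


# One RTT in congestion avoidance: about cwnd ACKs arrive, and cwnd grows by about 1.
cwnd = 40.0
for _ in range(40):
    cwnd = aimd_on_ack(cwnd, ssthresh=32.0)
print(round(cwnd, 2))
print(aimd_on_loss(cwnd))
```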

BBR

BBR(*, gain: float = 1.0, drain_gain: float = 0.75)

Bases: CongestionControl

Bottleneck Bandwidth and Round-trip propagation time.

Estimates the bottleneck bandwidth and minimum RTT and uses their product, the bandwidth-delay product (BDP), to set cwnd. This is a simplified model: the sending rate is paced from the estimated BDP.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| gain | float | Pacing gain during steady state (default 1.0). | 1.0 |
| drain_gain | float | Gain during drain phase (default 0.75). | 0.75 |
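
The bandwidth-delay product is the amount of data that exactly fills the path: bottleneck bandwidth × minimum RTT. A worked sketch converting it to segments and applying the documented gains; the function name and the 100 Mbit/s, 50 ms path are illustrative assumptions. Real BBR uses cycling pacing gains and a separate cwnd gain; this sketch simply multiplies the BDP by one gain value.

```python
def bdp_segments(bottleneck_bw_bytes_per_s: float, min_rtt_s: float,
                 mss_bytes: int = 1460, gain: float = 1.0) -> float:
    """Bandwidth-delay product in MSS-sized segments, scaled by a gain."""
    bdp_bytes = bottleneck_bw_bytes_per_s * min_rtt_s
    return gain * bdp_bytes / mss_bytes


bw = 100e6 / 8  # 100 Mbit/s expressed in bytes per second
steady = bdp_segments(bw, min_rtt_s=0.05)            # gain 1.0
drain = bdp_segments(bw, min_rtt_s=0.05, gain=0.75)  # drain_gain 0.75
print(round(steady), round(drain))  # 428 321
```

At 625,000 bytes in flight, this path needs a window of about 428 segments; a gain below 1 keeps fewer bytes in flight than the path holds, which lets a standing queue drain.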

CongestionControl

Bases: ABC

Strategy for TCP congestion window management.

on_ack abstractmethod

on_ack(cwnd: float, ssthresh: float) -> float

Update cwnd after receiving an ACK.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| cwnd | float | Current congestion window (segments). | required |
| ssthresh | float | Slow start threshold. | required |

Returns:

| Type | Description |
|---|---|
| float | New cwnd value. |

on_loss abstractmethod

on_loss(cwnd: float) -> tuple[float, float]

Update cwnd and ssthresh after packet loss.

Returns:

| Type | Description |
|---|---|
| tuple[float, float] | Tuple of (new_cwnd, new_ssthresh). |

Cubic

Cubic(*, beta: float = 0.7, c: float = 0.4)

Bases: CongestionControl

CUBIC congestion control (Linux default).

Uses a cubic function for window growth, providing better performance on high-bandwidth, high-latency networks.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| beta | float | Multiplicative decrease factor (default 0.7). | 0.7 |
| c | float | Cubic scaling constant (default 0.4). | 0.4 |
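
The documented defaults (beta = 0.7, C = 0.4) match the constants in RFC 8312, where after a loss the window follows W(t) = C·(t - K)³ + W_max with K = ∛(W_max·(1 - beta)/C). The window restarts at beta·W_max, flattens out near the pre-loss W_max at t = K, then probes upward. A sketch of that curve; `cubic_window` is a hypothetical name, and whether this library evaluates exactly this function is an assumption.

```python
def cubic_window(t_s: float, w_max: float, beta: float = 0.7, c: float = 0.4) -> float:
    """CUBIC window (segments) t_s seconds after a loss, per RFC 8312."""
    k = (w_max * (1 - beta) / c) ** (1 / 3)  # time to climb back to w_max
    return c * (t_s - k) ** 3 + w_max


w_max = 100.0
k = (w_max * (1 - 0.7) / 0.4) ** (1 / 3)
for t in (0.0, k / 2, k, 2 * k):
    print(f"t={t:5.2f}s  cwnd={cubic_window(t, w_max):6.1f}")
```

With W_max = 100 segments the window restarts at 70, is back to 100 at K ≈ 4.2 s, and the curve is flattest exactly where the last loss occurred.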

TCPConnection

TCPConnection(
    name: str,
    *,
    congestion_control: CongestionControl | None = None,
    base_rtt_s: float = 0.05,
    loss_rate: float = 0.001,
    mss_bytes: int = 1460,
    initial_cwnd: float = 10.0,
    initial_ssthresh: float = 64.0,
    retransmit_timeout_s: float = 1.0,
)

Bases: Entity

TCP-like transport with congestion control.

Models a TCP connection between two endpoints with configurable congestion control. Provides send() to transmit data, which yields appropriate delays based on cwnd and RTT.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Entity name. | required |
| congestion_control | CongestionControl \| None | Congestion control algorithm. Defaults to AIMD. | None |
| base_rtt_s | float | Base round-trip time (default 50ms). | 0.05 |
| loss_rate | float | Probability of packet loss per segment (default 0.001). | 0.001 |
| mss_bytes | int | Maximum segment size in bytes (default 1460). | 1460 |
| initial_cwnd | float | Initial congestion window in segments (default 10). | 10.0 |
| initial_ssthresh | float | Initial slow-start threshold (default 64). | 64.0 |
| retransmit_timeout_s | float | Retransmission timeout (default 1.0s). | 1.0 |

Example::

tcp = TCPConnection("conn", congestion_control=Cubic(), base_rtt_s=0.02)
sim = Simulation(entities=[tcp, ...], ...)

# In another entity's handle_event:
yield from tcp.send(65536)  # send 64KB

cwnd property

cwnd: float

Current congestion window in segments.

rtt_s property

rtt_s: float

Current estimated RTT in seconds.

throughput_segments_per_s property

throughput_segments_per_s: float

Estimated throughput in segments per second.

stats property

stats: TCPStats

Frozen snapshot of TCP connection statistics.

send

send(size_bytes: int) -> Generator[float]

Send data over the TCP connection.

Segments data into MSS-sized chunks, applies congestion control, and yields appropriate delays. Simulates loss and retransmission.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| size_bytes | int | Total bytes to send. | required |
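
For intuition about what `send()` costs, a loss-free lower bound counts round trips: the payload needs ceil(size / MSS) segments, and each RTT delivers one window, which doubles in slow start up to ssthresh and then grows by one segment per RTT. A back-of-the-envelope sketch using the `TCPConnection` defaults; `lossless_round_trips` is hypothetical, and it ignores loss, retransmission timeouts, the handshake, and serialization time, so it is not how the library computes delays.

```python
import math


def lossless_round_trips(size_bytes: int, mss_bytes: int = 1460,
                         cwnd: float = 10.0, ssthresh: float = 64.0) -> int:
    """Round trips needed to deliver size_bytes if no segment is ever lost."""
    remaining = math.ceil(size_bytes / mss_bytes)  # segments still to send
    rtts = 0
    while remaining > 0:
        remaining -= int(cwnd)  # one full window per round trip
        rtts += 1
        if cwnd < ssthresh:
            cwnd = min(cwnd * 2, ssthresh)  # slow start, capped at ssthresh
        else:
            cwnd += 1  # congestion avoidance
    return rtts


rtts = lossless_round_trips(65536)  # the 64KB send from the example above
print(rtts, round(rtts * 0.05, 3))  # 3 0.15
```

The 64 KiB payload is 45 segments, delivered in windows of 10, 20, and 40, so even without loss it takes three round trips (about 150 ms at the default 50 ms RTT).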

handle_event

handle_event(event: Event) -> None

TCPConnection does not process events directly.

TCPStats dataclass

TCPStats(
    segments_sent: int = 0,
    segments_acked: int = 0,
    retransmissions: int = 0,
    cwnd: float = 0.0,
    ssthresh: float = 0.0,
    rtt_s: float = 0.0,
    throughput_segments_per_s: float = 0.0,
    total_bytes_sent: int = 0,
    algorithm: str = "",
)

Frozen snapshot of TCP connection statistics.

Attributes:

| Name | Type | Description |
|---|---|---|
| segments_sent | int | Total segments transmitted. |
| segments_acked | int | Total acknowledgements received. |
| retransmissions | int | Total segments retransmitted. |
| cwnd | float | Current congestion window (segments). |
| ssthresh | float | Current slow start threshold. |
| rtt_s | float | Current estimated round-trip time. |
| throughput_segments_per_s | float | Current throughput estimate. |
| total_bytes_sent | int | Total bytes transmitted. |
| algorithm | str | Name of the congestion control algorithm. |