Consensus

Raft, Paxos, leader election, distributed locks, and failure detectors.

Consensus and coordination components for distributed systems simulation.

DistributedLock

DistributedLock(
    name: str,
    lease_duration: float = 10.0,
    max_waiters: int = 0,
)

Bases: Entity

Distributed lock manager with fencing tokens and leases.

Parameters:

Name Type Description Default
name str

Entity identifier.

required
lease_duration float

Default lease duration in seconds.

10.0
max_waiters int

Maximum queued waiters per lock (0 = unlimited).

0

acquire

acquire(lock_name: str, requester: str) -> SimFuture

Acquire a lock. If the lock is held by another requester, the request waits and the returned SimFuture resolves once the lock is granted.

Parameters:

Name Type Description Default
lock_name str

Name of the lock.

required
requester str

Name of the requesting entity.

required

Returns:

Type Description
SimFuture

SimFuture resolving with LockGrant when acquired.

try_acquire

try_acquire(
    lock_name: str, requester: str
) -> LockGrant | None

Non-blocking lock acquisition.

Parameters:

Name Type Description Default
lock_name str

Name of the lock.

required
requester str

Name of the requesting entity.

required

Returns:

Type Description
LockGrant | None

LockGrant if acquired, None if lock is held.

release

release(lock_name: str, fencing_token: int) -> bool

Release a lock by fencing token.

Parameters:

Name Type Description Default
lock_name str

Name of the lock.

required
fencing_token int

The fencing token from the LockGrant.

required

Returns:

Type Description
bool

True if the lock was released, False if the token does not match.

DistributedLockStats dataclass

DistributedLockStats(
    total_acquires: int = 0,
    total_releases: int = 0,
    total_expirations: int = 0,
    total_rejections: int = 0,
    active_locks: int = 0,
    total_waiters: int = 0,
)

Statistics snapshot from a DistributedLock.

Attributes:

Name Type Description
total_acquires int

Total successful acquisitions.

total_releases int

Total explicit releases.

total_expirations int

Total lease expirations.

total_rejections int

Total rejected acquisitions (queue full).

active_locks int

Number of currently held locks.

total_waiters int

Total queued waiters across all locks.

LockGrant dataclass

LockGrant(
    lock_name: str,
    fencing_token: int,
    holder: str,
    granted_at: float,
    lease_duration: float,
)

Represents a granted lock with fencing token.

Attributes:

Name Type Description
lock_name str

Name of the lock.

fencing_token int

Monotonically increasing token for fencing.

holder str

Name of the lock holder.

granted_at float

Simulation time when lock was granted (seconds).

lease_duration float

Duration of the lease in seconds.

expires_at property

expires_at: float

When this lease expires (seconds).
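
The following is a minimal, standalone sketch of why a grant carries a fencing token. It does not use the library: SketchGrant and FencedStore are hypothetical names, and computing expires_at as granted_at + lease_duration is an assumption about how the property is derived.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchGrant:
    """Hypothetical stand-in for LockGrant, not the library class."""

    lock_name: str
    fencing_token: int
    holder: str
    granted_at: float
    lease_duration: float

    @property
    def expires_at(self) -> float:
        # Assumption: the lease ends lease_duration seconds after the grant.
        return self.granted_at + self.lease_duration


class FencedStore:
    """Hypothetical resource that rejects writes carrying a stale token."""

    def __init__(self) -> None:
        self.highest_token = 0
        self.value = None

    def write(self, token: int, value: str) -> bool:
        if token < self.highest_token:
            return False  # a newer grant has already been used here
        self.highest_token = token
        self.value = value
        return True


# Holder "a" loses its lease at t=10; holder "b" is granted the lock at t=12.
old = SketchGrant("db", fencing_token=1, holder="a", granted_at=0.0, lease_duration=10.0)
new = SketchGrant("db", fencing_token=2, holder="b", granted_at=12.0, lease_duration=10.0)

store = FencedStore()
accepted_new = store.write(new.fencing_token, "from-b")
accepted_old = store.write(old.fencing_token, "from-a")  # late write from "a"
```

Because tokens only increase, the protected resource can detect a holder that keeps writing after its lease expired, without needing synchronized clocks.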

BullyStrategy

Bully election: highest ID wins.

When a node detects the leader is down, it sends Election messages to all nodes with higher IDs. If none respond, it declares itself leader. Nodes with higher IDs override lower ones by sending Bully messages.
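
As a rough illustration of the rule above, the sketch below computes which peers a node would contact and who wins when every alive node takes part. The function names are hypothetical, and comparing IDs as plain strings is an assumption; the library may order IDs differently.

```python
def bully_targets(node_id: str, alive_members: list[str]) -> list[str]:
    """Peers with a higher ID, which would receive Election messages."""
    return sorted(m for m in alive_members if m > node_id)


def bully_winner(alive_members: list[str]) -> str:
    """If every alive node responds, the highest ID ends up as leader."""
    return max(alive_members)


alive = ["n1", "n2", "n3", "n4"]
targets_for_n2 = bully_targets("n2", alive)
targets_for_n4 = bully_targets("n4", alive)  # empty: n4 can declare itself leader
```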

ElectionStrategy

Bases: Protocol

Protocol for pluggable election algorithms.

should_start_election

should_start_election(
    node_id: str, alive_members: list[str]
) -> bool

Determine if this node should initiate an election.

Parameters:

Name Type Description Default
node_id str

This node's identifier.

required
alive_members list[str]

List of alive member identifiers.

required

Returns:

Type Description
bool

True if this node should start an election.

get_election_messages

get_election_messages(
    node_id: str, alive_members: list[str], term: int
) -> list[dict[str, Any]]

Generate election messages to send to peers.

Parameters:

Name Type Description Default
node_id str

This node's identifier.

required
alive_members list[str]

List of alive member identifiers.

required
term int

Current election term.

required

Returns:

Type Description
list[dict[str, Any]]

List of message dicts with 'target', 'event_type', 'payload'.

handle_election_message

handle_election_message(
    node_id: str,
    message_type: str,
    payload: dict[str, Any],
    alive_members: list[str],
) -> dict[str, Any]

Process an incoming election message.

Parameters:

Name Type Description Default
node_id str

This node's identifier.

required
message_type str

The type of election message.

required
payload dict[str, Any]

Message payload.

required
alive_members list[str]

List of alive member identifiers.

required

Returns:

Type Description
dict[str, Any]

Dict with 'response_messages' (list), 'leader' (str|None), and 'suppress_election' (bool).
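
To make the message and result shapes concrete, here is a small standalone class with the same three methods. It is a sketch only: LowestIdStrategy, the "Coordinator" event type, the payload keys, and the use of member names as 'target' values are all assumptions for illustration, not part of the documented API.

```python
from typing import Any


class LowestIdStrategy:
    """Hypothetical strategy: the smallest alive ID announces itself as leader."""

    def should_start_election(self, node_id: str, alive_members: list[str]) -> bool:
        # Only the smallest alive ID initiates, so two nodes never compete.
        return bool(alive_members) and node_id == min(alive_members)

    def get_election_messages(
        self, node_id: str, alive_members: list[str], term: int
    ) -> list[dict[str, Any]]:
        return [
            {
                "target": peer,  # assumption: targets are member names
                "event_type": "Coordinator",  # hypothetical message type
                "payload": {"leader": node_id, "term": term},
            }
            for peer in alive_members
            if peer != node_id
        ]

    def handle_election_message(
        self,
        node_id: str,
        message_type: str,
        payload: dict[str, Any],
        alive_members: list[str],
    ) -> dict[str, Any]:
        leader = payload.get("leader") if message_type == "Coordinator" else None
        return {
            "response_messages": [],
            "leader": leader,
            "suppress_election": leader is not None,
        }


strategy = LowestIdStrategy()
messages = strategy.get_election_messages("n1", ["n1", "n2", "n3"], term=4)
outcome = strategy.handle_election_message(
    "n2", "Coordinator", messages[0]["payload"], ["n1", "n2", "n3"]
)
```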

RandomizedStrategy

RandomizedStrategy(ballot_range: int = 1000000)

Randomized election: random ballot numbers, highest wins.

Each participant draws a random ballot. After collecting ballots from all alive members, the highest ballot wins.
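
A minimal sketch of the idea, independent of the library: every member draws a ballot and the highest one wins. The helper names, the half-open range [0, ballot_range), and breaking ties by member name are assumptions made for this example.

```python
import random
from typing import Optional


def draw_ballots(
    members: list[str], ballot_range: int = 1_000_000, seed: Optional[int] = None
) -> dict[str, int]:
    """Each member draws one ballot in [0, ballot_range)."""
    rng = random.Random(seed)
    return {member: rng.randrange(ballot_range) for member in members}


def pick_winner(ballots: dict[str, int]) -> str:
    """Highest ballot wins; equal ballots fall back to the larger name."""
    return max(ballots, key=lambda member: (ballots[member], member))


ballots = draw_ballots(["a", "b", "c"], seed=1)
winner = pick_winner(ballots)
```

A larger ballot_range makes ties less likely, which is presumably why the default is large.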

RingStrategy

Ring election: token circulates, highest ID in the ring wins.

The election token travels around a logical ring. Each node appends its ID. When the token returns to the initiator, the highest ID wins.
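
The circulation described above can be sketched as a single pass over a list. This is an illustrative model only: ring_election is a hypothetical helper, the ring is assumed to contain only alive nodes, and IDs are compared as strings.

```python
def ring_election(ring: list[str], initiator: str) -> tuple[list[str], str]:
    """Pass a token once around the ring, starting at the initiator.

    Returns the IDs collected in visiting order and the elected leader.
    """
    start = ring.index(initiator)
    token: list[str] = []
    for step in range(len(ring)):
        # Each node appends its own ID and forwards the token to its successor.
        token.append(ring[(start + step) % len(ring)])
    # The token is back at the initiator: the highest collected ID wins.
    return token, max(token)


visited, leader = ring_election(["n3", "n1", "n4", "n2"], initiator="n1")
```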

FlexiblePaxosNode

FlexiblePaxosNode(
    name: str,
    network: Any,
    peers: list[FlexiblePaxosNode] | None = None,
    state_machine: StateMachine | None = None,
    phase1_quorum: int | None = None,
    phase2_quorum: int | None = None,
    heartbeat_interval: float = 1.0,
)

Bases: Entity

Flexible Paxos participant with asymmetric quorums.

Parameters:

Name Type Description Default
name str

Node identifier.

required
network Any

Network for communication.

required
peers list[FlexiblePaxosNode] | None

List of peer nodes.

None
state_machine StateMachine | None

State machine for committed commands.

None
phase1_quorum int | None

Quorum size for Phase 1 (prepare).

None
phase2_quorum int | None

Quorum size for Phase 2 (accept).

None
heartbeat_interval float

Seconds between leader heartbeats.

1.0

Raises:

Type Description
ValueError

If quorum sizes violate Q1 + Q2 > N.
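
The constraint Q1 + Q2 > N guarantees that every Phase 1 quorum shares at least one node with every Phase 2 quorum. The check can be sketched as below; validate_quorums is a hypothetical helper, and falling back to a simple majority when a quorum is None is an assumption about the defaults, not documented behavior.

```python
from typing import Optional


def validate_quorums(
    total_nodes: int, phase1: Optional[int] = None, phase2: Optional[int] = None
) -> tuple[int, int]:
    """Return (Q1, Q2), raising ValueError if they cannot intersect."""
    majority = total_nodes // 2 + 1
    q1 = majority if phase1 is None else phase1  # assumed default
    q2 = majority if phase2 is None else phase2  # assumed default
    if not (1 <= q1 <= total_nodes and 1 <= q2 <= total_nodes):
        raise ValueError("quorum sizes must be between 1 and N")
    if q1 + q2 <= total_nodes:
        raise ValueError(f"Q1 + Q2 must exceed N: {q1} + {q2} <= {total_nodes}")
    return q1, q2


# Five nodes: a large prepare quorum allows a small, fast accept quorum.
fast_accept = validate_quorums(5, phase1=4, phase2=2)
```

With N = 5, the pair (4, 2) is valid while (2, 3) is not, since two nodes and three nodes can be disjoint.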

submit

submit(command: Any) -> SimFuture

Submit a command for consensus.

FlexiblePaxosStats dataclass

FlexiblePaxosStats(
    is_leader: bool = False,
    current_ballot: int = 0,
    log_length: int = 0,
    commit_index: int = 0,
    commands_committed: int = 0,
    phase1_quorum: int = 0,
    phase2_quorum: int = 0,
)

Statistics snapshot from a FlexiblePaxosNode.

Attributes:

Name Type Description
is_leader bool

Whether this node is the leader.

current_ballot int

Current ballot number.

log_length int

Number of entries in the log.

commit_index int

Highest committed log index.

commands_committed int

Total commands committed.

phase1_quorum int

Phase 1 quorum size.

phase2_quorum int

Phase 2 quorum size.

ElectionStats dataclass

ElectionStats(
    current_leader: str | None = None,
    current_term: int = 0,
    elections_started: int = 0,
    elections_won: int = 0,
    elections_participated: int = 0,
)

Statistics snapshot from a LeaderElection entity.

Attributes:

Name Type Description
current_leader str | None

Name of the current leader, or None.

current_term int

Current election term.

elections_started int

Total elections initiated.

elections_won int

Elections won by this node.

elections_participated int

Elections this node participated in.

LeaderElection

LeaderElection(
    name: str,
    network: Any,
    members: dict[str, Entity] | None = None,
    strategy: ElectionStrategy | None = None,
    election_timeout: float = 2.0,
    heartbeat_interval: float = 0.5,
)

Bases: Entity

Leader election entity with pluggable strategy.

Monitors a MembershipProtocol for leader failure and triggers elections using the configured ElectionStrategy.

Parameters:

Name Type Description Default
name str

Entity identifier.

required
network Any

Network for communication.

required
members dict[str, Entity] | None

Dict mapping member names to Entity references.

None
strategy ElectionStrategy | None

Election strategy to use. Defaults to BullyStrategy.

None
election_timeout float

Seconds to wait before starting an election.

2.0
heartbeat_interval float

Seconds between leader heartbeats.

0.5

current_leader property

current_leader: str | None

The current leader's name, or None.

current_term property

current_term: int

The current election term.

is_leader property

is_leader: bool

Whether this node is the current leader.

add_member

add_member(entity: Entity) -> None

Register a member for election participation.

start

start() -> list[Event]

Schedule the first election timeout check.

Log

Log()

In-memory replicated log with term and commit tracking.

Entries use 1-based indexing. The log supports append, truncation, and commit index advancement.

Attributes:

Name Type Description
commit_index int

Index of the highest committed entry (0 = none).

last_index property

last_index: int

Index of the last entry, or 0 if empty.

last_term property

last_term: int

Term of the last entry, or 0 if empty.

last_entry property

last_entry: LogEntry | None

The last entry, or None if empty.

append

append(term: int, command: object) -> LogEntry

Append a new entry to the log.

Parameters:

Name Type Description Default
term int

The term number for this entry.

required
command object

The command payload.

required

Returns:

Type Description
LogEntry

The newly created LogEntry.

append_entry

append_entry(entry: LogEntry) -> None

Append a pre-built entry, overriding its index to maintain sequence.

get

get(index: int) -> LogEntry | None

Get an entry by 1-based index.

Parameters:

Name Type Description Default
index int

1-based log position.

required

Returns:

Type Description
LogEntry | None

The LogEntry at that index, or None if out of range.

truncate_from

truncate_from(index: int) -> int

Remove all entries from the given index onward (inclusive).

Parameters:

Name Type Description Default
index int

1-based index to start truncation from.

required

Returns:

Type Description
int

Number of entries removed.

entries_after

entries_after(index: int) -> list[LogEntry]

Return all entries with index > the given index.

Parameters:

Name Type Description Default
index int

Return entries after this 1-based index. Use 0 for all.

required

Returns:

Type Description
list[LogEntry]

List of LogEntry objects.

entries_from

entries_from(index: int) -> list[LogEntry]

Return all entries with index >= the given index.

Parameters:

Name Type Description Default
index int

Return entries from this 1-based index.

required

Returns:

Type Description
list[LogEntry]

List of LogEntry objects.

committed_entries

committed_entries() -> list[LogEntry]

Return all committed entries (index <= commit_index).

uncommitted_entries

uncommitted_entries() -> list[LogEntry]

Return all uncommitted entries (index > commit_index).

advance_commit

advance_commit(new_commit_index: int) -> list[LogEntry]

Advance the commit index and return newly committed entries.

Parameters:

Name Type Description Default
new_commit_index int

The new commit index (must be >= current).

required

Returns:

Type Description
list[LogEntry]

List of entries that became committed.
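
The 1-based indexing used throughout Log can be easy to get wrong, so here is a compact list-backed sketch of a few of the operations. SketchLog and SketchEntry are hypothetical; in particular, clamping the commit index to the log length and ignoring attempts to move it backwards are assumptions rather than documented behavior.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchEntry:
    index: int
    term: int
    command: object


class SketchLog:
    """Hypothetical log: the entry with index i lives at list position i - 1."""

    def __init__(self) -> None:
        self.entries: list[SketchEntry] = []
        self.commit_index = 0  # 0 means nothing is committed

    def append(self, term: int, command: object) -> SketchEntry:
        entry = SketchEntry(len(self.entries) + 1, term, command)
        self.entries.append(entry)
        return entry

    def get(self, index: int) -> Optional[SketchEntry]:
        if 1 <= index <= len(self.entries):
            return self.entries[index - 1]
        return None

    def truncate_from(self, index: int) -> int:
        keep = min(max(index - 1, 0), len(self.entries))
        removed = len(self.entries) - keep
        del self.entries[keep:]
        return removed

    def entries_after(self, index: int) -> list[SketchEntry]:
        return self.entries[max(index, 0):]

    def advance_commit(self, new_commit_index: int) -> list[SketchEntry]:
        # Assumption: clamp to the log length and never move backwards.
        target = min(new_commit_index, len(self.entries))
        newly_committed = self.entries[self.commit_index:target]
        self.commit_index = max(self.commit_index, target)
        return newly_committed


log = SketchLog()
for term, command in [(1, "a"), (1, "b"), (2, "c")]:
    log.append(term, command)
committed = log.advance_commit(2)  # entries 1 and 2 become committed
removed = log.truncate_from(3)     # drops the uncommitted entry 3
```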

LogEntry dataclass

LogEntry(index: int, term: int, command: object)

A single entry in the replicated log.

Attributes:

Name Type Description
index int

1-based position in the log.

term int

The leader term when this entry was created.

command object

The command to apply to the state machine.

MemberInfo dataclass

MemberInfo(
    name: str,
    entity: Entity,
    state: MemberState = MemberState.ALIVE,
    incarnation: int = 0,
    detector: PhiAccrualDetector = (
        lambda: PhiAccrualDetector(threshold=8.0)
    )(),
    state_change_time: float = 0.0,
)

Information about a cluster member.

Attributes:

Name Type Description
name str

The member's identifier.

entity Entity

Reference to the member's Entity (for network.send).

state MemberState

Current membership state.

incarnation int

Monotonically increasing counter to override stale suspicions.

detector PhiAccrualDetector

Phi accrual detector for this member.

state_change_time float

Simulation time when state last changed.
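
To show how an incarnation number can override a stale suspicion, the sketch below encodes the precedence rules from the original SWIM design. Whether MembershipProtocol applies exactly these rules is not stated here, so treat this as an assumption; State and should_apply are hypothetical names, and treating DEAD as final is a simplification.

```python
from enum import Enum


class State(Enum):
    ALIVE = "alive"
    SUSPECT = "suspect"
    DEAD = "dead"


def should_apply(
    current_state: State, current_inc: int, new_state: State, new_inc: int
) -> bool:
    """Decide whether an incoming update replaces the locally known state."""
    if current_state is State.DEAD:
        return False  # simplification: a dead member stays dead
    if new_state is State.DEAD:
        return True  # a death confirmation overrides any incarnation
    if new_state is State.ALIVE:
        # Alive only wins with a strictly newer incarnation.
        return new_inc > current_inc
    # new_state is SUSPECT
    if current_state is State.ALIVE:
        return new_inc >= current_inc
    return new_inc > current_inc


# A suspected member refutes the suspicion by bumping its incarnation.
stale_refutation = should_apply(State.SUSPECT, 3, State.ALIVE, 3)
fresh_refutation = should_apply(State.SUSPECT, 3, State.ALIVE, 4)
```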

MembershipProtocol

MembershipProtocol(
    name: str,
    network: Any,
    probe_interval: float = 1.0,
    suspicion_timeout: float = 5.0,
    indirect_probe_count: int = 3,
    phi_threshold: float = 8.0,
)

Bases: Entity

SWIM membership protocol entity.

Periodically probes peers for liveness, suspects unresponsive nodes, and disseminates membership updates via piggybacking on messages.

Parameters:

Name Type Description Default
name str

Entity identifier.

required
network Any

Network used for communication.

required
probe_interval float

Seconds between probe rounds.

1.0
suspicion_timeout float

Seconds before a suspected member is declared dead.

5.0
indirect_probe_count int

Number of delegates for indirect probing.

3
phi_threshold float

Phi threshold for the accrual detector.

8.0

alive_members property

alive_members: list[str]

Names of all ALIVE members.

suspected_members property

suspected_members: list[str]

Names of all SUSPECT members.

dead_members property

dead_members: list[str]

Names of all DEAD members.

stats property

stats: MembershipStats

Current membership statistics.

add_member

add_member(entity: Entity) -> None

Register a peer member.

start

start() -> list[Event]

Schedule the first probe tick.

get_member_state

get_member_state(name: str) -> MemberState | None

Get the current state of a member by name.

MembershipStats dataclass

MembershipStats(
    alive_count: int = 0,
    suspect_count: int = 0,
    dead_count: int = 0,
    probes_sent: int = 0,
    indirect_probes_sent: int = 0,
    acks_received: int = 0,
    updates_disseminated: int = 0,
)

Snapshot of membership protocol statistics.

Attributes:

Name Type Description
alive_count int

Number of ALIVE members.

suspect_count int

Number of SUSPECT members.

dead_count int

Number of DEAD members.

probes_sent int

Total direct probes sent.

indirect_probes_sent int

Total indirect probes sent.

acks_received int

Total acks received.

updates_disseminated int

Total membership updates piggybacked.

MemberState

Bases: Enum

States a member can be in.

MultiPaxosNode

MultiPaxosNode(
    name: str,
    network: Any,
    peers: list[MultiPaxosNode] | None = None,
    state_machine: StateMachine | None = None,
    leader_lease_timeout: float = 5.0,
    heartbeat_interval: float = 1.0,
)

Bases: Entity

Multi-decree Paxos participant with stable leader optimization.

Parameters:

Name Type Description Default
name str

Node identifier.

required
network Any

Network for communication.

required
peers list[MultiPaxosNode] | None

List of peer nodes.

None
state_machine StateMachine | None

State machine to apply committed commands.

None
leader_lease_timeout float

Seconds before leader lease expires.

5.0
heartbeat_interval float

Seconds between leader heartbeats.

1.0

submit

submit(command: Any) -> SimFuture

Submit a command for consensus.

Returns a SimFuture resolving with (index, result) on commit.

start

start() -> list[Event]

Start by attempting to become leader.

MultiPaxosStats dataclass

MultiPaxosStats(
    is_leader: bool = False,
    current_ballot: int = 0,
    log_length: int = 0,
    commit_index: int = 0,
    commands_committed: int = 0,
    leader_changes: int = 0,
)

Statistics snapshot from a MultiPaxosNode.

Attributes:

Name Type Description
is_leader bool

Whether this node believes it is the leader.

current_ballot int

Current ballot number.

log_length int

Number of entries in the log.

commit_index int

Highest committed log index.

commands_committed int

Total commands committed.

leader_changes int

Number of leader changes observed.

Ballot dataclass

Ballot(number: int, node_id: str)

Totally ordered ballot number for Paxos proposals.

Ballots are ordered first by number, then by node_id for tie-breaking.

Attributes:

Name Type Description
number int

The ballot sequence number.

node_id str

The proposer's identifier for tie-breaking.
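
The ordering described above (number first, then node_id) matches field-by-field comparison, which a dataclass can generate. The sketch below uses a hypothetical SketchBallot class to show the effect; it does not claim to be how Ballot itself is implemented.

```python
from dataclasses import dataclass


@dataclass(frozen=True, order=True)
class SketchBallot:
    """Hypothetical ballot; order=True compares (number, node_id) in field order."""

    number: int
    node_id: str


higher_number = SketchBallot(2, "a") > SketchBallot(1, "z")
tie_break = SketchBallot(1, "b") > SketchBallot(1, "a")
highest = max([SketchBallot(1, "b"), SketchBallot(2, "a"), SketchBallot(1, "c")])
```

Including node_id in the ordering means two proposers can never hold equal ballots, so the order is total.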

PaxosNode

PaxosNode(
    name: str,
    network: Any,
    peers: list[PaxosNode] | None = None,
    retry_delay: float = 0.5,
)

Bases: Entity

Single-decree Paxos participant (proposer + acceptor).

Parameters:

Name Type Description Default
name str

Node identifier.

required
network Any

Network for inter-node communication.

required
peers list[PaxosNode] | None

List of peer PaxosNode entities.

None
retry_delay float

Base delay before retrying with higher ballot.

0.5

quorum_size property

quorum_size: int

Majority quorum: (total_nodes // 2) + 1.
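
As a quick worked example of the formula, the hypothetical helper below computes the quorum for a few cluster sizes. Any two majorities overlap in at least one node because twice the quorum always exceeds the cluster size.

```python
def majority_quorum(total_nodes: int) -> int:
    """Smallest group size such that any two groups share a node."""
    return (total_nodes // 2) + 1


quorums = {n: majority_quorum(n) for n in (1, 3, 4, 5)}
# Nodes that may fail while a quorum is still reachable.
tolerated = {n: n - majority_quorum(n) for n in (1, 3, 4, 5)}
```

Note that growing from 3 to 4 nodes raises the quorum to 3 without tolerating any additional failure.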

is_decided property

is_decided: bool

Whether this node has learned a decided value.

decided_value property

decided_value: Any

The decided value, or None if not yet decided.

set_peers

set_peers(peers: list[PaxosNode]) -> None

Set the peer list (excluding self).

propose

propose(value: Any) -> SimFuture

Start a proposal for a value.

Returns a SimFuture that resolves with the decided value when consensus is reached.

Parameters:

Name Type Description Default
value Any

The value to propose.

required

Returns:

Type Description
SimFuture

SimFuture that resolves with the decided value.

start_phase1

start_phase1() -> list[Event]

Send Prepare messages for the current ballot.

Call this after propose() to begin Phase 1.

PaxosStats dataclass

PaxosStats(
    proposals_started: int = 0,
    proposals_succeeded: int = 0,
    proposals_failed: int = 0,
    promises_received: int = 0,
    nacks_received: int = 0,
    accepts_received: int = 0,
    decided_value: Any = None,
)

Statistics snapshot from a PaxosNode.

Attributes:

Name Type Description
proposals_started int

Number of proposals initiated.

proposals_succeeded int

Number of successfully decided proposals.

proposals_failed int

Number of proposals that failed (nacked).

promises_received int

Number of promise responses received.

nacks_received int

Number of nack responses received.

accepts_received int

Number of accepted responses received.

decided_value Any

The decided value, or None if not yet decided.

PhiAccrualDetector

PhiAccrualDetector(
    threshold: float = 8.0,
    max_sample_size: int = 200,
    min_std: float = 0.1,
    initial_interval: float | None = None,
)

Phi accrual failure detector using a sliding window of heartbeat intervals.

The phi value represents -log10(P(no heartbeat | normal behavior)). Higher phi means more suspicion. A phi of 1 means ~10% chance the node is still alive; phi of 3 means ~0.1% chance.

Parameters:

Name Type Description Default
threshold float

Phi value above which the node is considered suspected. Default 8.0 (~1 in 100 million false positive rate).

8.0
max_sample_size int

Maximum number of inter-arrival intervals to keep.

200
min_std float

Minimum standard deviation to prevent division by zero.

0.1
initial_interval float | None

Expected heartbeat interval for bootstrapping.

None
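
A standalone sketch of how a phi value can be computed from a sliding window of intervals is given below. It assumes inter-arrival times are normally distributed and uses the Gaussian tail probability; the library may use a different distribution or approximation. SketchPhiDetector is a hypothetical class, and initial_interval bootstrapping is omitted.

```python
import math
from collections import deque


class SketchPhiDetector:
    """Hypothetical detector: phi = -log10(P(heartbeat arrives later than now))."""

    def __init__(
        self, threshold: float = 8.0, max_sample_size: int = 200, min_std: float = 0.1
    ) -> None:
        self.threshold = threshold
        self.min_std = min_std
        self.intervals: deque = deque(maxlen=max_sample_size)  # sliding window
        self.last = None

    def heartbeat(self, timestamp_s: float) -> None:
        if self.last is not None:
            self.intervals.append(timestamp_s - self.last)
        self.last = timestamp_s

    def phi(self, now_s: float) -> float:
        if self.last is None or not self.intervals:
            return 0.0  # insufficient data
        mean = sum(self.intervals) / len(self.intervals)
        variance = sum((x - mean) ** 2 for x in self.intervals) / len(self.intervals)
        std = max(math.sqrt(variance), self.min_std)
        elapsed = now_s - self.last
        # Gaussian tail: probability that the gap exceeds `elapsed`.
        p_later = 0.5 * math.erfc((elapsed - mean) / (std * math.sqrt(2)))
        return -math.log10(max(p_later, 1e-300))  # guard against log10(0)

    def is_available(self, now_s: float) -> bool:
        return self.phi(now_s) < self.threshold


detector = SketchPhiDetector()
for t in (0.0, 1.0, 2.0, 3.0, 4.0):
    detector.heartbeat(t)  # perfectly regular 1 s heartbeats
```

With perfectly regular heartbeats the measured deviation is zero, which is where min_std matters: it keeps the division well defined and stops phi from jumping to its maximum the moment a heartbeat is slightly late.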

stats property

stats: PhiAccrualStats

Current statistics snapshot.

Note: computing phi requires a timestamp, so this snapshot reports current_phi as 0.0. Use stats_at(now_s) for a snapshot that includes the phi value.

threshold property

threshold: float

The configured phi threshold.

last_heartbeat property

last_heartbeat: float | None

Timestamp of the last recorded heartbeat.

heartbeat

heartbeat(timestamp_s: float) -> None

Record a heartbeat arrival.

Parameters:

Name Type Description Default
timestamp_s float

The timestamp (in seconds) of the heartbeat.

required

phi

phi(now_s: float) -> float

Compute the current phi value.

Parameters:

Name Type Description Default
now_s float

Current time in seconds.

required

Returns:

Type Description
float

The phi suspicion level. Returns 0.0 if insufficient data.

is_available

is_available(now_s: float) -> bool

Check if the monitored node is considered available.

Parameters:

Name Type Description Default
now_s float

Current time in seconds.

required

Returns:

Type Description
bool

True if phi is below the threshold (node is available).

stats_at

stats_at(now_s: float) -> PhiAccrualStats

Statistics snapshot including current phi value.

Parameters:

Name Type Description Default
now_s float

Current time in seconds.

required

PhiAccrualStats dataclass

PhiAccrualStats(
    heartbeats_received: int = 0,
    current_phi: float = 0.0,
    mean_interval: float = 0.0,
    std_interval: float = 0.0,
    is_suspected: bool = False,
)

Statistics snapshot from a PhiAccrualDetector.

Attributes:

Name Type Description
heartbeats_received int

Total heartbeats recorded.

current_phi float

Most recent phi value (or 0 if unavailable).

mean_interval float

Mean inter-arrival time in seconds.

std_interval float

Standard deviation of inter-arrival times.

is_suspected bool

Whether phi exceeds the threshold.

RaftNode

RaftNode(
    name: str,
    network: Any,
    peers: list[RaftNode] | None = None,
    state_machine: StateMachine | None = None,
    election_timeout_min: float = 1.5,
    election_timeout_max: float = 3.0,
    heartbeat_interval: float = 0.5,
)

Bases: Entity

Raft consensus participant.

Parameters:

Name Type Description Default
name str

Node identifier.

required
network Any

Network for communication.

required
peers list[RaftNode] | None

List of peer RaftNode entities.

None
state_machine StateMachine | None

State machine to apply committed commands.

None
election_timeout_min float

Minimum election timeout in seconds.

1.5
election_timeout_max float

Maximum election timeout in seconds.

3.0
heartbeat_interval float

Seconds between leader heartbeats.

0.5
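
The minimum and maximum bound a randomized election timeout, which is how Raft reduces the chance of split votes. The sketch below assumes a uniform draw between the two bounds, as in the Raft paper; the exact distribution RaftNode uses is not stated here, and draw_election_timeout is a hypothetical helper.

```python
import random


def draw_election_timeout(
    rng: random.Random, timeout_min: float = 1.5, timeout_max: float = 3.0
) -> float:
    """Pick a fresh timeout so followers rarely time out at the same moment."""
    if timeout_min > timeout_max:
        raise ValueError("timeout_min must not exceed timeout_max")
    return rng.uniform(timeout_min, timeout_max)


rng = random.Random(42)  # seeded for reproducible simulation runs
timeouts = [draw_election_timeout(rng) for _ in range(100)]
```

The default heartbeat_interval of 0.5 is well below the minimum timeout of 1.5, so a healthy leader sends several heartbeats before any follower would time out.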

submit

submit(command: Any) -> SimFuture

Submit a command for consensus.

Returns a SimFuture resolving with (index, result) on commit.

start

start() -> list[Event]

Start the Raft node by scheduling initial election timeout.

RaftState

Bases: Enum

Raft node states.

RaftStats dataclass

RaftStats(
    state: RaftState = RaftState.FOLLOWER,
    current_term: int = 0,
    current_leader: str | None = None,
    log_length: int = 0,
    commit_index: int = 0,
    commands_committed: int = 0,
    elections_started: int = 0,
    votes_received: int = 0,
)

Statistics snapshot from a RaftNode.

Attributes:

Name Type Description
state RaftState

Current node state.

current_term int

Current term number.

current_leader str | None

Name of the known leader, or None.

log_length int

Number of entries in the log.

commit_index int

Highest committed index.

commands_committed int

Total commands committed via this node.

elections_started int

Total elections initiated.

votes_received int

Total votes received in current/past elections.

KVStateMachine

KVStateMachine()

Dictionary-backed key-value state machine.

Commands are dicts in one of the following formats:

{"op": "set", "key": k, "value": v} -> returns v
{"op": "get", "key": k} -> returns value or None
{"op": "delete", "key": k} -> returns deleted value or None
{"op": "cas", "key": k, "expected": e, "value": v} -> returns bool
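
The command formats can be illustrated with a small function over a plain dict. This is a sketch rather than the library's code: apply_kv is a hypothetical name, and the exact conditions under which an invalid command raises ValueError are assumptions.

```python
from typing import Any


def apply_kv(data: dict, command: Any) -> Any:
    """Apply one key-value command to `data` and return its result."""
    if not isinstance(command, dict) or "op" not in command:
        raise ValueError(f"invalid command: {command!r}")
    op = command["op"]
    if op == "set":
        data[command["key"]] = command["value"]
        return command["value"]
    if op == "get":
        return data.get(command["key"])
    if op == "delete":
        return data.pop(command["key"], None)
    if op == "cas":
        # Compare-and-set: write only if the current value matches `expected`.
        if data.get(command["key"]) == command["expected"]:
            data[command["key"]] = command["value"]
            return True
        return False
    raise ValueError(f"unknown op: {op!r}")


state: dict = {}
results = [
    apply_kv(state, {"op": "set", "key": "x", "value": 1}),
    apply_kv(state, {"op": "cas", "key": "x", "expected": 1, "value": 2}),
    apply_kv(state, {"op": "cas", "key": "x", "expected": 1, "value": 3}),
    apply_kv(state, {"op": "get", "key": "x"}),
    apply_kv(state, {"op": "delete", "key": "x"}),
    apply_kv(state, {"op": "get", "key": "x"}),
]
```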

data property

data: dict[str, Any]

Read-only view of the current state.

apply

apply(command: Any) -> Any

Apply a key-value command.

Parameters:

Name Type Description Default
command Any

Dict with 'op' key and operation-specific fields.

required

Returns:

Type Description
Any

Operation result.

Raises:

Type Description
ValueError

If the command format is invalid.

snapshot

snapshot() -> dict[str, Any]

Return a copy of the current state.

restore

restore(snapshot: dict[str, Any]) -> None

Restore state from a snapshot.

StateMachine

Bases: Protocol

Protocol for deterministic state machines driven by consensus.

Commands from committed log entries are applied in order. The state machine must be deterministic: the same sequence of commands must always produce the same state.

apply

apply(command: Any) -> Any

Apply a command and return the result.

Parameters:

Name Type Description Default
command Any

The command to apply (implementation-defined format).

required

Returns:

Type Description
Any

The result of applying the command.

snapshot

snapshot() -> Any

Capture the current state for snapshotting.

Returns:

Type Description
Any

A serializable representation of the current state.

restore

restore(snapshot: Any) -> None

Restore state from a snapshot.

Parameters:

Name Type Description Default
snapshot Any

A previously captured snapshot.

required
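
A minimal sketch of a class with the three methods above is shown next, together with the determinism property: two replicas fed the same commands end in the same state. CounterStateMachine and its integer command format are hypothetical and chosen only for brevity.

```python
from typing import Any


class CounterStateMachine:
    """Hypothetical state machine with apply/snapshot/restore."""

    def __init__(self) -> None:
        self.total = 0

    def apply(self, command: Any) -> Any:
        # Commands are integers to add; no clocks or randomness are involved,
        # so the result depends only on the command sequence.
        self.total += int(command)
        return self.total

    def snapshot(self) -> Any:
        return {"total": self.total}

    def restore(self, snapshot: Any) -> None:
        self.total = snapshot["total"]


commands = [5, -2, 10]
replica_a, replica_b = CounterStateMachine(), CounterStateMachine()
results_a = [replica_a.apply(c) for c in commands]
results_b = [replica_b.apply(c) for c in commands]

# A lagging replica can catch up from a snapshot instead of replaying the log.
replica_c = CounterStateMachine()
replica_c.restore(replica_a.snapshot())
```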