
Replication

Primary-backup, multi-leader, and chain replication strategies.

Replication protocol components for distributed systems simulation.

Provides components that model how data moves between replicas:

  • ConflictResolver: Protocol + implementations (LWW, VectorClockMerge, Custom)
  • PrimaryBackupReplication: Master-slave with sync/semi-sync/async modes
  • ChainReplication: Head-to-tail writes, tail reads, CRAQ variant
  • MultiLeaderReplication: Any-node writes with vector clock conflicts

ChainNode

ChainNode(
    name: str,
    store: KVStore,
    network: Entity,
    role: ChainNodeRole = ChainNodeRole.MIDDLE,
    craq_enabled: bool = False,
)

Bases: Entity

A node in a chain replication topology.

Depending on its role:

  • HEAD: Accepts Write events, applies locally, sends Propagate to next. Parks on a SimFuture until TAIL sends WriteAck.
  • MIDDLE: Receives Propagate, applies, forwards to next.
  • TAIL: Receives Propagate, applies, sends WriteAck back to HEAD.

With CRAQ enabled, any node can serve Read events for "clean" keys (keys that have been fully committed through the chain). Dirty keys are forwarded to the TAIL.
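
The clean/dirty routing above can be sketched as a small self-contained model (plain Python, not the library's `ChainNode`; the `dirty` set stands in for `dirty_keys`, and the direct tail lookup stands in for forwarding over the network):

```python
from dataclasses import dataclass, field
from typing import Any

@dataclass
class CraqNode:
    """Minimal model of CRAQ read routing on a non-tail replica."""
    data: dict[str, Any] = field(default_factory=dict)
    dirty: set[str] = field(default_factory=set)  # keys with uncommitted writes

    def read(self, key: str, tail: "CraqNode") -> Any:
        # A clean key is safe to serve locally; a dirty key may differ
        # from the committed value, so ask the tail for its version.
        if key in self.dirty:
            return tail.data.get(key)
        return self.data.get(key)

tail = CraqNode(data={"a": 1, "b": 2})
mid = CraqNode(data={"a": 1, "b": 99})  # "b" has an uncommitted write
mid.dirty.add("b")

assert mid.read("a", tail) == 1  # clean: served locally
assert mid.read("b", tail) == 2  # dirty: tail's committed value wins
```

Without CRAQ, every read pays a trip to the TAIL; the optimization only forwards reads for keys that are still mid-propagation.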

Parameters:

Name Type Description Default
name str

Entity name.

required
store KVStore

KVStore for local data.

required
network Entity

Network for inter-node communication.

required
role ChainNodeRole

This node's role in the chain.

MIDDLE
craq_enabled bool

Enable CRAQ read optimization.

False

stats property

stats: ChainReplicationStats

Frozen snapshot of chain node statistics.

role property

role: ChainNodeRole

This node's role in the chain.

store property

store: KVStore

The underlying KVStore.

dirty_keys property

dirty_keys: set[str]

Keys with uncommitted writes (CRAQ).

handle_event

handle_event(
    event: Event,
) -> Generator[
    float | SimFuture | tuple[float, list[Event] | Event],
    None,
    list[Event] | Event | None,
]

Route events by type.

ChainNodeRole

Bases: Enum

Role of a node in the replication chain.

ChainReplicationStats dataclass

ChainReplicationStats(
    writes_received: int = 0,
    propagations_sent: int = 0,
    propagations_received: int = 0,
    acks_sent: int = 0,
    reads_served: int = 0,
)

Statistics for a ChainNode.

Attributes:

Name Type Description
writes_received int

Write events received (HEAD only).

propagations_sent int

Propagate events forwarded to next node.

propagations_received int

Propagate events received from prev node.

acks_sent int

WriteAck events sent (TAIL only).

reads_served int

Read events served.

ConflictResolver

Bases: Protocol

Protocol for resolving conflicting versions of a key.

Implementations choose a winning version when multiple replicas have divergent values for the same key.

resolve

resolve(
    key: str, versions: list[VersionedValue]
) -> VersionedValue

Choose a winning version from conflicting versions.

Parameters:

Name Type Description Default
key str

The key being resolved.

required
versions list[VersionedValue]

Two or more conflicting versions (never empty).

required

Returns:

Type Description
VersionedValue

The chosen winning version.
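
Since `ConflictResolver` is a typing `Protocol`, any object with a matching `resolve` method conforms structurally; no subclassing is required. A minimal sketch (the local `VersionedValue` and the `MinValueResolver` name are illustrative, redefined here so the snippet stands alone):

```python
from dataclasses import dataclass
from typing import Any, Protocol

@dataclass
class VersionedValue:
    value: Any
    timestamp: float
    writer_id: str

class ConflictResolver(Protocol):
    def resolve(self, key: str, versions: list[VersionedValue]) -> VersionedValue: ...

class MinValueResolver:
    """Toy resolver: keep the smallest value (e.g. a seats-remaining counter)."""
    def resolve(self, key: str, versions: list[VersionedValue]) -> VersionedValue:
        return min(versions, key=lambda v: v.value)

resolver: ConflictResolver = MinValueResolver()  # structural typing: no inheritance
winner = resolver.resolve("seats", [
    VersionedValue(3, 1.0, "a"),
    VersionedValue(5, 2.0, "b"),
])
assert winner.value == 3
```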

CustomResolver

CustomResolver(
    resolve_fn: Callable[
        [str, list[VersionedValue]], VersionedValue
    ],
)

Conflict resolver wrapping a user-supplied function.

Parameters:

Name Type Description Default
resolve_fn Callable[[str, list[VersionedValue]], VersionedValue]

Function with signature (key, versions) -> VersionedValue.

required

resolve

resolve(
    key: str, versions: list[VersionedValue]
) -> VersionedValue

Delegate to the user-supplied function.

Parameters:

Name Type Description Default
key str

The key being resolved.

required
versions list[VersionedValue]

Conflicting versions.

required

Returns:

Type Description
VersionedValue

The version chosen by the user function.

LastWriterWins

Conflict resolver that picks the version with the highest timestamp.

Supports both float timestamps and HLCTimestamp. For HLCTimestamp, uses the built-in comparison (physical_ns, logical, node_id). For float timestamps, the highest value wins. Ties are broken by writer_id for determinism.

This is the simplest strategy, used by Cassandra and DynamoDB. Data loss is possible: when writes are concurrent, every version except the one with the highest timestamp is silently discarded.
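
A sketch of the float-timestamp case (self-contained; picking the lexicographically greatest `writer_id` on ties is an assumption for illustration, the point is only that the tie-break is deterministic):

```python
from dataclasses import dataclass
from typing import Any

@dataclass
class VersionedValue:
    value: Any
    timestamp: float
    writer_id: str

def lww_resolve(versions: list[VersionedValue]) -> VersionedValue:
    # Highest timestamp wins; equal timestamps fall through to writer_id,
    # so every replica resolves the same conflict the same way.
    return max(versions, key=lambda v: (v.timestamp, v.writer_id))

a = VersionedValue("x", 10.0, "node-a")
b = VersionedValue("y", 12.0, "node-b")
c = VersionedValue("z", 12.0, "node-c")  # same timestamp as b

assert lww_resolve([a, b]).value == "y"  # later write wins
assert lww_resolve([b, c]).value == "z"  # tie broken by writer_id
```

Note that `a` is discarded entirely in the first call; that silent overwrite is the data-loss risk described above.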

resolve

resolve(
    key: str, versions: list[VersionedValue]
) -> VersionedValue

Pick the version with the highest timestamp.

Parameters:

Name Type Description Default
key str

The key being resolved.

required
versions list[VersionedValue]

Conflicting versions.

required

Returns:

Type Description
VersionedValue

The version with the highest timestamp.

VectorClockMerge

VectorClockMerge(
    merge_fn: Callable[
        [str, VersionedValue, VersionedValue],
        VersionedValue,
    ]
    | None = None,
)

Conflict resolver using vector clock causal ordering.

If one version's vector clock causally dominates, that version wins. If the versions are concurrent (neither dominates), delegates to a user-supplied merge_fn. If no merge function is provided, falls back to the version with the highest timestamp.
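
The causal-dominance test this resolver relies on can be sketched directly, assuming vector clocks are mappings from node ID to counter (as in `VersionedValue.vector_clock`):

```python
def dominates(a: dict[str, int], b: dict[str, int]) -> bool:
    """True if clock `a` causally dominates clock `b`."""
    nodes = a.keys() | b.keys()
    # `a` dominates when it is >= `b` on every node and > on at least one.
    ge = all(a.get(n, 0) >= b.get(n, 0) for n in nodes)
    gt = any(a.get(n, 0) > b.get(n, 0) for n in nodes)
    return ge and gt

v1 = {"n1": 2, "n2": 1}
v2 = {"n1": 1, "n2": 1}
v3 = {"n1": 1, "n2": 2}

assert dominates(v1, v2)  # v1 has seen everything v2 has
# Neither dominates: the writes were concurrent, so merge_fn
# (or the timestamp fallback) must decide.
assert not dominates(v1, v3) and not dominates(v3, v1)
```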

Parameters:

Name Type Description Default
merge_fn Callable[[str, VersionedValue, VersionedValue], VersionedValue] | None

Optional function called for concurrent versions. Signature: (key, version_a, version_b) -> VersionedValue.

None

resolve

resolve(
    key: str, versions: list[VersionedValue]
) -> VersionedValue

Resolve using vector clock ordering, merging concurrent versions.

Parameters:

Name Type Description Default
key str

The key being resolved.

required
versions list[VersionedValue]

Conflicting versions (must have vector_clock set).

required

Returns:

Type Description
VersionedValue

The causally dominant version, or the merge result for concurrent versions.

VersionedValue dataclass

VersionedValue(
    value: Any,
    timestamp: float | HLCTimestamp,
    writer_id: str,
    vector_clock: dict[str, int] | None = None,
)

A value with version metadata for conflict resolution.

Attributes:

Name Type Description
value Any

The stored value.

timestamp float | HLCTimestamp

Write timestamp (float seconds or HLCTimestamp).

writer_id str

Identifier of the node that wrote this version.

vector_clock dict[str, int] | None

Optional vector clock snapshot at write time.

LeaderNode

LeaderNode(
    name: str,
    store: KVStore,
    network: Entity,
    conflict_resolver: ConflictResolver | None = None,
    anti_entropy_interval: float = 0.0,
)

Bases: Entity

A leader node in a multi-leader replication scheme.

Any node can accept writes. Writes are stamped with a VectorClock for conflict detection. Replication is asynchronous — writes are sent to all peers after local apply. Concurrent writes (detected via vector clock comparison) are resolved using the configured ConflictResolver.

Periodic anti-entropy compares MerkleTree root hashes with a random peer and repairs divergent keys.
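
The root-comparison step can be sketched with a flat hash standing in for the `MerkleTree` (a real Merkle tree narrows the search to divergent subtrees rather than diffing every key, so this flat version is only illustrative):

```python
import hashlib
from typing import Any

def root_hash(store: dict[str, Any]) -> str:
    # Flat stand-in for a Merkle root: hash of sorted key/value pairs.
    h = hashlib.sha256()
    for k in sorted(store):
        h.update(f"{k}={store[k]};".encode())
    return h.hexdigest()

def anti_entropy(local: dict, peer: dict) -> list[str]:
    """Return the keys needing repair; cheap no-op when roots match."""
    if root_hash(local) == root_hash(peer):
        return []  # replicas already agree
    return sorted(k for k in local.keys() | peer.keys()
                  if local.get(k) != peer.get(k))

a = {"x": "1", "y": "2"}
b = {"x": "1", "y": "9", "z": "3"}
assert anti_entropy(a, dict(a)) == []         # identical stores: nothing to do
assert anti_entropy(a, b) == ["y", "z"]       # divergent keys found
```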

Parameters:

Name Type Description Default
name str

Entity name (also used as VectorClock node ID).

required
store KVStore

KVStore for local data.

required
network Entity

Network for inter-node communication.

required
conflict_resolver ConflictResolver | None

Strategy for resolving concurrent writes.

None
anti_entropy_interval float

Seconds between anti-entropy rounds (0 to disable).

0.0

stats property

stats: MultiLeaderStats

Frozen snapshot of leader node statistics.

store property

store: KVStore

The underlying KVStore.

peers property

peers: list[Entity]

Current peer nodes.

merkle_tree property

merkle_tree: MerkleTree

The MerkleTree for anti-entropy synchronization.

versions property

versions: dict[str, VersionedValue]

Per-key versioned values.

add_peers

add_peers(peers: list[Entity]) -> None

Set peer nodes for replication.

Also initializes the VectorClock with all node IDs.

Parameters:

Name Type Description Default
peers list[Entity]

List of peer LeaderNode entities.

required

get_anti_entropy_event

get_anti_entropy_event() -> Event | None

Create the first anti-entropy daemon event.

Returns:

Type Description
Event | None

A daemon event scheduled for the first anti-entropy interval, or None if anti-entropy is disabled.

handle_event

handle_event(
    event: Event,
) -> Generator[
    float | SimFuture | tuple[float, list[Event] | Event],
    None,
    list[Event] | Event | None,
]

Route events by type.

MultiLeaderStats dataclass

MultiLeaderStats(
    writes: int = 0,
    reads: int = 0,
    replications_sent: int = 0,
    replications_received: int = 0,
    conflicts_detected: int = 0,
    conflicts_resolved: int = 0,
    anti_entropy_syncs: int = 0,
    anti_entropy_keys_repaired: int = 0,
)

Statistics for a LeaderNode.

Attributes:

Name Type Description
writes int

Local write requests processed.

reads int

Local read requests processed.

replications_sent int

Replication messages sent to peers.

replications_received int

Replication messages received from peers.

conflicts_detected int

Number of concurrent write conflicts detected.

conflicts_resolved int

Number of conflicts resolved.

anti_entropy_syncs int

Number of anti-entropy synchronization rounds.

anti_entropy_keys_repaired int

Keys repaired via anti-entropy.

BackupNode

BackupNode(
    name: str,
    store: KVStore,
    network: Entity,
    primary: Entity,
    serve_reads: bool = True,
)

Bases: Entity

Backup node in a primary-backup replication scheme.

Receives Replicate events from the primary, applies them locally, and sends ReplicationAck. Can optionally serve Read events, which may return stale data.
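
The apply-and-ack loop can be modeled in a few lines (illustrative, not the library's `BackupNode`; the `("ack", seq)` tuple stands in for a `ReplicationAck` event):

```python
from typing import Any

class Backup:
    """Minimal model of a backup applying the primary's numbered updates."""
    def __init__(self) -> None:
        self.data: dict[str, Any] = {}
        self.last_applied_seq = 0

    def apply(self, seq: int, key: str, value: Any) -> tuple[str, int]:
        # Apply the update and remember how far we have gotten; the gap
        # between this and the primary's latest seq is the replication lag.
        self.data[key] = value
        self.last_applied_seq = seq
        return ("ack", seq)  # payload a ReplicationAck would carry back

b = Backup()
b.apply(1, "k", "v1")
assert b.apply(2, "k", "v2") == ("ack", 2)
assert b.last_applied_seq == 2

primary_seq = 5
assert primary_seq - b.last_applied_seq == 3  # lag in sequence numbers
```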

Parameters:

Name Type Description Default
name str

Entity name.

required
store KVStore

KVStore for local data.

required
network Entity

Network for sending ack messages.

required
primary Entity

The primary node entity (for ack routing).

required
serve_reads bool

Whether to serve Read events.

True

stats property

stats: BackupStats

Frozen snapshot of backup node statistics.

store property

store: KVStore

The underlying KVStore.

last_applied_seq property

last_applied_seq: int

Last applied sequence number.

handle_event

handle_event(
    event: Event,
) -> Generator[
    float | tuple[float, list[Event] | Event],
    None,
    list[Event] | Event | None,
]

Handle Replicate and Read events.

BackupStats dataclass

BackupStats(
    replications_applied: int = 0,
    reads: int = 0,
    last_applied_seq: int = 0,
)

Statistics for BackupNode.

Attributes:

Name Type Description
replications_applied int

Total replication events applied.

reads int

Total read requests served.

last_applied_seq int

Last sequence number applied from primary.

PrimaryBackupStats dataclass

PrimaryBackupStats(
    writes: int = 0,
    reads: int = 0,
    replications_sent: int = 0,
    acks_received: int = 0,
    write_latency_sum: float = 0.0,
)

Statistics for PrimaryNode.

Attributes:

Name Type Description
writes int

Total write requests received.

reads int

Total read requests received.

replications_sent int

Total replication messages sent to backups.

acks_received int

Total acknowledgments received from backups.

write_latency_sum float

Sum of write latencies for averaging.

PrimaryNode

PrimaryNode(
    name: str,
    store: KVStore,
    backups: list[Entity],
    network: Entity,
    mode: ReplicationMode = ReplicationMode.ASYNC,
)

Bases: Entity

Primary node in a primary-backup replication scheme.

Accepts Write and Read events. On Write, applies locally and replicates to backups according to the configured mode. Tracks per-backup replication lag via monotonic sequence numbers.
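
How many acknowledgments the primary waits for before completing a write depends on the mode. A plausible sketch (the exact thresholds, all backups for sync, one for semi-sync, none for async, are assumptions matching the usual meanings of these terms, not taken from this API):

```python
from enum import Enum

class ReplicationMode(Enum):
    SYNC = "sync"
    SEMI_SYNC = "semi_sync"
    ASYNC = "async"

def acks_needed(mode: ReplicationMode, n_backups: int) -> int:
    """Acks the primary must collect before acknowledging the client."""
    if mode is ReplicationMode.SYNC:
        return n_backups          # durable on every backup
    if mode is ReplicationMode.SEMI_SYNC:
        return min(1, n_backups)  # durable on at least one backup
    return 0                      # async: reply immediately, replicate later

assert acks_needed(ReplicationMode.SYNC, 3) == 3
assert acks_needed(ReplicationMode.SEMI_SYNC, 3) == 1
assert acks_needed(ReplicationMode.ASYNC, 3) == 0
```

This is the classic durability/latency trade-off: sync writes pay the slowest backup's round trip, async writes risk losing acknowledged data on primary failure.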

Parameters:

Name Type Description Default
name str

Entity name.

required
store KVStore

KVStore for local data.

required
backups list[Entity]

List of BackupNode entities.

required
network Entity

Network for sending replication messages.

required
mode ReplicationMode

Replication consistency mode.

ASYNC

stats property

stats: PrimaryBackupStats

Frozen snapshot of primary node statistics.

mode property

mode: ReplicationMode

Current replication mode.

backup_lag property

backup_lag: dict[str, int]

Per-backup replication lag in sequence numbers.

store property

store: KVStore

The underlying KVStore.

handle_event

handle_event(
    event: Event,
) -> Generator[
    float | SimFuture | tuple[float, list[Event] | Event],
    None,
    list[Event] | Event | None,
]

Handle Write, Read, and ReplicationAck events.

ReplicationMode

Bases: Enum

Write durability mode for primary-backup replication.

build_chain

build_chain(
    names: list[str],
    network: Entity,
    store_factory: Callable[[str], KVStore],
    craq_enabled: bool = False,
) -> list[ChainNode]

Build a chain of ChainNodes with topology wired.
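
The role assignment can be sketched independently of the library (the `Role` enum here mirrors `ChainNodeRole`):

```python
from enum import Enum

class Role(Enum):
    HEAD = "head"
    MIDDLE = "middle"
    TAIL = "tail"

def assign_roles(names: list[str]) -> dict[str, Role]:
    # First name becomes HEAD, last becomes TAIL, everything between MIDDLE.
    if len(names) < 2:
        raise ValueError("chain needs at least 2 nodes")
    roles = {names[0]: Role.HEAD, names[-1]: Role.TAIL}
    for n in names[1:-1]:
        roles[n] = Role.MIDDLE
    return roles

roles = assign_roles(["n1", "n2", "n3"])
assert roles == {"n1": Role.HEAD, "n2": Role.MIDDLE, "n3": Role.TAIL}
```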

Parameters:

Name Type Description Default
names list[str]

Names for each node. First is HEAD, last is TAIL.

required
network Entity

Network entity for inter-node communication.

required
store_factory Callable[[str], KVStore]

Factory function creating a KVStore for each node.

required
craq_enabled bool

Enable CRAQ read optimization on all nodes.

False

Returns:

Type Description
list[ChainNode]

List of ChainNode instances with topology wired.

Raises:

Type Description
ValueError

If fewer than 2 names are provided.