Replication¶
Primary-backup, multi-leader, and chain replication strategies.
Replication protocol components for distributed systems simulation.
Provides components that model how data moves between replicas:
- ConflictResolver: Protocol + implementations (LWW, VectorClockMerge, Custom)
- PrimaryBackupReplication: Master-slave with sync/semi-sync/async modes
- ChainReplication: Head-to-tail writes, tail reads, CRAQ variant
- MultiLeaderReplication: Any-node writes with vector clock conflicts
ChainNode ¶
ChainNode(
name: str,
store: KVStore,
network: Entity,
role: ChainNodeRole = ChainNodeRole.MIDDLE,
craq_enabled: bool = False,
)
Bases: Entity
A node in a chain replication topology.
Behavior depends on the node's role:

- HEAD: Accepts Write events, applies locally, sends Propagate to the next node. Parks on a SimFuture until the TAIL sends WriteAck.
- MIDDLE: Receives Propagate, applies, forwards to the next node.
- TAIL: Receives Propagate, applies, sends WriteAck back to the HEAD.
With CRAQ enabled, any node can serve Read events for "clean" keys (keys that have been fully committed through the chain). Dirty keys are forwarded to the TAIL.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Entity name. | required |
| store | KVStore | KVStore for local data. | required |
| network | Entity | Network for inter-node communication. | required |
| role | ChainNodeRole | This node's role in the chain. | MIDDLE |
| craq_enabled | bool | Enable CRAQ read optimization. | False |
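The CRAQ read rule described above can be sketched in plain Python. This is an illustrative model only, not the library's ChainNode: the `CraqReplica` class and its dictionaries are invented here to show why dirty keys must be forwarded to the TAIL.

```python
# Illustrative sketch of CRAQ's clean/dirty read rule (not the real ChainNode).
# Each replica keeps, per key, a committed version and possibly a newer
# uncommitted ("dirty") version still propagating down the chain.

class CraqReplica:
    def __init__(self):
        self.committed = {}  # key -> value known to be committed at the tail
        self.dirty = {}      # key -> value written but not yet acked

    def apply_write(self, key, value):
        # A write propagating through the chain marks the key dirty.
        self.dirty[key] = value

    def commit(self, key):
        # The tail's WriteAck makes the latest version clean again.
        if key in self.dirty:
            self.committed[key] = self.dirty.pop(key)

    def read(self, key):
        # Clean keys are served locally; dirty keys must go to the TAIL,
        # which always holds the committed order.
        if key in self.dirty:
            return ("forward_to_tail", None)
        return ("local", self.committed.get(key))


replica = CraqReplica()
replica.apply_write("x", 1)
print(replica.read("x"))  # -> ('forward_to_tail', None)
replica.commit("x")
print(replica.read("x"))  # -> ('local', 1)
```

Without CRAQ, every read goes to the TAIL; the clean/dirty split is what lets the other nodes absorb read traffic for fully committed keys.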
handle_event ¶
handle_event(
event: Event,
) -> Generator[
float | SimFuture | tuple[float, list[Event] | Event],
None,
list[Event] | Event | None,
]
Route events by type.
ChainNodeRole ¶
Bases: Enum
Role of a node in the replication chain.
ChainReplicationStats dataclass ¶
ChainReplicationStats(
writes_received: int = 0,
propagations_sent: int = 0,
propagations_received: int = 0,
acks_sent: int = 0,
reads_served: int = 0,
)
Statistics for a ChainNode.
Attributes:

| Name | Type | Description |
|---|---|---|
| writes_received | int | Write events received (HEAD only). |
| propagations_sent | int | Propagate events forwarded to the next node. |
| propagations_received | int | Propagate events received from the previous node. |
| acks_sent | int | WriteAck events sent (TAIL only). |
| reads_served | int | Read events served. |
ConflictResolver ¶
Bases: Protocol
Protocol for resolving conflicting versions of a key.
Implementations choose a winning version when multiple replicas have divergent values for the same key.
resolve ¶
Choose a winning version from conflicting versions.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| key | str | The key being resolved. | required |
| versions | list[VersionedValue] | Two or more conflicting versions (never empty). | required |

Returns:

| Type | Description |
|---|---|
| VersionedValue | The chosen winning version. |
CustomResolver ¶
Conflict resolver wrapping a user-supplied function.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| resolve_fn | Callable[[str, list[VersionedValue]], VersionedValue] | Function with signature (key, versions) -> VersionedValue. | required |
resolve ¶
Delegate to the user-supplied function.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| key | str | The key being resolved. | required |
| versions | list[VersionedValue] | Conflicting versions. | required |

Returns:

| Type | Description |
|---|---|
| VersionedValue | The version chosen by the user function. |
LastWriterWins ¶
Conflict resolver that picks the version with the highest timestamp.
Supports both float timestamps and HLCTimestamp. For HLCTimestamp, uses the built-in comparison (physical_ns, logical, node_id). For float timestamps, the highest value wins. Ties are broken by writer_id for determinism.
This is the simplest strategy, used by systems such as Cassandra and DynamoDB. It can silently lose data: when writes are concurrent, the losing version is discarded.
resolve ¶
Pick the version with the highest timestamp.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| key | str | The key being resolved. | required |
| versions | list[VersionedValue] | Conflicting versions. | required |

Returns:

| Type | Description |
|---|---|
| VersionedValue | The version with the highest timestamp. |
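The last-writer-wins rule reduces to taking the maximum over (timestamp, writer_id) pairs. A minimal stand-alone sketch, using a simplified `Version` record invented here in place of the library's VersionedValue:

```python
from dataclasses import dataclass
from typing import Any

# Simplified stand-in for VersionedValue, for illustration only.
@dataclass
class Version:
    value: Any
    timestamp: float
    writer_id: str

def last_writer_wins(versions: list[Version]) -> Version:
    # Highest timestamp wins; ties break deterministically on writer_id,
    # so every replica independently picks the same winner.
    return max(versions, key=lambda v: (v.timestamp, v.writer_id))

winner = last_writer_wins([
    Version("a", 10.0, "node-1"),
    Version("b", 10.0, "node-2"),  # same timestamp, higher writer_id
    Version("c", 9.5, "node-3"),
])
print(winner.value)  # b
```

The deterministic tie-break is the important detail: without it, two replicas could resolve the same conflict to different winners and never converge.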
VectorClockMerge ¶
VectorClockMerge(
merge_fn: Callable[
[str, VersionedValue, VersionedValue],
VersionedValue,
]
| None = None,
)
Conflict resolver using vector clock causal ordering.
If one version's vector clock causally dominates, that version wins.
If the versions are concurrent (neither dominates), delegates to a
user-supplied merge_fn. If no merge function is provided,
falls back to the version with the highest timestamp.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| merge_fn | Callable[[str, VersionedValue, VersionedValue], VersionedValue] \| None | Optional function called for concurrent versions. Signature: (key, version_a, version_b) -> VersionedValue. | None |
resolve ¶
Resolve using vector clock ordering, merging concurrent versions.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| key | str | The key being resolved. | required |
| versions | list[VersionedValue] | Conflicting versions (must have vector_clock set). | required |

Returns:

| Type | Description |
|---|---|
| VersionedValue | The causally dominant version, or the merge result for concurrent versions. |
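The causal-ordering check underlying this resolver can be sketched independently of the library: one vector clock dominates another when every component is greater than or equal and at least one is strictly greater; if neither dominates, the versions are concurrent and a merge is needed. A hand-rolled sketch of that comparison, not the library's code:

```python
def dominates(a: dict[str, int], b: dict[str, int]) -> bool:
    # a dominates b if a >= b component-wise and a != b.
    keys = set(a) | set(b)
    ge = all(a.get(k, 0) >= b.get(k, 0) for k in keys)
    return ge and any(a.get(k, 0) > b.get(k, 0) for k in keys)

def compare(a: dict[str, int], b: dict[str, int]) -> str:
    if dominates(a, b):
        return "a_wins"
    if dominates(b, a):
        return "b_wins"
    # Neither dominates: concurrent writes, fall back to merge_fn
    # (or highest timestamp when no merge function is configured).
    return "concurrent"

print(compare({"n1": 2, "n2": 1}, {"n1": 1, "n2": 1}))  # a_wins
print(compare({"n1": 2, "n2": 0}, {"n1": 1, "n2": 3}))  # concurrent
```

Missing entries are treated as zero, so clocks over different node sets still compare correctly.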
VersionedValue dataclass ¶
VersionedValue(
value: Any,
timestamp: float | HLCTimestamp,
writer_id: str,
vector_clock: dict[str, int] | None = None,
)
A value with version metadata for conflict resolution.
Attributes:

| Name | Type | Description |
|---|---|---|
| value | Any | The stored value. |
| timestamp | float \| HLCTimestamp | Write timestamp (float seconds or HLCTimestamp). |
| writer_id | str | Identifier of the node that wrote this version. |
| vector_clock | dict[str, int] \| None | Optional vector clock snapshot at write time. |
LeaderNode ¶
LeaderNode(
name: str,
store: KVStore,
network: Entity,
conflict_resolver: ConflictResolver | None = None,
anti_entropy_interval: float = 0.0,
)
Bases: Entity
A leader node in a multi-leader replication scheme.
Any node can accept writes. Writes are stamped with a VectorClock for conflict detection. Replication is asynchronous — writes are sent to all peers after local apply. Concurrent writes (detected via vector clock comparison) are resolved using the configured ConflictResolver.
Periodic anti-entropy compares MerkleTree root hashes with a random peer and repairs divergent keys.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Entity name (also used as VectorClock node ID). | required |
| store | KVStore | KVStore for local data. | required |
| network | Entity | Network for inter-node communication. | required |
| conflict_resolver | ConflictResolver \| None | Strategy for resolving concurrent writes. | None |
| anti_entropy_interval | float | Seconds between anti-entropy rounds (0 to disable). | 0.0 |
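The anti-entropy round compares Merkle root hashes before diffing keys. The real implementation uses a MerkleTree; the flat root hash below is a deliberately simplified stand-in that only shows the compare-then-repair flow (and, unlike the real protocol, lets the peer win every divergent key instead of running conflict resolution):

```python
import hashlib

def root_hash(store: dict) -> str:
    # Hash all key/value pairs in sorted order: equal stores -> equal roots.
    h = hashlib.sha256()
    for key in sorted(store):
        h.update(f"{key}={store[key]};".encode())
    return h.hexdigest()

def anti_entropy(local: dict, peer: dict) -> list[str]:
    # Cheap check first: identical roots mean nothing to repair.
    if root_hash(local) == root_hash(peer):
        return []
    # Roots differ: find divergent keys and copy the peer's value over
    # (a real round would resolve each conflict, not blindly overwrite).
    repaired = []
    for key in set(local) | set(peer):
        if local.get(key) != peer.get(key):
            local[key] = peer.get(key)
            repaired.append(key)
    return sorted(repaired)

a = {"x": 1, "y": 2}
b = {"x": 1, "y": 3, "z": 4}
print(anti_entropy(a, b))  # ['y', 'z']
```

A tree of hashes refines this idea: when roots differ, the nodes descend only into differing subtrees, so the divergent keys are found without exchanging the whole keyspace.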
add_peers ¶
Set peer nodes for replication.
Also initializes the VectorClock with all node IDs.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| peers | list[Entity] | List of peer LeaderNode entities. | required |
get_anti_entropy_event ¶
handle_event ¶
handle_event(
event: Event,
) -> Generator[
float | SimFuture | tuple[float, list[Event] | Event],
None,
list[Event] | Event | None,
]
Route events by type.
MultiLeaderStats dataclass ¶
MultiLeaderStats(
writes: int = 0,
reads: int = 0,
replications_sent: int = 0,
replications_received: int = 0,
conflicts_detected: int = 0,
conflicts_resolved: int = 0,
anti_entropy_syncs: int = 0,
anti_entropy_keys_repaired: int = 0,
)
Statistics for a LeaderNode.
Attributes:

| Name | Type | Description |
|---|---|---|
| writes | int | Local write requests processed. |
| reads | int | Local read requests processed. |
| replications_sent | int | Replication messages sent to peers. |
| replications_received | int | Replication messages received from peers. |
| conflicts_detected | int | Number of concurrent write conflicts detected. |
| conflicts_resolved | int | Number of conflicts resolved. |
| anti_entropy_syncs | int | Number of anti-entropy synchronization rounds. |
| anti_entropy_keys_repaired | int | Keys repaired via anti-entropy. |
BackupNode ¶
BackupNode(
name: str,
store: KVStore,
network: Entity,
primary: Entity,
serve_reads: bool = True,
)
Bases: Entity
Backup node in a primary-backup replication scheme.
Receives Replicate events from the primary, applies them locally, and sends ReplicationAck. Can optionally serve Read events, which may return stale data.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Entity name. | required |
| store | KVStore | KVStore for local data. | required |
| network | Entity | Network for sending ack messages. | required |
| primary | Entity | The primary node entity (for ack routing). | required |
| serve_reads | bool | Whether to serve Read events. | True |
handle_event ¶
handle_event(
event: Event,
) -> Generator[
float | tuple[float, list[Event] | Event],
None,
list[Event] | Event | None,
]
Handle Replicate and Read events.
BackupStats dataclass ¶
Statistics for BackupNode.
Attributes:

| Name | Type | Description |
|---|---|---|
| replications_applied | int | Total replication events applied. |
| reads | int | Total read requests served. |
| last_applied_seq | int | Last sequence number applied from primary. |
PrimaryBackupStats dataclass ¶
PrimaryBackupStats(
writes: int = 0,
reads: int = 0,
replications_sent: int = 0,
acks_received: int = 0,
write_latency_sum: float = 0.0,
)
Statistics for PrimaryNode.
Attributes:

| Name | Type | Description |
|---|---|---|
| writes | int | Total write requests received. |
| reads | int | Total read requests received. |
| replications_sent | int | Total replication messages sent to backups. |
| acks_received | int | Total acknowledgments received from backups. |
| write_latency_sum | float | Sum of write latencies for averaging. |
PrimaryNode ¶
PrimaryNode(
name: str,
store: KVStore,
backups: list[Entity],
network: Entity,
mode: ReplicationMode = ReplicationMode.ASYNC,
)
Bases: Entity
Primary node in a primary-backup replication scheme.
Accepts Write and Read events. On Write, applies locally and replicates to backups according to the configured mode. Tracks per-backup replication lag via monotonic sequence numbers.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Entity name. | required |
| store | KVStore | KVStore for local data. | required |
| backups | list[Entity] | List of BackupNode entities. | required |
| network | Entity | Network for sending replication messages. | required |
| mode | ReplicationMode | Replication consistency mode. | ASYNC |
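The sync, semi-sync, and async modes differ only in how many backup acknowledgments the primary awaits before acknowledging the client: all backups, at least one, or none. A sketch of that policy decision, using a hypothetical `acks_required` helper that is not part of the library API:

```python
from enum import Enum

class Mode(Enum):
    SYNC = "sync"        # wait for every backup before acking the client
    SEMI_SYNC = "semi"   # wait for at least one backup
    ASYNC = "async"      # ack immediately; replicate in the background

def acks_required(mode: Mode, num_backups: int) -> int:
    # Number of ReplicationAcks the primary must collect before the
    # client's write completes.
    if mode is Mode.SYNC:
        return num_backups
    if mode is Mode.SEMI_SYNC:
        return min(1, num_backups)  # degrades gracefully with zero backups
    return 0

print(acks_required(Mode.SYNC, 3))       # 3
print(acks_required(Mode.SEMI_SYNC, 3))  # 1
print(acks_required(Mode.ASYNC, 3))      # 0
```

This is the durability/latency trade-off in one number: sync never loses an acknowledged write if any backup survives, async has the lowest write latency but can lose recent writes on primary failure, and semi-sync sits between the two.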
handle_event ¶
handle_event(
event: Event,
) -> Generator[
float | SimFuture | tuple[float, list[Event] | Event],
None,
list[Event] | Event | None,
]
Handle Write, Read, and ReplicationAck events.
ReplicationMode ¶
Bases: Enum
Write durability mode for primary-backup replication.
build_chain ¶
build_chain(
names: list[str],
network: Entity,
store_factory: Callable[[str], KVStore],
craq_enabled: bool = False,
) -> list[ChainNode]
Build a chain of ChainNodes with topology wired.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| names | list[str] | Names for each node. First is HEAD, last is TAIL. | required |
| network | Entity | Network entity for inter-node communication. | required |
| store_factory | Callable[[str], KVStore] | Factory function creating a KVStore for each node. | required |
| craq_enabled | bool | Enable CRAQ read optimization on all nodes. | False |

Returns:

| Type | Description |
|---|---|
| list[ChainNode] | List of ChainNode instances with topology wired. |

Raises:

| Type | Description |
|---|---|
| ValueError | If fewer than 2 names are provided. |