Skip to content

CRDTs

Conflict-free Replicated Data Types: GCounter, PNCounter, LWWRegister, ORSet.

Conflict-free Replicated Data Types (CRDTs) for distributed simulation.

CRDTs are data structures that converge automatically after network partitions heal, without requiring consensus. They guarantee eventual consistency by ensuring merge operations are commutative, associative, and idempotent.

Provided CRDTs:

  • GCounter: Grow-only counter (increment only)
  • PNCounter: Positive-negative counter (increment and decrement)
  • LWWRegister: Last-writer-wins register (HLC timestamp ordering)
  • ORSet: Observed-remove set (add-wins semantics)

The CRDTStore entity wraps CRDTs in a key-value store with gossip-based replication for use in simulations.

CRDTStore

CRDTStore(
    name: str,
    network: Entity,
    crdt_factory: Callable[[str], CRDT] = lambda node_id: (
        LWWRegister(node_id)
    ),
    gossip_interval: float = 1.0,
)

Bases: Entity

Key-value store backed by CRDTs with gossip replication.

Each key maps to a CRDT instance created by crdt_factory. Writes apply directly to the local CRDT (no coordination). Gossip periodically synchronizes state with a random peer using full-state push-pull.

Parameters:

Name Type Description Default
name str

Entity name (also used as the node_id for CRDTs).

required
network Entity

Network entity for inter-node communication.

required
crdt_factory Callable[[str], CRDT]

Callable that creates a new CRDT given a node_id. Defaults to creating LWWRegister instances.

lambda node_id: LWWRegister(node_id)
gossip_interval float

Seconds between gossip rounds (0 to disable).

1.0

stats property

stats: CRDTStoreStats

Return a frozen snapshot of store statistics.

crdts property

crdts: dict[str, CRDT]

The local CRDT instances keyed by name.

convergence_lag property

convergence_lag: bool

True if local state hash differs from last-seen peer hash.

A rough indicator — not authoritative across all peers.

add_peers

add_peers(peers: list[Entity]) -> None

Set peer nodes for gossip replication.

Parameters:

Name Type Description Default
peers list[Entity]

List of peer CRDTStore entities.

required

get_gossip_event

get_gossip_event() -> Event | None

Create the first gossip daemon event.

Returns:

Type Description
Event | None

A daemon event for the first gossip tick, or None if

Event | None

gossip is disabled or there are no peers.

get_or_create

get_or_create(key: str) -> CRDT

Get or create a CRDT for the given key.

Parameters:

Name Type Description Default
key str

The key to look up or create.

required

Returns:

Type Description
CRDT

The CRDT instance for this key.

handle_event

handle_event(
    event: Event,
) -> Generator[
    float | SimFuture | tuple[float, list[Event] | Event],
    None,
    list[Event] | Event | None,
]

Route events by type.

CRDTStoreStats dataclass

CRDTStoreStats(
    writes: int = 0,
    reads: int = 0,
    gossip_sent: int = 0,
    gossip_received: int = 0,
    keys_merged: int = 0,
    convergence_checks: int = 0,
)

Statistics for a CRDTStore node.

Attributes:

Name Type Description
writes int

Local write operations processed.

reads int

Local read operations processed.

gossip_sent int

Gossip push messages sent.

gossip_received int

Gossip messages received (push or response).

keys_merged int

Total key-level merge operations performed.

convergence_checks int

Number of convergence hash comparisons.

GCounter

GCounter(node_id: str)

Grow-only counter CRDT.

Each node has its own monotonically increasing count. The total value is the sum across all nodes. Merge uses element-wise max, which is commutative, associative, and idempotent.

Parameters:

Name Type Description Default
node_id str

Identifier for this replica.

required

node_id property

node_id: str

This replica's identifier.

value property

value: int

Total count across all nodes.

increment

increment(n: int = 1) -> None

Increment this node's count.

Parameters:

Name Type Description Default
n int

Amount to increment (must be positive).

1

Raises:

Type Description
ValueError

If n is not positive.

node_value

node_value(node_id: str) -> int

Get a specific node's count.

Parameters:

Name Type Description Default
node_id str

The node to query.

required

Returns:

Type Description
int

The node's count, or 0 if unknown.

merge

merge(other: GCounter) -> None

Merge another G-Counter into this one (element-wise max).

Parameters:

Name Type Description Default
other GCounter

Another GCounter to merge from.

required

to_dict

to_dict() -> dict

Serialize to a plain dict.

from_dict classmethod

from_dict(data: dict) -> Self

Deserialize from a plain dict.

Parameters:

Name Type Description Default
data dict

Dict produced by to_dict().

required

LWWRegister

LWWRegister(
    node_id: str,
    value: Any = None,
    timestamp: HLCTimestamp | None = None,
)

Last-Writer-Wins register CRDT.

Stores a single value tagged with an HLCTimestamp. On merge, the value with the higher timestamp wins. HLCTimestamp provides total ordering via (physical_ns, logical, node_id).

Parameters:

Name Type Description Default
node_id str

Identifier for this replica.

required
value Any

Initial value (default None).

None
timestamp HLCTimestamp | None

Initial timestamp (default None = never written).

None

node_id property

node_id: str

This replica's identifier.

value property

value: Any

Current value of the register.

timestamp property

timestamp: HLCTimestamp | None

Timestamp of the current value.

get

get() -> Any

Return the current value (alias for value property).

set

set(value: Any, timestamp: HLCTimestamp) -> None

Set the value if the timestamp is newer than the current one.

If the register has never been written (timestamp is None), the value is always accepted.

Parameters:

Name Type Description Default
value Any

The new value.

required
timestamp HLCTimestamp

The HLC timestamp for this write.

required

merge

merge(other: LWWRegister) -> None

Merge another register into this one (highest timestamp wins).

If the other register has never been written, this is a no-op.

Parameters:

Name Type Description Default
other LWWRegister

Another LWWRegister to merge from.

required

to_dict

to_dict() -> dict

Serialize to a plain dict.

from_dict classmethod

from_dict(data: dict) -> Self

Deserialize from a plain dict.

Parameters:

Name Type Description Default
data dict

Dict produced by to_dict().

required

ORSet

ORSet(node_id: str)

Observed-Remove Set CRDT.

Maintains a dict mapping elements to sets of tags. Each tag is a (node_id, sequence_number) tuple, generated deterministically (no UUIDs) for reproducible tests.

Add-wins semantics: a concurrent add and remove of the same element results in the element being present (the new tag from the add survives the remove of old tags).

Parameters:

Name Type Description Default
node_id str

Identifier for this replica.

required

node_id property

node_id: str

This replica's identifier.

value property

value: frozenset

Current elements in the set (alias for elements).

elements property

elements: frozenset

Frozenset of elements that have at least one tag.

add

add(element: Any) -> None

Add an element with a new unique tag.

Parameters:

Name Type Description Default
element Any

The element to add.

required

remove

remove(element: Any) -> None

Remove an element by clearing all its observed tags.

If the element is not present, this is a no-op.

Parameters:

Name Type Description Default
element Any

The element to remove.

required

contains

contains(element: Any) -> bool

Check if an element is in the set.

Parameters:

Name Type Description Default
element Any

The element to check.

required

Returns:

Type Description
bool

True if the element has at least one tag.

merge

merge(other: ORSet) -> None

Merge another OR-Set into this one.

For each element, the resulting tag set is the union of both replicas' tags. This means: - Elements added on either side are present. - An element removed on one side but concurrently added on the other survives (add-wins).

Parameters:

Name Type Description Default
other ORSet

Another ORSet to merge from.

required

to_dict

to_dict() -> dict

Serialize to a plain dict.

from_dict classmethod

from_dict(data: dict) -> Self

Deserialize from a plain dict.

Parameters:

Name Type Description Default
data dict

Dict produced by to_dict().

required

PNCounter

PNCounter(node_id: str)

Positive-Negative counter CRDT.

Wraps two G-Counters: _p for positive increments and _n for negative decrements. The value is _p.value - _n.value.

Parameters:

Name Type Description Default
node_id str

Identifier for this replica.

required

node_id property

node_id: str

This replica's identifier.

value property

value: int

Net count (increments - decrements).

increments property

increments: int

Total increments across all nodes.

decrements property

decrements: int

Total decrements across all nodes.

increment

increment(n: int = 1) -> None

Increment the counter.

Parameters:

Name Type Description Default
n int

Amount to increment (must be positive).

1

decrement

decrement(n: int = 1) -> None

Decrement the counter.

Parameters:

Name Type Description Default
n int

Amount to decrement (must be positive).

1

merge

merge(other: PNCounter) -> None

Merge another PN-Counter into this one.

Merges both the P and N G-Counters independently.

Parameters:

Name Type Description Default
other PNCounter

Another PNCounter to merge from.

required

to_dict

to_dict() -> dict

Serialize to a plain dict.

from_dict classmethod

from_dict(data: dict) -> Self

Deserialize from a plain dict.

Parameters:

Name Type Description Default
data dict

Dict produced by to_dict().

required

CRDT

Bases: Protocol

Protocol for all CRDT types.

All CRDTs must support: - value: Read the current resolved value. - merge(other): Merge another replica's state (in-place). - to_dict() / from_dict(): Serialization for gossip.

value property

value: Any

The current resolved value of this CRDT.

merge

merge(other: Self) -> None

Merge another replica's state into this one (in-place).

Must be commutative, associative, and idempotent.

Parameters:

Name Type Description Default
other Self

Another instance of the same CRDT type.

required

to_dict

to_dict() -> dict

Serialize this CRDT's state to a plain dict for gossip.

from_dict classmethod

from_dict(data: dict) -> Self

Deserialize a CRDT from a plain dict.

Parameters:

Name Type Description Default
data dict

Dict produced by to_dict().

required