Skip to content

Synchronization

Barriers, latches, and coordination primitives for multi-entity synchronization.

Synchronization primitives for concurrent resource access.

This module provides simulation-aware synchronization primitives that model concurrency control mechanisms like mutexes, semaphores, and read-write locks.

Example

from happysimulator.components.sync import Mutex, Semaphore, RWLock

Protect a critical section

mutex = Mutex(name="db_lock")

def handle_event(self, event): yield from mutex.acquire() try: yield 0.01 # Critical section finally: return mutex.release()

Barrier

Barrier(name: str, parties: int)

Bases: Entity

Synchronization point for multiple parties.

A barrier blocks all arriving parties until the specified number of parties have called wait(). Once all parties arrive, they are all released simultaneously and the barrier resets for the next round.

This is useful for phased computations where all workers must complete one phase before any can start the next.

Attributes:

Name Type Description
name

Entity name for identification.

parties int

Number of parties required to break the barrier.

waiting int

Number of parties currently waiting.

broken bool

Whether the barrier has been broken (error state).

generation int

Current barrier generation (increments each break).

Initialize the barrier.

Parameters:

Name Type Description Default
name str

Name for this barrier entity.

required
parties int

Number of parties required to break the barrier.

required

Raises:

Type Description
ValueError

If parties < 1.

parties property

parties: int

Number of parties required to break the barrier.

stats property

stats: BarrierStats

Frozen snapshot of current barrier statistics.

waiting property

waiting: int

Number of parties currently waiting at the barrier.

broken property

broken: bool

Whether the barrier is in a broken state.

generation property

generation: int

Current barrier generation (increments each time barrier breaks).

wait

wait() -> Generator[float, None, int]

Wait at the barrier until all parties arrive.

This is a generator that yields control while waiting for other parties. Use with 'yield from' in an event handler.

Returns:

Type Description
int

The arrival index (0 to parties-1). The last party to arrive

int

gets index 0, making it easy to designate a "leader".

Raises:

Type Description
RuntimeError

If the barrier is broken.

Yields:

Type Description
float

0.0 while waiting for other parties.

Example

def handle_event(self, event): arrival_index = yield from barrier.wait() if arrival_index == 0: # I'm the last to arrive, do leader tasks pass

reset

reset() -> None

Reset the barrier to its initial state.

Any parties currently waiting will be released with a RuntimeError on their next check (barrier broken).

This is useful for recovery scenarios or reusing the barrier after an error condition.

abort

abort() -> None

Permanently break the barrier.

All current and future wait() calls will raise RuntimeError. Use reset() to recover from this state.

handle_event

handle_event(event: Event) -> None

Barrier doesn't directly handle events.

BarrierStats dataclass

BarrierStats(
    wait_calls: int = 0,
    barrier_breaks: int = 0,
    resets: int = 0,
    total_wait_time_ns: int = 0,
)

Frozen snapshot of barrier statistics.

Condition

Condition(name: str, lock: Mutex)

Bases: Entity

Condition variable for complex synchronization.

A condition variable allows threads to wait for a condition to become true. It must be used with an associated mutex that protects the shared state being checked.

The wait() method atomically releases the mutex and waits. When woken, it reacquires the mutex before returning.

Attributes:

Name Type Description
name

Entity name for identification.

lock Mutex

Associated mutex.

waiters int

Number of threads waiting on this condition.

Initialize the condition variable.

Parameters:

Name Type Description Default
name str

Name for this condition entity.

required
lock Mutex

Associated mutex that protects the condition.

required

stats property

stats: ConditionStats

Frozen snapshot of current condition variable statistics.

lock property

lock: Mutex

The associated mutex.

waiters property

waiters: int

Number of threads waiting on this condition.

set_clock

set_clock(clock: Clock) -> None

Attach clock to this condition and its internal mutex.

wait

wait() -> Generator[float]

Wait for the condition to be signaled.

Atomically releases the associated mutex, waits for a signal, then reacquires the mutex before returning.

The caller should hold the mutex when calling wait(), and should check the actual condition in a loop (spurious wakeups are possible).

Yields:

Type Description
Generator[float]

0.0 while waiting and during mutex reacquisition.

Raises:

Type Description
RuntimeError

If the mutex is not held.

Example

yield from mutex.acquire() while not condition_is_true(): yield from condition.wait()

condition is now true, mutex is held

wait_for

wait_for(
    predicate: Callable[[], bool],
    timeout: float | None = None,
) -> Generator[float, None, bool]

Wait for a predicate to become true.

A convenience method that handles the wait loop automatically.

Parameters:

Name Type Description Default
predicate Callable[[], bool]

Callable that returns True when condition is met.

required
timeout float | None

Maximum time to wait in seconds (None = forever).

None

Yields:

Type Description
float

0.0 while waiting.

Returns:

Type Description
bool

True if predicate became true, False if timed out.

notify

notify(n: int = 1) -> list[Event]

Wake up to n waiting threads.

The woken threads will not run immediately; they will compete to reacquire the mutex when the current holder releases it.

Parameters:

Name Type Description Default
n int

Maximum number of threads to wake.

1

Returns:

Type Description
list[Event]

Empty list (waking is handled internally).

notify_all

notify_all() -> list[Event]

Wake all waiting threads.

Returns:

Type Description
list[Event]

Empty list (waking is handled internally).

handle_event

handle_event(event: Event) -> None

Condition doesn't directly handle events.

ConditionStats dataclass

ConditionStats(
    waits: int = 0,
    notifies: int = 0,
    notify_alls: int = 0,
    wakeups: int = 0,
    total_wait_time_ns: int = 0,
)

Frozen snapshot of condition variable statistics.

Mutex

Mutex(name: str)

Bases: Entity

Mutual exclusion lock with queued waiters.

Only one entity can hold the lock at a time. Other acquirers block until the lock is released, then wake in FIFO order.

The acquire() method is a generator that yields until the lock is obtained. The release() method returns events to wake the next waiter.

Attributes:

Name Type Description
name

Entity name for identification.

is_locked bool

Whether the lock is currently held.

waiters int

Number of entities waiting to acquire.

Initialize the mutex.

Parameters:

Name Type Description Default
name str

Name for this mutex entity.

required

is_locked property

is_locked: bool

Whether the lock is currently held.

waiters property

waiters: int

Number of entities waiting to acquire.

stats property

stats: MutexStats

Frozen snapshot of current mutex statistics.

owner property

owner: str | None

Current lock owner (if set via acquire with owner parameter).

try_acquire

try_acquire(owner: str | None = None) -> bool

Try to acquire the lock without blocking.

Parameters:

Name Type Description Default
owner str | None

Optional owner identifier for debugging.

None

Returns:

Type Description
bool

True if lock was acquired, False if already held.

acquire

acquire(owner: str | None = None) -> Generator[float]

Acquire the lock, blocking if necessary.

This is a generator that yields control while waiting for the lock. Use with 'yield from' in an event handler.

Parameters:

Name Type Description Default
owner str | None

Optional owner identifier for debugging.

None

Yields:

Type Description
Generator[float]

0.0 when lock is acquired (no additional delay).

Example

def handle_event(self, event): yield from self.mutex.acquire() # ... critical section ... return self.mutex.release()

release

release() -> list[Event]

Release the lock and wake the next waiter.

Returns:

Type Description
list[Event]

List of events to wake the next waiter, or empty if no waiters.

Raises:

Type Description
RuntimeError

If the lock is not currently held.

handle_event

handle_event(event: Event) -> None

Mutex doesn't directly handle events.

MutexStats dataclass

MutexStats(
    acquisitions: int = 0,
    releases: int = 0,
    contentions: int = 0,
    total_wait_time_ns: int = 0,
)

Frozen snapshot of mutex statistics.

RWLock

RWLock(name: str, max_readers: int | None = None)

Bases: Entity

Read-write lock allowing concurrent reads or exclusive write.

Multiple readers can hold the lock simultaneously. Writers get exclusive access - no readers or other writers allowed.

Write requests are prioritized to prevent writer starvation: once a writer is waiting, new readers must wait behind it.

Attributes:

Name Type Description
name

Entity name for identification.

active_readers int

Number of readers currently holding the lock.

is_write_locked bool

Whether a writer holds the lock.

max_readers int | None

Maximum concurrent readers (None = unlimited).

Initialize the read-write lock.

Parameters:

Name Type Description Default
name str

Name for this lock entity.

required
max_readers int | None

Maximum concurrent readers (None = unlimited).

None

Raises:

Type Description
ValueError

If max_readers < 1.

active_readers property

active_readers: int

Number of readers currently holding the lock.

is_write_locked property

is_write_locked: bool

Whether a writer holds the lock.

max_readers property

max_readers: int | None

Maximum concurrent readers.

stats property

stats: RWLockStats

Frozen snapshot of current read-write lock statistics.

waiters property

waiters: int

Total number of waiting readers and writers.

try_acquire_read

try_acquire_read() -> bool

Try to acquire a read lock without blocking.

Returns:

Type Description
bool

True if read lock was acquired, False otherwise.

try_acquire_write

try_acquire_write() -> bool

Try to acquire a write lock without blocking.

Returns:

Type Description
bool

True if write lock was acquired, False otherwise.

acquire_read

acquire_read() -> Generator[float]

Acquire a read lock, blocking if necessary.

Blocks if a writer holds the lock or a writer is waiting.

Yields:

Type Description
Generator[float]

0.0 when read lock is acquired.

acquire_write

acquire_write() -> Generator[float]

Acquire a write lock, blocking if necessary.

Blocks if any readers or another writer holds the lock.

Yields:

Type Description
Generator[float]

0.0 when write lock is acquired.

release_read

release_read() -> list[Event]

Release a read lock.

Returns:

Type Description
list[Event]

Empty list (waking is handled internally).

Raises:

Type Description
RuntimeError

If no read lock is held.

release_write

release_write() -> list[Event]

Release a write lock.

Returns:

Type Description
list[Event]

Empty list (waking is handled internally).

Raises:

Type Description
RuntimeError

If no write lock is held.

handle_event

handle_event(event: Event) -> None

RWLock doesn't directly handle events.

RWLockStats dataclass

RWLockStats(
    read_acquisitions: int = 0,
    write_acquisitions: int = 0,
    read_releases: int = 0,
    write_releases: int = 0,
    read_contentions: int = 0,
    write_contentions: int = 0,
    total_read_wait_ns: int = 0,
    total_write_wait_ns: int = 0,
    peak_readers: int = 0,
)

Frozen snapshot of read-write lock statistics.

Semaphore

Semaphore(name: str, initial_count: int)

Bases: Entity

Counting semaphore for resource limiting.

Allows up to N concurrent holders, where N is the initial count. Acquirers can request multiple permits at once.

Attributes:

Name Type Description
name

Entity name for identification.

available int

Number of permits currently available.

capacity int

Maximum number of permits (initial count).

Initialize the semaphore.

Parameters:

Name Type Description Default
name str

Name for this semaphore entity.

required
initial_count int

Initial number of available permits.

required

Raises:

Type Description
ValueError

If initial_count < 1.

available property

available: int

Number of permits currently available.

capacity property

capacity: int

Maximum number of permits.

stats property

stats: SemaphoreStats

Frozen snapshot of current semaphore statistics.

waiters property

waiters: int

Number of entities waiting to acquire.

try_acquire

try_acquire(count: int = 1) -> bool

Try to acquire permits without blocking.

Parameters:

Name Type Description Default
count int

Number of permits to acquire.

1

Returns:

Type Description
bool

True if permits were acquired, False if not enough available.

acquire

acquire(count: int = 1) -> Generator[float]

Acquire permits, blocking if necessary.

This is a generator that yields control while waiting for permits. Use with 'yield from' in an event handler.

Parameters:

Name Type Description Default
count int

Number of permits to acquire.

1

Yields:

Type Description
Generator[float]

0.0 when permits are acquired.

Raises:

Type Description
ValueError

If count < 1 or count > capacity.

release

release(count: int = 1) -> list[Event]

Release permits and wake waiting acquirers.

Parameters:

Name Type Description Default
count int

Number of permits to release.

1

Returns:

Type Description
list[Event]

Empty list (waking is handled internally).

Raises:

Type Description
ValueError

If count < 1 or would exceed capacity.

handle_event

handle_event(event: Event) -> None

Semaphore doesn't directly handle events.

SemaphoreStats dataclass

SemaphoreStats(
    acquisitions: int = 0,
    releases: int = 0,
    contentions: int = 0,
    total_wait_time_ns: int = 0,
    peak_waiters: int = 0,
)

Frozen snapshot of semaphore statistics.