Synchronization¶
Barriers, latches, and coordination primitives for multi-entity synchronization.
Synchronization primitives for concurrent resource access.
This module provides simulation-aware synchronization primitives that model concurrency control mechanisms like mutexes, semaphores, and read-write locks.
Example

    from happysimulator.components.sync import Mutex, Semaphore, RWLock

    # Protect a critical section
    mutex = Mutex(name="db_lock")

    def handle_event(self, event):
        yield from mutex.acquire()
        try:
            yield 0.01  # Critical section
        finally:
            return mutex.release()

Barrier ¶
Bases: Entity
Synchronization point for multiple parties.
A barrier blocks all arriving parties until the specified number of parties have called wait(). Once all parties arrive, they are all released simultaneously and the barrier resets for the next round.
This is useful for phased computations where all workers must complete one phase before any can start the next.
Attributes:
| Name | Type | Description |
|---|---|---|
| `name` | | Entity name for identification. |
| `parties` | `int` | Number of parties required to break the barrier. |
| `waiting` | `int` | Number of parties currently waiting. |
| `broken` | `bool` | Whether the barrier has been broken (error state). |
| `generation` | `int` | Current barrier generation (increments each break). |
Initialize the barrier.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Name for this barrier entity. | required |
| `parties` | `int` | Number of parties required to break the barrier. | required |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If parties < 1. |
generation property ¶
Current barrier generation (increments each time barrier breaks).
wait ¶
Wait at the barrier until all parties arrive.
This is a generator that yields control while waiting for other parties. Use with 'yield from' in an event handler.
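The `yield from` contract described above can be sketched in plain Python. This is not happysimulator code: `toy_wait`, `handle_event`, and `drive` are hypothetical names, and the driver loop only stands in for whatever the simulation engine does when it resumes a handler. It shows that every yielded `0.0` passes through to the driver and that the generator's return value becomes the value of the `yield from` expression.

```python
from typing import Generator


def toy_wait(ready: list) -> Generator[float, None, int]:
    """Hypothetical stand-in for a blocking primitive's wait()."""
    while not ready:           # still blocked: hand control back to the driver
        yield 0.0
    return ready[0]            # becomes the value of the `yield from` expression


def handle_event(ready: list, log: list) -> Generator[float, None, None]:
    index = yield from toy_wait(ready)   # forwards every 0.0 to the driver
    log.append(index)


def drive(gen, ready: list, release_after: int) -> int:
    """Toy driver: resume the generator and unblock it after a few steps."""
    steps = 0
    while True:
        try:
            next(gen)
        except StopIteration:
            return steps
        steps += 1
        if steps == release_after:
            ready.append(0)    # simulate the last party arriving


ready, log = [], []
steps = drive(handle_event(ready, log), ready, release_after=3)
```

Calling `barrier.wait()` without `yield from` would only create a generator object and never wait, which is why the docstring insists on `yield from`.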
Returns:
| Type | Description |
|---|---|
| `int` | The arrival index (0 to parties-1). The last party to arrive gets index 0, making it easy to designate a "leader". |

Raises:

| Type | Description |
|---|---|
| `RuntimeError` | If the barrier is broken. |

Yields:

| Type | Description |
|---|---|
| `float` | 0.0 while waiting for other parties. |
Example

    def handle_event(self, event):
        arrival_index = yield from barrier.wait()
        if arrival_index == 0:
            # I'm the last to arrive, do leader tasks
            pass

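The arrival-index and generation bookkeeping can be modelled without the simulator. The sketch below is a hypothetical `ToyBarrier`, not the library's `Barrier`. It assumes indices count down from `parties - 1` for the first arrival to `0` for the last (the documentation only fixes the last arrival at 0), and that `generation` increments once per completed round.

```python
class ToyBarrier:
    """Hypothetical model of arrival-index and generation bookkeeping."""

    def __init__(self, parties: int) -> None:
        if parties < 1:
            raise ValueError("parties must be >= 1")
        self.parties = parties
        self.waiting = 0
        self.generation = 0

    def arrive(self) -> int:
        """Record one arrival and return its index (last arrival gets 0)."""
        self.waiting += 1
        index = self.parties - self.waiting   # assumed countdown ordering
        if index == 0:
            # Last party: release everyone and start the next round.
            self.waiting = 0
            self.generation += 1
        return index


barrier = ToyBarrier(parties=3)
first_round = [barrier.arrive() for _ in range(3)]
second_round = [barrier.arrive() for _ in range(3)]
```

Because exactly one party per round sees index 0, checking `arrival_index == 0` elects a single leader without any extra coordination.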
reset ¶
Reset the barrier to its initial state.
Any parties currently waiting will be released with a RuntimeError on their next check (barrier broken).
This is useful for recovery scenarios or reusing the barrier after an error condition.
abort ¶
Break the barrier until it is reset.
All current and future wait() calls raise RuntimeError until reset() is called, which restores the barrier to its initial state.
BarrierStats dataclass ¶

    BarrierStats(
        wait_calls: int = 0,
        barrier_breaks: int = 0,
        resets: int = 0,
        total_wait_time_ns: int = 0,
    )

Frozen snapshot of barrier statistics.
Condition ¶
Bases: Entity
Condition variable for complex synchronization.
A condition variable allows threads to wait for a condition to become true. It must be used with an associated mutex that protects the shared state being checked.
The wait() method atomically releases the mutex and waits. When woken, it reacquires the mutex before returning.
Attributes:
| Name | Type | Description |
|---|---|---|
| `name` | | Entity name for identification. |
| `lock` | `Mutex` | Associated mutex. |
| `waiters` | `int` | Number of threads waiting on this condition. |
Initialize the condition variable.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Name for this condition entity. | required |
| `lock` | `Mutex` | Associated mutex that protects the condition. | required |
wait ¶
Wait for the condition to be signaled.
Atomically releases the associated mutex, waits for a signal, then reacquires the mutex before returning.
The caller should hold the mutex when calling wait(), and should check the actual condition in a loop (spurious wakeups are possible).
Yields:
| Type | Description |
|---|---|
| `Generator[float]` | 0.0 while waiting and during mutex reacquisition. |

Raises:

| Type | Description |
|---|---|
| `RuntimeError` | If the mutex is not held. |
Example

    yield from mutex.acquire()
    while not condition_is_true():
        yield from condition.wait()
    # condition is now true, mutex is held

wait_for ¶

    wait_for(
        predicate: Callable[[], bool],
        timeout: float | None = None,
    ) -> Generator[float, None, bool]

Wait for a predicate to become true.
A convenience method that handles the wait loop automatically.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `predicate` | `Callable[[], bool]` | Callable that returns True when condition is met. | required |
| `timeout` | `float \| None` | Maximum time to wait in seconds (None = forever). | `None` |

Yields:

| Type | Description |
|---|---|
| `float` | 0.0 while waiting. |

Returns:

| Type | Description |
|---|---|
| `bool` | True if predicate became true, False if timed out. |
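Python's standard-library `threading.Condition.wait_for` has the same shape (a predicate, a timeout in seconds, a boolean result), so it makes a convenient analogy for the two return values. The sketch below uses only the standard library and wall-clock time. It is not happysimulator code, where the wait is a generator driven by simulated time.

```python
import threading

cond = threading.Condition()
items = []

with cond:
    # Predicate is false and nobody notifies: the wait gives up at the timeout.
    timed_out_result = cond.wait_for(lambda: len(items) > 0, timeout=0.01)


def producer() -> None:
    with cond:
        items.append("job")
        cond.notify()          # wake one waiter; it re-checks the predicate


worker = threading.Thread(target=producer)
with cond:
    worker.start()
    # wait_for releases the lock while waiting, so the producer can run.
    ready_result = cond.wait_for(lambda: len(items) > 0, timeout=5.0)
worker.join()
```

In both versions the predicate is re-evaluated after each wakeup, which is what makes the helper safe against spurious wakeups.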
notify ¶
Wake up to n waiting threads.
The woken threads will not run immediately; they will compete to reacquire the mutex when the current holder releases it.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `n` | `int` | Maximum number of threads to wake. | `1` |

Returns:

| Type | Description |
|---|---|
| `list[Event]` | Empty list (waking is handled internally). |
ConditionStats dataclass ¶

    ConditionStats(
        waits: int = 0,
        notifies: int = 0,
        notify_alls: int = 0,
        wakeups: int = 0,
        total_wait_time_ns: int = 0,
    )

Frozen snapshot of condition variable statistics.
Mutex ¶
Bases: Entity
Mutual exclusion lock with queued waiters.
Only one entity can hold the lock at a time. Other acquirers block until the lock is released, then wake in FIFO order.
The acquire() method is a generator that yields until the lock is obtained. The release() method returns events to wake the next waiter.
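The FIFO hand-off can be illustrated with a small queue model. `ToyMutex`, `request`, and the string owners below are hypothetical and ignore simulated time entirely. The sketch only shows the ordering guarantee: each release passes the lock to whoever has waited longest.

```python
from collections import deque
from typing import Optional


class ToyMutex:
    """Hypothetical FIFO lock model; not happysimulator's Mutex."""

    def __init__(self) -> None:
        self.owner: Optional[str] = None
        self._queue: deque = deque()

    @property
    def is_locked(self) -> bool:
        return self.owner is not None

    @property
    def waiters(self) -> int:
        return len(self._queue)

    def request(self, who: str) -> bool:
        """Take the lock if it is free, otherwise join the back of the queue."""
        if self.owner is None:
            self.owner = who
            return True
        self._queue.append(who)
        return False

    def release(self) -> Optional[str]:
        """Hand the lock to the longest-waiting requester, if any."""
        self.owner = self._queue.popleft() if self._queue else None
        return self.owner


lock = ToyMutex()
granted = [lock.request(name) for name in ("a", "b", "c")]
waiting_after_requests = lock.waiters
handoffs = [lock.release(), lock.release(), lock.release()]
```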
Attributes:
| Name | Type | Description |
|---|---|---|
| `name` | | Entity name for identification. |
| `is_locked` | `bool` | Whether the lock is currently held. |
| `waiters` | `int` | Number of entities waiting to acquire. |
Initialize the mutex.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Name for this mutex entity. | required |
try_acquire ¶
Try to acquire the lock without blocking.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `owner` | `str \| None` | Optional owner identifier for debugging. | `None` |

Returns:

| Type | Description |
|---|---|
| `bool` | True if lock was acquired, False if already held. |
acquire ¶
Acquire the lock, blocking if necessary.
This is a generator that yields control while waiting for the lock. Use with 'yield from' in an event handler.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `owner` | `str \| None` | Optional owner identifier for debugging. | `None` |

Yields:

| Type | Description |
|---|---|
| `Generator[float]` | 0.0 when lock is acquired (no additional delay). |
Example

    def handle_event(self, event):
        yield from self.mutex.acquire()
        # ... critical section ...
        return self.mutex.release()

MutexStats dataclass ¶

    MutexStats(
        acquisitions: int = 0,
        releases: int = 0,
        contentions: int = 0,
        total_wait_time_ns: int = 0,
    )

Frozen snapshot of mutex statistics.
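A snapshot like this is typically reduced to a few derived metrics. The sketch below mirrors the documented field names in a hypothetical `ToyMutexStats` and assumes "frozen" means `dataclass(frozen=True)`. The helper functions are illustrative and not part of the library.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyMutexStats:
    """Hypothetical mirror of the documented MutexStats fields."""
    acquisitions: int = 0
    releases: int = 0
    contentions: int = 0
    total_wait_time_ns: int = 0


def mean_wait_seconds(stats: ToyMutexStats) -> float:
    """Average wait per acquisition, converted from nanoseconds to seconds."""
    if stats.acquisitions == 0:
        return 0.0
    return stats.total_wait_time_ns / stats.acquisitions / 1e9


def contention_rate(stats: ToyMutexStats) -> float:
    """Fraction of acquisitions that had to wait for the lock."""
    if stats.acquisitions == 0:
        return 0.0
    return stats.contentions / stats.acquisitions


snapshot = ToyMutexStats(
    acquisitions=4, releases=4, contentions=2, total_wait_time_ns=20_000_000
)
mean_wait = mean_wait_seconds(snapshot)
rate = contention_rate(snapshot)
```

With a frozen dataclass, assigning to a field raises `dataclasses.FrozenInstanceError`, so a snapshot cannot drift after it is taken.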
RWLock ¶
Bases: Entity
Read-write lock allowing concurrent reads or exclusive write.
Multiple readers can hold the lock simultaneously. Writers get exclusive access - no readers or other writers allowed.
Write requests are prioritized to prevent writer starvation: once a writer is waiting, new readers must wait behind it.
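The admission rules above reduce to two small predicates. The functions below are a hypothetical sketch, not the library's implementation. `waiting_writers` is an assumed counter, since the documentation lists only `active_readers`, `is_write_locked`, and `max_readers`.

```python
from typing import Optional


def may_admit_reader(
    active_readers: int,
    is_write_locked: bool,
    waiting_writers: int,              # assumed internal counter
    max_readers: Optional[int] = None,
) -> bool:
    """Readers wait behind an active writer and behind any queued writer."""
    if is_write_locked or waiting_writers > 0:
        return False
    if max_readers is not None and active_readers >= max_readers:
        return False
    return True


def may_admit_writer(active_readers: int, is_write_locked: bool) -> bool:
    """Writers need the lock completely idle."""
    return active_readers == 0 and not is_write_locked


reader_joins_readers = may_admit_reader(2, False, waiting_writers=0)
reader_behind_writer = may_admit_reader(2, False, waiting_writers=1)
reader_at_cap = may_admit_reader(2, False, waiting_writers=0, max_readers=2)
writer_with_readers = may_admit_writer(active_readers=1, is_write_locked=False)
writer_when_idle = may_admit_writer(active_readers=0, is_write_locked=False)
```

The `waiting_writers > 0` check is what prevents starvation: without it, a steady stream of readers could keep `active_readers` above zero indefinitely.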
Attributes:
| Name | Type | Description |
|---|---|---|
| `name` | | Entity name for identification. |
| `active_readers` | `int` | Number of readers currently holding the lock. |
| `is_write_locked` | `bool` | Whether a writer holds the lock. |
| `max_readers` | `int \| None` | Maximum concurrent readers (None = unlimited). |
Initialize the read-write lock.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Name for this lock entity. | required |
| `max_readers` | `int \| None` | Maximum concurrent readers (None = unlimited). | `None` |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If max_readers < 1. |
try_acquire_read ¶
Try to acquire a read lock without blocking.
Returns:
| Type | Description |
|---|---|
| `bool` | True if read lock was acquired, False otherwise. |
try_acquire_write ¶
Try to acquire a write lock without blocking.
Returns:
| Type | Description |
|---|---|
| `bool` | True if write lock was acquired, False otherwise. |
acquire_read ¶
Acquire a read lock, blocking if necessary.
Blocks if a writer holds the lock or a writer is waiting.
Yields:
| Type | Description |
|---|---|
| `Generator[float]` | 0.0 when read lock is acquired. |
acquire_write ¶
Acquire a write lock, blocking if necessary.
Blocks while any reader or another writer holds the lock.
Yields:
| Type | Description |
|---|---|
| `Generator[float]` | 0.0 when write lock is acquired. |
release_read ¶
Release a read lock.
Returns:
| Type | Description |
|---|---|
| `list[Event]` | Empty list (waking is handled internally). |

Raises:

| Type | Description |
|---|---|
| `RuntimeError` | If no read lock is held. |
RWLockStats dataclass ¶

    RWLockStats(
        read_acquisitions: int = 0,
        write_acquisitions: int = 0,
        read_releases: int = 0,
        write_releases: int = 0,
        read_contentions: int = 0,
        write_contentions: int = 0,
        total_read_wait_ns: int = 0,
        total_write_wait_ns: int = 0,
        peak_readers: int = 0,
    )

Frozen snapshot of read-write lock statistics.
Semaphore ¶
Bases: Entity
Counting semaphore for resource limiting.
Allows up to N concurrent holders, where N is the initial count. Acquirers can request multiple permits at once.
Attributes:
| Name | Type | Description |
|---|---|---|
| `name` | | Entity name for identification. |
| `available` | `int` | Number of permits currently available. |
| `capacity` | `int` | Maximum number of permits (initial count). |
Initialize the semaphore.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Name for this semaphore entity. | required |
| `initial_count` | `int` | Initial number of available permits. | required |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If initial_count < 1. |
try_acquire ¶
Try to acquire permits without blocking.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `count` | `int` | Number of permits to acquire. | `1` |

Returns:

| Type | Description |
|---|---|
| `bool` | True if permits were acquired, False if not enough available. |
acquire ¶
Acquire permits, blocking if necessary.
This is a generator that yields control while waiting for permits. Use with 'yield from' in an event handler.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `count` | `int` | Number of permits to acquire. | `1` |

Yields:

| Type | Description |
|---|---|
| `Generator[float]` | 0.0 when permits are acquired. |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If count < 1 or count > capacity. |
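Multi-permit accounting and the `count` validation can be modelled in a few lines. `ToySemaphore` below is a hypothetical, non-blocking sketch, not the library's class. It assumes the non-blocking path validates `count` the same way `acquire()` is documented to, which the documentation does not state.

```python
class ToySemaphore:
    """Hypothetical permit counter; not happysimulator's Semaphore."""

    def __init__(self, initial_count: int) -> None:
        if initial_count < 1:
            raise ValueError("initial_count must be >= 1")
        self.capacity = initial_count    # fixed upper bound
        self.available = initial_count   # permits free right now

    def try_acquire(self, count: int = 1) -> bool:
        # Assumption: validate count the way acquire() is documented to.
        if count < 1 or count > self.capacity:
            raise ValueError("count must be between 1 and capacity")
        if count > self.available:
            return False                 # a blocking acquire would wait here
        self.available -= count
        return True


pool = ToySemaphore(initial_count=3)
results = [pool.try_acquire(2), pool.try_acquire(2), pool.try_acquire(1)]
```

Rejecting `count > capacity` up front matters for the blocking variant: such a request could never be satisfied, so waiting for it would block forever.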
SemaphoreStats dataclass ¶

    SemaphoreStats(
        acquisitions: int = 0,
        releases: int = 0,
        contentions: int = 0,
        total_wait_time_ns: int = 0,
        peak_waiters: int = 0,
    )

Frozen snapshot of semaphore statistics.