Skip to content

Microservice

Microservice patterns: sagas, sidecars, outbox relay, API gateway, idempotency.

Microservice pattern components for distributed system simulations.

APIGateway

APIGateway(
    name: str,
    routes: dict[str, RouteConfig],
    auth_latency: float = 0.001,
    auth_failure_rate: float = 0.0,
    route_extractor: Callable[[Event], str | None]
    | None = None,
)

Bases: Entity

API gateway with per-route rate limiting and backend routing.

Incoming requests are routed based on a route key extracted from the event. Each route can have its own rate limit policy, auth requirement, and pool of backends.

Request flow: 1. Extract route from event (via route_extractor) 2. Auth check (simulated latency + probabilistic failure) 3. Per-route rate limit check 4. Select backend via round-robin 5. Forward request to backend

Attributes:

Name Type Description
name

Gateway identifier.

stats APIGatewayStats

Accumulated statistics.

Initialize the API gateway.

Parameters:

Name Type Description Default
name str

Gateway identifier.

required
routes dict[str, RouteConfig]

Mapping from route key to RouteConfig.

required
auth_latency float

Simulated auth check latency in seconds.

0.001
auth_failure_rate float

Probability of auth failure (0.0 to 1.0).

0.0
route_extractor Callable[[Event], str | None] | None

Function to extract route key from event. Defaults to reading metadata.route.

None

Raises:

Type Description
ValueError

If parameters are invalid.

stats property

stats: APIGatewayStats

Return a frozen snapshot of current statistics.

routes property

routes: dict[str, RouteConfig]

The configured routes.

handle_event

handle_event(
    event: Event,
) -> (
    Generator[float, None, list[Event]] | list[Event] | None
)

Route incoming requests through the gateway pipeline.

Parameters:

Name Type Description Default
event Event

The incoming event.

required

Returns:

Type Description
Generator[float, None, list[Event]] | list[Event] | None

Events to schedule, generator for async auth, or None.

APIGatewayStats dataclass

APIGatewayStats(
    total_requests: int = 0,
    requests_routed: int = 0,
    requests_rejected_auth: int = 0,
    requests_rejected_rate_limit: int = 0,
    requests_no_route: int = 0,
    requests_no_backend: int = 0,
    per_route_requests: dict[str, int] = dict(),
)

Statistics tracked by APIGateway.

RouteConfig dataclass

RouteConfig(
    name: str,
    backends: list[Entity] = list(),
    rate_limit_policy: RateLimiterPolicy | None = None,
    auth_required: bool = True,
    timeout: float | None = None,
)

Configuration for a single API gateway route.

Parameters:

Name Type Description Default
name str

Human-readable route name.

required
backends list[Entity]

List of backend entities for round-robin selection.

list()
rate_limit_policy RateLimiterPolicy | None

Optional per-route rate limiter.

None
auth_required bool

Whether this route requires authentication.

True
timeout float | None

Optional per-route request timeout in seconds.

None

IdempotencyStore

IdempotencyStore(
    name: str,
    target: Entity,
    key_extractor: Callable[[Event], str | None],
    ttl: float = 300.0,
    max_entries: int = 10000,
    cleanup_interval: float = 60.0,
)

Bases: Entity

Deduplicates requests based on an idempotency key.

Incoming events have their idempotency key extracted via key_extractor. If the key is already cached, the event is silently dropped (duplicate). Otherwise, the event is forwarded to the target and the key is cached upon completion.

A periodic cleanup daemon expires entries older than ttl seconds.

Attributes:

Name Type Description
name

Store identifier.

stats IdempotencyStoreStats

Accumulated statistics.

Initialize the idempotency store.

Parameters:

Name Type Description Default
name str

Store identifier.

required
target Entity

Downstream entity to forward unique requests to.

required
key_extractor Callable[[Event], str | None]

Function that extracts an idempotency key from an event. Return None to forward unconditionally (no dedup).

required
ttl float

Time-to-live in seconds for cached entries.

300.0
max_entries int

Maximum cache size. Oldest entries evicted on overflow.

10000
cleanup_interval float

Seconds between TTL cleanup sweeps.

60.0

Raises:

Type Description
ValueError

If ttl, max_entries, or cleanup_interval are invalid.

stats property

stats: IdempotencyStoreStats

Return a frozen snapshot of current statistics.

target property

target: Entity

The protected target entity.

cache_size property

cache_size: int

Current number of cached entries.

in_flight_count property

in_flight_count: int

Number of requests currently being processed.

handle_event

handle_event(event: Event) -> list[Event] | None

Route events: cleanup daemon vs request dedup.

Parameters:

Name Type Description Default
event Event

The incoming event.

required

Returns:

Type Description
list[Event] | None

Events to schedule, or None for duplicates.

IdempotencyStoreStats dataclass

IdempotencyStoreStats(
    total_requests: int = 0,
    cache_hits: int = 0,
    cache_misses: int = 0,
    entries_expired: int = 0,
    entries_stored: int = 0,
)

Statistics tracked by IdempotencyStore.

OutboxEntry dataclass

OutboxEntry(
    entry_id: int,
    payload: dict[str, Any],
    written_at: Instant,
    relayed: bool = False,
)

A single entry in the outbox.

OutboxRelay

OutboxRelay(
    name: str,
    downstream: Entity,
    poll_interval: float = 0.1,
    batch_size: int = 100,
    relay_latency: float = 0.001,
)

Bases: Entity

Transactional outbox with poll-based relay to downstream.

Events are written to an in-memory outbox via write(). A self-scheduling poll daemon reads unrelayed entries in batches and forwards them to the downstream entity.

The relay lag (time between write and relay) is tracked for observability.

Attributes:

Name Type Description
name

Outbox identifier.

stats OutboxRelayStats

Accumulated statistics.

Initialize the outbox relay.

Parameters:

Name Type Description Default
name str

Outbox identifier.

required
downstream Entity

Entity to relay entries to.

required
poll_interval float

Seconds between poll cycles.

0.1
batch_size int

Maximum entries relayed per poll cycle.

100
relay_latency float

Simulated per-entry relay latency in seconds.

0.001

Raises:

Type Description
ValueError

If parameters are invalid.

stats property

stats: OutboxRelayStats

Return a frozen snapshot of current statistics.

downstream property

downstream: Entity

The relay target entity.

pending_count property

pending_count: int

Number of entries waiting to be relayed.

total_entries property

total_entries: int

Total entries in the outbox (including relayed).

write

write(payload: dict[str, Any]) -> int

Write an entry to the outbox.

This is a regular method (not a generator) intended to be called by user entities during their handle_event() processing.

Parameters:

Name Type Description Default
payload dict[str, Any]

The event payload to relay downstream.

required

Returns:

Type Description
int

The entry ID for tracking.

handle_event

handle_event(
    event: Event,
) -> Generator[float, None, list[Event]] | list[Event]

Handle poll events and prime the poll loop.

Parameters:

Name Type Description Default
event Event

The incoming event.

required

Returns:

Type Description
Generator[float, None, list[Event]] | list[Event]

Events to schedule.

prime_poll

prime_poll() -> Event

Create the initial poll event to start the relay loop.

Call this and schedule the returned event to begin relaying. Alternatively, sending any event to the outbox will auto-prime.

Returns:

Type Description
Event

The initial poll event.

OutboxRelayStats dataclass

OutboxRelayStats(
    entries_written: int = 0,
    entries_relayed: int = 0,
    relay_failures: int = 0,
    poll_cycles: int = 0,
    relay_lag_sum: float = 0.0,
    relay_lag_max: float = 0.0,
)

Statistics tracked by OutboxRelay.

avg_relay_lag property

avg_relay_lag: float

Average relay lag in seconds.

Saga

Saga(
    name: str,
    steps: list[SagaStep],
    on_complete: Callable[
        [int, SagaState, list[SagaStepResult]], None
    ]
    | None = None,
)

Bases: Entity

Distributed transaction orchestrator using the saga pattern.

Executes steps in sequence. Each step sends an action event to its target and waits for completion (via completion hooks). If a step times out, the saga enters compensating state and executes compensation for all completed steps in reverse order.

The same Saga entity can orchestrate multiple concurrent saga instances, each identified by a unique saga_id.

Attributes:

Name Type Description
name

Saga identifier.

stats SagaStats

Accumulated statistics.

Initialize the saga orchestrator.

Parameters:

Name Type Description Default
name str

Saga identifier.

required
steps list[SagaStep]

Ordered list of saga steps.

required
on_complete Callable[[int, SagaState, list[SagaStepResult]], None] | None

Optional callback fired when a saga finishes. Receives (saga_id, final_state, step_results).

None

Raises:

Type Description
ValueError

If steps list is empty.

stats property

stats: SagaStats

Return a frozen snapshot of current statistics.

steps property

steps: list[SagaStep]

The saga step definitions.

active_instances property

active_instances: int

Number of currently active saga instances.

handle_event

handle_event(event: Event) -> list[Event] | None

Route events to the appropriate saga handler.

Parameters:

Name Type Description Default
event Event

The incoming event.

required

Returns:

Type Description
list[Event] | None

Events to schedule.

get_instance_state

get_instance_state(saga_id: int) -> SagaState | None

Get the state of a saga instance.

Parameters:

Name Type Description Default
saga_id int

The saga instance ID.

required

Returns:

Type Description
SagaState | None

The saga state, or None if not found.

SagaState

Bases: Enum

State of a saga instance.

SagaStats dataclass

SagaStats(
    sagas_started: int = 0,
    sagas_completed: int = 0,
    sagas_compensated: int = 0,
    sagas_failed: int = 0,
    steps_executed: int = 0,
    steps_failed: int = 0,
    compensations_executed: int = 0,
)

Statistics tracked by Saga.

SagaStep dataclass

SagaStep(
    name: str,
    action_target: Entity,
    action_event_type: str,
    compensation_target: Entity,
    compensation_event_type: str,
    timeout: float | None = None,
)

Definition of a single saga step.

Parameters:

Name Type Description Default
name str

Human-readable step name.

required
action_target Entity

Entity to send the forward action to.

required
action_event_type str

Event type for the forward action.

required
compensation_target Entity

Entity to send compensation to on rollback.

required
compensation_event_type str

Event type for compensation.

required
timeout float | None

Seconds to wait for step completion. None for no timeout.

None

SagaStepResult dataclass

SagaStepResult(
    step_name: str,
    success: bool,
    started_at: Instant | None = None,
    completed_at: Instant | None = None,
)

Result of an individual saga step execution.

Sidecar

Sidecar(
    name: str,
    target: Entity,
    rate_limit_policy: RateLimiterPolicy | None = None,
    rate_limit_queue_capacity: int = 1000,
    circuit_failure_threshold: int = 5,
    circuit_success_threshold: int = 2,
    circuit_timeout: float = 30.0,
    request_timeout: float = 5.0,
    max_retries: int = 3,
    retry_base_delay: float = 0.1,
)

Bases: Entity

Service mesh sidecar proxy with integrated resilience.

Inlines rate limiting, circuit breaking, timeout, and retry logic so that users only need to register a single entity. Requests flow through each layer in order:

  1. Rate limit check (if policy provided)
  2. Circuit breaker check
  3. Forward to target with timeout
  4. On failure/timeout: retry with exponential backoff

Attributes:

Name Type Description
name

Sidecar identifier.

stats SidecarStats

Accumulated statistics.

Initialize the sidecar proxy.

Parameters:

Name Type Description Default
name str

Sidecar identifier.

required
target Entity

Downstream service to proxy requests to.

required
rate_limit_policy RateLimiterPolicy | None

Optional rate limiter policy. None disables rate limiting.

None
rate_limit_queue_capacity int

Max queued events when rate limited.

1000
circuit_failure_threshold int

Consecutive failures before opening circuit.

5
circuit_success_threshold int

Consecutive successes in half-open to close.

2
circuit_timeout float

Seconds before transitioning open to half-open.

30.0
request_timeout float

Per-request timeout in seconds.

5.0
max_retries int

Maximum retry attempts (0 = no retries).

3
retry_base_delay float

Base delay for exponential backoff.

0.1

Raises:

Type Description
ValueError

If parameters are invalid.

stats property

stats: SidecarStats

Return a frozen snapshot of current statistics.

target property

target: Entity

The proxied target entity.

circuit_state property

circuit_state: str

Current circuit breaker state as a string.

handle_event

handle_event(
    event: Event,
) -> (
    Generator[float, None, list[Event]] | list[Event] | None
)

Route incoming events through the sidecar pipeline.

Parameters:

Name Type Description Default
event Event

The incoming event.

required

Returns:

Type Description
Generator[float, None, list[Event]] | list[Event] | None

Events to schedule, generator for async processing, or None.

SidecarStats dataclass

SidecarStats(
    total_requests: int = 0,
    successful_requests: int = 0,
    failed_requests: int = 0,
    retries: int = 0,
    rate_limited: int = 0,
    circuit_broken: int = 0,
    timed_out: int = 0,
)

Statistics tracked by Sidecar.