Scheduling

Scheduling components for job management and work distribution: periodic job scheduling, DAG dependency execution, and work-stealing thread pools.

JobDefinition dataclass

JobDefinition(
    name: str,
    target: Entity,
    event_type: str,
    interval: float,
    priority: int = 0,
    depends_on: list[str] = list(),
    context: dict[str, Any] = dict(),
    enabled: bool = True,
)

Definition of a scheduled job.

Attributes:

| Name | Type | Description |
|---|---|---|
| name | str | Unique job identifier. |
| target | Entity | Entity to receive the job event. |
| event_type | str | Event type string for the dispatched event. |
| interval | float | Seconds between job executions. |
| priority | int | Higher values execute first when multiple jobs are due. |
| depends_on | list[str] | Names of jobs that must complete before this one runs. |
| context | dict[str, Any] | Extra metadata to include in job events. |
| enabled | bool | Whether the job is active. |
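The signature above shows `list()` and `dict()` as the defaults for `depends_on` and `context`. In a Python dataclass, mutable defaults of this kind have to be declared with `field(default_factory=...)`, so that is presumably what the rendered signature reflects. The sketch below illustrates the idea with `JobDefinitionSketch`, a hypothetical stand-in that mirrors the documented fields; it is not the library class, and `target` is typed as `object` in place of `Entity`.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class JobDefinitionSketch:
    """Hypothetical stand-in mirroring the documented JobDefinition fields."""

    name: str
    target: object  # stands in for Entity (assumption)
    event_type: str
    interval: float
    priority: int = 0
    depends_on: list[str] = field(default_factory=list)
    context: dict[str, Any] = field(default_factory=dict)
    enabled: bool = True


extract = JobDefinitionSketch(
    "extract", target=object(), event_type="run_extract", interval=60.0
)
load = JobDefinitionSketch(
    "load",
    target=object(),
    event_type="run_load",
    interval=60.0,
    priority=5,
    depends_on=["extract"],
)

# Each instance gets its own list, so mutating one does not leak into others.
extract.depends_on.append("bootstrap")
fresh = JobDefinitionSketch("fresh", target=object(), event_type="noop", interval=1.0)
```

With a default factory, `fresh.depends_on` starts empty even though another instance's list was mutated earlier.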

JobScheduler

JobScheduler(name: str, tick_interval: float = 1.0)

Bases: Entity

Periodic job scheduler with priority and DAG dependencies.

Uses a self-perpetuating tick loop to check for due jobs at regular intervals. Jobs are sorted by priority (highest first) and checked for DAG dependency satisfaction before firing.
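The selection step described above can be sketched in isolation. This is a minimal illustration under stated assumptions, not the scheduler's actual code: `Job` and `select_jobs` are hypothetical names, a job counts as due once `interval` seconds have passed since its last run, and a dependency counts as satisfied once its name appears in a `completed` set.

```python
from dataclasses import dataclass, field


@dataclass
class Job:
    """Hypothetical stand-in for JobDefinition (only the fields used here)."""

    name: str
    interval: float
    priority: int = 0
    depends_on: list[str] = field(default_factory=list)
    enabled: bool = True


def select_jobs(jobs, now, last_run, completed, running):
    """Return names of jobs to fire at time `now`, highest priority first."""
    due = [
        j
        for j in jobs
        if j.enabled and now - last_run.get(j.name, float("-inf")) >= j.interval
    ]
    due.sort(key=lambda j: j.priority, reverse=True)
    fired = []
    for j in due:
        if j.name in running:
            continue  # skipped: still running from an earlier tick
        if not all(dep in completed for dep in j.depends_on):
            continue  # skipped: a dependency has not completed yet
        fired.append(j.name)
    return fired


jobs = [
    Job("report", interval=10.0, priority=1, depends_on=["load"]),
    Job("extract", interval=10.0, priority=5),
    Job("load", interval=10.0, priority=3, depends_on=["extract"]),
]
first = select_jobs(jobs, now=10.0, last_run={}, completed=set(), running=set())
second = select_jobs(
    jobs, now=10.0, last_run={"extract": 10.0}, completed={"extract"}, running=set()
)
```

On the first pass only `extract` fires, because the other two jobs wait on dependencies. Once `extract` is marked complete, `load` becomes eligible while `report` still waits on `load`. The two `continue` branches correspond loosely to the `jobs_skipped_running` and `jobs_skipped_dependency` counters in `JobSchedulerStats`.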

Attributes:

| Name | Type | Description |
|---|---|---|
| name | str | Scheduler identifier. |
| tick_interval | float | Seconds between scheduler ticks. |
| stats | JobSchedulerStats | Frozen statistics snapshot (via property). |

Initialize the job scheduler.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Scheduler identifier. | required |
| tick_interval | float | Seconds between evaluation ticks. | 1.0 |

Raises:

| Type | Description |
|---|---|
| ValueError | If tick_interval is not positive. |

tick_interval property

tick_interval: float

Seconds between scheduler ticks.

job_names property

job_names: list[str]

Names of all registered jobs.

running_jobs property

running_jobs: list[str]

Names of currently running jobs.

is_running property

is_running: bool

Whether the scheduler is active.

stats property

stats: JobSchedulerStats

Frozen snapshot of current statistics.

add_job

add_job(job: JobDefinition) -> None

Register a job with the scheduler.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| job | JobDefinition | Job definition to add. | required |

Raises:

| Type | Description |
|---|---|
| ValueError | If a job with the same name already exists. |

remove_job

remove_job(name: str) -> None

Remove a job from the scheduler.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the job to remove. | required |

enable_job

enable_job(name: str) -> None

Enable a disabled job.

disable_job

disable_job(name: str) -> None

Disable a job without removing it.

get_job_state

get_job_state(name: str) -> JobState | None

Get the runtime state of a job.

start

start() -> Event

Start the scheduler tick loop.

Returns:

| Type | Description |
|---|---|
| Event | The first tick event to schedule. |
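A self-perpetuating tick loop can be sketched with a plain priority queue: the first tick is seeded once (the role `start()` plays here), and handling each tick schedules the next. This is only an illustration of the pattern. `run_ticks` is a hypothetical helper, events are modeled as `(time, kind)` tuples rather than the library's `Event`, and placing the first tick at time 0 is an assumption.

```python
import heapq


def run_ticks(tick_interval, until):
    """Drive a self-rescheduling tick until simulated time `until`."""
    if tick_interval <= 0:
        raise ValueError("tick_interval must be positive")
    queue = [(0.0, "tick")]  # the seed event, analogous to what start() returns
    tick_times = []
    while queue:
        time, kind = heapq.heappop(queue)
        if time > until:
            break
        tick_times.append(time)
        # Handling a tick schedules the next one, which keeps the loop alive.
        heapq.heappush(queue, (time + tick_interval, kind))
    return tick_times


ticks = run_ticks(tick_interval=1.0, until=3.0)
```

Because the loop only continues while a tick keeps rescheduling itself, stopping the scheduler amounts to not emitting the next tick.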

stop

stop() -> None

Stop the scheduler.

handle_event

handle_event(event: Event) -> list[Event] | None

Handle scheduler events.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| event | Event | The event to handle. | required |

Returns:

| Type | Description |
|---|---|
| list[Event] \| None | Events to schedule. |

JobSchedulerStats dataclass

JobSchedulerStats(
    ticks: int = 0,
    jobs_triggered: int = 0,
    jobs_completed: int = 0,
    jobs_skipped_dependency: int = 0,
    jobs_skipped_running: int = 0,
)

Statistics tracked by JobScheduler.
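The `stats` property is described as returning a frozen snapshot. One common way to get that behavior, assumed here rather than confirmed by this page, is a `@dataclass(frozen=True)` value built from internal counters on each access. `StatsSnapshot` and `TickCounter` below are hypothetical names used only for illustration.

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class StatsSnapshot:
    """Hypothetical stand-in for JobSchedulerStats (two fields only)."""

    ticks: int = 0
    jobs_triggered: int = 0


class TickCounter:
    """Hypothetical owner of mutable counters that exposes frozen snapshots."""

    def __init__(self):
        self._ticks = 0
        self._jobs_triggered = 0

    def tick(self, triggered):
        self._ticks += 1
        self._jobs_triggered += triggered

    @property
    def stats(self):
        # A new immutable value per access; earlier snapshots never change.
        return StatsSnapshot(self._ticks, self._jobs_triggered)


counter = TickCounter()
before = counter.stats
counter.tick(triggered=2)
after = counter.stats

mutation_blocked = False
try:
    after.ticks = 99  # frozen dataclasses reject attribute assignment
except FrozenInstanceError:
    mutation_blocked = True
```

A snapshot taken before a tick keeps its old values, which makes it safe to compare `before` and `after` when measuring what happened in between.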

JobState dataclass

JobState(
    last_run_time: Instant | None = None,
    last_completion_time: Instant | None = None,
    is_running: bool = False,
    run_count: int = 0,
    failure_count: int = 0,
)

Runtime state for a scheduled job.

WorkerStats dataclass

WorkerStats(
    tasks_completed: int = 0,
    tasks_stolen: int = 0,
    total_processing_time: float = 0.0,
    idle_time: float = 0.0,
)

Frozen snapshot of per-worker statistics.

WorkStealingPool

WorkStealingPool(
    name: str,
    num_workers: int = 4,
    downstream: Entity | None = None,
    processing_time_key: str = "processing_time",
    default_processing_time: float = 0.1,
)

Bases: Entity

Work-stealing pool with N workers, each having a local deque.

New work goes to the worker with the shortest queue. Idle workers steal from the busiest neighbor's tail.
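The two rules above (assign to the shortest queue, steal from the busiest queue's tail) can be sketched with `collections.deque`. This is an illustrative sketch, not the pool's implementation: `assign` and `steal` are hypothetical helpers, and every other worker is treated as a "neighbor", which may be broader than what the library does.

```python
from collections import deque


def assign(queues, task):
    """Append `task` to the shortest queue and return the chosen worker index."""
    idx = min(range(len(queues)), key=lambda i: len(queues[i]))
    queues[idx].append(task)
    return idx


def steal(queues, thief):
    """Move one task from the tail of the longest other queue to `thief`.

    Returns the stolen task, or None if there was nothing to steal.
    """
    victims = [i for i in range(len(queues)) if i != thief and queues[i]]
    if not victims:
        return None  # a failed steal attempt
    victim = max(victims, key=lambda i: len(queues[i]))
    task = queues[victim].pop()  # pop() takes from the tail of the deque
    queues[thief].append(task)
    return task


queues = [deque(["a", "b", "c"]), deque(["d"]), deque()]
chosen = assign(queues, "e")     # worker 2 has the shortest queue
queues[2].clear()                # pretend worker 2 finished its task and is idle
stolen = steal(queues, thief=2)  # takes "c" from the tail of worker 0's deque
```

Stealing from the tail while the owner consumes from the head keeps the two ends of the deque from contending in a real thread pool; in a simulation it mainly determines which task moves.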

Attributes:

| Name | Type | Description |
|---|---|---|
| name | str | Pool identifier. |
| num_workers | int | Number of worker entities. |
| stats | WorkStealingPoolStats | Frozen statistics snapshot (via property). |

Initialize the work-stealing pool.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Pool identifier. | required |
| num_workers | int | Number of workers. | 4 |
| downstream | Entity \| None | Entity to receive completed events. | None |
| processing_time_key | str | Metadata key for task processing time. | 'processing_time' |
| default_processing_time | float | Fallback if key not found. | 0.1 |

Raises:

| Type | Description |
|---|---|
| ValueError | If num_workers < 1. |
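The interaction between `processing_time_key` and `default_processing_time` amounts to a dictionary lookup with a fallback. The sketch below assumes the task's metadata is available as a plain `dict`; `resolve_processing_time` is a hypothetical helper, and where the pool actually reads this metadata from is not stated on this page.

```python
def resolve_processing_time(metadata, key="processing_time", default=0.1):
    """Return the task's processing time, or `default` if `key` is absent."""
    return float(metadata.get(key, default))


explicit = resolve_processing_time({"processing_time": 0.25})
fallback = resolve_processing_time({})
custom = resolve_processing_time({"cost": 2}, key="cost")
```

Changing `processing_time_key` lets tasks that already carry a duration under another name (here the made-up key `"cost"`) be used without rewriting their metadata.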

num_workers property

num_workers: int

Number of workers in the pool.

workers property

workers: list[_Worker]

The worker entities (for registration with Simulation).

worker_stats property

worker_stats: list[WorkerStats]

Per-worker statistics (frozen snapshots).

stats property

stats: WorkStealingPoolStats

Frozen snapshot of aggregate pool statistics.

set_clock

set_clock(clock: Clock) -> None

Propagate clock to all workers.

handle_event

handle_event(event: Event) -> list[Event] | None

Accept incoming work and assign to shortest queue.

WorkStealingPoolStats dataclass

WorkStealingPoolStats(
    tasks_submitted: int = 0,
    tasks_completed: int = 0,
    total_steals: int = 0,
    total_steal_attempts: int = 0,
)

Frozen snapshot of aggregate pool statistics.
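The aggregate counters lend themselves to a couple of derived metrics. The helpers below are hypothetical and not part of the documented API; they assume `total_steals` counts successful steals and `total_steal_attempts` counts all attempts. `PoolStatsSketch` is a stand-in that mirrors the documented fields.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PoolStatsSketch:
    """Hypothetical stand-in mirroring WorkStealingPoolStats."""

    tasks_submitted: int = 0
    tasks_completed: int = 0
    total_steals: int = 0
    total_steal_attempts: int = 0


def steal_success_rate(stats):
    """Fraction of steal attempts that moved a task (0.0 if none were made)."""
    if stats.total_steal_attempts == 0:
        return 0.0
    return stats.total_steals / stats.total_steal_attempts


def tasks_in_flight(stats):
    """Tasks submitted but not yet completed."""
    return stats.tasks_submitted - stats.tasks_completed


snapshot = PoolStatsSketch(
    tasks_submitted=10, tasks_completed=7, total_steals=3, total_steal_attempts=4
)
rate = steal_success_rate(snapshot)
pending = tasks_in_flight(snapshot)
```

A low success rate alongside many attempts would suggest workers are frequently idle with nothing available to steal.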