Scheduling
Job scheduling, DAG execution, and work-stealing thread pools.
Scheduling components for job management and work distribution.
JobDefinition (dataclass)
JobDefinition(
name: str,
target: Entity,
event_type: str,
interval: float,
priority: int = 0,
depends_on: list[str] = list(),
context: dict[str, Any] = dict(),
enabled: bool = True,
)
Definition of a scheduled job.
Attributes:
| Name | Type | Description |
|---|---|---|
| `name` | `str` | Unique job identifier. |
| `target` | `Entity` | Entity to receive the job event. |
| `event_type` | `str` | Event type string for the dispatched event. |
| `interval` | `float` | Seconds between job executions. |
| `priority` | `int` | Higher values execute first when multiple jobs are due. |
| `depends_on` | `list[str]` | Names of jobs that must complete before this one runs. |
| `context` | `dict[str, Any]` | Extra metadata to include in job events. |
| `enabled` | `bool` | Whether the job is active. |
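As an illustration of the fields above, the following sketch mirrors the documented signature with a stand-in dataclass and defines two jobs, one depending on the other. The `Entity` class, the job names, and the event type strings are placeholders, and using `field(default_factory=...)` for the `list()` and `dict()` defaults is an assumption about how the real class is declared.

```python
from dataclasses import dataclass, field
from typing import Any


class Entity:
    """Placeholder for the library's Entity base class (assumption)."""

    def __init__(self, name: str) -> None:
        self.name = name


@dataclass
class JobDefinition:
    """Stand-in that mirrors the documented fields; not the library's class."""

    name: str
    target: Entity
    event_type: str
    interval: float
    priority: int = 0
    # default_factory gives every instance its own list/dict.
    depends_on: list[str] = field(default_factory=list)
    context: dict[str, Any] = field(default_factory=dict)
    enabled: bool = True


warehouse = Entity("warehouse")  # hypothetical target entity

extract = JobDefinition(
    name="extract",
    target=warehouse,
    event_type="run_extract",
    interval=60.0,
    priority=10,
)
report = JobDefinition(
    name="report",
    target=warehouse,
    event_type="run_report",
    interval=300.0,
    depends_on=["extract"],  # waits for "extract" to complete
    context={"format": "csv"},
)
```

Because `depends_on` refers to jobs by name, the names need to be unique within one scheduler.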
JobScheduler
Bases: Entity
Periodic job scheduler with priority and DAG dependencies.
Uses a self-perpetuating tick loop to check for due jobs at regular intervals. Jobs are sorted by priority (highest first) and checked for DAG dependency satisfaction before firing.
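The per-tick selection described above can be sketched roughly as follows. This is a simplified model and not the scheduler's actual code: `Job` and `State` are trimmed stand-ins for `JobDefinition` and `JobState`, times are plain floats, and two behaviors are assumptions: a job that has never run is treated as due immediately, and a dependency counts as satisfied once it has completed at least once and is not currently running.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Job:
    """Trimmed stand-in for JobDefinition (only the fields this sketch needs)."""

    name: str
    interval: float
    priority: int = 0
    depends_on: list[str] = field(default_factory=list)
    enabled: bool = True


@dataclass
class State:
    """Trimmed stand-in for JobState, with times as plain floats."""

    last_run_time: Optional[float] = None
    last_completion_time: Optional[float] = None
    is_running: bool = False


def select_jobs(jobs: list[Job], states: dict[str, State], now: float) -> list[str]:
    """Return the names of jobs to fire on this tick, highest priority first."""
    fired = []
    # sorted() is stable, so equal-priority jobs keep their registration order.
    for job in sorted(jobs, key=lambda j: j.priority, reverse=True):
        state = states[job.name]
        if not job.enabled or state.is_running:
            continue
        # Assumption: a job that has never run is due right away.
        due = state.last_run_time is None or now - state.last_run_time >= job.interval
        if not due:
            continue
        # Assumption: a dependency is satisfied once it has completed at
        # least once and is not running at the moment.
        deps_ok = all(
            states[d].last_completion_time is not None and not states[d].is_running
            for d in job.depends_on
        )
        if deps_ok:
            fired.append(job.name)
    return fired


jobs = [
    Job("report", 10.0, priority=1, depends_on=["extract"]),
    Job("extract", 10.0, priority=5),
    Job("cleanup", 10.0, priority=9, enabled=False),
]
states = {j.name: State() for j in jobs}

first = select_jobs(jobs, states, now=0.0)   # ['extract']

# Pretend "extract" ran at t=0 and completed at t=2.
states["extract"].last_run_time = 0.0
states["extract"].last_completion_time = 2.0
second = select_jobs(jobs, states, now=5.0)  # ['report']
```

On the first tick `report` is skipped because `extract` has not completed yet; on the second, `extract` is not due again, so only `report` fires.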
Attributes:
| Name | Type | Description |
|---|---|---|
| `name` | `str` | Scheduler identifier. |
| `tick_interval` | `float` | Seconds between scheduler ticks. |
| `stats` | `JobSchedulerStats` | Frozen statistics snapshot (via property). |
Initialize the job scheduler.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Scheduler identifier. | required |
| `tick_interval` | `float` | Seconds between evaluation ticks. | `1.0` |
Raises:
| Type | Description |
|---|---|
| `ValueError` | If `tick_interval` is not positive. |
add_job
Register a job with the scheduler.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `job` | `JobDefinition` | Job definition to add. | required |
Raises:
| Type | Description |
|---|---|
| `ValueError` | If a job with the same name already exists. |
remove_job
Remove a job from the scheduler.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Name of the job to remove. | required |
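The validation rules documented for the constructor, `add_job`, and `remove_job` can be modeled with a small registry like the one below. `JobRegistry` and `Job` are hypothetical names used only for this sketch. The behavior of `remove_job` for an unknown name is not documented above, so treating it as a silent no-op is an assumption.

```python
from dataclasses import dataclass


@dataclass
class Job:
    """Minimal stand-in for JobDefinition."""

    name: str
    interval: float


class JobRegistry:
    """Hypothetical model of the scheduler's job bookkeeping."""

    def __init__(self, name: str, tick_interval: float = 1.0) -> None:
        if tick_interval <= 0:
            raise ValueError("tick_interval must be positive")
        self.name = name
        self.tick_interval = tick_interval
        self._jobs: dict[str, Job] = {}

    def add_job(self, job: Job) -> None:
        # Job names are unique identifiers, so duplicates are rejected.
        if job.name in self._jobs:
            raise ValueError(f"job {job.name!r} already exists")
        self._jobs[job.name] = job

    def remove_job(self, name: str) -> None:
        # Assumption: removing an unknown name does nothing.
        self._jobs.pop(name, None)

    def job_names(self) -> list[str]:
        return list(self._jobs)


registry = JobRegistry("nightly", tick_interval=0.5)
registry.add_job(Job("extract", 60.0))
registry.add_job(Job("report", 300.0))
registry.remove_job("extract")
```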
start
JobSchedulerStats (dataclass)
JobSchedulerStats(
ticks: int = 0,
jobs_triggered: int = 0,
jobs_completed: int = 0,
jobs_skipped_dependency: int = 0,
jobs_skipped_running: int = 0,
)
Statistics tracked by JobScheduler.
JobState (dataclass)
JobState(
last_run_time: Instant | None = None,
last_completion_time: Instant | None = None,
is_running: bool = False,
run_count: int = 0,
failure_count: int = 0,
)
Runtime state for a scheduled job.
WorkerStats (dataclass)
WorkerStats(
tasks_completed: int = 0,
tasks_stolen: int = 0,
total_processing_time: float = 0.0,
idle_time: float = 0.0,
)
Frozen snapshot of per-worker statistics.
WorkStealingPool
WorkStealingPool(
name: str,
num_workers: int = 4,
downstream: Entity | None = None,
processing_time_key: str = "processing_time",
default_processing_time: float = 0.1,
)
Bases: Entity
Work-stealing pool with N workers, each having a local deque.
New work goes to the worker with the shortest queue. Idle workers steal from the busiest neighbor's tail.
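The two rules above (shortest-queue placement and stealing from the busiest tail) can be sketched with plain deques. `StealingQueues` is a hypothetical class that leaves out events and processing time entirely. It assumes that workers take their own work from the head of their deque, that ties go to the lowest worker index, and that "neighbor" means any other worker.

```python
from collections import deque


class StealingQueues:
    """Hypothetical model of the pool's per-worker deques (no timing, no events)."""

    def __init__(self, num_workers: int = 4) -> None:
        if num_workers < 1:
            raise ValueError("num_workers must be at least 1")
        self.queues: list[deque] = [deque() for _ in range(num_workers)]
        self.total_steals = 0
        self.total_steal_attempts = 0

    def submit(self, task) -> int:
        """Append the task to the shortest queue and return that worker's index."""
        # min() returns the first minimum, so ties go to the lowest index.
        idx = min(range(len(self.queues)), key=lambda i: len(self.queues[i]))
        self.queues[idx].append(task)
        return idx

    def next_task(self, worker: int):
        """Take local work from the head, or steal from the busiest worker's tail."""
        own = self.queues[worker]
        if own:
            return own.popleft()
        self.total_steal_attempts += 1
        # Pick the other worker with the longest queue (None if there is no other).
        victim = max(
            (i for i in range(len(self.queues)) if i != worker),
            key=lambda i: len(self.queues[i]),
            default=None,
        )
        if victim is None or not self.queues[victim]:
            return None  # nothing to steal
        self.total_steals += 1
        return self.queues[victim].pop()  # steal from the tail


pool = StealingQueues(num_workers=2)
placements = [pool.submit(t) for t in ("a", "b", "c")]  # [0, 1, 0]

taken = [
    pool.next_task(1),  # 'b': worker 1's own task
    pool.next_task(1),  # 'c': stolen from the tail of worker 0's deque
    pool.next_task(0),  # 'a': worker 0's remaining task
    pool.next_task(0),  # None: a failed steal attempt
]
```

Stealing from the tail while the owner consumes from the head keeps the two ends of a deque from contending for the same task, which is the usual motivation for this layout.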
Attributes:
| Name | Type | Description |
|---|---|---|
| `name` | `str` | Pool identifier. |
| `num_workers` | `int` | Number of worker entities. |
| `stats` | `WorkStealingPoolStats` | Frozen statistics snapshot (via property). |
Initialize the work-stealing pool.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Pool identifier. | required |
| `num_workers` | `int` | Number of workers. | `4` |
| `downstream` | `Entity \| None` | Entity to receive completed events. | `None` |
| `processing_time_key` | `str` | Metadata key for task processing time. | `'processing_time'` |
| `default_processing_time` | `float` | Fallback if key not found. | `0.1` |
Raises:
| Type | Description |
|---|---|
| `ValueError` | If `num_workers < 1`. |
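One plausible reading of how `processing_time_key` and `default_processing_time` interact is a dictionary lookup with a fallback, sketched below. The helper name `resolve_processing_time` and the idea that task metadata is a plain mapping are assumptions made for illustration.

```python
from typing import Any, Mapping


def resolve_processing_time(
    metadata: Mapping[str, Any],
    processing_time_key: str = "processing_time",
    default_processing_time: float = 0.1,
) -> float:
    """Return the task's processing time, or the default if the key is absent."""
    return float(metadata.get(processing_time_key, default_processing_time))


explicit = resolve_processing_time({"processing_time": 0.5})          # key present
fallback = resolve_processing_time({"payload": "x"})                  # key missing
custom = resolve_processing_time({"cost": 2}, processing_time_key="cost")
```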
handle_event
Accept incoming work and assign it to the worker with the shortest queue.
WorkStealingPoolStats (dataclass)
WorkStealingPoolStats(
tasks_submitted: int = 0,
tasks_completed: int = 0,
total_steals: int = 0,
total_steal_attempts: int = 0,
)
Frozen snapshot of aggregate pool statistics.
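The counters in this snapshot lend themselves to a couple of derived metrics. The sketch below uses a stand-in `PoolStats` dataclass with the same fields; declaring it with `frozen=True` is an assumption based on the "frozen snapshot" wording, and the two helper functions are illustrative and not part of the documented API.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PoolStats:
    """Stand-in with the same fields as WorkStealingPoolStats."""

    tasks_submitted: int = 0
    tasks_completed: int = 0
    total_steals: int = 0
    total_steal_attempts: int = 0


def steal_success_rate(stats: PoolStats) -> float:
    """Fraction of steal attempts that found work (0.0 if none were made)."""
    if stats.total_steal_attempts == 0:
        return 0.0
    return stats.total_steals / stats.total_steal_attempts


def tasks_in_flight(stats: PoolStats) -> int:
    """Tasks submitted but not yet completed."""
    return stats.tasks_submitted - stats.tasks_completed


snapshot = PoolStats(
    tasks_submitted=100,
    tasks_completed=90,
    total_steals=12,
    total_steal_attempts=48,
)
rate = steal_success_rate(snapshot)   # 0.25
pending = tasks_in_flight(snapshot)   # 10
```

A low success rate with many attempts suggests workers are often idle at the same time, so there is little work left to steal.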