Runners
Runner APIs prepare STATEK workers, execute ready jobs, unsuspend jobs whose futures are ready, process push queues, and create jobs for supervised agents.
| Symbol | Import path | Purpose | Stability |
|---|---|---|---|
start_statek | statek or statek.runner | Blocking high-level worker startup | Core operational |
start_statek_async | statek or statek.runner | Async high-level worker startup | Core operational |
run_job_step | statek.executors.utils | Execute one job step | Core operational |
run_jobs_loop | statek.executors.utils | Main worker loop | Core operational |
run_agentic_loop | statek.executors.utils | Single supervised-agent queue loop | Operational |
run_agentic_fleet | statek.executors.utils | Multiple supervised-agent loops in one worker | Operational |
AgentLoopDef | statek.executors.utils | Fleet loop definition | Operational |
process_push_notifications | statek.executors.utils | Deliver queued messages to jobs | Provisional |
process_agent_events | statek.executors.utils | Create jobs from queued agent events | Provisional |
unsuspend_jobs | statek.executors.utils | Move ready suspended jobs to started | Advanced |
High-level startup
def start_statek(
agents: Sequence[Agent] | None = None,
push_queues: Sequence[StatekPushQueue] | None = None,
settings: StatekSettings | None = None,
max_concurrency: int = 100,
provider: str = None,
)
async def start_statek_async(
agents: Sequence[Agent] | None = None,
push_queues: Sequence[StatekPushQueue] | None = None,
settings: StatekSettings | None = None,
max_concurrency: int = 100,
provider: str = None,
)Prepares a STATEK worker process and starts the agentic processing loop. start_statek(...) is a blocking wrapper around start_statek_async(...).
Import path: from statek import start_statek, start_statek_async
Parameters
| Name | Description |
|---|---|
agents | Optional sequence of agents to activate. When omitted, STATEK finds persisted Agent objects in the current dbzero prefix. Duplicate objects are ignored while preserving order. |
push_queues | Optional sequence of StatekPushQueue objects supplied by the hosting application. STATEK derives queue prefixes from these queues. When omitted, it monitors the current dbzero prefix. |
settings | Optional StatekSettings. When omitted, STATEK uses get_statek_settings(), including environment-derived settings. |
max_concurrency | Maximum number of active jobs scheduled by the underlying loop. |
provider | Optional default provider name passed to the underlying loop. Job metadata can still choose a provider. |
Preparation behavior
| Step | Behavior |
|---|---|
| Initialize settings | Calls statek.init(settings) before starting the loop. |
| Client API | Instantiates StatekClientAPI() so the worker process has the external job-submission API object available. |
| Prompt files | Applies settings.prompt_defs to active agents when prompt definitions are loaded. |
| Warmup files | Applies warmup definitions from settings.warmup_defs_dir when configured and the path is a directory. |
| Loop choice | Uses run_agentic_loop(...) for one active agent and run_agentic_fleet(...) for multiple active agents. |
Returns: start_statek_async(...) runs until the underlying loop returns. start_statek(...) returns the result of asyncio.run(start_statek_async(...)).
Raises: start_statek(...) raises RuntimeError when called inside an active event loop. Use await start_statek_async(...) from async application code.
from statek import start_statek
def main():
start_statek(
agents=[dispatcher, researcher],
push_queues=[main_queue],
max_concurrency=50,
)from statek import start_statek_async
async def main():
await start_statek_async(
agents=[dispatcher],
push_queues=[main_queue],
)run_job_step
async def run_job_step(job: Job, provider: str = None) -> boolExecutes one iteration of the STATEK job pipeline: warmup or pending code, tool calls, future suspension, LLM request, chat-log update, harness checks, and completion detection.
Returns: True when the job completed during the step; otherwise False.
Errors: catches most job execution errors into job console/status paths. Provider and harness errors may be handled by job_worker in loop execution.
from statek.executors.utils import run_job_step
completed = await run_job_step(job)run_jobs_loop
async def run_jobs_loop(
max_concurrency: int = 100,
provider: str = None,
*,
queue_prefixes: Sequence[str | int],
start_jobs_func: Callable = None,
auto_terminate: bool = False,
)Runs the process-level worker loop. Each iteration unsuspends jobs, processes push notifications, processes queued agent events, optionally starts jobs, and schedules ready jobs up to max_concurrency.
Parameters
| Name | Description |
|---|---|
max_concurrency | Maximum concurrently executing jobs. |
provider | Default provider name when job metadata does not choose one. |
queue_prefixes | dbzero prefixes where StatekPushQueue objects may exist. |
start_jobs_func | Optional callback receiving available capacity. |
auto_terminate | Test-oriented option to exit once no active jobs remain. |
from statek.executors.utils import run_jobs_loop
await run_jobs_loop(
max_concurrency=10,
queue_prefixes=["statek_worker"],
)Agentic loops
@dataclass
class AgentLoopDef:
agent: SupervisedAgent
warmup_code: str | Sequence[str] | None
task_queue_size_func: Callable
async def run_agentic_loop(
agent,
warmup_code,
task_queue_size_func,
queue_prefixes,
max_concurrency=100,
provider=None,
auto_terminate=False,
)
async def run_agentic_fleet(
agent_loop_defs,
queue_prefixes,
max_concurrency=100,
provider=None,
auto_terminate=False,
)run_agentic_loop(...) creates or reuses a matching JobDef for one agent and starts jobs based on queue size. run_agentic_fleet(...) creates one start function per AgentLoopDef and shares a single run_jobs_loop(...).
Queue helpers
| Function | Signature | Description |
|---|---|---|
process_push_notifications | process_push_notifications(step_size=100, max_count=500, *, queue_prefixes, job_prefix=None) | Pops queued job messages and calls job.push_user_message(...). |
process_agent_events | process_agent_events(agents=None, max_count=100, *, queue_prefixes) | Pops events for supervised agents and creates jobs. Agent warmup must reference exactly one external local. |
unsuspend_jobs | unsuspend_jobs() | Checks suspended jobs and sets ready continuations to STARTED. |
Related APIs: Operations, Jobs, Agents.