STATEK
API Reference
Runners

Runners

Runner APIs prepare STATEK workers, execute ready jobs, unsuspend jobs whose futures are ready, process push queues, and create jobs for supervised agents.

SymbolImport pathPurposeStability
start_statekstatek or statek.runnerBlocking high-level worker startupCore operational
start_statek_asyncstatek or statek.runnerAsync high-level worker startupCore operational
run_job_stepstatek.executors.utilsExecute one job stepCore operational
run_jobs_loopstatek.executors.utilsMain worker loopCore operational
run_agentic_loopstatek.executors.utilsSingle supervised-agent queue loopOperational
run_agentic_fleetstatek.executors.utilsMultiple supervised-agent loops in one workerOperational
AgentLoopDefstatek.executors.utilsFleet loop definitionOperational
process_push_notificationsstatek.executors.utilsDeliver queued messages to jobsProvisional
process_agent_eventsstatek.executors.utilsCreate jobs from queued agent eventsProvisional
unsuspend_jobsstatek.executors.utilsMove ready suspended jobs to startedAdvanced

High-level startup

def start_statek(
    agents: Sequence[Agent] | None = None,
    push_queues: Sequence[StatekPushQueue] | None = None,
    settings: StatekSettings | None = None,
    max_concurrency: int = 100,
    provider: str = None,
)
 
async def start_statek_async(
    agents: Sequence[Agent] | None = None,
    push_queues: Sequence[StatekPushQueue] | None = None,
    settings: StatekSettings | None = None,
    max_concurrency: int = 100,
    provider: str = None,
)

Prepares a STATEK worker process and starts the agentic processing loop. start_statek(...) is a blocking wrapper around start_statek_async(...).

Import path: from statek import start_statek, start_statek_async

Parameters

NameDescription
agentsOptional sequence of agents to activate. When omitted, STATEK finds persisted Agent objects in the current dbzero prefix. Duplicate objects are ignored while preserving order.
push_queuesOptional sequence of StatekPushQueue objects supplied by the hosting application. STATEK derives queue prefixes from these queues. When omitted, it monitors the current dbzero prefix.
settingsOptional StatekSettings. When omitted, STATEK uses get_statek_settings(), including environment-derived settings.
max_concurrencyMaximum number of active jobs scheduled by the underlying loop.
providerOptional default provider name passed to the underlying loop. Job metadata can still choose a provider.

Preparation behavior

StepBehavior
Initialize settingsCalls statek.init(settings) before starting the loop.
Client APIInstantiates StatekClientAPI() so the worker process has the external job-submission API object available.
Prompt filesApplies settings.prompt_defs to active agents when prompt definitions are loaded.
Warmup filesApplies warmup definitions from settings.warmup_defs_dir when configured and the path is a directory.
Loop choiceUses run_agentic_loop(...) for one active agent and run_agentic_fleet(...) for multiple active agents.

Returns: start_statek_async(...) runs until the underlying loop returns. start_statek(...) returns the result of asyncio.run(start_statek_async(...)).

Raises: start_statek(...) raises RuntimeError when called inside an active event loop. Use await start_statek_async(...) from async application code.

from statek import start_statek
 
def main():
    start_statek(
        agents=[dispatcher, researcher],
        push_queues=[main_queue],
        max_concurrency=50,
    )
from statek import start_statek_async
 
async def main():
    await start_statek_async(
        agents=[dispatcher],
        push_queues=[main_queue],
    )

run_job_step

async def run_job_step(job: Job, provider: str = None) -> bool

Executes one iteration of the STATEK job pipeline: warmup or pending code, tool calls, future suspension, LLM request, chat-log update, harness checks, and completion detection.

Returns: True when the job completed during the step; otherwise False.

Errors: catches most job execution errors into job console/status paths. Provider and harness errors may be handled by job_worker in loop execution.

from statek.executors.utils import run_job_step
 
completed = await run_job_step(job)

run_jobs_loop

async def run_jobs_loop(
    max_concurrency: int = 100,
    provider: str = None,
    *,
    queue_prefixes: Sequence[str | int],
    start_jobs_func: Callable = None,
    auto_terminate: bool = False,
)

Runs the process-level worker loop. Each iteration unsuspends jobs, processes push notifications, processes queued agent events, optionally starts jobs, and schedules ready jobs up to max_concurrency.

Parameters

NameDescription
max_concurrencyMaximum concurrently executing jobs.
providerDefault provider name when job metadata does not choose one.
queue_prefixesdbzero prefixes where StatekPushQueue objects may exist.
start_jobs_funcOptional callback receiving available capacity.
auto_terminateTest-oriented option to exit once no active jobs remain.
from statek.executors.utils import run_jobs_loop
 
await run_jobs_loop(
    max_concurrency=10,
    queue_prefixes=["statek_worker"],
)

Agentic loops

@dataclass
class AgentLoopDef:
    agent: SupervisedAgent
    warmup_code: str | Sequence[str] | None
    task_queue_size_func: Callable
 
async def run_agentic_loop(
    agent,
    warmup_code,
    task_queue_size_func,
    queue_prefixes,
    max_concurrency=100,
    provider=None,
    auto_terminate=False,
)
 
async def run_agentic_fleet(
    agent_loop_defs,
    queue_prefixes,
    max_concurrency=100,
    provider=None,
    auto_terminate=False,
)

run_agentic_loop(...) creates or reuses a matching JobDef for one agent and starts jobs based on queue size. run_agentic_fleet(...) creates one start function per AgentLoopDef and shares a single run_jobs_loop(...).

Queue helpers

FunctionSignatureDescription
process_push_notificationsprocess_push_notifications(step_size=100, max_count=500, *, queue_prefixes, job_prefix=None)Pops queued job messages and calls job.push_user_message(...).
process_agent_eventsprocess_agent_events(agents=None, max_count=100, *, queue_prefixes)Pops events for supervised agents and creates jobs. Agent warmup must reference exactly one external local.
unsuspend_jobsunsuspend_jobs()Checks suspended jobs and sets ready continuations to STARTED.

Related APIs: Operations, Jobs, Agents.