STATEK
Temporal Tools

STATEK Temporal Tools

Temporal tools are the lower-level machinery behind durable waits in STATEK.

They let Python code return a value if it is ready now, or a FutureResult if the value must be checked again later. When the future is not ready, the job can suspend and later continue from the stored instruction instead of rerunning earlier Python statements.

⚠️

Temporal tools are advanced, polling-based durable waiting primitives. Use callbacks, pushed messages, events, and queues first for human input, webhooks, broad product interruptions, and high-scale event delivery. Reach for temporal tools when the wait belongs naturally inside controlled Python execution.

Core Mechanics

Temporal functions are regular sync or async Python functions decorated with @temporal(...).

from statek.future import FutureResult, temporal
from statek.exceptions import FutureError

A temporal function has two support functions:

  • condition(future): returns whether the value is ready
  • complement(future): returns the value, or raises FutureError when it is still pending

The decorated function may return either:

  • a normal ready value
  • a FutureResult(deps=..., state_num=...)

When it returns a FutureResult, the decorator attaches the complement and condition to that future. future.check_condition() calls the condition. future.value calls the complement.

def fetch_report(future: FutureResult) -> str:
    report = future.deps
    if report.status != "ready":
        raise FutureError(future)
    return report.url
 
def report_is_ready(future: FutureResult) -> bool:
    return future.deps.status == "ready"
 
@temporal(complement=fetch_report, condition=report_is_ready)
def wait_for_report(report) -> str | FutureResult:
    if report.status == "ready":
        return report.url
    return FutureResult(deps=report, state_num=0)

Agent code can read naturally:

report_url = wait_for_report(report)
print(report_url)

If the value is ready, report_url is a string. If the call returns a pending FutureResult, using that value in execution can raise FutureError; STATEK catches that suspension path in job execution, stores the awaited future and instruction position, and resumes from that position when the condition later passes.

Ready and Pending Results

A temporal function does not have to return a future every time.

@temporal(complement=fetch_report, condition=report_is_ready)
def wait_for_report(report) -> str | FutureResult:
    if report.status == "ready":
        return report.url
    return FutureResult(deps=report, state_num=report.version)

Ready path:

report.status = "ready"
url = wait_for_report(report)
print(url)

Pending path:

report.status = "running"
url = wait_for_report(report)
print(url)

In the pending path, url is future-like until resolved. Accessing its value calls the complement:

future = wait_for_report(report)
 
if future.check_condition():
    print(future.value)

Ordinary application code should not catch and drive FutureError as a control-flow API. Let STATEK job execution handle suspension, and keep complements focused on resolving values.

Human Approval Example

Human approval can be expressed as a temporal wait:

def fetch_approval(future: FutureResult):
    approval = future.deps
    if approval.status == "pending":
        raise FutureError(future)
    return approval
 
def approval_is_done(future: FutureResult) -> bool:
    return future.deps.status in {"approved", "rejected"}
 
@temporal(complement=fetch_approval, condition=approval_is_done)
def wait_for_approval(approval_request):
    if approval_request.status in {"approved", "rejected"}:
        return approval_request
    return FutureResult(deps=approval_request, state_num=approval_request.version)

The agent-facing code stays direct:

approval = wait_for_approval(approval_request)
 
if approval.status == "approved":
    apply_change(proposed_change)
else:
    print("Change was not approved.")

For product approval flows, callbacks are usually the better first API. A UI, email link, or webhook can deliver an approval event directly to the relevant job or create a new job with the approval in context. Use temporal approval waits only when the bounded polling and Python-continuation shape is intentional.

Background Exports

Temporal functions fit controlled waits for application-owned background work:

def export_result(future: FutureResult):
    export = future.deps
    if export.status != "complete":
        raise FutureError(future)
    return export.output_url
 
def export_complete(future: FutureResult) -> bool:
    return future.deps.status == "complete"
 
@temporal(complement=export_result, condition=export_complete)
def wait_for_export(export_job):
    if export_job.status == "complete":
        return export_job.output_url
    return FutureResult(deps=export_job, state_num=export_job.version)

Then durable Python can read as if it is waiting for a normal result:

export_job = start_export(dataset)
output_url = wait_for_export(export_job)
report = build_report(output_url)

The complement should only fetch the durable result. Starting the export should happen before the wait and should be guarded by application state so a resumed job does not duplicate the side effect.

Bounded Queue-Picking Warmup

A common low-level pattern is a bounded worker whose warmup picks the next task:

def fetch_task(future: FutureResult):
    queue = future.deps
    task = queue.peek_ready_task()
    if task is None:
        raise FutureError(future)
    return task
 
def task_available(future: FutureResult) -> bool:
    return future.deps.has_ready_task()
 
@temporal(complement=fetch_task, condition=task_available)
def pick_next_task(queue):
    task = queue.peek_ready_task()
    if task is not None:
        return task
    return FutureResult(deps=queue, state_num=queue.version)

Warmup can then prepare the job workspace:

task = pick_next_task(queue)
user = task.user
message = task.message
request_text = message.text

This is appropriate only when the application bounds how many jobs can wait this way and the readiness check is cheap. For broad event delivery, deliver queue events into jobs instead of creating many suspended polling waits.

Waiting for Any Future

get_any_future(...) combines futures and resolves to the first ready value.

from statek.future import get_any_future
 
primary = wait_for_export(primary_export)
fallback = wait_for_export(fallback_export)
 
first_output_url = get_any_future(primary, fallback)
notify_user(first_output_url)

Internally, the combined future checks each input with check_condition(). Its value is the first input future whose condition is ready. If none are ready, accessing the value raises FutureError.

Waiting for All Futures

get_all_future(...) combines futures and resolves when every input is ready.

from statek.future import get_all_future
 
profile = wait_for_profile(profile_load)
calendar = wait_for_calendar(calendar_load)
preferences = wait_for_preferences(preference_load)
 
profile_data, calendar_data, preference_data = get_all_future(
    profile,
    calendar,
    preferences,
)

The combined future returns a tuple of resolved values in input order. If any input is still pending, accessing the combined value raises FutureError.

Both combined-future helpers require at least one future argument.

Tuple Result Unpacking

If a complement has a fixed-size tuple return annotation, a FutureResult can be unpacked into FutureElement objects:

from typing import Tuple
 
def fetch_pair(future: FutureResult) -> Tuple[str, int]:
    result = future.deps
    if not result.ready:
        raise FutureError(future)
    return result.label, result.count
 
def pair_ready(future: FutureResult) -> bool:
    return future.deps.ready
 
@temporal(complement=fetch_pair, condition=pair_ready)
def wait_for_pair(result):
    return FutureResult(deps=result, state_num=result.version)

Agent code can unpack the pending pair:

label, count = wait_for_pair(result)
print(label, count)

Each element delegates readiness to the parent future. If the parent is not ready, accessing an element value raises FutureError. If the complement does not declare a fixed tuple return type, unpacking raises FutureError.

Extending a Temporal Function

Use @temporal(extends=...) when a wrapper should reuse another temporal function's complement and condition.

@temporal(complement=fetch_report, condition=report_is_ready)
def wait_for_report(report):
    if report.status == "ready":
        return report.url
    return FutureResult(deps=report, state_num=report.version)
 
@temporal(extends=wait_for_report)
def wait_for_report_by_id(report_id):
    report = load_report(report_id)
    return wait_for_report(report)

The extended function must return a result compatible with the base temporal function. You cannot pass both extends= and explicit complement= or condition=, and extends= must refer to another temporal function.

Async Temporal Functions

@temporal(...) also supports async functions:

@temporal(complement=fetch_report, condition=report_is_ready)
async def wait_for_remote_report(report_id):
    report = await load_report_async(report_id)
    if report.status == "ready":
        return report.url
    return FutureResult(deps=report, state_num=report.version)

The async wrapper awaits the function and then applies the same FutureResult handling as the sync decorator.

Anti-Patterns

Do not sleep inside job code for long waits:

# Avoid
time.sleep(600)

Do not make conditions expensive. The worker loop may call check_condition() repeatedly for suspended jobs, so conditions should be cheap state checks.

Do not use temporal tools as a generic webhook bus. Webhooks and user-facing events should normally be authenticated, validated, queued, and delivered as callbacks or new jobs.

Do not put non-idempotent side effects in complements. A complement may be retried when a job resumes or when a future value is inspected again. Start external work in explicit application code, store durable IDs, and let the complement fetch the result.

Do not raise FutureError manually from ordinary tools to invent custom suspension behavior. FutureError is reserved for temporal result retrieval. Ordinary tools should return explicit application results or errors.

Practical Rule

Use temporal tools when all of these are true:

  • the wait belongs inside durable Python execution
  • readiness can be checked cheaply
  • result fetching is idempotent
  • the number of suspended jobs is bounded
  • callbacks or direct event delivery would make the flow less clear

For most human input, webhooks, broad product interruptions, and high-scale event delivery, use callbacks, events, queues, and job creation first.

Where to go next

Read Callbacks and Interruptions for the preferred product-level event model, Futures for future concepts and suspended jobs, Warmup Code for queue-picking startup patterns, and Replay and Recovery for side-effect boundaries.