Agentic Fleet Guide
This guide shows a practical STATEK shape for applications that receive ongoing user messages. Each incoming message is routed by a dispatcher job to either an existing conversation thread or a new specialist worker.
Use this pattern when your application needs to preserve thread history, route fresh requests to different worker agents, keep worker jobs durable, and inspect why a message was routed the way it was.
This is an orchestration pattern, not a security boundary. Tool permissions, authorization, sandboxing, secrets handling, resource limits, and side-effect controls are enforced by your application and runtime operations, not by the routing decision alone.
Fleet Overview
The dispatcher is intentionally narrow. It looks at the incoming message, the user, recent threads, available agents, and printed examples. It then performs exactly one routing action:
dispatch_to(threads[i])for a clear continuationstart_new_thread(...)for a fresh request
The worker owns the actual task. It can answer the user, ask a clarifying question, inspect durable state, call tools, or request confirmation before a side effect.
Step 1: Create A Durable Root
A durable root or registry gives your application one place to create long-lived agents and share the same task_agents map with the dispatcher.
import dbzero as db0
from statek.agents import DialogAgent, SupervisedAgent
@db0.memo(singleton=True)
class StatekRoot:
def __init__(self):
self.information_retriever = DialogAgent(
role="information_retriever",
send_message=send_dialog_message,
tools=[search_documents, lookup_account_state],
)
self.schedule_assistant = DialogAgent(
role="schedule_assistant",
send_message=send_dialog_message,
tools=[find_availability, propose_time_change, submit_request],
)
self.preferences_assistant = DialogAgent(
role="preferences_assistant",
send_message=send_dialog_message,
tools=[read_preferences, update_preferences],
)
self.sounding_board = SupervisedAgent(
role="sounding_board",
_system_prompt=None,
_tools=[draft_options, summarize_tradeoffs],
)
self.task_agents = {
agent.role: agent
for agent in (
self.information_retriever,
self.schedule_assistant,
self.preferences_assistant,
self.sounding_board,
)
}
self.dispatcher = AppDispatcher(task_agents=self.task_agents)Direct agent creation is fine for a small prototype. A root becomes useful once restarts, queue workers, prompt directories, examples, and multiple dispatch entry points need to agree on the same agents.
Use DialogAgent workers for user-facing conversation threads. Use SupervisedAgent workers for background tasks where another system, not the agent itself, owns user messaging.
Step 2: Implement The Dispatcher
STATEK provides MessageDispatcher as a generic routing agent that accepts chat_history, start_new_thread, and dispatch_to callables. A production-shaped application usually subclasses it to add application-specific thread lookup, new-thread creation, and dynamic agent discovery.
import dbzero as db0
from statek.agents import MessageDispatcher
from statek.agents.agent import Agent
from statek.system import create_tool
@db0.memo
class AppDispatcher(MessageDispatcher):
task_agents: dict[str, Agent] = None
def __init__(self, task_agents: dict[str, Agent]):
self.task_agents = task_agents
super().__init__(
chat_history=recent_threads,
start_new_thread=start_new_thread,
dispatch_to=dispatch_to,
)
self._metadata = {"MODEL": "fast-routing-model"}
self.update_warmup_def(
"user = message.sender\n"
"print('Incoming message:', message)\n"
)
self.append_tool("find_agents")
def init_context(self):
if self._X__context is None:
super().init_context()
self._X__context["recent_chat_threads"] = recent_threads
create_tool(
tool_name="find_agents",
callable=self._find_agents_impl,
docstring="""Find available specialist agents.
Returns:
dict[str, Agent]: Mapping from role name to agent object.
Use the role name in start_new_thread(agent=...).
""",
context=self._X__context,
)
def _find_agents_impl(self) -> dict[str, Agent]:
return self.task_agentsThe routing tools can have application-owned signatures like this:
def start_new_thread(
agent: str,
language: str,
name: str | None = None,
default_example_id: int | None = None,
) -> None:
"""Create a thread, create the delegated worker job, and bind them."""
def dispatch_to(thread) -> None:
"""Append the incoming message to an existing thread and wake its job."""
def find_agents() -> dict[str, Agent]:
"""Return the specialist agents available to the dispatcher."""The dispatcher routes only. Do not give it answer tools, domain mutation tools, or broad retrieval tools just because they are convenient. The worker selected by the dispatcher should own the answer or side effect.
Step 3: Prepare Dispatcher Warmup
Warmup should print the facts needed for one routing decision: message, user, available agents, recent threads, examples, and language or locale hints.
user = message.sender
agents = find_agents()
threads = [thread for thread in recent_chat_threads(count=5) if thread is not None]
language = preferred_language_for(user) or "en"
print("Incoming message:")
print(message)
print("User:")
print(user)
print("Language:")
print(language)
print("Available agents:")
for role, agent in agents.items():
print(f"- {role}: {agent.description or 'No description'}")
for example_id, example_name in list_agent_examples(role):
print(f" example_id={example_id}: {example_name}")
print("Recent threads:")
for i, thread in enumerate(threads):
print(f"threads[{i}]:")
print(thread.summary)
print("latest_activity:", thread.latest_activity_at)
print("last_assistant_message:", thread.last_assistant_message)Treat thread objects as opaque in the dispatcher prompt when warmup already printed summaries. The model should select from threads[i]; it should not inspect object internals or re-fetch context unless your prompt explicitly allows it.
Preloading context in warmup reduces tool calls and makes the routing decision easier to audit. Letting the dispatcher call tools at runtime can work when context is large or expensive, but then the prompt must say exactly which tools it may call before the final routing action.
Step 4: Write Routing Rules
The dispatcher prompt should constrain output tightly: one fenced Python block, no prose, comments for reasoning, and exactly one final action.
You route one incoming user message.
Use the printed context. Do not answer the user. Do not mutate domain state.
Return exactly one fenced Python block and no prose outside the block.
Priority order:
1. Answer detection wins. If the last assistant message in a recent thread asked
the user a question and the incoming message looks like an answer, dispatch to
that thread.
2. Explicit continuation wins over topic overlap. If the user clearly continues,
corrects, confirms, rejects, or adds details to a thread, dispatch to it.
3. Very recent activity is a strong continuation signal.
4. A fresh self-contained question should start a new thread, even if the topic
overlaps with an older thread.
5. If several threads are plausible, choose the most recent matching thread.
6. If starting a thread, choose one agent from printed agents.
7. Do not guess example ids. Use default_example_id only when that id was printed
for the chosen agent and the example matches the request.
Available final actions:
```python
dispatch_to(threads[i])
```
```python
start_new_thread(
agent="agent_role",
language="en",
name="Short thread name",
default_example_id=0,
)
```default_example_id is a startup hint for the worker. It is not hidden policy and it should not override the worker prompt, available tools, permissions, or required confirmations.
The intended routing outputs are small:
# The last assistant message in threads[0] asked for confirmation.
# The incoming message is a direct answer.
dispatch_to(threads[0])# Fresh self-contained lookup request.
# Printed example 1 for information_retriever matches schedule lookup.
start_new_thread(
agent="information_retriever",
language="en",
name="Schedule lookup",
default_example_id=1,
)# Fresh request that needs a scheduling workflow, not read-only retrieval.
# Printed example 2 for schedule_assistant matches coverage planning.
start_new_thread(
agent="schedule_assistant",
language="en",
name="Coverage request",
default_example_id=2,
)Dispatcher Decision Flow
The dispatcher should decide in this order. This keeps short replies such as "yes", "Friday is better", or "use the second option" attached to the worker that asked the question.
Step 5: Build Specialist Workers
Specialist workers should have narrow tools, warmup, examples, locale, and explicit side-effect boundaries. A worker can use the selected example to start faster, but the prompt still controls behavior.
from datetime import datetime, timezone
now = datetime.now(timezone.utc)
print("Current UTC time:", now.isoformat())
print("User:")
print(user)
print("Incoming message:")
print(message)
print("Selected default_example_id:")
print(default_example_id if "default_example_id" in locals() else None)
print("Available examples:")
list_of_examples()
if "default_example_id" in locals():
print("Selected example:")
show_example(default_example_id)ROLE: Information retriever.
GOAL: Answer read-only questions using retrieval tools.
Use retrieval tools only. Do not create requests, update preferences, send
approvals, or mutate durable application state.
If the request is ambiguous, ask one concise clarifying question.
If you have enough verified context, answer directly.ROLE: Schedule assistant.
GOAL: Help the user plan or request a schedule-related change.
Inspect the relevant context first. Propose options before taking action.
Ask for explicit confirmation before submitting a change or contacting another
person. Submit only through the named application tool after confirmation and
policy checks are satisfied.ROLE: Preferences assistant.
GOAL: Help the user understand and update their preferences.
Read current preferences before proposing edits. When an edit changes durable
state, summarize the exact change and ask for confirmation before submitting it.Choose language or locale when the thread is created, then pass it into the delegated job. STATEK job locale can add language-specific rules and hints, but your app still decides how to infer the user's preferred language.
Tool permissions and side effects are application-enforced. A route to schedule_assistant does not by itself make a schedule change safe. Keep state-changing tools narrow, named, audited, and confirmation-aware.
Worker Lifecycle
A worker job should be inspectable from startup through its final answer or side effect.
Step 6: Run The Fleet
For continuous incoming messages, run the dispatcher and workers from a queue-processing worker. The high-level runner uses a single-agent loop for one agent and a fleet loop when more than one agent is active.
from statek import start_statek
from statek.statek_push_queue import StatekPushQueue
root = StatekRoot()
queue = StatekPushQueue()
start_statek(
agents=[
root.dispatcher,
root.information_retriever,
root.schedule_assistant,
root.preferences_assistant,
root.sounding_board,
],
push_queues=[queue],
max_concurrency=50,
)If your application has a separate queue coordinator, it can push incoming message events to the dispatcher and let dispatch_to(...) wake existing worker jobs or start_new_thread(...) create new ones.
def on_incoming_message(message):
queue.push_to_agent_queue(root.dispatcher, message)A dispatcher-only fleet is enough when all incoming work begins as user messages. A coordinator plus dispatcher split is useful when queue processing also needs ingestion, locks, retries, priority, or non-message task events before routing.
Threading Policy
Threading policy is product-specific, but the dispatcher should make it explicit.
- Start a new thread by default for fresh, self-contained repeated questions such as "What is my schedule next week?"
- Dispatch only on clear continuation when the user is answering, correcting, confirming, rejecting, or adding details to an ongoing worker conversation.
- Choose the most recent matching thread when several threads are plausible.
- Prefer a new specialist thread when the requested capability changes and no existing worker is waiting for an answer.
This keeps durable worker histories focused. A read-only lookup thread does not quietly become a state-changing preferences thread, and a pending confirmation does not lose the user's short answer.
When Not To Use This
Do not start with this architecture for one-shot stateless prompts, a tiny prototype, or a workflow where one prompt and no durable thread context is enough.
Add the dispatcher and specialist fleet when there is real routing pressure: multiple task types, different tool permissions, different models, long-lived user conversations, queue processing, or a need to inspect why an incoming message went to a particular worker.
Where To Go Next
- Agents: define roles, tools, descriptions, and worker context
- Warmup Code: prepare dispatcher and worker jobs
- Examples Machinery: provide examples for startup hints and worker behavior
- Runners: run loops, fleets, and queue workers
- Security: bound code execution, secrets, and side effects