Every symbol listed here is exported from qitos and accessible as:
from qitos import AgentModule, Engine, Decision, ...

AgentModule is the strategy layer of QitOS. You subclass it to define your agent’s state shape, system prompt, decision logic, and reduction rules. The Engine drives the execution loop and calls each hook in order.
class AgentModule(ABC, Generic[StateT, ObservationT, ActionT])
Constructor
def __init__(
    self,
    tool_registry: Any = None,
    llm: Any = None,
    model_parser: Any = None,
    memory: Memory | None = None,
    history: History | None = None,
    **config: Any,
)
Parameter      Type                 Description
tool_registry  ToolRegistry | None  Registry of tools the agent can call
llm            Any                  LLM callable used for model decisions
model_parser   Any                  Parser that converts raw model output to a Decision
memory         Memory | None        Optional memory adapter
history        History | None       Optional history adapter
**config       Any                  Extra keyword args stored as self.config
Hooks
Override these methods in your subclass. Only init_state and reduce are required.
@abstractmethod
def init_state(self, task: str, **kwargs: Any) -> StateT
Create and return the initial typed state for a run. Called once by Engine.run() before the step loop begins. Use **kwargs to accept extra parameters forwarded from AgentModule.run().
@abstractmethod
def reduce(
    self,
    state: StateT,
    observation: ObservationT,
    decision: Decision[ActionT],
) -> StateT
Fold the current observation and decision into the next state. Called at the end of every step. Return the updated state.
def build_system_prompt(self, state: StateT) -> str | None
Return a dynamic system prompt string, or None to use no system prompt. Called at the start of each step’s decide phase. Default returns None.
def prepare(self, state: StateT) -> str
Convert the current state into a model-ready text string (the user turn). Default returns str(state).
def decide(
    self,
    state: StateT,
    observation: ObservationT,
) -> Decision[ActionT] | None
Optional custom decision hook. Return a Decision to bypass the Engine’s model call, or None to let the Engine call the LLM and parse the output. Default returns None.
def should_stop(self, state: StateT) -> bool
Optional additional stop condition checked after each step. Return True to terminate the run with StopReason.AGENT_CONDITION. Default returns False.
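The hooks above can be sketched as a small subclass. This sketch does not import qitos: CounterState, CountingAgent, the start keyword, and the plain-dict decision are hypothetical stand-ins chosen so the example runs on its own. In a real agent the class would subclass AgentModule and the state would subclass StateSchema.

```python
from dataclasses import dataclass, replace
from typing import Any, Dict, Optional


@dataclass
class CounterState:
    """Hypothetical state; a real agent would subclass StateSchema."""
    task: str = ""
    count: int = 0
    final_result: Optional[str] = None


class CountingAgent:
    """Stand-in with the same hook names as AgentModule (not a qitos class)."""

    def init_state(self, task: str, **kwargs: Any) -> CounterState:
        # Called once before the step loop; kwargs carry extra run() arguments.
        return CounterState(task=task, count=int(kwargs.get("start", 0)))

    def prepare(self, state: CounterState) -> str:
        # Text handed to the model as the user turn.
        return f"Task: {state.task}\nCount so far: {state.count}"

    def reduce(self, state: CounterState, observation: Any,
               decision: Dict[str, Any]) -> CounterState:
        # Fold the observation into a new state instead of mutating in place.
        return replace(state, count=state.count + int(observation))

    def should_stop(self, state: CounterState) -> bool:
        # Extra stop condition checked after each step.
        return state.count >= 3


agent = CountingAgent()
state = agent.init_state("count to three", start=1)
state = agent.reduce(state, observation=2, decision={"mode": "act"})
stopped = agent.should_stop(state)
```

Returning a new state from reduce, rather than mutating the old one, keeps each step's input state intact for inspection; whether QitOS requires this is not stated in this reference.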
.run() method
Convenience method that builds an Engine, runs it, and returns the final result.
def run(
    self,
    task: str | Task,
    return_state: bool = False,
    hooks: List[Any] | None = None,
    render_hooks: List[Any] | None = None,
    engine_kwargs: Dict[str, Any] | None = None,
    workspace: str | None = None,
    max_steps: int | None = None,
    env: Any = None,
    parser: Any = None,
    search: Any = None,
    critics: List[Any] | None = None,
    stop_criteria: List[Any] | None = None,
    history_policy: Any = None,
    trace: Any = None,
    render: Any = None,
    trace_logdir: str = "./runs",
    trace_prefix: str | None = None,
    theme: str = "research",
    **state_kwargs: Any,
) -> Any
Parameter       Type                   Default     Description
task            str | Task             required    Task objective or structured Task object
return_state    bool                   False       When True, returns the full EngineResult; otherwise returns state.final_result
hooks           List[Any] | None       None        Additional EngineHook instances to register
render_hooks    List[Any] | None       None        Additional render hook instances
engine_kwargs   Dict[str, Any] | None  None        Extra keyword arguments forwarded to the Engine constructor
workspace       str | None             None        Path to workspace root; auto-constructs a HostEnv when set
max_steps       int | None             None        Override the maximum number of steps
env             Any                    None        Explicit Env instance; takes precedence over workspace
parser          Any                    None        Parser to pass to the Engine
search          Any                    None        Search strategy instance
critics         List[Any] | None       None        List of Critic instances
stop_criteria   List[Any] | None       None        Custom stop criteria list
history_policy  Any                    None        HistoryPolicy instance
trace           Any                    None        True to enable default tracing, or a TraceWriter instance
render          Any                    None        True to enable default render hook
trace_logdir    str                    "./runs"    Directory where trace files are written
trace_prefix    str | None             None        Prefix for the auto-generated run ID
theme           str                    "research"  Render theme name
**state_kwargs  Any                    -           Extra kwargs forwarded to init_state()
Returns state.final_result by default, or an EngineResult when return_state=True.
Engine is the execution kernel. It owns the phase loop, tool execution, recovery, tracing, and stop-criteria evaluation. You normally obtain an Engine through AgentModule.build_engine() or AgentModule.run(), but you can also construct one directly.
class Engine(Generic[StateT, ObservationT, ActionT])
Constructor
def __init__(
    self,
    agent: AgentModule[StateT, ObservationT, ActionT],
    budget: RuntimeBudget | None = None,
    validation_gate: StateValidationGate | None = None,
    recovery_handler: RecoveryHandler | None = None,
    recovery_policy: RecoveryPolicy | None = None,
    trace_writer: TraceWriter | None = None,
    parser: Parser[ActionT] | None = None,
    stop_criteria: List[StopCriteria] | None = None,
    branch_selector: BranchSelector | None = None,
    search: Search | None = None,
    critics: List[Critic] | None = None,
    env: Env | None = None,
    history_policy: HistoryPolicy | None = None,
    hooks: List[EngineHook] | None = None,
    render_hooks: List[Any] | None = None,
)
Parameter         Type                        Default                      Description
agent             AgentModule                 required                     Agent whose hooks the Engine will call
budget            RuntimeBudget | None        RuntimeBudget(max_steps=10)  Step, time, and token budgets
validation_gate   StateValidationGate | None  default gate                 Pre/post phase state validation
recovery_handler  RecoveryHandler | None      None                         Callable invoked on recoverable errors
recovery_policy   RecoveryPolicy | None       default policy               Controls retry behaviour on failures
trace_writer      TraceWriter | None          None                         Writes structured trace artifacts to disk
parser            Parser[ActionT] | None      None                         Parser for raw model output
stop_criteria     List[StopCriteria] | None   [FinalResultCriteria()]      Ordered list of stop criteria
branch_selector   BranchSelector | None       FirstCandidateSelector()     Strategy for picking among branch candidates
search            Search | None               None                         Search strategy for branch decisions
critics           List[Critic] | None         []                           Critics evaluated after each step
env               Env | None                  None                         Environment for observe/step lifecycle
history_policy    HistoryPolicy | None        HistoryPolicy()              Controls message history assembly
hooks             List[EngineHook] | None     []                           Engine lifecycle hooks
render_hooks      List[Any] | None            None                         Render hooks merged into hooks
Methods
def run(self, task: str | Task, **kwargs: Any) -> EngineResult[StateT]
Execute the agent loop for task. Resets run state, initialises the env, calls agent.init_state(), then iterates the decide→act→reduce→check_stop cycle until a stop condition triggers. Returns an EngineResult.
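The cycle described for Engine.run() can be pictured as the loop below. It is a toy illustration, not the Engine's implementation: run_loop, the dict-based state and decision, and the tools mapping are all hypothetical, and recovery, tracing, budgets other than a step cap, and the env are left out.

```python
from typing import Any, Callable, Dict, List, Tuple


def run_loop(
    decide: Callable[[Dict[str, Any]], Dict[str, Any]],
    tools: Dict[str, Callable[..., Any]],
    task: str,
    max_steps: int = 10,
) -> Tuple[Dict[str, Any], int]:
    """Toy decide -> act -> reduce -> check_stop loop over a dict state."""
    state: Dict[str, Any] = {"task": task, "notes": [], "final_result": None}
    steps = 0
    while steps < max_steps:
        decision = decide(state)                      # decide
        observations: List[Any] = []
        if decision["mode"] == "act":                 # act
            for name, args in decision["actions"]:
                observations.append(tools[name](**args))
        state["notes"].extend(observations)           # reduce
        if decision["mode"] == "final":
            state["final_result"] = decision["answer"]
        steps += 1
        if state["final_result"] is not None:         # check_stop
            break
    return state, steps


def decide(state: Dict[str, Any]) -> Dict[str, Any]:
    # Call the tool once, then finish with whatever it returned.
    if not state["notes"]:
        return {"mode": "act", "actions": [("add", {"a": 2, "b": 3})]}
    return {"mode": "final", "answer": str(state["notes"][-1])}


final_state, step_count = run_loop(decide, {"add": lambda a, b: a + b}, "add 2 and 3")
```

Here the first step executes the add tool and the second step produces the final answer, so the loop ends after two steps.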
def register_hook(self, hook: Any) -> None
Append one hook instance to the active hook list.
def unregister_hook(self, hook: Any) -> None
Remove one hook instance from the active hook list (identity comparison).
def clear_hooks(self) -> None
Remove all registered hooks.
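Identity comparison matters when two hook objects compare equal. The sketch below is a hypothetical stand-in (HookList and PrintHook are not qitos classes) that mirrors the three methods above and shows that unregistering removes only the exact instance passed in.

```python
from dataclasses import dataclass
from typing import Any, List


@dataclass
class PrintHook:
    """Hypothetical hook; dataclasses with equal fields compare equal."""
    label: str


class HookList:
    """Stand-in mirroring register_hook / unregister_hook / clear_hooks."""

    def __init__(self) -> None:
        self.hooks: List[Any] = []

    def register_hook(self, hook: Any) -> None:
        self.hooks.append(hook)

    def unregister_hook(self, hook: Any) -> None:
        # Identity comparison: keep every hook that is not this exact object.
        self.hooks = [h for h in self.hooks if h is not hook]

    def clear_hooks(self) -> None:
        self.hooks = []


first, second = PrintHook("trace"), PrintHook("trace")
hook_list = HookList()
hook_list.register_hook(first)
hook_list.register_hook(second)
hook_list.unregister_hook(first)   # second stays, although first == second
remaining = list(hook_list.hooks)
```

In practice this means you should keep a reference to the hook object you registered if you intend to remove it later.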
EngineResult is the dataclass returned by Engine.run().
@dataclass
class EngineResult(Generic[StateT]):
    state: StateT
    records: List[StepRecord]
    events: List[RuntimeEvent]
    step_count: int
    task_result: Optional[TaskResult] = None
Field        Type                Description
state        StateT              Final typed state after the run
records      List[StepRecord]    Per-step records including decision, actions, and observations
events       List[RuntimeEvent]  Ordered list of all runtime events emitted during the run
step_count   int                 Number of steps executed
task_result  TaskResult | None   Structured task outcome, populated when a Task object was passed
Decision is the canonical output of the decide phase. Use the factory class methods rather than constructing directly.
@dataclass
class Decision(Generic[ActionT]):
    mode: DecisionMode          # "act" | "final" | "wait" | "branch"
    actions: List[ActionT]
    final_answer: Optional[str]
    rationale: Optional[str]
    meta: Dict[str, Any]
    candidates: List[Decision[ActionT]]
Modes
Mode      Meaning
"act"     Execute one or more actions
"final"   Produce a final answer and stop
"wait"    Skip action execution this step
"branch"  Propose multiple candidate decisions for the branch selector
Factory methods
@classmethod
def act(
    cls,
    actions: List[ActionT],
    rationale: Optional[str] = None,
    meta: Optional[Dict[str, Any]] = None,
) -> Decision[ActionT]
@classmethod
def final(
    cls,
    answer: str,
    rationale: Optional[str] = None,
    meta: Optional[Dict[str, Any]] = None,
) -> Decision[ActionT]
@classmethod
def wait(
    cls,
    rationale: Optional[str] = None,
    meta: Optional[Dict[str, Any]] = None,
) -> Decision[ActionT]
@classmethod
def branch(
    cls,
    candidates: List[Decision[ActionT]],
    rationale: Optional[str] = None,
    meta: Optional[Dict[str, Any]] = None,
) -> Decision[ActionT]
.validate() — Raises ValueError if the decision is structurally invalid (e.g. act with no actions).
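A minimal sketch of how the factory methods and validate() fit together. The Decision class below is a local stand-in that copies the fields and two of the factory signatures listed above so the example runs without qitos. The validation rule shown is only the one case the text names (act with no actions); any further checks in the real class are not covered here.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class Decision:
    """Local stand-in for qitos.Decision (fields copied from the reference)."""
    mode: str
    actions: List[Any] = field(default_factory=list)
    final_answer: Optional[str] = None
    rationale: Optional[str] = None
    meta: Dict[str, Any] = field(default_factory=dict)
    candidates: List["Decision"] = field(default_factory=list)

    @classmethod
    def act(cls, actions, rationale=None, meta=None) -> "Decision":
        return cls(mode="act", actions=list(actions), rationale=rationale, meta=meta or {})

    @classmethod
    def final(cls, answer, rationale=None, meta=None) -> "Decision":
        return cls(mode="final", final_answer=answer, rationale=rationale, meta=meta or {})

    def validate(self) -> None:
        # Only the documented case is checked in this sketch.
        if self.mode == "act" and not self.actions:
            raise ValueError("act decision requires at least one action")


done = Decision.final("42", rationale="computed directly")
done.validate()                      # a well-formed decision passes silently

try:
    Decision.act([]).validate()      # act with no actions is rejected
    error = None
except ValueError as exc:
    error = str(exc)
```

Using the factories keeps mode and payload consistent: final() always sets final_answer, and act() always carries the action list.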
Action is the normalized action contract emitted by the policy and consumed by the executor.
@dataclass
class Action:
    name: str
    args: Dict[str, Any] = field(default_factory=dict)
    kind: ActionKind = ActionKind.TOOL
    action_id: Optional[str] = None
    timeout_s: Optional[float] = None
    max_retries: int = 0
    idempotent: bool = True
    classification: str = "default"
    metadata: Dict[str, Any] = field(default_factory=dict)
Field           Type            Description
name            str             Tool name to call
args            Dict[str, Any]  Keyword arguments forwarded to the tool
kind            ActionKind      Currently only ActionKind.TOOL ("tool")
action_id       str | None      Optional unique identifier for the action
timeout_s       float | None    Per-action timeout override in seconds
max_retries     int             Number of retries on failure
idempotent      bool            Whether the action is safe to retry
classification  str             User-defined label for grouping/filtering
metadata        Dict[str, Any]  Arbitrary extra metadata
Action.from_dict(payload) — Construct from a plain dict.
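As an illustration of the dict form, the sketch below rebuilds the Action dataclass locally and gives it a from_dict. The way the payload is mapped (known keys copied as-is, kind coerced to the enum, ActionKind modelled as a string enum) is an assumption made for this example, not the documented behaviour of the real method.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional


class ActionKind(str, Enum):
    TOOL = "tool"


@dataclass
class Action:
    """Local stand-in with the fields listed above."""
    name: str
    args: Dict[str, Any] = field(default_factory=dict)
    kind: ActionKind = ActionKind.TOOL
    action_id: Optional[str] = None
    timeout_s: Optional[float] = None
    max_retries: int = 0
    idempotent: bool = True
    classification: str = "default"
    metadata: Dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_dict(cls, payload: Dict[str, Any]) -> "Action":
        # Assumed behaviour: copy known keys and coerce "kind" to the enum.
        data = dict(payload)
        data["kind"] = ActionKind(data.get("kind", "tool"))
        return cls(**data)


action = Action.from_dict(
    {"name": "search_web", "args": {"query": "qitos"}, "timeout_s": 5.0, "max_retries": 2}
)
```

Fields missing from the payload fall back to the dataclass defaults, so a payload with only name is enough to describe a tool call.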
StateSchema is the canonical typed state base class. Subclass it to define your agent’s state fields.
@dataclass
class StateSchema:
    schema_version: int = 1
    task: str = ""
    current_step: int = 0
    max_steps: int = 10
    final_result: Optional[str] = None
    stop_reason: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
    metrics: Dict[str, Any] = field(default_factory=dict)
Field         Description
task          The task objective string
current_step  Step counter incremented by advance_step()
max_steps     Hard cap on steps; validated on every advance_step() call
final_result  The agent’s final answer string
stop_reason   A StopReason value string set when the run ends
metadata      Free-form dict for agent-specific data
metrics       Free-form dict for numeric metrics
Key methods
def set_stop(self, reason: StopReason | str, final_result: Optional[str] = None) -> None
def advance_step(self) -> None
def validate(self) -> None
def to_dict(self) -> Dict[str, Any]

@classmethod
def from_dict(cls, payload: Dict[str, Any], strict: bool = True) -> StateT

@classmethod
def migrate_payload(cls, payload: Dict[str, Any], target_version: int) -> Dict[str, Any]
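Subclassing works like any dataclass inheritance: the subclass adds its own fields after the base ones. The sketch below uses a cut-down local StateSchema so it runs without qitos. The bodies of advance_step and set_stop are assumptions that follow the field descriptions above, and ResearchState is a hypothetical agent state.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class StateSchema:
    """Local stand-in with a subset of the fields and methods listed above."""
    task: str = ""
    current_step: int = 0
    max_steps: int = 10
    final_result: Optional[str] = None
    stop_reason: Optional[str] = None

    def advance_step(self) -> None:
        # Assumed behaviour: refuse to step past the hard cap.
        if self.current_step >= self.max_steps:
            raise ValueError("max_steps exceeded")
        self.current_step += 1

    def set_stop(self, reason: str, final_result: Optional[str] = None) -> None:
        self.stop_reason = str(reason)
        if final_result is not None:
            self.final_result = final_result


@dataclass
class ResearchState(StateSchema):
    """Hypothetical agent state adding one field of its own."""
    findings: List[str] = field(default_factory=list)


state = ResearchState(task="summarise the paper", max_steps=2)
state.advance_step()
state.findings.append("abstract read")
state.set_stop("final", final_result="done")
```

Because every base field has a default, new fields in the subclass also need defaults; mutable ones should use field(default_factory=...).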
Use Task when you need to pass structured metadata, resources, and budget constraints alongside the objective string.
@dataclass
class Task:
    id: str
    objective: str
    inputs: Dict[str, Any] = field(default_factory=dict)
    resources: List[TaskResource] = field(default_factory=list)
    env_spec: Optional[EnvSpec] = None
    constraints: Dict[str, Any] = field(default_factory=dict)
    success_criteria: List[str] = field(default_factory=list)
    budget: TaskBudget = field(default_factory=TaskBudget)
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class TaskBudget:
    max_steps: Optional[int] = None
    max_runtime_seconds: Optional[float] = None
    max_tokens: Optional[int] = None
@dataclass
class TaskResource:
    kind: str               # "file" | "dir" | "url" | "artifact"
    path: Optional[str] = None
    uri: Optional[str] = None
    mount_to: Optional[str] = None
    required: bool = True
    description: str = ""
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class TaskResult:
    task_id: str
    success: bool
    stop_reason: Optional[str]
    final_result: Any
    criteria: List[TaskCriterionResult] = field(default_factory=list)
    artifacts: List[TaskResourceBinding] = field(default_factory=list)
    metrics: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)
Task helper methods
def validate(self) -> None
def validate_structured(self, workspace: Optional[str] = None) -> List[TaskValidationIssue]
def resolve_resources(self, workspace: Optional[str] = None) -> List[TaskResourceBinding]
def to_dict(self) -> Dict[str, Any]

@classmethod
def from_dict(cls, payload: Dict[str, Any]) -> Task
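A structured task might be assembled as follows. The dataclasses are trimmed local copies of the definitions above so the example runs without qitos; to_dict is implemented with dataclasses.asdict as an assumption, and the id, objective, path, and criterion strings are made-up illustration values.

```python
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class TaskBudget:
    max_steps: Optional[int] = None
    max_runtime_seconds: Optional[float] = None
    max_tokens: Optional[int] = None


@dataclass
class TaskResource:
    kind: str                      # "file" | "dir" | "url" | "artifact"
    path: Optional[str] = None
    required: bool = True


@dataclass
class Task:
    """Local stand-in with a subset of the Task fields listed above."""
    id: str
    objective: str
    resources: List[TaskResource] = field(default_factory=list)
    success_criteria: List[str] = field(default_factory=list)
    budget: TaskBudget = field(default_factory=TaskBudget)

    def to_dict(self) -> Dict[str, Any]:
        # Assumed behaviour: a plain nested dict of the dataclass fields.
        return asdict(self)


task = Task(
    id="fix-bug-17",   # hypothetical identifier
    objective="Make the failing unit test pass",
    resources=[TaskResource(kind="dir", path="./repo")],
    success_criteria=["pytest exits with code 0"],
    budget=TaskBudget(max_steps=20, max_runtime_seconds=600.0),
)
payload = task.to_dict()
```

Budget fields left as None stay unset in the payload, which leaves room for the runtime's own defaults to apply.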
Env is the abstract environment interface. Implement it to provide a custom observe/step lifecycle for your agent.
class Env(ABC):
    @abstractmethod
    def reset(self, task: Any = None) -> Any: ...

    @abstractmethod
    def observe(self) -> Any: ...

    @abstractmethod
    def step(self, action: Any) -> Any: ...

    @abstractmethod
    def is_terminal(self) -> bool: ...

    @abstractmethod
    def close(self) -> None: ...
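A toy implementation shows what each of the five methods is responsible for. The Env base class is copied locally so the sketch runs without qitos, and CountdownEnv is a hypothetical environment invented for this example.

```python
from abc import ABC, abstractmethod
from typing import Any


class Env(ABC):
    """Local copy of the interface above so the sketch runs without qitos."""
    @abstractmethod
    def reset(self, task: Any = None) -> Any: ...
    @abstractmethod
    def observe(self) -> Any: ...
    @abstractmethod
    def step(self, action: Any) -> Any: ...
    @abstractmethod
    def is_terminal(self) -> bool: ...
    @abstractmethod
    def close(self) -> None: ...


class CountdownEnv(Env):
    """Hypothetical env: each step decrements a counter until it reaches zero."""

    def __init__(self, start: int = 3) -> None:
        self.start = start
        self.remaining = start
        self.closed = False

    def reset(self, task: Any = None) -> Any:
        self.remaining = self.start
        return self.observe()

    def observe(self) -> Any:
        return {"remaining": self.remaining}

    def step(self, action: Any) -> Any:
        self.remaining = max(0, self.remaining - 1)
        return self.observe()

    def is_terminal(self) -> bool:
        return self.remaining == 0

    def close(self) -> None:
        self.closed = True


env = CountdownEnv(start=2)
env.reset()
steps_taken = 0
while not env.is_terminal():
    env.step(action=None)
    steps_taken += 1
env.close()
```

Since all five methods are abstract, a subclass that omits any of them cannot be instantiated.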
EnvSpec is a dataclass used inside Task to declare the environment type and configuration:
@dataclass
class EnvSpec:
    type: str                          # e.g. "host", "docker", "tau_bench"
    config: Dict[str, Any] = field(default_factory=dict)
    required_tools: List[str] = field(default_factory=list)
    capabilities: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)
The tool decorator marks a callable as a QitOS tool and attaches metadata to it without changing its call semantics.
def tool(
    name: Optional[str] = None,
    description: Optional[str] = None,
    timeout_s: Optional[float] = None,
    max_retries: int = 0,
    permissions: Optional[ToolPermission] = None,
    required_ops: Optional[List[str]] = None,
)
Parameter     Type                   Description
name          str | None             Override the tool name (defaults to the function’s __name__)
description   str | None             Override the tool description (defaults to the docstring)
timeout_s     float | None           Per-call timeout in seconds
max_retries   int                    Number of retries on failure
permissions   ToolPermission | None  Permission flags for the tool
required_ops  List[str] | None       Runtime ops required from the environment
Example
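# ToolPermission is used below; this assumes it is exported from qitos like the other symbols here.
from qitos import ToolPermission, tool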

@tool(name="search_web", timeout_s=30.0, permissions=ToolPermission(network=True))
def search_web(query: str) -> str:
    """Search the web and return results."""
    ...
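To make "attaches metadata without changing call semantics" concrete, here is a hedged sketch of a decorator with that shape. It is not the qitos implementation: this local tool function, its reduced parameter list, and the tool_meta attribute name are assumptions for illustration only.

```python
from typing import Any, Callable, Optional


def tool(name: Optional[str] = None, description: Optional[str] = None,
         timeout_s: Optional[float] = None, max_retries: int = 0) -> Callable:
    """Hypothetical stand-in for the decorator; not the qitos implementation."""
    def decorate(func: Callable[..., Any]) -> Callable[..., Any]:
        # Attach metadata as an attribute and return the same function object,
        # so calling it behaves exactly as before.
        func.tool_meta = {   # attribute name is an assumption
            "name": name or func.__name__,
            "description": description or (func.__doc__ or "").strip(),
            "timeout_s": timeout_s,
            "max_retries": max_retries,
        }
        return func
    return decorate


@tool(timeout_s=5.0)
def word_count(text: str) -> int:
    """Count whitespace-separated words."""
    return len(text.split())


count = word_count("one two three")
meta = word_count.tool_meta
```

The decorated function can still be called and unit-tested directly, while a registry can read the attached metadata when it builds tool descriptions.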
ToolRegistry stores tools and toolsets and is passed to AgentModule at construction time through the tool_registry parameter.
class ToolRegistry
Constructor
ToolRegistry() (no parameters)
Methods
def register(
    self,
    item: Any,
    name: Optional[str] = None,
    meta: Optional[ToolMeta] = None,
) -> ToolRegistry
Register a single callable or BaseTool. Returns self for chaining.
def register_toolset(
    self,
    toolset: Any,
    namespace: Optional[str] = None,
) -> ToolRegistry
Register all tools from a toolset object. Tool names are prefixed with namespace (defaults to toolset.name).
def include(self, obj: Any) -> ToolRegistry
Scan an object for methods decorated with @tool and register them all.
def get(self, name: str) -> Optional[BaseTool]
def list_tools(self) -> List[str]
def list_toolsets(self) -> List[str]
def describe_tool(self, name: str) -> Dict[str, Any]
def call(self, name: str, runtime_context: Optional[Dict[str, Any]] = None, **kwargs: Any) -> Any
def get_tool_descriptions(self) -> str
def get_all_specs(self) -> List[Dict[str, Any]]
def setup(self, context: Optional[Dict[str, Any]] = None) -> None
def teardown(self, context: Optional[Dict[str, Any]] = None) -> None
Example
from qitos import ToolRegistry, tool

registry = ToolRegistry()

@tool(name="greet")
def greet(name: str) -> str:
    """Say hello."""
    return f"Hello, {name}!"

registry.register(greet)
Memory is the abstract interface for long-term memory adapters.
class Memory(ABC):
    @abstractmethod
    def append(self, record: MemoryRecord) -> None: ...

    @abstractmethod
    def retrieve(
        self,
        query: Optional[Dict[str, Any]] = None,
        state: Any = None,
        observation: Any = None,
    ) -> Any: ...

    @abstractmethod
    def summarize(self, max_items: int = 5) -> str: ...

    @abstractmethod
    def evict(self) -> int: ...

    @abstractmethod
    def reset(self, run_id: Optional[str] = None) -> None: ...
MemoryRecord is the unit of storage:
@dataclass
class MemoryRecord:
    role: str
    content: Any
    step_id: int
    metadata: Dict[str, Any] = field(default_factory=dict)
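One way to satisfy the Memory interface is a bounded in-process list. The sketch copies Memory and MemoryRecord locally so it runs without qitos. WindowMemory, its capacity parameter, the "role" query key, and the reading of evict() as "number of records removed" are all assumptions for this example.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class MemoryRecord:
    role: str
    content: Any
    step_id: int
    metadata: Dict[str, Any] = field(default_factory=dict)


class Memory(ABC):
    """Local copy of the interface above so the sketch runs without qitos."""
    @abstractmethod
    def append(self, record: MemoryRecord) -> None: ...
    @abstractmethod
    def retrieve(self, query=None, state=None, observation=None) -> Any: ...
    @abstractmethod
    def summarize(self, max_items: int = 5) -> str: ...
    @abstractmethod
    def evict(self) -> int: ...
    @abstractmethod
    def reset(self, run_id: Optional[str] = None) -> None: ...


class WindowMemory(Memory):
    """Hypothetical adapter that keeps only the newest `capacity` records."""

    def __init__(self, capacity: int = 2) -> None:
        self.capacity = capacity
        self.records: List[MemoryRecord] = []

    def append(self, record: MemoryRecord) -> None:
        self.records.append(record)
        self.evict()

    def retrieve(self, query=None, state=None, observation=None) -> Any:
        role = (query or {}).get("role")   # the "role" query key is an assumption
        return [r for r in self.records if role is None or r.role == role]

    def summarize(self, max_items: int = 5) -> str:
        recent = self.records[-max_items:]
        return "\n".join(f"[{r.step_id}] {r.role}: {r.content}" for r in recent)

    def evict(self) -> int:
        # Drop the oldest records and report how many were removed.
        overflow = max(0, len(self.records) - self.capacity)
        self.records = self.records[overflow:]
        return overflow

    def reset(self, run_id: Optional[str] = None) -> None:
        self.records = []


memory = WindowMemory(capacity=2)
for step, text in enumerate(["plan", "search", "answer"]):
    memory.append(MemoryRecord(role="assistant", content=text, step_id=step))
summary = memory.summarize()
```

With a capacity of two, the first record is evicted when the third arrives, so the summary covers only steps 1 and 2.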
History is the abstract interface for model message history adapters.
class History(ABC):
    @abstractmethod
    def append(self, message: HistoryMessage) -> None: ...

    @abstractmethod
    def retrieve(
        self,
        query: Optional[Dict[str, Any]] = None,
        state: Any = None,
        observation: Any = None,
    ) -> Any: ...

    @abstractmethod
    def summarize(self, max_items: int = 5) -> str: ...

    @abstractmethod
    def evict(self) -> int: ...

    @abstractmethod
    def reset(self, run_id: Optional[str] = None) -> None: ...
HistoryPolicy controls how the Engine assembles history for model calls:
@dataclass
class HistoryPolicy:
    roles: List[str] = field(default_factory=lambda: ["user", "assistant"])
    max_messages: int = 24
    step_window: Optional[int] = None
    max_tokens: Optional[int] = None
Field         Description
roles         Message roles to include
max_messages  Maximum number of messages to include
step_window   If set, only include messages from the last N steps
max_tokens    If set, trim messages to fit within this token budget
HistoryMessage is the unit of storage:
@dataclass
class HistoryMessage:
    role: str
    content: str
    step_id: int
    metadata: Dict[str, Any] = field(default_factory=dict)
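The effect of the HistoryPolicy fields can be pictured with a small filter over HistoryMessage objects. select_messages is a hypothetical helper, not a qitos function. The order in which the Engine applies the fields and its treatment of max_tokens are not specified in this reference, so the ordering below is an assumption and token trimming is omitted.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class HistoryMessage:
    role: str
    content: str
    step_id: int
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class HistoryPolicy:
    roles: List[str] = field(default_factory=lambda: ["user", "assistant"])
    max_messages: int = 24
    step_window: Optional[int] = None
    max_tokens: Optional[int] = None


def select_messages(messages: List[HistoryMessage], policy: HistoryPolicy,
                    current_step: int) -> List[HistoryMessage]:
    """Hypothetical helper: apply roles, then step_window, then max_messages."""
    kept = [m for m in messages if m.role in policy.roles]
    if policy.step_window is not None:
        oldest = current_step - policy.step_window + 1
        kept = [m for m in kept if m.step_id >= oldest]
    # Keep the most recent messages; max_tokens trimming is omitted here.
    return kept[-policy.max_messages:] if policy.max_messages > 0 else []


messages = [
    HistoryMessage("system", "be brief", 0),
    HistoryMessage("user", "question", 0),
    HistoryMessage("assistant", "thinking", 1),
    HistoryMessage("user", "follow-up", 2),
    HistoryMessage("assistant", "answer", 2),
]
policy = HistoryPolicy(max_messages=2, step_window=2)
selected = select_messages(messages, policy, current_step=2)
contents = [m.content for m in selected]
```

In this run the system message is dropped by roles, the step-0 question falls outside the two-step window, and max_messages keeps the last two of the remaining three.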
StopReason is a string enum. Its value is written to state.stop_reason when a run ends.
class StopReason(str, Enum):
    SUCCESS = "success"
    FINAL = "final"
    MAX_STEPS = "max_steps"
    BUDGET_STEPS = "budget_steps"
    BUDGET_TIME = "budget_time"
    BUDGET_TOKENS = "budget_tokens"
    AGENT_CONDITION = "agent_condition"
    CRITIC_STOP = "critic_stop"
    STAGNATION = "stagnation"
    ENV_TERMINAL = "env_terminal"
    TASK_VALIDATION_FAILED = "task_validation_failed"
    ENV_CAPABILITY_MISMATCH = "env_capability_mismatch"
    UNRECOVERABLE_ERROR = "unrecoverable_error"
Value                    When set
success                  Agent completed successfully
final                    Decision.final() was accepted
max_steps                StateSchema.max_steps reached
budget_steps             RuntimeBudget.max_steps reached
budget_time              RuntimeBudget.max_runtime_seconds elapsed
budget_tokens            RuntimeBudget.max_tokens consumed
agent_condition          AgentModule.should_stop() returned True
critic_stop              A Critic returned action="stop"
stagnation               No state change detected for N steps
env_terminal             Env.is_terminal() returned True
task_validation_failed   Task.validate_structured() produced issues
env_capability_mismatch  Environment missing required ops
unrecoverable_error      Fatal error with no recovery path
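Because StopReason mixes in str, its members compare equal to their string values. The sketch below redefines a subset of the enum locally so it runs without qitos, and shows two ways to test a stored state.stop_reason string.

```python
from enum import Enum


class StopReason(str, Enum):
    """Subset of the enum above, redefined locally so the sketch runs alone."""
    FINAL = "final"
    MAX_STEPS = "max_steps"
    BUDGET_TIME = "budget_time"
    UNRECOVERABLE_ERROR = "unrecoverable_error"


# Members compare equal to their string values, so a stored
# state.stop_reason string can be checked either way.
stored = "max_steps"
matches_member = stored == StopReason.MAX_STEPS
parsed = StopReason(stored)              # look a member up by value
hit_limit = parsed in {StopReason.MAX_STEPS, StopReason.BUDGET_TIME}
```

Parsing the string back into the enum raises ValueError for an unknown value, which makes it a convenient check when reading stop reasons from saved traces.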
QitosRuntimeError is the base class for all structured runtime errors in QitOS.
class QitosRuntimeError(Exception):
    def __init__(self, info: RuntimeErrorInfo): ...
    info: RuntimeErrorInfo
RuntimeErrorInfo carries structured context:
@dataclass
class RuntimeErrorInfo:
    category: ErrorCategory   # model | parse | tool | state | task | env | system
    message: str
    phase: str
    step_id: int
    recoverable: bool = False
    details: Dict[str, Any] = field(default_factory=dict)
Typed subclasses: ModelExecutionError, ParseExecutionError, ToolExecutionError, StateExecutionError, SystemExecutionError.
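A caller might branch on the structured info like this. The classes are local stand-ins that mirror the shapes above so the sketch runs without qitos: category is a plain string rather than an ErrorCategory, ToolExecutionError is assumed to subclass QitosRuntimeError directly, and call_tool is a hypothetical failing call.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class RuntimeErrorInfo:
    category: str   # the real field is an ErrorCategory; a plain string here
    message: str
    phase: str
    step_id: int
    recoverable: bool = False
    details: Dict[str, Any] = field(default_factory=dict)


class QitosRuntimeError(Exception):
    """Local stand-in mirroring the constructor shown above."""
    def __init__(self, info: RuntimeErrorInfo):
        super().__init__(info.message)
        self.info = info


class ToolExecutionError(QitosRuntimeError):
    pass


def call_tool() -> str:
    # Hypothetical failing tool call used to trigger the error path.
    raise ToolExecutionError(RuntimeErrorInfo(
        category="tool", message="timeout after 5s", phase="act",
        step_id=3, recoverable=True, details={"tool": "search_web"},
    ))


outcome, where = "ok", ""
try:
    call_tool()
except QitosRuntimeError as err:
    # Catching the base class also catches every typed subclass.
    outcome = "retry" if err.info.recoverable else "abort"
    where = f"{err.info.category}@{err.info.phase}#{err.info.step_id}"
```

Reading recoverable from info, instead of matching on the exception message, keeps the handling logic independent of how the message is worded.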