跳转到主要内容

Documentation Index

Fetch the complete documentation index at: https://qitor.mintlify.app/llms.txt

Use this file to discover all available pages before exploring further.

Engine 是 QitOS 所有智能体工作流共享的执行内核(kernel)。它运行步骤循环,协调 AgentModule 的钩子,分发工具调用,执行评估器,检查停止条件,并写出追踪记录产物。只有当你需要比 agent.run() 更细粒度的控制时,才会直接和它交互。
QitOS 强制遵守单内核规则:一次运行只有一个 Engine。解析器、评估器、记忆适配器、工具集等扩展都挂接在这条主流水线上,而不是再引入第二个执行循环。

循环是如何工作的

每一步都遵循固定顺序:
prepare → decide → act → reduce → critics → check_stop → trace
  1. prepareagent.prepare(state) 把状态格式化成模型可直接消费的提示词文本。
  2. decide:先调用 agent.decide(state, observation);若返回 None,再走 Engine 的默认模型调用路径。
  3. act:把 Decision.actions 中的工具调用交给 ToolRegistry 执行。
  4. reduceagent.reduce(state, observation, decision) 用新的观测结果更新状态。
  5. 评估器:所有已注册的 Critic 在此步后评估当前结果,必要时可停止或重试。
  6. check_stop:检查预算、FinalResultCriteriaagent.should_stop() 以及自定义 StopCriteria
  7. 追踪记录:把本步的步骤记录与运行时事件写入 TraceWriter

构造函数

from qitos import Engine

engine = Engine(
    agent=agent,
    budget=RuntimeBudget(max_steps=20),
    parser=ReActTextParser(),
    critics=[my_critic],
    stop_criteria=[FinalResultCriteria()],
    env=host_env,
    trace_writer=trace_writer,
    hooks=[my_hook],
)
参数类型说明
agentAgentModule必需。要执行的策略模块。
budgetRuntimeBudget | None步数、时间与令牌限制。默认 RuntimeBudget(max_steps=10)
parserParser | None把原始模型输出解析成 Decision 的解析器,必须与提示词格式匹配。
criticslist[Critic] | None每步后执行的评估器,可停止或重试。
stop_criterialist[StopCriteria] | None停止条件列表。默认 [FinalResultCriteria()]
envEnv | None提供 reset/observe/step/is_terminal/close 生命周期的环境。
trace_writerTraceWriter | None为本次运行写出追踪记录产物:manifest.jsonevents.jsonlsteps.jsonl
hookslist[EngineHook] | None生命周期钩子。
render_hookslist | None渲染钩子,会在内部合并进 hooks
history_policyHistoryPolicy | None控制运行内对话历史如何被组装。
recovery_policyRecoveryPolicy | None控制步骤失败时如何恢复。
单次运行的普通场景优先用 agent.run()。当你需要跨多次任务复用同一个 Engine,或在运行间动态调整钩子时,再直接使用 Engine

Engine.run(task)

result = engine.run(task)
它接受普通字符串任务,也接受结构化 Task 对象,返回一个 EngineResult 当传入 Task 时,Engine 会读取 task.budget 并覆盖自身默认预算,同时自动管理资源预检、环境重置/观测/关闭等生命周期。

EngineResult

@dataclass
class EngineResult(Generic[StateT]):
    state: StateT
    records: List[StepRecord]
    events: List[RuntimeEvent]
    step_count: int
    task_result: Optional[TaskResult]
字段说明
state运行结束后的最终强类型状态。重点查看 state.final_resultstate.stop_reason
records每步一个 StepRecord,包含决策、观测结果与状态差异。
events本次运行发出的全部 RuntimeEvent
step_count实际执行的步数,即 len(records)
task_result结构化任务结果,包含成功标志与条件结果。
常见用法:
result = engine.run("summarize the paper")

print(result.state.final_result)
print(result.state.stop_reason)
print(result.step_count)
print(result.task_result.success)

钩子

钩子用于在不修改 Engine 内部逻辑的前提下观察或响应生命周期事件。它们通常实现 EngineHook,在 on_before_stepon_after_step 等边界被调用。
engine.register_hook(my_hook)
engine.unregister_hook(my_hook)
engine.clear_hooks()
你也可以在构造时通过 hooks 传入,或在 agent.run(hooks=[...]) 中动态附加。

预算耗尽

当步数、墙钟时间或令牌预算耗尽时,Engine 会把 state.stop_reason 设置为对应值,并写出 END 事件。运行会优雅结束,仍然返回 EngineResult,你可以通过检查 state.stop_reason 判断是否发生了预算耗尽。
from qitos.engine.states import RuntimeBudget

engine = Engine(
    agent=agent,
    budget=RuntimeBudget(
        max_steps=30,
        max_runtime_seconds=120.0,
        max_tokens=50_000,
    ),
)

从 AgentModule 构建 Engine

AgentModule.build_engine() 是一个便捷工厂,会创建一个和当前智能体绑定好的 Engine:
engine = agent.build_engine(
    budget=RuntimeBudget(max_steps=15),
    trace_writer=trace_writer,
)
result = engine.run(task)
这与直接写 Engine(agent=agent, ...) 等价,也是 agent.run() 在内部采用的路径。

AsyncEngine

AsyncEngine 提供非阻塞的智能体执行能力。它封装了同样的 Engine 循环,但把阻塞调用放到线程池中执行,因此在 asyncio 事件循环中使用是安全的。
from qitos import AsyncEngine

async_engine = AsyncEngine(agent=agent, budget=RuntimeBudget(max_steps=20))

AsyncEngine.arun(task)

异步运行智能体循环,返回与 Engine.run() 相同的 EngineResult
result = await async_engine.arun("分析数据")
print(result.state.final_result)

AsyncEngine.arun_stream(task)

异步运行智能体循环,并在事件发生时产出结构化的 EngineEvent 对象——非常适合实时 UI 更新或将进度流式推送给客户端。
async for event in async_engine.arun_stream("分析数据"):
    print(event.event_type, event.step_id, event.payload)
事件在步骤边界(step_startstep_end)、阶段切换(decideactreducecriticcheck_stop)和多智能体事件(handoffdelegatefanout)时产出。流总是以 run_start 开始、以 run_end 结束。

EngineEvent

@dataclass
class EngineEvent:
    event_type: EngineEventType   # step_start, step_end, decide, act, ...
    step_id: int
    agent_id: str | None          # 产出此步骤的智能体
    phase: RuntimePhase | None
    ok: bool
    payload: Dict[str, Any]
    error: str | None
    ts: str                       # ISO 8601 时间戳

EventStream

EventStream 是支撑 arun_stream() 的异步队列。你也可以独立使用它,将事件扇出到多个消费者:
from qitos.engine.events import EventStream

stream = EventStream()
q1 = stream.subscribe()  # 消费者 1 的 asyncio.Queue
q2 = stream.subscribe()  # 消费者 2 的 asyncio.Queue

异步模型

当配置的模型实现了 acall()(来自 AsyncModel)时,AsyncEngine 可以在不阻塞事件循环的情况下调用它。内置的异步模型适配器:
from qitos.models import AsyncOpenAICompatibleModel, AsyncOpenAIModel

llm = AsyncOpenAICompatibleModel(
    model="qwen-plus",
    api_key="...",
    base_url="https://...",
)
result = await llm.acall([{"role": "user", "content": "你好"}])
AsyncEngine.arun() 与任何模型都兼容——同步模型会自动调度到线程池。当你需要真正的非阻塞 I/O(例如在高并发 Web 服务器中)时,才需要使用异步模型适配器。