My Agent Core Design - Event Stream & Observability
Event Stream & Observability
- 第一版支持 session replay + 结构化 event log.
- SDK 自身日志使用 Python 标准库
logging. - logging 默认输出简洁文本日志, 用于诊断 SDK 自身问题.
- 完整运行 timeline、调试信息和可 replay 数据以 event / SQLite / artifact / Task WAL 为准.
- core 不引入
structlog作为第一版依赖. - event 顺序:
- storage manager 通过单 writer 队列分配全局递增
seq. - 所有落库 event 按 writer 队列顺序获得
seq. - 每个 run 额外维护
run_seq, 用于 run 内 replay. - 时间戳只用于展示和粗略筛选, 不作为唯一排序依据.
- storage manager 通过单 writer 队列分配全局递增
- Context node 和结构性运行事件都落 SQLite, 方便 inspect / replay.
- 高频 delta 默认只走实时 event stream, 不落 SQLite.
- debug 模式可以把高频 delta 聚合/采样后落库, 不逐 token 写 SQLite.
- Runtime API 暴露单消费者
run.events(debug=False); 调用方需要多个消费者时自行 fan-out. run.events(debug=False)默认只包含用户可见事件和必要结构性事件.- Task 相关事件默认视为内部/debug 事件, 不进入普通用户可见流;
debug=True或 inspect/replay 可查看. run.events(debug=True)或 inspect API 可以看到 provider/debug event.run.events()使用有界 asyncio queue + 背压:- producer
await put. - 消费者慢时 run 会适度变慢.
- 队列不会无限增长.
- 结构性事件不丢.
- producer
- 高频 delta coalescing:
- 连续
model_text_delta可以合并成更大的 delta. - 合并不丢文本内容.
- tool/model lifecycle 等结构性事件不可合并.
- 连续
- 新订阅不回放历史事件, 历史事件从 SQLite 查询.
- Model streaming delta:
- 默认不落 SQLite, 只通过
run.events()实时给调用方. - model 完成后一次性写完整 assistant node 和 model_completed event.
- debug 开启时可落聚合/采样后的 delta event, 不建议每个 token 都写 SQLite.
- 默认不落 SQLite, 只通过
- ContextBuildReport:
- 每次 prompt composer 完成后写
context_builtevent. context_builtevent 不关联具体 node_id.- event 通过 run_id / agent_id 关联到本次运行.
- payload 放轻量
ContextBuildReport. - payload 中列 included/excluded node ids, compaction node, system block selection, token budget 和 artifact 降级情况.
- debug 模式额外保存详细 report artifact.
- 每次 prompt composer 完成后写
- Assistant streaming abort:
- 用户 abort model streaming 时, 写 assistant node 保存 partial text.
- metadata 标记 partial, aborted, abort_reason.
- 后续 prompt composer 默认跳过该 partial assistant node, 不参与模型上下文.
- inspect/replay 仍可看到 partial 输出.
- 写 aborted_streaming event, run end_reason 为 aborted_streaming.
- Event stream 包含用户可见事件和 debug 事件:
- run_queued / run_dequeued / run_cancelled
- loop_started / loop_completed / loop_aborted / loop_failed
- context_built
- message_created / message_delta / message_completed
- model_started / model_text_delta / model_completed / model_failed
- tool_started / tool_progress / tool_completed / tool_failed / tool_denied
- hook_started / hook_completed / hook_failed / hook_denied
- child_agent_created / worker_dispatched / worker_dispatch_failed / child_agent_cancel_requested / child_agent_cancelled / child_agent_cancel_timeout / child_agent_completed / child_agent_failed / child_result_schema_validation_failed
- compact_started / compact_completed / compact_failed
- memory_extraction_started / memory_extraction_completed / memory_extraction_failed
- error
- task_created / task_running / task_updated / task_reopened / task_failed / task_completed / task_cancelled / task_step_ready / task_step_claimed / task_step_started / task_step_claim_skipped / task_step_updated / task_step_blocked / task_step_completed / task_step_failed / task_step_cancelled / task_step_reopened / task_step_lease_expired / task_wal_replayed / task_wal_failed
- 默认用户可见 event 只暴露普通文件写入结果和最终 assistant 输出; Task event 仅用于内部调度、debug、inspect 和 replay.
- Memory Extraction Job 不是 child/sub/fork agent; memory_extraction_* event 不对应 agent_id / run_id, 通过 session_id 和 source node range 关联.
- memory_extraction_completed payload 包含 created_memory_ids / updated_memory_ids / ignored_candidates / duplicate_decisions / conflicts / source_node_ids / files_changed / scan_cursor.
- memory_extraction_failed payload 必须 redacted, 不包含 secret、完整 transcript 或未脱敏原始模型响应.
task_step_claim_skipped覆盖step_already_claimed/no_step_claimed等不改变 Task DAG 的 claim 结果.task_reopened表示 Orchestrator 将 blocked Task 恢复为 pending.task_failed表示 Orchestrator 显式将整个 Task 置为 failed.no_step_claimed不写 Task WAL, 只作为 runtime event / dispatch_worker 成功 result / debug 信息存在.task_step_started表示 worker 将 claimed step 推进为 running.task_step_failed是独立 Task WAL / event 类型, 用于 worker 显式失败、worker 结束兜底失败、取消和超时失败.- step terminal 状态使用独立 Task WAL / event 类型:
task_step_completedtask_step_failedtask_step_cancelled
task_step_blocked表示 worker 或 Orchestrator 将 step 标记为 blocked.task_step_updated只表示非 terminal step 内容、结果或执行状态更新.task_step_reopened表示 Orchestrator 将 blocked / failed step 恢复为 pending.- Replay / Inspect API:
- 支持
replay_session(session_id, from_seq=None, to_seq=None). - 支持
replay_run(run_id). - 支持
get_node_path(node_id). - Inspect 返回 nodes / events / artifacts 三块, 不把 node 和 event 混成同一种对象.
- node 是可重建模型上下文的主数据.
- event 是 timeline/debug 日志.
- artifacts 是大 payload、raw debug、文件输出等外部引用.
replay_session默认包含 child/sub agent 摘要事件和 result summary, 不展开 sub agent transcript.- sub agent 详情通过
inspect_child(child_run_id)或独立 child run stream 查询. - Inspect API 支持基于历史 nodes, tools, system blocks 和
context_builtreport 重建 provider-neutralModelRequest视图. - raw provider request/response 只有 debug artifact 存在时才可查看.
- Replay/Inspect API 默认做 redaction,
include_sensitive=False. - 过滤范围包括 api key, env secret, hook/tool 输出中的敏感字段, raw provider debug artifact 摘要等.
- 需要显式
include_sensitive=True才返回敏感原文. - Plan 文件是普通文件; 缺失时按普通 file read missing 处理.
- Inspect 当前 Task 直接读取内存 Task DAG.
- Task timeline 从项目级 Task WAL JSONL 查询.
- 如果 Task WAL replay 失败, inspect 返回 task_wal_unavailable 和错误摘要.
- 支持
- Debug dump 后置, 第一版不做重点.
This post is licensed under CC BY 4.0 by the author.