Post

My Agent Core Design - Event Stream & Observability

Event Stream & Observability

  • 第一版支持 session replay + 结构化 event log.
  • SDK 自身日志使用 Python 标准库 logging.
  • logging 默认输出简洁文本日志, 用于诊断 SDK 自身问题.
  • 完整运行 timeline、调试信息和可 replay 数据以 event / SQLite / artifact / Task WAL 为准.
  • core 不引入 structlog 作为第一版依赖.
  • event 顺序:
    • storage manager 通过单 writer 队列分配全局递增 seq.
    • 所有落库 event 按 writer 队列顺序获得 seq.
    • 每个 run 额外维护 run_seq, 用于 run 内 replay.
    • 时间戳只用于展示和粗略筛选, 不作为唯一排序依据.
  • Context node 和结构性运行事件都落 SQLite, 方便 inspect / replay.
  • 高频 delta 默认只走实时 event stream, 不落 SQLite.
  • debug 模式可以把高频 delta 聚合/采样后落库, 不逐 token 写 SQLite.
  • Runtime API 暴露单消费者 run.events(debug=False); 调用方需要多个消费者时自行 fan-out.
  • run.events(debug=False) 默认只包含用户可见事件和必要结构性事件.
  • Task 相关事件默认视为内部/debug 事件, 不进入普通用户可见流; debug=True 或 inspect/replay 可查看.
  • run.events(debug=True) 或 inspect API 可以看到 provider/debug event.
  • run.events() 使用有界 asyncio queue + 背压:
    • producer await put.
    • 消费者慢时 run 会适度变慢.
    • 队列不会无限增长.
    • 结构性事件不丢.
  • 高频 delta coalescing:
    • 连续 model_text_delta 可以合并成更大的 delta.
    • 合并不丢文本内容.
    • tool/model lifecycle 等结构性事件不可合并.
  • 新订阅不回放历史事件, 历史事件从 SQLite 查询.
  • Model streaming delta:
    • 默认不落 SQLite, 只通过 run.events() 实时给调用方.
    • model 完成后一次性写完整 assistant node 和 model_completed event.
    • debug 开启时可落聚合/采样后的 delta event, 不建议每个 token 都写 SQLite.
  • ContextBuildReport:
    • 每次 prompt composer 完成后写 context_built event.
    • context_built event 不关联具体 node_id.
    • event 通过 run_id / agent_id 关联到本次运行.
    • payload 放轻量 ContextBuildReport.
    • payload 中列 included/excluded node ids, compaction node, system block selection, token budget 和 artifact 降级情况.
    • debug 模式额外保存详细 report artifact.
  • Assistant streaming abort:
    • 用户 abort model streaming 时, 写 assistant node 保存 partial text.
    • metadata 标记 partial, aborted, abort_reason.
    • 后续 prompt composer 默认跳过该 partial assistant node, 不参与模型上下文.
    • inspect/replay 仍可看到 partial 输出.
    • 写 aborted_streaming event, run end_reason 为 aborted_streaming.
  • Event stream 包含用户可见事件和 debug 事件:
    • run_queued / run_dequeued / run_cancelled
    • loop_started / loop_completed / loop_aborted / loop_failed
    • context_built
    • message_created / message_delta / message_completed
    • model_started / model_text_delta / model_completed / model_failed
    • tool_started / tool_progress / tool_completed / tool_failed / tool_denied
    • hook_started / hook_completed / hook_failed / hook_denied
    • child_agent_created / worker_dispatched / worker_dispatch_failed / child_agent_cancel_requested / child_agent_cancelled / child_agent_cancel_timeout / child_agent_completed / child_agent_failed / child_result_schema_validation_failed
    • compact_started / compact_completed / compact_failed
    • memory_extraction_started / memory_extraction_completed / memory_extraction_failed
    • error
    • task_created / task_running / task_updated / task_reopened / task_failed / task_completed / task_cancelled / task_step_ready / task_step_claimed / task_step_started / task_step_claim_skipped / task_step_updated / task_step_blocked / task_step_completed / task_step_failed / task_step_cancelled / task_step_reopened / task_step_lease_expired / task_wal_replayed / task_wal_failed
  • 默认用户可见 event 只暴露普通文件写入结果和最终 assistant 输出; Task event 仅用于内部调度、debug、inspect 和 replay.
  • Memory Extraction Job 不是 child/sub/fork agent; memory_extraction_* event 不对应 agent_id / run_id, 通过 session_id 和 source node range 关联.
  • memory_extraction_completed payload 包含 created_memory_ids / updated_memory_ids / ignored_candidates / duplicate_decisions / conflicts / source_node_ids / files_changed / scan_cursor.
  • memory_extraction_failed payload 必须 redacted, 不包含 secret、完整 transcript 或未脱敏原始模型响应.
  • task_step_claim_skipped 覆盖 step_already_claimed / no_step_claimed 等不改变 Task DAG 的 claim 结果.
  • task_reopened 表示 Orchestrator 将 blocked Task 恢复为 pending.
  • task_failed 表示 Orchestrator 显式将整个 Task 置为 failed.
  • no_step_claimed 不写 Task WAL, 只作为 runtime event / dispatch_worker 成功 result / debug 信息存在.
  • task_step_started 表示 worker 将 claimed step 推进为 running.
  • task_step_failed 是独立 Task WAL / event 类型, 用于 worker 显式失败、worker 结束兜底失败、取消和超时失败.
  • step terminal 状态使用独立 Task WAL / event 类型:
    • task_step_completed
    • task_step_failed
    • task_step_cancelled
  • task_step_blocked 表示 worker 或 Orchestrator 将 step 标记为 blocked.
  • task_step_updated 只表示非 terminal step 内容、结果或执行状态更新.
  • task_step_reopened 表示 Orchestrator 将 blocked / failed step 恢复为 pending.
  • Replay / Inspect API:
    • 支持 replay_session(session_id, from_seq=None, to_seq=None).
    • 支持 replay_run(run_id).
    • 支持 get_node_path(node_id).
    • Inspect 返回 nodes / events / artifacts 三块, 不把 node 和 event 混成同一种对象.
    • node 是可重建模型上下文的主数据.
    • event 是 timeline/debug 日志.
    • artifacts 是大 payload、raw debug、文件输出等外部引用.
    • replay_session 默认包含 child/sub agent 摘要事件和 result summary, 不展开 sub agent transcript.
    • sub agent 详情通过 inspect_child(child_run_id) 或独立 child run stream 查询.
    • Inspect API 支持基于历史 nodes, tools, system blocks 和 context_built report 重建 provider-neutral ModelRequest 视图.
    • raw provider request/response 只有 debug artifact 存在时才可查看.
    • Replay/Inspect API 默认做 redaction, include_sensitive=False.
    • 过滤范围包括 api key, env secret, hook/tool 输出中的敏感字段, raw provider debug artifact 摘要等.
    • 需要显式 include_sensitive=True 才返回敏感原文.
    • Plan 文件是普通文件; 缺失时按普通 file read missing 处理.
    • Inspect 当前 Task 直接读取内存 Task DAG.
    • Task timeline 从项目级 Task WAL JSONL 查询.
    • 如果 Task WAL replay 失败, inspect 返回 task_wal_unavailable 和错误摘要.
  • Debug dump 后置, 第一版不做重点.
This post is licensed under CC BY 4.0 by the author.