Pi - Loop
Pi - Loop
Loop
- loop = outer_loop + inner_loop
- outer_loop允许在inner_loop执行中, 接收getSteeringMessages(),
Main loop logic
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* `agentLoop()` 与 `agentLoopContinue()` 共用的主循环骨架。
*
* 可以把它理解成“两层循环”:
* - 外层循环:agent 原本要停下时,看看 follow-up 队列里有没有“收尾接力”消息;
* - 内层循环:持续处理 assistant 响应、tool call 批次,以及 turn 之间注入的 steering 消息。
*
* 核心不变量:
* 1. 每次真正请求模型前,`currentContext.messages` 都是一份完整、可见、合法的 transcript;
* 2. assistant turn 完成后,toolResult 会被补回 transcript,再决定是否下一轮继续;
* 3. `prepareNextTurn()` 只影响“下一次 provider 请求”,绝不回改已经在飞的请求。
*/
// Outer loop: continues when queued follow-up messages arrive after agent would stop
while (true) {
let hasMoreToolCalls = true;
// Inner loop: process tool calls and steering messages
while (hasMoreToolCalls || pendingMessages.length > 0) {
// Process pending messages (inject before next assistant response)
if (pendingMessages.length > 0) {
for (const message of pendingMessages) {
await emit({ type: "message_start", message });
await emit({ type: "message_end", message });
currentContext.messages.push(message);
newMessages.push(message);
}
pendingMessages = [];
}
// ...
// inner_loop每轮turn结束后检查是否还有pendingMessages
// executeToolCalls()
pendingMessages = (await config.getSteeringMessages?.()) || [];
}
// Agent would stop here. Check for follow-up messages.
const followUpMessages = (await config.getFollowUpMessages?.()) || [];
if (followUpMessages.length > 0) {
// Set as pending so inner loop processes them
pendingMessages = followUpMessages;
continue;
}
// No more messages, exit
break;
}
- 支持 runAgentLoop & runAgentLoopContinue
Agent
- 进入loop前先normalizePromptInput ```ts /**
Agent是 low-level loop 之上的有状态包装层。 *- 它的职责不是替代
runAgentLoop(),而是把“运行时拥有者”需要承担的状态集中起来: - 持有当前 transcript 与工具列表;
- 维护 activeRun / abortController / streamingMessage / pendingToolCalls;
- 对外暴露
prompt()/continue()/steer()/followUp()等 API;
- 对外暴露
- 把 low-level loop 的事件归约回本地状态,并转发给订阅者。 *
- 可以把它理解成:
agent-loop.ts负责“如何跑一轮循环”;
- TUI层循环接收用户输入
1 2 3 4 5 6 7 8 9 10
while (true) { const userInput = await this.getUserInput(); try { // 里面最终会调用runLoop await this.session.prompt(userInput); } catch (error: unknown) { const errorMessage = error instanceof Error ? error.message : "Unknown error occurred"; this.showError(errorMessage); } }
Execute Tool Calls
- Sequential or Parallel ```ts /**
- 执行 assistant message 中携带的 tool calls。 *
- 这里先做一次“批次级调度决策”:
- 如果全局配置要求顺序执行,走 sequential;
- 如果任意单个 tool 声明自己必须顺序执行,也整体降级为 sequential;
- 否则才走 parallel。 *
- 之所以按“整批降级”处理,而不是部分并行、部分串行混搭,是为了保持批次语义简单,
- 避免在一个 assistant turn 里出现难以解释的依赖竞态。 */ async function executeToolCalls( currentContext: AgentContext, assistantMessage: AssistantMessage, config: AgentLoopConfig, signal: AbortSignal | undefined, emit: AgentEventSink, ): Promise
{ // 这里再次从 assistantMessage 中取 toolCall,保证调度逻辑只依赖最终定稿消息, // 不依赖 streaming 期间的 partial toolCall 片段。 const toolCalls = assistantMessage.content.filter((c) => c.type === "toolCall"); const hasSequentialToolCall = toolCalls.some( (tc) => currentContext.tools?.find((t) => t.name === tc.name)?.executionMode === "sequential", ); if (config.toolExecution === "sequential" || hasSequentialToolCall) { return executeToolCallsSequential(currentContext, assistantMessage, toolCalls, config, signal, emit); } return executeToolCallsParallel(currentContext, assistantMessage, toolCalls, config, signal, emit); } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
#### executeToolCallsParallel ```ts async function executeToolCallsParallel( currentContext: AgentContext, assistantMessage: AssistantMessage, toolCalls: AgentToolCall[], config: AgentLoopConfig, signal: AbortSignal | undefined, emit: AgentEventSink, ): Promise<ExecutedToolCallBatch> { // 并行模式下,prepare 阶段仍按 assistant 原顺序串行进行; // 真正并发的是 execute + finalize。 // 这样做的好处是: // 1. 参数准备/校验/beforeHook 仍具确定性; // 2. 真正耗时的工具执行可以并发; // 3. 结果消息最终仍能按原 toolCall 顺序落回 transcript。 const finalizedCalls: FinalizedToolCallEntry[] = []; for (const toolCall of toolCalls) { const preparation = await prepareToolCall(currentContext, assistantMessage, toolCall, config, signal); // immediate 直接执行了 if (preparation.kind === "immediate") { const finalized = { toolCall, result: preparation.result, isError: preparation.isError, } satisfies FinalizedToolCallOutcome; finalizedCalls.push(finalized); if (signal?.aborted) { break; } continue; } // ts 语法糖并发 finalizedCalls.push(async () => { const executed = await executePreparedToolCall(preparation, signal, emit); const finalized = await finalizeExecutedToolCall( currentContext, assistantMessage, preparation, executed, config, signal, ); await emitToolExecutionEnd(finalized, emit); return finalized; }); if (signal?.aborted) { break; } } const orderedFinalizedCalls = await Promise.all( finalizedCalls.map((entry) => (typeof entry === "function" ? entry() : Promise.resolve(entry))), ); // 注意:`Promise.all` 返回结果顺序与输入数组顺序一致, // 因此即使并发执行,toolResult message 仍会按 assistant 原始 toolCall 顺序写回 transcript。 const messages: ToolResultMessage[] = []; for (const finalized of orderedFinalizedCalls) { const toolResultMessage = createToolResultMessage(finalized); await emitToolResultMessage(toolResultMessage, emit); messages.push(toolResultMessage); } return { messages, terminate: shouldTerminateToolBatch(orderedFinalizedCalls), }; }
This post is licensed under CC BY 4.0 by the author.