Post

Pi - Loop

Pi - Loop

Loop

  • loop = outer_loop + inner_loop
  • outer_loop允许在inner_loop执行中, 接收getSteeringMessages(),

Main loop logic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
 * `agentLoop()` 与 `agentLoopContinue()` 共用的主循环骨架。
 *
 * 可以把它理解成“两层循环”:
 * - 外层循环:agent 原本要停下时,看看 follow-up 队列里有没有“收尾接力”消息;
 * - 内层循环:持续处理 assistant 响应、tool call 批次,以及 turn 之间注入的 steering 消息。
 *
 * 核心不变量:
 * 1. 每次真正请求模型前,`currentContext.messages` 都是一份完整、可见、合法的 transcript;
 * 2. assistant turn 完成后,toolResult 会被补回 transcript,再决定是否下一轮继续;
 * 3. `prepareNextTurn()` 只影响“下一次 provider 请求”,绝不回改已经在飞的请求。
 */
// Outer loop: continues when queued follow-up messages arrive after agent would stop
while (true) {
    let hasMoreToolCalls = true;

    // Inner loop: process tool calls and steering messages
    while (hasMoreToolCalls || pendingMessages.length > 0) {

        // Process pending messages (inject before next assistant response)
        if (pendingMessages.length > 0) {
            for (const message of pendingMessages) {
                await emit({ type: "message_start", message });
                await emit({ type: "message_end", message });
                currentContext.messages.push(message);
                newMessages.push(message);
            }
            pendingMessages = [];
        }

        // ...

        // inner_loop每轮turn结束后检查是否还有pendingMessages
        // executeToolCalls()
        pendingMessages = (await config.getSteeringMessages?.()) || [];
    }

    // Agent would stop here. Check for follow-up messages.
    const followUpMessages = (await config.getFollowUpMessages?.()) || [];
    if (followUpMessages.length > 0) {
        // Set as pending so inner loop processes them
        pendingMessages = followUpMessages;
        continue;
    }

    // No more messages, exit
    break;
}
  • 支持 runAgentLoop & runAgentLoopContinue

Agent

  • 进入loop前先normalizePromptInput ```ts /**
  • Agent 是 low-level loop 之上的有状态包装层。 *
  • 它的职责不是替代 runAgentLoop(),而是把“运行时拥有者”需要承担的状态集中起来:
    • 持有当前 transcript 与工具列表;
    • 维护 activeRun / abortController / streamingMessage / pendingToolCalls;
    • 对外暴露 prompt() / continue() / steer() / followUp() 等 API;
    • 把 low-level loop 的事件归约回本地状态,并转发给订阅者。 *
  • 可以把它理解成:
    • agent-loop.ts 负责“如何跑一轮循环”;
    • agent.ts 负责“谁拥有这次运行,以及如何把运行折叠回可观察状态”。 */ export class Agent {} // normalizePromptInput() 负责把多种输入形态统一折叠成标准 AgentMessage[]。 const messages = this.normalizePromptInput(input, images); await this.runPromptMessages(messages); ```

      最外层交互式loop

  • TUI层循环接收用户输入
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    
    while (true) {
      const userInput = await this.getUserInput();
      try {
          // 里面最终会调用runLoop
          await this.session.prompt(userInput);
      } catch (error: unknown) {
          const errorMessage = error instanceof Error ? error.message : "Unknown error occurred";
          this.showError(errorMessage);
      }
    }
    

Execute Tool Calls

  • Sequential or Parallel ```ts /**
  • 执行 assistant message 中携带的 tool calls。 *
  • 这里先做一次“批次级调度决策”:
    • 如果全局配置要求顺序执行,走 sequential;
    • 如果任意单个 tool 声明自己必须顺序执行,也整体降级为 sequential;
    • 否则才走 parallel。 *
  • 之所以按“整批降级”处理,而不是部分并行、部分串行混搭,是为了保持批次语义简单,
  • 避免在一个 assistant turn 里出现难以解释的依赖竞态。 */ async function executeToolCalls( currentContext: AgentContext, assistantMessage: AssistantMessage, config: AgentLoopConfig, signal: AbortSignal | undefined, emit: AgentEventSink, ): Promise { // 这里再次从 assistantMessage 中取 toolCall,保证调度逻辑只依赖最终定稿消息, // 不依赖 streaming 期间的 partial toolCall 片段。 const toolCalls = assistantMessage.content.filter((c) => c.type === "toolCall"); const hasSequentialToolCall = toolCalls.some( (tc) => currentContext.tools?.find((t) => t.name === tc.name)?.executionMode === "sequential", ); if (config.toolExecution === "sequential" || hasSequentialToolCall) { return executeToolCallsSequential(currentContext, assistantMessage, toolCalls, config, signal, emit); } return executeToolCallsParallel(currentContext, assistantMessage, toolCalls, config, signal, emit); }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    
    #### executeToolCallsParallel
    ```ts
    async function executeToolCallsParallel(
     currentContext: AgentContext,
     assistantMessage: AssistantMessage,
     toolCalls: AgentToolCall[],
     config: AgentLoopConfig,
     signal: AbortSignal | undefined,
     emit: AgentEventSink,
    ): Promise<ExecutedToolCallBatch> {
     // 并行模式下,prepare 阶段仍按 assistant 原顺序串行进行;
     // 真正并发的是 execute + finalize。
     // 这样做的好处是:
     // 1. 参数准备/校验/beforeHook 仍具确定性;
     // 2. 真正耗时的工具执行可以并发;
     // 3. 结果消息最终仍能按原 toolCall 顺序落回 transcript。
     const finalizedCalls: FinalizedToolCallEntry[] = [];
    
     for (const toolCall of toolCalls) {
    
         const preparation = await prepareToolCall(currentContext, assistantMessage, toolCall, config, signal);
         // immediate 直接执行了
         if (preparation.kind === "immediate") {
             const finalized = {
                 toolCall,
                 result: preparation.result,
                 isError: preparation.isError,
             } satisfies FinalizedToolCallOutcome;
             finalizedCalls.push(finalized);
             if (signal?.aborted) {
                 break;
             }
             continue;
         }
         // ts 语法糖并发
         finalizedCalls.push(async () => {
             const executed = await executePreparedToolCall(preparation, signal, emit);
             const finalized = await finalizeExecutedToolCall(
                 currentContext,
                 assistantMessage,
                 preparation,
                 executed,
                 config,
                 signal,
             );
             await emitToolExecutionEnd(finalized, emit);
             return finalized;
         });
         if (signal?.aborted) {
             break;
         }
     }
    
     const orderedFinalizedCalls = await Promise.all(
         finalizedCalls.map((entry) => (typeof entry === "function" ? entry() : Promise.resolve(entry))),
     );
     // 注意:`Promise.all` 返回结果顺序与输入数组顺序一致,
     // 因此即使并发执行,toolResult message 仍会按 assistant 原始 toolCall 顺序写回 transcript。
     const messages: ToolResultMessage[] = [];
     for (const finalized of orderedFinalizedCalls) {
         const toolResultMessage = createToolResultMessage(finalized);
         await emitToolResultMessage(toolResultMessage, emit);
         messages.push(toolResultMessage);
     }
    
     return {
         messages,
         terminate: shouldTerminateToolBatch(orderedFinalizedCalls),
     };
    }
    
This post is licensed under CC BY 4.0 by the author.