深入理解 LangGraph Stream Modes:从 Token 流到断点续传

在使用 LangGraph 或 DeepAgents 开发 Agent 应用时,astream 的 stream modes 是一个核心概念。本文将从实际使用场景出发,帮你彻底理解每种模式的作用和区别。

统一输出格式 (v2)

所有 stream mode 使用统一的 StreamPart 格式(需要 version="v2"):

python
{
    "type": "values" | "updates" | "messages" | "custom" | "checkpoints" | "tasks" | "debug",
    "ns": (),           # namespace tuple,subgraph 事件会填充
    "data": ...,        # 实际载荷,类型随 stream_mode 变化
}

类型定义可以从 langgraph.types 导入:

python
from langgraph.types import (
    StreamPart,
    ValuesStreamPart,
    UpdatesStreamPart,
    MessagesStreamPart,
    CustomStreamPart,
    CheckpointStreamPart,
    TasksStreamPart,
    DebugStreamPart,
)

核心三种模式(最常用)

1. messages - LLM Token 流

一句话理解:原始 LLM token 流事件,每个 token 都是一个事件。

text
LLM 调用: "Why did the ice cream go to school? To get..."
            ↓      ↓      ↓      ↓      ↓
          token  token  token  token  token
            ↓      ↓      ↓      ↓      ↓
         event  event  event  event  event  ← 实时发出

事件结构

python
{
    "type": "messages",
    "ns": (),
    "data": (message_chunk, metadata)  # 二元组
}

message_chunk 字段

python
message_chunk.type        # "ai" | "human" | "system" | "tool"
message_chunk.content     # str 或内容块列表
message_chunk.id          # 消息 ID

metadata 字段

python
metadata = {
    "langgraph_node": "write_poem",      # 产生此消息的节点名
    "langgraph_step": 3,                  # 当前步骤编号
    "tags": ["joke", "poem"],             # LLM 调用时设置的标签
    "ls_provider": "openai",              # LLM 提供商
    "ls_model_name": "gpt-4.1-mini",      # 模型名称
}

使用示例

python
async for chunk in graph.astream(inputs, stream_mode="messages", version="v2"):
    if chunk["type"] == "messages":
        msg, metadata = chunk["data"]
        # 按节点过滤
        if metadata["langgraph_node"] == "write_poem":
            print(msg.content, end="")

使用场景:打字机效果、实时显示 LLM 输出


2. updates - 节点 Output

一句话理解:节点执行结束时的 return 值,return 什么就收到什么。

text
Graph 执行流程:

[START] → [node_a] → [node_b] → [node_c] → [END]
              ↓           ↓           ↓
           output      output      output  ← 节点完成后发出

事件结构

python
{
    "type": "updates",
    "ns": (),
    "data": {
        "node_name": {          # 节点名称作为 key
            "messages": [...],  # 该节点 return 的内容
            "result": "..."
        }
    }
}

关键点:返回的是节点 output,不是完整 state:

python
# 假设 state = {"a": 1, "b": 2, "c": 3}

def my_node(state):
    return {"b": 100}  # 只返回变更的部分

# updates 收到:
# {"type": "updates", "data": {"my_node": {"b": 100}}}

# 不是完整 state,只是节点 output

使用示例

python
async for chunk in graph.astream(inputs, stream_mode="updates", version="v2"):
    for node_name, output in chunk["data"].items():
        print(f"节点 {node_name} 完成: {output}")

使用场景:追踪节点执行进度、获取节点处理结果


3. custom - 用户自定义事件

一句话理解:用户主动发送的事件,完全由你控制。

python
from langgraph.config import get_stream_writer

def research_node(state):
    writer = get_stream_writer()
    
    writer({"step": "搜索中...", "progress": 0})   # ← 立即发出
    results = search(state["query"])
    
    writer({"step": "分析中...", "progress": 50})  # ← 立即发出
    analysis = analyze(results)
    
    writer({"step": "完成", "progress": 100})      # ← 立即发出
    return {"research": analysis}

事件结构

python
{
    "type": "custom",
    "ns": (),
    "data": {
        "step": "搜索中...",
        "progress": 0
        # ...任意 key-value
    }
}

使用场景:自定义 UI 状态更新、进度条、步骤提示


三种核心模式对比

messages updates custom
触发时机 LLM 每个 token 节点执行完毕 用户主动调用
颗粒度 最细(token 级) 中等(节点级) 自定义
内容 LLM token + metadata 节点 output 任意 dict
使用场景 打字机效果 追踪节点进度 自定义 UI 状态

进阶模式

4. values - 完整 State 快照

一句话理解:每个节点执行完后,返回完整 state。

python
# 假设 state = {"a": 1, "b": 2}

def node_x(state):
    return {"b": 100}

# updates 收到:{"b": 100}
# values 收到:{"a": 1, "b": 100}  ← 完整 state
updates values
返回内容 节点 output 完整 state
数据量

使用场景:需要随时获取完整状态


5. tasks - 任务生命周期

一句话理解:追踪节点执行状态(开始、成功、失败)。

任务 = 节点执行,每个节点执行一次就是一个任务。

python
{
    "type": "tasks",
    "data": {
        "task_id": "xxx",
        "name": "node_a",
        "status": "running"  # 或 "success" / "error"
    }
}

updates 的区别

updates tasks
关注点 节点 output 了什么 节点执行状态如何
时机 节点执行完 开始 + 结束

使用场景:想知道哪个节点正在执行、捕获执行失败

注意:LangGraph 的 tasks stream mode 和 DeepAgents 的 task 工具是两个不同的概念。前者追踪节点执行状态,后者是 subagent 委托机制。


6. checkpoints - 持久化快照

一句话理解:每次 state 变化后保存的快照,用于断点续传。

前提:需要配置 checkpointer

python
from langgraph.checkpoint.memory import MemorySaver

graph = StateGraph(State).add_node(...).compile(
    checkpointer=MemorySaver()  # 必须配置
)

事件结构

python
{
    "type": "checkpoints",
    "data": {
        "config": {
            "configurable": {
                "thread_id": "conversation_123"
            }
        },
        "checkpoint": {
            "id": "checkpoint_abc",
            "ts": "2024-01-15T10:30:00",
            "channel_values": {
                "messages": [...],
                "query": "什么是量子计算"
            }
        },
        "metadata": {...}
    }
}

使用场景

场景 说明
恢复对话 断开后从上次 checkpoint 继续
时间旅行 回到历史某个状态重新执行
审计追踪 记录 state 每次变化

实际例子

python
# 第一次对话
async for chunk in graph.astream(
    {"messages": [{"role": "user", "content": "研究量子计算"}]},
    config={"configurable": {"thread_id": "chat_001"}},
    stream_mode="checkpoints"
):
    pass

# 后来恢复继续(同一个 thread_id)
async for chunk in graph.astream(
    {"messages": [{"role": "user", "content": "继续"}]},
    config={"configurable": {"thread_id": "chat_001"}},
    stream_mode="checkpoints"
):
    pass  # 从上次的 checkpoint 继续执行

7. debug - 所有信息合集

一句话理解checkpoints + tasks + 额外调试信息,一站式看所有细节。

python
{
    "type": "debug",
    "data": {
        "timestamp": "2024-01-15T10:30:00.123",
        "step": 3,
        "node_name": "research_node",
        "task_id": "task_xyz",
        "state": {...},              # 当前完整 state
        "checkpoint": {...},         # checkpoint 信息
        "writes": {"research": ...}, # 节点 output
        "triggers": [...],           # 触发条件
    }
}

使用场景:开发调试时查看所有内部细节


模式总览

Mode 触发时机 返回内容 使用场景
messages LLM 每个 token token + metadata 打字机效果
updates 节点执行完 节点 output 追踪节点进度
custom 用户主动发 自定义数据 UI 状态更新
values 节点执行完 完整 state 获取完整状态
tasks 任务开始/结束 任务状态 追踪执行生命周期
checkpoints state 保存时 检查点数据 断点续传
debug 全部事件 所有信息 开发调试

实战:组合使用

通常组合前三种模式就够用了:

python
async for chunk in graph.astream(
    inputs,
    stream_mode=["messages", "updates", "custom"],
    version="v2",
):
    if chunk["type"] == "messages":
        # 实时显示 LLM 输出
        msg, _ = chunk["data"]
        print(msg.content, end="")
        
    elif chunk["type"] == "updates":
        # 显示当前执行到哪个节点
        for node, output in chunk["data"].items():
            print(f"\n[节点 {node} 完成]")
            
    elif chunk["type"] == "custom":
        # 显示自定义进度
        if "progress" in chunk["data"]:
            print(f"\n进度: {chunk['data']['progress']}%")

转换为 AGUI 事件

如果你需要将 LangGraph 事件转换为 AGUI 协议格式供前端使用:

python
async def convert_to_agui(chunk: dict) -> list[dict]:
    """将 LangGraph StreamPart 转换为 AGUI 事件"""
    events = []
    
    if chunk["type"] == "messages":
        msg, metadata = chunk["data"]
        if msg.type == "ai":
            events.append({
                "type": "text-message-content",
                "messageId": msg.id,
                "delta": msg.content
            })
            
    elif chunk["type"] == "updates":
        for node_name, data in chunk["data"].items():
            if node_name == "tools" and "messages" in data:
                for tool_msg in data["messages"]:
                    if hasattr(tool_msg, "tool_call_id"):
                        events.append({
                            "type": "tool-call-result",
                            "toolCallId": tool_msg.tool_call_id,
                            "result": tool_msg.content
                        })
                        
    elif chunk["type"] == "custom":
        events.append({
            "type": "custom",
            "data": chunk["data"]
        })
    
    return events

总结

  • 日常开发messages + updates + custom 三件套足够
  • 需要完整状态:加 values
  • 需要断点续传:配置 checkpointer,用 checkpoints
  • 调试问题:用 debug 看所有细节

理解了这 7 种模式,你就能灵活处理 Agent 的各种实时输出场景了。