深入理解 LangGraph Stream Modes:从 Token 流到断点续传
在使用 LangGraph 或 DeepAgents 开发 Agent 应用时,astream 的 stream modes 是一个核心概念。本文将从实际使用场景出发,帮你彻底理解每种模式的作用和区别。
统一输出格式 (v2)
所有 stream mode 使用统一的 StreamPart 格式(需要 version="v2"):
{
"type": "values" | "updates" | "messages" | "custom" | "checkpoints" | "tasks" | "debug",
"ns": (), # namespace tuple,subgraph 事件会填充
"data": ..., # 实际载荷,类型随 stream_mode 变化
}
类型定义可以从 langgraph.types 导入:
from langgraph.types import (
StreamPart,
ValuesStreamPart,
UpdatesStreamPart,
MessagesStreamPart,
CustomStreamPart,
CheckpointStreamPart,
TasksStreamPart,
DebugStreamPart,
)
核心三种模式(最常用)
1. messages - LLM Token 流
一句话理解:原始 LLM token 流事件,每个 token 都是一个事件。
LLM 调用: "Why did the ice cream go to school? To get..."
↓ ↓ ↓ ↓ ↓
token token token token token
↓ ↓ ↓ ↓ ↓
event event event event event ← 实时发出
事件结构:
{
"type": "messages",
"ns": (),
"data": (message_chunk, metadata) # 二元组
}
message_chunk 字段:
message_chunk.type # "ai" | "human" | "system" | "tool"
message_chunk.content # str 或内容块列表
message_chunk.id # 消息 ID
metadata 字段:
metadata = {
"langgraph_node": "write_poem", # 产生此消息的节点名
"langgraph_step": 3, # 当前步骤编号
"tags": ["joke", "poem"], # LLM 调用时设置的标签
"ls_provider": "openai", # LLM 提供商
"ls_model_name": "gpt-4.1-mini", # 模型名称
}
使用示例:
async for chunk in graph.astream(inputs, stream_mode="messages", version="v2"):
if chunk["type"] == "messages":
msg, metadata = chunk["data"]
# 按节点过滤
if metadata["langgraph_node"] == "write_poem":
print(msg.content, end="")
使用场景:打字机效果、实时显示 LLM 输出
2. updates - 节点 Output
一句话理解:节点执行结束时的 return 值,return 什么就收到什么。
Graph 执行流程:
[START] → [node_a] → [node_b] → [node_c] → [END]
↓ ↓ ↓
output output output ← 节点完成后发出
事件结构:
{
"type": "updates",
"ns": (),
"data": {
"node_name": { # 节点名称作为 key
"messages": [...], # 该节点 return 的内容
"result": "..."
}
}
}
关键点:返回的是节点 output,不是完整 state:
# 假设 state = {"a": 1, "b": 2, "c": 3}
def my_node(state):
return {"b": 100} # 只返回变更的部分
# updates 收到:
# {"type": "updates", "data": {"my_node": {"b": 100}}}
# 不是完整 state,只是节点 output
使用示例:
async for chunk in graph.astream(inputs, stream_mode="updates", version="v2"):
for node_name, output in chunk["data"].items():
print(f"节点 {node_name} 完成: {output}")
使用场景:追踪节点执行进度、获取节点处理结果
3. custom - 用户自定义事件
一句话理解:用户主动发送的事件,完全由你控制。
from langgraph.config import get_stream_writer
def research_node(state):
writer = get_stream_writer()
writer({"step": "搜索中...", "progress": 0}) # ← 立即发出
results = search(state["query"])
writer({"step": "分析中...", "progress": 50}) # ← 立即发出
analysis = analyze(results)
writer({"step": "完成", "progress": 100}) # ← 立即发出
return {"research": analysis}
事件结构:
{
"type": "custom",
"ns": (),
"data": {
"step": "搜索中...",
"progress": 0
# ...任意 key-value
}
}
使用场景:自定义 UI 状态更新、进度条、步骤提示
三种核心模式对比
messages |
updates |
custom |
|
|---|---|---|---|
| 触发时机 | LLM 每个 token | 节点执行完毕 | 用户主动调用 |
| 颗粒度 | 最细(token 级) | 中等(节点级) | 自定义 |
| 内容 | LLM token + metadata | 节点 output | 任意 dict |
| 使用场景 | 打字机效果 | 追踪节点进度 | 自定义 UI 状态 |
进阶模式
4. values - 完整 State 快照
一句话理解:每个节点执行完后,返回完整 state。
# 假设 state = {"a": 1, "b": 2}
def node_x(state):
return {"b": 100}
# updates 收到:{"b": 100}
# values 收到:{"a": 1, "b": 100} ← 完整 state
updates |
values |
|
|---|---|---|
| 返回内容 | 节点 output | 完整 state |
| 数据量 | 小 | 大 |
使用场景:需要随时获取完整状态
5. tasks - 任务生命周期
一句话理解:追踪节点执行状态(开始、成功、失败)。
任务 = 节点执行,每个节点执行一次就是一个任务。
{
"type": "tasks",
"data": {
"task_id": "xxx",
"name": "node_a",
"status": "running" # 或 "success" / "error"
}
}
和 updates 的区别:
updates |
tasks |
|
|---|---|---|
| 关注点 | 节点 output 了什么 | 节点执行状态如何 |
| 时机 | 节点执行完 | 开始 + 结束 |
使用场景:想知道哪个节点正在执行、捕获执行失败
注意:LangGraph 的
tasksstream mode 和 DeepAgents 的task工具是两个不同的概念。前者追踪节点执行状态,后者是 subagent 委托机制。
6. checkpoints - 持久化快照
一句话理解:每次 state 变化后保存的快照,用于断点续传。
前提:需要配置 checkpointer
from langgraph.checkpoint.memory import MemorySaver
graph = StateGraph(State).add_node(...).compile(
checkpointer=MemorySaver() # 必须配置
)
事件结构:
{
"type": "checkpoints",
"data": {
"config": {
"configurable": {
"thread_id": "conversation_123"
}
},
"checkpoint": {
"id": "checkpoint_abc",
"ts": "2024-01-15T10:30:00",
"channel_values": {
"messages": [...],
"query": "什么是量子计算"
}
},
"metadata": {...}
}
}
使用场景:
| 场景 | 说明 |
|---|---|
| 恢复对话 | 断开后从上次 checkpoint 继续 |
| 时间旅行 | 回到历史某个状态重新执行 |
| 审计追踪 | 记录 state 每次变化 |
实际例子:
# 第一次对话
async for chunk in graph.astream(
{"messages": [{"role": "user", "content": "研究量子计算"}]},
config={"configurable": {"thread_id": "chat_001"}},
stream_mode="checkpoints"
):
pass
# 后来恢复继续(同一个 thread_id)
async for chunk in graph.astream(
{"messages": [{"role": "user", "content": "继续"}]},
config={"configurable": {"thread_id": "chat_001"}},
stream_mode="checkpoints"
):
pass # 从上次的 checkpoint 继续执行
7. debug - 所有信息合集
一句话理解:checkpoints + tasks + 额外调试信息,一站式看所有细节。
{
"type": "debug",
"data": {
"timestamp": "2024-01-15T10:30:00.123",
"step": 3,
"node_name": "research_node",
"task_id": "task_xyz",
"state": {...}, # 当前完整 state
"checkpoint": {...}, # checkpoint 信息
"writes": {"research": ...}, # 节点 output
"triggers": [...], # 触发条件
}
}
使用场景:开发调试时查看所有内部细节
模式总览
| Mode | 触发时机 | 返回内容 | 使用场景 |
|---|---|---|---|
messages |
LLM 每个 token | token + metadata | 打字机效果 |
updates |
节点执行完 | 节点 output | 追踪节点进度 |
custom |
用户主动发 | 自定义数据 | UI 状态更新 |
values |
节点执行完 | 完整 state | 获取完整状态 |
tasks |
任务开始/结束 | 任务状态 | 追踪执行生命周期 |
checkpoints |
state 保存时 | 检查点数据 | 断点续传 |
debug |
全部事件 | 所有信息 | 开发调试 |
实战:组合使用
通常组合前三种模式就够用了:
async for chunk in graph.astream(
inputs,
stream_mode=["messages", "updates", "custom"],
version="v2",
):
if chunk["type"] == "messages":
# 实时显示 LLM 输出
msg, _ = chunk["data"]
print(msg.content, end="")
elif chunk["type"] == "updates":
# 显示当前执行到哪个节点
for node, output in chunk["data"].items():
print(f"\n[节点 {node} 完成]")
elif chunk["type"] == "custom":
# 显示自定义进度
if "progress" in chunk["data"]:
print(f"\n进度: {chunk['data']['progress']}%")
转换为 AGUI 事件
如果你需要将 LangGraph 事件转换为 AGUI 协议格式供前端使用:
async def convert_to_agui(chunk: dict) -> list[dict]:
"""将 LangGraph StreamPart 转换为 AGUI 事件"""
events = []
if chunk["type"] == "messages":
msg, metadata = chunk["data"]
if msg.type == "ai":
events.append({
"type": "text-message-content",
"messageId": msg.id,
"delta": msg.content
})
elif chunk["type"] == "updates":
for node_name, data in chunk["data"].items():
if node_name == "tools" and "messages" in data:
for tool_msg in data["messages"]:
if hasattr(tool_msg, "tool_call_id"):
events.append({
"type": "tool-call-result",
"toolCallId": tool_msg.tool_call_id,
"result": tool_msg.content
})
elif chunk["type"] == "custom":
events.append({
"type": "custom",
"data": chunk["data"]
})
return events
总结
- 日常开发:
messages+updates+custom三件套足够 - 需要完整状态:加
values - 需要断点续传:配置 checkpointer,用
checkpoints - 调试问题:用
debug看所有细节
理解了这 7 种模式,你就能灵活处理 Agent 的各种实时输出场景了。
评论
欢迎留下反馈,评论发布后会立即显示。