LangGraph Event Structures and AGUI Conversion Guide
This document describes the event structures returned by LangGraph/DeepAgents and provides a complete approach for converting them to the AGUI protocol.
I. Message Types
1. AIMessage / AIMessageChunk
The AI's reply message. AIMessageChunk is the incremental chunk emitted during streaming.
Complete AIMessage structure
python
AIMessage(
    id="chatcmpl-abc123",  # unique message ID
    content="The weather is sunny",  # text content (may be empty)
    type="ai",  # message type
    # Tool calls (complete)
    tool_calls=[
        {
            "name": "get_weather",  # tool name
            "args": {"city": "Boston"},  # tool arguments (already parsed)
            "id": "call_abc123",  # tool call ID
            "type": "tool_call"  # fixed value
        }
    ],
    # Content blocks (multimodal)
    content_blocks=[
        {"type": "text", "text": "The weather is sunny"},
        {"type": "reasoning", "reasoning": "The user is asking about the weather..."},
    ],
    # Metadata
    response_metadata={
        "token_usage": {"completion_tokens": 50, ...},
        "model_name": "gpt-4.1-mini",
        "finish_reason": "tool_calls",  # or "stop"
    },
    # Token usage
    usage_metadata={
        "input_tokens": 100,
        "output_tokens": 50,
        "total_tokens": 150,
    }
)
AIMessageChunk structure (streaming)
python
AIMessageChunk(
    id="chatcmpl-abc123",
    content="sun",  # current token
    type="ai",
    # Tool call chunks (incremental)
    tool_call_chunks=[
        {
            "name": "get_weather",  # only the first chunk carries the name
            "args": "",  # later chunks fill in args incrementally
            "id": "call_abc123",
            "index": 0,  # tool call index (there may be several calls)
            "type": "tool_call_chunk"
        },
        # Fragments carried by later AIMessageChunk objects (listed together here for illustration):
        {"name": None, "args": '{"city"', "id": None, "index": 0, "type": "tool_call_chunk"},
        {"name": None, "args": ':"Bos', "id": None, "index": 0, "type": "tool_call_chunk"},
        {"name": None, "args": 'ton"}', "id": None, "index": 0, "type": "tool_call_chunk"},
    ],
    # Content blocks
    content_blocks=[
        {"type": "text", "text": "sun"},
    ],
    # Position marker
    chunk_position="last",  # "last" on the final chunk, otherwise None
)
Tool call streaming process
text
How the LLM streams a tool call:
Chunk 1: {"name": "get_weather", "args": "", "id": "call_abc123"}
Chunk 2: {"name": null, "args": "{\"city\"", "id": null}
Chunk 3: {"name": null, "args": ":\"Bos", "id": null}
Chunk 4: {"name": null, "args": "ton\"}", "id": null}
Chunk 5: chunk_position="last"
→ Merging the chunks yields the complete tool_calls:
[{"name": "get_weather", "args": {"city": "Boston"}, "id": "call_abc123"}]
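The merge step above can be sketched in plain Python. This is a minimal illustration that assumes each chunk is a plain dict shaped like the ones shown; `merge_tool_call_chunks` is a hypothetical helper written for this guide, not a LangChain API (LangChain performs an equivalent merge itself when `AIMessageChunk` objects are added together).

```python
import json


def merge_tool_call_chunks(chunks: list[dict]) -> list[dict]:
    """Merge streamed tool-call chunks into complete tool calls, grouped by index."""
    merged: dict[int, dict] = {}
    for chunk in chunks:
        index = chunk.get("index") or 0
        call = merged.setdefault(index, {"name": None, "id": None, "args": ""})
        # name and id normally arrive only on the first chunk of a call
        if chunk.get("name"):
            call["name"] = chunk["name"]
        if chunk.get("id"):
            call["id"] = chunk["id"]
        # args arrive as string fragments of one JSON document
        call["args"] += chunk.get("args") or ""

    result = []
    for index in sorted(merged):
        call = merged[index]
        # Parse only after all fragments are joined; a partial string is not valid JSON
        args = json.loads(call["args"]) if call["args"] else {}
        result.append(
            {"name": call["name"], "args": args, "id": call["id"], "type": "tool_call"}
        )
    return result


chunks = [
    {"name": "get_weather", "args": "", "id": "call_abc123", "index": 0},
    {"name": None, "args": '{"city"', "id": None, "index": 0},
    {"name": None, "args": ':"Bos', "id": None, "index": 0},
    {"name": None, "args": 'ton"}', "id": None, "index": 0},
]
tool_calls = merge_tool_call_chunks(chunks)
```

Grouping by `index` rather than by `id` matters because only the first chunk of a call carries the `id`; later fragments can be matched to their call only through the index.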
2. ToolMessage
The message that carries the result of a tool execution.
python
ToolMessage(
    id="toolmsg-xyz",  # message ID
    content="It's always sunny in Boston!",  # content returned by the tool
    type="tool",  # fixed value
    tool_call_id="call_abc123",  # ID of the matching tool call
    name="get_weather",  # tool name
    # Content blocks
    content_blocks=[
        {"type": "text", "text": "It's always sunny in Boston!"}
    ],
    # For the result of a subagent's task tool,
    # content may hold the subagent's complete output
    artifact={...},  # optional extra data
)
3. HumanMessage / SystemMessage
python
HumanMessage(
    id="msg-xxx",
    content="What's the weather?",
    type="human",
)
SystemMessage(
    id="msg-yyy",
    content="You are a helpful assistant.",
    type="system",
)
II. Event Stream Modes
Recommended configuration
python
async for chunk in agent.astream(
    inputs,
    stream_mode=["messages", "updates", "custom"],
    subgraphs=True,  # enable subagent events
    version="v2",
):
    ...
1. messages mode - token stream
AI text content stream
python
# Event structure
{
    "type": "messages",
    "ns": (),  # empty for the main agent, non-empty for a subagent
    "data": (
        AIMessageChunk(content="The", ...),  # message chunk
        {  # metadata
            "langgraph_node": "model_request",
            "langgraph_step": 3,
            "ls_provider": "openai",
            "ls_model_name": "gpt-4.1-mini",
        }
    )
}
Tool call argument stream
python
{
    "type": "messages",
    "ns": (),
    "data": (
        AIMessageChunk(
            content="",
            tool_call_chunks=[
                {"name": "get_weather", "args": "", "id": "call_abc123", "index": 0}
            ]
        ),
        {"langgraph_node": "model_request", ...}
    )
}
# Later chunks deliver args incrementally
{
    "type": "messages",
    "data": (
        AIMessageChunk(
            content="",
            tool_call_chunks=[
                {"name": None, "args": "{\"city\":", "id": None, "index": 0}
            ]
        ),
        {...}
    )
}
2. updates mode - node completion events
AI message complete (including tool calls)
python
{
    "type": "updates",
    "ns": (),  # main agent
    "data": {
        "model_request": {  # node name
            "messages": [
                AIMessage(
                    content="",
                    tool_calls=[{
                        "name": "get_weather",
                        "args": {"city": "Boston"},
                        "id": "call_abc123",
                        "type": "tool_call"
                    }],
                    response_metadata={"finish_reason": "tool_calls"}
                )
            ]
        }
    }
}
Tool execution result
python
{
    "type": "updates",
    "ns": (),
    "data": {
        "tools": {  # node name
            "messages": [
                ToolMessage(
                    content="It's always sunny in Boston!",
                    tool_call_id="call_abc123",
                    name="get_weather"
                )
            ]
        }
    }
}
Final AI reply
python
{
    "type": "updates",
    "ns": (),
    "data": {
        "model_request": {
            "messages": [
                AIMessage(
                    content="The weather in Boston is sunny!",
                    tool_calls=[],
                )
            ]
        }
    }
}
3. Subagent events
Subagent events are distinguished by the ns (namespace) field.
Subagent start
The main agent's model_request node returns a task tool call:
python
{
    "type": "updates",
    "ns": (),  # main agent
    "data": {
        "model_request": {
            "messages": [
                AIMessage(
                    tool_calls=[{
                        "name": "task",  # the task tool
                        "args": {
                            "subagent_type": "researcher",
                            "description": "Research AI safety"
                        },
                        "id": "call_xyz789",
                        "type": "tool_call"
                    }]
                )
            ]
        }
    }
}
Subagent running
python
# Events inside the subagent: ns contains "tools:xxx"
{
    "type": "updates",
    "ns": ("tools:call_xyz789",),  # marks the event as coming from a subagent
    "data": {
        "model_request": {...}  # the subagent's node
    }
}
# Subagent token stream
{
    "type": "messages",
    "ns": ("tools:call_xyz789",),
    "data": (
        AIMessageChunk(content="AI safety is..."),
        {"langgraph_node": "model_request"}
    )
}
# Subagent tool call
{
    "type": "updates",
    "ns": ("tools:call_xyz789",),
    "data": {
        "model_request": {
            "messages": [
                AIMessage(tool_calls=[{
                    "name": "web_search",
                    "args": {"query": "AI safety"},
                    "id": "call_search001"
                }])
            ]
        }
    }
}
# Subagent tool result
{
    "type": "updates",
    "ns": ("tools:call_xyz789",),
    "data": {
        "tools": {
            "messages": [
                ToolMessage(
                    content="Search results...",
                    tool_call_id="call_search001",
                    name="web_search"
                )
            ]
        }
    }
}
Subagent end
The subagent's result is returned to the main agent:
python
{
    "type": "updates",
    "ns": (),  # back in the main agent
    "data": {
        "tools": {
            "messages": [
                ToolMessage(
                    content="## AI Safety Report\n\n...",  # complete subagent output
                    tool_call_id="call_xyz789",
                    name="task"
                )
            ]
        }
    }
}
4. custom mode - custom events
python
{
    "type": "custom",
    "ns": (),  # or ("tools:call_xyz789",) when the event comes from a subagent
    "data": {
        "status": "analyzing",
        "progress": 50,
        # ... arbitrary fields
    }
}
III. Event Timeline
text
User: "Research AI safety and write a report"
Timeline:
─────────────────────────────────────────────────────────────────────────
[main] messages: AIMessageChunk("I'll research...")              # main agent starts replying
[main] updates:  model_request → AIMessage(tool_calls=[task])    # calls the task tool
                        ↓
              ┌───────────────────┐
              │  SUBAGENT START   │
              │   (researcher)    │
              └───────────────────┘
                        ↓
[sub]  messages: AIMessageChunk("Searching for...")              # subagent token stream
[sub]  updates:  model_request → AIMessage(tool_calls=[web_search])
[sub]  messages: ToolMessage(content="Search results...")        # tool result via messages mode
[sub]  updates:  tools → ToolMessage(...)                        # tool finished
[sub]  messages: AIMessageChunk("Based on my research...")       # subagent reply
                        ↓
              ┌───────────────────┐
              │   SUBAGENT END    │
              └───────────────────┘
                        ↓
[main] updates:  tools → ToolMessage(content="## Report...")     # subagent result returned
[main] messages: AIMessageChunk("Here's the report...")          # main agent continues replying
[main] updates:  model_request → AIMessage(content="...")        # main agent finished
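While debugging, it can help to print a live stream in the same shape as the timeline above. The following is a minimal sketch that assumes the v2 chunk layout described earlier (`type`, `ns`, `data`); `describe_chunk` is a hypothetical helper, and it reads the message only through its class name and `content` attribute so that it does not depend on any particular message class.

```python
def describe_chunk(chunk: dict) -> str:
    """Render one stream chunk as a single timeline line."""
    ns = chunk.get("ns") or ()
    # Any "tools:..." namespace segment marks the event as coming from a subagent
    source = "sub" if any(seg.startswith("tools:") for seg in ns) else "main"
    mode, data = chunk["type"], chunk["data"]
    if mode == "messages":
        message, _metadata = data
        preview = str(getattr(message, "content", ""))[:30]
        detail = f"{type(message).__name__}({preview!r})"
    elif mode == "updates":
        # Keys of an updates payload are the names of the nodes that just finished
        detail = ", ".join(data)
    else:
        detail = str(data)
    return f"[{source}] {mode}: {detail}"
```

Calling `print(describe_chunk(chunk))` inside the `async for` loop of the recommended configuration gives a line-per-event trace that can be compared against the expected ordering.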
IV. Converting to AGUI Events
AGUI event type definitions
typescript
// AGUI protocol event types (as named in this guide)
type AGUIEvent =
  // Text messages
  | { event: "text-message-start"; data: { messageId: string; role: "assistant" | "user" } }
  | { event: "text-message-content"; data: { messageId: string; delta: string } }
  | { event: "text-message-end"; data: { messageId: string } }
  // Tool calls
  | { event: "tool-call-start"; data: { toolCallId: string; toolName: string; parentMessageId?: string } }
  | { event: "tool-call-args"; data: { toolCallId: string; delta: string } }
  | { event: "tool-call-result"; data: { toolCallId: string; result: string } }
  // Subagent (custom events)
  | { event: "subagent-start"; data: { subagentId: string; subagentType: string; description: string } }
  | { event: "subagent-end"; data: { subagentId: string; result?: string } }
  // Custom events
  | { event: "custom"; data: Record<string, any> }
  // Lifecycle
  | { event: "run-start"; data: { threadId?: string } }
  | { event: "run-end"; data: {} }
Complete conversion code
python
import json
from typing import AsyncGenerator, Any
from uuid import uuid4

from langchain.messages import AIMessage, AIMessageChunk, ToolMessage


class LangGraphToAGUIConverter:
    """Convert a LangGraph event stream into AGUI protocol events."""

    def __init__(self):
        # State tracking
        self.current_message_id: str | None = None
        self.active_tool_calls: dict[str, dict] = {}  # tool_call_id -> info
        self.active_subagents: dict[str, dict] = {}  # tool_call_id -> info
        self.message_accumulator: AIMessageChunk | None = None

    def _new_message_id(self) -> str:
        return str(uuid4())

    def _is_subagent(self, ns: tuple) -> bool:
        """Return True if the event comes from a subagent."""
        return any(segment.startswith("tools:") for segment in ns)

    def _get_subagent_id(self, ns: tuple) -> str | None:
        """Extract the subagent ID from the namespace."""
        for segment in ns:
            if segment.startswith("tools:"):
                # Split once so an ID that itself contains ":" stays intact
                return segment.split(":", 1)[1]
        return None
    async def convert(self, chunk: dict) -> AsyncGenerator[dict, None]:
        """Convert a single LangGraph chunk into zero or more AGUI events."""
        chunk_type = chunk["type"]
        ns = chunk.get("ns", ())
        is_subagent = self._is_subagent(ns)
        subagent_id = self._get_subagent_id(ns) if is_subagent else None
        if chunk_type == "messages":
            async for event in self._handle_messages(chunk, is_subagent, subagent_id):
                yield event
        elif chunk_type == "updates":
            async for event in self._handle_updates(chunk, is_subagent, subagent_id):
                yield event
        elif chunk_type == "custom":
            async for event in self._handle_custom(chunk, is_subagent, subagent_id):
                yield event
    async def _handle_messages(
        self, chunk: dict, is_subagent: bool, subagent_id: str | None
    ) -> AsyncGenerator[dict, None]:
        """Handle events from the messages stream mode."""
        msg, metadata = chunk["data"]
        # === AI text content stream ===
        if isinstance(msg, AIMessageChunk):
            # 1. Text content
            if msg.content:
                # First content received: emit text-message-start
                if not self.current_message_id:
                    self.current_message_id = self._new_message_id()
                    yield {
                        "event": "text-message-start",
                        "data": {
                            "messageId": self.current_message_id,
                            "role": "assistant"
                        }
                    }
                # Emit the content delta
                yield {
                    "event": "text-message-content",
                    "data": {
                        "messageId": self.current_message_id,
                        "delta": msg.content
                    }
                }
            # 2. Tool call argument stream
            if msg.tool_call_chunks:
                for tc_chunk in msg.tool_call_chunks:
                    tc_id = tc_chunk.get("id")
                    tc_name = tc_chunk.get("name")
                    tc_args = tc_chunk.get("args", "")
                    tc_index = tc_chunk.get("index", 0)
                    # A new tool call starts
                    if tc_name:
                        tool_call_id = tc_id or f"tc_{uuid4().hex[:8]}"
                        self.active_tool_calls[tool_call_id] = {
                            "name": tc_name,
                            "args": "",
                            "index": tc_index,
                            "message_id": self.current_message_id
                        }
                        yield {
                            "event": "tool-call-start",
                            "data": {
                                "toolCallId": tool_call_id,
                                "toolName": tc_name,
                                "parentMessageId": self.current_message_id
                            }
                        }
                    # Tool argument delta
                    if tc_args:
                        # Find the matching tool_call_id. Search newest first so an
                        # older call that reused the same index cannot capture the delta.
                        for tid, info in reversed(self.active_tool_calls.items()):
                            if info["index"] == tc_index:
                                info["args"] += tc_args
                                yield {
                                    "event": "tool-call-args",
                                    "data": {
                                        "toolCallId": tid,
                                        "delta": tc_args
                                    }
                                }
                                break
            # 3. Accumulate message chunks
            if self.message_accumulator is None:
                self.message_accumulator = msg
            else:
                self.message_accumulator = self.message_accumulator + msg
            # 4. Last chunk: end the message
            if getattr(msg, "chunk_position", None) == "last":
                # The message stays open while tool calls are pending, so the
                # follow-up text after the tool results lands in the same message
                if self.current_message_id and not self.active_tool_calls:
                    yield {
                        "event": "text-message-end",
                        "data": {"messageId": self.current_message_id}
                    }
                    self.current_message_id = None
                self.message_accumulator = None
    async def _handle_updates(
        self, chunk: dict, is_subagent: bool, subagent_id: str | None
    ) -> AsyncGenerator[dict, None]:
        """Handle events from the updates stream mode."""
        for node_name, data in chunk["data"].items():
            # Some nodes produce no state update; skip anything that is not a dict
            if not isinstance(data, dict):
                continue
            # === model_request node of the main agent ===
            if node_name == "model_request" and not is_subagent:
                messages = data.get("messages", [])
                for msg in messages:
                    if isinstance(msg, AIMessage):
                        # Detect subagent start (a task tool call)
                        for tc in msg.tool_calls:
                            if tc["name"] == "task":
                                subagent_type = tc["args"].get("subagent_type", "unknown")
                                description = tc["args"].get("description", "")
                                self.active_subagents[tc["id"]] = {
                                    "type": subagent_type,
                                    "description": description,
                                    "status": "pending"
                                }
                                yield {
                                    "event": "subagent-start",
                                    "data": {
                                        "subagentId": tc["id"],
                                        "subagentType": subagent_type,
                                        "description": description
                                    }
                                }
            # === tools node ===
            # Results are handled for the main agent and for subagents alike, so
            # every finished call is removed from active_tool_calls. Otherwise a
            # subagent's calls would linger and keep text-message-end from firing.
            if node_name == "tools":
                messages = data.get("messages", [])
                for msg in messages:
                    if isinstance(msg, ToolMessage):
                        # Subagent end (result of the main agent's task tool)
                        if msg.name == "task" and not is_subagent:
                            sub_info = self.active_subagents.get(msg.tool_call_id)
                            if sub_info:
                                sub_info["status"] = "complete"
                                yield {
                                    "event": "subagent-end",
                                    "data": {
                                        "subagentId": msg.tool_call_id,
                                        "result": str(msg.content)[:500]  # truncated preview
                                    }
                                }
                        # Ordinary tool result
                        else:
                            yield {
                                "event": "tool-call-result",
                                "data": {
                                    "toolCallId": msg.tool_call_id,
                                    "result": str(msg.content)
                                }
                            }
                        # Cleanup
                        self.active_tool_calls.pop(msg.tool_call_id, None)
    async def _handle_custom(
        self, chunk: dict, is_subagent: bool, subagent_id: str | None
    ) -> AsyncGenerator[dict, None]:
        """Handle events from the custom stream mode."""
        custom_data = chunk["data"].copy()
        # Attach source information
        if is_subagent and subagent_id:
            custom_data["subagentId"] = subagent_id
        yield {
            "event": "custom",
            "data": custom_data
        }
# === Usage example ===
async def stream_as_agui(agent, inputs: dict) -> AsyncGenerator[str, None]:
    """
    Convert a LangGraph agent's streaming output into AGUI SSE format.
    """
    converter = LangGraphToAGUIConverter()
    # Send run-start
    yield "event: run-start\ndata: {}\n\n"
    async for chunk in agent.astream(
        inputs,
        stream_mode=["messages", "updates", "custom"],
        subgraphs=True,
        version="v2",
    ):
        async for agui_event in converter.convert(chunk):
            event_name = agui_event["event"]
            event_data = json.dumps(agui_event["data"])
            yield f"event: {event_name}\ndata: {event_data}\n\n"
    # Send run-end
    yield "event: run-end\ndata: {}\n\n"
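The SSE framing used in the usage example can be isolated and checked on its own. The sketch below is an illustration only: `format_sse` and `parse_sse` are hypothetical helpers, and `parse_sse` handles just the frames that `format_sse` produces rather than the full Server-Sent Events grammar.

```python
import json


def format_sse(event: str, data: dict) -> str:
    """Serialize one event as a Server-Sent Events frame."""
    # json.dumps escapes newlines inside strings, so the payload is always one line
    payload = json.dumps(data, ensure_ascii=False)
    return f"event: {event}\ndata: {payload}\n\n"


def parse_sse(frame: str) -> tuple[str, dict]:
    """Parse a frame produced by format_sse back into (event, data)."""
    event, data = "message", ""
    for line in frame.strip().split("\n"):
        field, _, value = line.partition(": ")
        if field == "event":
            event = value
        elif field == "data":
            data += value
    return event, json.loads(data)


frame = format_sse("text-message-content", {"messageId": "m1", "delta": "line1\nline2"})
event_name, event_data = parse_sse(frame)
```

Because the JSON payload never contains a raw newline, a text delta that includes line breaks still fits in a single `data:` line, and the blank line reliably marks the end of the frame.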
V. Frontend Rendering Guide
Message structure
typescript
interface Message {
  id: string;
  role: "user" | "assistant";
  content: string;
  toolCalls?: ToolCall[];
  subagents?: SubagentResult[];
}
interface ToolCall {
  id: string;
  name: string;
  args: Record<string, any>;
  result?: string;
  status: "pending" | "running" | "complete";
}
interface SubagentResult {
  id: string;
  type: string;
  description: string;
  result?: string;
  status: "pending" | "running" | "complete";
  toolCalls?: ToolCall[];
}
Rendering logic
text
Message (AI reply)
├── content: "Let me research that..."
├── toolCalls:
│   ├── ToolCall: web_search (complete)
│   │   ├── args: {query: "AI safety"}
│   │   └── result: "Search results..."
│   └── ToolCall: task (subagent)
│       └── Subagent: researcher
│           ├── description: "Research AI safety"
│           ├── toolCalls:
│           │   └── ToolCall: web_search (complete)
│           └── result: "## Research Report..."
└── content: "Based on my research..."
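One way to build the structure above from the event stream is a small reducer that folds each AGUI event into render state. The sketch below is written in Python to stay consistent with the converter, although a frontend would typically port it to TypeScript; `apply_event`, the `rawArgs` field, and the state layout are illustrative assumptions, not part of any library.

```python
import json


def apply_event(state: dict, event: dict) -> dict:
    """Fold one AGUI event (as emitted by the converter) into render state."""
    name, data = event["event"], event["data"]
    messages, tools, subagents = state["messages"], state["toolCalls"], state["subagents"]

    if name == "text-message-start":
        messages[data["messageId"]] = {
            "id": data["messageId"], "role": data["role"], "content": "",
        }
    elif name == "text-message-content":
        messages[data["messageId"]]["content"] += data["delta"]
    elif name == "tool-call-start":
        tools[data["toolCallId"]] = {
            "id": data["toolCallId"], "name": data["toolName"],
            "rawArgs": "", "args": None, "result": None, "status": "running",
        }
    elif name == "tool-call-args":
        tools[data["toolCallId"]]["rawArgs"] += data["delta"]
    elif name == "tool-call-result":
        call = tools.get(data["toolCallId"])
        if call is not None:
            # Args are complete once the result arrives, so parsing is safe here
            call["args"] = json.loads(call["rawArgs"] or "{}")
            call["result"] = data["result"]
            call["status"] = "complete"
    elif name == "subagent-start":
        subagents[data["subagentId"]] = {
            "id": data["subagentId"], "type": data["subagentType"],
            "description": data["description"], "result": None, "status": "running",
        }
    elif name == "subagent-end":
        sub = subagents.get(data["subagentId"])
        if sub is not None:
            sub["result"] = data.get("result")
            sub["status"] = "complete"
    return state


state = {"messages": {}, "toolCalls": {}, "subagents": {}}
events = [
    {"event": "text-message-start", "data": {"messageId": "m1", "role": "assistant"}},
    {"event": "text-message-content", "data": {"messageId": "m1", "delta": "Let me "}},
    {"event": "text-message-content", "data": {"messageId": "m1", "delta": "check."}},
    {"event": "tool-call-start", "data": {"toolCallId": "c1", "toolName": "web_search"}},
    {"event": "tool-call-args", "data": {"toolCallId": "c1", "delta": '{"query": '}},
    {"event": "tool-call-args", "data": {"toolCallId": "c1", "delta": '"AI safety"}'}},
    {"event": "tool-call-result", "data": {"toolCallId": "c1", "result": "Search results..."}},
]
for event in events:
    state = apply_event(state, event)
```

Keeping the raw argument string separate from the parsed `args` lets the UI show arguments as they stream in, while JSON parsing is deferred until the string is known to be complete.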
VI. Key Takeaways
Identifying the event source
| ns value | Source |
|---|---|
| () | Main agent |
| ("tools:call_xxx",) | Subagent with ID call_xxx |
| ("tools:call_xxx", "model_request:yyy") | Node inside the subagent |
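The table can be expressed as a small lookup function. This is a sketch under the namespace format described in this guide ("tools:" followed by the subagent ID); `classify_ns` and the keys of the returned dict are hypothetical names chosen for illustration.

```python
def classify_ns(ns: tuple[str, ...]) -> dict:
    """Classify a namespace tuple as coming from the main agent or a subagent."""
    for depth, segment in enumerate(ns):
        node, _, task_id = segment.partition(":")
        if node == "tools" and task_id:
            return {
                "source": "subagent",
                "subagent_id": task_id,
                # Segments after the "tools:..." entry are nodes inside the subagent
                "inner_path": ns[depth + 1:],
            }
    return {"source": "main", "subagent_id": None, "inner_path": ()}
```

Returning the remaining segments as `inner_path` keeps the third row of the table distinguishable from the second without changing how the subagent ID is read.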
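Event timing
| Event type | When it fires | Purpose |
|---|---|---|
| messages + AIMessageChunk.content | Each LLM token | Typewriter effect |
| messages + tool_call_chunks | Tool argument deltas | Show tool call arguments as they stream |
| updates + model_request | AI message complete | Get the complete tool_calls |
| updates + tools + ToolMessage | Tool execution complete | Show the tool result |
| updates + task tool_call | Subagent starts | Show that the subagent has started |
| updates + task ToolMessage | Subagent ends | Show the subagent result |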
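Recommended stream_mode combination
python
stream_mode=["messages", "updates", "custom"]
subgraphs=True
version="v2"
Together, these three settings cover the frontend rendering needs described in this guide.
To delegate work to a subagent, the main agent first calls the task tool. That tool call carries a call_id, and the same ID then appears as the ns value inside the subagent, which is how events from the main agent and from each subagent are told apart.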