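LangGraph Event Structure Reference and AGUI Conversion Guide

This document describes the structure of the events returned by LangGraph/DeepAgents and gives a complete scheme for converting them to the AGUI protocol.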


I. Message Types

1. AIMessage / AIMessageChunk

AIMessage is the AI's reply message; AIMessageChunk is the incremental chunk emitted during streaming.

Full AIMessage structure

python
AIMessage(
    id="chatcmpl-abc123",                    # unique message ID
    content="The weather is sunny",          # text content (may be empty)
    type="ai",                               # message type
    
    # Tool calls (complete)
    tool_calls=[
        {
            "name": "get_weather",           # tool name
            "args": {"city": "Boston"},      # tool arguments (already parsed)
            "id": "call_abc123",             # tool call ID
            "type": "tool_call"              # fixed value
        }
    ],
    
    # Content blocks (multimodal)
    content_blocks=[
        {"type": "text", "text": "The weather is sunny"},
        {"type": "reasoning", "reasoning": "The user is asking about the weather..."},
    ],
    
    # Metadata
    response_metadata={
        "token_usage": {"completion_tokens": 50, ...},
        "model_name": "gpt-4.1-mini",
        "finish_reason": "tool_calls",       # or "stop"
    },
    
    # Token usage
    usage_metadata={
        "input_tokens": 100,
        "output_tokens": 50,
        "total_tokens": 150,
    }
)

AIMessageChunk structure (streaming)

python
AIMessageChunk(
    id="chatcmpl-abc123",
    content="sun",                           # current token
    type="ai",
    
    # Tool call chunks (incremental)
    tool_call_chunks=[
        {
            "name": "get_weather",           # only the first chunk carries name
            "args": "",                      # later chunks fill this in incrementally
            "id": "call_abc123",
            "index": 0,                      # tool call index (there may be several)
            "type": "tool_call_chunk"
        },
        # Later chunks (each arrives in its own AIMessageChunk; listed together here for illustration):
        {"name": None, "args": '{"city"', "id": None, "index": 0, "type": "tool_call_chunk"},
        {"name": None, "args": ':"Bos', "id": None, "index": 0, "type": "tool_call_chunk"},
        {"name": None, "args": 'ton"}', "id": None, "index": 0, "type": "tool_call_chunk"},
    ],
    
    # Content blocks
    content_blocks=[
        {"type": "text", "text": "sun"},
    ],
    
    # Position marker
    chunk_position="last",                   # "last" on the final chunk, otherwise None
)

Tool call streaming process

text
How the LLM generates a tool call:

Chunk 1: {"name": "get_weather", "args": "", "id": "call_abc123"}
Chunk 2: {"name": null, "args": "{\"city\"", "id": null}
Chunk 3: {"name": null, "args": ":\"Bos", "id": null}
Chunk 4: {"name": null, "args": "ton\"}", "id": null}
Chunk 5: chunk_position="last"

→ After merging, the complete tool_calls are:
  [{"name": "get_weather", "args": {"city": "Boston"}, "id": "call_abc123"}]
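
The merge step in the process above can be sketched in plain Python. This is a minimal illustration rather than LangChain's own implementation (adding AIMessageChunk objects with `+` performs the merge for you); `merge_tool_call_chunks` is a hypothetical helper, and the chunks are plain dicts shaped like the examples in this section.

```python
import json


def merge_tool_call_chunks(chunks: list[dict]) -> list[dict]:
    """Fold streamed tool-call chunks into complete tool calls, keyed by index."""
    merged: dict[int, dict] = {}
    for chunk in chunks:
        slot = merged.setdefault(
            chunk.get("index", 0), {"name": None, "id": None, "args": ""}
        )
        # name and id arrive once, on the first chunk; later chunks carry None.
        slot["name"] = chunk.get("name") or slot["name"]
        slot["id"] = chunk.get("id") or slot["id"]
        # args is a JSON string delivered in fragments, so concatenate it.
        slot["args"] += chunk.get("args") or ""
    return [
        {
            "name": slot["name"],
            # Parse only once all fragments are in; partial JSON will not parse.
            "args": json.loads(slot["args"]) if slot["args"] else {},
            "id": slot["id"],
            "type": "tool_call",
        }
        for _, slot in sorted(merged.items())
    ]


chunks = [
    {"name": "get_weather", "args": "", "id": "call_abc123", "index": 0},
    {"name": None, "args": '{"city"', "id": None, "index": 0},
    {"name": None, "args": ':"Bos', "id": None, "index": 0},
    {"name": None, "args": 'ton"}', "id": None, "index": 0},
]
print(merge_tool_call_chunks(chunks))
```

Keying on `index` rather than `id` matters because only the first chunk of each call carries the ID.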

2. ToolMessage

The message that returns the result of a tool execution.

python
ToolMessage(
    id="toolmsg-xyz",                        # message ID
    content="It's always sunny in Boston!",  # content returned by the tool
    type="tool",                             # fixed value
    
    tool_call_id="call_abc123",              # ID of the matching tool call
    name="get_weather",                      # tool name
    
    # Content blocks
    content_blocks=[
        {"type": "text", "text": "It's always sunny in Boston!"}
    ],
    
    # For the result of a subagent's task tool,
    # content may hold the subagent's full output
    artifact={...},                          # optional extra data
)

3. HumanMessage / SystemMessage

python
HumanMessage(
    id="msg-xxx",
    content="What's the weather?",
    type="human",
)

SystemMessage(
    id="msg-yyy",
    content="You are a helpful assistant.",
    type="system",
)

II. Stream Modes

Recommended configuration

python
async for chunk in agent.astream(
    inputs,
    stream_mode=["messages", "updates", "custom"],
    subgraphs=True,                          # enable subagent events
    version="v2",
):
    ...

1. messages mode - token stream

AI text content stream

python
# Event structure
{
    "type": "messages",
    "ns": (),                                # empty for the main agent, set for a subagent
    "data": (
        AIMessageChunk(content="The", ...),  # message chunk
        {                                    # metadata
            "langgraph_node": "model_request",
            "langgraph_step": 3,
            "ls_provider": "openai",
            "ls_model_name": "gpt-4.1-mini",
        }
    )
}

Tool call argument stream

python
{
    "type": "messages",
    "ns": (),
    "data": (
        AIMessageChunk(
            content="",
            tool_call_chunks=[
                {"name": "get_weather", "args": "", "id": "call_abc123", "index": 0}
            ]
        ),
        {"langgraph_node": "model_request", ...}
    )
}

# Later chunks deliver args incrementally
{
    "type": "messages",
    "data": (
        AIMessageChunk(
            content="",
            tool_call_chunks=[
                {"name": None, "args": "{\"city\":", "id": None, "index": 0}
            ]
        ),
        {...}
    )
}

2. updates mode - node completion events

AI message complete (including tool calls)

python
{
    "type": "updates",
    "ns": (),                                # main agent
    "data": {
        "model_request": {                   # node name
            "messages": [
                AIMessage(
                    content="",
                    tool_calls=[{
                        "name": "get_weather",
                        "args": {"city": "Boston"},
                        "id": "call_abc123",
                        "type": "tool_call"
                    }],
                    response_metadata={"finish_reason": "tool_calls"}
                )
            ]
        }
    }
}

Tool execution result returned

python
{
    "type": "updates",
    "ns": (),
    "data": {
        "tools": {                           # node name
            "messages": [
                ToolMessage(
                    content="It's always sunny in Boston!",
                    tool_call_id="call_abc123",
                    name="get_weather"
                )
            ]
        }
    }
}

Final AI reply

python
{
    "type": "updates",
    "ns": (),
    "data": {
        "model_request": {
            "messages": [
                AIMessage(
                    content="The weather in Boston is sunny!",
                    tool_calls=[],
                )
            ]
        }
    }
}

3. Subagent events

Subagent events are distinguished by the ns (namespace) field.

Subagent start

The main agent's model_request node returns a task tool call:

python
{
    "type": "updates",
    "ns": (),                                # main agent
    "data": {
        "model_request": {
            "messages": [
                AIMessage(
                    tool_calls=[{
                        "name": "task",      # the task tool
                        "args": {
                            "subagent_type": "researcher",
                            "description": "Research AI safety"
                        },
                        "id": "call_xyz789",
                        "type": "tool_call"
                    }]
                )
            ]
        }
    }
}

Subagent running

python
# Events inside the subagent; ns contains "tools:xxx"
{
    "type": "updates",
    "ns": ("tools:call_xyz789",),            # identifies a subagent
    "data": {
        "model_request": {...}               # a node inside the subagent
    }
}

# Subagent token stream
{
    "type": "messages",
    "ns": ("tools:call_xyz789",),
    "data": (
        AIMessageChunk(content="AI safety is..."),
        {"langgraph_node": "model_request"}
    )
}

# Subagent tool call
{
    "type": "updates",
    "ns": ("tools:call_xyz789",),
    "data": {
        "model_request": {
            "messages": [
                AIMessage(tool_calls=[{
                    "name": "web_search",
                    "args": {"query": "AI safety"},
                    "id": "call_search001"
                }])
            ]
        }
    }
}

# Subagent tool result
{
    "type": "updates",
    "ns": ("tools:call_xyz789",),
    "data": {
        "tools": {
            "messages": [
                ToolMessage(
                    content="Search results...",
                    tool_call_id="call_search001",
                    name="web_search"
                )
            ]
        }
    }
}

Subagent end

The subagent's result is returned to the main agent:

python
{
    "type": "updates",
    "ns": (),                                # back in the main agent
    "data": {
        "tools": {
            "messages": [
                ToolMessage(
                    content="## AI Safety Report\n\n...",  # full subagent output
                    tool_call_id="call_xyz789",
                    name="task"
                )
            ]
        }
    }
}

4. custom mode - custom events

python
{
    "type": "custom",
    "ns": (),  # or ("tools:call_xyz789",) when it comes from a subagent
    "data": {
        "status": "analyzing",
        "progress": 50,
        # ... arbitrary fields
    }
}

III. Event Timeline

text
User: "Research AI safety and write a report"

Timeline:
─────────────────────────────────────────────────────────────────────────

[main] messages: AIMessageChunk("I'll research...")        # main agent starts replying
[main] updates:  model_request → AIMessage(tool_calls=[task])  # calls the task tool
                                    ↓
                            ┌───────────────────┐
                            │  SUBAGENT START   │
                            │  (researcher)     │
                            └───────────────────┘
                                    ↓
[sub]  messages: AIMessageChunk("Searching for...")        # subagent token stream
[sub]  updates:  model_request → AIMessage(tool_calls=[web_search])
[sub]  messages: ToolMessage(content="Search results...")  # tool result message
[sub]  updates:  tools → ToolMessage(...)                  # tool finished
[sub]  messages: AIMessageChunk("Based on my research...") # subagent reply
                                    ↓
                            ┌───────────────────┐
                            │  SUBAGENT END     │
                            └───────────────────┘
                                    ↓
[main] updates:  tools → ToolMessage(content="## Report...")  # subagent result returned
[main] messages: AIMessageChunk("Here's the report...")     # main agent resumes replying
[main] updates:  model_request → AIMessage(content="...")   # main agent finishes
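
When debugging, it can help to print a live trace in the same shape as the timeline above. The sketch below assumes the chunk dictionaries described in Part II ("type", "ns", "data"); `trace_line` is a hypothetical helper, not part of LangGraph.

```python
def trace_line(chunk: dict) -> str:
    """Render one stream chunk as a '[main]' or '[sub]' timeline line."""
    ns = chunk.get("ns", ())
    origin = "sub" if any(seg.startswith("tools:") for seg in ns) else "main"
    if chunk["type"] == "updates":
        # updates data maps node name -> state update; list the node names.
        detail = ", ".join(chunk["data"])
    elif chunk["type"] == "messages":
        # messages data is a (message, metadata) pair; show the message class.
        message, _metadata = chunk["data"]
        detail = type(message).__name__
    else:
        detail = "custom payload"
    return f"[{origin}] {chunk['type']}: {detail}"


print(trace_line({"type": "updates", "ns": (), "data": {"model_request": {}}}))
# prints "[main] updates: model_request"
```

Calling it on every chunk inside the `astream` loop reproduces the ordering shown in the timeline for a real run.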


IV. Converting to AGUI Events

AGUI event type definitions

typescript
// AGUI protocol event types
type AGUIEvent =
  // Text messages
  | { event: "text-message-start"; data: { messageId: string; role: "assistant" | "user" } }
  | { event: "text-message-content"; data: { messageId: string; delta: string } }
  | { event: "text-message-end"; data: { messageId: string } }
  
  // Tool calls
  | { event: "tool-call-start"; data: { toolCallId: string; toolName: string; parentMessageId?: string } }
  | { event: "tool-call-args"; data: { toolCallId: string; delta: string } }
  | { event: "tool-call-result"; data: { toolCallId: string; result: string } }
  
  // Subagent (custom events)
  | { event: "subagent-start"; data: { subagentId: string; subagentType: string; description: string } }
  | { event: "subagent-end"; data: { subagentId: string; result?: string } }
  
  // Custom events
  | { event: "custom"; data: Record<string, any> }
  
  // Lifecycle
  | { event: "run-start"; data: { threadId?: string } }
  | { event: "run-end"; data: {} }
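
On the wire, each of these events travels as one Server-Sent Events frame: an `event:` line, a `data:` line with the JSON payload, and a blank line. A minimal sketch of that framing follows; `format_sse` is a hypothetical helper name, and the event shape is the one defined above.

```python
import json


def format_sse(event: str, data: dict) -> str:
    """Serialize one AGUI event as a Server-Sent Events frame."""
    # json.dumps escapes newlines, so the payload stays on a single data: line.
    payload = json.dumps(data, ensure_ascii=False)
    return f"event: {event}\ndata: {payload}\n\n"


frame = format_sse("text-message-content", {"messageId": "m1", "delta": "The"})
print(frame, end="")
```

Passing `ensure_ascii=False` keeps non-ASCII text readable in the stream; the default escaped form would also be valid JSON.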

Full conversion code

python
import json
from typing import AsyncGenerator, Any
from uuid import uuid4
from langchain.messages import AIMessage, AIMessageChunk, ToolMessage


class LangGraphToAGUIConverter:
    """Convert a LangGraph event stream into AGUI protocol events"""
    
    def __init__(self):
        # State tracking
        self.current_message_id: str | None = None
        self.active_tool_calls: dict[str, dict] = {}  # tool_call_id -> info
        self.active_subagents: dict[str, dict] = {}   # tool_call_id -> info
        self.message_accumulator: AIMessageChunk | None = None
    
    def _new_message_id(self) -> str:
        return str(uuid4())
    
    def _is_subagent(self, ns: tuple) -> bool:
        """Return True if the event comes from a subagent"""
        return any(segment.startswith("tools:") for segment in ns)
    
    def _get_subagent_id(self, ns: tuple) -> str | None:
        """Extract the subagent ID from the namespace"""
        for segment in ns:
            if segment.startswith("tools:"):
                # Split once so an ID that itself contains ":" stays intact
                return segment.split(":", 1)[1]
        return None
    
    async def convert(self, chunk: dict) -> AsyncGenerator[dict, None]:
        """Convert a single LangGraph chunk into a sequence of AGUI events"""
        
        chunk_type = chunk["type"]
        ns = chunk.get("ns", ())
        is_subagent = self._is_subagent(ns)
        subagent_id = self._get_subagent_id(ns) if is_subagent else None
        
        if chunk_type == "messages":
            async for event in self._handle_messages(chunk, is_subagent, subagent_id):
                yield event
                
        elif chunk_type == "updates":
            async for event in self._handle_updates(chunk, is_subagent, subagent_id):
                yield event
                
        elif chunk_type == "custom":
            async for event in self._handle_custom(chunk, is_subagent, subagent_id):
                yield event
    
    async def _handle_messages(
        self, chunk: dict, is_subagent: bool, subagent_id: str | None
    ) -> AsyncGenerator[dict, None]:
        """Handle messages-mode events"""
        
        msg, metadata = chunk["data"]
        
        # === AI text content stream ===
        if isinstance(msg, AIMessageChunk):
            
            # 1. Text content
            if msg.content:
                # First content received: emit text-message-start
                if not self.current_message_id:
                    self.current_message_id = self._new_message_id()
                    yield {
                        "event": "text-message-start",
                        "data": {
                            "messageId": self.current_message_id,
                            "role": "assistant"
                        }
                    }
                
                # Emit the content delta
                yield {
                    "event": "text-message-content",
                    "data": {
                        "messageId": self.current_message_id,
                        "delta": msg.content
                    }
                }
            
            # 2. Tool call argument stream
            if msg.tool_call_chunks:
                for tc_chunk in msg.tool_call_chunks:
                    tc_id = tc_chunk.get("id")
                    tc_name = tc_chunk.get("name")
                    tc_args = tc_chunk.get("args", "")
                    tc_index = tc_chunk.get("index", 0)
                    
                    # A new tool call starts
                    if tc_name:
                        tool_call_id = tc_id or f"tc_{uuid4().hex[:8]}"
                        self.active_tool_calls[tool_call_id] = {
                            "name": tc_name,
                            "args": "",
                            "index": tc_index,
                            "message_id": self.current_message_id
                        }
                        yield {
                            "event": "tool-call-start",
                            "data": {
                                "toolCallId": tool_call_id,
                                "toolName": tc_name,
                                "parentMessageId": self.current_message_id
                            }
                        }
                    
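                    # Tool argument delta
                    if tc_args:
                        # Find the matching tool_call_id; search newest first so an
                        # older call that reused the same index is not matched by mistake
                        for tid, info in reversed(list(self.active_tool_calls.items())):
                            if info["index"] == tc_index: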
                                info["args"] += tc_args
                                yield {
                                    "event": "tool-call-args",
                                    "data": {
                                        "toolCallId": tid,
                                        "delta": tc_args
                                    }
                                }
                                break
            
            # 3. Accumulate the message chunks
            if self.message_accumulator is None:
                self.message_accumulator = msg
            else:
                self.message_accumulator = self.message_accumulator + msg
            
            # 4. Last chunk: end the message
            if getattr(msg, "chunk_position", None) == "last":
                if self.current_message_id and not self.active_tool_calls:
                    yield {
                        "event": "text-message-end",
                        "data": {"messageId": self.current_message_id}
                    }
                    self.current_message_id = None
                    self.message_accumulator = None
    
    async def _handle_updates(
        self, chunk: dict, is_subagent: bool, subagent_id: str | None
    ) -> AsyncGenerator[dict, None]:
        """Handle updates-mode events"""
        
        for node_name, data in chunk["data"].items():
            
            # === model_request node of the main agent ===
            if node_name == "model_request" and not is_subagent:
                messages = data.get("messages", [])
                for msg in messages:
                    if isinstance(msg, AIMessage):
                        # Detect subagent start (task tool call)
                        for tc in msg.tool_calls:
                            if tc["name"] == "task":
                                subagent_type = tc["args"].get("subagent_type", "unknown")
                                description = tc["args"].get("description", "")
                                self.active_subagents[tc["id"]] = {
                                    "type": subagent_type,
                                    "description": description,
                                    "status": "pending"
                                }
                                yield {
                                    "event": "subagent-start",
                                    "data": {
                                        "subagentId": tc["id"],
                                        "subagentType": subagent_type,
                                        "description": description
                                    }
                                }
            
            # === tools node of the main agent ===
            if node_name == "tools" and not is_subagent:
                messages = data.get("messages", [])
                for msg in messages:
                    if isinstance(msg, ToolMessage):
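                        # Subagent finished (task tool result)
                        if msg.name == "task":
                            # Drop the streamed task call too; otherwise it stays
                            # active and text-message-end is never emitted
                            self.active_tool_calls.pop(msg.tool_call_id, None)
                            sub_info = self.active_subagents.get(msg.tool_call_id)
                            if sub_info:
                                sub_info["status"] = "complete"
                                yield {
                                    "event": "subagent-end",
                                    "data": {
                                        "subagentId": msg.tool_call_id,
                                        "result": str(msg.content)[:500]  # truncated preview
                                    }
                                }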
                        # Ordinary tool result
                        else:
                            yield {
                                "event": "tool-call-result",
                                "data": {
                                    "toolCallId": msg.tool_call_id,
                                    "result": str(msg.content)
                                }
                            }
                            # Clean up
                            self.active_tool_calls.pop(msg.tool_call_id, None)
    
    async def _handle_custom(
        self, chunk: dict, is_subagent: bool, subagent_id: str | None
    ) -> AsyncGenerator[dict, None]:
        """Handle custom-mode events"""
        
        custom_data = chunk["data"].copy()
        
        # Attach source information
        if is_subagent and subagent_id:
            custom_data["subagentId"] = subagent_id
        
        yield {
            "event": "custom",
            "data": custom_data
        }


# === Usage example ===

async def stream_as_agui(agent, inputs: dict) -> AsyncGenerator[str, None]:
    """
    Convert a LangGraph agent's streamed output into AGUI SSE format
    """
    converter = LangGraphToAGUIConverter()
    
    # Emit run-start
    yield f"event: run-start\ndata: {{}}\n\n"
    
    async for chunk in agent.astream(
        inputs,
        stream_mode=["messages", "updates", "custom"],
        subgraphs=True,
        version="v2",
    ):
        async for agui_event in converter.convert(chunk):
            event_name = agui_event["event"]
            event_data = json.dumps(agui_event["data"])
            yield f"event: {event_name}\ndata: {event_data}\n\n"
    
    # Emit run-end
    yield f"event: run-end\ndata: {{}}\n\n"

V. Frontend Rendering Guide

Message structure

typescript
interface Message {
  id: string;
  role: "user" | "assistant";
  content: string;
  toolCalls?: ToolCall[];
  subagents?: SubagentResult[];
}

interface ToolCall {
  id: string;
  name: string;
  args: Record<string, any>;
  result?: string;
  status: "pending" | "running" | "complete";
}

interface SubagentResult {
  id: string;
  type: string;
  description: string;
  result?: string;
  status: "pending" | "running" | "complete";
  toolCalls?: ToolCall[];
}

Rendering logic

text
Message (AI reply)
├── content: "Let me research that..."
├── toolCalls:
│   ├── ToolCall: web_search (complete)
│   │   ├── args: {query: "AI safety"}
│   │   └── result: "Search results..."
│   └── ToolCall: task (subagent)
│       └── Subagent: researcher
│           ├── description: "Research AI safety"
│           ├── toolCalls:
│           │   └── ToolCall: web_search (complete)
│           └── result: "## Research Report..."
└── content: "Based on my research..."
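
To build this tree, the client folds the AGUI event stream into message state. The sketch below shows that fold for text and tool-call events only. It is written in Python to match the converter, on the assumption that a real frontend would port the same logic to TypeScript. `apply_event`, the store layout, and the `rawArgs` field are hypothetical names, not part of the protocol.

```python
import json


def apply_event(state: dict, event: dict) -> dict:
    """Fold one AGUI event into a {"messages": {...}, "tools": {...}} store."""
    name, data = event["event"], event["data"]
    if name == "text-message-start":
        state["messages"][data["messageId"]] = {
            "id": data["messageId"], "role": data["role"],
            "content": "", "toolCalls": [],
        }
    elif name == "text-message-content":
        state["messages"][data["messageId"]]["content"] += data["delta"]
    elif name == "tool-call-start":
        call = {"id": data["toolCallId"], "name": data["toolName"],
                "rawArgs": "", "status": "running"}
        state["tools"][call["id"]] = call
        # Attach the call to its parent message when one is known.
        parent = state["messages"].get(data.get("parentMessageId"))
        if parent is not None:
            parent["toolCalls"].append(call)
    elif name == "tool-call-args":
        state["tools"][data["toolCallId"]]["rawArgs"] += data["delta"]
    elif name == "tool-call-result":
        call = state["tools"][data["toolCallId"]]
        # Arguments are complete by now, so the JSON can be parsed.
        call["args"] = json.loads(call["rawArgs"] or "{}")
        call["result"] = data["result"]
        call["status"] = "complete"
    return state


state = {"messages": {}, "tools": {}}
for ev in [
    {"event": "text-message-start", "data": {"messageId": "m1", "role": "assistant"}},
    {"event": "text-message-content", "data": {"messageId": "m1", "delta": "Let me check."}},
    {"event": "tool-call-start",
     "data": {"toolCallId": "c1", "toolName": "get_weather", "parentMessageId": "m1"}},
    {"event": "tool-call-args", "data": {"toolCallId": "c1", "delta": '{"city":'}},
    {"event": "tool-call-args", "data": {"toolCallId": "c1", "delta": ' "Boston"}'}},
    {"event": "tool-call-result", "data": {"toolCallId": "c1", "result": "Sunny"}},
]:
    apply_event(state, ev)
print(state["messages"]["m1"]["toolCalls"][0]["status"])
```

The same call object is stored in both the tool index and the parent message, so a late `tool-call-result` updates the rendered message without a second lookup.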

VI. Key Takeaways

Identifying the event source

ns value                                   Source
()                                         main agent
("tools:call_xxx",)                        subagent with ID call_xxx
("tools:call_xxx", "model_request:yyy")    node inside the subagent
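
The table above can be turned into a small classifier. This is a sketch under the naming convention described in this guide (a "tools:" prefix followed by the ID); `describe_ns` and the keys of its return value are hypothetical.

```python
def describe_ns(ns: tuple) -> dict:
    """Classify a namespace tuple as main agent or subagent."""
    for depth, segment in enumerate(ns):
        if segment.startswith("tools:"):
            return {
                "source": "subagent",
                # Everything after the first colon is treated as the subagent ID.
                "subagent_id": segment.split(":", 1)[1],
                # Segments after the "tools:" entry name nodes inside the subagent.
                "inner_path": ns[depth + 1:],
            }
    return {"source": "main", "subagent_id": None, "inner_path": ()}


print(describe_ns(("tools:call_xxx", "model_request:yyy")))
```

An empty tuple falls through to the main-agent case, which matches the first row of the table.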

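Event timing

Event type                            Trigger                    Purpose
messages + AIMessageChunk.content     each LLM token             typewriter effect
messages + tool_call_chunks           tool argument delta        show tool call arguments as they stream
updates + model_request               AI message complete        get the complete tool_calls
updates + tools + ToolMessage         tool execution complete    show the tool result
updates + task tool_call              subagent start             show that the subagent has started
updates + task ToolMessage            subagent end               show the subagent result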

Recommended stream_mode combination

python
stream_mode=["messages", "updates", "custom"]
subgraphs=True
version="v2"

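Combining these three stream modes covers every frontend rendering need.

To delegate work to a subagent, the main agent first calls the task tool. That tool call carries a call ID (call_xyz789 in the examples above), and the same ID then appears in the ns value of the subagent's events, which is what distinguishes the main agent from each subagent.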