为什么需要 LangGraph?

从 Chain 到 Graph:技术演进的必然

在前几篇文章中,我们学习了 LangChain 的 Chain 链式调用。Chain 的设计理念是"线性"的——A → B → C,一条直线走到黑。

但当业务场景变得复杂时,线性结构的三大局限性就暴露出来了:

局限性说明典型问题
分支处理困难无法根据中间结果动态选择路径订单处理中,高金额订单需要审批,低金额自动通过
循环能力缺失无法实现"重试"或"多轮迭代"Agent 查完天气后可能还需要查穿衣建议
状态管理脆弱多轮交互中上下文容易丢失对话中断后无法恢复之前的决策状态

LangGraph 的出现正是为了解决这三个问题

LangGraph 核心定位

LangGraph 本质上是将有向图作为 Agent 工作流的编排引擎。它的核心价值体现在:

维度LangChain (Chain)LangGraph (Graph)
流程结构线性链表有向图
分支支持需嵌套条件判断原生条件边
循环支持不支持支持(agent → tools → agent)
状态管理手动拼接消息列表自动状态归约器 + 持久化检查点
可观测性日志调试节点级可视追踪
适用场景80% 常规任务复杂状态机、多轮决策

💡 选型建议:简单 RAG、单轮问答用 Chain;多轮对话、复杂 Agent 用 Graph。

LangGraph 核心概念

LangGraph 的核心是状态图(StateGraph)——把整个 Agent 的运行抽象成一张图,图上有节点、有边,还有一块在所有节点之间流转的"公共黑板"。

┌──────────────────────────────────────────────────────────────┐
│                    LangGraph 核心架构                         │
│                                                              │
│   ┌─────────┐                      ┌─────────────────┐       │
│   │  START  │ ───────────────────→ │    StateGraph    │      │
│   └─────────┘                      └────────┬────────┘       │
│                                             │                │
│                                  ┌──────────┼──────────┐     │
│                                  ▼          ▼          ▼     │
│                            ┌────────┐  ┌───────┐  ┌───────┐  │
│                            │  Nodes │  │ Edges │  │ State │  │
│                            │ (节点)  │  │ (边)  │  │ (状态) │  │
│                            └────────┘  └───────┘  └───────┘  │
│                                                              │
└──────────────────────────────────────────────────────────────┘

1. 状态 (State) - 公共黑板

State 是 LangGraph 中最核心的概念——它是在所有节点之间共享的"数据容器"。

// 定义状态类型(类似 TypeScript interface)
interface AgentState {
  messages: BaseMessage[];  // 对话历史
  userQuery: string;        // 用户输入
  toolResults: string[];    // 工具调用结果
  currentStep: string;      // 当前执行步骤
  retryCount: number;       // 重试次数
}

2. 节点 (Node) - 执行单元

节点是图中的一个"动作"——每个节点接收当前状态,执行业务逻辑,返回状态更新。

// 节点函数签名
async function myNode(state: AgentState): Promise<Partial<AgentState>> {
  // 1. 读取当前状态
  const { userQuery, messages } = state;
  
  // 2. 执行业务逻辑
  const result = await doSomething(userQuery);
  
  // 3. 返回状态更新(LangGraph 自动合并)
  return { 
    messages: [...messages, new AIMessage(result)],
    currentStep: "completed" 
  };
}

3. 边 (Edge) - 流转规则

边定义了节点之间的流转路径:

  • 普通边:固定跳转,A → B
  • 条件边:根据状态动态选择,A → (B 或 C)
// 普通边
graph.addEdge("node_a", "node_b");

// 条件边
graph.addConditionalEdges(
  "agent",
  (state) => {
    if (state.needsTool) return "tools";
    return END;
  }
);

基础工作流实战

最简单的 LangGraph 图

让我们从最基础的"提问 → 回答"图开始:

import { StateGraph, MessagesAnnotation, START, END } from "@langchain/langgraph";
import { ChatOpenAI } from "@langchain/openai";
import { HumanMessage, AIMessage } from "@langchain/core/messages";
import dotenv from "dotenv";

dotenv.config();

async function basicGraph() {
  // 1. 初始化模型
  const model = new ChatOpenAI({
    apiKey: process.env.DASHSCOPE_API_KEY,
    configuration: {
      baseURL: process.env.DASHSCOPE_API_URL,
    },
    model: "qwen-plus",
    temperature: 0.7,
  });

  // 2. 定义节点函数
  async function chatNode(state: typeof MessagesAnnotation.State) {
    const response = await model.invoke(state.messages);
    return { messages: [response] };
  }

  // 3. 构建状态图
  const workflow = new StateGraph(MessagesAnnotation)
    .addNode("chat", chatNode)           // 添加节点
    .addEdge(START, "chat")              // 设置入口
    .addEdge("chat", END);               // 设置出口

  // 4. 编译并执行
  const app = workflow.compile();
  
  const result = await app.invoke({
    messages: [new HumanMessage("你好,请介绍一下自己")]
  });
  
  console.log(result.messages[result.messages.length - 1]?.content);
}

basicGraph();

带条件分支的图

接下来实现一个"智能客服路由"——根据用户问题类型,路由到不同处理节点:

import { StateGraph, MessagesAnnotation, START, END } from "@langchain/langgraph";
import { ChatOpenAI } from "@langchain/openai";
import { HumanMessage, BaseMessage } from "@langchain/core/messages";
import dotenv from "dotenv";

dotenv.config();

type SupportState = {
  messages: BaseMessage[];
  intent?: "technical" | "billing" | "general";
  resolution?: string;
};

async function conditionalGraph() {
  const model = new ChatOpenAI({
    apiKey: process.env.DASHSCOPE_API_KEY,
    configuration: {
      baseURL: process.env.DASHSCOPE_API_URL,
    },
    model: "qwen-plus",
    temperature: 0.3,
  });

  // 意图识别节点
  async function intentClassifier(state: SupportState) {
    const lastMessage = state.messages[state.messages.length - 1];
    const prompt = `分析用户问题的意图,只返回一个词:technical(技术问题)、billing(计费问题)、general(一般问题)。
    
用户问题:${lastMessage.content}`;
    
    const response = await model.invoke([new HumanMessage(prompt)]);
    const intent = response.content.toString().toLowerCase().trim();
    
    return { 
      intent: intent as "technical" | "billing" | "general",
      messages: [...state.messages, response] 
    };
  }

  // 技术问题处理
  async function technicalHandler(state: SupportState) {
    const userQuery = state.messages[0].content;
    const response = await model.invoke([
      new HumanMessage(`用户遇到技术问题,请给出专业的技术支持建议。问题:${userQuery}`)
    ]);
    return { 
      messages: [...state.messages, response],
      resolution: "technical_solved"
    };
  }

  // 计费问题处理
  async function billingHandler(state: SupportState) {
    const userQuery = state.messages[0].content;
    const response = await model.invoke([
      new HumanMessage(`用户遇到计费问题,请引导用户联系财务部门。问题:${userQuery}`)
    ]);
    return { 
      messages: [...state.messages, response],
      resolution: "billing_redirected"
    };
  }

  // 一般问题处理
  async function generalHandler(state: SupportState) {
    const userQuery = state.messages[0].content;
    const response = await model.invoke([
      new HumanMessage(`用户咨询一般问题,请友好回答。问题:${userQuery}`)
    ]);
    return { 
      messages: [...state.messages, response],
      resolution: "general_answered"
    };
  }

  // StateGraph 配置
  const workflow = new StateGraph<SupportState>({
    channels: {
      messages: {
        value: (x: BaseMessage[], y: BaseMessage[]) => [...x, ...y],
        default: () => [],
      },
      intent: { value: (_, b) => b },
      resolution: { value: (_, b) => b },
    },
  })
    .addNode("classify", intentClassifier)
    .addNode("technical", technicalHandler)
    .addNode("billing", billingHandler)
    .addNode("general", generalHandler)
    .addEdge(START, "classify")
    .addConditionalEdges("classify", (state) => {
      if (state.intent === "technical") return "technical";
      if (state.intent === "billing") return "billing";
      return "general";
    })
    .addEdge("technical", END)
    .addEdge("billing", END)
    .addEdge("general", END);

  const app = workflow.compile();
  
  // 测试用例
  const testCases = [
    "我的代码报错了,怎么办?",
    "这个月为什么多扣了钱?",
    "今天天气怎么样?"
  ];

  for (const query of testCases) {
    console.log(`\n📝 用户:${query}`);
    const result = await app.invoke({ messages: [new HumanMessage(query)] });
    console.log(`🤖 意图:${result.intent}`);
    console.log(`🤖 回复:${result.messages[result.messages.length - 1].content}`);
  }
}

conditionalGraph();

实现 ReAct Agent(核心模式)

这是 LangGraph 最经典的应用——实现带循环的 ReAct Agent:

import { StateGraph, MessagesAnnotation, START, END } from "@langchain/langgraph";
import { ChatOpenAI } from "@langchain/openai";
import { tool } from "@langchain/core/tools";
import { z } from "zod";
import { HumanMessage, AIMessage, ToolMessage } from "@langchain/core/messages";
import { ToolNode } from "@langchain/langgraph/prebuilt";
import dotenv from "dotenv";

dotenv.config();

async function reactAgent() {
  // 1. 定义工具
  const calculator = tool(
    async ({ expression }) => {
      const result = Function('"use strict";return (' + expression + ')')();
      return result.toString();
    },
    {
      name: "calculator",
      description: "数学计算",
      schema: z.object({ expression: z.string() }),
    }
  );

  const getWeather = tool(
    async ({ city }) => {
      const weathers: Record<string, string> = {
        "北京": "晴天,25°C",
        "上海": "多云,28°C",
        "深圳": "阵雨,30°C",
      };
      return weathers[city] || `${city}:晴,22°C`;
    },
    {
      name: "get_weather",
      description: "查询天气",
      schema: z.object({ city: z.string() }),
    }
  );

  const tools = [calculator, getWeather];
  
  // 2. 初始化模型并绑定工具
  const model = new ChatOpenAI({
    apiKey: process.env.DASHSCOPE_API_KEY,
    configuration: {
      baseURL: process.env.DASHSCOPE_API_URL,
    },
    model: "qwen-plus",
    temperature: 0.3,
  }).bindTools(tools);

  // 3. 定义节点
  async function agentNode(state: typeof MessagesAnnotation.State) {
    const response = await model.invoke(state.messages);
    return { messages: [response] };
  }

  // 使用预置的 ToolNode
  const toolNode = new ToolNode(tools);

  // 4. 定义条件路由(核心:决定是否继续循环)
  function shouldContinue(state: typeof MessagesAnnotation.State) {
    const lastMessage = state.messages[state.messages.length - 1] as AIMessage;
    
    // 如果模型调用了工具,继续到工具节点
    if (lastMessage.tool_calls && lastMessage.tool_calls.length > 0) {
      return "tools";
    }
    // 否则结束
    return END;
  }

  // 5. 构建工作流(形成循环)
  const workflow = new StateGraph(MessagesAnnotation)
    .addNode("agent", agentNode)
    .addNode("tools", toolNode)
    .addEdge(START, "agent")
    .addConditionalEdges("agent", shouldContinue)
    .addEdge("tools", "agent");  // 关键:工具执行完回到 agent

  const app = workflow.compile();

  // 6. 执行
  const userInput = "北京今天天气怎么样?另外帮我算一下 25 * 4 + 10";
  console.log(`📝 用户:${userInput}\n`);

  const result = await app.invoke({
    messages: [new HumanMessage(userInput)]
  });

  console.log("🤖 最终回复:");
  console.log(result.messages[result.messages.length - 1]?.content);
}

reactAgent();

工作流程图解

START → agent (思考) → should_continue (条件判断)
                              ↓
                    ┌─────────┴─────────┐
                    ↓                   ↓
                有工具调用            无工具调用
                    ↓                   ↓
                 tools (执行)          END
                    ↓
              ┌─────┴─────┐
              ↓           ↓
           回到 agent   继续思考

LangGraph 与传统 Chain 的区别

核心差异对比

维度LangChain ChainLangGraph
结构线性有向无环图有向图(支持循环)
状态管理手动拼接消息列表自动归约器 + 检查点
循环支持❌ 不支持✅ 原生支持(agent→tools→agent)
分支逻辑嵌套 if-else条件边(conditional edges)
持久化需自行实现内置 Checkpointer
可观测性日志输出节点级状态追踪
人机协作困难原生支持 interrupt

为什么说 LangGraph 更适合复杂 Agent?

某电商平台智能推荐系统曾因 Chain 设计导致 30% 的推荐结果因上下文断裂失效,转向 LangGraph 后问题得到解决。

关键原因

  1. 循环能力:Agent 可以反复"思考-行动-观察"直到任务完成
  2. 状态持久化:使用 Checkpointer,对话中断后可恢复
  3. 条件路由:根据中间结果动态决策,而非预设路径

前端应用场景展望

表单流程编排

// 多步骤表单
const formWorkflow = new StateGraph(FormState)
  .addNode("step1", collectPersonalInfo)
  .addNode("step2", collectAddress)
  .addNode("step3", validateForm)
  .addEdge("step1", "step2")
  .addEdge("step2", "step3")
  .addConditionalEdges("step3", (state) => 
    state.isValid ? END : "step1"
  );

AI 对话流程

用户咨询 → 意图识别 → 知识库检索 → 生成回复 → 满意度收集

代码审查流水线

代码提交 → 静态分析 → AI 审查 → 人工审批 → 合并/拒绝

入门常见问题

问题 1:状态类型定义报错

现象:自定义状态时 TypeScript 报错

解决:确保状态字段有正确的归约器(Reducer)定义

// ✅ 正确
const workflow = new StateGraph<MyState>({
  channels: {
    messages: { value: (a, b) => [...a, ...b] },  // 追加模式
    count: { value: (_, b) => b },                 // 覆盖模式
  }
});

问题 2:ToolNode 找不到工具

现象ToolNode 未定义

解决:确保正确导入

import { ToolNode } from "@langchain/langgraph/prebuilt";

问题 3:循环无法终止

现象:Agent 陷入无限循环

解决:添加最大迭代次数限制

// 在条件边中加入计数检查
function shouldContinue(state) {
  if (state.iterationCount > 5) return END;
  // ... 其他逻辑
}

问题 4:状态更新不生效

现象:节点返回的字段没有合并到状态中

解决:节点必须返回一个对象,LangGraph 会自动合并

// ✅ 正确:返回需要更新的字段
return { messages: [response] };

// ❌ 错误:直接修改 state
state.messages.push(response);
return state;

结语

通过这篇教程,我们初步认识了 LangGraph 的核心概念和实践方法。LangGraph 通过"节点-边-状态"的图式结构,为构建复杂 AI Agent 提供了标准化的工程方案。

对于文章中错误的地方或有任何疑问,欢迎在评论区留言讨论!