简单复刻manus, 使用 LangGraph 开发智能体的入门教程
简单复刻manus, 使用 LangGraph 开发智能体的入门教程。
·
简单复刻manus, 使用 LangGraph 开发智能体的入门教程
智能体架构设计
- LangGraph 是一个专为大语言模型应用设计的框架,它通过有向图结构组织智能体的工作流程,使复杂任务分解为可管理的节点和状态转换。开发者可以定义明确的执行路径、条件分支和循环逻辑,同时保持状态的一致性和可追踪性。LangGraph 智能体具备记忆管理、上下文感知、工具调用和错误恢复等核心能力,特别适合构建需要多步推理、持续对话和复杂决策的AI应用。
- 最近manus火上天际,我们使用LangGraph简单开发一个能做任务分解和调用工具执行的简单智能体,整个架构包含以下核心组件:
架构总览
+------------------------------------------+
| 旅行规划智能体 |
+------------------------------------------+
|
+-----------+-----------+
| |
+-------v-------+ +--------v--------+
| 状态管理 | | 工作流管理 |
| (AgentState) | | (StateGraph) |
+---------------+ +-----------------+
| |
| +--------+--------+
| | |
| +------v------+ +------v------+ +------v------+
| | create_plan |-->| collect_ |-->| generate_ |
| | | | information | | summary |
| +-------------+ +-------------+ +-------------+
| |
| |
| v
| +----------------+
| | should_continue|
| | _collecting |
| +----------------+
| | |
| | |
| +----v---+ +---v----+
| |continue| |summarize|
| +--------+ +--------+
| | |
| | |
| +---------+
| |
v v
+---------------+ +-----------------+
| 工具集合 | | LLM 引擎 |
| (Tools) |<------------>| (ChatOpenAI) |
+---------------+ +-----------------+
|
|
+-------v--------+
| search_web |
| get_weather |
| search_ |
| attractions |
| search_hotels |
| search_ |
| restaurants |
| search_ |
| transportation |
+----------------+
- 架构图比较简单, 可以看到,我们使用langgraph开发智能体只需要两个组件,StateGraph管理流程,AgentState管理状态
- AgentState 需要我们自己定义, 可以自由定义我们的任务执行需要哪些字段。
# 定义状态模型
class AgentState(TypedDict):
"""智能体状态模型"""
messages: List[BaseMessage] # 对话历史
tool_results: Dict[str, Any] # 工具执行结果
plan: Optional[Dict[str, Any]] # 任务规划
current_step: Optional[str] # 当前执行步骤
final_answer: Optional[str] # 最终回答
error_count: int # 错误计数
2. 代码讲解
2.1 用户输入
- 我们先定义一个main函数,在初始化initial_state 的时候messages 模拟用户输入。
- create_travel_agent 返回我们经过grpah构建的工作流,传入initial_state 调用。
def main():
"""主函数"""
# 创建智能体
agent = create_travel_agent()
# 初始化状态
initial_state = {
"messages": [
SystemMessage(content=SYSTEM_PROMPT),
HumanMessage(content="我想去北京旅游5天,预算适中,喜欢历史文化和美食,请帮我规划一下行程。")
],
"tool_results": {},
"plan": None,
"current_step": None,
"final_answer": None,
"error_count": 0
}
# 执行智能体
result = agent.invoke(initial_state)
# 打印结果
for message in result["messages"]:
if isinstance(message, HumanMessage):
print(f"\n用户: {message.content}")
elif isinstance(message, AIMessage):
print(f"\n助手: {message.content}")
elif isinstance(message, ToolMessage):
print(f"\n工具 ({message.name}): {message.content[:100]}...")
2.2 状态图管理器, 构建工作流程
- 我们使用了langgraph的StateGraph来管理我们的工作流。 通过create_agent 来构建我们的工作图。
# 创建工作流
def create_travel_agent() -> StateGraph:
"""创建旅行规划智能体工作流"""
# 初始化工作流
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("create_plan", create_plan)
workflow.add_node("collect_information", collect_information)
workflow.add_node("generate_summary", generate_summary)
# 设置入口点
workflow.set_entry_point("create_plan")
# 添加边
workflow.add_edge("create_plan", "collect_information")
# 添加条件边
workflow.add_conditional_edges(
"collect_information",
should_continue_collecting,
{
"continue": "collect_information",
"summarize": "generate_summary"
}
)
# 添加结束边
workflow.add_edge("generate_summary", END)
return workflow.compile()
- StateGraph 创建一个graph, 用于下面的流程编排。
- set_entry_point 这是起始节点
- add_node 添加工作节点,
- add_edge 连接节点直接的边。会把参数的节点1 和节点2 构建一条连接边,执行完节点1后执行节点2
- add_conditional_edges 添加带跳转条件的边, 根据条件决定是否执下一个节点
- 用户请求进来, 线执行 create_plan 拆解任务。 然后让ai去分析执行这个计划需要
收集那些信息, collect_information根据ai返回的工具调用信息, 去执行工具选择信息, collect_information节点根据条件should_continue_collecting 函数来判断是要继续执行,还是采集结束, 去generate_summary 总结报告
节点graph流程
'
+-------------+
| __start__ |
+------+------+
|
v
+------+------+
| create_plan |
+------+------+
|
v
+------+-----------------+
| collect_information |<---------+
+------+-----------------+ |
| |
| |
| | |
| | |
| v |
| [continue] |
| | |
[summarize] +---------------+
|
|
|
v
+------+-----------------+
| generate_summary |
+------+-----------------+
|
v
+------+------+
| __end__ |
+-------------+
'
2.3 任务理解节点,拆分任务
- create_plan 是流程图里面的第一个节点, 负责拆分任务。拆分任务后,返回的是AgentState
- langgraph拿到返回的状态后,会更新state, 然后根据状态,决策判断要执行的下一个节点。
- 通过 AgentState 可以控制grpah的调用流程。
- 代码功能比较简单, 调用了llm, 让llm帮我们拆分任务。和确认每个任务要调用的工具。然后把plan 存储在了我们的AgentState里面。
def create_plan(state: AgentState) -> AgentState:
"""创建旅行计划"""
llm = get_llm()
messages = state["messages"]
# 构建提示
planning_prompt = ChatPromptTemplate.from_messages([
SystemMessage(content="""你是一位专业的旅行规划专家。请分析用户的旅行需求,并制定一个详细的信息收集计划。
你需要确定:
1. 旅行目的地
2. 旅行时间和持续时间
3. 旅行预算
4. 用户的兴趣爱好和偏好
5. 需要收集的关键信息
请以JSON格式返回你的计划,包括以下字段:
- destination: 目的地
- duration: 旅行持续时间(天数)
- budget_level: 预算级别(经济型/舒适型/豪华型)
- interests: 兴趣爱好列表
- information_needs: 需要收集的信息列表
- tool_calls: 计划使用的工具调用列表,每个调用包含tool_name和parameters
确保你的计划全面且有针对性。"""),
MessagesPlaceholder(variable_name="messages")
])
# 调用LLM创建计划
response = llm.invoke(planning_prompt.format(messages=messages))
# 尝试解析JSON响应
try:
plan_text = response.content
# 提取JSON部分
json_match = re.search(r'```json\n(.*?)\n```', plan_text, re.DOTALL)
if json_match:
plan_json = json_match.group(1)
else:
# 尝试直接解析
plan_json = plan_text
plan = json.loads(plan_json)
except Exception as e:
# 如果解析失败,创建一个基本计划
plan = {
"destination": "未指定",
"duration": 0,
"budget_level": "未指定",
"interests": [],
"information_needs": ["基本旅游信息"],
"tool_calls": [{"tool_name": "search_web", "parameters": {"query": "旅游规划建议"}}]
}
# 更新状态
return {
**state,
"messages": messages + [AIMessage(content=f"我将为您规划一次{plan.get('destination', '旅行')}之旅。正在收集相关信息...")],
"plan": plan,
"current_step": "collect_information"
}
2.4 执行计划 collect_information
- 拿到计划后, 我们让大模型线帮我们分析要执行哪些工具, 和参数
- 然后 我们通过 tool_function.invoke 调用对应的工具,填充结果。
- 工具返回的信息, 我们依然存储到AgentState 中,
- 示例中, 我们简单判断了采集信息的梳理 if len(tool_results) >= 8: 够了就去总结报告
def collect_information(state: AgentState) -> AgentState:
"""收集旅行相关信息"""
llm = get_llm()
messages = state["messages"]
plan = state["plan"]
tool_results = state.get("tool_results", {})
# 检查是否已经收集了足够的信息
if len(tool_results) >= 8:
return {
**state,
"current_step": "generate_summary"
}
# 构建提示
tool_selection_prompt = ChatPromptTemplate.from_messages([
SystemMessage(content="""你是一位旅行规划助手。基于当前的旅行计划和已收集的信息,确定下一步需要使用的工具。
可用的工具包括:
- search_web: 搜索网络获取旅游相关信息
- get_weather: 获取指定地点的天气信息
- search_attractions: 搜索特定地点的旅游景点
- search_hotels: 搜索特定地点的酒店信息
- search_restaurants: 搜索特定地点的餐厅信息
- search_transportation: 搜索两地之间的交通信息
请选择最合适的工具并提供必要的参数。以JSON格式返回,包含以下字段:
- tool_name: 工具名称
- parameters: 工具参数字典
如果已经收集了足够的信息,请返回 {"tool_name": "none", "parameters": {}}
注意:如果你认为已经收集了足够的信息来生成旅行计划,必须返回tool_name为"none"。"""),
MessagesPlaceholder(variable_name="messages"),
SystemMessage(content="当前旅行计划:{plan}"),
SystemMessage(content="已收集的信息:{tool_results}"),
SystemMessage(content="已收集的信息数量:{info_count}。如果已收集6条或更多信息,请考虑返回none。")
])
# 调用LLM选择工具
response = llm.invoke(
tool_selection_prompt.format(
messages=messages,
plan=json.dumps(plan, ensure_ascii=False),
tool_results=json.dumps(tool_results, ensure_ascii=False),
info_count=len(tool_results)
)
)
# 尝试解析JSON响应
try:
tool_call_text = response.content
# 提取JSON部分
json_match = re.search(r'```json\n(.*?)\n```', tool_call_text, re.DOTALL)
if json_match:
tool_call_json = json_match.group(1)
else:
# 尝试直接解析
tool_call_json = tool_call_text
tool_call = json.loads(tool_call_json)
# 确保tool_call是字典格式
if isinstance(tool_call, list) and len(tool_call) > 0:
tool_call = tool_call[0] # 取第一个元素
except Exception as e:
# 如果解析失败,记录错误并进入总结阶段
error_message = AIMessage(content=f"在解析工具调用时发生错误: {str(e)}。将进入总结阶段。")
return {
**state,
"messages": messages + [error_message],
"current_step": "generate_summary"
}
# 检查是否决定不再使用工具
tool_name = tool_call.get("tool_name", "").lower()
if tool_name == "none" or tool_name == "" or tool_name not in [t.name for t in tools]:
return {
**state,
"messages": messages + [AIMessage(content="已收集足够的信息,正在为您生成旅行计划...")],
"current_step": "generate_summary"
}
# 执行工具调用
parameters = tool_call.get("parameters", {})
tool_function = next((tool for tool in tools if tool.name == tool_name), None)
result = tool_function.invoke(input=parameters,config={"callbacks": [CustomCallbackHandler()]})
# 更新工具结果
updated_tool_results = {
**tool_results,
f"{tool_name}_{datetime.now().strftime('%H%M%S')}": {
"parameters": parameters,
"result": result
}
}
# 添加工具消息
tool_message = ToolMessage(
content=result,
tool_call_id=f"{tool_name}_{datetime.now().strftime('%H%M%S')}",
name=tool_name
)
# 更新状态
return {
**state,
"messages": messages + [tool_message],
"tool_results": updated_tool_results,
"current_step": "collect_information" # 继续收集信息
}
2.5 总结报告 generate_summary
- 总结报告也很简单, 我们把之前的计划, 和采集的信息。 丢给大模型,让大模型去给我们总结,是不是很省事.
- 总结的报告, 我们依然放回 AgentState, 然后返回。
def generate_summary(state: AgentState) -> AgentState:
"""生成旅行计划总结"""
llm = get_llm()
messages = state["messages"]
plan = state["plan"]
tool_results = state.get("tool_results", {})
# 构建提示
summary_prompt = ChatPromptTemplate.from_messages([
SystemMessage(content="""你是一位专业的旅行规划师。请基于收集到的所有信息,为用户生成一份详细、实用的旅行计划。
你的旅行计划应包括以下内容:
1. 目的地概览:简要介绍目的地的特色和亮点
2. 行程安排:每天的详细行程安排,包括景点、活动、用餐等
3. 住宿推荐:适合用户预算的住宿选择
4. 交通信息:往返交通和当地交通建议
5. 美食推荐:当地特色美食和餐厅推荐
6. 旅行贴士:实用的旅行建议、注意事项和小技巧
7. 预算规划:旅行预算的详细规划
请确保你的计划具体、实用,并且考虑到用户的偏好和需求。使用清晰的标题和分段,使计划易于阅读和理解。"""),
MessagesPlaceholder(variable_name="messages"),
SystemMessage(content="旅行计划:{plan}"),
SystemMessage(content="收集到的信息:{tool_results}")
])
# 调用LLM生成总结
response = llm.invoke(
summary_prompt.format(
messages=messages,
plan=json.dumps(plan, ensure_ascii=False),
tool_results=json.dumps(tool_results, ensure_ascii=False)
)
)
# 生成最终回答
final_answer = response.content
# 更新状态
return {
**state,
"messages": messages + [AIMessage(content=final_answer)],
"final_answer": final_answer,
"current_step": "end"
}
2.6 callback调试
- langgraph 支持callback调试, 让我们可以清晰的知道大模型走到那步了。
- callback 可以监听的地方我一一列出来了。
class CustomCallbackHandler(BaseCallbackHandler):
"""增强版回调处理器,提供详细执行监控和错误处理"""
def __init__(self):
self.execution_stack = []
self.timers = {}
def on_llm_start(self, serialized, prompts, **kwargs):
self._start_timer('llm')
print(f"🕒 [{self._get_timestamp()}] LLM开始生成 | 提示数量: {len(prompts)}")
def on_llm_end(self, response, **kwargs):
duration = self._end_timer('llm')
print(f"✅ [{self._get_timestamp()}] LLM生成完成 | 耗时: {duration:.2f}s | 响应长度: {len(str(response))}")
def on_tool_start(self, serialized, input_str, **kwargs):
self._start_timer('tool')
print(f"🔧 [{self._get_timestamp()}] 工具开始执行 | 工具: {serialized.get('name')} | 输入: {input_str[:200]}")
def on_tool_end(self, output, **kwargs):
duration = self._end_timer('tool')
print(f"✅ [{self._get_timestamp()}] 工具执行完成 | 耗时: {duration:.2f}s | 输出: {str(output)[:200]}")
def on_tool_error(self, error, **kwargs):
print(f"❌ [{self._get_timestamp()}] 工具执行错误 | 错误类型: {type(error).__name__} | 详情: {str(error)[:500]}")
def on_chain_start(self, serialized, inputs, **kwargs):
self._start_timer('chain')
chain_type = serialized.get('name', '未知链')
print(f"⛓️ [{self._get_timestamp()}] 链开始执行 | 类型: {chain_type} | 输入参数: {self._format_inputs(inputs)}")
def on_chain_end(self, outputs, **kwargs):
duration = self._end_timer('chain')
print(f"✅ [{self._get_timestamp()}] 链执行完成 | 耗时: {duration:.2f}s | 输出参数: {self._format_outputs(outputs)}")
def on_agent_action(self, action, **kwargs):
print(f"🤔 [{self._get_timestamp()}] Agent决策 | 选择工具: {action.tool} | 输入: {action.tool_input[:200]}")
def _start_timer(self, event_type):
self.timers[event_type] = time.time()
def _end_timer(self, event_type):
return time.time() - self.timers.pop(event_type, time.time())
def _get_timestamp(self):
return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
def _format_inputs(self, inputs):
return json.dumps(inputs, indent=2, ensure_ascii=False)[:300]
def _format_outputs(self, outputs):
return json.dumps(outputs, indent=2, ensure_ascii=False)[:300]
3.总结
通过以上步骤,您可以使用 LangGraph 框架快速构建智能体。该框架提供了灵活的状态管理和任务分解能力,使得开发复杂的智能体变得更加简单和高效。希望本教程能帮助您快速上手 LangGraph 的使用。
关注我获取更多信息
git源码:https://github.com/lovelly/aicode.git
更多推荐

所有评论(0)