LangGraph Human-in-the-loop 全解
·
LangGraph Human-in-the-loop 全解
一、什么是 Human-in-the-loop(人在回路)?
Human-in-the-loop(简称 HITL)是在自动化工作流的关键节点插入人工干预的机制,让 Agent 在执行到高风险、高不确定性或需要人类决策的步骤时自动暂停,等待人类确认、修改或补充信息后再继续执行。
为什么必须要有 HITL?
在生产级 Agent 系统中,纯自动化执行存在三大致命问题:
- 高风险操作不可控:比如退款、删数据、发邮件、调用付费 API,一旦出错会造成不可逆的损失
- 模型幻觉无法避免:LLM 偶尔会生成错误或不符合要求的内容,需要人类审核兜底
- 信息缺失无法自行解决:Agent 可能缺少只有人类知道的上下文或敏感信息
LangGraph 的 HITL 机制完美解决了这些问题,它基于中断 + 恢复的模式,让你可以在工作流的任意节点插入人工干预,同时保证状态的完整持久化。
二、LangGraph HITL 核心原理与 API
1. 三大核心组件
表格
| 组件 | 作用 | 核心说明 |
|---|---|---|
interrupt() 函数 |
暂停工作流执行 | 在节点内部调用,触发中断,传递需要人类查看的信息 |
Command(resume=...) 对象 |
恢复工作流执行 | 人类输入完成后,用 Command 将输入传递给 Agent,继续执行 |
| Checkpointer(检查点) | 持久化中断状态 | 必须配置,否则无法支持中断和恢复,中断时自动保存完整状态 |
2. 标准执行流程
plaintext
1. Agent正常执行,到达关键节点
2. 节点内部调用 `interrupt(payload)`,触发中断
3. LangGraph自动保存当前完整状态到Checkpointer
4. 工作流暂停,将payload返回给调用方(前端/控制台)
5. 人类查看payload,输入确认/修改/补充信息
6. 调用方用 `Command(resume=人类输入)` 重新调用Agent
7. LangGraph从Checkpoint恢复状态,`interrupt()` 函数返回人类输入
8. 节点继续执行剩余代码,工作流恢复正常运行
3. 关键注意事项
- 必须配置 Checkpointer:中断依赖状态持久化,没有 Checkpointer 无法工作
- 不要用 try/except 包裹 interrupt:interrupt 通过抛出特殊异常实现,包裹会导致中断失效
- payload 必须可序列化:interrupt 的参数必须是能转成 JSON 的类型(字符串、字典、列表等)
- 中断是节点内的暂停:不是停止整个图,而是暂停在节点内部的 interrupt 行,恢复后从该行继续执行
三、实战:给双智能体系统添加人工干预
我们基于你之前的研究员 + 作家双智能体系统,添加两个最关键的人工干预点:
- 研究员完成研究笔记后:等待人类确认是否需要补充资料
- 作家完成初稿后:等待人类审核通过,或提出修改意见
第一步:核心 API 导入
# 必须导入这两个核心API
from langgraph.types import interrupt, Command
第二步:完整可运行代码
from typing import TypedDict, Annotated, Sequence, Literal
from dotenv import load_dotenv
from langchain_community.chat_models import ChatZhipuAI
from langchain_core.messages import BaseMessage, AIMessage, HumanMessage
from langchain_core.prompts import ChatPromptTemplate
from langgraph.checkpoint.memory import MemorySaver
from langgraph.constants import START, END
from langgraph.graph import add_messages, StateGraph
from langgraph.types import interrupt, Command
from pydantic_settings import BaseSettings, SettingsConfigDict
# 加载环境变量
load_dotenv()
class Settings(BaseSettings):
ZHIPU_API_KEY: str
ZHIPU_BASE_URL: str
LLM_MODEL: str
LLM_BACKUP_MODEL: str
LLM_TIMEOUT: int=360
MAX_TURNS: int=3
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
extra="ignore"
)
settings = Settings()
# 全局状态定义
class MultiAgentState(TypedDict):
task: str
topic: str
research_notes: str
draft: str
final_article: str
current_agent: Literal["researcher", "writer", "human"]
turn_count: int
max_turns: int
messages: Annotated[Sequence[BaseMessage], add_messages]
error: str | None
human_feedback: str # ✅️ 新增: 存储人类反馈
# 初始化LLM
llm = ChatZhipuAI(
api_key=settings.ZHIPU_API_KEY,
base_url=settings.ZHIPU_BASE_URL,
model=settings.LLM_BACKUP_MODEL,
temperature=0.7,
timeout=settings.LLM_TIMEOUT
)
# 智能体1:研究员(带人工确认)
def create_researcher_agent():
researcher_prompt = ChatPromptTemplate.from_messages([
("system", """
你是专业的资深研究员,擅长围绕特定主题收集、整理、提炼准确、全面的资料。
如果有人类提供的反馈,请优先根据反馈补充或修改研究笔记。
"""),
("human", """
写作任务:{task}
写作主题:{topic}
人类反馈:{human_feedback}
请提供详细的结构化研究笔记。"""
)
])
def researcher_node(state: MultiAgentState):
print("\n-- 🔬 研究员开始工作 --")
try:
# 调用LLM生成研究员笔记
response = llm.invoke(researcher_prompt.format(
task=state["task"],
topic=state["topic"],
human_feedback=state.get("human_feedback", "无")
))
research_notes = response.content
print("✅️ 研究员完成研究笔记")
except Exception as e:
print(f"❌️ 研究员工作失败:{str(e)}")
return {
"error": str(e),
"turn_count": state["turn_count"] + 1,
}
# ✅️ 关键:出发人工干预 等待人类确认
print("\n-- 🔄 等待人类确认研究笔记...")
human_approval = interrupt({
"type": "research_approval",
"topic": state["topic"],
"research_notes": research_notes,
"prompt": "请确认研究笔记是否合格?输入'通过'继续,或输入意见让研究院补充:"
})
# 当人类输入后,代码从这里继续执行
print(f"\n 📝收到人类反馈:{human_approval}")
# 如果人类输入“通过”,进入下一步
if human_approval.strip() == "通过":
return {
"research_notes": research_notes,
"current_agent": "writer",
"turn_count": state["turn_count"] + 1,
"human_feedback": "",
"messages": [AIMessage(content=f"研究员已完成研究笔记:\n{research_notes}")]
}
# 否则,研究员根据人类反馈修改研究笔记
else:
print(f'🔄 研究员根据人类反馈修改研究笔记...')
return {
"human_feedback": human_approval,
"turn_count": state["turn_count"] + 1,
"current_agent": "researcher", # 回到研究员节点,重新生成
}
builder = StateGraph(MultiAgentState)
builder.add_node("researcher", researcher_node)
builder.set_entry_point("researcher")
return builder.compile()
# 智能体2: 作家 带人工审核
def create_writer_agent():
writer_prompt = ChatPromptTemplate.from_messages([
("system", """
你是专业的资深作家,擅长基于研究员提供的资料,创作高质量、结构清晰、语言流畅的文章。
如果有人类提供的修改意见,请优先根据意见修改文章。
"""),
("human", """
写作任务:{task}
写作主题:{topic}
研究笔记:{research_notes}
人类修改意见:{human_feedback}
请写一篇完整的文章初稿。""")
])
def writer_node(state: MultiAgentState):
print("\n-- ✍️ 作家开始工作 --")
try:
# 调用LLM生成初稿
response = llm.invoke(writer_prompt.format(
task=state["task"],
topic=state["topic"],
research_notes=state["research_notes"],
human_feedback=state.get("human_feedback", "无")
))
draft = response.content
print("✅️ 作家完成文章初稿")
except Exception as e:
print(f"❌️ 作家工作失败:{str(e)}")
return {
"error": str(e),
"turn_count": state["turn_count"] + 1,
}
# ✅️ 关键:出发人工干预,等待人类审核
print("\n 🔄 等待人类审核文章初稿...")
human_approval = interrupt({
"type": "writer_approval",
"topic": state["topic"],
"draft": draft,
"prompt": "请审核文章初稿:输入'通过' 生成最终文章,或输入修改意见让作家修改:"
})
# 人类输入后继续执行
print(f"\n 📝 收到人类审核意见:{human_approval}")
if human_approval.strip() == "通过":
return {
"draft": draft,
"current_agent": "finalize",
"turn_count": state["turn_count"] + 1,
"human_feedback": "",
"messages": [AIMessage(content=f"作家已完成文章初稿:\n{draft}")]
}
else:
print("🔄 作家根据人类意见修改文章")
return {
"human_feedback": human_approval,
"turn_count": state["turn_count"] + 1,
"current_agent": "writer", # 回到作家起点 重新生成
}
builder = StateGraph(MultiAgentState)
builder.add_node("writer", writer_node)
builder.set_entry_point("writer")
return builder.compile()
# 主图: 智能体调度器
def build_multi_agent_system():
researcher_agent = create_researcher_agent()
writer_agent = create_writer_agent()
builder = StateGraph(MultiAgentState)
# 把子图作为节点加入主图
builder.add_node("researcher", researcher_agent)
builder.add_node("writer", writer_agent)
# 最终汇成总节点
def finalize_node(state: MultiAgentState):
print("\n-- 🎉 任务完成 生成最终文章 --")
return {
"final_article": state["draft"],
"messages": [AIMessage(content=f"最终文章:\n{state['draft']}")],
}
builder.add_node("finalize", finalize_node)
# 条件路由
def router(state: MultiAgentState) -> Literal["researcher", "writer", "finalize", "end"]:
if state.get("error"):
print(f"❌️ 系统出错:{state['error']}, 任务结束")
return "end"
if state.get("turn_count") >= state["max_turns"]:
print("⚠️ 超过最大交互轮次,强制结束任务")
return "finalize"
current_agent = state["current_agent"]
if current_agent == "researcher":
return "researcher"
elif current_agent == "writer":
return "writer"
elif current_agent == "finalize":
return "finalize"
else:
return "finalize"
builder.add_edge(START, "researcher")
builder.add_conditional_edges(
"researcher",
router,
{
"writer": "writer",
"finalize": "finalize",
"end": END
}
)
builder.add_conditional_edges(
"writer",
router,
{
"researcher": "researcher",
"finalize": "finalize",
"end": END
}
)
builder.add_edge("finalize", END)
# ✅️ 必须配置Checkpointer 否则中断无法工作
checkpointer = MemorySaver()
# 生成环境使用RedisSaver, 支持服务重启后恢复
# from langgraph.checkpoint.redis import RedisSaver
# checkpointer = RedisSaver("redis://localhost:6379/0")
return builder.compile(checkpointer=checkpointer)
# 运行与人交互逻辑
def run_with_human_interaction(agent, initial_state, config):
"""带人工交互的运行函数"""
print("===== 🤝 带人工干预的双智能体协作系统 启动 =====")
# 第一次运行 直到遇到第一个中断
result = agent.invoke(initial_state, config)
# 循环任务中, 直到任务完成
while True:
# 检查是否中断
if "__interrupt__" in result:
interrupt_info = result["__interrupt__"][0]
payload = interrupt_info.value
# 打印中断信息和提示
print("\n" + "=" * 80)
print(f"⚠️ 触发人工干预:{payload['type']}")
print(f"主题:{payload['topic']}")
print("-" * 80)
# ✅ 根据类型动态获取内容字段
content_key = "research_notes" if payload["type"] == "research_approval" else "draft"
content = payload.get(content_key, "")
if len(content) > 500:
print(f"内容预览:\n{content[:500]}...")
else:
print(f"内容:\n{content}")
print("-" * 80)
# 获取人类输入
human_input = input(payload["prompt"])
# 用 Command恢复执行
print("\n ▶️ 恢复执行...")
result = agent.invoke(
Command(resume=human_input),
config=config
)
else:
# 没有中断 任务完成
break
return result
# 测试运行
if __name__ == "__main__":
# 初始化多智能体系统
multi_agent = build_multi_agent_system()
# 会话配置: 同一个thread_id支持服务重启恢复中断
config = {
"configurable": {
"thread_id": "multi_agent_hil_session_001",
"user_id": "user_001"
}
}
# 测试写作任务
test_task = "写一篇关于LangGraph多智能体系统的技术文章"
test_topic = "LangGraph多智能体系统的原理、架构与实现"
# 初始化状态
initial_state = {
"task": test_task,
"topic": test_topic,
"research_notes": "",
"draft": "",
"final_article": "",
"current_agent": "researcher",
"turn_count": 0,
"max_turns": settings.MAX_TURNS,
"messages": [HumanMessage(content=test_task)],
"error": None,
"human_feedback": ""
}
# 运行带人工交互的系统
result = run_with_human_interaction(multi_agent, initial_state, config)
# 输出最终结果
print("\n" + "=" * 80)
print("📄 最终完成的文章:")
print("=" * 80)
print(result["final_article"])
print("=" * 80)
print(f"\n✅ 任务完成,共交互 {result['turn_count']} 轮")
四、运行效果演示
plaintext
✅️ 研究员完成研究笔记
-- 🔄 等待人类确认研究笔记...
================================================================================
⚠️ 触发人工干预:research_approval
主题:LangGraph多智能体系统的原理、架构与实现
--------------------------------------------------------------------------------
内容预览:
研究笔记:LangGraph多智能体系统的原理、架构与实现
一、引言
1. 背景介绍
- 多智能体系统(MAS)的定义和发展历程
- LangGraph多智能体系统的应用领域和优势
2. 文章目的
- 深入了解LangGraph多智能体系统的原理、架构与实现
- 为读者提供对LangGraph多智能体系统的全面认识
二、LangGraph多智能体系统的原理
1. 基本原理
- 基于多智能体系统(MAS)的原理
- 智能体的自主性、协作性和适应性
2. 智能体模型
- 智能体的定义和特性
- 智能体的通信、感知和决策机制
3. 系统演化与学习
- 系统的演化过程
- 智能体的学习与适应能力
三、LangGraph多智能体系统的架构
1. 架构概述
- LangGraph多智能体系统的整体架构
- 架构的模块化和层次化设计
2. 模块功能
- 智能体模块:实现智能体的通信、感知和决策
- 网络模块:负责智能体之间的通信和数据传输
- 控制模块:实现系统的整体控制和管理
3. 系统接口
...
--------------------------------------------------------------------------------
请确认研究笔记是否合格?输入'通过'继续,或输入意见让研究院补充:通过
▶️ 恢复执行...
-- 🔬 研究员开始工作 --
✅️ 研究员完成研究笔记
-- 🔄 等待人类确认研究笔记...
📝收到人类反馈:通过
-- ✍️ 作家开始工作 --
✅️ 作家完成文章初稿
🔄 等待人类审核文章初稿...
================================================================================
⚠️ 触发人工干预:writer_approval
主题:LangGraph多智能体系统的原理、架构与实现
--------------------------------------------------------------------------------
内容预览:
LangGraph多智能体系统的原理、架构与实现
一、引言
随着信息技术的快速发展,复杂系统的处理能力逐渐成为衡量一个系统性能的重要指标。LangGraph多智能体系统作为一种新型的分布式系统,利用智能体技术实现复杂任务的高效执行,具有广泛的应用前景。本文旨在详细介绍LangGraph多智能体系统的原理、架构与实现,为相关研究和应用提供参考。
二、LangGraph多智能体系统的原理
2.1 智能体概念
智能体(Agent)是一种具有感知、推理、决策和行动能力的实体。在LangGraph中,智能体负责执行特定任务,并通过与其他智能体交互实现协作。
2.2 智能体通信
智能体之间的通信是LangGraph多智能体系统实现协作的关键。LangGraph采用基于消息传递的通信机制,智能体通过发送和接收消息进行信息交换。
2.3 智能体协作
LangGraph多智能体系统通过智能体之间的协作实现复杂任务的高效执行。智能体协作包括任务分配、任务执行、结果反馈等环节。
三、LangGraph多智能体系统的架构
3.1 模块化设计
LangGraph采用模块化设计,将系统划分...
--------------------------------------------------------------------------------
请审核文章初稿:输入'通过' 生成最终文章,或输入修改意见让作家修改:通过
▶️ 恢复执行...
-- ✍️ 作家开始工作 --
✅️ 作家完成文章初稿
🔄 等待人类审核文章初稿...
📝 收到人类审核意见:通过
-- 🎉 任务完成 生成最终文章 --
================================================================================
📄 最终完成的文章:
================================================================================
LangGraph多智能体系统的原理、架构与实现
一、引言
随着信息技术的飞速发展,分布式系统在各个领域中的应用越来越广泛。LangGraph多智能体系统作为一种基于智能体技术的分布式系统,旨在通过智能体之间的协作与交互,实现复杂任务的高效执行。本文将详细介绍LangGraph多智能体系统的原理、架构与实现。
二、LangGraph多智能体系统的原理
2.1 智能体概念
智能体(Agent)是一种具有感知、推理、决策和行动能力的实体。在LangGraph中,智能体负责执行特定任务,并通过与其他智能体交互实现协作。
2.2 智能体通信
智能体之间的通信是LangGraph多智能体系统实现协作的关键。LangGraph采用基于消息传递的通信机制,智能体通过发送和接收消息进行信息交换。
2.3 智能体协作
LangGraph多智能体系统通过智能体之间的协作实现复杂任务的高效执行。智能体协作包括任务分配、任务执行、结果反馈等环节。
三、LangGraph多智能体系统的架构
3.1 模块化设计
LangGraph采用模块化设计,将系统划分为多个功能模块,包括智能体模块、通信模块、任务管理模块等。
3.2 智能体模块
智能体模块负责智能体的创建、初始化、执行和销毁。该模块包含智能体的感知、推理、决策和行动等功能。
3.3 通信模块
通信模块负责智能体之间的消息传递。该模块实现消息的发送、接收、路由和过滤等功能。
3.4 任务管理模块
任务管理模块负责任务的创建、分配、执行和监控。该模块确保任务在智能体之间的高效分配和执行。
四、LangGraph多智能体系统的实现
4.1 开发环境
LangGraph多智能体系统采用Java语言进行开发,利用Java的并发编程特性实现智能体的并发执行。
4.2 智能体实现
智能体实现采用面向对象编程思想,定义智能体类及其方法,实现智能体的感知、推理、决策和行动等功能。
4.3 通信实现
通信实现采用消息队列机制,利用Java的JMS(Java Message Service)技术实现智能体之间的消息传递。
4.4 任务管理实现
任务管理实现采用任务调度算法,根据智能体的能力和任务需求进行任务分配,并监控任务执行过程。
五、总结
LangGraph多智能体系统通过智能体之间的协作与交互,实现复杂任务的高效执行。本文详细介绍了LangGraph多智能体系统的原理、架构与实现,为相关研究和应用提供了参考。随着未来技术的发展,LangGraph多智能体系统有望在更多领域发挥重要作用。
================================================================================
✅ 任务完成,共交互 2 轮
五、核心要点总结
- 三大核心 API:
interrupt()暂停、Command(resume=...)恢复、Checkpointer持久化 - 中断是节点内的暂停:不是停止整个图,而是暂停在 interrupt 行,恢复后从该行继续
- 必须配置 Checkpointer:没有 Checkpointer,中断和恢复功能完全无法工作
- payload 设计要清晰:给人类的提示要明确,包含足够的上下文信息
- 支持无限期等待:中断状态会永久保存在 Checkpointer 中,服务重启后可以通过同一个
thread_id恢复
六、进阶扩展方向
1. 工具调用前的人工审批
给工具调用节点添加人工干预,在工具执行前让人类确认是否允许调用,避免高风险操作:
def tool_node(state):
tool_call = state["tool_calls"][0]
# 触发人工审批
approval = interrupt({
"type": "tool_approval",
"tool_name": tool_call["name"],
"tool_args": tool_call["args"],
"prompt": "是否允许执行这个工具调用?输入 '允许' 或 '拒绝':"
})
if approval == "允许":
return execute_tool(tool_call)
else:
return {"error": "工具调用被人类拒绝"}
2. 超时自动处理
给中断添加超时机制,如果超过指定时间人类没有输入,自动执行默认操作(比如拒绝工具调用、继续执行等)。
3. 集成 Web 界面
将中断信息通过 API 暴露给前端,前端渲染审核界面,人类在网页上输入反馈后,后端调用Command(resume=...)恢复执行。
4. 人工干预历史记录
将所有人工干预的记录(时间、输入、操作人)存入数据库,方便审计和追溯。
5. 批量人工审核
对于需要处理大量任务的场景,实现批量审核界面,让人类可以一次性审核多个中断任务。
LangGraph 的 HITL 机制是生产级 Agent 系统的必备功能,它让你在自动化和人工监督之间找到了完美的平衡点,既提高了效率,又保证了系统的安全性和可靠性。
更多推荐

所有评论(0)