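1. Introduction
Since 2024, the capabilities of large language models (LLMs), exemplified by GPT-4 and Claude, have kept improving, and AI Agents have moved from proofs of concept into production. Yet embedding LLM capabilities into enterprise business systems takes far more than calling a Chat Completion API. A truly usable enterprise-grade Agent needs robust engineering in three core areas: task decomposition, tool calling, and multi-turn conversation.

This article examines the design of each of these three areas and, with complete Python code, builds an enterprise Agent framework that can serve as a reference implementation.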

2. Overall Architecture
An enterprise AI Agent typically uses a layered architecture with four core layers:

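```text
┌────────────────────────────────────────────────────────────────┐
│  Access Layer (API Gateway)                                    │
│  REST / WebSocket / gRPC                                       │
├────────────────────────────────────────────────────────────────┤
│  Orchestration Layer (Orchestrator)                            │
│  Decompose → Route tools → Manage dialogue → Aggregate results │
├────────────────────────────────────────────────────────────────┤
│  Capability Layer (Capabilities)                               │
│  LLM inference │ Tool registry │ Memory mgmt │ Security audit  │
├────────────────────────────────────────────────────────────────┤
│  Infrastructure Layer (Infra)                                  │
│  Vector DB │ Message queue │ Config center │ Monitoring/alerts │
└────────────────────────────────────────────────────────────────┘
```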
The orchestration layer is the core "brain" of the Agent, and the following sections are organized around it.

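3. Task Decomposition: From Natural Language to a DAG
3.1 Design Approach
In enterprise scenarios, a single user instruction often implies several steps. For example, "Analyze last week's sales data, generate a report, and send it to Manager Zhang" involves at least four sub-tasks: querying the data, analyzing it, generating the report, and sending the email.

The core of task decomposition is turning a vague instruction into a directed acyclic graph (DAG), in which nodes are atomic operations and edges represent dependencies.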
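To make the DAG idea concrete, the four sub-tasks of the sales-report example can be written as a dependency map and layered with Python's standard-library `graphlib` (Python 3.9+). The task IDs below are hypothetical, and the second graph adds an assumed extra query purely to show how independent nodes land in the same "wave" and could run in parallel.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical task IDs for "analyze last week's sales, write a report, email it".
# Each key maps to the set of task IDs it depends on.
sales_report_dag = {
    "query_sales": set(),
    "analyze": {"query_sales"},
    "generate_report": {"analyze"},
    "send_email": {"generate_report"},
}

# Assumed variant: two independent queries feed the analysis step.
parallel_dag = {
    "query_sales": set(),
    "query_targets": set(),  # hypothetical extra data source
    "analyze": {"query_sales", "query_targets"},
}


def execution_waves(dag: dict[str, set[str]]) -> list[list[str]]:
    """Group tasks into waves; tasks within one wave have no mutual dependencies."""
    sorter = TopologicalSorter(dag)
    sorter.prepare()  # raises CycleError if the graph contains a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for deterministic output
        waves.append(ready)
        sorter.done(*ready)
    return waves


print(execution_waves(sales_report_dag))
print(execution_waves(parallel_dag))
```

The `DAGExecutor` in section 3.4 does not precompute waves like this; it starts each sub-task as soon as its own dependencies have finished, which can begin work earlier than a strict wave-by-wave schedule.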

3.2 Core Data Structures
```python
from __future__ import annotations

import asyncio
import json
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Dict, List, Optional


class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"


@dataclass
class SubTask:
    """A single sub-task (one node of the DAG)."""
    task_id: str
    name: str
    description: str
    tool_name: str  # name of the tool this sub-task invokes
    arguments: Dict[str, Any] = field(default_factory=dict)
    depends_on: List[str] = field(default_factory=list)  # IDs of prerequisite tasks
    status: TaskStatus = TaskStatus.PENDING
    result: Any = None
    error: Optional[str] = None
    retry_count: int = 0
    max_retries: int = 2


@dataclass
class TaskPlan:
    """Task plan generated by the LLM."""
    original_query: str
    sub_tasks: List[SubTask]
    reasoning: str = ""
```

3.3 LLM-Based Task Planner
We have the LLM output a structured JSON plan rather than free text:

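```python
TASK_PLANNER_PROMPT = """You are a task-planning expert. Break the user's request down into executable sub-tasks.

Requirements:
1. Each sub-task corresponds to one specific tool call.
2. Make the dependencies between sub-tasks explicit.
3. Output strictly valid JSON.
4. To pass an earlier sub-task's output as an argument, use the string "{{{{task_1.result}}}}" (or "{{{{task_1.result.field}}}}" for a single field).

Available tools:
{tools_description}

User request: {user_query}

Output JSON in the following format, with nothing else:
{{
  "reasoning": "how the request was broken down...",
  "sub_tasks": [
    {{
      "task_id": "task_1",
      "name": "sub-task name",
      "description": "detailed description",
      "tool_name": "name of the tool to call",
      "arguments": {{}},
      "depends_on": []
    }}
  ]
}}
"""
```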
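Because `TASK_PLANNER_PROMPT` is filled in with `str.format`, every literal brace in the JSON example has to be doubled (`{{` and `}}`); only `{tools_description}` and `{user_query}` are real placeholders. A quick check on a hypothetical miniature template that follows the same convention:

```python
# Hypothetical miniature template using the same escaping convention as TASK_PLANNER_PROMPT.
template = 'Request: {user_query}\nOutput: {{"sub_tasks": []}}'

rendered = template.format(user_query="weekly sales report")
print(rendered)

# Forgetting to double the braces makes str.format treat the JSON text as a field name.
broken = 'Output: {"sub_tasks": []}'
try:
    broken.format()
except KeyError as exc:
    print("KeyError:", exc)
```

Values substituted into the placeholders are not parsed again, so a user query that itself contains braces is safe.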

```python
import re


class TaskPlanner:
    """LLM-based task planner."""

    def __init__(self, llm_client, tool_registry: "ToolRegistry"):
        self.llm = llm_client
        self.tool_registry = tool_registry

    def _build_tools_description(self) -> str:
        """Describe the available tools so the LLM knows what it can call."""
        descriptions = []
        for tool in self.tool_registry.list_tools():
            descriptions.append(
                f"- {tool.name}: {tool.description}\n"
                f"  Parameters: {json.dumps(tool.parameters_schema, ensure_ascii=False)}"
            )
        return "\n".join(descriptions)

    async def plan(self, user_query: str) -> TaskPlan:
        """Ask the LLM for a task plan and convert it into a TaskPlan."""
        prompt = TASK_PLANNER_PROMPT.format(
            tools_description=self._build_tools_description(),
            user_query=user_query,
        )
        # llm_client.chat(prompt) is assumed to return the reply text
        response = await self.llm.chat(prompt)
        plan_dict = self._parse_response(response)

        sub_tasks = [
            SubTask(
                task_id=t["task_id"],
                name=t["name"],
                description=t["description"],
                tool_name=t["tool_name"],
                arguments=t.get("arguments", {}),
                depends_on=t.get("depends_on", []),
            )
            for t in plan_dict["sub_tasks"]
        ]
        return TaskPlan(
            original_query=user_query,
            sub_tasks=sub_tasks,
            reasoning=plan_dict.get("reasoning", ""),
        )

    def _parse_response(self, response: str) -> dict:
        """Extract JSON from the LLM response (tolerant of Markdown fences and extra text)."""
        # 1. Try to parse the whole response directly
        try:
            return json.loads(response)
        except json.JSONDecodeError:
            pass
        # 2. Try the content of a ```json ... ``` block
        match = re.search(r"```(?:json)?\s*([\s\S]*?)```", response)
        if match:
            return json.loads(match.group(1))
        # 3. Fall back to the outermost {...} span
        start, end = response.find("{"), response.rfind("}")
        if start != -1 and end > start:
            return json.loads(response[start:end + 1])
        raise ValueError(f"Could not parse the task plan returned by the LLM: {response[:200]}")
```
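`TaskPlanner.plan` trusts the model's JSON as-is. Since an LLM can emit duplicate IDs, unknown tools, dangling dependencies, or cycles, it may be worth validating `plan_dict` before building `SubTask` objects. A minimal sketch, assuming the JSON shape requested in `TASK_PLANNER_PROMPT`; `validate_plan_dict` is a hypothetical helper, not part of the original framework:

```python
from graphlib import CycleError, TopologicalSorter
from typing import Any, Dict, List, Set


def validate_plan_dict(plan_dict: Dict[str, Any], known_tools: Set[str]) -> List[str]:
    """Return a list of problems in an LLM-generated plan; an empty list means it looks valid."""
    sub_tasks = plan_dict.get("sub_tasks")
    if not isinstance(sub_tasks, list) or not sub_tasks:
        return ["'sub_tasks' must be a non-empty list"]

    problems: List[str] = []
    ids = [t.get("task_id") for t in sub_tasks]
    if len(ids) != len(set(ids)):
        problems.append("duplicate task_id values")
    id_set = set(ids)

    graph: Dict[Any, Set[Any]] = {}
    for t in sub_tasks:
        tid = t.get("task_id")
        if t.get("tool_name") not in known_tools:
            problems.append(f"{tid}: unknown tool {t.get('tool_name')!r}")
        deps = set(t.get("depends_on", []))
        missing = deps - id_set
        if missing:
            problems.append(f"{tid}: depends on unknown tasks {sorted(missing)}")
        graph[tid] = deps & id_set  # only real edges go into the cycle check

    try:
        TopologicalSorter(graph).prepare()  # raises CycleError on a dependency cycle
    except CycleError:
        problems.append("dependency cycle detected")
    return problems


plan = {
    "sub_tasks": [
        {"task_id": "task_1", "tool_name": "query_sales_data", "depends_on": []},
        {"task_id": "task_2", "tool_name": "send_email", "depends_on": ["task_1", "task_9"]},
    ]
}
print(validate_plan_dict(plan, {"query_sales_data", "send_email"}))
```

When the list is non-empty, one option is to send the problems back to the model and ask it to re-plan; another is to fail fast before anything is executed.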

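3.4 DAG Execution Engine
With a task plan in hand, we need an engine that schedules and executes the sub-tasks according to their dependencies: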

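```python
class DAGExecutor:
    """DAG execution engine: starts each sub-task as soon as all of its dependencies
    have succeeded, so independent sub-tasks run concurrently."""

    def __init__(self, tool_registry: "ToolRegistry"):
        self.tool_registry = tool_registry

    async def execute(self, plan: TaskPlan) -> Dict[str, Any]:
        """Execute the whole plan; returns {task_id: result} for the sub-tasks that succeeded."""
        task_map: Dict[str, SubTask] = {t.task_id: t for t in plan.sub_tasks}
        completed: Dict[str, Any] = {}  # task_id → result (successful tasks only)
        in_flight: Dict[str, asyncio.Task] = {}

        while True:
            # Collect PENDING tasks whose dependencies have all succeeded, and
            # skip tasks whose upstream task failed (or was itself skipped)
            ready_tasks: List[SubTask] = []
            skipped_now = False
            for task in plan.sub_tasks:
                if task.status != TaskStatus.PENDING:
                    continue
                if any(
                    dep in task_map
                    and task_map[dep].status in (TaskStatus.FAILED, TaskStatus.SKIPPED)
                    for dep in task.depends_on
                ):
                    task.status = TaskStatus.SKIPPED
                    task.error = "an upstream task failed"
                    skipped_now = True
                elif all(dep in completed for dep in task.depends_on):
                    ready_tasks.append(task)

            if not ready_tasks and not in_flight:
                if skipped_now:
                    continue  # re-scan so that skips propagate further downstream
                # Either everything has finished, or the remaining tasks have unknown
                # or cyclic dependencies and can never become ready
                break

            # Start all ready tasks concurrently
            for task in ready_tasks:
                task.status = TaskStatus.RUNNING
                in_flight[task.task_id] = asyncio.create_task(
                    self._execute_single_task(task, completed)
                )

            # Wait until at least one in-flight task finishes
            done, _ = await asyncio.wait(
                in_flight.values(), return_when=asyncio.FIRST_COMPLETED
            )
            for finished in done:
                tid, result = finished.result()
                del in_flight[tid]
                if task_map[tid].status == TaskStatus.COMPLETED:
                    completed[tid] = result  # failed results never unblock dependents

        return completed

    async def _execute_single_task(
        self, task: SubTask, context: Dict[str, Any]
    ) -> tuple[str, Any]:
        """Run one sub-task with retries, a timeout, and injection of upstream results."""
        tool = self.tool_registry.get(task.tool_name)
        if tool is None:
            task.status = TaskStatus.FAILED
            task.error = f"Tool {task.tool_name} is not registered"
            return task.task_id, None

        for attempt in range(task.max_retries + 1):
            try:
                # Inject upstream results into template arguments such as {{task_1.result}}
                resolved_args = self._resolve_arguments(task.arguments, context)
                # ToolDefinition exposes `handler`; sync handlers run in a worker thread
                if asyncio.iscoroutinefunction(tool.handler):
                    call = tool.handler(**resolved_args)
                else:
                    call = asyncio.to_thread(tool.handler, **resolved_args)
                result = await asyncio.wait_for(call, timeout=tool.timeout_seconds)
                task.status = TaskStatus.COMPLETED
                task.result = result
                return task.task_id, result
            except Exception as e:
                task.retry_count = attempt + 1
                if attempt >= task.max_retries:
                    task.status = TaskStatus.FAILED
                    task.error = str(e) or type(e).__name__
                    return task.task_id, None
                await asyncio.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, ...

        return task.task_id, None

    @staticmethod
    def _resolve_arguments(
        arguments: Dict[str, Any], context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Resolve template variables in the arguments, injecting upstream task results."""
        resolved = {}
        for key, value in arguments.items():
            if isinstance(value, str) and value.startswith("{{") and value.endswith("}}"):
                # Supports {{task_1.result}} and {{task_1.result.field}}
                task_id, *path = value[2:-2].strip().split(".")
                if path and path[0] == "result":
                    path = path[1:]  # context already maps task_id → result
                current = context.get(task_id)
                for part in path:
                    if isinstance(current, dict):
                        current = current.get(part)
                    else:
                        current = getattr(current, part, None)
                resolved[key] = current
            else:
                resolved[key] = value
        return resolved
```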
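The retry loop above sleeps `2 ** attempt` seconds between attempts. When many sub-tasks hit the same failing service at once, a common refinement is to add random jitter so that retries do not arrive in lockstep. A sketch of a generic helper using the "full jitter" variant; `retry_with_backoff` and its parameters are illustrative assumptions, not part of the framework above:

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    func: Callable[[], Awaitable[T]],
    max_retries: int = 2,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
) -> T:
    """Await func() up to max_retries + 1 times, sleeping a jittered delay between attempts."""
    for attempt in range(max_retries + 1):
        try:
            return await func()
        except Exception:
            if attempt >= max_retries:
                raise  # out of retries: propagate the last error
            # Full jitter: delay drawn uniformly from [0, min(max_delay, base_delay * 2**attempt)]
            cap = min(max_delay, base_delay * 2 ** attempt)
            await asyncio.sleep(random.uniform(0, cap))
    raise RuntimeError("unreachable")  # the loop always returns or raises


# Usage: a flaky coroutine that fails twice, then succeeds
state = {"calls": 0}


async def flaky() -> str:
    state["calls"] += 1
    if state["calls"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


result = asyncio.run(retry_with_backoff(flaky, max_retries=2, base_delay=0.01))
print(result, state["calls"])
```

Randomizing the whole delay spreads retries out at the cost of sometimes retrying almost immediately; capping with `max_delay` keeps late attempts from waiting unboundedly.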

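4. Tool Calling: An Engineering Wrapper for Function Calling
4.1 Tool Registry
In enterprise settings there are many tools of many kinds (internal APIs, database queries, third-party services, and so on), so a unified mechanism for registering and discovering them is needed: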

```python
@dataclass
class ToolDefinition:
    """Tool definition."""
    name: str
    description: str
    parameters_schema: Dict[str, Any]  # JSON Schema
    handler: Callable  # the function that does the actual work (sync or async)
    require_confirmation: bool = False  # whether the user must confirm first (sensitive operations)
    timeout_seconds: int = 30
    category: str = "general"  # category: data / communication / system


class ToolRegistry:
    """Tool registry."""

    def __init__(self):
        self._tools: Dict[str, ToolDefinition] = {}

    def register(self, tool: ToolDefinition) -> None:
        """Register a tool; names must be unique."""
        if tool.name in self._tools:
            raise ValueError(f"Tool {tool.name} is already registered")
        self._tools[tool.name] = tool

    def register_from_function(
        self,
        func: Callable,
        name: Optional[str] = None,
        description: Optional[str] = None,
        parameters_schema: Optional[Dict[str, Any]] = None,
        **kwargs,
    ) -> None:
        """Register an existing function as a tool; the name and description
        default to the function's __name__ and docstring."""
        tool = ToolDefinition(
            name=name or func.__name__,
            description=description or func.__doc__ or "",
            parameters_schema=parameters_schema or {},
            handler=func,
            **kwargs,
        )
        self.register(tool)

    def get(self, name: str) -> Optional[ToolDefinition]:
        return self._tools.get(name)

    def list_tools(self) -> List[ToolDefinition]:
        return list(self._tools.values())

    def to_openai_tools(self) -> List[Dict[str, Any]]:
        """Convert to the OpenAI Function Calling (`tools`) format."""
        tools = []
        for t in self._tools.values():
            tools.append({
                "type": "function",
                "function": {
                    "name": t.name,
                    "description": t.description,
                    "parameters": t.parameters_schema,
                },
            })
        return tools
```
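`register_from_function` falls back to an empty `parameters_schema` when none is passed, which gives the model nothing to go on. One option is to derive a basic JSON Schema from the function signature. A rough sketch using `inspect` and `typing.get_type_hints`; `schema_from_signature` is a hypothetical helper, and its type mapping covers only a few primitive types (an assumption, not a complete converter):

```python
import inspect
from typing import Any, Callable, Dict, List, Optional, Union, get_args, get_origin, get_type_hints

# Assumed, deliberately small mapping from Python annotations to JSON Schema types.
_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean",
               list: "array", dict: "object"}


def _unwrap_optional(hint: Any) -> Any:
    """Turn Optional[X] (i.e. Union[X, None]) into X; leave other hints unchanged."""
    if get_origin(hint) is Union:
        args = [a for a in get_args(hint) if a is not type(None)]
        if len(args) == 1:
            return args[0]
    return hint


def schema_from_signature(func: Callable) -> Dict[str, Any]:
    """Build a JSON Schema 'parameters' object from a function's signature.
    Unknown annotations fall back to "string"; descriptions are not generated."""
    hints = get_type_hints(func)
    properties: Dict[str, Any] = {}
    required: List[str] = []
    for name, param in inspect.signature(func).parameters.items():
        if param.kind in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD):
            continue  # *args / **kwargs cannot be described this way
        hint = _unwrap_optional(hints.get(name, str))
        properties[name] = {"type": _JSON_TYPES.get(hint, "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)
    return {"type": "object", "properties": properties, "required": required}


# Example: a stub with the same parameters as query_sales_data_impl in section 7
def query_sales_data(start_date: str, end_date: str, region: Optional[str] = None) -> dict:
    """Query sales data for a date range."""
    return {}


print(schema_from_signature(query_sales_data))
```

The result could be passed as `parameters_schema=schema_from_signature(func)`; per-parameter `description` fields, which help the model a great deal, would still have to be written by hand or parsed from the docstring.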

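4.2 Tool Call Executor
When the LLM is actually called, the full Function Calling loop has to be handled: the LLM returns tool_calls → the tools are executed → the results are sent back to the LLM → the LLM continues reasoning: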

```python
import json
from typing import AsyncGenerator


class ToolExecutor:
    """Tool call executor: runs the ReAct-style tool-calling loop."""

    MAX_TOOL_CALL_ROUNDS = 10  # guard against endless tool-calling loops

    def __init__(self, llm_client, tool_registry: ToolRegistry):
        self.llm = llm_client
        self.registry = tool_registry

    async def run_with_tools(
        self,
        messages: List[Dict[str, Any]],
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """
        Run a conversation step that may involve several rounds of tool calls.
        Each step is yielded as an event (event streaming, not token streaming)
        so the caller can display progress. `messages` is extended in place.
        """
        for _ in range(self.MAX_TOOL_CALL_ROUNDS):
            # Call the LLM; llm_client.chat(...) is assumed to return a Chat Completions-style dict
            response = await self.llm.chat(
                messages=messages,
                tools=self.registry.to_openai_tools(),
                tool_choice="auto",
            )
            assistant_msg = response["choices"][0]["message"]

            if assistant_msg.get("tool_calls"):
                # The LLM decided to call tools
                tool_calls = assistant_msg["tool_calls"]
                yield {
                    "type": "tool_calls_start",
                    "tool_calls": [
                        {"name": tc["function"]["name"], "arguments": tc["function"]["arguments"]}
                        for tc in tool_calls
                    ],
                }

                # Add the assistant message (with its tool_calls) to the history
                messages.append(assistant_msg)

                # Execute all tool calls concurrently
                tool_results = await asyncio.gather(
                    *[self._execute_tool_call(tc) for tc in tool_calls],
                    return_exceptions=True,
                )

                # Append each result as a `tool` message answering its tool_call_id
                for tc, result in zip(tool_calls, tool_results):
                    if isinstance(result, Exception):
                        content = json.dumps({"error": str(result)}, ensure_ascii=False)
                    else:
                        content = json.dumps(result, ensure_ascii=False, default=str)
                    messages.append({
                        "role": "tool",
                        "tool_call_id": tc["id"],
                        "content": content,
                    })
                    yield {
                        "type": "tool_result",
                        "tool_name": tc["function"]["name"],
                        "result": content,
                    }
            else:
                # The LLM produced its final text answer
                content = assistant_msg.get("content") or ""
                messages.append(assistant_msg)
                yield {"type": "final_answer", "content": content}
                return

        yield {
            "type": "error",
            "content": f"Tool calling exceeded the maximum number of rounds ({self.MAX_TOOL_CALL_ROUNDS})",
        }

    async def _execute_tool_call(self, tool_call: dict) -> Any:
        """Execute a single tool call with a timeout."""
        func_name = tool_call["function"]["name"]
        tool = self.registry.get(func_name)
        if tool is None:
            return {"error": f"Unknown tool: {func_name}"}

        raw_args = tool_call["function"]["arguments"] or "{}"
        try:
            arguments = json.loads(raw_args)
        except json.JSONDecodeError:
            return {"error": f"Failed to parse arguments: {raw_args}"}

        try:
            # Async handlers are awaited directly; sync handlers run in a worker thread
            if asyncio.iscoroutinefunction(tool.handler):
                call = tool.handler(**arguments)
            else:
                call = asyncio.to_thread(tool.handler, **arguments)
            return await asyncio.wait_for(call, timeout=tool.timeout_seconds)
        except asyncio.TimeoutError:
            return {"error": f"Tool {func_name} timed out ({tool.timeout_seconds}s)"}
```
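The loop above relies on the Chat Completions message format: an assistant message that carries `tool_calls` must be followed by one `role: "tool"` message per call, each echoing the matching `tool_call_id`, or the next request is rejected by the API. The sketch below shows one round of such a history and a hypothetical checker (`unanswered_tool_calls`) that can help when debugging the loop; the IDs and values are made up.

```python
import json
from typing import Any, Dict, List

# One round of the loop in Chat Completions message format (IDs and data are made up).
history: List[Dict[str, Any]] = [
    {"role": "user", "content": "How did Beijing do last week?"},
    {
        "role": "assistant",
        "content": None,
        "tool_calls": [{
            "id": "call_1",
            "type": "function",
            "function": {
                "name": "query_sales_data",
                # arguments arrive as a JSON *string*, not a dict
                "arguments": json.dumps(
                    {"start_date": "2025-01-06", "end_date": "2025-01-12", "region": "Beijing"}
                ),
            },
        }],
    },
    {"role": "tool", "tool_call_id": "call_1", "content": json.dumps({"orders": 342})},
]


def unanswered_tool_calls(messages: List[Dict[str, Any]]) -> List[str]:
    """Return the IDs of tool calls that have no matching tool message after them."""
    pending: List[str] = []
    for msg in messages:
        for call in msg.get("tool_calls") or []:
            pending.append(call["id"])
        if msg.get("role") == "tool" and msg.get("tool_call_id") in pending:
            pending.remove(msg["tool_call_id"])
    return pending


print(unanswered_tool_calls(history))       # every call answered
print(unanswered_tool_calls(history[:2]))   # the tool reply is missing
```

This is also why `_execute_tool_call` returns error payloads instead of raising: every call still gets its `tool` message, and the model can react to the error text.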

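5. Multi-Turn Conversation: Context Management and State Transitions
5.1 Conversation Memory Management
The core challenge of multi-turn conversation is balancing a limited context window against long-term memory. The solution is layered memory: the most recent turns are kept verbatim, and older turns are folded into a summary: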

```python
from collections import deque
from datetime import datetime


@dataclass
class ConversationTurn:
    """One conversation turn."""
    role: str  # "user" | "assistant" (a "tool" turn would also need a tool_call_id for the API)
    content: str
    timestamp: float = field(default_factory=lambda: datetime.now().timestamp())
    metadata: Dict[str, Any] = field(default_factory=dict)


class ConversationMemory:
    """Layered conversation memory: recent turns verbatim, older turns as a summary."""

    def __init__(
        self,
        max_recent_turns: int = 20,    # number of recent turns kept verbatim
        max_summary_turns: int = 100,  # history turns the summary should cover (not enforced here)
        token_budget: int = 8000,      # token budget for summary + recent turns
    ):
        self.recent_turns: deque[ConversationTurn] = deque(maxlen=max_recent_turns)
        self.summary: str = ""  # compressed summary of earlier turns
        self.max_recent_turns = max_recent_turns
        self.max_summary_turns = max_summary_turns
        self.token_budget = token_budget
        self._total_turns = 0

    def add_turn(self, turn: ConversationTurn) -> None:
        """Add a turn; when the window is full, the oldest turn is folded into the summary."""
        self._total_turns += 1

        if len(self.recent_turns) >= self.max_recent_turns:
            # Move the oldest turn into the summary
            evicted = self.recent_turns.popleft()
            self.summary = self._update_summary(self.summary, evicted)

        self.recent_turns.append(turn)

    def to_messages(self) -> List[Dict[str, str]]:
        """Build the message list sent to the LLM."""
        messages = []

        # 1. System message, including the summary of earlier turns
        system_content = "You are a helpful assistant."
        if self.summary:
            system_content += f"\n\n[Summary of earlier conversation]\n{self.summary}"
        messages.append({"role": "system", "content": system_content})

        # 2. Recent turns, verbatim
        for turn in self.recent_turns:
            messages.append({"role": turn.role, "content": turn.content})

        return messages

    def _update_summary(self, current_summary: str, turn: ConversationTurn) -> str:
        """Incrementally extend the summary (simplified: a real project should summarize with the LLM)."""
        text = turn.content if len(turn.content) <= 100 else turn.content[:100] + "..."
        snippet = f"[{turn.role}]: {text}"
        if current_summary:
            return current_summary + "\n" + snippet
        return snippet

    def estimate_tokens(self) -> int:
        """Roughly estimate the number of tokens in the current context."""
        # Rough heuristic of ~2 characters per token (the original assumption for Chinese text);
        # real ratios depend on the tokenizer, so use the model's tokenizer for exact budgets.
        total = len(self.summary) // 2
        for turn in self.recent_turns:
            total += len(turn.content) // 2
        return total
```

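5.2 Conversation State Machine
An enterprise Agent usually has to maintain session-level state, for example waiting for user confirmation or running a long workflow. A finite-state machine is used to manage it: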

```python
class SessionState(Enum):
    IDLE = "idle"                                    # idle, waiting for user input
    PLANNING = "planning"                            # decomposing the task
    EXECUTING = "executing"                          # executing
    AWAITING_CONFIRMATION = "awaiting_confirmation"  # waiting for user confirmation
    COMPLETED = "completed"                          # current task finished


@dataclass
class Session:
    """Session context."""
    session_id: str
    state: SessionState = SessionState.IDLE
    memory: ConversationMemory = field(default_factory=ConversationMemory)
    current_plan: Optional[TaskPlan] = None
    pending_confirmation: Optional[str] = None  # description of the operation awaiting confirmation
    created_at: float = field(default_factory=lambda: datetime.now().timestamp())
    metadata: Dict[str, Any] = field(default_factory=dict)
```
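The enum lists the states but not which moves between them are legal. One way to make the state machine explicit is a transition table checked on every change. The table below is an assumption inferred from the flow in `EnterpriseAgent.chat` plus a confirmation round-trip, and `transition` / `InvalidTransition` are hypothetical names; the enum is repeated so the snippet runs on its own:

```python
from enum import Enum


class SessionState(Enum):
    IDLE = "idle"
    PLANNING = "planning"
    EXECUTING = "executing"
    AWAITING_CONFIRMATION = "awaiting_confirmation"
    COMPLETED = "completed"


# Assumed legal transitions; adjust to the real business flow.
ALLOWED_TRANSITIONS = {
    SessionState.IDLE: {SessionState.PLANNING, SessionState.EXECUTING},
    SessionState.PLANNING: {SessionState.EXECUTING, SessionState.IDLE},
    SessionState.EXECUTING: {
        SessionState.AWAITING_CONFIRMATION, SessionState.COMPLETED, SessionState.IDLE,
    },
    SessionState.AWAITING_CONFIRMATION: {SessionState.EXECUTING, SessionState.IDLE},
    SessionState.COMPLETED: {SessionState.IDLE},
}


class InvalidTransition(Exception):
    """Raised when a state change is not in ALLOWED_TRANSITIONS."""


def transition(current: SessionState, target: SessionState) -> SessionState:
    """Return the new state, or raise if the move is not allowed."""
    if target not in ALLOWED_TRANSITIONS[current]:
        raise InvalidTransition(f"{current.value} -> {target.value} is not allowed")
    return target


state = SessionState.IDLE
state = transition(state, SessionState.PLANNING)
state = transition(state, SessionState.EXECUTING)
print(state)
```

Note that `EnterpriseAgent.chat` in section 6 assigns `EXECUTING` twice in a row on the planning path; with a strict table like this one, that self-transition would either need to be allowed or skipped.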

5.3 Context Compression Strategy
When the token budget gets tight, older messages need to be compressed proactively:

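```python
class ContextCompressor:
    """Context compressor: when the token budget is exceeded, condenses older turns with the LLM."""

    COMPRESSION_PROMPT = (
        "Compress the following conversation history into a concise summary. "
        "Keep the key information (decisions, data, conclusions) and drop redundant wording."
    )

    def __init__(self, llm_client, target_ratio: float = 0.5):
        self.llm = llm_client
        self.target_ratio = target_ratio  # intended compressed/original length ratio (not enforced here)

    async def compress(self, memory: ConversationMemory) -> None:
        """Fold the older half of the recent turns into the summary."""
        if memory.estimate_tokens() < memory.token_budget:
            return  # within budget, nothing to do

        # Compress the oldest 50% of the recent turns
        turns_list = list(memory.recent_turns)
        split_point = len(turns_list) // 2
        if split_point == 0:
            return  # too few turns to compress

        old_turns = turns_list[:split_point]
        new_turns = turns_list[split_point:]

        # Build the text to be compressed
        to_compress = "\n".join(
            f"[{t.role}]: {t.content}" for t in old_turns
        )

        # Ask the LLM for a summary (llm_client.chat(prompt) is assumed to return text)
        response = await self.llm.chat(
            f"{self.COMPRESSION_PROMPT}\n\nConversation history:\n{to_compress}"
        )
        compressed_summary = response.strip()

        # Update memory: summary + remaining recent turns
        memory.summary = (
            f"{memory.summary}\n\n[Compressed history]\n{compressed_summary}"
            if memory.summary
            else compressed_summary
        )
        memory.recent_turns = deque(new_turns, maxlen=memory.max_recent_turns)
```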

6. Full Integration: EnterpriseAgent
The three modules above are assembled into one complete enterprise Agent:

```python
class EnterpriseAgent:
    """Enterprise AI Agent: integrates task decomposition, tool calling, and multi-turn conversation."""

    def __init__(
        self,
        llm_client,
        tool_registry: ToolRegistry,
        config: Optional[Dict[str, Any]] = None,
    ):
        self.llm = llm_client
        self.tool_registry = tool_registry
        self.config = config or {}

        # Core components
        self.planner = TaskPlanner(llm_client, tool_registry)
        self.executor = DAGExecutor(tool_registry)
        self.tool_executor = ToolExecutor(llm_client, tool_registry)
        if "max_tool_rounds" in self.config:
            # Per-instance override of the ToolExecutor.MAX_TOOL_CALL_ROUNDS class default
            self.tool_executor.MAX_TOOL_CALL_ROUNDS = self.config["max_tool_rounds"]
        self.compressor = ContextCompressor(llm_client)

        # Session management (in-process dictionary)
        self._sessions: Dict[str, Session] = {}

    async def chat(
        self,
        session_id: str,
        user_message: str,
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """Main entry point: handle a user message and stream back events."""

        # Get or create the session
        session = self._get_or_create_session(session_id)

        # Record the user message in memory
        session.memory.add_turn(ConversationTurn(
            role="user", content=user_message
        ))

        # Check the token budget and compress if necessary
        await self.compressor.compress(session.memory)

        yield {"type": "status", "content": "Analyzing your request..."}

        # Step 1: decide whether the request needs task decomposition
        if await self._requires_planning(user_message):
            session.state = SessionState.PLANNING
            yield {"type": "status", "content": "Decomposing the task..."}

            plan = await self.planner.plan(user_message)
            session.current_plan = plan

            yield {
                "type": "plan",
                "reasoning": plan.reasoning,
                "sub_tasks": [
                    {"id": t.task_id, "name": t.name, "description": t.description}
                    for t in plan.sub_tasks
                ],
            }

            # Step 2: execute the task DAG
            session.state = SessionState.EXECUTING
            yield {"type": "status", "content": "Executing tasks..."}

            results = await self.executor.execute(plan)
            failures = {
                t.task_id: t.error
                for t in plan.sub_tasks
                if t.status in (TaskStatus.FAILED, TaskStatus.SKIPPED)
            }

            yield {"type": "task_results", "results": results, "failures": failures}

            # Feed the execution results (including failures) into the conversation
            # so that the LLM can summarize them
            session.memory.add_turn(ConversationTurn(
                role="assistant",
                content=(
                    "[System] Task execution finished. Results: "
                    f"{json.dumps(results, ensure_ascii=False, default=str)}; "
                    f"failures: {json.dumps(failures, ensure_ascii=False)}"
                ),
                metadata={"type": "task_execution"},
            ))

        # Step 3: standard tool-calling conversation
        session.state = SessionState.EXECUTING

        messages = session.memory.to_messages()
        async for event in self.tool_executor.run_with_tools(messages):
            yield event

            # Store the assistant's final reply in memory
            if event["type"] == "final_answer":
                session.memory.add_turn(ConversationTurn(
                    role="assistant", content=event["content"]
                ))

        session.state = SessionState.IDLE

    async def _requires_planning(self, user_message: str) -> bool:
        """Decide whether the request is complex enough to need task decomposition."""
        plan_check_prompt = f"""Decide whether the following user request needs to be broken down into multiple sub-tasks.

User request: {user_message}

Criteria:
- It involves several independent steps or operations → true
- It requires calling several different tools → true
- It is a simple question or a single-step operation → false

Reply with only "true" or "false"."""

        response = await self.llm.chat(plan_check_prompt)
        return response.strip().lower().startswith("true")

    def _get_or_create_session(self, session_id: str) -> Session:
        """Get an existing session or create a new one."""
        if session_id not in self._sessions:
            self._sessions[session_id] = Session(session_id=session_id)
        return self._sessions[session_id]

    def close_session(self, session_id: str) -> None:
        """Close a session and release its resources."""
        self._sessions.pop(session_id, None)
```
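`ToolDefinition.require_confirmation` is declared (and set for `send_email` in the usage example in section 7), but neither `ToolExecutor` nor `DAGExecutor` checks it, so a sensitive tool would run without approval. A minimal sketch of a gate that could sit in front of the handler call; the `confirm` callback (for example one that moves the session into `AWAITING_CONFIRMATION` and waits for the user's answer) and the `GatedTool` stand-in type are assumptions for illustration:

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict


@dataclass
class GatedTool:
    """Minimal stand-in for ToolDefinition with only the fields this sketch needs."""
    name: str
    handler: Callable[..., Awaitable[Any]]
    require_confirmation: bool = False


# Hypothetical callback type: (tool name, arguments) -> True if the user approves
ConfirmFn = Callable[[str, Dict[str, Any]], Awaitable[bool]]


async def call_with_confirmation(
    tool: GatedTool, arguments: Dict[str, Any], confirm: ConfirmFn
) -> Any:
    """Run the tool, asking for approval first if it is flagged as sensitive."""
    if tool.require_confirmation and not await confirm(tool.name, arguments):
        # Returned as an ordinary tool result, so the LLM can tell the user what happened
        return {"error": f"User declined to run {tool.name}"}
    return await tool.handler(**arguments)


# Usage with stub implementations
async def fake_send_email(to: str, subject: str, body: str) -> Dict[str, Any]:
    return {"status": "sent", "to": to}


async def deny_all(name: str, args: Dict[str, Any]) -> bool:
    return False  # e.g. the user clicked "reject"


email_tool = GatedTool("send_email", fake_send_email, require_confirmation=True)
email_args = {"to": "zhang@company.com", "subject": "Weekly sales report", "body": "Summary attached."}
denied = asyncio.run(call_with_confirmation(email_tool, email_args, deny_all))
print(denied)
```

Returning a refusal as data rather than raising keeps the tool-message pairing intact, and the model can then explain to the user that the email was not sent.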

7. Usage Example
```python
async def main():
    # Initialize the LLM client (an OpenAI-compatible endpoint is used as the example).
    # Note: the framework calls llm_client.chat(prompt) and llm_client.chat(messages=..., tools=...);
    # the raw AsyncOpenAI client has no such method, so in practice it has to be wrapped
    # in a thin adapter that exposes this interface.
    from openai import AsyncOpenAI

    llm_client = AsyncOpenAI(api_key="your-api-key", base_url="https://api.openai.com/v1")

    # Create the tool registry and register business tools
    registry = ToolRegistry()

    # Data-query tool
    registry.register(ToolDefinition(
        name="query_sales_data",
        description="Query sales data, filterable by date range and region",
        parameters_schema={
            "type": "object",
            "properties": {
                "start_date": {"type": "string", "description": "Start date, format YYYY-MM-DD"},
                "end_date": {"type": "string", "description": "End date, format YYYY-MM-DD"},
                "region": {"type": "string", "description": "Region (optional)"},
            },
            "required": ["start_date", "end_date"],
        },
        handler=query_sales_data_impl,
        category="data",
    ))

    # Email tool
    registry.register(ToolDefinition(
        name="send_email",
        description="Send an email to the given recipient",
        parameters_schema={
            "type": "object",
            "properties": {
                "to": {"type": "string", "description": "Recipient email address"},
                "subject": {"type": "string", "description": "Email subject"},
                "body": {"type": "string", "description": "Email body"},
            },
            "required": ["to", "subject", "body"],
        },
        handler=send_email_impl,
        require_confirmation=True,  # sensitive operation (flag only; the executors above do not enforce it)
        category="communication",
    ))

    # Create the Agent
    agent = EnterpriseAgent(
        llm_client=llm_client,
        tool_registry=registry,
        config={"max_tool_rounds": 8},
    )

    # User conversation
    session_id = "user-001-session-20250101"
    user_query = "Analyze last week's sales data for Beijing, write a summary report, and send it to zhang@company.com"

    async for event in agent.chat(session_id, user_query):
        print(f"[{event['type']}] {event.get('content', '')}")

    # Clean up
    agent.close_session(session_id)


async def query_sales_data_impl(start_date: str, end_date: str, region: Optional[str] = None):
    """Real data-query implementation goes here."""
    # Connect to the real data source (database / API) here; mock data below
    return {
        "total_revenue": 1285000,
        "orders": 342,
        "top_product": "Smart Sensor X3",
        "period": f"{start_date} ~ {end_date}",
    }


async def send_email_impl(to: str, subject: str, body: str):
    """Real email-sending implementation goes here."""
    # Call SMTP or an email service API here; mock result below
    return {"status": "sent", "to": to, "subject": subject}


if __name__ == "__main__":
    asyncio.run(main())
```
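The framework calls `llm_client.chat(prompt)` expecting text back, and `llm_client.chat(messages=..., tools=..., tool_choice=...)` expecting a dict with `choices`. The raw `AsyncOpenAI` client exposes neither; its API is `client.chat.completions.create(...)`. A sketch of a thin adapter bridging the two; the class name `OpenAIChatAdapter` is hypothetical, and the model name is left to the caller:

```python
from typing import Any, Dict, List, Optional


class OpenAIChatAdapter:
    """Adapts an AsyncOpenAI-style client to the chat(...) interface assumed in this article."""

    def __init__(self, client: Any, model: str) -> None:
        self.client = client
        self.model = model

    async def chat(
        self,
        prompt: Optional[str] = None,
        *,
        messages: Optional[List[Dict[str, Any]]] = None,
        tools: Optional[List[Dict[str, Any]]] = None,
        tool_choice: Optional[str] = None,
    ) -> Any:
        if messages is None:
            # Plain-prompt mode (planner, compressor, planning check): return the text only
            resp = await self.client.chat.completions.create(
                model=self.model, messages=[{"role": "user", "content": prompt}]
            )
            return resp.choices[0].message.content or ""

        # Tool-calling mode (ToolExecutor): return a plain dict with "choices"
        kwargs: Dict[str, Any] = {}
        if tools:
            kwargs["tools"] = tools
            kwargs["tool_choice"] = tool_choice or "auto"
        resp = await self.client.chat.completions.create(
            model=self.model, messages=messages, **kwargs
        )
        # exclude_none drops null fields so the assistant message can be sent back as-is
        return resp.model_dump(exclude_none=True)
```

In `main()` this would be used as `llm_client = OpenAIChatAdapter(AsyncOpenAI(api_key=..., base_url=...), model=...)`, with whichever model the endpoint serves.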

8. Production Considerations
8.1 Security Auditing
Every tool call made by an enterprise Agent should be written to an audit log:

```python
import logging


class AuditLogger:
    """Audit logger."""

    def __init__(self, log_file: str = "agent_audit.log"):
        self.logger = logging.getLogger("agent_audit")
        self.logger.setLevel(logging.INFO)
        # Avoid attaching duplicate handlers (and writing duplicate lines)
        # if AuditLogger is instantiated more than once
        if not self.logger.handlers:
            handler = logging.FileHandler(log_file, encoding="utf-8")
            handler.setFormatter(logging.Formatter(
                '%(asctime)s | %(levelname)s | %(message)s'
            ))
            self.logger.addHandler(handler)

    def log_tool_call(self, session_id: str, tool_name: str,
                      arguments: dict, result: Any, duration_ms: float) -> None:
        """Record one tool call."""
        self.logger.info(
            f"session={session_id} | tool={tool_name} | "
            f"args={json.dumps(arguments, ensure_ascii=False, default=str)} | "
            f"result={str(result)[:200]} | duration={duration_ms:.0f}ms"
        )

    def log_llm_call(self, session_id: str, prompt_tokens: int,
                     completion_tokens: int, duration_ms: float) -> None:
        """Record one LLM call."""
        self.logger.info(
            f"session={session_id} | llm_call | "
            f"prompt_tokens={prompt_tokens} | completion_tokens={completion_tokens} | "
            f"duration={duration_ms:.0f}ms"
        )
```
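The pipe-delimited lines above are easy to read but awkward to query. If the audit trail feeds a log pipeline, one JSON object per line is often easier to index. A sketch using only the standard library; `log_tool_call_json` is a hypothetical helper whose field names simply mirror `log_tool_call`, and `time.perf_counter` is shown as one way to obtain `duration_ms`:

```python
import json
import logging
import time
from typing import Any, Dict

audit_logger = logging.getLogger("agent_audit_json")


def log_tool_call_json(session_id: str, tool_name: str, arguments: Dict[str, Any],
                       result: Any, duration_ms: float) -> str:
    """Emit one JSON object per tool call and return the serialized line."""
    record = {
        "ts": time.time(),
        "event": "tool_call",
        "session_id": session_id,
        "tool": tool_name,
        "args": arguments,
        "result_preview": str(result)[:200],  # truncate large results
        "duration_ms": round(duration_ms, 1),
    }
    line = json.dumps(record, ensure_ascii=False, default=str)
    audit_logger.info(line)
    return line


# Usage: time a (stubbed) tool call and log it
start = time.perf_counter()
result = {"orders": 342}  # stand-in for a real tool result
elapsed_ms = (time.perf_counter() - start) * 1000
line = log_tool_call_json("user-001", "query_sales_data", {"region": "Beijing"}, result, elapsed_ms)
print(line)
```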

8.2 Rate Limiting and Circuit Breaking
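```python
import time
from collections import deque


class RateLimiter:
    """Simple sliding-window rate limiter (keeps a log of recent call timestamps)."""

    def __init__(self, max_calls: int, window_seconds: float):
        self.max_calls = max_calls
        self.window = window_seconds
        self._calls: deque[float] = deque()

    async def acquire(self) -> bool:
        """Try to obtain permission for one call; returns False instead of waiting."""
        now = time.monotonic()  # monotonic clock: unaffected by system clock changes
        # Drop timestamps that have left the window
        while self._calls and self._calls[0] < now - self.window:
            self._calls.popleft()
        if len(self._calls) >= self.max_calls:
            return False
        self._calls.append(now)
        return True
```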
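The heading also mentions circuit breaking, which the rate limiter does not cover. A circuit breaker stops calling a dependency that keeps failing, then lets trial calls through after a cool-down. A minimal three-state (closed / open / half-open) sketch; the class name, thresholds, and the injectable `clock` parameter are assumptions for illustration:

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Closed -> open after N consecutive failures; half-open once the cool-down has passed."""

    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None

    @property
    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.reset_timeout:
            return "half_open"
        return "open"

    def allow_request(self) -> bool:
        """Check before calling the protected tool or LLM. This sketch lets every caller
        through while half-open; stricter versions allow only a single probe."""
        return self.state != "open"

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None  # close the circuit

    def record_failure(self) -> None:
        self.failures += 1
        if self.state == "half_open" or self.failures >= self.failure_threshold:
            self.opened_at = self.clock()  # (re)open and restart the cool-down


# Usage with a fake clock so the demo does not have to sleep
fake_now = [0.0]
breaker = CircuitBreaker(failure_threshold=2, reset_timeout=10.0, clock=lambda: fake_now[0])
breaker.record_failure()
breaker.record_failure()
print(breaker.state, breaker.allow_request())  # open False
fake_now[0] = 10.0
print(breaker.state, breaker.allow_request())  # half_open True
```

One breaker per tool name could short-circuit `_execute_tool_call` with an error payload while open, which the LLM would then see as an ordinary tool result.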

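8.3 Observability
Consider instrumenting the Agent with OpenTelemetry so that every LLM call and every tool call is traced, and monitoring key metrics with Prometheus + Grafana: latency distribution per stage, tool-call success rate, token consumption trends, DAG execution time, and so on.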
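Before OpenTelemetry and Prometheus are wired up, the metrics listed above can be prototyped in-process. A stdlib-only sketch; `AgentMetrics` is a hypothetical name and is not the OpenTelemetry or Prometheus client API. It records per-operation latency and success rate through an async context manager:

```python
import asyncio
import statistics
import time
from collections import defaultdict
from contextlib import asynccontextmanager
from typing import Dict, List


class AgentMetrics:
    """In-memory latency and success counters keyed by operation name (e.g. a tool name)."""

    def __init__(self) -> None:
        self.latencies_ms: Dict[str, List[float]] = defaultdict(list)
        self.successes: Dict[str, int] = defaultdict(int)
        self.failures: Dict[str, int] = defaultdict(int)

    @asynccontextmanager
    async def track(self, operation: str):
        start = time.perf_counter()
        try:
            yield
            self.successes[operation] += 1
        except Exception:
            self.failures[operation] += 1
            raise  # metrics must not swallow the error
        finally:
            self.latencies_ms[operation].append((time.perf_counter() - start) * 1000)

    def success_rate(self, operation: str) -> float:
        total = self.successes[operation] + self.failures[operation]
        return self.successes[operation] / total if total else 0.0

    def p50_ms(self, operation: str) -> float:
        return statistics.median(self.latencies_ms[operation])


async def demo(metrics: AgentMetrics) -> None:
    async with metrics.track("query_sales_data"):
        await asyncio.sleep(0)  # stand-in for a successful tool call
    try:
        async with metrics.track("query_sales_data"):
            raise TimeoutError("simulated timeout")
    except TimeoutError:
        pass


m = AgentMetrics()
asyncio.run(demo(m))
print(m.success_rate("query_sales_data"), m.p50_ms("query_sales_data"))
```

Keeping the measurement behind one `track()` call means it could later be swapped for OpenTelemetry spans without touching call sites.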

9. Summary
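Building an enterprise AI Agent is far more than prompt engineering. Taking an engineering perspective, this article has worked through three core areas: task decomposition (DAG planning and parallel execution), tool calling (the ReAct loop built on Function Calling), and multi-turn conversation (layered memory and a state machine).
A truly good enterprise Agent is the product of careful engineering applied to all three. We hope the architecture and code presented here serve as a useful reference for teams bringing Agents into production. Natural next steps include Human-in-the-Loop approval flows, vectorized long-term memory, and multi-Agent orchestration.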
