This blog series is also published on Juejin. For more articles, follow my Juejin account:
人肉推土机's Juejin account

AutoGen Series 1: Introduction and Getting Started

AutoGen Series 2: A Deep Dive into Custom Agents

AutoGen Series 3: Built-in Agents in Practice

AutoGen Series 4: Advanced Techniques for Custom Agents

AutoGen Series 5: An In-Depth Analysis and Practice of Agent Team Collaboration

AutoGen Tech Blog Series (6): SelectorGroupChat, Principles and Practice

AutoGen Tech Blog Series (7): State Management and Component Serialization

AutoGen Tech Blog Series (8): A Deep Dive into Swarm, a New Paradigm for Agent Collaboration

AutoGen Tech Blog Series (9): Migration Guide from v0.2 to v0.4

In the world of AutoGen, agents are the building blocks of multi-agent applications. AutoGen 0.4 ships with a set of preset agents that encapsulate common functionality and behavior patterns, greatly simplifying the construction of complex applications. This post takes a close look at these preset agents, demonstrates their use cases with working code examples, and digs into AutoGen's internal mechanisms.

AutoGen Preset Agents

AutoGen 0.4 mainly provides the following preset agents, each with its own capabilities and use cases:

  • AssistantAgent: A general-purpose assistant agent that uses a large language model (LLM) for text generation and reasoning, and can be extended with tools. Its core job is to receive messages, process them with the LLM, and produce a response. system_message defines the agent's role and behavior; model_client specifies which LLM to use; the tools parameter registers tool functions that extend the agent's capabilities; setting reflect_on_tool_use=True makes the agent reflect on tool results and answer in natural language.
  • UserProxyAgent: Receives user input and forwards it to other agents; think of it as the bridge between the user and the multi-agent system. Its core job is to take user input and turn it into messages for other agents. The input_func parameter lets you customize the input function, for example using input() to read from the console.
  • CodeExecutorAgent: Executes the code it receives and returns the execution result. It is usually paired with an AssistantAgent so it can run the code that agent generates. The code_executor parameter specifies the executor, such as LocalCommandLineCodeExecutor (a minimal usage sketch follows this list).
  • OpenAIAssistantAgent: Integrates with the OpenAI Assistants API so you can reuse an OpenAI Assistant's capabilities. It supports custom threads and file uploads and talks to the OpenAI Assistants API directly.
  • MultimodalWebSurfer: A multimodal web-browsing agent that can navigate web pages and understand both the text and the images on them. Useful for extracting and analyzing web content, and for tasks that combine text and image information.
  • FileSurfer: Reads and analyzes files.
  • VideoSurfer: Processes and analyzes videos.
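
The snippet below is a minimal sketch of how a CodeExecutorAgent could be wired to a LocalCommandLineCodeExecutor to run a fenced code block. The import path autogen_ext.code_executors.local and the work_dir value are assumptions based on AutoGen 0.4's packaging, not part of this article's examples.

import asyncio
from autogen_agentchat.agents import CodeExecutorAgent
from autogen_agentchat.messages import TextMessage
from autogen_core import CancellationToken
from autogen_ext.code_executors.local import LocalCommandLineCodeExecutor

async def main():
    # The executor runs extracted code blocks as local commands (assumed work_dir "coding").
    code_executor_agent = CodeExecutorAgent(
        "code_executor",
        code_executor=LocalCommandLineCodeExecutor(work_dir="coding"),
    )
    # The agent looks for a fenced code block in the incoming message and executes it.
    task = TextMessage(
        content="```python\nprint('Hello from CodeExecutorAgent')\n```",
        source="user",
    )
    response = await code_executor_agent.on_messages([task], CancellationToken())
    print(response.chat_message.content)

asyncio.run(main())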

Hands-On Examples

Weather Lookup with AssistantAgent and Tools

The following example shows how to use AssistantAgent with a tool function to answer weather questions:

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.messages import TextMessage
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Tool function that looks up the weather for a city
async def get_weather(city: str, units: str = "imperial") -> str:
    if units == "imperial":
        return f"The weather in {city} is 73 °F and Sunny."
    elif units == "metric":
        return f"The weather in {city} is 23 °C and Sunny."
    else:
        return f"Sorry, I don't know the weather in {city}."

async def main():
    # Create an OpenAI model client
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    # Create an AssistantAgent and register the tool function
    assistant_agent = AssistantAgent(
        name="assistant_agent",
        model_client=model_client,
        tools=[get_weather],  # Register the weather lookup tool
        system_message="You are a helpful assistant that can use tools to answer user questions. Use tools if needed.",
        reflect_on_tool_use=True
    )
    # The user's question
    user_input = "What is the weather in New York?"
    # Pass the user input to the AssistantAgent via on_messages
    response = await assistant_agent.on_messages(
        [TextMessage(content=user_input, source="user")],
        CancellationToken()
    )
    # Print the AssistantAgent's response
    print("Assistant:", response.chat_message.content)

asyncio.run(main())

Output:

Assistant: The weather in New York is 23 °C and Sunny.

Code Analysis

  1. Import the required modules: asyncio for asynchronous programming, AssistantAgent to create the assistant agent, TextMessage to build text messages, CancellationToken to cancel operations, and OpenAIChatCompletionClient to create the OpenAI model client.
  2. Define the get_weather tool function: it takes a city name and a unit system and returns a weather string.
  3. Create the OpenAIChatCompletionClient: instantiate an OpenAI model client and set the model to gpt-4o (any other supported model works as well).
  4. Create the AssistantAgent: build an instance named assistant_agent and configure its model_client, tools, and system_message. Passing get_weather to tools gives the agent the ability to look up weather; system_message defines the agent's role and tells it to use tools when needed; reflect_on_tool_use=True makes the agent reflect on the tool result and answer in natural language.
  5. Handle user input: the question "What is the weather in New York?" is wrapped in a TextMessage.
  6. Call on_messages: assistant_agent.on_messages() passes the user input to the agent along with a CancellationToken for cancellation control (a sketch of how that token can be used follows below).
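
The example above simply passes a fresh CancellationToken through. The snippet below is a minimal, hypothetical sketch (the run_with_timeout helper is not part of AutoGen) of how the same token could be used to cancel a slow on_messages call:

import asyncio
from autogen_core import CancellationToken

# Hypothetical helper: wrap an agent call with a timeout and cancel it if it overruns.
async def run_with_timeout(agent, messages, timeout: float = 30.0):
    token = CancellationToken()
    call = asyncio.create_task(agent.on_messages(messages, token))
    try:
        return await asyncio.wait_for(call, timeout=timeout)
    except asyncio.TimeoutError:
        token.cancel()  # Signal AutoGen to abort any in-flight work tied to this token.
        raise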

Interactive Poem Writing with AssistantAgent and UserProxyAgent

The following example builds a simple poetry assistant with an AssistantAgent and a UserProxyAgent; the user can give feedback and steer the writing process:

import asyncio
from autogen_agentchat.agents import AssistantAgent, UserProxyAgent
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def main():
    model_client = OpenAIChatCompletionClient(model="gpt-4o-mini")
    assistant = AssistantAgent("assistant", model_client=model_client)
    # The user proxy reads feedback from the console via the built-in input()
    user_proxy = UserProxyAgent("user_proxy", input_func=input)
    # Stop the conversation once the user types "APPROVE"
    termination = TextMentionTermination("APPROVE")
    team = RoundRobinGroupChat([assistant, user_proxy], termination_condition=termination)
    stream = team.run_stream(task="Write a 4-line poem about the ocean.")
    await Console(stream)

asyncio.run(main())

Output:

---------- user ----------
Write a 4-line poem about the ocean.
---------- assistant ----------
Waves whisper secrets to the shore’s embrace,
A dance of blue under the sun's warm grace.
Endless horizons where dreams take flight,
The ocean's heart glimmers, a canvas of light.
TERMINATE [Prompt tokens: 46, Completion tokens: 49]
---------- user_proxy ----------
APPROVE
---------- Summary ----------
Number of messages: 3
Finish reason: Text 'APPROVE' mentioned
Total prompt tokens: 46
Total completion tokens: 49
Duration: 6.64 seconds
TaskResult(messages=[TextMessage(source='user', models_usage=None, content='Write a 4-line poem about the ocean.', type='TextMessage'), TextMessage(source='assistant', models_usage=RequestUsage(prompt_tokens=46, completion_tokens=49), content="Waves whisper secrets to the shore’s embrace, \nA dance of blue under the sun's warm grace. \nEndless horizons where dreams take flight, \nThe ocean's heart glimmers, a canvas of light. \nTERMINATE", type='TextMessage'), TextMessage(source='user_proxy', models_usage=None, content='APPROVE', type='TextMessage')], stop_reason="Text 'APPROVE' mentioned")

Code Analysis

  1. Create the agents: an AssistantAgent (writes the poem) and a UserProxyAgent (collects user feedback).
  2. Set the termination condition: TextMentionTermination("APPROVE") ends the conversation when the user types "APPROVE".
  3. Create the team: RoundRobinGroupChat makes the agents take turns.
  4. Run the conversation: team.run_stream() starts the task and Console streams the output.
  5. User feedback: typing "APPROVE" terminates the conversation and prints the summary.

Multi-Turn Conversation and Feedback with RoundRobinGroupChat

The following example uses RoundRobinGroupChat for multi-turn interaction, letting the agent revise the poem based on user feedback:

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def main():
    model_client = OpenAIChatCompletionClient(model="gpt-4o-mini")
    assistant = AssistantAgent("assistant", model_client=model_client)
    # max_turns=1 returns control to the user after each assistant reply
    team = RoundRobinGroupChat([assistant], max_turns=1)
    task = "Write a 4-line poem about the ocean."
    while True:
        stream = team.run_stream(task=task)
        await Console(stream)
        # Feed the user's comment back in as the next task; "exit" ends the loop
        task = input("Enter your feedback (type 'exit' to leave): ")
        if task.lower().strip() == "exit":
            break
            
asyncio.run(main())

Output:

---------- user ----------
Write a 4-line poem about the ocean.
---------- assistant ----------
Endless waves in a dance with the shore,
Whispers of secrets in tales from the roar,
Beneath the vast sky, where horizons blend,
The ocean’s embrace is a timeless friend.
TERMINATE [Prompt tokens: 46, Completion tokens: 48]
---------- Summary ----------
Number of messages: 2
Finish reason: Maximum number of turns 1 reached.
Total prompt tokens: 46
Total completion tokens: 48
Duration: 1.63 seconds
Enter your feedback (type 'exit' to leave): Can you make it about a person and its relationship with the ocean
---------- user ----------
Can you make it about a person and its relationship with the ocean
---------- assistant ----------
She walks along the tide, where dreams intertwine,
With every crashing wave, her heart feels aligned,
In the ocean's embrace, her worries dissolve,
A symphony of solace, where her spirit evolves.
TERMINATE [Prompt tokens: 117, Completion tokens: 49]
---------- Summary ----------
Number of messages: 2
Finish reason: Maximum number of turns 1 reached.
Total prompt tokens: 117
Total completion tokens: 49
Duration: 1.21 seconds
Enter your feedback (type 'exit' to leave): exit

Code Analysis

  1. Create the agent and the team: an AssistantAgent plus a RoundRobinGroupChat with max_turns=1, so the agent speaks only once per run.
  2. Loop the runs: a while loop reruns the conversation and asks for feedback after each run.
  3. User feedback: the user types feedback such as "Can you make it about a person and its relationship with the ocean", or "exit" to leave the loop.
  4. Multi-turn behavior: each piece of feedback becomes the new task, and the agent produces a revised poem against it.

Multi-Agent Collaboration with SelectorGroupChat

The following example uses SelectorGroupChat to build a team with planning, search, and analysis roles that solves a compound question:

import asyncio
from typing import Sequence
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination, TextMentionTermination
from autogen_agentchat.messages import AgentEvent, ChatMessage
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

def search_web_tool(query: str) -> str:
    if "2006-2007" in query:
        return """Here are the total points scored by Miami Heat players in the 2006-2007 season:
        Udonis Haslem: 844 points
        Dwayne Wade: 1397 points
        James Posey: 550 points
      ...
        """
    elif "2007-2008" in query:
        return "The number of total rebounds for Dwayne Wade in the Miami Heat season 2007-2008 is 214."
    elif "2008-2009" in query:
        return "The number of total rebounds for Dwayne Wade in the Miami Heat season 2008-2009 is 398."
    return "No data found."
    
def percentage_change_tool(start: float, end: float) -> float:
    return ((end - start) / start) * 100
    
def create_team() -> SelectorGroupChat:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    planning_agent = AssistantAgent(
        "PlanningAgent",
        description="An agent for planning tasks, this agent should be the first to engage when given a new task.",
        model_client=model_client,
        system_message="""
            You are a planning agent.
            Your job is to break down complex tasks into smaller, manageable subtasks.
            Your team members are:
            Web search agent: Searches for information
            Data analyst: Performs calculations
            You only plan and delegate tasks - you do not execute them yourself.
            When assigning tasks, use this format:
            1. <agent> : <task>
            After all tasks are complete, summarize the findings and end with "TERMINATE".
        """,
    )
    web_search_agent = AssistantAgent(
        "WebSearchAgent",
        description="A web search agent.",
        tools=[search_web_tool],
        model_client=model_client,
        system_message="""
            You are a web search agent.
            Your only tool is search_tool - use it to find information.
            You make only one search call at a time.
            Once you have the results, you never do calculations based on them.
        """,
    )
    data_analyst_agent = AssistantAgent(
        "DataAnalystAgent",
        description="A data analyst agent. Useful for performing calculations.",
        model_client=model_client,
        tools=[percentage_change_tool],
        system_message="""
            You are a data analyst.
            Given the tasks you have been assigned, you should analyze the data and provide results using the tools provided.
        """,
    )
    text_mention_termination = TextMentionTermination("TERMINATE")
    max_messages_termination = MaxMessageTermination(max_messages=25)
    termination = text_mention_termination | max_messages_termination
    
    def selector_func(messages: Sequence[AgentEvent | ChatMessage]) -> str | None:
        # Hand control back to the planner after any other agent speaks; returning
        # None falls back to the model-based speaker selection.
        if messages[-1].source != planning_agent.name:
            return planning_agent.name
        return None
        
    team = SelectorGroupChat(
        [planning_agent, web_search_agent, data_analyst_agent],
        model_client=OpenAIChatCompletionClient(model="gpt-4o-mini"),
        termination_condition=termination,
        selector_func=selector_func,
    )
    return team
    
async def main():
    team = create_team()
    task = "Who was the Miami Heat player with the highest points in the 2006-2007 season, and what was the percentage change in his total rebounds between the 2007-2008 and 2008-2009 seasons?"
    await Console(team.run_stream(task=task))
    
asyncio.run(main())

Output (truncated):

---------- user ----------
Who was the Miami Heat player with the highest points in the 2006-2007 season, and what was the percentage change in his total rebounds between the 2007-2008 and 2008-2009 seasons?
---------- PlanningAgent ----------
To address this request, we will divide the task into manageable subtasks.
1. Web search agent: Identify the Miami Heat player with the highest points in the 2006-2007 season.
2. Web search agent: Gather the total rebounds for the identified player during the 2007-2008 season.
3. Web search agent: Gather the total rebounds for the identified player during the 2008-2009 season.
4. Data analyst: Calculate the percentage change in total rebounds for the identified player between the 2007-2008 and 2008-2009 seasons.
[Prompt tokens: 159, Completion tokens: 122]
---------- WebSearchAgent ----------
[FunctionCall(id='call_xdYlGP2lsqDeWdSiOlwOBNiO', arguments='{"query":"Miami Heat highest points player 2006-2007 season"}', name='search_web_tool')]
[Prompt tokens: 271, Completion tokens: 26]
---------- WebSearchAgent ----------
[FunctionExecutionResult(content='Here are

Code Analysis

1. Create the agents: a PlanningAgent (task planning), a WebSearchAgent (information retrieval), and a DataAnalystAgent (calculations).

2. Define the tool functions: search_web_tool simulates a web search, and percentage_change_tool computes a percentage change.

3. Set each agent's system message: system_message defines each agent's role and behavior, including task decomposition and tool usage.

4. Define the selector function: selector_func controls the speaking order so that the PlanningAgent can re-plan after every other agent's turn (a keyword-based variation is sketched after this list).

5. Create the team: SelectorGroupChat wires the agents together with the termination condition and the selector function.
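
As a point of comparison, here is a small hypothetical variation of selector_func that routes by simple keyword rules instead of always returning to the planner. The routing rules are illustrative assumptions, not part of the original example:

from typing import Sequence
from autogen_agentchat.messages import AgentEvent, ChatMessage

def keyword_selector(messages: Sequence[AgentEvent | ChatMessage]) -> str | None:
    last = messages[-1]
    if last.source == "user":
        return "PlanningAgent"  # A fresh task always starts with planning.
    if isinstance(getattr(last, "content", None), str) and "search" in last.content.lower():
        return "WebSearchAgent"  # Explicit search requests go straight to the searcher.
    return None  # Otherwise fall back to the model-based speaker selection.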

Principles and Source Code Walkthrough

(1) Principles

In AutoGen, the preset agents operate on the principle of multi-agent collaboration and interaction. Each agent has its own role and responsibilities, and complex tasks are completed through message passing and cooperation among them. For example, UserProxyAgent acts as the bridge between the user and the system, receiving user input and forwarding it to other agents for processing, while AssistantAgent uses the LLM for text generation and reasoning, producing responses based on the messages it receives and the rules it has been given.

The advantage of this collaboration model is that a complex task can be decomposed into subtasks, with each agent focusing on the part it is good at, which improves both efficiency and accuracy. At the same time, different termination conditions and conversation patterns, such as TextMentionTermination and RoundRobinGroupChat, give you flexible control over the interaction flow to fit different application scenarios (see the short composition example below).
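
For instance, the termination conditions shown in the SelectorGroupChat example compose with the | operator, so a team stops on whichever condition fires first. A minimal sketch using the same classes as above:

from autogen_agentchat.conditions import MaxMessageTermination, TextMentionTermination

# Stop when the model says "TERMINATE" or after 10 messages, whichever comes first.
termination = TextMentionTermination("TERMINATE") | MaxMessageTermination(max_messages=10)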

(2) Source Code

Take AssistantAgent as an example: in its implementation, the core on_messages method handles incoming messages. Below is a simplified version of the key code:

class AssistantAgent(BaseChatAgent):
    def __init__(self, name, model_client, tools=None, system_message=""):
        self.name = name
        self.model_client = model_client
        self.tools = tools
        self.system_message = system_message
    async def on_messages(self, messages, cancellation_token=None):
        # Build the prompt from the incoming messages and the system_message
        prompt = self._build_prompt(messages)
        # Call the LLM to get a response
        response = await self.model_client.generate(prompt)
        if self.tools:
            # Decide whether a tool call is needed
            tool_call = self._determine_tool_call(response)
            if tool_call:
                tool_result = self._execute_tool(tool_call)
                # Fold the tool result into the final response
                response = self._incorporate_tool_result(response, tool_result)
        return response

In the code above, _build_prompt builds the prompt for the LLM from the incoming messages and the configured system_message; model_client.generate calls the LLM and returns its response; _determine_tool_call decides whether a tool call is needed; _execute_tool runs the corresponding tool function; and _incorporate_tool_result folds the tool result into the final response.

Now look at UserProxyAgent, whose core job is to take user input and turn it into a message for other agents. Its key source code is:

class UserProxyAgent(BaseChatAgent):
    def __init__(self, name, input_func=None):
        self.name = name
        self.input_func = input_func or input
    async def on_messages(self, messages, cancellation_token=None):
        prompt = "Enter your response: "
        user_input = await self._get_input(prompt, cancellation_token)
        return Response(chat_message=TextMessage(content=user_input, source=self.name))
    async def _get_input(self, prompt, cancellation_token):
        try:
            if asyncio.iscoroutinefunction(self.input_func):
                return await self.input_func(prompt, cancellation_token)
            else:
                loop = asyncio.get_event_loop()
                return await loop.run_in_executor(None, self.input_func, prompt)
        except asyncio.CancelledError:
            raise
        except Exception as e:
            raise RuntimeError(f"Failed to get user input: {str(e)}") from e

Here, __init__ initializes the UserProxyAgent with its name and input function; on_messages prompts the user, collects the input, and wraps it in a TextMessage that is returned as the Response; _get_input handles both synchronous and asynchronous input functions correctly (an example of passing an asynchronous input function is sketched below).
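
Because _get_input awaits coroutine input functions, you can plug in a non-blocking reader. The helper below is a hypothetical sketch (async_console_input is not part of AutoGen) that assumes the async signature (prompt, cancellation_token) suggested by the snippet above:

import asyncio
from autogen_agentchat.agents import UserProxyAgent

# Hypothetical async input function: run the blocking built-in input() in a worker
# thread so the event loop stays free while waiting for the user.
async def async_console_input(prompt: str, cancellation_token=None) -> str:
    return await asyncio.to_thread(input, prompt)

user_proxy = UserProxyAgent("user_proxy", input_func=async_console_input)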

For team patterns such as RoundRobinGroupChat and SelectorGroupChat, the implementation covers the ordering of message passing between agents, task assignment, and the evaluation of termination conditions. Taking RoundRobinGroupChat as an example, the key (simplified) code is:

class RoundRobinGroupChat:
    def __init__(self, agents, termination_condition):
        self.agents = agents
        self.termination_condition = termination_condition
        self.current_agent_index = 0
    async def run_stream(self, task):
        messages = [TextMessage(content=task, source="user")]
        while not self.termination_condition(messages):
            current_agent = self.agents[self.current_agent_index]
            response = await current_agent.on_messages(messages)
            messages.append(response.chat_message)
            self.current_agent_index = (self.current_agent_index + 1) % len(self.agents)
        return messages

Here, __init__ sets up the RoundRobinGroupChat with the list of participating agents and the termination condition; run_stream takes a task and loops over the agents, letting each one handle the messages in turn until the termination condition is met. On every iteration, it picks the current agent, calls its on_messages method, appends the response to the message list, and advances the agent index.
