AI Agent Harness Engineering 与大数据分析:海量数据处理与深度洞察提取
在当今数据驱动的时代,企业面临着前所未有的数据增长挑战。IDC 预测,到 2025 年,全球数据总量将达到 175 ZB(泽字节)。然而,数据量的增长并不直接转化为价值——大多数组织都在苦苦挣扎,试图从海量数据中提取有意义的洞察。传统的大数据分析方法虽然强大,但往往需要大量的人工干预、专业的数据科学知识,并且难以适应快速变化的业务需求。这正是AI Agent Harness Engineering
AI Agent Harness Engineering 与大数据分析:海量数据处理与深度洞察提取
第一部分:引言与基础
1. 引人注目的标题
主标题: AI Agent Harness Engineering 与大数据分析:海量数据处理与深度洞察提取
副标题: 构建智能代理系统,从 PB 级数据中自动发现商业价值的完整指南
2. 摘要/引言
在当今数据驱动的时代,企业面临着前所未有的数据增长挑战。IDC 预测,到 2025 年,全球数据总量将达到 175 ZB(泽字节)。然而,数据量的增长并不直接转化为价值——大多数组织都在苦苦挣扎,试图从海量数据中提取有意义的洞察。
传统的大数据分析方法虽然强大,但往往需要大量的人工干预、专业的数据科学知识,并且难以适应快速变化的业务需求。这正是 AI Agent Harness Engineering(人工智能代理工程) 能够发挥变革性作用的地方。
本文将深入探讨如何设计、构建和部署智能 AI 代理系统,使其能够自主地处理海量数据、识别模式、生成洞察,并以自然语言或可视化方式呈现给决策者。我们将结合实际案例,展示这一技术如何彻底改变数据分析的游戏规则。
读完本文后,你将:
- 理解 AI Agent Harness Engineering 的核心概念与架构
- 掌握构建数据处理 AI 代理的关键技术与方法
- 了解如何将 AI 代理与现有的大数据基础设施集成
- 获得可直接应用于实际项目的代码示例与最佳实践
文章将从基础概念开始,逐步深入到实际实现,最后探讨未来发展趋势,为你提供一份全面的技术指南。
3. 目标读者与前置知识
目标读者:
- 数据科学家与数据工程师
- AI/ML 工程师与研究人员
- 大数据架构师与解决方案专家
- 希望利用 AI 提升数据分析能力的技术领导者
前置知识:
- 基础的 Python 编程能力
- 对机器学习与深度学习概念有基本了解
- 熟悉大数据处理框架(如 Spark、Hadoop)的基本概念
- 了解基本的 API 设计与微服务架构
4. 文章目录
第二部分:核心内容
5. 问题背景与动机
5.1 大数据分析的现状与挑战
在过去十年中,大数据技术取得了显著进展。Hadoop、Spark、Flink 等框架的出现,使得我们能够存储和处理 TB 甚至 PB 级别的数据。然而,随着数据量的持续增长和业务需求的日益复杂,传统方法面临着诸多挑战:
- 人力瓶颈:数据分析流程的许多环节仍然依赖人工操作,从数据清洗、特征工程到模型选择和结果解释,都需要专业的数据科学家参与。
- 时效性问题:传统的批处理方法难以满足实时决策的需求,而构建实时分析系统又需要大量的工程投入。
- 复杂性管理:随着数据来源的多样化和分析任务的复杂化,数据管道变得越来越难以管理和维护。
- 洞察发现的被动性:大多数分析系统都是"查询驱动"的,只有当用户提出具体问题时才会返回结果,而不是主动发现潜在的有价值模式。
5.2 AI Agent 技术的兴起
与此同时,人工智能技术,特别是大语言模型(LLMs)的突破,为解决这些问题提供了新的思路。AI 代理(AI Agents)作为一种能够感知环境、做出决策并执行行动的智能系统,正逐渐从研究实验室走向实际应用。
AI 代理具有以下关键特性,使其特别适合解决大数据分析的挑战:
- 自主性:能够在没有持续人工干预的情况下完成任务
- 反应性:能够感知环境变化并及时做出响应
- 主动性:不仅能响应环境,还能主动采取行动实现目标
- 社交能力:能够与其他代理或人类进行交互和协作
5.3 为什么需要 AI Agent Harness Engineering?
虽然 AI 代理技术前景广阔,但将其有效地应用于大数据分析场景并非易事。我们需要一套系统化的方法论来指导 AI 代理的设计、开发、部署和管理——这正是 AI Agent Harness Engineering 所要解决的问题。
这个新兴领域关注的是:
- 如何设计适合数据处理任务的代理架构
- 如何确保代理的行为可预测、可解释
- 如何高效地管理代理的资源使用
- 如何评估代理的性能与业务价值
- 如何将代理与现有的数据基础设施集成
6. 核心概念与理论基础
6.1 什么是 AI Agent?
核心概念:
AI Agent(人工智能代理)是一种能够感知环境、通过推理做出决策、并执行行动以实现特定目标的计算系统。
在大数据分析的语境下,AI Agent 可以被定义为:
一个自主系统,能够访问数据源,理解分析需求,选择合适的分析方法,执行数据处理任务,解释结果,并以人类可理解的方式呈现洞察。
6.2 AI Agent 的核心组成要素
一个典型的 AI Agent 系统由以下几个核心要素组成:
- 感知模块(Perception Module):负责从环境中收集信息,包括数据接入、状态监控等。
- 推理引擎(Reasoning Engine):核心决策单元,基于感知到的信息和内部知识做出决策。
- 行动执行器(Action Executor):负责执行推理引擎做出的决策,如运行查询、调用 API、生成报告等。
- 记忆系统(Memory System):存储代理的历史经验、学习到的知识和当前状态。
- 目标管理(Goal Management):定义和维护代理的目标,指导代理的行为。
6.3 概念之间的关系
为了更好地理解这些核心概念及其相互关系,我们可以通过以下图表来展示:
6.3.1 概念核心属性维度对比
| 概念 | 主要功能 | 自主性要求 | 实时性要求 | 复杂度 | 依赖关系 |
|---|---|---|---|---|---|
| 感知模块 | 数据收集与预处理 | 低 | 高 | 中 | 环境/数据源 |
| 推理引擎 | 决策与规划 | 高 | 中 | 高 | 感知模块/记忆系统 |
| 行动执行器 | 任务执行与反馈 | 中 | 高 | 中 | 推理引擎 |
| 记忆系统 | 知识存储与检索 | 低 | 中 | 高 | 所有其他模块 |
| 目标管理 | 目标定义与优先级 | 高 | 低 | 中 | 业务需求/用户输入 |
6.3.2 AI Agent 核心组件 ER 实体关系图
6.3.3 AI Agent 交互关系图
6.4 大数据分析 AI Agent 的类型
根据功能和应用场景,我们可以将大数据分析 AI Agent 分为以下几类:
- 数据探索代理(Data Exploration Agents):帮助用户自动发现数据集中的模式、异常和关系。
- 查询生成代理(Query Generation Agents):将自然语言问题转换为 SQL、Spark 或其他查询语言。
- 流水线优化代理(Pipeline Optimization Agents):自动优化数据处理流水线,提高效率和降低成本。
- 洞察解释代理(Insight Interpretation Agents):将复杂的分析结果转换为业务人员可理解的自然语言解释。
- 预测性分析代理(Predictive Analytics Agents):自动构建、训练和部署预测模型,提供前瞻性洞察。
6.5 AI Agent Harness Engineering 的理论基础
AI Agent Harness Engineering 建立在多个学科的理论基础之上:
- 控制论(Cybernetics):研究系统如何通过反馈控制来实现目标。
- 决策理论(Decision Theory):提供在不确定条件下做出最优决策的数学框架。
- 机器学习(Machine Learning):使代理能够从数据中学习并改进其性能。
- 多智能体系统(Multi-Agent Systems):研究多个代理如何协作完成复杂任务。
- 软件工程(Software Engineering):提供构建可靠、可维护系统的方法论。
6.6 数学模型:马尔可夫决策过程(MDP)
在设计 AI Agent 时,我们经常使用马尔可夫决策过程(Markov Decision Process, MDP)来形式化决策问题。MDP 可以用一个五元组 <S,A,P,R,γ><S, A, P, R, \gamma><S,A,P,R,γ> 来表示:
- SSS:状态空间,代表环境所有可能状态的集合
- AAA:动作空间,代表代理可以执行的所有动作的集合
- P(s′∣s,a)P(s'|s, a)P(s′∣s,a):状态转移概率,表示在状态 sss 执行动作 aaa 后转移到状态 s′s's′ 的概率
- R(s,a,s′)R(s, a, s')R(s,a,s′):奖励函数,表示在状态 sss 执行动作 aaa 转移到状态 s′s's′ 时获得的即时奖励
- γ∈[0,1]\gamma \in [0, 1]γ∈[0,1]:折扣因子,用于权衡当前奖励与未来奖励的重要性
代理的目标是找到一个策略 π:S→A\pi: S \rightarrow Aπ:S→A,使得长期累积奖励的期望最大化:
E[∑t=0∞γtR(st,at,st+1)] E\left[\sum_{t=0}^{\infty} \gamma^t R(s_t, a_t, s_{t+1})\right] E[t=0∑∞γtR(st,at,st+1)]
在大数据分析场景中,状态 sss 可能代表当前数据处理进度、可用资源、已发现的模式等;动作 aaa 可能代表选择特定的分析算法、调整参数、分配计算资源等;奖励 RRR 可能基于分析结果的质量、资源使用效率、时间成本等因素来定义。
7. 环境准备
在开始构建我们的 AI Agent 大数据分析系统之前,我们需要准备一个合适的开发环境。本节将详细介绍所需的软件、库和框架。
7.1 系统要求
- 操作系统:Linux(推荐 Ubuntu 20.04+)、macOS 10.15+ 或 Windows 10/11(使用 WSL2)
- 内存:至少 16GB RAM(推荐 32GB+)
- 存储:至少 50GB 可用空间
- Python:3.9 或更高版本
7.2 核心软件与库
我们将使用以下开源库和框架来构建我们的系统:
| 类别 | 工具/库 | 版本 | 用途 |
|---|---|---|---|
| 编程语言 | Python | 3.9+ | 主要开发语言 |
| AI 框架 | LangChain | 0.1.0+ | 构建 AI 应用的框架 |
| AI 框架 | LlamaIndex | 0.10.0+ | 数据框架,连接 LLM 与私有数据 |
| 大语言模型 | OpenAI GPT-4 / Claude 3 | - | 提供智能推理能力 |
| 向量数据库 | ChromaDB / Pinecone | 0.4.0+ | 存储和检索向量嵌入 |
| 大数据处理 | PySpark | 3.4+ | 分布式数据处理 |
| 工作流编排 | Apache Airflow | 2.6+ | 调度和监控工作流 |
| API 框架 | FastAPI | 0.100+ | 构建高性能 API |
| 容器化 | Docker | 24.0+ | 应用容器化 |
| 可视化 | Matplotlib / Plotly | 3.7+ / 5.15+ | 数据可视化 |
7.3 安装步骤
7.3.1 基础环境设置
首先,让我们创建一个虚拟环境并安装必要的依赖:
# 创建项目目录
mkdir ai-agent-bigdata && cd ai-agent-bigdata
# 创建虚拟环境
python3 -m venv venv
# 激活虚拟环境
source venv/bin/activate # Linux/macOS
# 或
.\venv\Scripts\activate # Windows
# 升级 pip
pip install --upgrade pip
7.3.2 安装核心库
创建一个 requirements.txt 文件,包含以下内容:
# AI/LLM 相关
langchain==0.1.10
langchain-openai==0.0.5
llama-index==0.10.10
chromadb==0.4.24
openai==1.13.3
tiktoken==0.6.0
# 大数据处理
pyspark==3.5.1
pandas==2.2.1
numpy==1.26.4
# API 和 Web
fastapi==0.110.0
uvicorn==0.27.1
pydantic==2.6.3
# 工作流和调度
apache-airflow==2.8.3
# 可视化
matplotlib==3.8.3
plotly==5.19.0
seaborn==0.13.2
# 工具和辅助
python-dotenv==1.0.1
python-json-logger==2.0.7
tenacity==8.2.3
然后安装这些依赖:
pip install -r requirements.txt
7.3.3 环境变量配置
创建一个 .env 文件来存储敏感信息和配置:
# OpenAI API 配置
OPENAI_API_KEY=your_openai_api_key_here
OPENAI_MODEL_NAME=gpt-4-turbo-preview
# 向量数据库配置
CHROMA_DB_PATH=./data/chroma_db
# Spark 配置
SPARK_HOME=/path/to/spark/home
SPARK_DRIVER_MEMORY=8g
SPARK_EXECUTOR_MEMORY=4g
# 应用配置
APP_NAME=AIAgentBigDataAnalytics
APP_HOST=0.0.0.0
APP_PORT=8000
LOG_LEVEL=INFO
请确保将 your_openai_api_key_here 替换为你的实际 OpenAI API 密钥,并根据需要调整其他配置。
7.3.4 Docker 环境(可选)
如果你希望使用 Docker 来容器化你的应用,可以创建一个 Dockerfile:
FROM python:3.9-slim
# 设置工作目录
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \
gcc \
g++ \
openjdk-11-jdk \
&& rm -rf /var/lib/apt/lists/*
# 设置 JAVA_HOME
ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
# 复制 requirements.txt
COPY requirements.txt .
# 安装 Python 依赖
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 创建必要的目录
RUN mkdir -p ./data/chroma_db ./logs
# 暴露端口
EXPOSE 8000
# 启动应用
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
同时创建一个 docker-compose.yml 文件来编排服务:
version: '3.8'
services:
ai-agent-bigdata:
build: .
ports:
- "8000:8000"
volumes:
- ./data:/app/data
- ./logs:/app/logs
env_file:
- .env
depends_on:
- chromadb
networks:
- ai-agent-network
chromadb:
image: chromadb/chroma:latest
ports:
- "8001:8000"
volumes:
- chroma-data:/chroma/chroma
networks:
- ai-agent-network
spark-master:
image: bitnami/spark:3.5
environment:
- SPARK_MODE=master
- SPARK_RPC_AUTHENTICATION_ENABLED=no
- SPARK_RPC_ENCRYPTION_ENABLED=no
- SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
- SPARK_SSL_ENABLED=no
ports:
- "8080:8080"
- "7077:7077"
networks:
- ai-agent-network
spark-worker:
image: bitnami/spark:3.5
environment:
- SPARK_MODE=worker
- SPARK_MASTER_URL=spark://spark-master:7077
- SPARK_WORKER_MEMORY=4G
- SPARK_EXECUTOR_MEMORY=4G
- SPARK_WORKER_CORES=2
- SPARK_RPC_AUTHENTICATION_ENABLED=no
- SPARK_RPC_ENCRYPTION_ENABLED=no
- SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
- SPARK_SSL_ENABLED=no
depends_on:
- spark-master
networks:
- ai-agent-network
volumes:
chroma-data:
networks:
ai-agent-network:
driver: bridge
8. 分步实现
现在我们已经准备好开发环境,让我们开始构建我们的 AI Agent 大数据分析系统。我们将按照以下步骤进行:
- 系统架构设计
- 数据感知模块实现
- 推理引擎实现
- 行动执行器实现
- 记忆系统实现
- 目标管理模块实现
- API 接口实现
- 前端界面实现(可选)
8.1 系统架构设计
首先,让我们设计整体系统架构。我们的系统将采用模块化设计,各组件之间通过定义良好的接口进行通信。
┌─────────────────────────────────────────────────────────────────┐
│ 用户界面层 (UI) │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ 自然语言查询 │ │ 可视化仪表盘 │ │ 洞察报告生成 │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ API 网关层 (API Gateway) │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ 认证授权 │ 请求路由 │ 速率限制 │ 日志记录 │ 监控 │ │
│ └───────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ AI Agent 协调层 (Orchestration) │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ 任务分解器 │ │ Agent 池管理 │ │ 结果聚合器 │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ └───────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
┌───────────────────┼───────────────────┐
▼ ▼ ▼
┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ 数据探索 Agent │ │ 查询生成 Agent │ │ 洞察解释 Agent │
├──────────────────┤ ├──────────────────┤ ├──────────────────┤
│ • 感知模块 │ │ • 感知模块 │ │ • 感知模块 │
│ • 推理引擎 │ │ • 推理引擎 │ │ • 推理引擎 │
│ • 行动执行器 │ │ • 行动执行器 │ │ • 行动执行器 │
│ • 记忆系统 │ │ • 记忆系统 │ │ • 记忆系统 │
│ • 目标管理 │ │ • 目标管理 │ │ • 目标管理 │
└──────────────────┘ └──────────────────┘ └──────────────────┘
│ │ │
└───────────────────┼───────────────────┘
▼
┌─────────────────────────────────────────────────────────────────┐
│ 数据与服务层 (Data & Services) │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ 数据湖/仓库 │ │ Spark 集群 │ │ 向量数据库 │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ LLM API │ │ 外部数据源 │ │ 结果存储 │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
现在让我们创建项目目录结构:
ai-agent-bigdata/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI 主应用
│ ├── api/ # API 路由
│ │ ├── __init__.py
│ │ ├── queries.py
│ │ ├── insights.py
│ │ └── agents.py
│ ├── agents/ # AI Agent 实现
│ │ ├── __init__.py
│ │ ├── base.py # 基础 Agent 类
│ │ ├── data_explorer.py # 数据探索 Agent
│ │ ├── query_generator.py # 查询生成 Agent
│ │ └── insight_interpreter.py # 洞察解释 Agent
│ ├── core/ # 核心组件
│ │ ├── __init__.py
│ │ ├── perception.py # 感知模块
│ │ ├── reasoning.py # 推理引擎
│ │ ├── action.py # 行动执行器
│ │ ├── memory.py # 记忆系统
│ │ └── goals.py # 目标管理
│ ├── services/ # 外部服务
│ │ ├── __init__.py
│ │ ├── spark_service.py
│ │ ├── vector_db_service.py
│ │ └── llm_service.py
│ ├── models/ # 数据模型
│ │ ├── __init__.py
│ │ ├── schemas.py
│ │ └── database.py
│ └── utils/ # 工具函数
│ ├── __init__.py
│ ├── logging.py
│ └── helpers.py
├── data/ # 数据存储
│ ├── chroma_db/ # ChromaDB 数据
│ ├── sample_data/ # 示例数据
│ └── results/ # 分析结果
├── notebooks/ # Jupyter notebooks
├── tests/ # 测试代码
├── logs/ # 日志文件
├── .env # 环境变量
├── requirements.txt # Python 依赖
├── Dockerfile # Docker 配置
├── docker-compose.yml # Docker Compose 配置
└── README.md # 项目说明
8.2 数据感知模块实现
感知模块负责从各种数据源收集信息,并将其转换为代理可以理解的格式。让我们实现这个模块。
首先,在 app/core/perception.py 中:
"""
感知模块 - 负责从环境中收集信息
"""
import os
import logging
from typing import List, Dict, Any, Optional, Union
from abc import ABC, abstractmethod
from datetime import datetime
import pandas as pd
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType
from app.utils.logging import get_logger
logger = get_logger(__name__)
class DataSource(ABC):
"""数据源抽象基类"""
@abstractmethod
def connect(self) -> bool:
"""连接到数据源"""
pass
@abstractmethod
def disconnect(self) -> None:
"""断开与数据源的连接"""
pass
@abstractmethod
def list_tables(self) -> List[str]:
"""列出数据源中的所有表/数据集"""
pass
@abstractmethod
def get_schema(self, table_name: str) -> StructType:
"""获取指定表的 schema"""
pass
@abstractmethod
def load_data(self, table_name: str,
filters: Optional[Dict[str, Any]] = None,
limit: Optional[int] = None) -> Union[pd.DataFrame, DataFrame]:
"""加载数据"""
pass
@abstractmethod
def execute_query(self, query: str) -> Union[pd.DataFrame, DataFrame]:
"""执行查询"""
pass
class SparkDataSource(DataSource):
"""基于 Spark 的数据源实现"""
def __init__(self, spark_session: Optional[SparkSession] = None,
warehouse_path: Optional[str] = None):
self.spark = spark_session
self.warehouse_path = warehouse_path or os.getenv("SPARK_WAREHOUSE_PATH", "./data/spark-warehouse")
self.connected = False
def connect(self) -> bool:
"""连接到 Spark"""
try:
if not self.spark:
logger.info("创建新的 Spark 会话")
self.spark = SparkSession.builder \
.appName("AIAgentBigData") \
.config("spark.sql.warehouse.dir", self.warehouse_path) \
.config("spark.driver.memory", os.getenv("SPARK_DRIVER_MEMORY", "8g")) \
.config("spark.executor.memory", os.getenv("SPARK_EXECUTOR_MEMORY", "4g")) \
.enableHiveSupport() \
.getOrCreate()
self.connected = True
logger.info("成功连接到 Spark")
return True
except Exception as e:
logger.error(f"连接 Spark 失败: {str(e)}")
return False
def disconnect(self) -> None:
"""断开与 Spark 的连接"""
if self.spark and self.connected:
self.spark.stop()
self.connected = False
logger.info("已断开与 Spark 的连接")
def list_tables(self) -> List[str]:
"""列出所有表"""
if not self.connected:
raise RuntimeError("数据源未连接")
try:
tables = self.spark.catalog.listTables()
return [table.name for table in tables]
except Exception as e:
logger.error(f"获取表列表失败: {str(e)}")
return []
def get_schema(self, table_name: str) -> StructType:
"""获取表的 schema"""
if not self.connected:
raise RuntimeError("数据源未连接")
try:
df = self.spark.table(table_name)
return df.schema
except Exception as e:
logger.error(f"获取表 {table_name} 的 schema 失败: {str(e)}")
raise
def load_data(self, table_name: str,
filters: Optional[Dict[str, Any]] = None,
limit: Optional[int] = None) -> DataFrame:
"""加载数据"""
if not self.connected:
raise RuntimeError("数据源未连接")
try:
df = self.spark.table(table_name)
# 应用过滤条件
if filters:
for col, val in filters.items():
if isinstance(val, list):
df = df.filter(df[col].isin(val))
else:
df = df.filter(df[col] == val)
# 限制行数
if limit:
df = df.limit(limit)
return df
except Exception as e:
logger.error(f"加载表 {table_name} 的数据失败: {str(e)}")
raise
def execute_query(self, query: str) -> DataFrame:
"""执行 Spark SQL 查询"""
if not self.connected:
raise RuntimeError("数据源未连接")
try:
logger.info(f"执行 Spark SQL 查询: {query}")
return self.spark.sql(query)
except Exception as e:
logger.error(f"执行查询失败: {str(e)}")
raise
class DataProcessor:
"""数据处理器 - 负责数据预处理和转换"""
def __init__(self):
pass
def profile_data(self, df: Union[pd.DataFrame, DataFrame]) -> Dict[str, Any]:
"""生成数据概览"""
profile = {
"timestamp": datetime.now().isoformat(),
"row_count": None,
"column_count": None,
"columns": [],
"sample_data": None
}
try:
# 处理 Spark DataFrame
if hasattr(df, "count") and callable(getattr(df, "count")):
profile["row_count"] = df.count()
profile["column_count"] = len(df.columns)
# 获取列信息
for field in df.schema.fields:
col_info = {
"name": field.name,
"type": str(field.dataType),
"nullable": field.nullable
}
profile["columns"].append(col_info)
# 获取样本数据
profile["sample_data"] = df.limit(5).toPandas().to_dict(orient="records")
# 处理 Pandas DataFrame
else:
profile["row_count"] = len(df)
profile["column_count"] = len(df.columns)
# 获取列信息
for col in df.columns:
col_info = {
"name": col,
"type": str(df[col].dtype),
"nullable": df[col].isnull().any()
}
profile["columns"].append(col_info)
# 获取样本数据
profile["sample_data"] = df.head().to_dict(orient="records")
logger.info(f"数据概览生成完成: {profile['row_count']} 行, {profile['column_count']} 列")
return profile
except Exception as e:
logger.error(f"生成数据概览失败: {str(e)}")
raise
def detect_anomalies(self, df: Union[pd.DataFrame, DataFrame],
column: str, method: str = "iqr") -> Dict[str, Any]:
"""检测异常值"""
# 简化的异常检测实现
pass
def compute_statistics(self, df: Union[pd.DataFrame, DataFrame],
columns: Optional[List[str]] = None) -> Dict[str, Any]:
"""计算统计指标"""
# 简化的统计计算实现
pass
class PerceptionModule:
"""感知模块 - 整合数据源和数据处理功能"""
def __init__(self, data_sources: Optional[List[DataSource]] = None):
self.data_sources = data_sources or []
self.data_processor = DataProcessor()
self.active_source = None
def add_data_source(self, source: DataSource) -> None:
"""添加数据源"""
self.data_sources.append(source)
logger.info(f"添加数据源: {type(source).__name__}")
def connect_to_source(self, source_index: int = 0) -> bool:
"""连接到指定数据源"""
if source_index >= len(self.data_sources):
logger.error(f"数据源索引 {source_index} 超出范围")
return False
source = self.data_sources[source_index]
if source.connect():
self.active_source = source
return True
return False
def perceive(self, perception_request: Dict[str, Any]) -> Dict[str, Any]:
"""
感知环境 - 主感知方法
Args:
perception_request: 感知请求,包含感知类型和参数
Returns:
感知结果
"""
if not self.active_source:
raise RuntimeError("没有活跃的数据源,请先连接到数据源")
perception_type = perception_request.get("type", "data_profile")
params = perception_request.get("params", {})
logger.info(f"执行感知操作: {perception_type}")
try:
if perception_type == "list_tables":
# 列出所有表
tables = self.active_source.list_tables()
return {
"type": "table_list",
"tables": tables,
"count": len(tables)
}
elif perception_type == "get_schema":
# 获取表结构
table_name = params.get("table_name")
if not table_name:
raise ValueError("需要指定 table_name")
schema = self.active_source.get_schema(table_name)
return {
"type": "schema",
"table_name": table_name,
"schema": {field.name: str(field.dataType) for field in schema.fields}
}
elif perception_type == "data_profile":
# 数据概览
table_name = params.get("table_name")
query = params.get("query")
if query:
df = self.active_source.execute_query(query)
elif table_name:
df = self.active_source.load_data(table_name, limit=params.get("limit"))
else:
raise ValueError("需要指定 table_name 或 query")
profile = self.data_processor.profile_data(df)
return {
"type": "data_profile",
"profile": profile
}
elif perception_type == "execute_query":
# 执行查询
query = params.get("query")
if not query:
raise ValueError("需要指定 query")
df = self.active_source.execute_query(query)
result_profile = self.data_processor.profile_data(df)
return {
"type": "query_result",
"query": query,
"result_profile": result_profile
}
else:
raise ValueError(f"未知的感知类型: {perception_type}")
except Exception as e:
logger.error(f"感知操作失败: {str(e)}")
return {
"type": "error",
"error": str(e)
}
8.3 记忆系统实现
记忆系统负责存储代理的历史经验、学习到的知识和当前状态。这对于代理的长期学习和上下文理解至关重要。
在 app/core/memory.py 中:
"""
记忆系统 - 负责存储和检索代理的经验与知识
"""
import os
import json
import logging
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime, timedelta
from enum import Enum
from abc import ABC, abstractmethod
import chromadb
from chromadb.utils import embedding_functions
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from app.utils.logging import get_logger
logger = get_logger(__name__)
class MemoryType(Enum):
"""记忆类型枚举"""
EPISODIC = "episodic" # 情景记忆 - 存储事件和经历
SEMANTIC = "semantic" # 语义记忆 - 存储事实和知识
PROCEDURAL = "procedural" # 程序记忆 - 存储技能和流程
WORKING = "working" # 工作记忆 - 临时存储当前任务相关信息
class MemoryItem:
"""记忆项 - 表示一条记忆"""
def __init__(self, content: Any, memory_type: MemoryType,
metadata: Optional[Dict[str, Any]] = None,
timestamp: Optional[datetime] = None):
self.content = content
self.memory_type = memory_type
self.metadata = metadata or {}
self.timestamp = timestamp or datetime.now()
self.access_count = 0
self.last_accessed = None
self.importance = metadata.get("importance", 0.5) # 0-1 范围
def access(self) -> None:
"""记录记忆访问"""
self.access_count += 1
self.last_accessed = datetime.now()
def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
return {
"content": self.content,
"memory_type": self.memory_type.value,
"metadata": self.metadata,
"timestamp": self.timestamp.isoformat(),
"access_count": self.access_count,
"last_accessed": self.last_accessed.isoformat() if self.last_accessed else None,
"importance": self.importance
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "MemoryItem":
"""从字典创建"""
item = cls(
content=data["content"],
memory_type=MemoryType(data["memory_type"]),
metadata=data.get("metadata", {}),
timestamp=datetime.fromisoformat(data["timestamp"])
)
item.access_count = data.get("access_count", 0)
item.last_accessed = datetime.fromisoformat(data["last_accessed"]) if data.get("last_accessed") else None
item.importance = data.get("importance", 0.5)
return item
class MemoryStore(ABC):
"""记忆存储抽象基类"""
@abstractmethod
def add(self, item: MemoryItem) -> str:
"""添加一条记忆,返回记忆 ID"""
pass
@abstractmethod
def get(self, memory_id: str) -> Optional[MemoryItem]:
"""根据 ID 获取记忆"""
pass
@abstractmethod
def search(self, query: str, memory_type: Optional[MemoryType] = None,
limit: int = 10, **kwargs) -> List[Tuple[MemoryItem, float]]:
"""
搜索相关记忆
Args:
query: 查询文本
memory_type: 可选的记忆类型过滤
limit: 返回结果数量限制
**kwargs: 其他过滤条件
Returns:
(记忆项, 相关度分数) 列表
"""
pass
@abstractmethod
def update(self, memory_id: str, item: MemoryItem) -> bool:
"""更新记忆"""
pass
@abstractmethod
def delete(self, memory_id: str) -> bool:
"""删除记忆"""
pass
@abstractmethod
def get_recent(self, memory_type: Optional[MemoryType] = None,
limit: int = 10) -> List[MemoryItem]:
"""获取最近的记忆"""
pass
class VectorMemoryStore(MemoryStore):
"""基于向量数据库的记忆存储实现"""
def __init__(self, persist_directory: Optional[str] = None,
collection_name: str = "ai_agent_memories",
embedding_model: str = "text-embedding-ada-002"):
self.persist_directory = persist_directory or os.getenv("CHROMA_DB_PATH", "./data/chroma_db")
self.collection_name = collection_name
self.embedding_model = embedding_model
# 初始化 ChromaDB 客户端
self.client = chromadb.PersistentClient(path=self.persist_directory)
# 初始化嵌入函数
self.embedding_function = embedding_functions.OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"),
model_name=embedding_model
)
# 获取或创建集合
self.collection = self.client.get_or_create_collection(
name=collection_name,
embedding_function=self.embedding_function,
metadata={"description": "AI Agent 记忆存储"}
)
# 内存缓存,提高访问速度
self._cache: Dict[str, MemoryItem] = {}
logger.info(f"向量记忆存储初始化完成,集合: {collection_name}")
def _serialize_content(self, content: Any) -> str:
"""将内容序列化为字符串"""
if isinstance(content, str):
return content
elif isinstance(content, dict) or isinstance(content, list):
return json.dumps(content, ensure_ascii=False)
else:
return str(content)
def add(self, item: MemoryItem) -> str:
"""添加一条记忆"""
memory_id = f"mem_{datetime.now().strftime('%Y%m%d%H%M%S%f')}"
# 准备文档
content_str = self._serialize_content(item.content)
# 准备元数据
metadata = item.metadata.copy()
metadata.update({
"memory_type": item.memory_type.value,
"timestamp": item.timestamp.isoformat(),
"access_count": item.access_count,
"importance": item.importance
})
# 添加到向量数据库
self.collection.add(
ids=[memory_id],
documents=[content_str],
metadatas=[metadata]
)
# 添加到缓存
self._cache[memory_id] = item
logger.debug(f"添加记忆: {memory_id}, 类型: {item.memory_type.value}")
return memory_id
def get(self, memory_id: str) -> Optional[MemoryItem]:
"""根据 ID 获取记忆"""
# 先检查缓存
if memory_id in self._cache:
item = self._cache[memory_id]
item.access()
return item
# 从向量数据库获取
try:
results = self.collection.get(ids=[memory_id])
if not results["ids"]:
return None
# 重建记忆项
metadata = results["metadatas"][0]
content = results["documents"][0]
# 尝试解析 JSON 内容
try:
content = json.loads(content)
except json.JSONDecodeError:
pass
item = MemoryItem
更多推荐


所有评论(0)