1. 项目概述:在Mac Mini上构建一个自主运行的Claude代码代理

最近在折腾一个挺有意思的项目:让Claude写的代码能在一个独立的Mac Mini上,像拥有自主意识一样持续运行、自我迭代。这听起来有点像科幻电影里的场景,但实际做下来,发现技术路径比想象中清晰。核心思路是搭建一个闭环系统,让Claude生成的代码不仅能执行,还能根据执行结果和环境反馈,自动分析问题、调整策略、生成新的代码,形成一个自我驱动的循环。

这个项目的价值在于,它把大语言模型的代码生成能力从“一次性工具”升级为“持续运行的智能体”。想象一下,你有一个数据分析任务,传统的做法是你写脚本、跑数据、看结果、改代码,循环往复。而在这个系统里,你只需要定义好目标(比如“监控服务器日志,发现异常模式并自动告警”),Claude代理就会自己写出初始的监控脚本,运行它,分析日志输出,如果发现脚本有bug或者监控逻辑不完善,它会自己诊断问题,生成修复补丁或优化版本,然后再次部署运行。整个过程无需人工干预,Mac Mini就成了一个24小时在线的“数字员工”。

为什么选择Mac Mini?这其实是个很务实的选择。首先,它的功耗和噪音控制得非常好,适合7x24小时运行,放在家里或办公室角落都不会打扰。其次,M系列芯片的能效比惊人,处理日常的脚本任务、轻量级服务游刃有余,而且macOS系统对开发者极其友好,各种命令行工具和开发环境一应俱全。最后,它的成本相对可控,是个人开发者或小团队进行AI智能体实验的理想硬件平台。这个项目本质上是在探索如何将云端大模型的“思考”能力,与本地硬件的“执行”和“持久化”能力深度结合,创造出一个稳定、私密且可长期运行的自动化解决方案。

2. 核心架构设计与技术选型

要让Claude代码真正自主运行,不能只靠一个简单的脚本。我们需要设计一个具备感知、决策、执行和学习能力的完整架构。经过几轮迭代,我最终确定了一个分层架构,它主要包含四个核心模块: 任务调度与状态管理中枢 Claude API交互与代码生成器 代码安全沙箱与执行引擎 ,以及 结果分析与迭代决策器

2.1 架构分层解析

第一层是 任务调度与状态管理中枢 。这是整个系统的大脑,负责一切流程控制。它需要维护一个任务队列,解析用户或系统设定的长期目标(例如“保持网站健康度”),并将其分解为一系列可执行的原子任务(如“检查API响应时间”、“扫描错误日志”)。同时,它还必须持久化保存整个智能体的状态,包括历史任务记录、生成的代码版本、执行结果、学习到的经验等。我选择了SQLite作为状态数据库,因为它轻量、无需额外服务,非常适合Mac Mini这种单机环境。用一个Python脚本来实现这个调度器,它就是个永不退出的守护进程。

第二层是 Claude API交互与代码生成器 。这个模块负责与Claude对话。它的输入是当前任务描述、相关上下文(如之前的错误信息)以及系统对代码的约束(比如必须用Python 3.9,不能访问网络等)。它的输出是一段可执行的代码。这里的关键是设计高质量的提示词(Prompt)。你不能简单地说“写个监控脚本”,而要给出详细的规格:函数名、输入输出格式、需要使用的库、异常处理要求等。同时,必须建立一个“对话历史”的上下文管理机制,让Claude能基于之前的交互持续改进代码,而不是每次都从零开始。

第三层是 代码安全沙箱与执行引擎 。这是安全保障和实际运行的地方。绝对不能让Claude生成的代码拥有无限制的权限。因此,我引入了 Docker容器 作为安全沙箱。每一个由Claude生成的代码任务,都会被放入一个全新的、资源受限的Docker容器中运行。这个容器只有必要的运行权限,文件系统是只读的(除了特定的数据卷),网络访问也被严格限制。执行引擎负责创建容器、注入代码和输入数据、启动执行、捕获输出(包括标准输出、标准错误和退出码),最后在任务完成后销毁容器。这确保了即使生成的代码有恶意行为,也不会影响到宿主机Mac Mini的安全。

第四层是 结果分析与迭代决策器 。代码执行完毕后,会产生结果。这个模块负责分析结果:任务成功了吗?输出是否符合预期?有没有报错?如果有错误,错误信息是什么?它需要从自然语言的错误信息或非预期的输出中,提取出结构化的问题描述。然后,决策器会根据预设的策略(例如:语法错误直接重试、逻辑错误则提供更多上下文给Claude重新生成)来决定下一步动作:是认为任务成功并结束,还是生成一个新的调试任务交给Claude去修复代码。这一步是“自主”的核心,它让系统具备了从失败中学习并调整的能力。

2.2 关键技术选型理由

  1. 编程语言:Python 。这是不二之选。它有极其丰富的库来支持所有我们需要的东西:HTTP请求(调用Claude API)、子进程管理(执行命令)、Docker SDK(控制容器)、SQLite操作、以及各种文本处理工具。生态成熟,开发效率高。
  2. 容器化技术:Docker Desktop for Mac 。在Mac Mini上运行Docker非常方便。Docker Desktop提供了完整的容器运行时,并且与M系列芯片(ARM架构)兼容性好。使用容器而非原生进程,提供了最强的隔离性和环境一致性,确保Claude生成的代码在任何时候都能在预期的环境中运行。
  3. 状态存储:SQLite 。简单、可靠、零配置。所有任务历史、代码片段、执行日志都可以存在一个本地 .db 文件里,方便查询和调试。对于单机智能体来说,它比任何外部数据库都合适。
  4. 进程管理:Supervisor 。我们需要让调度中枢(一个Python脚本)长期运行。Supervisor是一个进程控制工具,可以确保我们的主脚本在崩溃后自动重启,同时还能方便地管理日志轮转。通过Homebrew安装和配置Supervisor,能让整个系统更健壮。

注意 :在提示词中必须明确禁止Claude生成任何形式的网络代理、隧道或绕过网络限制的代码。安全策略中也要在Docker容器级别禁用非常规端口的出站连接。这是保障系统合规运行的底线。

这个架构看起来有点复杂,但每个模块的职责都很清晰。搭建起来后,它就像一个精密的自动化工厂:调度中心是生产计划科,Claude是研发工程师,Docker沙箱是安全的生产车间,分析决策器是质量检测与工艺改进部门。整个流水线协同工作,最终实现代码的自主生产和进化。

3. 环境准备与核心配置

在Mac Mini上搭建这个自主代理,第一步是把地基打牢。这个地基包括开发环境、必要的服务以及安全配置。下面我会详细拆解每一步的操作和背后的考量。

3.1 基础开发环境搭建

首先,确保你的Mac Mini运行的是较新版本的macOS(Sonoma或更高推荐),并安装了Xcode Command Line Tools。打开终端,执行 xcode-select --install 即可。这一步提供了基础的编译器和工具链。

接下来是Python环境的管理。我强烈推荐使用 Miniconda 来创建独立的Python环境,而不是直接用系统自带的Python。为什么?因为我们的项目依赖特定版本的库,隔离的环境可以避免与系统或其他项目的包发生冲突。

# 下载并安装Miniconda (适用于Apple Silicon的版本)
curl -O https://repo.anaconda.com/miniconda/Miniconda3-latest-MacOSX-arm64.sh
bash Miniconda3-latest-MacOSX-arm64.sh -b -p $HOME/miniconda
# 初始化conda,将conda加入PATH
$HOME/miniconda/bin/conda init zsh # 如果你用Zsh,bash用户则用 bash
# 关闭并重新打开终端,然后创建专属环境
conda create -n claude-agent python=3.9 -y
conda activate claude-agent

选择Python 3.9是因为它在稳定性和对新库的兼容性上取得了很好的平衡。环境创建好后,我们先安装一些核心的依赖库:

pip install anthropic  # Claude官方SDK
pip install docker    # Docker Python SDK,用于控制容器
pip install sqlalchemy # ORM,方便操作SQLite数据库
pip install schedule  # 轻量级任务调度库

3.2 Docker Desktop安装与关键配置

自主代理的核心安全依赖于Docker。从Docker官网下载适用于Apple Silicon的Docker Desktop安装包并安装。安装完成后,务必在设置中进行以下几项关键配置:

  1. 资源分配 :在 Resources 选项卡中,根据你的Mac Mini配置(如8GB或16GB内存),为Docker分配适量的CPU和内存。对于大多数自动化脚本任务,分配2-4个CPU核心和4GB内存已经绰绰有余。过度分配会影响宿主机性能。
  2. 镜像仓库 :考虑到网络稳定性,可以在 Docker Engine 配置中添加国内镜像加速器,这能大幅提升拉取基础镜像(如Python官方镜像)的速度。
  3. 权限管理 :确保在 Security 设置中,不会将敏感目录(如整个用户目录)默认共享给容器。

安装配置完成后,在终端运行 docker run hello-world 来验证Docker是否正常工作。你应该能看到一个成功的欢迎信息。

3.3 Claude API密钥与项目初始化

你需要一个Claude API密钥。前往Anthropic的开发者平台注册并获取。 安全第一:永远不要将API密钥硬编码在代码中。

最佳实践是使用环境变量。你可以创建一个名为 .env 的文件在项目根目录( 切记将该文件加入 .gitignore ),内容如下:

ANTHROPIC_API_KEY=your_api_key_here

然后在你的Python代码中使用 python-dotenv 库来加载它:

pip install python-dotenv
# config.py
import os
from dotenv import load_dotenv

load_dotenv()
ANTHROPIC_API_KEY = os.getenv('ANTHROPIC_API_KEY')
if not ANTHROPIC_API_KEY:
    raise ValueError("请在 .env 文件中设置 ANTHROPIC_API_KEY")

接下来,初始化项目目录结构。一个清晰的结构能让后续开发维护更轻松:

claude_autonomous_agent/
├── .env                    # 环境变量(本地机密,不上传)
├── .gitignore             # 忽略.env, __pycache__, logs等
├── requirements.txt       # Python依赖列表
├── main.py               # 主调度程序入口
├── core/                 # 核心模块
│   ├── __init__.py
│   ├── scheduler.py      # 任务调度器
│   ├── claude_client.py  # Claude API交互封装
│   ├── code_executor.py  # Docker代码执行器
│   └── analyzer.py       # 结果分析器
├── db/                   # 数据库相关
│   ├── models.py         # SQLAlchemy数据模型
│   └── session.py        # 数据库会话管理
├── tasks/                # 任务定义与队列
│   └── example_tasks.py  # 示例任务(如网站监控、数据处理)
├── logs/                 # 日志目录
└── workspace/            # 供Docker容器挂载的共享工作区

创建好这个结构后,在 requirements.txt 中固化你的依赖: pip freeze > requirements.txt 。这样,在任何新的环境中都可以通过 pip install -r requirements.txt 快速复现。

实操心得 :在M1/M2 Mac Mini上,所有步骤中最容易出问题的是Docker对ARM架构镜像的兼容性。务必在拉取基础镜像时指定ARM版本,例如 python:3.9-slim 镜像本身是多架构的,Docker会自动选择 arm64 版本。但如果某些镜像没有ARM版,你需要寻找替代品或自己构建。

4. 核心模块实现详解

环境就绪后,我们开始实现架构中的四大核心模块。我会从代码执行器这个最关键的安保环节开始,因为它是整个系统安全运行的基石。

4.1 代码安全沙箱与执行引擎实现

code_executor.py 是这个模块的核心。它的职责是:接收一段代码和输入参数,在一个安全的Docker容器中运行它,并返回输出。我们选择 python:3.9-slim 作为基础镜像,因为它体积小,包含了Python运行所需的最小环境。

首先,我们定义一个 DockerExecutor 类:

# core/code_executor.py
import docker
import tempfile
import os
import logging
from typing import Dict, Any, Tuple

class DockerExecutor:
    def __init__(self):
        self.client = docker.from_env()
        self.image_name = "python:3.9-slim"
        self._pull_image_if_needed()
        
    def _pull_image_if_needed(self):
        """确保基础镜像存在"""
        try:
            self.client.images.get(self.image_name)
            logging.info(f"镜像 {self.image_name} 已存在")
        except docker.errors.ImageNotFound:
            logging.info(f"拉取镜像 {self.image_name}...")
            self.client.images.pull(self.image_name)
    
    def execute_code(self, code: str, timeout_seconds: int = 30) -> Dict[str, Any]:
        """
        在Docker容器中执行代码。
        返回包含 stdout, stderr, exit_code, duration 的字典。
        """
        # 1. 创建临时目录,用于存放代码文件和供容器挂载
        with tempfile.TemporaryDirectory() as tmpdir:
            code_file_path = os.path.join(tmpdir, 'task.py')
            with open(code_file_path, 'w') as f:
                f.write(code)
            
            # 2. 准备容器挂载卷,将临时目录映射到容器内的 /workspace
            volumes = {tmpdir: {'bind': '/workspace', 'mode': 'ro'}}  # 只读挂载
            
            # 3. 构建容器运行命令
            # 限制资源,禁用网络,设置工作目录
            container_config = {
                'image': self.image_name,
                'command': f"timeout {timeout_seconds} python /workspace/task.py",
                'working_dir': '/workspace',
                'volumes': volumes,
                'network_disabled': True,  # 关键!禁用容器网络访问
                'mem_limit': '256m',       # 限制内存为256MB
                'cpu_period': 100000,
                'cpu_quota': 50000,        # 限制CPU使用为50%的一个核心
                'stderr': True,
                'stdout': True,
            }
            
            try:
                # 4. 创建并运行容器
                container = self.client.containers.run(**container_config, detach=True)
                # 等待容器执行完毕,或超时
                result = container.wait(timeout=timeout_seconds + 5)  # 额外给5秒缓冲
                exit_code = result['StatusCode']
                
                # 5. 获取日志输出
                stdout = container.logs(stdout=True, stderr=False).decode('utf-8', errors='ignore')
                stderr = container.logs(stdout=False, stderr=True).decode('utf-8', errors='ignore')
                
                # 6. 清理容器
                container.remove()
                
                return {
                    'success': exit_code == 0,
                    'exit_code': exit_code,
                    'stdout': stdout.strip(),
                    'stderr': stderr.strip(),
                    'timed_out': (exit_code == 137 or exit_code == 124)  # 137是SIGKILL, 124是timeout命令超时
                }
                
            except docker.errors.ContainerError as e:
                logging.error(f"容器执行错误: {e}")
                return {'success': False, 'error': str(e), 'stderr': str(e)}
            except Exception as e:
                logging.error(f"执行器未知错误: {e}")
                return {'success': False, 'error': str(e)}

关键设计解析

  1. 网络禁用 ( network_disabled=True ) : 这是最重要的安全措施。它彻底阻止了容器内的代码访问外部网络,从根本上杜绝了任何数据外泄或对外攻击的可能性。如果任务确实需要网络(例如一个健康检查需要 ping ),这必须在更高层的任务设计中被排除,或通过极严格的白名单机制另行处理。
  2. 资源限制 ( mem_limit , cpu_quota ) : 防止恶意或 bug 代码耗尽系统资源。256MB内存对运行大多数脚本任务已经足够。
  3. 超时控制 ( timeout 命令) : 双重超时保障。在容器命令中使用 timeout ,同时在 container.wait 中设置超时。防止代码陷入死循环。
  4. 只读挂载 ( mode: 'ro' ) : 容器对挂载的代码目录只有读权限,防止代码修改宿主机的文件。
  5. 临时目录 ( tempfile.TemporaryDirectory ) : 每次执行都使用全新的临时目录,执行完毕后自动清理,确保任务间无残留干扰。

这个执行器提供了一个安全、隔离的代码运行环境。接下来,我们需要一个“大脑”来生成要执行的代码。

4.2 Claude API交互与智能提示词工程

claude_client.py 模块负责与Claude对话。直接调用API并不难,难的是设计出能让Claude稳定生成高质量、可执行代码的提示词。我们的提示词必须包含几个关键部分: 角色设定 任务上下文 输出格式约束 安全规则

# core/claude_client.py
import anthropic
import logging
from typing import List, Dict, Any
from .config import ANTHROPIC_API_KEY

class ClaudeCoder:
    def __init__(self, model="claude-3-haiku-20240307"):
        self.client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY)
        self.model = model
        self.conversation_history: List[Dict[str, str]] = []  # 保存对话历史
        
    def _build_system_prompt(self) -> str:
        """构建系统提示词,定义Claude的角色和能力边界"""
        return """你是一个顶尖的自动化脚本编写专家。你的唯一任务是根据用户需求,编写出安全、健壮、可直接在隔离的Python 3.9环境中运行的代码。

你必须严格遵守以下规则:
1. 代码必须完全自包含。只能使用Python 3.9标准库。绝对禁止导入任何第三方库(如requests, numpy, pandas等)。
2. 绝对禁止任何形式的网络访问代码。包括但不限于:socket, urllib, http.client, ftplib等模块的任何调用。
3. 代码必须包含完善的错误处理(try-except)。当发生错误时,必须将清晰的错误信息打印到标准错误输出。
4. 代码的入口点是一个main()函数。我们会执行这个函数。
5. 代码必须高效,避免无限循环。如果涉及循环,必须有明确的退出条件。
6. 输出结果必须打印到标准输出,且格式尽量简洁、易于后续程序解析。

如果用户的需求无法在以上约束下实现,你必须明确说明原因,并提出一个在约束内可行的替代方案。"""
    
    def generate_code(self, task_description: str, context: str = "") -> str:
        """
        根据任务描述和上下文生成代码。
        task_description: 新任务的具体描述。
        context: 之前的错误信息或相关上下文,用于迭代改进。
        """
        user_prompt = f"""
        任务描述:
        {task_description}

        相关上下文(之前的错误或说明):
        {context if context else "无"}

        请严格按照系统提示的要求,编写完成上述任务的Python代码。
        只输出代码本身,不要有任何额外的解释、Markdown代码块标记或注释。
        """
        
        # 构建消息历史:系统提示 + 历史对话 + 本次用户输入
        messages = [{"role": "user", "content": self._build_system_prompt()}]
        messages.extend(self.conversation_history[-6:])  # 只保留最近3轮对话(假设一轮是user+assistant),防止上下文过长
        messages.append({"role": "user", "content": user_prompt})
        
        try:
            response = self.client.messages.create(
                model=self.model,
                max_tokens=2048,
                temperature=0.2,  # 低温度,确保代码稳定性,减少随机性
                messages=messages
            )
            generated_code = response.content[0].text.strip()
            
            # 清理可能出现的markdown代码块标记
            if generated_code.startswith('```python'):
                generated_code = generated_code[9:]
            if generated_code.startswith('```'):
                generated_code = generated_code[3:]
            if generated_code.endswith('```'):
                generated_code = generated_code[:-3]
            generated_code = generated_code.strip()
            
            # 更新对话历史
            self.conversation_history.append({"role": "user", "content": user_prompt})
            self.conversation_history.append({"role": "assistant", "content": generated_code})
            
            logging.info(f"Claude生成代码长度:{len(generated_code)}字符")
            return generated_code
            
        except Exception as e:
            logging.error(f"调用Claude API失败: {e}")
            raise
    
    def add_feedback_to_history(self, feedback: str):
        """将执行结果的反馈加入历史,供下次生成参考"""
        self.conversation_history.append({"role": "user", "content": f"上次代码执行反馈:{feedback}"})

提示词工程要点

  1. 系统提示词定基调 :在 _build_system_prompt 中,我们明确、强硬地规定了Claude的角色和不可逾越的红线(无网络、仅标准库)。这比在每次对话中重复强调有效得多。
  2. 低温度(Temperature=0.2) :这个参数控制输出的随机性。对于代码生成,我们需要的是确定性、可靠的输出,而不是创造性。0.2是一个比较保守的值,能确保相同的输入产生大致相同的输出。
  3. 上下文管理 conversation_history 保存了对话历史。当代码执行失败后,我们可以将错误信息作为 context 传入下一次 generate_code 调用,同时也会通过 add_feedback_to_history 将反馈固化到历史中。这样Claude就能基于之前的错误进行调试和改进,实现了简单的“学习”能力。
  4. 输出清洗 :Claude有时会返回被Markdown代码块包裹的文本。我们需要将其剥离,得到纯净的代码字符串。

有了能生成代码的“大脑”和能安全执行代码的“双手”,我们需要一个“中枢神经系统”来协调它们。

4.3 任务调度与状态管理中枢实现

scheduler.py 是这个自主代理的指挥中心。它需要管理任务队列、持久化状态、并协调各个模块工作。我们采用一个简单的循环调度机制,配合SQLite数据库来记录一切。

首先,定义数据库模型来保存任务和代码执行记录:

# db/models.py
from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime, Boolean
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import func

Base = declarative_base()

class Task(Base):
    __tablename__ = 'tasks'
    id = Column(Integer, primary_key=True)
    description = Column(Text, nullable=False)  # 任务描述
    status = Column(String(50), default='pending')  # pending, running, success, failed
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    scheduled_for = Column(DateTime(timezone=True))  # 计划执行时间
    priority = Column(Integer, default=5)  # 1最高,10最低
    
class CodeExecution(Base):
    __tablename__ = 'code_executions'
    id = Column(Integer, primary_key=True)
    task_id = Column(Integer, nullable=False)
    generated_code = Column(Text, nullable=False)  # 生成的代码
    execution_result = Column(Text)  # 存储序列化的结果(如JSON字符串)
    exit_code = Column(Integer)
    stdout = Column(Text)
    stderr = Column(Text)
    timed_out = Column(Boolean, default=False)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    iteration = Column(Integer, default=1)  # 第几次尝试

接着,实现调度器本身。它会持续检查数据库,取出待执行的任务,协调生成和执行代码,并根据结果决定下一步。

# core/scheduler.py
import time
import logging
import schedule
from typing import Optional
from sqlalchemy.orm import sessionmaker
from db.models import Base, Task, CodeExecution, engine
from .claude_client import ClaudeCoder
from .code_executor import DockerExecutor
from .analyzer import ResultAnalyzer

# 创建数据库表
Base.metadata.create_all(engine)
SessionLocal = sessionmaker(bind=engine)

class AutonomousScheduler:
    def __init__(self):
        self.claude = ClaudeCoder()
        self.executor = DockerExecutor()
        self.analyzer = ResultAnalyzer()
        self.db_session = SessionLocal()
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        
    def create_task(self, description: str, priority: int = 5):
        """用户或系统调用此方法来创建新任务"""
        new_task = Task(description=description, priority=priority, status='pending')
        self.db_session.add(new_task)
        self.db_session.commit()
        logging.info(f"创建新任务: {description[:50]}... (ID: {new_task.id})")
        return new_task.id
    
    def _process_next_task(self):
        """处理下一个最高优先级的待处理任务"""
        # 1. 从数据库获取一个pending状态的任务
        task = self.db_session.query(Task).filter_by(status='pending').order_by(Task.priority, Task.created_at).first()
        if not task:
            time.sleep(5)  # 没有任务,休眠5秒
            return
        
        task.status = 'running'
        self.db_session.commit()
        logging.info(f"开始处理任务 ID:{task.id} - {task.description[:30]}...")
        
        max_iterations = 3  # 最大尝试次数
        current_iteration = 1
        context = ""  # 提供给Claude的上下文
        
        while current_iteration <= max_iterations:
            # 2. 让Claude生成代码
            try:
                code = self.claude.generate_code(task.description, context)
            except Exception as e:
                logging.error(f"任务 {task.id} 第{current_iteration}次迭代,生成代码失败: {e}")
                self._record_execution(task.id, None, error=f"生成代码失败: {e}", iteration=current_iteration)
                break
            
            # 3. 在Docker中执行代码
            execution_result = self.executor.execute_code(code)
            
            # 4. 记录执行结果到数据库
            exec_record = self._record_execution(task.id, code, execution_result, current_iteration)
            
            # 5. 分析结果
            analysis = self.analyzer.analyze(execution_result, task.description)
            
            if analysis['should_continue']:
                # 任务成功完成
                task.status = 'success'
                self.db_session.commit()
                logging.info(f"任务 {task.id} 在第{current_iteration}次尝试后成功完成。")
                break
            else:
                # 任务失败,准备下一次迭代
                logging.warning(f"任务 {task.id} 第{current_iteration}次尝试失败。原因: {analysis['feedback']}")
                # 将错误反馈加入Claude的对话历史,作为下次生成的上下文
                self.claude.add_feedback_to_history(analysis['feedback'])
                context = analysis['feedback']  # 也作为下一次generate_code的直接上下文
                current_iteration += 1
        else:
            # while循环正常结束(即达到最大迭代次数仍未成功)
            task.status = 'failed'
            self.db_session.commit()
            logging.error(f"任务 {task.id} 在{max_iterations}次尝试后仍失败。")
        
    def _record_execution(self, task_id, code, execution_result, iteration):
        """将单次代码执行记录到数据库"""
        record = CodeExecution(
            task_id=task_id,
            generated_code=code if code else "",
            execution_result=str(execution_result),
            exit_code=execution_result.get('exit_code') if execution_result else None,
            stdout=execution_result.get('stdout')[:500] if execution_result else None,  # 只存前500字符
            stderr=execution_result.get('stderr')[:500] if execution_result else None,
            timed_out=execution_result.get('timed_out', False) if execution_result else False,
            iteration=iteration
        )
        self.db_session.add(record)
        self.db_session.commit()
        return record
    
    def run(self):
        """启动调度器主循环"""
        logging.info("自主代理调度器启动...")
        # 使用schedule库每隔10秒尝试处理任务(也可以改用while True循环)
        schedule.every(10).seconds.do(self._process_next_task)
        
        while True:
            schedule.run_pending()
            time.sleep(1)

这个调度器实现了一个简单的 重试与学习循环 。当一个任务失败时,分析器会生成反馈,这个反馈被加入到Claude的对话历史中。下一次为同一任务生成代码时,Claude就能看到“上次你写的代码因为XXX原因失败了”,从而生成修复后的版本。最多尝试3次,避免陷入无限循环。

4.4 结果分析与决策逻辑剖析

分析器 analyzer.py 的智能程度,直接决定了系统的“自主”能力。它需要从代码执行结果中判断:成功还是失败?如果失败,原因是什么?下一步该做什么?

# core/analyzer.py
import re
import logging
from typing import Dict, Any

class ResultAnalyzer:
    def analyze(self, execution_result: Dict[str, Any], original_task: str) -> Dict[str, Any]:
        """
        分析执行结果,决定下一步行动。
        返回: {
            'should_continue': bool,  # True表示任务成功,False表示需要继续/失败
            'feedback': str,           # 给Claude的反馈信息
            'reason': str              # 内部记录的分析原因
        }
        """
        if not execution_result.get('success', False):
            # 执行器层面失败(如容器启动失败)
            return {
                'should_continue': False,
                'feedback': f"代码执行环境失败:{execution_result.get('error', '未知错误')}。请检查代码是否包含了不允许的操作(如网络访问)。",
                'reason': 'executor_failure'
            }
        
        exit_code = execution_result.get('exit_code', -1)
        stdout = execution_result.get('stdout', '')
        stderr = execution_result.get('stderr', '')
        timed_out = execution_result.get('timed_out', False)
        
        # 情况1: 超时
        if timed_out:
            return {
                'should_continue': False,
                'feedback': f"代码执行超时被终止。这可能是因为代码包含了无限循环或过于耗时的操作。请优化代码逻辑,确保所有循环都有明确的退出条件,并且操作能在30秒内完成。",
                'reason': 'timeout'
            }
        
        # 情况2: 进程异常退出 (exit_code != 0)
        if exit_code != 0:
            # 尝试从stderr中提取有用的错误信息
            error_summary = self._extract_error_from_stderr(stderr)
            feedback = f"代码执行失败,退出码 {exit_code}。错误信息:{error_summary}。请修复代码中的错误。"
            return {
                'should_continue': False,
                'feedback': feedback,
                'reason': f'non_zero_exit_{exit_code}'
            }
        
        # 情况3: 进程正常退出 (exit_code == 0),需要判断输出是否符合任务预期
        # 这是一个简化的判断逻辑,实际中可能需要更复杂的语义分析
        task_lower = original_task.lower()
        stdout_lower = stdout.lower()
        
        # 示例:如果任务是“计算”,我们期望输出中包含数字
        if '计算' in task_lower or 'calculate' in task_lower:
            if any(char.isdigit() for char in stdout):
                return {'should_continue': True, 'feedback': '任务成功完成。', 'reason': 'success_with_expected_output'}
            else:
                return {
                    'should_continue': False,
                    'feedback': f"代码已执行完毕且未报错,但输出中未找到预期的数字结果。当前输出为:'{stdout[:100]}'。请确保代码正确计算并打印了结果。",
                    'reason': 'success_but_unexpected_output'
                }
        
        # 默认情况:只要没报错,就认为成功(对于监控、检查类任务)
        # 更复杂的系统可以在这里集成一个轻量级的LLM来判断输出是否满足任务描述
        return {'should_continue': True, 'feedback': '任务成功完成。', 'reason': 'success_default'}
    
    def _extract_error_from_stderr(self, stderr: str) -> str:
        """从错误信息中提取最相关的部分"""
        if not stderr:
            return "无错误信息输出。"
        
        # 匹配常见的Python错误行,如 "File \"<string>\", line 5, in <module>"
        error_patterns = [
            r'File.*line \d+.*',
            r'SyntaxError:.*',
            r'NameError:.*',
            r'TypeError:.*',
            r'ValueError:.*',
            r'IndexError:.*',
            r'KeyError:.*',
            r'AttributeError:.*',
        ]
        
        lines = stderr.split('\n')
        relevant_lines = []
        for line in lines:
            for pattern in error_patterns:
                if re.search(pattern, line):
                    relevant_lines.append(line.strip())
                    break
            if len(relevant_lines) >= 3:  # 最多取3行关键错误
                break
        
        if relevant_lines:
            return ' '.join(relevant_lines[:3])
        else:
            # 没有匹配到标准错误格式,返回前100个字符
            return stderr[:100] + ('...' if len(stderr) > 100 else '')

这个分析器目前还比较简单,主要依赖退出码和简单的关键字匹配。但它已经能处理几种典型情况:超时、语法错误、运行时异常。对于更复杂的逻辑错误(比如代码运行了但结果不对),我们有一个兜底策略:如果任务描述中包含“计算”等关键词,我们会检查输出中是否有数字。在实际应用中,你可以根据具体任务类型定制更复杂的分析逻辑,甚至集成一个小型的文本分类模型或规则引擎来判断任务是否真正成功。

5. 系统集成、运行与监控

各个核心模块准备好后,我们需要把它们组装起来,形成一个可以持续运行的系统,并为其添加必要的监控和日志功能,方便我们了解这个“数字员工”的工作状态。

5.1 主程序入口与服务化

创建一个 main.py 作为整个应用的启动入口。为了让它在Mac Mini上稳定地作为后台服务运行,我们除了使用Supervisor,还可以利用macOS自带的 launchd

# main.py
#!/usr/bin/env python3
"""
Claude自主代理主程序入口。
"""
import signal
import sys
import logging
from core.scheduler import AutonomousScheduler

def setup_logging():
    """配置日志,同时输出到控制台和文件"""
    log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    logging.basicConfig(
        level=logging.INFO,
        format=log_format,
        handlers=[
            logging.FileHandler('logs/agent.log', encoding='utf-8'),
            logging.StreamHandler(sys.stdout)
        ]
    )

def signal_handler(sig, frame):
    """处理优雅关机信号"""
    logging.info("接收到关机信号,正在清理资源...")
    # 这里可以添加数据库会话关闭等清理操作
    sys.exit(0)

def main():
    setup_logging()
    logging.info("=== Claude自主代理启动 ===")
    
    # 注册信号处理器,用于优雅关机
    signal.signal(signal.SIGINT, signal_handler)  # Ctrl+C
    signal.signal(signal.SIGTERM, signal_handler) # kill命令
    
    scheduler = AutonomousScheduler()
    
    # 可以在这里预置一些初始任务
    # scheduler.create_task("编写一个Python脚本,读取当前目录下的list.txt文件,计算其中所有数字的总和并打印。")
    # scheduler.create_task("编写一个脚本,检查/tmp目录是否存在,如果存在则列出其中的前5个文件。")
    
    logging.info("调度器初始化完成,开始运行主循环...")
    try:
        scheduler.run()  # 这个方法内部是无限循环
    except KeyboardInterrupt:
        logging.info("用户中断操作。")
    except Exception as e:
        logging.critical(f"调度器发生未捕获异常: {e}", exc_info=True)
    finally:
        logging.info("=== Claude自主代理停止 ===")

if __name__ == "__main__":
    main()

为了让这个脚本在后台运行,并能在开机时自动启动,我们创建一个 launchd 的plist配置文件:

<!-- ~/Library/LaunchAgents/com.user.claudeagent.plist -->
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
    <key>Label</key>
    <string>com.user.claudeagent</string>
    <key>ProgramArguments</key>
    <array>
        <string>/Users/你的用户名/miniconda3/envs/claude-agent/bin/python</string>
        <string>/Users/你的用户名/code/claude_autonomous_agent/main.py</string>
    </array>
    <key>WorkingDirectory</key>
    <string>/Users/你的用户名/code/claude_autonomous_agent</string>
    <key>StandardOutPath</key>
    <string>/Users/你的用户名/code/claude_autonomous_agent/logs/launchd.stdout.log</string>
    <key>StandardErrorPath</key>
    <string>/Users/你的用户名/code/claude_autonomous_agent/logs/launchd.stderr.log</string>
    <key>EnvironmentVariables</key>
    <dict>
        <key>PATH</key>
        <string>/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin</string>
    </dict>
    <key>RunAtLoad</key>
    <true/>
    <key>KeepAlive</key>
    <true/>
</dict>
</plist>

然后加载这个服务:

launchctl load ~/Library/LaunchAgents/com.user.claudeagent.plist

这样,你的Claude自主代理就会在Mac Mini启动时自动运行,并在崩溃后自动重启。

5.2 基础监控与日志分析

一个自主运行的系统,我们必须能随时了解它的健康状况和工作内容。除了前面已经配置的文件日志,我们还可以增加一个简单的状态查询接口。

创建一个新的模块 monitor.py ,它提供一个简单的HTTP API来查看当前任务状态:

# core/monitor.py
from http.server import HTTPServer, BaseHTTPRequestHandler
import json
from db.models import engine, Task, CodeExecution
from sqlalchemy.orm import sessionmaker
import threading

SessionLocal = sessionmaker(bind=engine)

class StatusHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == '/status':
            session = SessionLocal()
            try:
                # 获取任务统计
                total_tasks = session.query(Task).count()
                pending_tasks = session.query(Task).filter_by(status='pending').count()
                running_tasks = session.query(Task).filter_by(status='running').count()
                success_tasks = session.query(Task).filter_by(status='success').count()
                failed_tasks = session.query(Task).filter_by(status='failed').count()
                
                # 获取最近5次执行记录
                recent_execs = session.query(CodeExecution).order_by(CodeExecution.created_at.desc()).limit(5).all()
                recent_data = []
                for exec in recent_execs:
                    recent_data.append({
                        'task_id': exec.task_id,
                        'iteration': exec.iteration,
                        'exit_code': exec.exit_code,
                        'timed_out': exec.timed_out,
                        'created_at': exec.created_at.isoformat() if exec.created_at else None
                    })
                
                status_info = {
                    'system': 'running',
                    'task_stats': {
                        'total': total_tasks,
                        'pending': pending_tasks,
                        'running': running_tasks,
                        'success': success_tasks,
                        'failed': failed_tasks
                    },
                    'recent_executions': recent_data
                }
                
                self.send_response(200)
                self.send_header('Content-type', 'application/json')
                self.end_headers()
                self.wfile.write(json.dumps(status_info, indent=2, ensure_ascii=False).encode('utf-8'))
                
            except Exception as e:
                self.send_response(500)
                self.end_headers()
                self.wfile.write(f"Error: {str(e)}".encode('utf-8'))
            finally:
                session.close()
        else:
            self.send_response(404)
            self.end_headers()

def start_monitor_server(port=8080):
    """在一个独立的线程中启动监控服务器"""
    server = HTTPServer(('localhost', port), StatusHandler)
    print(f"监控服务器启动在 http://localhost:{port}/status")
    server.serve_forever()

# 在主程序中启动监控服务器(在单独线程中)
# monitor_thread = threading.Thread(target=start_monitor_server, daemon=True)
# monitor_thread.start()

现在,你可以在浏览器中访问 http://localhost:8080/status ,或者用 curl 命令,来获取系统的实时状态。这比翻看日志文件要直观得多。

5.3 实战任务示例与效果演示

理论说再多,不如看实际效果。让我们给系统布置几个真实的任务,看看它如何工作。

任务一:文件系统检查任务 我们通过Python交互界面或一个简单的脚本向调度器添加任务:

from core.scheduler import AutonomousScheduler
scheduler = AutonomousScheduler()
task_id = scheduler.create_task("编写一个Python脚本,检查/tmp目录下是否存在名为'test_marker.txt'的文件。如果存在,打印'文件存在';如果不存在,打印'文件不存在'。")

系统处理流程:

  1. 调度器获取到这个新任务。
  2. Claude生成代码。由于提示词限制,它生成的代码会只使用 os.path 标准库。
  3. 执行器在Docker容器中运行代码。
  4. 分析器检查结果:如果代码退出码为0,并且输出中包含“文件存在”或“文件不存在”,则认为任务成功。
  5. 在数据库的 code_executions 表中,你会看到生成的代码和执行记录。如果 /tmp/test_marker.txt 不存在,输出会是“文件不存在”。

任务二:带迭代修复的数据处理任务 布置一个更有挑战性的任务:

task_id = scheduler.create_task("编写一个Python脚本,读取当前目录下的data.txt文件,该文件每行一个数字,计算所有数字的平均值并打印。")

假设当前目录下没有 data.txt 文件。

  1. 第一次执行,Claude生成的代码直接 open('data.txt') ,导致 FileNotFoundError
  2. 执行失败,分析器从 stderr 中提取到 FileNotFoundError 信息。
  3. 这个错误反馈被加入Claude的上下文。
  4. 第二次生成代码时,Claude看到了“上次代码因FileNotFoundError失败”,于是它生成的代码会加入 try-except 处理,或者先检查文件是否存在。
  5. 第二次执行,可能因为文件不存在而打印一个友好的错误信息。分析器发现输出中没有“平均值”或数字,判断为“成功但输出不符合预期”。
  6. 第三次生成代码,Claude可能会生成一个更健壮的版本:先检查文件是否存在,如果不存在则创建一个示例文件或给出明确提示。
  7. 最终,任务可能以“成功”或“失败”结束,但整个过程完全自动化,展示了从错误中学习的能力。

通过查看数据库和日志,你可以清晰地看到这三次迭代中代码的演变,这正是“自主”的体现。

6. 高级优化与安全加固

基础系统跑起来后,我们可以从性能、可靠性和安全性方面进行深度优化,让它更健壮、更智能。

6.1 性能优化策略

  1. Docker镜像预热与缓存 :每次执行都拉取镜像是不可能的。我们在 DockerExecutor __init__ 中已经做了检查。可以更进一步,预先拉取并构建一个包含更常用工具的基础镜像。例如,创建一个 Dockerfile
    FROM python:3.9-slim
    RUN apt-get update && apt-get install -y --no-install-recommends \
        curl wget vim-tiny \
        && rm -rf /var/lib/apt/lists/*
    
    构建并标记为 my-python-base:3.9 。然后在代码中指定使用这个镜像。这能减少容器启动时的微小开销。
  2. 数据库连接池与会话管理 :在主调度循环中,我们为每个任务创建了数据库会话。当任务量大时,这会产生开销。可以使用 scoped_session 来管理会话生命周期,或者引入连接池。更简单的方法是,将 _process_next_task 方法中的数据库操作批量处理。
  3. Claude API调用优化
    • 异步调用 :如果任务队列较长,可以使用 asyncio anthropic 的异步客户端来并行处理多个代码生成请求,而不是同步等待。
    • 提示词缓存 :对于相似的任务,可以缓存之前成功的提示词和代码模板,减少对API的调用次数和token消耗。
    • 模型选择 :对于简单的脚本生成,可以使用更便宜、更快的模型如 claude-3-haiku ;对于复杂的逻辑,再切换到 claude-3-sonnet claude-3-opus 。可以在 ClaudeCoder 类中根据任务复杂度动态选择模型。

6.2 可靠性增强措施

  1. 任务优先级与饥饿预防 :当前的调度是简单的FIFO(先进先出)加优先级。可以引入更复杂的调度算法,比如考虑任务等待时间,防止低优先级任务永远得不到执行(饥饿)。例如,每次调度时,可以计算 优先级权重 = 基础优先级 - log(等待时间秒数) ,优先执行权重高的任务。
  2. 执行结果持久化与回放 CodeExecution 表记录了每次执行的代码和结果。我们可以利用这个实现“回放”功能。当一个新的任务与历史任务高度相似时,可以直接复用历史上成功的代码,而无需调用Claude API。这需要实现一个简单的任务相似度匹配算法(如基于任务描述文本的嵌入向量余弦相似度)。
  3. 心跳与健康检查 :除了监控服务器,主调度器本身也应该定期向一个健康检查文件或数据库表写入“心跳”。可以再写一个简单的看门狗脚本,定期检查这个心跳。如果超过一定时间没有更新,则认为主调度器可能已僵死,看门狗脚本可以尝试重启它或发送警报。

6.3 安全边界再加固

安全永远是第一位的。除了已经实施的网络禁用和资源限制,还需要考虑更多层面:

  1. 文件系统访问白名单 :当前是只读挂载临时目录。如果任务需要读取特定输入文件,可以建立一个白名单机制。在调度器创建任务时,指定该任务允许访问的宿主目录列表。 DockerExecutor 根据这个列表动态构建 volumes 字典,并且始终以 ro (只读)模式挂载,除非任务明确需要写入(这种情况要极其谨慎)。
  2. 代码静态分析 :在将代码送入Docker执行前,可以进行一次快速的静态安全检查。例如,使用Python的 ast (抽象语法树)模块解析代码,检查是否导入了黑名单中的模块(如 os.system , subprocess , socket 等)。虽然Docker提供了隔离,但多层防御总是更保险。
    import ast
    class CodeSecurityChecker:
        BLACKLISTED_MODULES = {'os', 'subprocess', 'socket', 'shutil', 'requests', 'urllib', ...}
        def check(self, code: str) -> bool:
            try:
                tree = ast.parse(code)
                for node in ast.walk(tree):
                    if isinstance(node, ast.Import):
                        for alias in node.names:
                            if alias.name in self.BLACKLISTED_MODULES:
                                return False
                    elif isinstance(node, ast.ImportFrom):
                        if node.module in self.BLACKLISTED_MODULES:
                            return False
                return True
            except SyntaxError:
                # 语法错误交给执行器去发现
                return True
    
  3. 资源使用监控与熔断 :Docker虽然限制了资源上限,但我们需要知道代码实际消耗了多少。可以在 DockerExecutor.execute_code 方法中,在 container.wait() 之后,使用 container.stats(stream=False) 获取容器的CPU、内存使用统计。如果某个任务连续多次消耗资源异常(接近限制值),可以将其标记为“可疑”,并降低其优先级或暂停执行同类任务。
  4. API密钥轮换与审计 :Claude API密钥是重要资产。不要在代码中硬编码,我们已经用了环境变量。更进一步,可以考虑使用密钥管理服务,或者定期轮换密钥。同时,在数据库中记录每一次API调用的元数据(任务ID、时间、消耗token数),便于审计和成本分析。

7. 常见问题与故障排查实录

在实际部署和运行过程中,你几乎一定会遇到下面这些问题。这里我整理了完整的排查清单和解决方案,希望能帮你节省大量时间。

7.1 环境与依赖问题

问题1:Docker命令执行报错 Cannot connect to the Docker daemon

  • 现象 :启动代理时, DockerExecutor 初始化失败,提示无法连接到Docker守护进程。
  • 原因 :Docker Desktop没有运行,或者当前用户不在 docker 用户组。
  • 解决
    1. 打开Docker Desktop应用,确保它在运行(菜单栏有Docker图标)。
    2. 在终端执行 docker ps ,确认命令可以正常执行。
    3. 如果还是不行,尝试将当前用户加入docker组: sudo dscl . append /Groups/docker GroupMembership $(whoami) ,然后 完全注销并重新登录

问题2:拉取Python镜像速度极慢或失败

  • 现象 :程序卡在 _pull_image_if_needed 阶段,或者拉取失败。
  • 原因 :网络连接Docker Hub不稳定。
  • 解决
    1. 在Docker Desktop设置中配置镜像加速器。国内用户可以使用阿里云、腾讯云等提供的加速地址。
    2. 手动提前拉取镜像:在终端执行 docker pull python:3.9-slim ,确保镜像已本地存在。
    3. 考虑使用更小的镜像,如 python:3.9-alpine ,但需注意Alpine镜像的某些标准库行为可能与slim不同。

问题3:ImportError: No module named 'anthropic' 或其他Python包缺失

  • 现象 :运行 main.py 时提示找不到模块。
  • 原因 :没有在正确的conda环境下安装依赖,或者依赖版本冲突。
  • 解决
    1. 确认终端已激活环境: conda activate claude-agent
    2. 在项目根目录下,执行 pip install -r requirements.txt 重新安装所有依赖。
    3. 如果问题依旧,尝试删除虚拟环境重建: conda deactivate && conda env remove -n claude-agent ,然后重新执行环境准备步骤。

7.2 运行时与逻辑问题

问题4:Claude生成的代码格式错误,无法执行

  • 现象 :执行器报错,查看生成的代码发现被Markdown代码块包裹,或者包含了非代码文本。
  • 原因 :Claude有时会返回包含解释的文本,即使提示词要求“只输出代码”。
  • 解决
    1. 加强 claude_client.py generate_code 方法后的清洗逻辑。除了移除````python`,还可以用正则表达式匹配更复杂的模式。
    2. 在提示词中更严厉地强调:“你的响应必须且只能是纯粹的、可执行的Python代码,不能包含任何其他文字、解释、注释(除非是代码内的注释)或Markdown标记。”
    3. 增加一层验证:在将代码送给执行器前,用 ast.parse() 尝试解析它。如果解析失败,则直接认为生成无效,立即请求重试,而不是浪费一次容器执行。

问题5:任务陷入无限重试循环

  • 现象 :一个任务反复失败-重试,永远无法成功,达到最大迭代次数后标记为失败。
  • 原因
    • 任务本身不可能完成 :例如,要求“读取不存在的文件并计算”,而Claude始终无法生成先创建文件的代码。
    • 分析器逻辑有缺陷 :错误地将失败判断为成功,导致调度器不停止。
    • Claude上下文混乱 :对话历史过长或包含矛盾信息,导致生成质量下降。
  • 解决
    1. 检查日志 :查看每次迭代生成的代码和执行结果,定位问题根源。
    2. 简化任务 :将复杂任务拆分成更小的、原子性的子任务。例如,将“读取文件并计算”拆成“检查文件是否存在”和“如果存在则读取计算”两个任务。
    3. 重置对话历史 :在 ClaudeCoder 类中添加一个 clear_history() 方法,当检测到对同一任务连续失败多次时,清空该任务的对话历史,让Claude“重新开始思考”。
    4. 改进分析器 :对于模糊的成功判断,引入更严格的规则。例如,对于计算任务,要求输出必须匹配特定的正则表达式(如 ^平均值为: \d+\.?\d*$ )才算成功。

问题6:Docker容器执行超时,但任务实际需要更长时间

  • 现象 :一些合法的长时任务(如处理大量数据)被 timeout 命令杀死。
  • 原因 :默认的30秒超时设置对于所有任务一刀切。
  • 解决
    1. 任务级超时设置 :在创建任务时,允许指定一个 timeout 参数。 scheduler.create_task(description, priority=5, timeout_seconds=120)
    2. 动态调整 DockerExecutor.execute_code 方法接收这个参数,并传递给容器命令和 container.wait
    3. 资源考量 :注意,长时间运行的任务会占用容器资源。确保你的Mac Mini有足够的内存和CPU,并监控Docker Desktop的资源使用情况。

问题7:数据库文件锁或连接泄漏

  • 现象 :运行一段时间后,程序报数据库锁错误 sqlite3.OperationalError: database is locked ,或连接数过多。
  • 原因 :SQLite不适合高并发写。调度器循环中频繁创建、提交会话,可能在某些时刻产生冲突。
  • 解决
    1. 使用连接池与会话隔离 :确保每个线程/任务使用独立的数据库会话,并在操作后及时关闭。可以使用 contextmanager 来管理会话生命周期。
    2. 减少提交频率 :不是每次 _record_execution 都提交,可以批量提交。
    3. 考虑更强大的数据库 :如果任务量非常大,可以考虑切换到PostgreSQL或MySQL,但这会引入额外的服务依赖。对于个人项目,优化SQLite的使用方式通常足够。

7.3 监控与维护问题

问题8:日志文件过大,磁盘空间不足

  • 现象 logs/agent.log 文件增长迅速,尤其是调试时日志级别为INFO或DEBUG。
  • 解决
    1. 使用日志轮转 :Python的 logging.handlers.RotatingFileHandler 可以设置文件大小上限和备份数量。
      from logging.handlers import RotatingFileHandler
      handler = RotatingFileHandler('logs/agent.log', maxBytes=10*1024*1024, backupCount=5) # 10MB一个文件,保留5个备份
      
    2. 调整日志级别 :生产运行时,可以将日志级别调整为 WARNING ,只记录错误和重要事件。
    3. 定期清理 :写一个简单的清理脚本,通过 launchd cron 定期执行,删除过旧的日志文件。

问题9:如何知道代理正在干什么?当前任务卡住了吗?

  • 现象 :代理启动后,没有明显的输出,不知道它是否在正常工作。
  • 解决
    1. 使用监控接口 :启动前面实现的监控服务器( monitor.py ),通过浏览器访问 http://localhost:8080/status 查看实时状态。
    2. 查看数据库 :直接用SQLite浏览器打开项目目录下的数据库文件(默认由SQLAlchemy创建,如 sqlite:///./claude_agent.db ),查看 tasks code_executions 表。
    3. 增强日志 :在调度器的 _process_next_task 方法开始和结束处增加更详细的日志,打印任务ID和状态。

问题10:Claude API调用费用超预期

  • 现象 :月底收到账单,发现API调用费用比预期高。
  • 原因 :任务重试、提示词过长或任务本身复杂导致token消耗大。
  • 解决
    1. 记录与审计 :修改 ClaudeCoder 类,在每次调用API后,记录请求和响应的token数量到数据库。这能帮你分析消耗大户。
    2. 优化提示词 :精简系统提示词和用户提示词,移除不必要的描述。使用更高效的模型(Haiku)进行简单任务。
    3. 实现缓存 :如前所述,对成功的任务代码进行缓存。当相似的新任务出现时,直接使用缓存代码,避免API调用。
    4. 设置预算警报 :在Anthropic控制台设置每月预算和警报。

这个自主代理系统就像你亲手搭建并训练的一个数字生命体。从最初简单的脚本执行,到如今能根据环境反馈自我调试、持续运行,整个过程充满了工程上的挑战和乐趣。最关键的是,你建立了一套可扩展的框架。未来,你可以轻松地为它添加新的“技能”,比如集成文件操作、调用其他本地工具,甚至通过安全的RPC方式与外部服务进行有限交互。记住,安全是这一切的基石,任何时候都不能为了功能而放松沙箱的约束。现在,你的Mac Mini不再只是一台安静的电脑,而是一个不知疲倦、不断学习的自动化伙伴了。

Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐