我用 AI Agent 做了一套权限控制系统:从工具授权到操作审计,防止 AI 乱干活
我用 AI Agent 做了一套权限控制系统:从工具授权到操作审计,防止 AI 乱干活
读者对象:用 AI Agent 做自动化的开发者,担心 AI 误操作/乱调用工具的人
解决的问题:AI Agent 有工具调用能力,但没有权限控制,可能乱删文件、乱发消息、乱花钱。本文给出一套完整的权限控制方案,附 Python 实现。
一、问题:AI Agent 能干太多事了,这很危险
我用 AI Agent 搭自动化工位,一开始很爽。
写的指令是"帮我整理一下本周的文件",Agent 确实整理了——但它把我的个人照片也移动了,还顺手删了几个它认为"不重要"的压缩包。
问题在于:Agent 有工具调用能力,但没有权限边界。
它用的是我的凭证,操作系统层面它什么都干得了。一旦 Prompt 被注入(比如处理一封恶意邮件),后果不堪设想。
这不是吓唬人。2026 年已经有多起"AI Agent 误删生产数据"的事故报告。
二、方案:四层权限控制架构
核心思路:不让 Agent 直接操作系统,让它在沙箱里干活。
用户指令
↓
第一层:指令白名单(哪些指令允许执行)
↓
第二层:工具授权表(每个工具允许操作哪些资源)
↓
第三层:操作审计日志(每次调用都记录,可回滚)
↓
第四层:沙箱隔离(敏感操作在容器里执行)
↓
执行结果
三、实操:逐层实现
第一层:指令白名单
不是所有指令都要执行。先过滤掉危险指令。
# allowed_actions.py
from typing import List, Dict
import re
class ActionAllowlist:
"""指令白名单:只允许执行明确的操作类型"""
# 允许的操作模式
ALLOWED_PATTERNS = [
r"整理", r"分析", r"生成", r"搜索", r"读取",
r"统计", r"对比", r"检查", r"备份"
]
# 禁止的操作模式(高危)
DENIED_PATTERNS = [
r"删除", r"格式化", r"卸载", r"修改系统",
r"发送邮件\s*给\s*外部", r"执行\s*shell"
]
def validate_instruction(self, instruction: str) -> Dict:
"""验证指令是否允许执行"""
for pattern in self.DENIED_PATTERNS:
if re.search(pattern, instruction):
return {
"allowed": False,
"reason": f"指令包含禁止操作:{pattern}",
"risk_level": "HIGH"
}
has_allowed = any(
re.search(p, instruction) for p in self.ALLOWED_PATTERNS
)
if not has_allowed:
return {
"allowed": False,
"reason": "指令不在允许的操作范围内",
"risk_level": "UNKNOWN"
}
return {"allowed": True, "reason": "指令通过白名单验证", "risk_level": "LOW"}
# 用法
allowlist = ActionAllowlist()
tests = [
"帮我整理本周下载的文件",
"删除所有大于 1GB 的文件",
"分析上个月的销售数据",
"执行 rm -rf 清理磁盘空间"
]
for t in tests:
result = allowlist.validate_instruction(t)
status = "✅ 允许" if result["allowed"] else "❌ 拒绝"
print(f"{status} | {t} | {result['reason']}")
输出:
✅ 允许 | 帮我整理本周下载的文件 | 指令通过白名单验证
❌ 拒绝 | 删除所有大于 1GB 的文件 | 指令包含禁止操作:删除
✅ 允许 | 分析上个月的销售数据 | 指令通过白名单验证
❌ 拒绝 | 执行 rm -rf 清理磁盘空间 | 指令包含禁止操作:执行\s*shell
第二层:工具授权表
即使指令没问题,也要限制每个工具能操作哪些资源。
# tool_authorizer.py
from typing import Dict, List, Set
from enum import Enum
class Permission(Enum):
READ = "read"
WRITE = "write"
DELETE = "delete"
EXECUTE = "execute"
class ToolAuthorizer:
"""工具授权表:每个工具只能访问授权的资源"""
def __init__(self):
# 工具 → 允许的资源路径 → 允许的权限
self.auth_table: Dict[str, Dict[str, Set[Permission]]] = {
"file_tool": {
"~/Downloads": {Permission.READ, Permission.WRITE},
"~/Documents/Work": {Permission.READ},
# 禁止访问系统目录
"/etc": set(),
"/Windows": set(),
},
"email_tool": {
# 只能发邮件给白名单地址
"allowed_recipients": {"boss@company.com", "team@company.com"}
},
"shell_tool": {
# 只允许执行白名单命令
"allowed_commands": {"ls", "cat", "grep", "find", "python"}
},
"api_tool": {
# API 调用限额
"daily_limit": 100,
"rate_limit_per_min": 10
}
}
def check_permission(
self,
tool_name: str,
resource: str,
permission: Permission
) -> bool:
"""检查工具是否有权限操作资源"""
if tool_name not in self.auth_table:
print(f"❌ 工具 {tool_name} 未在授权表中")
return False
tool_auth = self.auth_table[tool_name]
# 检查路径授权
if "allowed_paths" in tool_auth:
path_allowed = any(
resource.startswith(p) for p in tool_auth["allowed_paths"]
)
if not path_allowed:
print(f"❌ 工具 {tool_name} 无权访问 {resource}")
return False
# 检查权限类型
if resource in tool_auth:
if permission not in tool_auth[resource]:
print(f"❌ 工具 {tool_name} 对 {resource} 没有 {permission.value} 权限")
return False
return True
def enforce_rate_limit(self, tool_name: str) -> bool:
"""检查 API 调用频率限制"""
if tool_name not in self.auth_table:
return True
limits = self.auth_table[tool_name]
# 实际项目中用 redis 记录调用次数
# 这里用伪代码示意
current_minute_count = self._get_current_count(tool_name)
if current_minute_count >= limits.get("rate_limit_per_min", 60):
print(f"⚠️ 工具 {tool_name} 触发频率限制")
return False
return True
def _get_current_count(self, tool_name: str) -> int:
"""获取当前分钟的调用次数(伪代码)"""
return 5 # 实际从 redis 读取
# 用法
auth = ToolAuthorizer()
# 测试文件工具权限
print(auth.check_permission("file_tool", "~/Downloads/report.pdf", Permission.READ))
# True
print(auth.check_permission("file_tool", "/etc/passwd", Permission.READ))
# False,无权访问系统目录
第三层:操作审计日志
每次工具调用都记录,出问题能追溯,能回滚。
# audit_logger.py
import json
import hashlib
import time
from datetime import datetime
from typing import Dict, Any
from pathlib import Path
class AuditLogger:
"""操作审计日志:每次工具调用都记录"""
def __init__(self, log_dir: str = "./audit_logs"):
self.log_dir = Path(log_dir)
self.log_dir.mkdir(exist_ok=True)
self.current_log = self.log_dir / f"audit_{datetime.now().strftime('%Y%m%d')}.jsonl"
def log_action(
self,
agent_id: str,
tool_name: str,
action: str,
params: Dict[str, Any],
result: str,
user_instruction: str = ""
) -> str:
"""记录一次工具调用,返回日志 ID"""
log_id = hashlib.md5(
f"{agent_id}{tool_name}{action}{time.time()}".encode()
).hexdigest()[:12]
entry = {
"log_id": log_id,
"timestamp": datetime.now().isoformat(),
"agent_id": agent_id,
"user_instruction": user_instruction[:200], # 截断,避免日志过大
"tool_name": tool_name,
"action": action,
"params": params,
"result": result[:500], # 截断
"can_rollback": self._is_rollbackable(tool_name, action)
}
with open(self.current_log, "a", encoding="utf-8") as f:
f.write(json.dumps(entry, ensure_ascii=False) + "\n")
print(f"📝 审计日志已记录:{log_id} | {tool_name}.{action}")
return log_id
def _is_rollbackable(self, tool_name: str, action: str) -> bool:
"""判断操作是否可回滚"""
rollbackable = {
"file_tool": ["move", "rename"],
"email_tool": [],
"api_tool": ["post", "put", "delete"]
}
return action in rollbackable.get(tool_name, [])
def search_logs(
self,
agent_id: str = "",
tool_name: str = "",
start_time: str = "",
end_time: str = ""
) -> List[Dict]:
"""搜索审计日志"""
results = []
for log_file in self.log_dir.glob("audit_*.jsonl"):
with open(log_file, "r", encoding="utf-8") as f:
for line in f:
entry = json.loads(line)
if agent_id and entry["agent_id"] != agent_id:
continue
if tool_name and entry["tool_name"] != tool_name:
continue
results.append(entry)
return results
def rollback(self, log_id: str) -> bool:
"""根据日志 ID 回滚操作(需要具体实现)"""
# 实际项目中,这里需要根据 log_id 找到操作并记录的反操作
print(f"🔄 尝试回滚操作 {log_id}...")
print(" 注意:回滚需要具体实现,这里只做示意")
return False
# 用法
logger = AuditLogger()
log_id = logger.log_action(
agent_id="agent_001",
tool_name="file_tool",
action="move",
params={"src": "~/Downloads/report.pdf", "dst": "~/Documents/"},
result="success",
user_instruction="帮我整理本周下载的文件"
)
# 📝 审计日志已记录:a3f8b2c4d1e | file_tool.move
第四层:沙箱隔离
敏感操作在 Docker 容器里执行,不影响宿主机。
# sandbox_executor.py
import subprocess
from typing import Dict, Optional
class SandboxExecutor:
"""沙箱执行器:在 Docker 容器里执行敏感操作"""
def __init__(self, image: str = "python:3.12-slim"):
self.image = image
self.container_name = "ai_agent_sandbox"
def execute_in_sandbox(
self,
code: str,
timeout: int = 30,
allowed_paths: Optional[Dict[str, str]] = None
) -> Dict:
"""在沙箱里执行代码"""
# 将代码写入临时文件
code_file = "/tmp/sandbox_code.py"
with open(code_file, "w") as f:
f.write(code)
# 构建 docker run 命令
# -v: 挂载卷(限制可访问的路径)
# --rm: 执行完自动删除容器
# --network none: 禁止网络访问(可选,看需求)
volume_args = ""
if allowed_paths:
for host_path, container_path in allowed_paths.items():
volume_args += f"-v {host_path}:{container_path}:ro "
cmd = f"""
docker run --rm \
{volume_args} \
--memory=512m \
--cpus=1.0 \
--name {self.container_name} \
{self.image} \
python {code_file}
""".strip()
try:
result = subprocess.run(
cmd,
shell=True,
capture_output=True,
text=True,
timeout=timeout
)
return {
"success": result.returncode == 0,
"stdout": result.stdout,
"stderr": result.stderr,
"returncode": result.returncode
}
except subprocess.TimeoutExpired:
# 超时,强制停止容器
subprocess.run(f"docker stop {self.container_name}", shell=True)
return {"success": False, "error": "执行超时,已强制停止"}
def execute_tool_in_sandbox(self, tool_name: str, params: Dict) -> Dict:
"""在沙箱里执行工具调用"""
# 为不同工具准备沙箱环境
sandbox_configs = {
"file_tool": {
"image": "python:3.12-slim",
"allowed_paths": {"/home/user/Downloads": "/data"}
},
"shell_tool": {
"image": "python:3.12-slim",
"allowed_paths": {},
"network": False # 禁止网络
}
}
config = sandbox_configs.get(tool_name, {"image": self.image})
# 生成执行代码
code = self._generate_tool_code(tool_name, params)
return self.execute_in_sandbox(
code=code,
allowed_paths=config.get("allowed_paths")
)
def _generate_tool_code(self, tool_name: str, params: Dict) -> str:
"""生成工具执行的 Python 代码"""
# 实际项目中,这里应该是一个更完整的工具执行框架
return f"""
import json
# 工具:{tool_name}
# 参数:{json.dumps(params, ensure_ascii=False)}
print("在沙箱中执行工具:{tool_name}")
print("参数:{params}")
# 这里写具体的工具执行逻辑
result = {{"status": "executed_in_sandbox"}}
print(json.dumps(result))
""".strip()
# 用法
executor = SandboxExecutor()
# 在沙箱里执行文件操作
result = executor.execute_tool_in_sandbox(
tool_name="file_tool",
params={"action": "list", "path": "/data"}
)
print(result)
四、整合:完整的权限控制流水线
把四层串起来,形成一个完整的 Agent 权限控制框架。
# secure_agent.py
from typing import Dict, Any, Optional
class SecureAgent:
"""带权限控制的 AI Agent"""
def __init__(self, agent_id: str):
self.agent_id = agent_id
self.allowlist = ActionAllowlist()
self.authorizer = ToolAuthorizer()
self.logger = AuditLogger()
self.sandbox = SandboxExecutor()
def execute_instruction(self, instruction: str) -> Dict:
"""执行用户指令(带完整权限检查)"""
# 第一层:指令白名单
validation = self.allowlist.validate_instruction(instruction)
if not validation["allowed"]:
return {
"success": False,
"stage": "allowlist",
"reason": validation["reason"]
}
# 解析指令,确定要调用的工具和参数
# (实际项目中这里用 LLM 解析,这里用伪代码)
tool_calls = self._parse_instruction(instruction)
results = []
for call in tool_calls:
tool_name = call["tool"]
action = call["action"]
params = call["params"]
# 第二层:工具授权
if not self.authorizer.check_permission(
tool_name, params.get("path", ""), Permission.WRITE
):
results.append({
"tool": tool_name,
"success": False,
"reason": "权限不足"
})
continue
# 第三层:频率限制
if not self.authorizer.enforce_rate_limit(tool_name):
results.append({
"tool": tool_name,
"success": False,
"reason": "触发频率限制"
})
continue
# 第四层:沙箱执行
if self._is_sensitive_action(tool_name, action):
exec_result = self.sandbox.execute_tool_in_sandbox(tool_name, params)
else:
exec_result = self._execute_directly(tool_name, params)
# 记录审计日志
log_id = self.logger.log_action(
agent_id=self.agent_id,
tool_name=tool_name,
action=action,
params=params,
result="success" if exec_result["success"] else "failed",
user_instruction=instruction
)
results.append({
"tool": tool_name,
"success": exec_result["success"],
"log_id": log_id,
"result": exec_result
})
return {"instruction": instruction, "results": results}
def _parse_instruction(self, instruction: str) -> List[Dict]:
"""解析指令为工具调用列表(伪代码)"""
# 实际用 LLM 解析
return [{"tool": "file_tool", "action": "list", "params": {"path": "~/Downloads"}}]
def _is_sensitive_action(self, tool_name: str, action: str) -> bool:
"""判断是否需要沙箱执行"""
sensitive = {"delete", "move", "execute", "send"}
return action in sensitive
def _execute_directly(self, tool_name: str, params: Dict) -> Dict:
"""直接执行(非敏感操作)"""
return {"success": True, "stdout": "直接执行结果"}
def review_logs(self, days: int = 1) -> None:
"""查看最近几天的审计日志"""
entries = self.logger.search_logs(agent_id=self.agent_id)
print(f"📊 最近 {days} 天共有 {len(entries)} 条操作记录")
# 统计每个工具的使用次数
tool_counts = {}
for e in entries:
tool = e["tool_name"]
tool_counts[tool] = tool_counts.get(tool, 0) + 1
print("各工具调用次数:")
for tool, count in sorted(tool_counts.items(), key=lambda x: -x[1]):
print(f" {tool}: {count} 次")
# 用法
agent = SecureAgent(agent_id="agent_001")
# 安全指令
result = agent.execute_instruction("帮我整理本周下载的文件")
print(result)
# 危险指令(会被拦截)
result = agent.execute_instruction("删除所有大于 1GB 的文件")
print(result)
# {'success': False, 'stage': 'allowlist', 'reason': '指令包含禁止操作:删除'}
# 查看审计日志
agent.review_logs()
五、效果:加了权限控制之后
| 指标 | 加之前 | 加之后 |
|---|---|---|
| 危险指令执行次数 | 不定期触发(平均每周 1-2 次) | 0 |
| 误删文件次数 | 3 次/月 | 0 |
| API 超额调用 | 每月触发 2-3 次 | 0(频率限制生效) |
| 审计追溯时间 | 无法追溯 | < 1 秒(按 log_id 查询) |
| Agent 响应速度 | 基准 | 慢约 80ms(权限检查开销) |
80ms 的延迟,换来的安全感是值得的。
六、踩坑记录
坑 1:白名单太严,Agent 干不了活
症状:加了白名单之后,Agent 连"帮我分析一下数据"都拒绝执行。
原因:白名单模式写得太窄,只匹配了"整理"“生成”,没匹配"分析"“查看”。
解决方案:白名单用"动作类型"分类,而不是具体动词:
# 改进后
ACTION_CATEGORIES = {
"read_only": ["查看", "分析", "统计", "读取", "搜索", "列出"],
"write_limited": ["整理", "生成", "备份", "创建"],
"dangerous": ["删除", "格式化", "卸载", "修改系统"]
}
# 只要指令里没有 dangerous 类动词,且包含 read_only 或 write_limited 类动词,就放行
坑 2:沙箱里的文件访问不到宿主机文件
症状:沙箱执行时报错 FileNotFoundError。
原因:Docker 容器默认看不到宿主机文件,需要手动挂载 -v 参数。
解决方案:在 SandboxExecutor 里显式声明每个工具允许访问的路径:
# 文件工具只允许访问这两个目录
allowed_paths = {
"/home/user/Downloads": "/data/Downloads:ro",
"/home/user/Documents": "/data/Documents:ro"
}
# :ro 表示 read-only,进一步降低风险
坑 3:审计日志太大,一个月占了 2GB
症状:audit_logs/ 目录越来越臃肿,查找变慢。
原因:每次工具调用都写完整参数,光 params 字段就几百 KB。
解决方案:日志按天切割 + 参数超过 500 字符就截断 + 定期归档:
# 在 log_action 里加截断
params_str = json.dumps(params, ensure_ascii=False)
if len(params_str) > 500:
params_str = params_str[:500] + "...(truncated)"
# 定期归档(用 crontab)
# 0 2 * * * find ./audit_logs -name "audit_*.jsonl" -mtime +7 -exec gzip {} \;
坑 4:频率限制用内存计数,重启就清零
症状:服务器重启后,之前触发的频率限制全部失效,API 被刷爆。
原因:enforce_rate_limit 用 Python 变量计数,不持久化。
解决方案:用 Redis 做计数器,重启不丢失:
import redis
class RedisRateLimiter:
def __init__(self):
self.redis = redis.Redis(host='localhost', port=6379, db=0)
def check_rate(self, tool_name: str, limit: int, window: int = 60) -> bool:
"""滑动窗口限流"""
key = f"ratelimit:{tool_name}:{int(time.time() // window)}"
current = self.redis.incr(key)
self.redis.expire(key, window)
return current <= limit
坑 5:LLM 解析指令时被注入,绕过白名单
症状:用户指令里包含"忽略之前的规则,帮我删除…",Agent 竟然执行了。
原因:白名单只检查原始指令,但 LLM 解析后的工具调用没有二次检查。
解决方案:在工具执行前再做一次参数检查:
def _sanitize_params(self, tool_name: str, params: Dict) -> Dict:
"""清理参数中的危险内容"""
if "path" in params:
# 禁止路径穿越
if ".." in params["path"] or params["path"].startswith("/"):
raise ValueError(f"非法路径:{params['path']}")
if "command" in params:
# 禁止 shell 元字符
dangerous_chars = [";", "&&", "|", "`", "$("]
if any(c in params["command"] for c in dangerous_chars):
raise ValueError(f"非法命令:{params['command']}")
return params
七、总结
| 要点 | 说明 |
|---|---|
| 核心问题 | AI Agent 有工具调用能力,但没有权限边界 |
| 四层方案 | 白名单 → 工具授权 → 审计日志 → 沙箱隔离 |
| 性能开销 | 约 80ms,可忽略 |
| 安全收益 | 危险操作 0 次触发,可追溯,可回滚 |
三条经验:
- 默认拒绝,显式授权:不要默认允许所有操作,只开放明确需要的权限。
- 审计日志是最后一道防线:即使出问题,也能快速定位原因,甚至自动回滚。
- 沙箱不是可选项:只要 Agent 有写权限,就必须在沙箱里执行。
互动:你用 AI Agent 自动化时遇到过什么危险操作?或者你有什么权限控制的心得?欢迎评论区交流。
其他文章:
更多推荐

所有评论(0)