AI 网络安全 Agent:LangGraph + DeepSeek-R1 + 安全工具链全栈实战
从零搭建 AI 网络安全 Agent:LangGraph + DeepSeek-R1 + 安全工具链全栈实战
摘要:本文详细记录了如何基于 LangGraph 状态图引擎 + DeepSeek-R1 32B 大模型 + 本地安全工具链(nmap/nuclei/subfinder 等),构建一个具备侦察、漏洞扫描、漏洞验证、人工审批、报告生成全链路能力的 AI 网络安全 Agent。项目支持 CVE RAG 知识库增强、ELK Stack 审计日志、等保/ISO27001 合规检查。附完整源码、部署脚本和用例文档。
一、项目背景与目标
1.1 为什么需要 AI 安全 Agent?
传统的渗透测试流程高度依赖安全工程师的手动操作:
- 侦察阶段:手动运行 nmap、subfinder、httpx,逐个分析输出
- 漏洞扫描:手动配置 nuclei 模板、nikto 参数,人工筛选误报
- 报告编写:手动整理发现、计算 CVSS 评分、撰写修复建议
整个流程耗时数小时甚至数天,且高度依赖个人经验。
AI Agent 的价值:
- 自动编排多工具执行顺序
- 智能分析工具输出,减少误报
- RAG 增强:自动关联 CVE 知识库
- 结构化审计日志,满足合规要求
- 一键生成安全评估报告
1.2 技术选型
| 组件 | 选型 | 理由 |
|---|---|---|
| 推理框架 | LangGraph | 状态管理最稳,支持条件分支、循环、HITL(人机回环) |
| 底层模型 | DeepSeek-R1-Distill-Qwen-32B | 逻辑链极强,能理解复杂混淆流量和利用链 |
| 执行引擎 | vLLM / Ollama | 本地部署,数据不出境,兼容 OpenAI API |
| 向量存储 | ChromaDB | 轻量级,存放 CVE 知识库和攻击指纹,实现 RAG 增强 |
| 审计日志 | ELK Stack | Elasticsearch + Logstash + Kibana,满足等保合规 |
| CLI 框架 | Click + Rich | 命令行交互,丰富的终端输出 |
1.3 系统架构
┌─────────────────────────────────────────────────────────┐
│ 用户接口 (CLI) │
│ sec-agent scan <target> │
└──────────────────────┬──────────────────────────────────┘
│
┌──────────────────────▼──────────────────────────────────┐
│ LangGraph 状态图引擎 │
│ │
│ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │ Planner│→│ Recon │→│VulnScan│→│ Report │ │
│ └───┬────┘ └───┬────┘ └───┬────┘ └───┬────┘ │
│ │ │ │ │ │
│ [LLM规划] [HITL审批] [RAG增强] [生成报告] │
└──────┼───────────┼───────────┼───────────┼───────────────┘
│ │ │ │
┌──────▼───────────▼───────────▼───────────▼───────────────┐
│ 工具执行层 (subprocess 沙箱) │
│ nmap │ subfinder │ httpx │ nuclei │ nikto │ shodan │
└──────────────────────┬───────────────────────────────────┘
│
┌──────────────────────▼───────────────────────────────────┐
│ 模型推理: DeepSeek-R1 32B (vLLM/Ollama) │
│ 知识库: ChromaDB (CVE RAG + 攻击指纹) │
│ 审计层: JSONL + Elasticsearch (等保合规) │
└──────────────────────────────────────────────────────────┘
二、环境准备
2.1 系统要求
- 操作系统:Windows 10/11(本文环境)、Linux、macOS
- Python:3.11+
- Docker:用于部署 ELK Stack(可选)
- Go:用于安装安全工具(subfinder/httpx/nuclei)
- GPU:运行 32B 模型需要 24GB+ 显存(或使用量化版本)
2.2 安装 Python 环境
# 创建项目目录
mkdir D:\网络安全agent\sec-agent
cd D:\网络安全agent\sec-agent
# 创建虚拟环境
python -m venv .venv
# 激活虚拟环境(Windows)
.venv\Scripts\activate
# 激活虚拟环境(Linux/macOS)
# source .venv/bin/activate
2.3 安装安全工具
nmap(端口扫描)
# Windows: 使用 winget
winget install Insecure.Nmap
# 或从官网下载安装包
# https://nmap.org/download.html
# 验证安装
nmap --version
Go 工具链(subfinder / httpx / nuclei)
# 先安装 Go: https://go.dev/dl/
# 安装 subfinder(子域名枚举)
go install -v github.com/projectdiscovery/subfinder/v2/cmd/subfinder@latest
# 安装 httpx(Web 指纹识别)
go install -v github.com/projectdiscovery/httpx/cmd/httpx@latest
# 安装 nuclei(模板化漏洞扫描)
go install -v github.com/projectdiscovery/nuclei/v3/cmd/nuclei@latest
# 初始化 nuclei 模板库
nuclei -update-templates
# 验证安装
subfinder -version
httpx -version
nuclei -version
nikto(Web 服务器漏洞扫描)
# 方式一:本地安装(需要 Perl 环境)
# 下载: https://github.com/sullo/nikto
# 方式二:使用 Docker(推荐)
docker run --rm sullo/nikto -h <target>
2.4 配置环境变量
# 复制环境变量模板
copy .env.example .env # Windows
# cp .env.example .env # Linux/macOS
编辑 .env 文件:
# ========== 模型推理 ==========
# vLLM 本地部署(推荐生产环境)
VLLM_API_BASE=http://localhost:8000/v1
VLLM_API_KEY=not-needed
MODEL_NAME=deepseek-ai/DeepSeek-R1-Distill-Qwen-32B
# Ollama 本地部署(推荐开发调试)
# OLLAMA_API_BASE=http://localhost:11434/v1
# MODEL_NAME=deepseek-r1:32b
# ========== 安全工具 ==========
# Shodan API Key(可选,用于互联网资产搜索)
# 获取地址: https://account.shodan.io
SHODAN_API_KEY=
# ========== 审计 / ELK ==========
ELASTICSEARCH_HOST=http://localhost:9200
ELASTICSEARCH_INDEX=sec-agent-audit
OPERATOR_ID=admin
# ========== Agent 配置 ==========
MAX_ITERATIONS=50
TOOL_TIMEOUT=300
REQUIRE_APPROVAL=true
2.5 安装 Python 依赖
# 安装项目依赖
pip install -e .
# 开发模式(包含测试依赖)
pip install -e ".[dev]"
核心依赖说明:
| 包名 | 版本 | 用途 |
|---|---|---|
| langgraph | >=0.2.0 | 状态图引擎 |
| langgraph-checkpoint-sqlite | >=2.0.0 | 持久化 checkpointer |
| openai | >=1.50.0 | 连接 vLLM/Ollama(OpenAI 兼容 API) |
| chromadb | >=0.5.0 | 向量数据库(CVE RAG) |
| elasticsearch | >=8.16.0 | 审计日志推送到 ES |
| pydantic | >=2.9.0 | 数据模型验证 |
| click | >=8.1.0 | CLI 框架 |
| rich | >=13.9.0 | 终端美化输出 |
| httpx | >=0.28.0 | HTTP 请求(Shodan API) |
| xmltodict | >=0.14.0 | 解析 nmap XML 输出 |
三、项目结构详解
sec-agent/
├── cli.py # CLI 入口(7 个命令)
├── pyproject.toml # 依赖配置
├── .env.example # 环境变量模板
├── USAGE.md # 用例文档
│
├── config/ # 配置层
│ ├── settings.py # 全局配置(Pydantic Settings)
│ └── prompts.py # 系统提示词 + 报告模板
│
├── agent/ # Agent 核心层
│ ├── state.py # LangGraph 状态定义
│ ├── graph.py # 状态图编排
│ ├── nodes.py # 8 个节点实现
│ └── edges.py # 条件路由逻辑
│
├── tools/ # 工具层
│ ├── registry.py # 工具注册中心(自动发现)
│ ├── recon/ # 侦察工具
│ │ ├── nmap_tool.py # 端口扫描
│ │ ├── subfinder_tool.py # 子域名枚举
│ │ ├── httpx_tool.py # Web 指纹识别
│ │ └── shodan_tool.py # 互联网资产搜索
│ └── vuln/ # 漏洞扫描工具
│ ├── nuclei_tool.py # 模板化漏洞检测
│ └── nikto_tool.py # Web 服务器漏洞
│
├── memory/ # 知识库层
│ ├── vector_store.py # ChromaDB 向量存储
│ ├── cve_loader.py # NVD 数据加载器
│ └── session_memory.py # 会话级内存
│
├── audit/ # 审计层
│ ├── logger.py # 结构化审计日志
│ └── compliance.py # 等保/ISO27001 合规检查
│
├── sandbox/ # 执行沙箱
│ ├── executor.py # subprocess 执行器
│ └── validator.py # 输出解析验证
│
├── elk/ # ELK 部署
│ ├── docker-compose.yml # 一键部署
│ └── logstash.conf # 管道配置
│
└── tests/ # 测试
├── conftest.py
├── test_registry.py
├── test_state.py
└── test_validator.py
四、核心模块实现
4.1 工具注册中心(tools/registry.py)
这是整个工具链的基石。每个工具文件在导入时自动调用 registry.register() 注册自己。
@dataclass
class SecurityTool:
name: str # 工具名称
category: str # 分类: recon | vuln | exploit | audit
risk_level: str # 风险等级: low | medium | high | critical
requires_approval: bool # 是否需要人工审批
binary: str # 可执行文件名(API 工具留空)
description: str = "" # 工具描述
timeout: int = 300 # 超时时间(秒)
schema: dict = field(default_factory=dict) # 参数 JSON Schema
关键设计:API 工具 vs CLI 工具的区分
def check_available(self, name: str) -> tuple[bool, str]:
tool = self._tools.get(name)
if not tool:
return False, f"Tool '{name}' not registered"
# API 类工具(如 Shodan)没有本地 binary,直接返回可用
if not tool.binary:
return True, ""
# CLI 类工具需要检查 binary 是否在 PATH 中
binary_path = shutil.which(tool.binary)
if not binary_path:
return False, f"Binary '{tool.binary}' not found in PATH"
return True, ""
自动发现机制:
def discover_tools() -> list[str]:
"""扫描 tools/ 下所有 *_tool.py 文件,自动导入注册"""
tools_dir = Path(__file__).resolve().parent
imported = []
for subdir in ["recon", "vuln", "exploit", "audit", "report"]:
pkg_dir = tools_dir / subdir
if not pkg_dir.is_dir():
continue
for py_file in sorted(pkg_dir.glob("*_tool.py")):
mod_name = f"tools.{subdir}.{py_file.stem}"
importlib.import_module(mod_name)
imported.append(mod_name)
return imported
只需在 tools/recon/ 下创建 xxx_tool.py 并调用 registry.register(),无需手动注册。
4.2 工具执行器(sandbox/executor.py)
统一的 subprocess 封装,带超时控制和错误处理:
def run_command(cmd: list[str], timeout: int | None = None) -> ExecutionResult:
timeout = timeout or settings.tool_timeout
try:
proc = subprocess.run(
cmd, capture_output=True, text=True, timeout=timeout
)
return ExecutionResult(
success=proc.returncode == 0,
stdout=proc.stdout,
stderr=proc.stderr,
returncode=proc.returncode,
)
except subprocess.TimeoutExpired:
return ExecutionResult(
success=False, stderr=f"Command timed out after {timeout}s",
returncode=-1, timed_out=True,
)
4.3 侦察工具示例(tools/recon/nmap_tool.py)
以 nmap 为例,展示工具的标准实现模式:
def nmap_scan(target: str, ports: str = "1-10000", scan_type: str = "sV") -> dict:
# 构建命令
cmd = ["nmap", f"-{scan_type}", "-p", ports, "--open", "-oX", "-"]
cmd.append(target)
# 执行
result = run_command(cmd)
if not result.success:
return {"error": result.stderr, "success": False}
# 解析 XML 输出
parsed = xmltodict.parse(result.stdout)
hosts = parsed.get("nmaprun", {}).get("host", [])
# ... 解析端口和服务信息 ...
return {
"success": True,
"target": target,
"services": services,
"total_open_ports": len(services),
}
# 注册工具
registry.register(
tool=SecurityTool(
name="nmap_scan",
category="recon",
risk_level="low",
requires_approval=False,
binary="nmap",
description="Port scanning and service detection using nmap.",
schema={...},
),
handler=nmap_scan,
)
4.4 LangGraph 状态图(agent/graph.py)
这是 Agent 的"大脑",定义了任务的执行流程:
def build_security_agent(checkpointer=None):
discover_tools()
graph = StateGraph(AgentState)
# 注册节点
graph.add_node("planner", plan_task) # LLM 规划
graph.add_node("recon", run_recon) # 侦察
graph.add_node("vuln_scan", run_vuln_scan) # 漏洞扫描
graph.add_node("validate", validate_vulns) # 漏洞验证
graph.add_node("human_approval", wait_approval) # 人工审批
graph.add_node("exploit", run_exploit) # 漏洞利用
graph.add_node("audit_check", run_audit) # 安全审计
graph.add_node("report", generate_report) # 报告生成
# 定义执行流程
graph.set_entry_point("planner")
# Planner → 根据任务类型路由
graph.add_conditional_edges("planner", route_task, {
"recon": "recon",
"vuln_scan": "vuln_scan",
"audit": "audit_check",
})
# Recon → VulnScan → 判断是否需要利用
graph.add_edge("recon", "vuln_scan")
graph.add_conditional_edges("vuln_scan", check_exploit_needed, {
"human_approval": "human_approval", # 有高危漏洞 → 请求审批
"validate": "validate", # 直接验证
"report": "report", # 无高危 → 直接报告
})
# 审批 → 验证 → 利用 → 报告
graph.add_edge("human_approval", "validate")
graph.add_edge("validate", "exploit")
graph.add_edge("exploit", "report")
graph.add_edge("audit_check", "report")
graph.add_edge("report", END)
# interrupt_before 实现 HITL:exploit 节点执行前暂停等待人工确认
return graph.compile(
checkpointer=checkpointer or _build_checkpointer(),
interrupt_before=["exploit"],
)
状态定义(agent/state.py):
class AgentState(TypedDict):
target: str # 目标
task_type: Literal["recon", "vuln_scan", "exploit", "audit", "full"]
messages: Annotated[list, add_messages]
recon_results: dict # 侦察结果
vuln_results: list # 漏洞列表
exploit_results: list # 利用结果
risk_score: float # 综合风险评分
requires_approval: bool # 是否需要审批
approval_granted: bool # 是否已批准
current_phase: str # 当前阶段
report: str # 最终报告
4.5 节点实现(agent/nodes.py)
Planner 节点 — LLM 智能规划
def plan_task(state: AgentState) -> dict:
"""调用 DeepSeek-R1 决定下一步操作"""
state_summary = f"Target: {target} | Task: {task_type} | Phase: {current_phase}"
# ... 构建状态摘要 ...
response = call_llm([
{"role": "system", "content": PLANNER_PROMPT},
{"role": "user", "content": f"Plan next step for target: {target}"},
], json_mode=True)
plan = json.loads(response)
return {"next_action": plan["next_action"], "current_phase": plan["next_action"]}
Recon 节点 — 带审计和 RAG 增强
def run_recon(state: AgentState) -> dict:
audit = AuditLogger(session_id=state.get("audit_log_id"))
target = state["target"]
results = {}
# 域名才跑 subfinder
if not _is_ip(target):
results["subfinder"] = _safe_execute("subfinder_enum", {"domain": target}, audit)
# httpx 指纹 + nmap 端口扫描
results["httpx"] = _safe_execute("httpx_probe", {"target": target}, audit)
results["nmap"] = _safe_execute("nmap_scan", {"target": target}, audit)
# RAG: 根据发现的服务查询相关 CVE
vs = VectorStore()
for svc in results["nmap"].get("services", [])[:5]:
cve_hits = vs.search_cves(f"{svc['product']} {svc['version']}")
results["rag_cves"].extend(cve_hits)
return {"recon_results": results, "current_phase": "recon_done"}
关键设计:_safe_execute 容错包装
def _safe_execute(tool_name: str, args: dict, audit_logger=None) -> dict:
"""单个工具失败不会崩溃整个流程"""
try:
return registry.execute(tool_name, args, audit_logger=audit_logger, force=True)
except ToolNotAvailable as e:
return {"success": False, "error": str(e)}
except Exception as e:
return {"success": False, "error": str(e)}
4.6 CVE RAG 知识库(memory/vector_store.py)
使用 ChromaDB 存储 CVE 数据,实现语义检索:
class VectorStore:
def __init__(self):
self.client = chromadb.PersistentClient(path=str(persist_dir))
self.cve_collection = self.client.get_or_create_collection(
name="cve_knowledge",
metadata={"hnsw:space": "cosine"},
)
def add_cves(self, cves: list[dict]) -> int:
"""批量写入 CVE 数据"""
for cve in cves:
doc_text = f"{cve['cve_id']}: {cve['description']} Severity: {cve['severity']}"
self.cve_collection.upsert(
ids=[cve["cve_id"]],
documents=[doc_text],
metadatas=[{"severity": cve["severity"], "cvss": str(cve["cvss"])}],
)
def search_cves(self, query: str, n_results: int = 5) -> list[dict]:
"""语义检索相关 CVE"""
results = self.cve_collection.query(
query_texts=[query],
n_results=n_results,
include=["documents", "metadatas", "distances"],
)
return [{"cve_id": doc_id, "document": doc, ...} for doc_id, doc in ...]
从 NVD 加载 CVE 数据:
def fetch_cves_from_nvd(keyword: str) -> list[dict]:
"""调用 NVD API 2.0 获取 CVE 数据"""
resp = httpx.get("https://services.nvd.nist.gov/rest/json/cves/2.0",
params={"keywordSearch": keyword, "resultsPerPage": 200})
# 解析 CVSS 评分、影响产品、描述等
# ...
4.7 审计日志层(audit/logger.py)
满足等保 2.0 和 ISO 27001 的审计要求:
class AuditLogger:
def _write(self, doc: dict) -> None:
doc["timestamp"] = datetime.now(timezone.utc).isoformat()
doc["session_id"] = self.session_id
doc["operator"] = settings.operator_id
# 写入本地 JSONL 文件
with open(self._log_file, "a", encoding="utf-8") as f:
f.write(json.dumps(doc, ensure_ascii=False) + "\n")
# 推送到 Elasticsearch
es = self._get_es()
if es:
es.index(index=settings.elasticsearch_index, document=doc)
def log_tool_call(self, tool_name, args, risk_level):
self._write({"event_type": "tool_call", "tool_name": tool_name, ...})
def log_approval(self, tool_name, approved, approver):
self._write({"event_type": "human_approval", ...})
合规检查器:
class ComplianceChecker:
REQUIRED_EVENTS = ["tool_call", "tool_result", "human_approval", "phase_change", "error"]
REQUIRED_FIELDS = ["timestamp", "session_id", "operator", "event_type"]
def validate_log_completeness(self) -> dict:
"""检查日志是否包含所有必需事件和字段"""
# ...
return {"compliant": True/False, "missing_events": [...], "field_violations": [...]}
五、部署模型推理服务
5.1 方案一:vLLM 部署(推荐生产环境)
# 安装 vLLM
pip install vllm
# 启动服务(A100/H100 24GB+ 显存)
vllm serve deepseek-ai/DeepSeek-R1-Distill-Qwen-32B \
--host 0.0.0.0 \
--port 8000 \
--max-model-len 32768 \
--gpu-memory-utilization 0.9
# 如果显存不足,使用量化版本
vllm serve deepseek-ai/DeepSeek-R1-Distill-Qwen-32B-GPTQ \
--host 0.0.0.0 \
--port 8000 \
--quantization gptq
5.2 方案二:Ollama 部署(推荐开发调试)
# 安装 Ollama: https://ollama.com
# 拉取模型
ollama pull deepseek-r1:32b
# 启动服务(默认端口 11434)
ollama serve
# .env 配置
# VLLM_API_BASE=http://localhost:11434/v1
# MODEL_NAME=deepseek-r1:32b
5.3 验证模型服务
# vLLM
curl http://localhost:8000/v1/models
# Ollama
curl http://localhost:11434/api/tags
# 测试推理
curl http://localhost:8000/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{"model":"deepseek-ai/DeepSeek-R1-Distill-Qwen-32B","messages":[{"role":"user","content":"Hello"}]}'
六、部署 ELK Stack(可选)
6.1 一键部署
cd elk
docker-compose up -d
6.2 服务地址
| 服务 | 地址 | 用途 |
|---|---|---|
| Elasticsearch | http://localhost:9200 | 数据存储与搜索 |
| Kibana | http://localhost:5601 | 可视化仪表板 |
| Logstash | localhost:5044 | 日志收集管道 |
6.3 验证部署
# 检查 Elasticsearch
curl http://localhost:9200/_cluster/health
# 使用 CLI 检查
python cli.py elk-status
6.4 Kibana 配置
- 打开 http://localhost:5601
- 进入 Stack Management → Index Patterns
- 创建索引模式:
sec-agent-audit-* - 进入 Discover 查看审计日志
- 可按
event_type、tool_name、risk_level、operator等字段过滤
七、使用指南
7.1 查看工具状态
python cli.py tools
输出示例:
┌───────────── Available Security Tools ─────────────┐
│ Name Category Risk Level Status │
├─────────────────────────────────────────────────────┤
│ nmap_scan recon low OK │
│ subfinder_enum recon low OK │
│ httpx_probe recon low OK │
│ nuclei_scan vuln medium OK │
│ nikto_scan vuln medium OK │
│ shodan_search recon low OK │
└─────────────────────────────────────────────────────┘
7.2 全栈安全扫描
# 完整扫描(侦察 → 漏洞扫描 → 审批 → 报告)
python cli.py scan 192.168.1.1
# 指定任务类型
python cli.py scan example.com -t recon # 仅侦察
python cli.py scan example.com -t vuln_scan # 仅漏洞扫描
python cli.py scan example.com -t audit # 仅安全审计
# 保存报告
python cli.py scan example.com -o report.md
# 跳过人工审批(自动化模式)
python cli.py scan example.com --no-approval
7.3 快捷命令
python cli.py recon example.com # 快速侦察
python cli.py vuln example.com # 快速漏洞扫描
python cli.py audit 192.168.1.1 # 快速安全审计
7.4 加载 CVE 知识库
# 从 NVD 在线加载(按关键词)
python cli.py load-cves -k apache -k nginx -k openssl -k log4j -l 200
# 注意:NVD API 有速率限制(无 API Key 时 5 req/30s)
# 代码中已内置 6 秒间隔限流
7.5 合规检查
# 检查当前会话的审计日志合规性
python cli.py compliance
# 检查指定会话
python cli.py compliance <session_id>
输出示例:
# 审计合规检查报告
## 检查结果: PASS
## 详情
- 总日志条目: 42
- 发现的事件类型: tool_call, tool_result, human_approval, phase_change, error
- 缺失的事件类型: 无
## 合规标准
- 等保 2.0: 要求所有安全操作可追溯
- ISO 27001: 要求完整的审计日志链
八、人工审批流程(HITL)
这是本项目的核心安全设计。高危操作(漏洞利用)必须经过人工确认。
8.1 工作原理
- Agent 执行侦察和漏洞扫描
- 发现高危漏洞(CVSS ≥ 7.0)时自动暂停
- 在终端展示漏洞列表,等待用户确认
- 用户批准后继续执行漏洞利用
- 用户拒绝则跳过利用,直接生成报告
8.2 代码流程
# cli.py 中的审批处理
if result.get("requires_approval") and not no_approval:
# 展示高危漏洞
for v in high_risk:
table.add_row(v["severity"], v["name"], str(v["risk_score"]))
console.print(table)
# 等待用户确认
if click.confirm("Approve exploitation?", default=False):
result = agent.invoke({"approval_granted": True}, config=config)
audit.log_approval("exploit", True, operator)
else:
audit.log_approval("exploit", False, operator)
8.3 LangGraph interrupt_before 机制
graph.compile(
checkpointer=checkpointer,
interrupt_before=["exploit"], # exploit 节点执行前暂停
)
暂停后通过 agent.invoke({"approval_granted": True}) 恢复执行。
九、Python API 调用
除了 CLI,也可以直接在 Python 中调用:
from agent.graph import run_agent
# 完整扫描
result = run_agent("192.168.1.1", task_type="full")
# 查看结果
print(result["report"]) # 安全报告
print(result["vuln_results"]) # 漏洞列表
print(result["risk_score"]) # 风险评分
# 单独调用工具
from tools.recon.nmap_tool import nmap_scan
from tools.vuln.nuclei_tool import nuclei_scan
nmap_result = nmap_scan("192.168.1.1", ports="1-1000")
nuclei_result = nuclei_scan("http://192.168.1.1", severity="critical,high")
# CVE RAG 检索
from memory.vector_store import VectorStore
vs = VectorStore()
cves = vs.search_cves("apache log4j remote code execution", n_results=5)
# 添加攻击指纹
vs.add_fingerprint(
fp_id="sql_injection_union",
description="SQL Injection via UNION SELECT",
indicators=["UNION SELECT", "ORDER BY", "-- ", "OR 1=1"]
)
十、添加自定义工具
只需 3 步即可添加新工具:
步骤 1:创建工具文件
在 tools/recon/ 或 tools/vuln/ 下创建 my_tool.py:
from sandbox.executor import run_command
from tools.registry import SecurityTool, registry
def my_scan(target: str, mode: str = "fast") -> dict:
result = run_command(["my_tool", "--mode", mode, target])
return {"success": result.success, "output": result.stdout}
registry.register(
tool=SecurityTool(
name="my_scan",
category="recon", # 分类
risk_level="low", # 风险等级
requires_approval=False, # 是否需要审批
binary="my_tool", # 可执行文件名(API 工具留空 "")
description="My custom scanner",
schema={
"type": "object",
"properties": {
"target": {"type": "string", "description": "Scan target"},
"mode": {"type": "string", "default": "fast"},
},
"required": ["target"],
},
),
handler=my_scan,
)
步骤 2:安装工具
确保 my_tool 在系统 PATH 中。
步骤 3:验证
python cli.py tools # 应该能看到 my_scan
无需修改任何其他文件,自动发现机制会处理一切。
十一、项目源码文件清单
| 文件 | 行数 | 功能 |
|---|---|---|
cli.py |
214 | CLI 入口,7 个命令 |
config/settings.py |
48 | Pydantic Settings 配置 |
config/prompts.py |
72 | 系统提示词 + 报告模板 |
agent/state.py |
23 | LangGraph 状态定义 |
agent/graph.py |
124 | 状态图编排 |
agent/nodes.py |
385 | 8 个节点实现 |
agent/edges.py |
30 | 条件路由逻辑 |
tools/registry.py |
142 | 工具注册中心 |
tools/recon/nmap_tool.py |
89 | nmap 端口扫描 |
tools/recon/subfinder_tool.py |
63 | 子域名枚举 |
tools/recon/httpx_tool.py |
90 | Web 指纹识别 |
tools/recon/shodan_tool.py |
143 | Shodan 互联网资产搜索 |
tools/vuln/nuclei_tool.py |
97 | nuclei 漏洞扫描 |
tools/vuln/nikto_tool.py |
95 | nikto Web 漏洞扫描 |
memory/vector_store.py |
145 | ChromaDB 向量存储 |
memory/cve_loader.py |
151 | NVD 数据加载器 |
memory/session_memory.py |
47 | 会话级内存 |
audit/logger.py |
104 | 审计日志(JSONL + ES) |
audit/compliance.py |
103 | 等保/ISO27001 合规检查 |
sandbox/executor.py |
74 | subprocess 执行器 |
sandbox/validator.py |
64 | 输出解析验证 |
十二、测试
# 运行全部测试
python -m pytest tests/ -v
# 运行单个测试文件
python -m pytest tests/test_registry.py -v
# 运行单个测试
python -m pytest tests/test_registry.py::TestToolRegistry::test_register_and_get -v
测试覆盖:
- 工具注册与发现
- API 工具 vs CLI 工具的可用性判断
- 人工审批机制
- 输出解析(JSON / NDJSON / 纯文本)
- 目标验证(IP / 域名 / URL)
- LangGraph 状态字段完整性
十三、常见问题
Q1: 工具未找到?
python cli.py tools # 查看工具状态
# 确保工具已安装并在 PATH 中
# Windows 需要重启终端让 PATH 生效
Q2: 模型连接失败?
# 检查 vLLM
curl http://localhost:8000/v1/models
# 检查 Ollama
curl http://localhost:11434/api/tags
# 确认 .env 中的 VLLM_API_BASE 配置正确
Q3: ELK 连接失败?
python cli.py elk-status
# 确保 Docker 容器正在运行
docker ps | findstr sec-agent
# 查看容器日志
docker logs sec-agent-es
Q4: NVD API 被限流?
代码已内置 6 秒间隔限流。如果仍然被限流:
- 申请 NVD API Key:https://nvd.nist.gov/developers/request-an-api-key
- 在
.env中配置NVD_API_KEY
Q5: 显存不足运行 32B 模型?
# 方案一:使用量化版本
vllm serve deepseek-ai/DeepSeek-R1-Distill-Qwen-32B-GPTQ --quantization gptq
# 方案二:使用更小的蒸馏版本
# MODEL_NAME=deepseek-ai/DeepSeek-R1-Distill-Qwen-7B
# 方案三:使用 Ollama 的 4bit 量化
ollama pull deepseek-r1:32b-q4_K_M
十四、总结
本文实现了一个完整的 AI 网络安全 Agent,核心特性:
- LangGraph 状态图:清晰定义侦察 → 扫描 → 验证 → 利用 → 报告的执行流程
- DeepSeek-R1 32B:强大的推理能力,智能规划任务和分析漏洞
- 自动工具发现:新增工具只需创建文件,无需修改任何其他代码
- CVE RAG 增强:扫描结果自动关联 CVE 知识库,提供上下文信息
- 人工审批(HITL):高危操作必须经过人工确认,安全可控
- 审计合规:完整的审计日志链,满足等保 2.0 和 ISO 27001 要求
- 容错设计:单个工具失败不影响整体流程
项目地址:https//:github.com/ggjj-hub
下一步计划:
- 集成 sqlmap(SQL 注入自动化验证)
- 集成 Metasploit(漏洞利用框架)
- 支持并发扫描多目标
- Web UI 仪表板
- 定时扫描任务
免责声明:本项目仅用于授权的安全测试和研究目的。未经授权对他人系统进行安全测试是违法行为。请确保在使用前获得目标系统的明确授权。
求助:本人在网络安全大模型的自主训练有所准备,但由于相关的信息较为敏感以及封闭,所以目前还停留在数据集收集阶段以及框架搭建阶段,能否有大佬提供思路,介绍资源,在此,由衷感谢!
更多推荐


所有评论(0)