使用FastAPI+ aiohttp构建异步LLM 调用接口(异步调用 Deepseek实战)
·
在构建 Agent 系统(如 RAG、AutoGPT、工具调用 Agent) 时,一个非常常见的问题是:
Agent 需要频繁调用 LLM,如果接口是同步阻塞的,会严重影响系统吞吐量。
例如:
-
多个 Agent 同时推理
-
RAG 检索 + LLM 生成
-
Tool 调用后再次调用 LLM
-
多轮推理链
很容易出现API 服务吞吐量低、线程阻塞或者高并发性能差的问题
针对这个问题,本文将使用 FastAPI + aiohttp 构建异步 LLM 调用接口。
项目结构
my_pj │ ├── API_run.py │ ├── tools │ ├── config.py │ ├── fetch.py │ └── chat_req.py
调用的LLM接口:https://cloud.siliconflow.cn/i/hQjWsIw3
1、fetch.py代码
这个是之前做爬虫的时候写的一套通用的异步请求框架,基本上所有的请求都会经过这里
from tools.config import logging
import aiohttp
from urllib.parse import parse_qs
def get_header():
headers = {
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"accept-language": "zh-CN,zh;q=0.9",
"priority": "u=0, i",
"sec-ch-ua": "\"Not(A:Brand\";v=\"99\", \"Google Chrome\";v=\"133\", \"Chromium\";v=\"133\"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\"",
"sec-fetch-dest": "document",
"sec-fetch-mode": "navigate",
"sec-fetch-site": "none",
"sec-fetch-user": "?1",
"upgrade-insecure-requests": "1",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36"
}
return headers
async def fetch(url: str, method='GET', headers=None, params=None, data=None, files=None, json=None, timeout=180,
logger_print=True):
if not headers:
headers = get_header()
if method not in ["GET", "POST", "HEAD", "OPTIONS", "TRACE", "PUT", "DELETE"]:
response_data = 'method not in ["GET", "POST", "HEAD", "OPTIONS", "TRACE", "PUT", "DELETE"]'
response_status = 400
return response_data, response_status
# 处理表单数据 / 文件, 支持同时处理类似requests请求中的 files + data两个参数
form_data = None
if files or data:
form_data = aiohttp.FormData()
# ---------- 1. data 是 dict----------
if data and isinstance(data, dict):
for key, value in data.items():
# 表单不支持直接传入 int 类型跟 float 类型
if isinstance(value, int) or isinstance(value, float):
form_data.add_field(key, str(value))
else:
form_data.add_field(key, value)
# ---------- 2. data 是 str, "a=1&b=2" → {"a":["1"],"b":["2"]}----------
elif isinstance(data, str):
parsed = parse_qs(data) # {'task_id': ['xxx']}
for key, values in parsed.items():
form_data.add_field(key, values[0]) # 只取第一个值
# ---------- 3. data 是 bytes ----------
elif isinstance(data, bytes):
decoded = data.decode("utf-8")
parsed = parse_qs(decoded)
for key, values in parsed.items():
form_data.add_field(key, values[0])
if files:
# 重置指针, 防止文件多次读写时 files.read() 为空
await files.seek(0)
# 此处的files是通过http api传输过来的文件对象,与本地读取的不通用
form_data.add_field(
'file', # 文件字段名称
await files.read(), # 文件内容
filename=files.filename, # 指定文件名
content_type=files.content_type # 指定内容类型
)
if logger_print:
logging.info(f"正在发起请求 - url:{url}")
timeout = aiohttp.ClientTimeout(total=timeout) # 180 秒
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.request(method, url, headers=headers, data=form_data, json=json, params=params) as response:
response_status = response.status
msg = ""
if params:
msg += f", params:{params}"
if form_data:
msg += f", data:{form_data}"
if json:
msg += f", json:{str(json)[:200]}"
pass
# 根据响应内容类型解析数据
if response.status in [200, 201, 204]:
msg_heard = f"请求成功 - url:{url}"
msg = msg_heard + msg
if logger_print:
logging.info(msg)
else:
msg_heard = f"请求失败!- status:{response_status} - url:{url}"
msg = msg_heard + msg
if logger_print:
logging.error(msg)
else:
logging.error(msg_heard)
# 尝试解析为 JSON
try:
response_data = await response.json()
except Exception:
# 如果不是 JSON,强制返回文本
response_data = await response.text()
return response_data, response_status
2、config.py代码
这个可要可不要,主要是fastapi打印的时候会好看一点
import logging
from colorama import init, Fore, Style
init(autoreset=True) # 必需:初始化 colorama
logging.basicConfig(
level=logging.INFO,
format=f'{Style.BRIGHT}%(asctime)s [%(levelname)s] - %(message)s',
datefmt='%H:%M:%S',
handlers=[logging.StreamHandler()]
)
# 修改日志颜色(精简版)
logging.addLevelName(logging.INFO, f"{Fore.CYAN}{Style.BRIGHT}{logging.getLevelName(logging.INFO)}")
logging.addLevelName(logging.WARNING, f"{Fore.YELLOW}{Style.BRIGHT}{logging.getLevelName(logging.WARNING)}")
logging.addLevelName(logging.ERROR, f"{Fore.RED}{Style.BRIGHT}{logging.getLevelName(logging.ERROR)}")
加了之后打印样式就会变成这样:

3、chat_req.py代码
我这里使用的是硅基流动的api,
注册地址:https://cloud.siliconflow.cn/i/hQjWsIw3
import time
from tools.config import logging
from tools.fetch import fetch
from pydantic import BaseModel
from fastapi import APIRouter
base_url = "https://api.siliconflow.cn/v1"
API_KEY = ""
MODEL = "deepseek-ai/DeepSeek-V3.2"
class ChatRequest(BaseModel):
system: str = "你是一个有用的助手"
content: str
router = APIRouter(tags=["Knowledge_RAG"])
@router.post(f"/chat_ai")
async def chat_ai(req:ChatRequest):
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}
messages = [
{"role": "system", "content": req.system},
{"role": "user", "content": req.content}
]
payload = {
"model": MODEL,
"messages": messages
}
start = time.time()
AI_URL = base_url + "/chat/completions"
response_data, status = await fetch(
url=AI_URL,
method="POST",
headers=headers,
json=payload
)
cost = time.time() - start
logging.info(f"AI请求耗时: {cost:.3f} 秒")
if status != 200:
res = {
"error": response_data
}
else:
res = {
"reply": response_data["choices"][0]["message"]["content"]
}
logging.info(res)
4、API_run.py对应代码
import fastapi_cdn_host
import uvicorn
from fastapi import FastAPI
from tools import chat_req
app = FastAPI(docs_url="/docs", redoc_url="/redoc") # 离线下打开 /docs
fastapi_cdn_host.patch_docs(app)
app.include_router(chat_req.router, prefix="/api_pj")
if __name__ == "__main__":
uvicorn.run(
"API_run:app",
host="0.0.0.0",
port=6011
)
FastApi运行
最后我们调试一下,使用这个链接打开:http://0.0.0.0:6000/docs

直接点击运行即可,成功了说明就通了,后续可以直接调用接口
更多推荐


所有评论(0)