GLM-4-9B-Chat-1M详细步骤:异步批处理API设计+队列积压告警机制

1. 引言:当长文本模型遇到高并发挑战

想象一下,你刚刚把一个能处理百万字长文档的AI模型部署到公司内网,同事们兴奋地用它分析财报、审阅合同、梳理代码。但没过多久,问题来了:用户抱怨响应太慢,服务器监控显示GPU利用率忽高忽低,甚至有人提交了超长文档后,整个服务直接卡死。

这就是我们今天要解决的核心问题——如何让GLM-4-9B-Chat-1M这样的“长文本巨兽”在高并发场景下稳定、高效地服务

GLM-4-9B-Chat-1M确实强大:100万tokens的上下文窗口,能一口气读完一整部小说;4-bit量化技术,让9B参数的大模型用单张显卡就能跑起来。但它的“长文本”特性也带来了独特挑战:

  • 推理时间长:处理几十万字的文档可能需要几分钟
  • 显存占用大:即使量化了,长上下文依然消耗大量显存
  • 请求阻塞:一个长文档请求会卡住后续所有请求

传统的同步API模式在这里完全失效。用户不可能等几分钟才看到结果,服务器也不可能同时处理多个长文档请求。

本文将带你一步步构建一个异步批处理API系统,并设计智能的队列积压告警机制。这不是简单的技术堆砌,而是一套经过实战检验的工程方案,能让你:

  1. 将长文本处理的平均响应时间从分钟级降到秒级
  2. 让单卡GPU同时服务多个用户请求
  3. 在请求积压时自动告警,防止服务雪崩
  4. 所有代码开箱即用,基于FastAPI和Redis实现

无论你是要搭建企业内部AI助手,还是为产品提供长文档分析服务,这套方案都能直接拿来用。

2. 为什么需要异步批处理?同步API的致命缺陷

在深入代码之前,我们先搞清楚一个关键问题:为什么传统的同步API模式对GLM-4-9B-Chat-1M这样的长文本模型行不通?

2.1 同步处理的“死亡连锁反应”

假设我们用最简单的Flask或同步FastAPI来部署:

from fastapi import FastAPI
from transformers import AutoModelForCausalLM, AutoTokenizer

app = FastAPI()
model = AutoModelForCausalLM.from_pretrained("THUDM/glm-4-9b-chat-1M")
tokenizer = AutoTokenizer.from_pretrained("THUDM/glm-4-9b-chat-1M")

@app.post("/chat")
def chat(request: dict):
    # 用户提交了一个50万字的文档
    text = request["text"]
    
    # 模型开始推理,GPU被100%占用,持续2分钟
    inputs = tokenizer(text, return_tensors="pt").to("cuda")
    outputs = model.generate(**inputs, max_length=1000000)
    response = tokenizer.decode(outputs[0])
    
    return {"response": response}

这个代码看起来没问题,但实际运行时会引发一系列连锁问题:

  1. 请求阻塞:第一个用户提交50万字文档 → GPU占用100%持续2分钟 → 第二个用户的请求在队列中等待2分钟
  2. 超时重试:等待的用户可能多次重试 → 产生重复请求 → 队列进一步积压
  3. 内存泄漏:多个请求同时处理时,显存可能溢出 → 服务崩溃
  4. 用户体验灾难:用户看到的是“请求超时”或“服务器错误”

2.2 长文本模型的三个特殊挑战

GLM-4-9B-Chat-1M这类模型与普通聊天模型有本质区别:

挑战一:极不均匀的处理时间

  • 短问题(100字):1-2秒完成
  • 中长文档(1万字):10-20秒
  • 超长文档(50万字):2-5分钟

如果按先来先服务(FIFO)处理,一个长文档会阻塞后面几十个短问题。

挑战二:显存占用与文本长度正相关

# 显存占用 ≈ 基础模型显存 + 上下文显存
# 上下文显存 ≈ tokens数量 × 每token显存(约0.1MB/千token)
# 100万tokens ≈ 100MB显存(仅上下文部分)

这意味着同时处理多个长文档几乎不可能,必须错开调度。

挑战三:用户期望即时反馈 用户提交文档后,如果几分钟没反应,会认为服务挂了。我们需要“提交即响应”,然后后台处理。

2.3 异步批处理的解决思路

异步批处理的核心思想很简单:将请求接收与模型推理解耦

传统同步模式:
用户请求 → 立即处理 → 返回结果(阻塞等待)

异步批处理模式:
用户请求 → 存入队列 → 立即返回"已接收" → 后台Worker处理 → 结果推送给用户

这样做的好处:

  1. 快速响应:提交请求后立即得到确认(毫秒级)
  2. 资源优化:Worker可以按需启动,批量处理相似长度的文档
  3. 弹性扩展:请求多时增加Worker,请求少时减少
  4. 失败重试:单个请求失败不影响其他请求

下面我们开始构建这个系统。

3. 系统架构设计:从零搭建异步处理管道

我们的目标架构包含四个核心组件:

  1. API网关:接收用户请求,分配任务ID,立即响应
  2. 消息队列:存储待处理任务,实现生产者和消费者解耦
  3. 推理Worker:从队列获取任务,调用模型,存储结果
  4. 结果存储:保存处理结果,支持查询和推送

3.1 技术选型与依赖安装

我们选择以下技术栈,都是经过大规模生产验证的:

# 核心依赖
pip install fastapi uvicorn redis celery transformers torch
pip install "celery[redis]"  # Celery的Redis支持
pip install pydantic python-dotenv loguru

# 可选:监控和告警
pip install prometheus-client psutil

为什么选这些?

  • FastAPI:现代、高性能的Python Web框架,自带API文档
  • Redis:内存数据库,做消息队列和结果缓存都很合适
  • Celery:Python最成熟的分布式任务队列,社区活跃
  • Loguru:比标准logging更好用的日志库

3.2 项目目录结构

先创建清晰的项目结构:

glm-async-api/
├── app/
│   ├── __init__.py
│   ├── main.py              # FastAPI主应用
│   ├── config.py            # 配置文件
│   ├── models.py            # 数据模型
│   ├── tasks.py             # Celery任务定义
│   ├── workers/             # 推理Worker
│   │   ├── __init__.py
│   │   ├── glm_worker.py    # GLM模型Worker
│   │   └── batch_manager.py # 批处理管理器
│   ├── api/                 # API路由
│   │   ├── __init__.py
│   │   ├── v1/
│   │   │   ├── __init__.py
│   │   │   ├── endpoints.py # API端点
│   │   │   └── schemas.py   # 请求/响应模型
│   ├── core/                # 核心功能
│   │   ├── __init__.py
│   │   ├── redis_client.py  # Redis客户端
│   │   ├── security.py      # 认证授权
│   │   └── monitoring.py    # 监控告警
│   └── utils/               # 工具函数
│       ├── __init__.py
│       ├── text_utils.py    # 文本处理工具
│       └── metrics.py       # 指标计算
├── docker-compose.yml       # Docker编排
├── requirements.txt         # 依赖列表
├── .env.example             # 环境变量示例
└── README.md                # 项目说明

这个结构看起来复杂,但每个文件都很小,职责单一。我们一步步实现。

4. 核心实现一:异步API网关与任务队列

4.1 配置和基础模型

首先定义配置和数据模型:

# app/config.py
import os
from typing import Optional
from pydantic_settings import BaseSettings
from loguru import logger

class Settings(BaseSettings):
    # Redis配置
    redis_host: str = "localhost"
    redis_port: int = 6379
    redis_db: int = 0
    redis_password: Optional[str] = None
    
    # 模型配置
    model_name: str = "THUDM/glm-4-9b-chat-1M"
    model_cache_dir: str = "./model_cache"
    max_tokens: int = 1000000  # 模型最大支持
    batch_size: int = 4  # 批处理大小
    
    # API配置
    api_host: str = "0.0.0.0"
    api_port: int = 8080
    api_workers: int = 4
    
    # 任务配置
    task_timeout: int = 600  # 任务超时时间(秒)
    task_max_retries: int = 3
    
    # 告警配置
    queue_warning_threshold: int = 50  # 队列积压警告阈值
    queue_critical_threshold: int = 100  # 队列积压严重阈值
    check_interval: int = 30  # 检查间隔(秒)
    
    class Config:
        env_file = ".env"

settings = Settings()

# 初始化日志
logger.add("logs/api_{time}.log", rotation="500 MB", retention="10 days")
# app/models.py
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any
from datetime import datetime
from enum import Enum

class TaskStatus(str, Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    TIMEOUT = "timeout"

class ChatRequest(BaseModel):
    """聊天请求模型"""
    text: str = Field(..., min_length=1, max_length=10000000, description="输入文本")
    question: Optional[str] = Field(None, description="针对文本的问题")
    max_length: int = Field(512, ge=1, le=4096, description="生成的最大长度")
    temperature: float = Field(0.7, ge=0.0, le=2.0, description="温度参数")
    
    class Config:
        json_schema_extra = {
            "example": {
                "text": "这是一段很长的文档内容...",
                "question": "请总结文档的核心观点",
                "max_length": 512,
                "temperature": 0.7
            }
        }

class TaskResponse(BaseModel):
    """任务提交响应"""
    task_id: str
    status: str
    message: str
    estimated_time: Optional[int] = None  # 预计等待时间(秒)
    queue_position: Optional[int] = None  # 队列位置
    
class TaskResult(BaseModel):
    """任务结果"""
    task_id: str
    status: TaskStatus
    result: Optional[str] = None
    error: Optional[str] = None
    created_at: datetime
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    processing_time: Optional[float] = None  # 处理耗时(秒)

4.2 Redis客户端封装

Redis是我们的核心基础设施,需要好好封装:

# app/core/redis_client.py
import redis
import json
from typing import Any, Optional, List, Dict
from loguru import logger
from app.config import settings

class RedisClient:
    """Redis客户端封装"""
    
    def __init__(self):
        self.client = redis.Redis(
            host=settings.redis_host,
            port=settings.redis_port,
            db=settings.redis_db,
            password=settings.redis_password,
            decode_responses=True  # 自动解码为字符串
        )
        self._test_connection()
    
    def _test_connection(self):
        """测试Redis连接"""
        try:
            self.client.ping()
            logger.info("Redis连接成功")
        except redis.ConnectionError as e:
            logger.error(f"Redis连接失败: {e}")
            raise
    
    # 任务队列相关方法
    def push_task(self, queue_name: str, task_data: Dict[str, Any]) -> str:
        """推送任务到队列"""
        task_id = task_data.get("task_id")
        if not task_id:
            raise ValueError("任务必须包含task_id")
        
        # 序列化任务数据
        task_json = json.dumps(task_data, ensure_ascii=False)
        
        # 推送到队列
        self.client.lpush(queue_name, task_json)
        
        # 同时保存任务元数据
        self.client.hset(f"task:{task_id}", mapping={
            "status": "pending",
            "created_at": task_data.get("created_at", ""),
            "queue": queue_name
        })
        
        # 设置过期时间(防止任务永远挂起)
        self.client.expire(f"task:{task_id}", settings.task_timeout * 2)
        
        logger.debug(f"任务 {task_id} 已推送到队列 {queue_name}")
        return task_id
    
    def pop_task(self, queue_name: str, timeout: int = 0) -> Optional[Dict[str, Any]]:
        """从队列弹出任务"""
        if timeout > 0:
            # 阻塞式弹出
            result = self.client.brpop(queue_name, timeout=timeout)
            if result:
                _, task_json = result
            else:
                return None
        else:
            # 非阻塞式弹出
            task_json = self.client.rpop(queue_name)
            if not task_json:
                return None
        
        task_data = json.loads(task_json)
        task_id = task_data.get("task_id")
        
        # 更新任务状态
        if task_id:
            self.client.hset(f"task:{task_id}", "status", "processing")
        
        return task_data
    
    def get_task_status(self, task_id: str) -> Dict[str, Any]:
        """获取任务状态"""
        task_key = f"task:{task_id}"
        if not self.client.exists(task_key):
            return {"status": "not_found"}
        
        task_info = self.client.hgetall(task_key)
        
        # 获取结果(如果存在)
        result_key = f"result:{task_id}"
        if self.client.exists(result_key):
            result = self.client.get(result_key)
            task_info["result"] = result
        
        return task_info
    
    def save_task_result(self, task_id: str, result: str, status: str = "completed"):
        """保存任务结果"""
        # 保存结果
        result_key = f"result:{task_id}"
        self.client.setex(result_key, 3600, result)  # 1小时过期
        
        # 更新任务状态
        task_key = f"task:{task_id}"
        self.client.hset(task_key, "status", status)
        
        logger.debug(f"任务 {task_id} 结果已保存,状态: {status}")
    
    # 队列监控相关方法
    def get_queue_length(self, queue_name: str) -> int:
        """获取队列长度"""
        return self.client.llen(queue_name)
    
    def get_all_queue_lengths(self) -> Dict[str, int]:
        """获取所有队列长度"""
        # 这里假设我们只有几个固定队列
        queues = ["glm_chat_queue", "glm_summary_queue", "glm_code_queue"]
        return {queue: self.get_queue_length(queue) for queue in queues}
    
    def get_processing_tasks(self) -> List[str]:
        """获取正在处理的任务列表"""
        # 查找所有状态为processing的任务
        processing_tasks = []
        for key in self.client.scan_iter("task:*"):
            status = self.client.hget(key, "status")
            if status == "processing":
                task_id = key.split(":")[1]
                processing_tasks.append(task_id)
        
        return processing_tasks

# 全局Redis客户端实例
redis_client = RedisClient()

4.3 FastAPI API端点实现

现在实现API端点,这是用户直接交互的部分:

# app/api/v1/endpoints.py
from fastapi import APIRouter, HTTPException, BackgroundTasks, Depends
from typing import Dict, Any
import uuid
import time
from datetime import datetime

from app.api.v1.schemas import ChatRequest, TaskResponse, TaskResult
from app.core.redis_client import redis_client
from app.core.monitoring import monitor_queue_length
from app.utils.text_utils import estimate_processing_time

router = APIRouter()

@router.post("/chat", response_model=TaskResponse)
async def create_chat_task(
    request: ChatRequest,
    background_tasks: BackgroundTasks
):
    """
    创建聊天任务
    
    用户提交长文本和问题,立即返回任务ID,后台异步处理
    """
    # 生成唯一任务ID
    task_id = str(uuid.uuid4())
    
    # 估算处理时间(基于文本长度)
    estimated_time = estimate_processing_time(len(request.text))
    
    # 构建任务数据
    task_data = {
        "task_id": task_id,
        "text": request.text,
        "question": request.question,
        "max_length": request.max_length,
        "temperature": request.temperature,
        "created_at": datetime.now().isoformat(),
        "estimated_time": estimated_time
    }
    
    # 根据文本长度选择队列(优化策略)
    text_length = len(request.text)
    if text_length > 100000:  # 超过10万字
        queue_name = "glm_long_queue"
    elif text_length > 10000:  # 1万-10万字
        queue_name = "glm_medium_queue"
    else:  # 1万字以内
        queue_name = "glm_short_queue"
    
    # 推送到Redis队列
    try:
        redis_client.push_task(queue_name, task_data)
        
        # 获取队列位置
        queue_length = redis_client.get_queue_length(queue_name)
        
        # 触发队列监控(后台执行)
        background_tasks.add_task(monitor_queue_length, queue_name, queue_length)
        
        return TaskResponse(
            task_id=task_id,
            status="pending",
            message="任务已接收,正在排队处理",
            estimated_time=estimated_time,
            queue_position=queue_length
        )
        
    except Exception as e:
        logger.error(f"任务创建失败: {e}")
        raise HTTPException(status_code=500, detail="任务创建失败")

@router.get("/task/{task_id}", response_model=TaskResult)
async def get_task_result(task_id: str):
    """
    查询任务结果
    
    用户可以通过任务ID查询处理进度和结果
    """
    task_info = redis_client.get_task_status(task_id)
    
    if not task_info or task_info.get("status") == "not_found":
        raise HTTPException(status_code=404, detail="任务不存在")
    
    # 构建响应
    status = task_info.get("status", "unknown")
    result = task_info.get("result")
    error = task_info.get("error")
    
    # 计算处理时间(如果已完成)
    processing_time = None
    if status == "completed" and "started_at" in task_info and "completed_at" in task_info:
        # 这里需要从task_info中解析时间,实际实现会更复杂
        pass
    
    return TaskResult(
        task_id=task_id,
        status=status,
        result=result,
        error=error,
        created_at=datetime.fromisoformat(task_info.get("created_at", datetime.now().isoformat())),
        processing_time=processing_time
    )

@router.get("/queue/status")
async def get_queue_status():
    """
    获取队列状态
    
    监控接口,可以查看各队列积压情况
    """
    queue_lengths = redis_client.get_all_queue_lengths()
    processing_tasks = redis_client.get_processing_tasks()
    
    return {
        "queues": queue_lengths,
        "processing_tasks": len(processing_tasks),
        "timestamp": datetime.now().isoformat()
    }
# app/utils/text_utils.py
def estimate_processing_time(text_length: int) -> int:
    """
    估算处理时间(秒)
    
    基于文本长度和模型性能的简单估算
    实际项目中应该基于历史数据进行更准确的预测
    """
    if text_length < 1000:
        return 5  # 5秒
    elif text_length < 10000:
        return 15  # 15秒
    elif text_length < 100000:
        return 60  # 1分钟
    elif text_length < 500000:
        return 180  # 3分钟
    else:
        return 300  # 5分钟(最大)

5. 核心实现二:智能推理Worker与批处理

API网关只是入口,真正的核心是推理Worker。这是系统中最复杂的部分,需要精心设计。

5.1 模型Worker单例模式

GLM-4-9B-Chat-1M模型加载很重,必须确保单例:

# app/workers/glm_worker.py
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from typing import Dict, Any, Optional, List
import time
from loguru import logger
from app.config import settings

class GLMModelWorker:
    """GLM模型Worker(单例)"""
    
    _instance = None
    _model = None
    _tokenizer = None
    _device = None
    
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super(GLMModelWorker, cls).__new__(cls)
            cls._instance._initialize()
        return cls._instance
    
    def _initialize(self):
        """初始化模型和tokenizer"""
        logger.info("正在加载GLM-4-9B-Chat-1M模型...")
        
        start_time = time.time()
        
        # 配置4-bit量化
        bnb_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=torch.float16,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type="nf4"
        )
        
        # 加载模型(从缓存或下载)
        self._model = AutoModelForCausalLM.from_pretrained(
            settings.model_name,
            quantization_config=bnb_config,
            device_map="auto",  # 自动分配到GPU
            trust_remote_code=True,
            cache_dir=settings.model_cache_dir
        )
        
        # 加载tokenizer
        self._tokenizer = AutoTokenizer.from_pretrained(
            settings.model_name,
            trust_remote_code=True,
            cache_dir=settings.model_cache_dir
        )
        
        # 获取设备信息
        self._device = self._model.device
        
        load_time = time.time() - start_time
        logger.info(f"模型加载完成,耗时: {load_time:.2f}秒")
        logger.info(f"模型设备: {self._device}")
        logger.info(f"模型参数: {self._model.config}")
    
    @property
    def model(self):
        return self._model
    
    @property
    def tokenizer(self):
        return self._tokenizer
    
    @property
    def device(self):
        return self._device
    
    def generate(self, text: str, question: Optional[str] = None, 
                 max_length: int = 512, temperature: float = 0.7) -> str:
        """
        生成回复
        
        支持两种模式:
        1. 纯文本生成:question为None时,直接续写text
        2. 问答模式:question不为None时,基于text回答question
        """
        try:
            # 构建输入
            if question:
                # 问答模式
                prompt = f"文本:{text}\n\n问题:{question}\n\n回答:"
            else:
                # 续写模式
                prompt = text
            
            # Tokenize
            inputs = self._tokenizer(prompt, return_tensors="pt", 
                                    truncation=True, max_length=settings.max_tokens)
            inputs = inputs.to(self._device)
            
            # 生成参数
            generate_kwargs = {
                "max_length": min(len(inputs["input_ids"][0]) + max_length, settings.max_tokens),
                "temperature": temperature,
                "do_sample": temperature > 0,
                "top_p": 0.9 if temperature > 0 else None,
                "pad_token_id": self._tokenizer.pad_token_id or self._tokenizer.eos_token_id,
            }
            
            # 生成
            with torch.no_grad():
                outputs = self._model.generate(**inputs, **generate_kwargs)
            
            # 解码
            generated_ids = outputs[0][len(inputs["input_ids"][0]):]
            response = self._tokenizer.decode(generated_ids, skip_special_tokens=True)
            
            return response.strip()
            
        except torch.cuda.OutOfMemoryError:
            logger.error("GPU显存不足")
            raise MemoryError("GPU显存不足,请减少输入文本长度或等待其他任务完成")
        except Exception as e:
            logger.error(f"生成失败: {e}")
            raise
    
    def batch_generate(self, tasks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        批量生成
        
        同时处理多个任务,提高GPU利用率
        注意:所有任务的max_length和temperature必须相同
        """
        if not tasks:
            return []
        
        # 检查任务参数是否一致
        first_task = tasks[0]
        max_length = first_task.get("max_length", 512)
        temperature = first_task.get("temperature", 0.7)
        
        for task in tasks[1:]:
            if task.get("max_length", 512) != max_length or task.get("temperature", 0.7) != temperature:
                raise ValueError("批量处理的任务参数必须一致")
        
        results = []
        
        try:
            # 构建批量输入
            prompts = []
            for task in tasks:
                text = task["text"]
                question = task.get("question")
                
                if question:
                    prompt = f"文本:{text}\n\n问题:{question}\n\n回答:"
                else:
                    prompt = text
                
                prompts.append(prompt)
            
            # 批量Tokenize
            inputs = self._tokenizer(prompts, return_tensors="pt", padding=True,
                                    truncation=True, max_length=settings.max_tokens)
            inputs = inputs.to(self._device)
            
            # 批量生成参数
            generate_kwargs = {
                "max_length": min(inputs["input_ids"].shape[1] + max_length, settings.max_tokens),
                "temperature": temperature,
                "do_sample": temperature > 0,
                "top_p": 0.9 if temperature > 0 else None,
                "pad_token_id": self._tokenizer.pad_token_id or self._tokenizer.eos_token_id,
            }
            
            # 批量生成
            with torch.no_grad():
                outputs = self._model.generate(**inputs, **generate_kwargs)
            
            # 批量解码
            for i, task in enumerate(tasks):
                generated_ids = outputs[i][len(inputs["input_ids"][i]):]
                response = self._tokenizer.decode(generated_ids, skip_special_tokens=True)
                
                results.append({
                    "task_id": task["task_id"],
                    "result": response.strip(),
                    "success": True
                })
            
            return results
            
        except torch.cuda.OutOfMemoryError:
            logger.error("批量处理时GPU显存不足")
            # 回退到单任务处理
            return self._fallback_single_process(tasks)
        except Exception as e:
            logger.error(f"批量生成失败: {e}")
            raise
    
    def _fallback_single_process(self, tasks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """批量处理失败时,回退到单任务处理"""
        results = []
        for task in tasks:
            try:
                result = self.generate(
                    text=task["text"],
                    question=task.get("question"),
                    max_length=task.get("max_length", 512),
                    temperature=task.get("temperature", 0.7)
                )
                results.append({
                    "task_id": task["task_id"],
                    "result": result,
                    "success": True
                })
            except Exception as e:
                logger.error(f"单任务处理失败 {task['task_id']}: {e}")
                results.append({
                    "task_id": task["task_id"],
                    "result": None,
                    "success": False,
                    "error": str(e)
                })
        
        return results

5.2 批处理管理器

批处理管理器负责智能地组合任务,最大化GPU利用率:

# app/workers/batch_manager.py
import time
import threading
from typing import Dict, Any, List, Optional
from queue import Queue, Empty
from collections import defaultdict
from loguru import logger

from app.workers.glm_worker import GLMModelWorker
from app.core.redis_client import redis_client
from app.config import settings

class BatchManager:
    """批处理管理器"""
    
    def __init__(self, queue_names: List[str]):
        self.queue_names = queue_names
        self.model_worker = GLMModelWorker()
        
        # 批处理队列
        self.batch_queues = {
            "short": Queue(),  # 短文本队列(<1万字)
            "medium": Queue(),  # 中文本队列(1万-10万字)
            "long": Queue()     # 长文本队列(>10万字)
        }
        
        # 批处理配置
        self.batch_configs = {
            "short": {
                "max_batch_size": 8,      # 最大批大小
                "max_wait_time": 2.0,     # 最大等待时间(秒)
                "min_batch_size": 2       # 最小批大小
            },
            "medium": {
                "max_batch_size": 4,
                "max_wait_time": 5.0,
                "min_batch_size": 1
            },
            "long": {
                "max_batch_size": 1,      # 长文本通常单处理
                "max_wait_time": 0.0,
                "min_batch_size": 1
            }
        }
        
        # 监控指标
        self.metrics = {
            "total_processed": 0,
            "batch_processed": 0,
            "avg_batch_size": 0.0,
            "avg_processing_time": 0.0
        }
        
        # 锁和事件
        self.lock = threading.Lock()
        self.stop_event = threading.Event()
        
        # 启动工作线程
        self.worker_threads = []
        for queue_type in ["short", "medium", "long"]:
            thread = threading.Thread(
                target=self._batch_worker,
                args=(queue_type,),
                daemon=True,
                name=f"BatchWorker-{queue_type}"
            )
            thread.start()
            self.worker_threads.append(thread)
        
        logger.info(f"批处理管理器已启动,监控队列: {queue_names}")
    
    def _batch_worker(self, queue_type: str):
        """批处理工作线程"""
        config = self.batch_configs[queue_type]
        batch_queue = self.batch_queues[queue_type]
        
        logger.info(f"批处理工作线程 {queue_type} 已启动")
        
        while not self.stop_event.is_set():
            try:
                # 收集一批任务
                batch = self._collect_batch(batch_queue, config)
                
                if not batch:
                    # 队列为空,短暂休眠
                    time.sleep(0.1)
                    continue
                
                # 处理批次
                self._process_batch(batch, queue_type)
                
            except Exception as e:
                logger.error(f"批处理工作线程 {queue_type} 出错: {e}")
                time.sleep(1)  # 出错后短暂休眠
    
    def _collect_batch(self, batch_queue: Queue, config: Dict[str, Any]) -> List[Dict[str, Any]]:
        """收集一批任务"""
        batch = []
        start_time = time.time()
        
        while len(batch) < config["max_batch_size"]:
            try:
                # 尝试从队列获取任务(非阻塞)
                task = batch_queue.get_nowait()
                batch.append(task)
                
                # 如果达到最小批大小且等待时间超过最大等待时间,立即处理
                if len(batch) >= config["min_batch_size"]:
                    if time.time() - start_time >= config["max_wait_time"]:
                        break
                
            except Empty:
                # 队列为空
                if len(batch) > 0:
                    # 有任务但队列已空,检查是否满足条件
                    if len(batch) >= config["min_batch_size"]:
                        break
                    elif time.time() - start_time >= config["max_wait_time"]:
                        break
                    else:
                        # 继续等待
                        time.sleep(0.01)
                else:
                    # 完全为空,退出循环
                    break
        
        return batch
    
    def _process_batch(self, batch: List[Dict[str, Any]], queue_type: str):
        """处理一批任务"""
        start_time = time.time()
        batch_size = len(batch)
        
        logger.info(f"开始处理批次,类型: {queue_type}, 大小: {batch_size}")
        
        try:
            # 批量生成
            results = self.model_worker.batch_generate(batch)
            
            # 保存结果
            for result in results:
                task_id = result["task_id"]
                
                if result["success"]:
                    redis_client.save_task_result(task_id, result["result"])
                else:
                    redis_client.save_task_result(
                        task_id, 
                        "", 
                        status="failed"
                    )
                    # 记录错误详情
                    redis_client.client.hset(
                        f"task:{task_id}", 
                        "error", 
                        result.get("error", "未知错误")
                    )
            
            # 更新指标
            processing_time = time.time() - start_time
            
            with self.lock:
                self.metrics["total_processed"] += batch_size
                self.metrics["batch_processed"] += 1
                
                # 更新平均批大小
                old_avg = self.metrics["avg_batch_size"]
                new_avg = old_avg + (batch_size - old_avg) / self.metrics["batch_processed"]
                self.metrics["avg_batch_size"] = new_avg
                
                # 更新平均处理时间
                old_time_avg = self.metrics["avg_processing_time"]
                new_time_avg = old_time_avg + (processing_time - old_time_avg) / self.metrics["batch_processed"]
                self.metrics["avg_processing_time"] = new_time_avg
            
            logger.info(f"批次处理完成,大小: {batch_size}, 耗时: {processing_time:.2f}秒")
            
        except Exception as e:
            logger.error(f"批次处理失败: {e}")
            
            # 批次处理失败,回退到单任务处理
            for task in batch:
                try:
                    result = self.model_worker.generate(
                        text=task["text"],
                        question=task.get("question"),
                        max_length=task.get("max_length", 512),
                        temperature=task.get("temperature", 0.7)
                    )
                    redis_client.save_task_result(task["task_id"], result)
                except Exception as task_error:
                    logger.error(f"单任务处理失败 {task['task_id']}: {task_error}")
                    redis_client.save_task_result(
                        task["task_id"], 
                        "", 
                        status="failed"
                    )
    
    def add_task(self, task: Dict[str, Any]):
        """添加任务到合适的批处理队列"""
        text_length = len(task["text"])
        
        if text_length > 100000:  # >10万字
            queue_type = "long"
        elif text_length > 10000:  # 1万-10万字
            queue_type = "medium"
        else:  # <1万字
            queue_type = "short"
        
        self.batch_queues[queue_type].put(task)
    
    def get_metrics(self) -> Dict[str, Any]:
        """获取监控指标"""
        with self.lock:
            return self.metrics.copy()
    
    def stop(self):
        """停止批处理管理器"""
        self.stop_event.set()
        
        for thread in self.worker_threads:
            thread.join(timeout=5.0)
        
        logger.info("批处理管理器已停止")

5.3 Celery Worker集成

虽然我们有自己的批处理管理器,但Celery提供了更好的任务管理和监控:

# app/tasks.py
from celery import Celery
from loguru import logger
from app.config import settings
from app.workers.glm_worker import GLMModelWorker
from app.core.redis_client import redis_client

# 创建Celery应用
celery_app = Celery(
    "glm_tasks",
    broker=f"redis://{settings.redis_host}:{settings.redis_port}/{settings.redis_db}",
    backend=f"redis://{settings.redis_host}:{settings.redis_port}/{settings.redis_db}"
)

# 配置Celery
celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="Asia/Shanghai",
    enable_utc=True,
    task_track_started=True,
    task_time_limit=settings.task_timeout,
    task_soft_time_limit=settings.task_timeout - 30,
    worker_max_tasks_per_child=100,  # 每处理100个任务重启worker,防止内存泄漏
    worker_prefetch_multiplier=1,  # 一次只预取1个任务,适合长任务
)

@celery_app.task(bind=True, max_retries=3)
def process_chat_task(self, task_data: dict):
    """处理聊天任务(Celery任务)"""
    task_id = task_data.get("task_id")
    text = task_data.get("text", "")
    question = task_data.get("question")
    max_length = task_data.get("max_length", 512)
    temperature = task_data.get("temperature", 0.7)
    
    logger.info(f"开始处理任务 {task_id}")
    
    try:
        # 获取模型Worker
        model_worker = GLMModelWorker()
        
        # 生成回复
        result = model_worker.generate(
            text=text,
            question=question,
            max_length=max_length,
            temperature=temperature
        )
        
        # 保存结果
        redis_client.save_task_result(task_id, result)
        
        logger.info(f"任务 {task_id} 处理完成")
        return {"task_id": task_id, "success": True}
        
    except MemoryError as e:
        logger.error(f"任务 {task_id} 内存不足: {e}")
        
        # 内存不足时重试(降低max_length)
        if self.request.retries < self.max_retries:
            # 降低生成长度
            new_max_length = max_length // 2
            if new_max_length < 64:
                new_max_length = 64
            
            logger.info(f"任务 {task_id} 重试,降低max_length到 {new_max_length}")
            
            # 更新任务数据
            task_data["max_length"] = new_max_length
            
            # 重试
            raise self.retry(exc=e, countdown=10)
        else:
            # 重试次数用尽
            redis_client.save_task_result(task_id, "", status="failed")
            redis_client.client.hset(f"task:{task_id}", "error", "内存不足,请减少输入文本长度")
            return {"task_id": task_id, "success": False, "error": str(e)}
            
    except Exception as e:
        logger.error(f"任务 {task_id} 处理失败: {e}")
        
        if self.request.retries < self.max_retries:
            # 重试
            raise self.retry(exc=e, countdown=30)
        else:
            # 重试次数用尽
            redis_client.save_task_result(task_id, "", status="failed")
            redis_client.client.hset(f"task:{task_id}", "error", str(e))
            return {"task_id": task_id, "success": False, "error": str(e)}

@celery_app.task
def batch_process_tasks(tasks_data: list):
    """批量处理任务(Celery任务)"""
    if not tasks_data:
        return []
    
    logger.info(f"开始批量处理 {len(tasks_data)} 个任务")
    
    try:
        # 获取模型Worker
        model_worker = GLMModelWorker()
        
        # 批量生成
        results = model_worker.batch_generate(tasks_data)
        
        # 保存结果
        for result in results:
            task_id = result["task_id"]
            
            if result["success"]:
                redis_client.save_task_result(task_id, result["result"])
            else:
                redis_client.save_task_result(task_id, "", status="failed")
                if "error" in result:
                    redis_client.client.hset(f"task:{task_id}", "error", result["error"])
        
        logger.info(f"批量处理完成,成功: {len([r for r in results if r['success']])}")
        return [r["task_id"] for r in results if r["success"]]
        
    except Exception as e:
        logger.error(f"批量处理失败: {e}")
        
        # 批量失败,回退到单任务处理
        success_tasks = []
        for task_data in tasks_data:
            try:
                # 使用Celery子任务
                result = process_chat_task.apply_async(args=[task_data])
                success_tasks.append(task_data["task_id"])
            except Exception as task_error:
                logger.error(f"单任务处理失败 {task_data['task_id']}: {task_error}")
        
        return success_tasks

6. 核心实现三:队列积压告警机制

队列积压是异步系统的“隐形杀手”。我们需要在问题发生前预警。

6.1 智能监控系统

# app/core/monitoring.py
import time
import threading
from typing import Dict, Any, List, Callable
from datetime import datetime, timedelta
from loguru import logger
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

from app.core.redis_client import redis_client
from app.config import settings

class QueueMonitor:
    """队列监控器"""
    
    def __init__(self):
        self.monitoring = False
        self.monitor_thread = None
        
        # 告警历史(防止重复告警)
        self.alert_history = {}
        
        # 告警规则
        self.alert_rules = {
            "queue_length": {
                "warning": settings.queue_warning_threshold,
                "critical": settings.queue_critical_threshold,
                "duration": 60  # 持续60秒才触发
            },
            "processing_time": {
                "warning": 300,  # 5分钟
                "critical": 600  # 10分钟
            },
            "error_rate": {
                "warning": 0.1,  # 10%错误率
                "critical": 0.3   # 30%错误率
            }
        }
        
        # 监控数据缓存
        self.metrics_cache = {
            "queue_lengths": {},
            "processing_times": [],
            "error_counts": {"hour": 0, "total": 0},
            "success_counts": {"hour": 0, "total": 0}
        }
        
        # 告警接收者
        self.alert_receivers = []  # 实际项目中从配置读取
        
        logger.info("队列监控器初始化完成")
    
    def start_monitoring(self):
        """开始监控"""
        if self.monitoring:
            logger.warning("监控已在运行中")
            return
        
        self.monitoring = True
        self.monitor_thread = threading.Thread(
            target=self._monitor_loop,
            daemon=True,
            name="QueueMonitor"
        )
        self.monitor_thread.start()
        
        logger.info("队列监控已启动")
    
    def stop_monitoring(self):
        """停止监控"""
        self.monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join(timeout=5.0)
        
        logger.info("队列监控已停止")
    
    def _monitor_loop(self):
        """监控循环"""
        check_interval = settings.check_interval
        
        while self.monitoring:
            try:
                # 收集指标
                self._collect_metrics()
                
                # 检查告警
                self._check_alerts()
                
                # 清理旧数据
                self._cleanup_old_data()
                
                # 记录监控日志(可选)
                if logger.level("DEBUG").no >= logger._core.min_level:
                    self._log_metrics()
                
            except Exception as e:
                logger.error(f"监控循环出错: {e}")
            
            # 等待下一次检查
            time.sleep(check_interval)
    
    def _collect_metrics(self):
        """收集监控指标"""
        # 队列长度
        queue_lengths = redis_client.get_all_queue_lengths()
        self.metrics_cache["queue_lengths"] = queue_lengths
        
        # 处理中的任务
        processing_tasks = redis_client.get_processing_tasks()
        
        # 计算任务处理时间
        current_time = time.time()
        for task_id in processing_tasks:
            task_info = redis_client.client.hgetall(f"task:{task_id}")
            if "started_at" in task_info:
                try:
                    started_at = datetime.fromisoformat(task_info["started_at"])
                    processing_time = (datetime.now() - started_at).total_seconds()
                    
                    # 只记录超过30秒的任务
                    if processing_time > 30:
                        self.metrics_cache["processing_times"].append({
                            "task_id": task_id,
                            "processing_time": processing_time,
                            "timestamp": current_time
                        })
                except (ValueError, KeyError):
                    pass
        
        # 错误率统计(简化版,实际应从任务结果统计)
        # 这里只是示例,实际项目需要更完善的统计
    
    def _check_alerts(self):
        """检查告警条件"""
        current_time = time.time()
        
        # 检查队列积压告警
        for queue_name, length in self.metrics_cache["queue_lengths"].items():
            alert_key = f"queue_{queue_name}"
            
            if length >= self.alert_rules["queue_length"]["critical"]:
                # 严重告警
                if self._should_alert(alert_key, "critical", current_time):
                    self._send_alert(
                        level="critical",
                        title=f"队列严重积压告警 - {queue_name}",
                        message=f"队列 {queue_name} 积压任务数: {length},超过临界阈值 {self.alert_rules['queue_length']['critical']}",
                        data={"queue": queue_name, "length": length}
                    )
                    
            elif length >= self.alert_rules["queue_length"]["warning"]:
                # 警告
                if self._should_alert(alert_key, "warning", current_time):
                    self._send_alert(
                        level="warning",
                        title=f"队列积压警告 - {queue_name}",
                        message=f"队列 {queue_name} 积压任务数: {length},超过警告阈值 {self.alert_rules['queue_length']['warning']}",
                        data={"queue": queue_name, "length": length}
                    )
        
        # 检查处理时间告警
        if self.metrics_cache["processing_times"]:
            avg_processing_time = sum(item["processing_time"] for item in self.metrics_cache["processing_times"]) / len(self.metrics_cache["processing_times"])
            
            if avg_processing_time > self.alert_rules["processing_time"]["critical"]:
                alert_key = "processing_time_critical"
                if self._should_alert(alert_key, "critical", current_time):
                    self._send_alert(
                        level="critical",
                        title="任务处理时间过长",
                        message=f"平均任务处理时间: {avg_processing_time:.1f}秒,超过临界阈值 {self.alert_rules['processing_time']['critical']}秒",
                        data={"avg_processing_time": avg_processing_time}
                    )
    
    def _should_alert(self, alert_key: str, level: str, current_time: float) -> bool:
        """判断是否应该发送告警(防止重复告警)"""
        if alert_key not in self.alert_history:
            self.alert_history[alert_key] = {
                "last_alert_time": 0,
                "last_level": None,
                "alert_count": 0
            }
        
        history = self.alert_history[alert_key]
        
        # 计算时间间隔
        time_since_last_alert = current_time - history["last_alert_time"]
        
        # 不同级别的告警间隔不同
        if level == "critical":
            min_interval = 300  # 5分钟
        else:
            min_interval = 900  # 15分钟
        
        # 如果上次告警时间太近,且级别相同,则抑制告警
        if time_since_last_alert < min_interval and history["last_level"] == level:
            return False
        
        # 更新历史
        history["last_alert_time"] = current_time
        history["last_level"] = level
        history["alert_count"] += 1
        
        return True
    
    def _send_alert(self, level: str, title: str, message: str, data: Dict[str, Any]):
        """发送告警"""
        # 这里实现告警发送逻辑
        # 实际项目中可能发送到:邮件、Slack、钉钉、企业微信等
        
        logger.warning(f"告警 [{level}]: {title} - {message}")
        
        # 示例:打印到日志
        alert_log = {
            "timestamp": datetime.now().isoformat(),
            "level": level,
            "title": title,
            "message": message,
            "data": data
        }
        
        logger.warning(f"告警详情: {alert_log}")
        
        # 实际发送告警的代码(示例)
        # self._send_email_alert(level, title, message)
        # self._send_slack_alert(level, title, message)
    
    def _send_email_alert(self, level: str, title: str, message: str):
        """发送邮件告警(示例)"""
        try:
            # 实际项目中需要配置SMTP服务器
            smtp_server = "smtp.example.com"
            smtp_port = 587
            sender_email = "alerts@example.com"
            sender_password = "password"
            
            # 创建邮件
            msg = MIMEMultipart()
            msg["From"] = sender_email
            msg["To"] = ", ".join(self.alert_receivers)
            msg["Subject"] = f"[{level.upper()}] {title}"
            
            # 邮件正文
            body = f"""
            告警级别: {level}
            时间: {datetime.now().isoformat()}
            标题: {title}
            详情: {message}
            
            请及时处理!
            """
            
            msg.attach(MIMEText(body, "plain"))
            
            # 发送邮件
            with smtplib.SMTP(smtp_server, smtp_port) as server:
                server.starttls()
                server.login(sender_email, sender_password)
                server.send_message(msg)
                
            logger.info(f"邮件告警已发送: {title}")
            
        except Exception as e:
            logger.error(f"发送邮件告警失败: {e}")
    
    def _cleanup_old_data(self):
        """清理旧数据"""
        current_time = time.time()
        cutoff_time = current_time - 3600  # 保留1小时内的数据
        
        # 清理处理时间数据
        self.metrics_cache["processing_times"] = [
            item for item in self.metrics_cache["processing_times"]
            if item["timestamp"] > cutoff_time
        ]
        
        # 清理告警历史(保留24小时)
        alert_cutoff = current_time - 86400
        keys_to_delete = []
        for key, history in self.alert_history.items():
            if history["last_alert_time"] < alert_cutoff and history["alert_count"] < 5:
                keys_to_delete.append(key)
        
        for key in keys_to_delete:
            del self.alert_history[key]
    
    def _log_metrics(self):
        """记录监控指标到日志"""
        queue_lengths = self.metrics_cache["queue_lengths"]
        total_queued = sum(queue_lengths.values())
        
        logger.debug(f"队列监控 - 总排队任务: {total_queued}, 各队列: {queue_lengths}")
    
    def get_status(self) -> Dict[str, Any]:
        """获取监控状态"""
        return {
            "monitoring": self.monitoring,
            "queue_lengths": self.metrics_cache["queue_lengths"],
            "alert_history_size": len(self.alert_history),
            "processing_tasks_count": len(self.metrics_cache["processing_times"]),
            "timestamp": datetime.now().isoformat()
        }

# 全局监控器实例
queue_monitor = QueueMonitor()

def monitor_queue_length(queue_name: str, length: int):
    """监控队列长度(供API调用)"""
    # 这里可以添加实时监控逻辑
    # 例如:如果长度突然增加,立即记录日志
    if length > settings.queue_warning_threshold:
        logger.warning(f"队列 {queue_name} 长度异常: {length}")

6.2 可视化监控面板(可选)

我们可以提供一个简单的Web监控面板:

# app/api/v1/monitor_endpoints.py
from fastapi import APIRouter
from fastapi.responses import HTMLResponse
from datetime import datetime
import json

from app.core.monitoring import queue_monitor
from app.core.redis_client import redis_client

router = APIRouter()

@router.get("/monitor/dashboard", response_class=HTMLResponse)
async def get_monitor_dashboard():
    """监控仪表板"""
    # 获取监控数据
    queue_status = redis_client.get_all_queue_lengths()
    monitor_status = queue_monitor.get_status()
    
    # 构建HTML
    html = f"""
    <!DOCTYPE html>
    <html>
    <head>
        <title>GLM异步API监控面板</title>
        <style>
            body {{ font-family: Arial, sans-serif; margin: 20px; }}
            .dashboard {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(300px, 1fr)); gap: 20px; }}
            .card {{ background: #f5f5f5; padding: 20px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }}
            .card h3 {{ margin-top: 0; }}
            .queue-item {{ display: flex; justify-content: space-between; margin: 10px 0; padding: 10px; background: white; border-radius: 4px; }}
            .queue-length {{ font-weight: bold; }}
            .warning {{ color: orange; }}
            .critical {{ color: red; }}
            .normal {{ color: green; }}
        </style>
        <meta http-equiv="refresh" content="10">
    </head>
    <body>
        <h1>GLM异步API监控面板</h1>
        <p>更新时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
        
        <div class="dashboard">
            <div class="card">
                <h3>队列状态</h3>
                {"".join([
                    f'<div class="queue-item">'
                    f'<span>{queue}</span>'
                    f'<span class="queue-length {get_queue_class(length)}">{length}</span>'
                    f'</div>'
                    for queue, length in queue_status.items()
                ])}
                <div class="queue-item">
                    <strong>总计</strong>
                    <strong>{sum(queue_status.values())}</strong>
                </div>
            </div>
            
            <div class="card">
                <h3>系统状态</h3>
                <p>监控状态: {'运行中' if monitor_status['monitoring'] else '已停止'}</p>
                <p>告警记录数: {monitor_status['alert_history_size']}</p>
                <p>长时间处理任务: {monitor_status['processing_tasks_count']}</p>
            </div>
            
            <div class="card">
                <h3>操作</h3>
                <button onclick="location.reload()">刷新</button>
                <button onclick="fetch('/api/v1/queue/status').then(r => r.json()).then(console.log)">
                    查看详细状态
                </button>
            </div>
        </div>
        
        <script>
        function getQueueClass(length) {{
            if (length > 100) return 'critical';
            if (length > 50) return 'warning';
            return 'normal';
        }}
        </script>
    </body>
    </html>
    """
    
    return HTMLResponse(content=html)

def get_queue_class(length: int) -> str:
    """获取队列长度对应的CSS类"""
    if length > settings.queue_critical_threshold:
        return "critical"
    elif length > settings.queue_warning_threshold:
        return "warning"
    else:
        return "normal"

7. 部署与运维指南

7.1 Docker容器化部署

为了简化部署,我们使用Docker Compose:

# docker-compose.yml
version: '3.8'

services:
  # Redis服务
  redis:
    image: redis:7-alpine
    container_name: glm-redis
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes
    restart: unless-stopped
  
  # API服务
  api:
    build: .
    container_name: glm-api
    ports:
      - "8080:8080"
    depends_on:
      - redis
    environment:
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - MODEL_NAME=THUDM/glm-4-9b-chat-1M
    volumes:
      - ./model_cache:/app/model_cache
      - ./logs:/app/logs
    command: >
      sh -c "uvicorn app.main:app --host 0.0.0.0 --port 8080 --workers 4"
    restart: unless-stopped
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
  
  # Celery Worker服务
  celery-worker:
    build: .
    container_name: glm-celery-worker
    depends_on:
      - redis
      - api
    environment:
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - MODEL_NAME=THUDM/glm-4-9b-chat-1M
    volumes:
      - ./model_cache:/app/model_cache
      - ./logs:/app/logs
    command: >
      sh -c "celery -A app.tasks.celery_app worker --loglevel=info --concurrency=2"
    restart: unless-stopped
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
  
  # Celery Beat服务(定时任务)
  celery-beat:
    build: .
    container_name: glm-celery-beat
    depends_on:
      - redis
    environment:
      - REDIS_HOST=redis
      - REDIS_P
Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐