GLM-4-9B-Chat-1M基础教程:Chainlit消息历史持久化到PostgreSQL完整配置示例

你是不是也遇到过这样的问题:用Chainlit搭建了一个很酷的AI对话应用,每次重启服务后,之前的聊天记录全都没了?用户得从头开始,体验大打折扣。

今天我就来分享一个实用解决方案——把Chainlit的对话历史持久化存储到PostgreSQL数据库。这样不仅能保存完整的聊天记录,还能方便地进行数据分析和查询。我会手把手带你完成整个配置过程,从环境准备到代码实现,保证你能跟着做出来。

1. 环境准备与项目搭建

在开始之前,我们需要先确保环境都准备好了。这个教程假设你已经部署了GLM-4-9B-Chat-1M模型,并且能用Chainlit进行基本的对话调用。

1.1 检查现有环境

首先,登录到你的服务器,确认模型服务正常运行:

# 查看模型服务日志
cat /root/workspace/llm.log

如果看到类似下面的输出,说明模型部署成功了:

INFO:     Started server process [1234]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8000

1.2 安装必要的依赖

我们需要安装几个关键的Python包。创建一个新的虚拟环境是个好习惯:

# 创建并激活虚拟环境
python -m venv chat_env
source chat_env/bin/activate

# 安装核心依赖
pip install chainlit==1.0.0
pip install psycopg2-binary
pip install sqlalchemy
pip install openai

这里简单说明一下每个包的作用:

  • chainlit: 我们的对话前端框架
  • psycopg2-binary: PostgreSQL数据库的Python驱动
  • sqlalchemy: 数据库ORM工具,方便操作
  • openai: 用于调用GLM-4-9B-Chat-1M的API客户端

1.3 准备PostgreSQL数据库

如果你还没有PostgreSQL数据库,可以快速安装一个:

# Ubuntu/Debian系统
sudo apt update
sudo apt install postgresql postgresql-contrib

# 启动服务
sudo systemctl start postgresql
sudo systemctl enable postgresql

# 创建数据库和用户
sudo -u postgres psql

在PostgreSQL命令行中执行:

-- 创建数据库
CREATE DATABASE chainlit_chat;

-- 创建用户
CREATE USER chat_user WITH PASSWORD 'your_secure_password';

-- 授权
GRANT ALL PRIVILEGES ON DATABASE chainlit_chat TO chat_user;

-- 退出
\q

2. 配置Chainlit持久化存储

现在进入核心部分——配置Chainlit使用PostgreSQL存储消息历史。

2.1 创建数据库模型

首先,我们需要定义数据库表结构。创建一个名为database.py的文件:

from sqlalchemy import create_engine, Column, String, Text, DateTime, JSON
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
import uuid

# 创建基类
Base = declarative_base()

class ChatSession(Base):
    """聊天会话表"""
    __tablename__ = 'chat_sessions'
    
    id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    user_id = Column(String(100), nullable=False)
    session_name = Column(String(200), default="新对话")
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    
    def __repr__(self):
        return f"<ChatSession(id={self.id}, user={self.user_id})>"

class ChatMessage(Base):
    """聊天消息表"""
    __tablename__ = 'chat_messages'
    
    id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    session_id = Column(String(36), nullable=False, index=True)
    role = Column(String(20), nullable=False)  # user, assistant, system
    content = Column(Text, nullable=False)
    metadata = Column(JSON, default=dict)  # 存储额外信息,如token数、生成时间等
    created_at = Column(DateTime, default=datetime.utcnow)
    
    def __repr__(self):
        return f"<ChatMessage(role={self.role}, content={self.content[:50]}...)>"

# 数据库连接配置
DATABASE_URL = "postgresql://chat_user:your_secure_password@localhost:5432/chainlit_chat"

# 创建引擎和会话
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

def init_db():
    """初始化数据库,创建表"""
    Base.metadata.create_all(bind=engine)
    print("数据库表创建成功!")

def get_db():
    """获取数据库会话"""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

2.2 创建Chainlit持久化处理器

接下来,我们创建一个自定义的持久化处理器。新建persistence.py文件:

from chainlit.types import ThreadDict, MessageDict
from sqlalchemy.orm import Session
from database import ChatSession, ChatMessage, SessionLocal
from datetime import datetime
from typing import List, Optional
import json

class PostgreSQLPersistence:
    """PostgreSQL持久化处理器"""
    
    def __init__(self):
        self.db = SessionLocal()
        
    def create_thread(self, thread: ThreadDict) -> str:
        """创建新的聊天线程"""
        try:
            # 创建会话记录
            session = ChatSession(
                id=thread["id"],
                user_id=thread.get("userId", "anonymous"),
                session_name=thread.get("name", "新对话")
            )
            
            self.db.add(session)
            self.db.commit()
            
            # 保存初始消息
            if "messages" in thread:
                for msg in thread["messages"]:
                    self._save_message(thread["id"], msg)
            
            return thread["id"]
            
        except Exception as e:
            self.db.rollback()
            print(f"创建线程失败: {e}")
            raise
    
    def get_thread(self, thread_id: str) -> Optional[ThreadDict]:
        """获取聊天线程"""
        try:
            # 获取会话
            session = self.db.query(ChatSession).filter(ChatSession.id == thread_id).first()
            if not session:
                return None
            
            # 获取所有消息
            messages = self.db.query(ChatMessage).filter(
                ChatMessage.session_id == thread_id
            ).order_by(ChatMessage.created_at).all()
            
            # 转换为Chainlit格式
            message_dicts = []
            for msg in messages:
                message_dicts.append({
                    "id": msg.id,
                    "role": msg.role,
                    "content": msg.content,
                    "createdAt": msg.created_at.isoformat(),
                    "metadata": msg.metadata or {}
                })
            
            return {
                "id": session.id,
                "createdAt": session.created_at.isoformat(),
                "name": session.session_name,
                "userId": session.user_id,
                "messages": message_dicts
            }
            
        except Exception as e:
            print(f"获取线程失败: {e}")
            return None
    
    def update_thread(self, thread_id: str, thread: ThreadDict) -> bool:
        """更新聊天线程"""
        try:
            session = self.db.query(ChatSession).filter(ChatSession.id == thread_id).first()
            if not session:
                return False
            
            # 更新会话信息
            if "name" in thread:
                session.session_name = thread["name"]
            session.updated_at = datetime.utcnow()
            
            # 更新消息
            if "messages" in thread:
                # 先删除旧消息
                self.db.query(ChatMessage).filter(ChatMessage.session_id == thread_id).delete()
                
                # 保存新消息
                for msg in thread["messages"]:
                    self._save_message(thread_id, msg)
            
            self.db.commit()
            return True
            
        except Exception as e:
            self.db.rollback()
            print(f"更新线程失败: {e}")
            return False
    
    def delete_thread(self, thread_id: str) -> bool:
        """删除聊天线程"""
        try:
            # 删除消息
            self.db.query(ChatMessage).filter(ChatMessage.session_id == thread_id).delete()
            
            # 删除会话
            self.db.query(ChatSession).filter(ChatSession.id == thread_id).delete()
            
            self.db.commit()
            return True
            
        except Exception as e:
            self.db.rollback()
            print(f"删除线程失败: {e}")
            return False
    
    def list_threads(self, user_id: str, limit: int = 50) -> List[ThreadDict]:
        """列出用户的聊天线程"""
        try:
            sessions = self.db.query(ChatSession).filter(
                ChatSession.user_id == user_id
            ).order_by(ChatSession.updated_at.desc()).limit(limit).all()
            
            threads = []
            for session in sessions:
                # 获取最后一条消息作为预览
                last_msg = self.db.query(ChatMessage).filter(
                    ChatMessage.session_id == session.id
                ).order_by(ChatMessage.created_at.desc()).first()
                
                thread_dict = {
                    "id": session.id,
                    "createdAt": session.created_at.isoformat(),
                    "updatedAt": session.updated_at.isoformat(),
                    "name": session.session_name,
                    "userId": session.user_id,
                    "messageCount": self.db.query(ChatMessage).filter(
                        ChatMessage.session_id == session.id
                    ).count(),
                    "lastMessage": last_msg.content[:100] + "..." if last_msg else ""
                }
                threads.append(thread_dict)
            
            return threads
            
        except Exception as e:
            print(f"列出线程失败: {e}")
            return []
    
    def _save_message(self, session_id: str, message: MessageDict) -> None:
        """保存单条消息"""
        msg = ChatMessage(
            id=message["id"],
            session_id=session_id,
            role=message["role"],
            content=message["content"],
            metadata=message.get("metadata", {}),
            created_at=datetime.fromisoformat(message["createdAt"].replace('Z', '+00:00'))
        )
        self.db.add(msg)
        self.db.commit()
    
    def close(self):
        """关闭数据库连接"""
        self.db.close()

# 创建全局持久化实例
persistence = PostgreSQLPersistence()

2.3 配置Chainlit应用

现在我们来创建主应用文件app.py,集成GLM-4-9B-Chat-1M模型和持久化功能:

import chainlit as cl
from openai import OpenAI
import os
from persistence import persistence
from database import init_db

# 初始化数据库
init_db()

# 配置GLM-4-9B-Chat-1M API客户端
client = OpenAI(
    base_url="http://localhost:8000/v1",  # vLLM部署的地址
    api_key="not-needed"  # vLLM不需要API key
)

@cl.on_chat_start
async def on_chat_start():
    """聊天开始时的初始化"""
    # 设置持久化处理器
    cl.user_session.set("persistence", persistence)
    
    # 创建新的线程
    thread = {
        "id": cl.context.session.id,
        "userId": cl.context.session.user.id if cl.context.session.user else "anonymous",
        "name": "新对话",
        "messages": []
    }
    
    # 保存到数据库
    thread_id = persistence.create_thread(thread)
    cl.user_session.set("thread_id", thread_id)
    
    # 发送欢迎消息
    welcome_msg = "👋 你好!我是基于GLM-4-9B-Chat-1M的AI助手,支持128K上下文长度。我们的对话将会被保存,下次可以继续聊天哦!"
    await cl.Message(content=welcome_msg).send()

@cl.on_message
async def on_message(message: cl.Message):
    """处理用户消息"""
    thread_id = cl.user_session.get("thread_id")
    persistence = cl.user_session.get("persistence")
    
    # 获取历史消息
    thread = persistence.get_thread(thread_id)
    if not thread:
        await cl.Message(content="抱歉,找不到对话历史").send()
        return
    
    # 构建消息历史(限制长度,避免超过模型上下文)
    messages = []
    for msg in thread["messages"][-10:]:  # 只取最近10条消息
        messages.append({
            "role": msg["role"],
            "content": msg["content"]
        })
    
    # 添加当前用户消息
    messages.append({
        "role": "user",
        "content": message.content
    })
    
    # 调用GLM-4-9B-Chat-1M模型
    try:
        response = client.chat.completions.create(
            model="glm-4-9b-chat-1m",
            messages=messages,
            temperature=0.7,
            max_tokens=2048,
            stream=True
        )
        
        # 流式响应
        msg = cl.Message(content="")
        await msg.send()
        
        full_response = ""
        async for chunk in response:
            if chunk.choices[0].delta.content:
                token = chunk.choices[0].delta.content
                full_response += token
                await msg.stream_token(token)
        
        await msg.update()
        
        # 保存消息到数据库
        user_msg = {
            "id": f"msg_{len(thread['messages'])}_user",
            "role": "user",
            "content": message.content,
            "createdAt": message.created_at.isoformat(),
            "metadata": {}
        }
        
        assistant_msg = {
            "id": f"msg_{len(thread['messages']) + 1}_assistant",
            "role": "assistant",
            "content": full_response,
            "createdAt": cl.context.session.created_at.isoformat(),
            "metadata": {
                "model": "glm-4-9b-chat-1m",
                "tokens": len(full_response) // 4  # 粗略估算token数
            }
        }
        
        # 更新线程
        thread["messages"].extend([user_msg, assistant_msg])
        persistence.update_thread(thread_id, thread)
        
    except Exception as e:
        error_msg = f"调用模型失败: {str(e)}"
        await cl.Message(content=error_msg).send()

@cl.on_chat_resume
async def on_chat_resume(thread_id: str):
    """恢复历史对话"""
    persistence = cl.user_session.get("persistence")
    
    # 从数据库加载历史
    thread = persistence.get_thread(thread_id)
    if thread:
        cl.user_session.set("thread_id", thread_id)
        
        # 显示历史消息
        for msg in thread["messages"][-5:]:  # 显示最近5条
            if msg["role"] == "user":
                await cl.Message(
                    author="User",
                    content=msg["content"]
                ).send()
            else:
                await cl.Message(
                    author="Assistant",
                    content=msg["content"]
                ).send()
        
        resume_msg = f" 已恢复对话 '{thread['name']}',包含 {len(thread['messages'])} 条消息"
        await cl.Message(content=resume_msg).send()
    else:
        await cl.Message(content="未找到指定的对话历史").send()

@cl.on_chat_end
async def on_chat_end():
    """聊天结束时清理资源"""
    persistence = cl.user_session.get("persistence")
    if persistence:
        persistence.close()

# Chainlit应用配置
chainlit_config = """
[project]
name = "GLM-4-9B-Chat-1M 对话助手"
description = "基于GLM-4-9B-Chat-1M和Chainlit的对话应用,支持消息历史持久化"

[UI]
name = "GLM-4-9B Chat"
description = "与GLM-4-9B-Chat-1M模型对话"
"""

# 写入配置文件
with open("chainlit.md", "w") as f:
    f.write(chainlit_config)

2.4 创建Chainlit配置文件

创建一个chainlit.md文件来配置应用界面:

# 欢迎使用 GLM-4-9B-Chat-1M 对话助手

这是一个基于GLM-4-9B-Chat-1M大模型的对话应用,具有以下特点:

##  主要功能
-  智能对话:支持多轮对话,上下文长度达128K
- 💾 历史保存:所有对话自动保存到PostgreSQL数据库
-  对话恢复:随时可以恢复之前的对话记录
- ⚡ 快速响应:基于vLLM高效推理引擎

##  快速开始
1. 在下方输入框输入你的问题
2. 按Enter或点击发送按钮
3. 查看AI助手的回复

##  使用技巧
- 可以问任何问题,模型支持26种语言
- 对话历史会自动保存,下次可以继续
- 支持代码执行、数学计算、长文本分析等功能

## 🔧 技术栈
- 后端:GLM-4-9B-Chat-1M + vLLM
- 前端:Chainlit
- 存储:PostgreSQL
- 部署:Docker + 云服务器

开始你的对话吧! 

3. 部署与运行

现在所有代码都准备好了,让我们来部署运行。

3.1 启动应用

首先确保PostgreSQL服务正在运行:

# 检查PostgreSQL状态
sudo systemctl status postgresql

# 如果未运行,启动服务
sudo systemctl start postgresql

然后启动Chainlit应用:

# 激活虚拟环境
source chat_env/bin/activate

# 启动Chainlit应用
chainlit run app.py -w --port 8500

你会看到类似下面的输出:

Chainlit app starting...
Your app is available at http://localhost:8500
Database connection established

3.2 验证持久化功能

打开浏览器访问 http://你的服务器IP:8500,开始对话:

  1. 测试消息保存:发送几条消息,然后刷新页面或关闭浏览器
  2. 重新打开应用:再次访问应用,应该能看到历史消息
  3. 检查数据库:登录PostgreSQL查看数据是否保存
# 连接到数据库
psql -U chat_user -d chainlit_chat

# 查看会话表
SELECT id, user_id, session_name, created_at FROM chat_sessions;

# 查看消息表
SELECT session_id, role, LEFT(content, 50) as preview, created_at 
FROM chat_messages 
ORDER BY created_at DESC 
LIMIT 5;

# 退出
\q

3.3 配置系统服务(可选)

为了让应用在后台持续运行,可以配置为系统服务:

# 创建服务文件
sudo nano /etc/systemd/system/chainlit-chat.service

添加以下内容:

[Unit]
Description=GLM-4-9B Chat Application
After=network.target postgresql.service

[Service]
Type=simple
User=your_username
WorkingDirectory=/path/to/your/project
Environment="PATH=/path/to/chat_env/bin"
ExecStart=/path/to/chat_env/bin/chainlit run app.py --port 8500
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target

启用并启动服务:

sudo systemctl daemon-reload
sudo systemctl enable chainlit-chat.service
sudo systemctl start chainlit-chat.service

# 查看状态
sudo systemctl status chainlit-chat.service

4. 高级功能与优化

基础功能已经实现了,我们还可以添加一些高级功能来提升体验。

4.1 添加对话管理功能

app.py中添加对话管理功能:

@cl.action("list_conversations")
async def list_conversations():
    """列出所有历史对话"""
    persistence = cl.user_session.get("persistence")
    user_id = cl.context.session.user.id if cl.context.session.user else "anonymous"
    
    threads = persistence.list_threads(user_id, limit=20)
    
    if not threads:
        return await cl.Message(content="暂无历史对话").send()
    
    # 创建对话列表
    threads_list = []
    for thread in threads:
        threads_list.append(
            f"**{thread['name']}**\n"
            f"ID: {thread['id']}\n"
            f"消息数: {thread['messageCount']}\n"
            f"最后更新: {thread['updatedAt']}\n"
            f"最后消息: {thread['lastMessage']}\n"
            f"---"
        )
    
    await cl.Message(content="\n\n".join(threads_list)).send()

@cl.action("rename_conversation")
async def rename_conversation():
    """重命名当前对话"""
    # 弹出输入框让用户输入新名称
    res = await cl.AskActionMessage(
        content="请输入新的对话名称:",
        actions=[
            cl.Action(name="confirm_rename", value="confirm", label="确认"),
            cl.Action(name="cancel_rename", value="cancel", label="取消")
        ]
    ).send()
    
    if res and res.get("value") == "confirm":
        new_name = res.get("input")
        if new_name:
            thread_id = cl.user_session.get("thread_id")
            persistence = cl.user_session.get("persistence")
            
            thread = persistence.get_thread(thread_id)
            if thread:
                thread["name"] = new_name
                persistence.update_thread(thread_id, thread)
                await cl.Message(content=f"对话已重命名为: {new_name}").send()

@cl.action("delete_conversation")
async def delete_conversation():
    """删除当前对话"""
    res = await cl.AskActionMessage(
        content="确定要删除这个对话吗?此操作不可恢复。",
        actions=[
            cl.Action(name="confirm_delete", value="confirm", label="确认删除"),
            cl.Action(name="cancel_delete", value="cancel", label="取消")
        ]
    ).send()
    
    if res and res.get("value") == "confirm":
        thread_id = cl.user_session.get("thread_id")
        persistence = cl.user_session.get("persistence")
        
        if persistence.delete_thread(thread_id):
            await cl.Message(content="对话已删除").send()
            # 创建新的对话
            await on_chat_start()

4.2 添加数据备份功能

创建备份脚本backup.py

import subprocess
from datetime import datetime
import os

def backup_database():
    """备份数据库到文件"""
    # 备份配置
    db_name = "chainlit_chat"
    db_user = "chat_user"
    backup_dir = "/path/to/backups"
    
    # 创建备份目录
    os.makedirs(backup_dir, exist_ok=True)
    
    # 生成备份文件名
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_file = f"{backup_dir}/chainlit_backup_{timestamp}.sql"
    
    # 执行备份命令
    try:
        cmd = [
            "pg_dump",
            "-U", db_user,
            "-d", db_name,
            "-f", backup_file,
            "--clean",
            "--if-exists"
        ]
        
        # 设置密码(在生产环境中建议使用.pgpass文件)
        env = os.environ.copy()
        env["PGPASSWORD"] = "your_secure_password"
        
        result = subprocess.run(cmd, env=env, capture_output=True, text=True)
        
        if result.returncode == 0:
            print(f" 备份成功: {backup_file}")
            
            # 清理旧备份(保留最近7天)
            cleanup_old_backups(backup_dir, days=7)
        else:
            print(f" 备份失败: {result.stderr}")
            
    except Exception as e:
        print(f" 备份异常: {e}")

def cleanup_old_backups(backup_dir, days=7):
    """清理旧的备份文件"""
    import time
    import glob
    
    current_time = time.time()
    cutoff = current_time - (days * 24 * 60 * 60)
    
    backup_files = glob.glob(f"{backup_dir}/chainlit_backup_*.sql")
    
    for file_path in backup_files:
        file_time = os.path.getmtime(file_path)
        if file_time < cutoff:
            os.remove(file_path)
            print(f"🗑 删除旧备份: {file_path}")

def restore_database(backup_file):
    """从备份文件恢复数据库"""
    db_name = "chainlit_chat"
    db_user = "chat_user"
    
    try:
        # 先删除现有数据库(谨慎操作!)
        drop_cmd = ["dropdb", "-U", db_user, db_name]
        create_cmd = ["createdb", "-U", db_user, db_name]
        restore_cmd = ["psql", "-U", db_user, "-d", db_name, "-f", backup_file]
        
        env = os.environ.copy()
        env["PGPASSWORD"] = "your_secure_password"
        
        print("正在恢复数据库...")
        
        # 删除并重建数据库
        subprocess.run(drop_cmd, env=env, check=False)
        subprocess.run(create_cmd, env=env, check=True)
        
        # 恢复数据
        result = subprocess.run(restore_cmd, env=env, capture_output=True, text=True)
        
        if result.returncode == 0:
            print(" 数据库恢复成功")
        else:
            print(f" 恢复失败: {result.stderr}")
            
    except Exception as e:
        print(f" 恢复异常: {e}")

if __name__ == "__main__":
    # 执行备份
    backup_database()
    
    # 如果要恢复,取消注释下面这行并指定备份文件
    # restore_database("/path/to/backup/file.sql")

4.3 配置自动备份

设置定时任务自动备份:

# 编辑crontab
crontab -e

# 添加每天凌晨2点备份
0 2 * * * /path/to/chat_env/bin/python /path/to/your/project/backup.py >> /var/log/chainlit_backup.log 2>&1

# 保存并退出

5. 常见问题与解决方案

在实际使用中可能会遇到一些问题,这里整理了一些常见问题的解决方法。

5.1 数据库连接问题

问题:应用启动时连接数据库失败

解决方案

# 修改database.py中的连接配置,添加重试机制
import time
from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError

def create_engine_with_retry(url, retries=3, delay=2):
    """带重试的数据库连接"""
    for i in range(retries):
        try:
            engine = create_engine(url, pool_pre_ping=True, pool_recycle=3600)
            # 测试连接
            with engine.connect() as conn:
                conn.execute("SELECT 1")
            print(" 数据库连接成功")
            return engine
        except OperationalError as e:
            print(f" 连接失败 (尝试 {i+1}/{retries}): {e}")
            if i < retries - 1:
                time.sleep(delay)
            else:
                raise

# 使用带重试的连接
DATABASE_URL = "postgresql://chat_user:your_secure_password@localhost:5432/chainlit_chat"
engine = create_engine_with_retry(DATABASE_URL)

5.2 消息历史过长问题

问题:对话历史太长,超过模型上下文限制

解决方案:在app.py中添加历史消息截断逻辑

def truncate_messages(messages, max_tokens=120000):
    """截断消息历史,确保不超过token限制"""
    total_tokens = 0
    truncated_messages = []
    
    # 从最新消息开始添加
    for msg in reversed(messages):
        # 估算token数(中文字符约2倍)
        msg_tokens = len(msg["content"]) * 2
        
        if total_tokens + msg_tokens > max_tokens:
            break
            
        truncated_messages.insert(0, msg)  # 保持顺序
        total_tokens += msg_tokens
    
    return truncated_messages

# 在on_message函数中使用
messages = thread["messages"][-20:]  # 先取最近20条
messages = truncate_messages(messages, max_tokens=120000)

5.3 性能优化建议

如果发现应用响应变慢,可以尝试以下优化:

  1. 数据库索引优化
-- 为常用查询字段添加索引
CREATE INDEX idx_chat_messages_session_id ON chat_messages(session_id);
CREATE INDEX idx_chat_messages_created_at ON chat_messages(created_at);
CREATE INDEX idx_chat_sessions_user_id ON chat_sessions(user_id);
CREATE INDEX idx_chat_sessions_updated_at ON chat_sessions(updated_at);
  1. 连接池配置
# 在database.py中优化连接池
engine = create_engine(
    DATABASE_URL,
    pool_size=20,  # 连接池大小
    max_overflow=10,  # 最大溢出连接数
    pool_pre_ping=True,  # 连接前ping检查
    pool_recycle=3600,  # 连接回收时间(秒)
    echo=False  # 关闭SQL日志
)
  1. 消息批量保存
# 修改persistence.py中的消息保存逻辑
def save_messages_batch(self, session_id: str, messages: List[MessageDict]):
    """批量保存消息,提高性能"""
    try:
        message_objects = []
        for msg in messages:
            message_objects.append(ChatMessage(
                id=msg["id"],
                session_id=session_id,
                role=msg["role"],
                content=msg["content"],
                metadata=msg.get("metadata", {}),
                created_at=datetime.fromisoformat(msg["createdAt"].replace('Z', '+00:00'))
            ))
        
        self.db.bulk_save_objects(message_objects)
        self.db.commit()
        
    except Exception as e:
        self.db.rollback()
        print(f"批量保存消息失败: {e}")

6. 总结

通过这个教程,我们完整实现了Chainlit消息历史持久化到PostgreSQL的功能。现在你的GLM-4-9B-Chat-1M对话应用具备了完整的对话历史管理能力。

6.1 实现的核心功能

  1. 消息持久化:所有对话自动保存到PostgreSQL数据库
  2. 历史恢复:用户可以随时恢复之前的对话
  3. 对话管理:支持重命名、删除、列表查看等操作
  4. 数据备份:提供自动备份和恢复机制
  5. 性能优化:包含数据库优化和连接管理

6.2 关键收获

  • 技术栈整合:学会了如何将Chainlit、PostgreSQL和GLM-4-9B-Chat-1M模型有机结合
  • 实战经验:掌握了数据库设计、ORM使用、异步编程等实用技能
  • 问题解决:了解了常见问题的排查和解决方法
  • 生产就绪:配置了系统服务、定时备份等生产环境需要的功能

6.3 下一步建议

如果你想进一步扩展这个项目,可以考虑:

  1. 添加用户认证:集成用户登录系统,实现多用户隔离
  2. 实现搜索功能:基于对话内容实现全文搜索
  3. 添加数据分析:分析对话数据,了解用户使用模式
  4. 部署到云端:使用云数据库和容器化部署
  5. 添加监控告警:监控应用性能和数据库状态

这个方案不仅适用于GLM-4-9B-Chat-1M,也可以轻松适配其他大模型。希望这个教程对你有所帮助,如果有任何问题或改进建议,欢迎交流讨论!


获取更多AI镜像

想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。

Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐