AI客服系统 - WebSocket实时通信系统实现
·
一、概述
本文档详细解析 AI客服系统中的 WebSocket 实时通信架构设计与代码实现,基于 /backend/websocket/ 目录下的核心源码展开分析。
二、核心架构设计
2.1 系统组件关系
┌─────────────────┐
│ 前端 (访客/客服) │
└────────┬────────┘
│ WebSocket 连接
│
┌────────▼────────┐
│ HandleWebSocket │ HTTP 升级处理
└────────┬────────┘
│
┌────────▼────────┐
│ Hub │ 连接管理核心
└────────┬────────┘
│
┌─────┴─────┐
│ │
┌──▼──┐ ┌────▼──┐
│Client│ │ Redis │ 分布式事件总线
└─────┘ └───────┘
2.2 核心数据结构
Hub 结构体: (hub.go:24-45)
type Hub struct {
// 每个对话ID对应的客户端连接列表
conversations map[uint]map[*Client]bool
// 注册/注销通道
register chan *Client
unregister chan *Client
// 消息广播通道
broadcast chan *Message
// 互斥锁
mu sync.RWMutex
// 回调函数
onConnect OnClientConnectCallback
onDisconnect OnClientDisconnectCallback
// 分布式事件总线
bus DistributedBus
}
Message 结构体: (hub.go:49-56)
type Message struct {
ConversationID uint `json:"conversation_id"`
Data interface{} `json:"data"`
Type string `json:"type"`
Scope string `json:"scope,omitempty"` // conversation | all_agents
FromRemote bool `json:"-"`
}
三、核心实现详解
3.1 WebSocket 连接处理
连接升级与初始化: (handler.go:23-95)
func HandleWebSocket(hub *Hub, userRepo *repository.UserRepository) gin.HandlerFunc {
return func(c *gin.Context) {
// 1. 解析参数
conversationIDStr := c.Query("conversation_id")
isVisitorStr := c.DefaultQuery("is_visitor", "true")
isVisitor := isVisitorStr == "true" || isVisitorStr == "1"
// 2. 客服身份验证
var agentID uint
if !isVisitor {
agentIDStr := c.Query("agent_id")
agentID = ... // 解析并验证
wsToken := c.Query("ws_token")
if !utils.ValidateWSToken(wsToken, agentID) {
c.JSON(http.StatusUnauthorized, ...)
return
}
}
// 3. HTTP 升级为 WebSocket
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
// 4. 创建 Client 并注册到 Hub
client := NewClient(hub, conn, uint(conversationID), isVisitor, agentID)
hub.register <- client
// 5. 启动读写协程
go client.WritePump()
go client.ReadPump()
}
}
关键点说明:
-
使用 gorilla/websocket 库实现协议升级
-
访客连接无特殊权限,客服连接需要 token 验证
-
每个连接独立运行两个协程处理读写
3.2 Hub 消息循环
主事件循环: (hub.go:85-182)
func (h *Hub) Run() {
for {
select {
// 新客户端连接
case client := <-h.register:
h.mu.Lock()
if h.conversations[client.conversationID] == nil {
h.conversations[client.conversationID] = make(map[*Client]bool)
}
h.conversations[client.conversationID][client] = true
h.mu.Unlock()
if h.onConnect != nil {
visitorCount := 0 // 统计访客连接数
h.onConnect(client.conversationID, client.isVisitor, visitorCount, client.agentID)
}
// 客户端断开
case client := <-h.unregister:
h.mu.Lock()
if clients, ok := h.conversations[client.conversationID]; ok {
if _, ok := clients[client]; ok {
delete(clients, client)
close(client.send)
if len(clients) == 0 {
delete(h.conversations, client.conversationID)
}
}
}
h.mu.Unlock()
// 广播消息
case message := <-h.broadcast:
if message.Scope == "all_agents" {
clients := h.snapshotAllAgents()
h.sendToClients(clients, message)
} else {
clients := h.snapshotConversationClients(message.ConversationID)
h.sendToClients(clients, message)
}
// 分布式广播
if h.bus != nil && !message.FromRemote {
h.bus.Publish(message)
}
}
}
}
设计亮点:
-
Channel 代替锁竞争: 使用 channel 进行事件通知,而非频繁加锁
-
Snapshot 模式: 发送消息前先对连接列表做快照,避免锁竞争
-
分布式支持: 通过 bus 接口可接入 Redis 等消息中间件实现多实例同步
3.3 消息广播实现
两种广播范围: (hub.go:184-204)
// 1. 对话内广播
func (h *Hub) BroadcastMessage(conversationID uint, messageType string, data interface{}) {
h.broadcast <- &Message{
ConversationID: conversationID,
Type: messageType,
Data: data,
Scope: "conversation",
}
}
// 2. 所有客服广播
func (h *Hub) BroadcastToAllAgents(messageType string, data interface{}) {
h.broadcast <- &Message{
Type: messageType,
Data: data,
Scope: "all_agents",
}
}
消息发送逻辑: (hub.go:248-272)
func (h *Hub) sendToClients(clients []*Client, message *Message) {
for _, client := range clients {
select {
case client.send <- message:
// 发送成功
default:
// 发送失败,清理客户端
h.mu.Lock()
if cc, ok := h.conversations[client.conversationID]; ok {
delete(cc, client)
if len(cc) == 0 {
delete(h.conversations, client.conversationID)
}
}
h.mu.Unlock()
safeClose(client.send)
}
}
}
关键点说明:
-
使用
select-default模式非阻塞发送 -
发送失败的客户端自动清理
-
避免因单个慢客户端阻塞整个广播
四、分布式支持
4.1 Redis 事件总线
事件订阅与发布: (redis_bus.go)
func (bus *RedisBus) Subscribe(handler func(*Message)) {
pubsub := bus.rdb.Subscribe(bus.ctx, bus.channel)
ch := pubsub.Channel()
go func() {
for msg := range ch {
var m Message
if err := json.Unmarshal([]byte(msg.Payload), &m); err == nil {
m.FromRemote = true
handler(&m)
}
}
}()
}
func (bus *RedisBus) Publish(msg *Message) error {
data, err := json.Marshal(msg)
if err != nil {
return err
}
return bus.rdb.Publish(bus.ctx, bus.channel, data).Err()
}
4.2 初始化集成
创建支持分布式的 Hub: (hub.go:58-83)
func NewHub(onConnect OnClientConnectCallback,
onDisconnect OnClientDisconnectCallback,
bus DistributedBus) *Hub {
h := &Hub{
conversations: make(map[uint]map[*Client]bool),
register: make(chan *Client),
unregister: make(chan *Client),
broadcast: make(chan *Message, 256),
bus: bus,
}
if bus != nil {
bus.Subscribe(func(msg *Message) {
msg.FromRemote = true
select {
case h.broadcast <- msg:
default:
// 队列拥塞,丢弃消息
}
})
}
return h
}
五、性能优化策略
5.1 连接管理优化
-
读写分离: ReadPump 负责读,WritePump 负责写,职责单一
-
连接池设计: 按 conversation 分组管理,避免全量遍历
-
心跳机制: 通过 Ping/Pong 帧检测连接健康状态
5.2 消息处理优化
// broadcast 通道带缓冲
broadcast: make(chan *Message, 256)
// 快照方式发送,减少锁持有时间
func (h *Hub) snapshotConversationClients(conversationID uint) []*Client {
更多推荐

所有评论(0)