一、概述

本文档详细解析 AI客服系统中的 WebSocket 实时通信架构设计与代码实现,基于 /backend/websocket/ 目录下的核心源码展开分析。

二、核心架构设计

2.1 系统组件关系

┌─────────────────┐
│  前端 (访客/客服) │
└────────┬────────┘
         │ WebSocket 连接
         │
┌────────▼────────┐
│  HandleWebSocket │ HTTP 升级处理
└────────┬────────┘
         │
┌────────▼────────┐
│      Hub        │ 连接管理核心
└────────┬────────┘
         │
   ┌─────┴─────┐
   │           │
┌──▼──┐   ┌────▼──┐
│Client│   │ Redis │ 分布式事件总线
└─────┘   └───────┘

2.2 核心数据结构

Hub 结构体: (hub.go:24-45)

type Hub struct {
    // 每个对话ID对应的客户端连接列表
    conversations map[uint]map[*Client]bool
    
    // 注册/注销通道
    register   chan *Client
    unregister chan *Client
    
    // 消息广播通道
    broadcast chan *Message
    
    // 互斥锁
    mu sync.RWMutex
    
    // 回调函数
    onConnect    OnClientConnectCallback
    onDisconnect OnClientDisconnectCallback
    
    // 分布式事件总线
    bus DistributedBus
}

Message 结构体: (hub.go:49-56)

type Message struct {
    ConversationID uint            `json:"conversation_id"`
    Data           interface{}     `json:"data"`
    Type           string          `json:"type"`
    Scope          string          `json:"scope,omitempty"` // conversation | all_agents
    FromRemote     bool            `json:"-"`
}

三、核心实现详解

3.1 WebSocket 连接处理

连接升级与初始化: (handler.go:23-95)

func HandleWebSocket(hub *Hub, userRepo *repository.UserRepository) gin.HandlerFunc {
    return func(c *gin.Context) {
        // 1. 解析参数
        conversationIDStr := c.Query("conversation_id")
        isVisitorStr := c.DefaultQuery("is_visitor", "true")
        isVisitor := isVisitorStr == "true" || isVisitorStr == "1"
        
        // 2. 客服身份验证
        var agentID uint
        if !isVisitor {
            agentIDStr := c.Query("agent_id")
            agentID = ... // 解析并验证
            wsToken := c.Query("ws_token")
            if !utils.ValidateWSToken(wsToken, agentID) {
                c.JSON(http.StatusUnauthorized, ...)
                return
            }
        }
        
        // 3. HTTP 升级为 WebSocket
        conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
        
        // 4. 创建 Client 并注册到 Hub
        client := NewClient(hub, conn, uint(conversationID), isVisitor, agentID)
        hub.register <- client
        
        // 5. 启动读写协程
        go client.WritePump()
        go client.ReadPump()
    }
}

关键点说明:

  • 使用 gorilla/websocket 库实现协议升级

  • 访客连接无特殊权限,客服连接需要 token 验证

  • 每个连接独立运行两个协程处理读写

3.2 Hub 消息循环

主事件循环: (hub.go:85-182)

func (h *Hub) Run() {
    for {
        select {
        // 新客户端连接
        case client := <-h.register:
            h.mu.Lock()
            if h.conversations[client.conversationID] == nil {
                h.conversations[client.conversationID] = make(map[*Client]bool)
            }
            h.conversations[client.conversationID][client] = true
            h.mu.Unlock()
            
            if h.onConnect != nil {
                visitorCount := 0 // 统计访客连接数
                h.onConnect(client.conversationID, client.isVisitor, visitorCount, client.agentID)
            }
            
        // 客户端断开
        case client := <-h.unregister:
            h.mu.Lock()
            if clients, ok := h.conversations[client.conversationID]; ok {
                if _, ok := clients[client]; ok {
                    delete(clients, client)
                    close(client.send)
                    if len(clients) == 0 {
                        delete(h.conversations, client.conversationID)
                    }
                }
            }
            h.mu.Unlock()
            
        // 广播消息
        case message := <-h.broadcast:
            if message.Scope == "all_agents" {
                clients := h.snapshotAllAgents()
                h.sendToClients(clients, message)
            } else {
                clients := h.snapshotConversationClients(message.ConversationID)
                h.sendToClients(clients, message)
            }
            
            // 分布式广播
            if h.bus != nil && !message.FromRemote {
                h.bus.Publish(message)
            }
        }
    }
}

设计亮点:

  1. Channel 代替锁竞争: 使用 channel 进行事件通知,而非频繁加锁

  2. Snapshot 模式: 发送消息前先对连接列表做快照,避免锁竞争

  3. 分布式支持: 通过 bus 接口可接入 Redis 等消息中间件实现多实例同步

3.3 消息广播实现

两种广播范围: (hub.go:184-204)

// 1. 对话内广播
func (h *Hub) BroadcastMessage(conversationID uint, messageType string, data interface{}) {
    h.broadcast <- &Message{
        ConversationID: conversationID,
        Type:           messageType,
        Data:           data,
        Scope:          "conversation",
    }
}

// 2. 所有客服广播
func (h *Hub) BroadcastToAllAgents(messageType string, data interface{}) {
    h.broadcast <- &Message{
        Type:    messageType,
        Data:    data,
        Scope:   "all_agents",
    }
}

消息发送逻辑: (hub.go:248-272)

func (h *Hub) sendToClients(clients []*Client, message *Message) {
    for _, client := range clients {
        select {
        case client.send <- message:
            // 发送成功
        default:
            // 发送失败,清理客户端
            h.mu.Lock()
            if cc, ok := h.conversations[client.conversationID]; ok {
                delete(cc, client)
                if len(cc) == 0 {
                    delete(h.conversations, client.conversationID)
                }
            }
            h.mu.Unlock()
            safeClose(client.send)
        }
    }
}

关键点说明:

  • 使用 select-default 模式非阻塞发送

  • 发送失败的客户端自动清理

  • 避免因单个慢客户端阻塞整个广播

四、分布式支持

4.1 Redis 事件总线

事件订阅与发布: (redis_bus.go)

func (bus *RedisBus) Subscribe(handler func(*Message)) {
    pubsub := bus.rdb.Subscribe(bus.ctx, bus.channel)
    ch := pubsub.Channel()
    
    go func() {
        for msg := range ch {
            var m Message
            if err := json.Unmarshal([]byte(msg.Payload), &m); err == nil {
                m.FromRemote = true
                handler(&m)
            }
        }
    }()
}

func (bus *RedisBus) Publish(msg *Message) error {
    data, err := json.Marshal(msg)
    if err != nil {
        return err
    }
    return bus.rdb.Publish(bus.ctx, bus.channel, data).Err()
}

4.2 初始化集成

创建支持分布式的 Hub: (hub.go:58-83)

func NewHub(onConnect OnClientConnectCallback, 
            onDisconnect OnClientDisconnectCallback, 
            bus DistributedBus) *Hub {
    h := &Hub{
        conversations: make(map[uint]map[*Client]bool),
        register:      make(chan *Client),
        unregister:    make(chan *Client),
        broadcast:     make(chan *Message, 256),
        bus:           bus,
    }
    
    if bus != nil {
        bus.Subscribe(func(msg *Message) {
            msg.FromRemote = true
            select {
            case h.broadcast <- msg:
            default:
                // 队列拥塞,丢弃消息
            }
        })
    }
    
    return h
}

五、性能优化策略

5.1 连接管理优化

  1. 读写分离: ReadPump 负责读,WritePump 负责写,职责单一

  2. 连接池设计: 按 conversation 分组管理,避免全量遍历

  3. 心跳机制: 通过 Ping/Pong 帧检测连接健康状态

5.2 消息处理优化

// broadcast 通道带缓冲
broadcast: make(chan *Message, 256)

// 快照方式发送,减少锁持有时间
func (h *Hub) snapshotConversationClients(conversationID uint) []*Client {

Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐