dongqi0644 2018-08-08 21:45
Race condition in client synchronization

I have a web app whose server creates a Client for each websocket connection. A Client acts as an intermediary between the websocket connection and a single instance of a Hub. The Hub maintains a set of registered clients and broadcasts messages to them. This works pretty well, but a client can miss events that occur between the moment the server generates the initial state bundle (which the client receives on connection) and the moment the client is registered with the hub and starts receiving broadcast events.

My idea is to register the client with the hub before any information is fetched from the db. That would ensure that the client doesn't miss any broadcasts, though it could now receive messages that are already applied to the initial state it receives. To allow the client to disregard these messages, I could include a monotonic timestamp in both the initial state bundle and the broadcast events.
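To make the idea concrete, here is a minimal sketch of the client-side check. It assumes the server tags every broadcast with a strictly increasing sequence number and puts the number of the last applied event into the initial bundle. The names Event, State, Seq and Apply are made up for illustration and are not part of my actual code.

```go
package main

import "fmt"

// Event is a broadcast message tagged with a hypothetical sequence number
// that the server assigns in strictly increasing order.
type Event struct {
	Seq  uint64
	Data string
}

// State is the client's view of the data. Seq is the sequence number of the
// last event already reflected in it; the initial bundle carries this value.
type State struct {
	Seq   uint64
	Items []string
}

// Apply folds ev into s unless the state already includes it.
// It reports whether the event was applied.
func (s *State) Apply(ev Event) bool {
	if ev.Seq <= s.Seq {
		return false // already part of the initial bundle, or a duplicate
	}
	s.Items = append(s.Items, ev.Data)
	s.Seq = ev.Seq
	return true
}

func main() {
	// The initial bundle was built after event 2 had been applied.
	s := State{Seq: 2, Items: []string{"a", "b"}}

	// Events 2 and 3 both arrive because the client registered early.
	for _, ev := range []Event{{Seq: 2, Data: "b"}, {Seq: 3, Data: "c"}} {
		fmt.Println(ev.Seq, s.Apply(ev))
	}
	fmt.Println(s.Items)
}
```

A counter is probably easier to reason about than a wall-clock timestamp here, since two events can never share a value.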

Can you think of a more elegant/simpler solution?

1 answer

  • dqx24298 2018-08-09 06:32

    I have used a write-ahead log in the past to do something like this. In short, keep a ring buffer of messages in the hub. Then replay the messages that were sent to existing clients while the new one was being initialized.
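As a rough sketch of what "ring buffer of messages" can mean, here is a fixed-capacity buffer that overwrites the oldest entry once it is full. Ring, NewRing, Push and Snapshot are hypothetical names, the capacity is assumed to be greater than zero, and there is no locking, so a hub would have to guard it with its own mutex.

```go
package main

import "fmt"

// Ring keeps the most recent messages, overwriting the oldest when full.
type Ring struct {
	buf   [][]byte
	start int // index of the oldest stored message
	n     int // number of stored messages
}

// NewRing returns a ring that retains up to capacity messages.
// capacity must be greater than zero.
func NewRing(capacity int) *Ring {
	return &Ring{buf: make([][]byte, capacity)}
}

// Push appends msg, dropping the oldest message if the ring is full.
func (r *Ring) Push(msg []byte) {
	if r.n < len(r.buf) {
		r.buf[(r.start+r.n)%len(r.buf)] = msg
		r.n++
		return
	}
	r.buf[r.start] = msg
	r.start = (r.start + 1) % len(r.buf)
}

// Snapshot returns the stored messages, oldest first.
func (r *Ring) Snapshot() [][]byte {
	out := make([][]byte, 0, r.n)
	for i := 0; i < r.n; i++ {
		out = append(out, r.buf[(r.start+i)%len(r.buf)])
	}
	return out
}

func main() {
	r := NewRing(3)
	for _, m := range []string{"a", "b", "c", "d"} {
		r.Push([]byte(m))
	}
	// "a" has been overwritten; the three newest messages remain.
	for _, m := range r.Snapshot() {
		fmt.Println(string(m))
	}
}
```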

    You can expose this concept to the clients too if you wish. That way you can implement efficient reconnects (particularly nice for mobile connections). When clients lose the websocket connection they can reconnect and say "Hey there, it's me again. Looks like we got interrupted. The last message I've seen was number 42. What's new?"
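The "what's new since number 42?" exchange could look roughly like the sketch below. It assumes messages are numbered from 1 in the order they are appended and that the server keeps only the newest few. Log, NewLog, Append and Since are illustrative names, and the code is not safe for concurrent use as written.

```go
package main

import "fmt"

// Log retains recent messages. The message at index i has sequence
// number first+i.
type Log struct {
	first uint64 // sequence number of msgs[0]
	msgs  []string
	limit int // maximum number of retained messages
}

// NewLog returns a log that retains up to limit messages.
func NewLog(limit int) *Log { return &Log{first: 1, limit: limit} }

// Append stores msg, trims the oldest entries beyond the limit, and
// returns the sequence number assigned to msg.
func (l *Log) Append(msg string) uint64 {
	l.msgs = append(l.msgs, msg)
	if len(l.msgs) > l.limit {
		drop := len(l.msgs) - l.limit
		l.msgs = l.msgs[drop:]
		l.first += uint64(drop)
	}
	return l.first + uint64(len(l.msgs)) - 1
}

// Since returns the messages after lastSeen. ok is false when lastSeen is
// newer than anything in the log or when some of the missed messages have
// already been trimmed; the client then needs a full state bundle instead.
func (l *Log) Since(lastSeen uint64) (missed []string, ok bool) {
	latest := l.first + uint64(len(l.msgs)) - 1
	if lastSeen > latest || lastSeen+1 < l.first {
		return nil, false
	}
	return l.msgs[lastSeen+1-l.first:], true
}

func main() {
	l := NewLog(3)
	for _, m := range []string{"m1", "m2", "m3", "m4", "m5"} {
		l.Append(m)
	}
	fmt.Println(l.Since(4)) // client saw up to 4, so only m5 is missing
	fmt.Println(l.Since(1)) // m2 was trimmed, so a replay is not possible
}
```

When Since reports false, the server would fall back to the normal initial state bundle.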

    The following is from memory, so take it only as an illustration of the idea, not a finished implementation. In the interest of brevity I've omitted the select statements around client.send, for instance.

    package main
    
    import (
        "container/list"
        "sync"
    
        "github.com/gorilla/websocket"
    )
    
    type Client struct { // all unchanged
        hub  *Hub
        conn *websocket.Conn
        send chan []byte
    }
    
    type Hub struct {
        mu      sync.RWMutex     // value, not pointer: the zero value is ready to use
        wal     list.List        // List of recent messages
        clients map[*Client]bool // Registered clients.
    
        register chan Registration // not a chan *Client anymore
    
        broadcast  chan []byte
        unregister chan *Client
    }
    
    type Registration struct {
        client *Client
    
        // init is a function that is executed before the client starts to receive
        // broadcast messages. All messages that are broadcast while init is
        // running will be sent after init returns.
        init func()
    }
    
    func (h *Hub) run() {
        for {
            select {
            case reg := <-h.register:
                // Take note of the most recent message as of right now. 
                // initClient will replay all later messages
                h.mu.RLock()
                head := h.wal.Back()
                h.mu.RUnlock()
    
                go h.initClient(reg, head)
            case client := <-h.unregister:
                h.mu.Lock()
                if _, ok := h.clients[client]; ok {
                    delete(h.clients, client)
                    close(client.send)
                }
                h.mu.Unlock()
            case message := <-h.broadcast:
                h.mu.Lock()
                h.wal.PushBack(message)
                // TODO: Trim list if too long by some metric (e.g. number of
                // messages, age, total message size, etc.)
    
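                // Snapshot the registered clients so that we can send
                // without holding the lock. (copy does not work on maps.)
                clients := make([]*Client, 0, len(h.clients))
                for client := range h.clients {
                    clients = append(clients, client)
                }
                h.mu.Unlock()
    
                for _, client := range clients {
                    // TODO: deal with backpressure
                    client.send <- message
                }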
            }
        }
    }
    
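    func (h *Hub) initClient(reg Registration, head *list.Element) {
        reg.init()
    
        // Send the messages in h.wal that come after head.
        for {
            // Take the write lock: the caught-up branch modifies h.clients.
            h.mu.Lock()
            if head == nil {
                // The log was empty at registration time; start at the front.
                head = h.wal.Front()
            } else {
                // Note: Next also returns nil if head has been trimmed from
                // the list, so trimming must leave in-flight replays alone.
                head = head.Next()
            }
            if head == nil {
                // Caught up. Registering under the same lock that broadcast
                // takes ensures that no message falls in between.
                h.clients[reg.client] = true
                h.mu.Unlock()
                return
            }
            h.mu.Unlock()
    
            // TODO: deal with backpressure
            reg.client.send <- head.Value.([]byte)
        }
    }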
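For the omitted select around client.send, one possible shape is a non-blocking send like the sketch below. trySend is a hypothetical helper, not part of the code above, and what to do with a client whose buffer is full (drop the message, unregister the client, and so on) is left to the caller.

```go
package main

import "fmt"

// trySend attempts to queue msg without blocking. It reports false when the
// channel's buffer is full, so that the caller can decide what to do with a
// slow client (for example, unregister it).
func trySend(send chan<- []byte, msg []byte) bool {
	select {
	case send <- msg:
		return true
	default:
		return false
	}
}

func main() {
	send := make(chan []byte, 1)
	fmt.Println(trySend(send, []byte("first")))  // the buffer has room
	fmt.Println(trySend(send, []byte("second"))) // the buffer is full and nobody is reading
}
```

With this in place, a single stalled client no longer blocks the hub's loop for everyone else.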
    