I have used a write-ahead log in the past to do something like this. In short, keep a ring buffer of messages in the hub. Then replay the messages that were sent to existing clients while the new one was being initialized.
You can expose this concept to the clients too if you wish. That way you can implement efficient reconnects (particularly nice for mobile connections). When clients lose the websocket connection they can reconnect and say "Hey there, it's me again. Looks like we got interrupted. The last message I've seen was number 42. What's new?"
The following is from memory, so take this only as an illustration of the idea, not a finished implementation. In the interest of brevity I've omitted the select statements around client.send, for instance.
package main
import (
"container/list"
"sync"
"github.com/gorilla/websocket"
)
// Client ties one websocket connection to the hub it is attached to and to
// the channel on which the hub delivers messages for that connection.
type Client struct { // all unchanged
	// hub is the Hub this client belongs to.
	hub *Hub
	// conn is the underlying websocket connection.
	conn *websocket.Conn
	// send receives the messages destined for this client: replayed log
	// entries during initialization and broadcasts afterwards. The hub closes
	// it when the client is unregistered.
	send chan []byte
}
// Hub fans broadcast messages out to registered clients and keeps a log of
// recent messages so that clients still initializing can be caught up.
type Hub struct {
	// mu guards wal and clients. It is a pointer, so it must be set to a
	// non-nil RWMutex before run or initClient is called.
	mu        *sync.RWMutex
	wal       list.List        // List of recent messages
	clients   map[*Client]bool // Registered clients.
	register  chan Registration // not a chan *Client anymore
	// broadcast carries messages to be appended to wal and sent to every
	// registered client.
	broadcast chan []byte
	// unregister carries clients that should be removed from clients and have
	// their send channel closed.
	unregister chan *Client
}
// Registration is the request sent on Hub.register to attach a new client.
type Registration struct {
	// client is the client to add to the hub once it has caught up.
	client *Client
	// init is a function that is executed before the client starts to receive
	// broadcast messages. All messages that are broadcast while init is
	// running will be sent after init returns.
	init func()
}
// run is the hub's event loop. It never returns and is meant to be started in
// its own goroutine. It handles three kinds of events:
//
//   - register: remembers the newest log entry at the moment of registration
//     and starts initClient in a new goroutine, which replays every entry
//     logged after that point before adding the client to h.clients.
//   - unregister: removes the client from h.clients and closes its send
//     channel, if it was registered.
//   - broadcast: appends the message to the log and sends it to every client
//     registered at that moment.
func (h *Hub) run() {
	for {
		select {
		case reg := <-h.register:
			// Take note of the most recent message as of right now.
			// initClient will replay all later messages. head is nil when
			// the log is still empty.
			h.mu.RLock()
			head := h.wal.Back()
			h.mu.RUnlock()
			go h.initClient(reg, head)

		case client := <-h.unregister:
			h.mu.Lock()
			if _, ok := h.clients[client]; ok {
				delete(h.clients, client)
				close(client.send)
			}
			h.mu.Unlock()

		case message := <-h.broadcast:
			h.mu.Lock()
			h.wal.PushBack(message)
			// TODO: Trim list if too long by some metric (e.g. number of
			// messages, age, total message size, etc.)

			// Snapshot the registered clients while holding the lock so the
			// (possibly blocking) sends below happen without it. The builtin
			// copy does not accept a map, so the keys are collected by hand.
			clients := make([]*Client, 0, len(h.clients))
			for client := range h.clients {
				clients = append(clients, client)
			}
			h.mu.Unlock()

			for _, client := range clients {
				// TODO: deal with backpressure
				client.send <- message
			}
		}
	}
}
// initClient runs the registration's init function and then brings the new
// client up to date before attaching it to the hub.
//
// head is the newest log entry at the time the registration was received, or
// nil if the log was empty then. Every entry after head is sent to the client
// in order. Once no further entry exists, the client is added to h.clients
// under the same lock acquisition that observed the end of the log, so no
// broadcast can slip in between the last replayed entry and the registration.
func (h *Hub) initClient(reg Registration, head *list.Element) {
	// init is optional; calling a nil func would panic.
	if reg.init != nil {
		reg.init()
	}

	// send messages in h.wal after head
	for {
		// The write lock is required, not a read lock: the caught-up branch
		// mutates h.clients, and several initClient goroutines may be running
		// at the same time.
		h.mu.Lock()
		var next *list.Element
		if head == nil {
			// The log was empty at registration time, so everything that is
			// in it now was broadcast afterwards. Calling Next on a nil
			// element would panic.
			next = h.wal.Front()
		} else {
			next = head.Next()
		}
		if next == nil {
			// caught up
			h.clients[reg.client] = true
			h.mu.Unlock()
			return
		}
		h.mu.Unlock()

		// TODO: deal with backpressure
		reg.client.send <- next.Value.([]byte)
		head = next
	}
}