Giving some context, I am implementing a fan-out (streaming) using gRPC, on the server every time a client connects, I save the stream into a map and when the client disconnects I just remove it from the map so that I could keep the map updated with only "live" clients/connections.
So the initial map looks something like this:
key value
client1 stream
client2 stream
client3 stream
The problem with this implementation is that if a client connects multiple times, only the last client (using the same key id
) will receive the "broadcasted" message, since the map will overwrite the existing stream with the latest one from the client connecting, therefore I need to either use a unique key or to find a way to match, relate clients and connections.
I found easy to just switch the map and use as the key the connection ID, something like this
conn_id client_id
conn1 client1
conn2 client2 \
conn3 client2 > 3 connections for client ID 2
conn4 client2 /
This works and helps to send a message to all connected clients, but makes things more complex if I would like to send a message to a specific client since I would need to range/traverse the full map to match the value with the desired client.
In an attempt to fix this, I switched back the map using the client_id, but instead of using the stream as the value, I am using a list of slices
key value
client1 [stream, stream, stream]
client2 [stream]
client3 [stream, stream]
The problem persists since in order to send a message to all the clients the full map still needs to be traverse/range to get streams per client and send the message to each.
Therefore without using a database wondering what data structure could it be used following best practices regarding the big O notation?
Mainly would like to find something that behaves similarly to database foreign keys, probably having 2 maps one keeping all connections and other just keeping status or number of connections per client, for example:
key value client_id # streams
conn1 client1 client1 1
conn2 client2 client2 3
conn3 client2
conn4 client2
But how to "atomically" remove a client from the clients
map when there are no more connections from that client in the connections
map.
This is a try but wondering if could be improved:
https://play.golang.org/p/VEoLsWh6dbJ
package main
import (
"fmt"
"sync"
)
func main() {
clients := &sync.Map{}
for i := 0; i < 100; i++ {
clientID := fmt.Sprintf("client-%d", i%5)
connID := fmt.Sprintf("conn-%d", i)
client, ok := clients.LoadOrStore(clientID, connID)
if ok {
switch c := client.(type) {
case string:
client = []string{c, connID}
case []string:
client = append(client.([]string), connID)
}
clients.Store(clientID, client)
}
}
list := func(k, v interface{}) bool {
fmt.Printf("k: %+v v: %v
", k, v)
return true
}
clients.Range(list)
// Get all connections
broadcast := func(k, v interface{}) bool {
fmt.Printf("To all connections from= %q
", k)
for _, v := range v.([]string) {
fmt.Printf("msg to conn = %s
", v)
}
return true
}
clients.Range(broadcast)
}
Here my try only using maps (easy to handle deletions) is slower but allocate fewer bytes:
https://play.golang.org/p/hz5uRX-VAvH
package main
import (
"fmt"
"sync"
)
func main() {
clients := &sync.Map{}
for i := 0; i < 100; i++ {
clientID := fmt.Sprintf("client-%d", i%5)
connID := fmt.Sprintf("conn-%d", i)
conns, ok := clients.Load(clientID)
if ok {
conns.(*sync.Map).Store(connID, i)
} else {
conns := &sync.Map{}
conns.Store(connID, i)
clients.Store(clientID, conns)
}
}
list := func(k, v interface{}) bool {
fmt.Printf("k: %v
", k)
listValue := func(j, l interface{}) bool {
fmt.Printf(" j = %+v
", j)
return true
}
v.(*sync.Map).Range(listValue)
return true
}
clients.Range(list)
// Get all connections
broadcast := func(k, v interface{}) bool {
fmt.Printf("To all connections from= %q
", k)
listValue := func(j, l interface{}) bool {
fmt.Printf(" msg to conn = %s -- %v
", j, l)
return true
}
v.(*sync.Map).Range(listValue)
return true
}
clients.Range(broadcast)
}