I'm trying to write code to stream data on a topic, like a radio station (one broadcaster, several listeners). I'm stuck on how to process a new WebSocket connection request without having a goroutine for each open WebSocket (that starts to get resource-intensive for many "listeners" to the same "station").
At the moment, I have a map of dataStream structs which look like this:
type dataStream struct {
    data  chan []byte
    conns []*websocket.Conn
}
And here's the pseudocode for upgrading the request to a WebSocket and then adding the WebSocket connection to the dataStream's conns slice:
func process_request(w http.ResponseWriter, r *http.Request) {
    // hundred lines of business logic...
    c := upgrade websocket connection
    defer c.Close()
    if dataStream exists {
        append the new connection c to the dataStream.conns slice
    } else {
        create new dataStream
        append the new connection c to the dataStream.conns slice
        stream(dataStream)
    }
}
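To make the exists/else branch concrete, here is a minimal sketch of the lookup-or-create step. The string topic key, the registry type with its mutex, and the conn placeholder (standing in for the real WebSocket connection type) are simplifications for illustration, not my actual code.

```go
package main

import "sync"

// conn is a hypothetical placeholder for the real WebSocket connection type.
type conn struct{ id int }

type dataStream struct {
	data  chan []byte
	conns []*conn
}

// registry holds one dataStream per topic. The string key and the mutex
// are assumptions; handlers run concurrently, so the map needs a guard.
type registry struct {
	mu      sync.Mutex
	streams map[string]*dataStream
}

func newRegistry() *registry {
	return &registry{streams: make(map[string]*dataStream)}
}

// join appends c to the stream for topic, creating the stream first if it
// does not exist yet. The bool reports whether a new stream was created,
// i.e. whether the caller is the one that should start stream().
func (r *registry) join(topic string, c *conn) (*dataStream, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	ds, ok := r.streams[topic]
	if !ok {
		ds = &dataStream{data: make(chan []byte)}
		r.streams[topic] = ds
	}
	ds.conns = append(ds.conns, c)
	return ds, !ok
}
```

With this, the first listener on a topic gets created == true and every later listener gets the existing dataStream back, which matches the two branches above.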
And then here's the stream function mentioned in the above code block. One of these runs in the background for each dataStream (not for every WebSocket connection).
func stream(ds *dataStream) {
    ticker := time.NewTicker(poll every ~10 seconds)
    go func() { // this is to poll and remove closed connections
        for range ticker.C {
            for traverse ds.conns {
                ping all connections, remove any closed ones and free memory
                if len(ds.conns) == 0 { // no more connections are listening to this dataStream
                    delete the ds dataStream and free the memory
                    stop ticker
                    return // kill goroutine and free the memory
                }
            }
        }
    }()
    for len(ds.conns) != 0 { // while there are open connections
        fetch any available <-ds.data from channel
        write the data as websocket message to each connection
    }
}
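The write loop at the bottom of stream() amounts to roughly the following sketch. The listener interface and memListener are hypothetical stand-ins for the WebSocket connection, and the sketch ignores the locking that would be needed between the handler appending to conns and this loop.

```go
package main

import "errors"

// listener is a hypothetical stand-in for the write side of a WebSocket
// connection.
type listener interface {
	Write(msg []byte) error
}

var errClosed = errors.New("listener closed")

// memListener records what it receives; writes fail once it is closed.
type memListener struct {
	closed bool
	got    [][]byte
}

func (m *memListener) Write(msg []byte) error {
	if m.closed {
		return errClosed
	}
	m.got = append(m.got, msg)
	return nil
}

// broadcast writes msg to every listener and returns only those whose
// write succeeded. It reuses the backing array of conns.
func broadcast(conns []listener, msg []byte) []listener {
	alive := conns[:0]
	for _, c := range conns {
		if err := c.Write(msg); err != nil {
			continue // treat a failed write as a closed connection
		}
		alive = append(alive, c)
	}
	return alive
}

// fanOut forwards messages from data until the channel is closed or no
// listeners remain, and returns the listeners still alive.
func fanOut(data <-chan []byte, conns []listener) []listener {
	for msg := range data {
		conns = broadcast(conns, msg)
		if len(conns) == 0 {
			break
		}
	}
	return conns
}
```

Only one goroutine runs fanOut per dataStream, so the number of goroutines tracks the number of stations, not the number of listeners.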
The trouble with this approach is that in the process_request function, for the second and subsequent connections, control reaches the end of the if statement right after the new connection is appended to the dataStream.conns slice, so the function returns and the deferred c.Close() closes the WebSocket connection! As a result, stream(), which is running in the background, finds on its next poll that a closed connection has been added to the ds.conns slice and removes it.
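Stripped of everything WebSocket-specific, the behaviour I'm seeing reduces to this sketch, where fakeConn is a hypothetical stand-in for the real connection type and the package-level conns slice plays the role of dataStream.conns:

```go
package main

// fakeConn is a hypothetical stand-in for a WebSocket connection.
type fakeConn struct {
	closed bool
}

// Close marks the connection as closed.
func (c *fakeConn) Close() error {
	c.closed = true
	return nil
}

// conns plays the role of dataStream.conns.
var conns []*fakeConn

// handler mimics process_request for the second and later listeners:
// it stores the connection and then returns straight away.
func handler() *fakeConn {
	c := &fakeConn{}
	defer c.Close() // runs as soon as handler returns
	conns = append(conns, c)
	return c
}
```

After handler() returns, the connection is still in conns but is already closed, which is exactly what the poller then cleans up.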
Hence my question:
What approach should I take to keep the WebSocket connection open even after the process_request handler function returns, preferably without running a separate goroutine for each connection?