I'm trying to write a caching proxy for video streams in Go.
My question is: how do I distribute a streaming copy of large chunks of data between multiple connections?
Or: how do I store (cache) data and access it safely (and quickly) from multiple goroutines?
I tried several options, with mutexes and channels, but none of them worked well. Here are a few samples, each with its problem.
This is a simplified version:
...
var clients []*client

func new_client(conn net.Conn) {
    client := &client{
        conn: conn,
    }
    clients = append(clients, client) // needs a mutex if clients connect concurrently
}
...
func stream(source io.Reader) {
    buf := make([]byte, 32*1024)
    for {
        n, err := source.Read(buf)
        if err != nil {
            return
        }
        for _, client := range clients {
            // blocks here for all clients if one of them stops reading
            client.conn.Write(buf[:n])
        }
    }
}
The problem with this version: when one client stops reading but doesn't close its connection, the call to Write() starts to block. Wrapping the Write() call in a goroutine (with mutex locks per client) didn't help; the delay is the same as with channels (next example), and besides, Go doesn't guarantee the order in which goroutines run.
I tried to fix it like this:
for _, client := range clients {
    client.conn.SetWriteDeadline(time.Now().Add(1 * time.Millisecond))
    if _, e := client.conn.Write(buf[:n]); e != nil {
        // slow client: e is a timeout error, and the write may be partial
    }
}
It helps with the blocking, but slow clients still can't read in time, and increasing the timeout brings the delays back.
I also tried something like this:
...
var clients []*client

func new_client(conn net.Conn) {
    client := &client{
        conn:     conn,
        buf_chan: make(chan []byte, 100),
    }
    clients = append(clients, client)
    go func() {
        for buf := range client.buf_chan {
            client.conn.Write(buf)
        }
    }()
}
...
func stream(source io.Reader) {
    buf := make([]byte, 32*1024)
    for {
        n, err := source.Read(buf)
        if err != nil {
            return
        }
        for _, client := range clients {
            // every client gets a slice of the same backing array,
            // which the next Read() overwrites
            client.buf_chan <- buf[:n]
        }
    }
}
But in this version there's a delay between the send to the channel and the receive on the other end, so the video stream in the player starts to lag and stutter.
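A variant I'm considering (my own sketch, the `broadcast` name is made up): copy each chunk before fanning out, and use a non-blocking select so a slow client drops chunks instead of delaying everyone. I'm not sure dropping data is acceptable for a video stream, though:

```go
package main

import "fmt"

// broadcast copies the chunk once (so the source loop can safely reuse
// its read buffer) and tries a non-blocking send to every client channel.
// Clients whose buffers are full miss this chunk instead of blocking the
// source loop. Returns how many clients received it.
func broadcast(chunk []byte, chans []chan []byte) int {
	c := make([]byte, len(chunk))
	copy(c, chunk)
	sent := 0
	for _, ch := range chans {
		select {
		case ch <- c:
			sent++
		default:
			// channel full: slow client, drop this chunk for it
		}
	}
	return sent
}

func main() {
	fast := make(chan []byte, 4)
	slow := make(chan []byte, 1)
	slow <- []byte("old") // simulate a client that has fallen behind

	n := broadcast([]byte("chunk"), []chan []byte{fast, slow})
	fmt.Println(n, string(<-fast)) // 1 chunk
}
```

The copy also fixes the shared-buffer problem from the example above, at the cost of one allocation per chunk.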
Can anyone recommend Go packages, or design patterns, for this kind of task?
Thanks for any help!