I'm writing a tcp bridge that pipes data from a conn to another conn, the process is duplex, that means the another conn may need to write data to the first conn.
I open four go-routines to handle the task, each one just does one of the things: read from upstream, write to downstream, read from downstream, and write to upstream. The read go-routine is simple:
func readUpstream(conn *net.TCPConn) {
for {
buf := make([]byte, 10)
_, err := io.ReadFull(conn, buf)
if err != nil {
break
}
pipeToDownstream(buf)
}
}
In general, the function pipeToDownload
just need a channel to write to it, and the writeDownstream
function just read the channel, and then write it to the downstream conn, just like follow:
func pipeToDownstream(buf) {
downPipe <- buf
}
func writeDownstream(conn *net.Conn) {
for {
data := <- downPipe
_, err := conn.Write(data)
if err != nil {
break
}
}
}
But, in my case, the data from upstream is a video stream, I need to ensure the real-time pipe, that means if the downstream is writing, just the last frame read from the upstream should be reserved, and then the write downstream routine read from the reserved data and then write it. Because the video stream needs big bandwidth, but sometimes the client bandwidth is not enough to send data, and then the write downstream routine will be blocked, and then if use pipeToDownstream
function directly will block the readUpstream
go routine.
So, I added a flag to represent the writeDownstream
is writing, the read data from readUpstream
should be reserved if the flag is 1
, else should pipe to the channel directly. This means I need to add a mutex to ensure thread-safe for the flag
and the reserved
field.
How should I do?