I have this code for test. After some time all gorutines locks on carbonWriteBuf.Flush() without any error. Just stop. The more threads and the shorter the interval the faster the stop. If I kill received app (nc -l 127.0.0.1 2002 -k -m 1000) the Golang resets the connect and work again. But if the gorutine has already stopped, then there is no restart and any messages.
package main
import (
"bufio"
"flag"
"fmt"
"net"
"time"
)
var (
SERVER string
INTERVAL int
THREADS int
)
func main() {
flag.StringVar(&SERVER, "s", "localhost:2002", "server address:port (Default: localhost:2002)")
flag.IntVar(&INTERVAL, "i", 500, "send interval (Default: 500")
flag.IntVar(&THREADS, "t", 100, "num of threads (Default: 100")
flag.Parse()
for i := 0; i < THREADS; i++ {
go sender(i)
}
for {
}
}
func sender(id int) {
var carbonErr error
var carbonWriteBuf *bufio.Writer
var carbonConn net.Conn
defer fmt.Println("End 1")
x := 0
i := 10
isUp := true
updater := time.NewTicker(time.Duration(INTERVAL*3) * time.Millisecond)
go func() {
defer fmt.Println("End 2")
for range updater.C {
if isUp {
i++
} else {
i--
}
if i == 20 {
isUp = false
}
if i == 0 {
isUp = true
}
fmt.Println(x)
}
}()
dataSender := time.NewTicker(time.Duration(INTERVAL) * time.Millisecond)
func() {
defer fmt.Println("End 3")
for range dataSender.C {
x++
if carbonConn != nil {
//carbonConn.SetWriteDeadline(time.Now().Add(time.Second * 3))
if err := carbonConn.SetDeadline(time.Now().Add(time.Second * 3)); err != nil {
fmt.Println("Cannot set deadline", err)
}
if _, writeErr := carbonWriteBuf.WriteString(fmt.Sprintf("test.%d.value %d %d
", id, i, time.Now().Unix())); writeErr == nil {
fmt.Println("Buffered:", carbonWriteBuf.Buffered())
if flushErr := carbonWriteBuf.Flush(); flushErr == nil {
fmt.Println("Send", time.Now().Unix())
} else {
fmt.Printf("Cannot write data to carbon %s Error: %s
", carbonConn.RemoteAddr(), flushErr)
carbonConn = nil
}
} else {
fmt.Println("Cannot write data to buffer:", writeErr)
carbonConn = nil
}
} else {
if carbonConn, carbonErr = net.Dial("tcp", SERVER); carbonErr != nil {
carbonConn = nil
fmt.Println("Cannot connect to carbon:", carbonErr)
} else {
defer carbonConn.Close()
carbonWriteBuf = bufio.NewWriterSize(carbonConn, 4096)
//carbonWriteBuf = bufio.NewWriter(carbonConn)
fmt.Println("Successfully connected to carbon:", carbonConn.RemoteAddr())
}
}
}
}()
}