I am working with go and redis to dispatch a queue (channel) of messages to subscribers. I am aiming to create an auto scaling solution that will spawn new go routines (within a certain limit) as the queue gets larger. I have the below code:
// Set up max queued messages
var maxMessages = float64(100000)
// Set up max redis senders
var maxSender = float64(5)
// Set up message channel
var messages = make(chan Message, int(maxMessages))
// Set up messages per sender count
var senderRatio = maxSender / maxMessages
type Message struct {
ChatId int `json:"chatId"`
UserId int `json:"userId"`
Message string `json:"message"`
Date int `json:"date"`
}
func RedisWriteHandler(messageChannel chan Message) {
senderCount := 0
killswitch := make(chan string)
for {
length := float64(len(messageChannel))
neededSenders := int(math.Ceil(length * senderRatio))
if senderCount < neededSenders || senderCount < 1 {
log.Printf("Increasing sender count to %d, need %d", senderCount+1, neededSenders)
go addRedisSender(messageChannel, killswitch)
senderCount++
} else if senderCount > neededSenders && senderCount > 1 {
log.Printf("Decreasing sender count to %d, need %d", senderCount-1, neededSenders)
killMessage := fmt.Sprintf("only need %d senders", neededSenders)
killswitch <- killMessage
senderCount--
}
}
log.Fatal("The redis handler unexpectedly went away")
}
func addRedisSender(messageChannel chan Message, killswitch chan string) {
c, err := redis.Dial("tcp", "localhost:6379")
if err != nil {
log.Println(err)
return
}
defer c.Close()
for {
select {
case msg := <-messageChannel:
redisChannel := strconv.Itoa(msg.ChatId)
messageBlob, err := json.Marshal(msg)
if err != nil {
log.Println(err)
}
_, err = c.Do("PUBLISH", redisChannel, messageBlob)
if err != nil {
log.Println(err)
}
case kill := <-killswitch:
log.Printf("Sender killed: %s", kill)
return
}
}
log.Println("Closing redis sender")
}
If I run this code using a channel with a large buffer size (say 100,000 messages) it adds 5 senders one at a time and starts working down the queue - so far so good. However at a seemingly random point - around 1500 messages in it hangs. No more logs at all (and I think I've got all the exit points covered). My expected output would be to have the senders increase to the maxSender value and decrease periodically throughout execution. An e.g of the logs I get with 100k messages is below
2018/05/02 08:21:25 Increasing sender count to 1, need 5 2018/05/02 08:21:25 Increasing sender count to 2, need 5 2018/05/02 08:21:25 Increasing sender count to 3, need 5 2018/05/02 08:21:25 Increasing sender count to 4, need 5 2018/05/02 08:21:25 Increasing sender count to 5, need 5
Then nothing.
I can see from other testing I'm running that this isn't just going slowly and that the messages just aren't being taken from the channel. Can anyone shed any light on this?
Thanks,
Sam
Edit
I have been lurking around some other questions regarding go hanging for no apparent reason and the suggested action was to kill the parent process to get the stack trace of each child process. The below is the output.
kill -ABRT 9162
SIGABRT: abort
PC=0x65f199 m=0 sigcode=0
goroutine 6 [running]:
main.RedisWriteHandler(0xc42005a180)
/home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:116 +0x99 fp=0xc42004dfd8 sp=0xc42004dee8 pc=0x65f199
runtime.goexit()
/usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42004dfe0 sp=0xc42004dfd8 pc=0x458f01
created by main.main
/home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:236 +0x130
goroutine 1 [IO wait]:
internal/poll.runtime_pollWait(0x7f03114c5f70, 0x72, 0xffffffffffffffff)
/usr/local/go/src/runtime/netpoll.go:173 +0x57
internal/poll.(*pollDesc).wait(0xc422722098, 0x72, 0xc420038b00, 0x0, 0x0)
/usr/local/go/src/internal/poll/fd_poll_runtime.go:85 +0xae
internal/poll.(*pollDesc).waitRead(0xc422722098, 0xffffffffffffff00, 0x0, 0x0)
/usr/local/go/src/internal/poll/fd_poll_runtime.go:90 +0x3d
internal/poll.(*FD).Accept(0xc422722080, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0)
/usr/local/go/src/internal/poll/fd_unix.go:334 +0x1e2
net.(*netFD).accept(0xc422722080, 0x7f0311511000, 0x0, 0x7139b0)
/usr/local/go/src/net/fd_unix.go:238 +0x42
net.(*TCPListener).accept(0xc4200b2000, 0xc420038d80, 0x4122c8, 0x30)
/usr/local/go/src/net/tcpsock_posix.go:136 +0x2e
net.(*TCPListener).AcceptTCP(0xc4200b2000, 0xc422736120, 0xc422736120, 0x6a3b00)
/usr/local/go/src/net/tcpsock.go:234 +0x49
net/http.tcpKeepAliveListener.Accept(0xc4200b2000, 0xc42001c0d8, 0x6a3b00, 0x8813d0, 0x6f2aa0)
/usr/local/go/src/net/http/server.go:3120 +0x2f
net/http.(*Server).Serve(0xc42007a4e0, 0x8575c0, 0xc4200b2000, 0x0, 0x0)
/usr/local/go/src/net/http/server.go:2695 +0x1b2
net/http.(*Server).ListenAndServe(0xc42007a4e0, 0xc42007a4e0, 0xc420038f00)
/usr/local/go/src/net/http/server.go:2636 +0xa9
net/http.ListenAndServe(0x6ff124, 0x5, 0x0, 0x0, 0x0, 0x0)
/usr/local/go/src/net/http/server.go:2882 +0x7f
main.main()
/home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:239 +0x15b
goroutine 21 [runnable]:
internal/poll.runtime_pollWait(0x7f03114c5eb0, 0x72, 0x0)
/usr/local/go/src/runtime/netpoll.go:173 +0x57
internal/poll.(*pollDesc).wait(0xc422722198, 0x72, 0xffffffffffffff00, 0x854d00, 0x851550)
/usr/local/go/src/internal/poll/fd_poll_runtime.go:85 +0xae
internal/poll.(*pollDesc).waitRead(0xc422722198, 0xc4227ab000, 0x1000, 0x1000)
/usr/local/go/src/internal/poll/fd_poll_runtime.go:90 +0x3d
internal/poll.(*FD).Read(0xc422722180, 0xc4227ab000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
/usr/local/go/src/internal/poll/fd_unix.go:125 +0x18a
net.(*netFD).Read(0xc422722180, 0xc4227ab000, 0x1000, 0x1000, 0x4815d4, 0x47fd05, 0x1)
/usr/local/go/src/net/fd_unix.go:202 +0x52
net.(*conn).Read(0xc4227a6000, 0xc4227ab000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
/usr/local/go/src/net/net.go:176 +0x6d
bufio.(*Reader).fill(0xc4227ae000)
/usr/local/go/src/bufio/bufio.go:97 +0x11a
bufio.(*Reader).ReadSlice(0xc4227ae000, 0x7f03114c5e0a, 0x0, 0x459fd6, 0xc42276db60, 0x491d6d, 0xc422722180)
/usr/local/go/src/bufio/bufio.go:338 +0x2c
github.com/gomodule/redigo/redis.(*conn).readLine(0xc4227b0000, 0x0, 0x8000000000000000, 0xc422722180, 0x0, 0x0)
/home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:431 +0x38
github.com/gomodule/redigo/redis.(*conn).readReply(0xc4227b0000, 0x0, 0x0, 0x0, 0x0)
/home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:504 +0x40
github.com/gomodule/redigo/redis.(*conn).DoWithTimeout(0xc4227b0000, 0x0, 0x6ff70a, 0x7, 0xc422800d40, 0x2, 0x2, 0x68f280, 0x696f01, 0xc422800d60, ...)
/home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:665 +0x164
github.com/gomodule/redigo/redis.(*conn).Do(0xc4227b0000, 0x6ff70a, 0x7, 0xc422800d40, 0x2, 0x2, 0x0, 0xc4227ba9a0, 0x0, 0x0)
/home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:616 +0x73
main.addRedisSender(0xc42005a180, 0xc42001e240)
/home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:150 +0x569
created by main.RedisWriteHandler
/home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:119 +0x381
goroutine 22 [IO wait]:
internal/poll.runtime_pollWait(0x7f03114c5c70, 0x72, 0x0)
/usr/local/go/src/runtime/netpoll.go:173 +0x57
internal/poll.(*pollDesc).wait(0xc422758098, 0x72, 0xffffffffffffff00, 0x854d00, 0x851550)
/usr/local/go/src/internal/poll/fd_poll_runtime.go:85 +0xae
internal/poll.(*pollDesc).waitRead(0xc422758098, 0xc422774000, 0x1000, 0x1000)
/usr/local/go/src/internal/poll/fd_poll_runtime.go:90 +0x3d
internal/poll.(*FD).Read(0xc422758080, 0xc422774000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
/usr/local/go/src/internal/poll/fd_unix.go:125 +0x18a
net.(*netFD).Read(0xc422758080, 0xc422774000, 0x1000, 0x1000, 0x4815d4, 0x47fd05, 0x1)
/usr/local/go/src/net/fd_unix.go:202 +0x52
net.(*conn).Read(0xc42274c020, 0xc422774000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
/usr/local/go/src/net/net.go:176 +0x6d
bufio.(*Reader).fill(0xc422748120)
/usr/local/go/src/bufio/bufio.go:97 +0x11a
bufio.(*Reader).ReadSlice(0xc422748120, 0x7f03114c5c0a, 0x0, 0x459fd6, 0xc420049b60, 0x491d6d, 0xc422758080)
/usr/local/go/src/bufio/bufio.go:338 +0x2c
github.com/gomodule/redigo/redis.(*conn).readLine(0xc420084820, 0x0, 0x8000000000000000, 0xc422758080, 0x0, 0x0)
/home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:431 +0x38
github.com/gomodule/redigo/redis.(*conn).readReply(0xc420084820, 0x0, 0x0, 0x0, 0x0)
/home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:504 +0x40
github.com/gomodule/redigo/redis.(*conn).DoWithTimeout(0xc420084820, 0x0, 0x6ff70a, 0x7, 0xc422800dc0, 0x2, 0x2, 0x68f280, 0x696f01, 0xc422800de0, ...)
/home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:665 +0x164
github.com/gomodule/redigo/redis.(*conn).Do(0xc420084820, 0x6ff70a, 0x7, 0xc422800dc0, 0x2, 0x2, 0x0, 0xc4227ba9b0, 0x0, 0x0)
/home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:616 +0x73
main.addRedisSender(0xc42005a180, 0xc42001e240)
/home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:150 +0x569
created by main.RedisWriteHandler
/home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:119 +0x381
goroutine 23 [runnable]:
syscall.Syscall(0x0, 0x7, 0xc42277a000, 0x1000, 0x4, 0x1000, 0x0)
/usr/local/go/src/syscall/asm_linux_amd64.s:18 +0x5
syscall.read(0x7, 0xc42277a000, 0x1000, 0x1000, 0xc42276f900, 0x0, 0x0)
/usr/local/go/src/syscall/zsyscall_linux_amd64.go:756 +0x55
syscall.Read(0x7, 0xc42277a000, 0x1000, 0x1000, 0xcb, 0xc4228072b0, 0xcb)
/usr/local/go/src/syscall/syscall_unix.go:162 +0x49
internal/poll.(*FD).Read(0xc422722280, 0xc42277a000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
/usr/local/go/src/internal/poll/fd_unix.go:121 +0x125
net.(*netFD).Read(0xc422722280, 0xc42277a000, 0x1000, 0x1000, 0x4815d4, 0x47fd05, 0x1)
/usr/local/go/src/net/fd_unix.go:202 +0x52
net.(*conn).Read(0xc42274c060, 0xc42277a000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
/usr/local/go/src/net/net.go:176 +0x6d
bufio.(*Reader).fill(0xc4227481e0)
/usr/local/go/src/bufio/bufio.go:97 +0x11a
bufio.(*Reader).ReadSlice(0xc4227481e0, 0x7f03114c5d0a, 0x0, 0x459fd6, 0xc42276fb60, 0x491d6d, 0xc422722280)
/usr/local/go/src/bufio/bufio.go:338 +0x2c
github.com/gomodule/redigo/redis.(*conn).readLine(0xc420084a00, 0x0, 0x8000000000000000, 0xc422722280, 0x0, 0x0)
/home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:431 +0x38
github.com/gomodule/redigo/redis.(*conn).readReply(0xc420084a00, 0x0, 0x0, 0x0, 0x0)
/home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:504 +0x40
github.com/gomodule/redigo/redis.(*conn).DoWithTimeout(0xc420084a00, 0x0, 0x6ff70a, 0x7, 0xc4227f1900, 0x2, 0x2, 0x68f280, 0x696f01, 0xc4227f1920, ...)
/home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:665 +0x164
github.com/gomodule/redigo/redis.(*conn).Do(0xc420084a00, 0x6ff70a, 0x7, 0xc4227f1900, 0x2, 0x2, 0x0, 0xc4227318e0, 0x0, 0x0)
/home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:616 +0x73
main.addRedisSender(0xc42005a180, 0xc42001e240)
/home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:150 +0x569
created by main.RedisWriteHandler
/home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:119 +0x381
goroutine 24 [running]:
goroutine running on other thread; stack unavailable
created by main.RedisWriteHandler
/home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:119 +0x381
goroutine 25 [runnable]:
syscall.Syscall6(0x37, 0x9, 0x1, 0x4, 0xc42276b3dc, 0xc42276b3d8, 0x0, 0x0, 0x4, 0x0)
/usr/local/go/src/syscall/asm_linux_amd64.s:44 +0x5
syscall.getsockopt(0x9, 0x1, 0x4, 0xc42276b3dc, 0xc42276b3d8, 0xc422722400, 0x0)
/usr/local/go/src/syscall/zsyscall_linux_amd64.go:1605 +0x7c
syscall.GetsockoptInt(0x9, 0x1, 0x4, 0x1, 0x0, 0x0)
/usr/local/go/src/syscall/syscall_unix.go:245 +0x63
net.(*netFD).connect(0xc422722480, 0x857a00, 0xc42001c0d8, 0x0, 0x0, 0x853a80, 0xc422764100, 0x0, 0x0, 0x0, ...)
/usr/local/go/src/net/fd_unix.go:160 +0x2f7
net.(*netFD).dial(0xc422722480, 0x857a00, 0xc42001c0d8, 0x858c00, 0x0, 0x858c00, 0xc422736510, 0xc42276b610, 0x54d3fe)
/usr/local/go/src/net/sock_posix.go:142 +0xe9
net.socket(0x857a00, 0xc42001c0d8, 0x6fee50, 0x3, 0x2, 0x1, 0x0, 0x0, 0x858c00, 0x0, ...)
/usr/local/go/src/net/sock_posix.go:93 +0x1a5
net.internetSocket(0x857a00, 0xc42001c0d8, 0x6fee50, 0x3, 0x858c00, 0x0, 0x858c00, 0xc422736510, 0x1, 0x0, ...)
/usr/local/go/src/net/ipsock_posix.go:141 +0x129
net.doDialTCP(0x857a00, 0xc42001c0d8, 0x6fee50, 0x3, 0x0, 0xc422736510, 0xc42276b7e0, 0x0, 0xf2)
/usr/local/go/src/net/tcpsock_posix.go:62 +0xb9
net.dialTCP(0x857a00, 0xc42001c0d8, 0x6fee50, 0x3, 0x0, 0xc422736510, 0x44a6b8, 0xad3bb25ed8, 0x2e6f67bd)
/usr/local/go/src/net/tcpsock_posix.go:58 +0xe4
net.dialSingle(0x857a00, 0xc42001c0d8, 0xc422722400, 0x8559c0, 0xc422736510, 0x0, 0x0, 0x0, 0x0)
/usr/local/go/src/net/dial.go:547 +0x3e2
net.dialSerial(0x857a00, 0xc42001c0d8, 0xc422722400, 0xc420010c80, 0x1, 0x1, 0x0, 0x0, 0x0, 0x0)
/usr/local/go/src/net/dial.go:515 +0x247
net.(*Dialer).DialContext(0xc422766240, 0x857a00, 0xc42001c0d8, 0x6fee50, 0x3, 0x701517, 0xe, 0x0, 0x0, 0x0, ...)
/usr/local/go/src/net/dial.go:397 +0x6ee
net.(*Dialer).Dial(0xc422766240, 0x6fee50, 0x3, 0x701517, 0xe, 0x10, 0x6b5de0, 0xc420082001, 0xc420010c40)
/usr/local/go/src/net/dial.go:320 +0x75
net.(*Dialer).Dial-fm(0x6fee50, 0x3, 0x701517, 0xe, 0x3, 0xc42273cc50, 0x42a568, 0xc42273cc18)
/home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:180 +0x52
github.com/gomodule/redigo/redis.Dial(0x6fee50, 0x3, 0x701517, 0xe, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, ...)
/home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:183 +0x182
main.addRedisSender(0xc42005a180, 0xc42001e240)
/home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:133 +0x12e
created by main.RedisWriteHandler
/home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:119 +0x381
rax 0x5
rbx 0xf3e31
rcx 0x5
rdx 0xc42005a180
rdi 0x458f01
rsi 0x3
rbp 0xc42004dfc8
rsp 0xc42004dee8
r8 0x0
r9 0x0
r10 0x732f60
r11 0x30
r12 0x0
r13 0xf1
r14 0x11
r15 0x0
rip 0x65f199
rflags 0x246
cs 0x33
fs 0x0
gs 0x0
Go routine 22 is somewhat interesting as it's in [IO WAIT] where as the others are runnable. I've not dealt with these statuses before, is this where the issue lies?
Most up to date code here https://gitlab.com/samisagit/go-im-server/blob/changed-redis-handler-buggy/src/main.go
Edit with root cause!
One of the guys at golang-nuts gGroup suggested this might be a GC issue, and he was right (Thanks Michael)! When running the code with the below
debug.SetGCPercent(-1)
it works as expected - this isn't a long term solution but it points to where the issue lies! If anyone has any clues as to why the GC is being so zealous with it's duties I'd be super appreciative!