douzi8916 2018-05-02 07:28

Golang channel hangs

I am working with Go and Redis to dispatch a queue (channel) of messages to subscribers. I am aiming to create an auto-scaling solution that spawns new goroutines (up to a set limit) as the queue grows. I have the code below:

// Set up max queued messages
var maxMessages = float64(100000)

// Set up max redis senders
var maxSender = float64(5)

// Set up message channel
var messages = make(chan Message, int(maxMessages))

// Set up messages per sender count
var senderRatio = maxSender / maxMessages

type Message struct {
    ChatId  int    `json:"chatId"`
    UserId  int    `json:"userId"`
    Message string `json:"message"`
    Date    int    `json:"date"`
}

func RedisWriteHandler(messageChannel chan Message) {
    senderCount := 0
    killswitch := make(chan string)
    for {
        length := float64(len(messageChannel))
        neededSenders := int(math.Ceil(length * senderRatio))
        if senderCount < neededSenders || senderCount < 1 {
            log.Printf("Increasing sender count to %d, need %d", senderCount+1, neededSenders)
            go addRedisSender(messageChannel, killswitch)
            senderCount++
        } else if senderCount > neededSenders && senderCount > 1 {
            log.Printf("Decreasing sender count to %d, need %d", senderCount-1, neededSenders)
            killMessage := fmt.Sprintf("only need %d senders", neededSenders)
            killswitch <- killMessage
            senderCount--
        }
    }
    log.Fatal("The redis handler unexpectedly went away")
}

func addRedisSender(messageChannel chan Message, killswitch chan string) {
    c, err := redis.Dial("tcp", "localhost:6379")
    if err != nil {
        log.Println(err)
        return
    }
    defer c.Close()
    for {
        select {
        case msg := <-messageChannel:
            redisChannel := strconv.Itoa(msg.ChatId)
            messageBlob, err := json.Marshal(msg)
            if err != nil {
                log.Println(err)
            }
            _, err = c.Do("PUBLISH", redisChannel, messageBlob)
            if err != nil {
                log.Println(err)
            }
        case kill := <-killswitch:
            log.Printf("Sender killed: %s", kill)
            return
        }
    }
    log.Println("Closing redis sender")
}

If I run this code using a channel with a large buffer (say 100,000 messages), it adds 5 senders one at a time and starts working down the queue. So far so good. However, at a seemingly random point, around 1,500 messages in, it hangs. There are no more logs at all (and I think I've got all the exit points covered). My expected output would be for the sender count to rise to the maxSender value and then decrease periodically throughout execution. An example of the logs I get with 100k messages is below:

2018/05/02 08:21:25 Increasing sender count to 1, need 5
2018/05/02 08:21:25 Increasing sender count to 2, need 5
2018/05/02 08:21:25 Increasing sender count to 3, need 5
2018/05/02 08:21:25 Increasing sender count to 4, need 5
2018/05/02 08:21:25 Increasing sender count to 5, need 5

Then nothing.

I can see from other testing I'm running that this isn't just running slowly: the messages simply aren't being taken from the channel. Can anyone shed any light on this?

Thanks,

Sam

Edit

I have been lurking around some other questions about Go programs hanging for no apparent reason, and the suggested action was to send SIGABRT to the process to get a stack trace of each goroutine. The output is below.

kill -ABRT 9162
SIGABRT: abort
PC=0x65f199 m=0 sigcode=0

goroutine 6 [running]:
main.RedisWriteHandler(0xc42005a180)
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:116 +0x99 fp=0xc42004dfd8 sp=0xc42004dee8 pc=0x65f199
runtime.goexit()
    /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42004dfe0 sp=0xc42004dfd8 pc=0x458f01
created by main.main
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:236 +0x130

goroutine 1 [IO wait]:
internal/poll.runtime_pollWait(0x7f03114c5f70, 0x72, 0xffffffffffffffff)
    /usr/local/go/src/runtime/netpoll.go:173 +0x57
internal/poll.(*pollDesc).wait(0xc422722098, 0x72, 0xc420038b00, 0x0, 0x0)
    /usr/local/go/src/internal/poll/fd_poll_runtime.go:85 +0xae
internal/poll.(*pollDesc).waitRead(0xc422722098, 0xffffffffffffff00, 0x0, 0x0)
    /usr/local/go/src/internal/poll/fd_poll_runtime.go:90 +0x3d
internal/poll.(*FD).Accept(0xc422722080, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0)
    /usr/local/go/src/internal/poll/fd_unix.go:334 +0x1e2
net.(*netFD).accept(0xc422722080, 0x7f0311511000, 0x0, 0x7139b0)
    /usr/local/go/src/net/fd_unix.go:238 +0x42
net.(*TCPListener).accept(0xc4200b2000, 0xc420038d80, 0x4122c8, 0x30)
    /usr/local/go/src/net/tcpsock_posix.go:136 +0x2e
net.(*TCPListener).AcceptTCP(0xc4200b2000, 0xc422736120, 0xc422736120, 0x6a3b00)
    /usr/local/go/src/net/tcpsock.go:234 +0x49
net/http.tcpKeepAliveListener.Accept(0xc4200b2000, 0xc42001c0d8, 0x6a3b00, 0x8813d0, 0x6f2aa0)
    /usr/local/go/src/net/http/server.go:3120 +0x2f
net/http.(*Server).Serve(0xc42007a4e0, 0x8575c0, 0xc4200b2000, 0x0, 0x0)
    /usr/local/go/src/net/http/server.go:2695 +0x1b2
net/http.(*Server).ListenAndServe(0xc42007a4e0, 0xc42007a4e0, 0xc420038f00)
    /usr/local/go/src/net/http/server.go:2636 +0xa9
net/http.ListenAndServe(0x6ff124, 0x5, 0x0, 0x0, 0x0, 0x0)
    /usr/local/go/src/net/http/server.go:2882 +0x7f
main.main()
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:239 +0x15b

goroutine 21 [runnable]:
internal/poll.runtime_pollWait(0x7f03114c5eb0, 0x72, 0x0)
    /usr/local/go/src/runtime/netpoll.go:173 +0x57
internal/poll.(*pollDesc).wait(0xc422722198, 0x72, 0xffffffffffffff00, 0x854d00, 0x851550)
    /usr/local/go/src/internal/poll/fd_poll_runtime.go:85 +0xae
internal/poll.(*pollDesc).waitRead(0xc422722198, 0xc4227ab000, 0x1000, 0x1000)
    /usr/local/go/src/internal/poll/fd_poll_runtime.go:90 +0x3d
internal/poll.(*FD).Read(0xc422722180, 0xc4227ab000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
    /usr/local/go/src/internal/poll/fd_unix.go:125 +0x18a
net.(*netFD).Read(0xc422722180, 0xc4227ab000, 0x1000, 0x1000, 0x4815d4, 0x47fd05, 0x1)
    /usr/local/go/src/net/fd_unix.go:202 +0x52
net.(*conn).Read(0xc4227a6000, 0xc4227ab000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
    /usr/local/go/src/net/net.go:176 +0x6d
bufio.(*Reader).fill(0xc4227ae000)
    /usr/local/go/src/bufio/bufio.go:97 +0x11a
bufio.(*Reader).ReadSlice(0xc4227ae000, 0x7f03114c5e0a, 0x0, 0x459fd6, 0xc42276db60, 0x491d6d, 0xc422722180)
    /usr/local/go/src/bufio/bufio.go:338 +0x2c
github.com/gomodule/redigo/redis.(*conn).readLine(0xc4227b0000, 0x0, 0x8000000000000000, 0xc422722180, 0x0, 0x0)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:431 +0x38
github.com/gomodule/redigo/redis.(*conn).readReply(0xc4227b0000, 0x0, 0x0, 0x0, 0x0)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:504 +0x40
github.com/gomodule/redigo/redis.(*conn).DoWithTimeout(0xc4227b0000, 0x0, 0x6ff70a, 0x7, 0xc422800d40, 0x2, 0x2, 0x68f280, 0x696f01, 0xc422800d60, ...)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:665 +0x164
github.com/gomodule/redigo/redis.(*conn).Do(0xc4227b0000, 0x6ff70a, 0x7, 0xc422800d40, 0x2, 0x2, 0x0, 0xc4227ba9a0, 0x0, 0x0)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:616 +0x73
main.addRedisSender(0xc42005a180, 0xc42001e240)
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:150 +0x569
created by main.RedisWriteHandler
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:119 +0x381

goroutine 22 [IO wait]:
internal/poll.runtime_pollWait(0x7f03114c5c70, 0x72, 0x0)
    /usr/local/go/src/runtime/netpoll.go:173 +0x57
internal/poll.(*pollDesc).wait(0xc422758098, 0x72, 0xffffffffffffff00, 0x854d00, 0x851550)
    /usr/local/go/src/internal/poll/fd_poll_runtime.go:85 +0xae
internal/poll.(*pollDesc).waitRead(0xc422758098, 0xc422774000, 0x1000, 0x1000)
    /usr/local/go/src/internal/poll/fd_poll_runtime.go:90 +0x3d
internal/poll.(*FD).Read(0xc422758080, 0xc422774000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
    /usr/local/go/src/internal/poll/fd_unix.go:125 +0x18a
net.(*netFD).Read(0xc422758080, 0xc422774000, 0x1000, 0x1000, 0x4815d4, 0x47fd05, 0x1)
    /usr/local/go/src/net/fd_unix.go:202 +0x52
net.(*conn).Read(0xc42274c020, 0xc422774000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
    /usr/local/go/src/net/net.go:176 +0x6d
bufio.(*Reader).fill(0xc422748120)
    /usr/local/go/src/bufio/bufio.go:97 +0x11a
bufio.(*Reader).ReadSlice(0xc422748120, 0x7f03114c5c0a, 0x0, 0x459fd6, 0xc420049b60, 0x491d6d, 0xc422758080)
    /usr/local/go/src/bufio/bufio.go:338 +0x2c
github.com/gomodule/redigo/redis.(*conn).readLine(0xc420084820, 0x0, 0x8000000000000000, 0xc422758080, 0x0, 0x0)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:431 +0x38
github.com/gomodule/redigo/redis.(*conn).readReply(0xc420084820, 0x0, 0x0, 0x0, 0x0)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:504 +0x40
github.com/gomodule/redigo/redis.(*conn).DoWithTimeout(0xc420084820, 0x0, 0x6ff70a, 0x7, 0xc422800dc0, 0x2, 0x2, 0x68f280, 0x696f01, 0xc422800de0, ...)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:665 +0x164
github.com/gomodule/redigo/redis.(*conn).Do(0xc420084820, 0x6ff70a, 0x7, 0xc422800dc0, 0x2, 0x2, 0x0, 0xc4227ba9b0, 0x0, 0x0)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:616 +0x73
main.addRedisSender(0xc42005a180, 0xc42001e240)
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:150 +0x569
created by main.RedisWriteHandler
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:119 +0x381

goroutine 23 [runnable]:
syscall.Syscall(0x0, 0x7, 0xc42277a000, 0x1000, 0x4, 0x1000, 0x0)
    /usr/local/go/src/syscall/asm_linux_amd64.s:18 +0x5
syscall.read(0x7, 0xc42277a000, 0x1000, 0x1000, 0xc42276f900, 0x0, 0x0)
    /usr/local/go/src/syscall/zsyscall_linux_amd64.go:756 +0x55
syscall.Read(0x7, 0xc42277a000, 0x1000, 0x1000, 0xcb, 0xc4228072b0, 0xcb)
    /usr/local/go/src/syscall/syscall_unix.go:162 +0x49
internal/poll.(*FD).Read(0xc422722280, 0xc42277a000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
    /usr/local/go/src/internal/poll/fd_unix.go:121 +0x125
net.(*netFD).Read(0xc422722280, 0xc42277a000, 0x1000, 0x1000, 0x4815d4, 0x47fd05, 0x1)
    /usr/local/go/src/net/fd_unix.go:202 +0x52
net.(*conn).Read(0xc42274c060, 0xc42277a000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
    /usr/local/go/src/net/net.go:176 +0x6d
bufio.(*Reader).fill(0xc4227481e0)
    /usr/local/go/src/bufio/bufio.go:97 +0x11a
bufio.(*Reader).ReadSlice(0xc4227481e0, 0x7f03114c5d0a, 0x0, 0x459fd6, 0xc42276fb60, 0x491d6d, 0xc422722280)
    /usr/local/go/src/bufio/bufio.go:338 +0x2c
github.com/gomodule/redigo/redis.(*conn).readLine(0xc420084a00, 0x0, 0x8000000000000000, 0xc422722280, 0x0, 0x0)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:431 +0x38
github.com/gomodule/redigo/redis.(*conn).readReply(0xc420084a00, 0x0, 0x0, 0x0, 0x0)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:504 +0x40
github.com/gomodule/redigo/redis.(*conn).DoWithTimeout(0xc420084a00, 0x0, 0x6ff70a, 0x7, 0xc4227f1900, 0x2, 0x2, 0x68f280, 0x696f01, 0xc4227f1920, ...)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:665 +0x164
github.com/gomodule/redigo/redis.(*conn).Do(0xc420084a00, 0x6ff70a, 0x7, 0xc4227f1900, 0x2, 0x2, 0x0, 0xc4227318e0, 0x0, 0x0)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:616 +0x73
main.addRedisSender(0xc42005a180, 0xc42001e240)
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:150 +0x569
created by main.RedisWriteHandler
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:119 +0x381

goroutine 24 [running]:
    goroutine running on other thread; stack unavailable
created by main.RedisWriteHandler
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:119 +0x381

goroutine 25 [runnable]:
syscall.Syscall6(0x37, 0x9, 0x1, 0x4, 0xc42276b3dc, 0xc42276b3d8, 0x0, 0x0, 0x4, 0x0)
    /usr/local/go/src/syscall/asm_linux_amd64.s:44 +0x5
syscall.getsockopt(0x9, 0x1, 0x4, 0xc42276b3dc, 0xc42276b3d8, 0xc422722400, 0x0)
    /usr/local/go/src/syscall/zsyscall_linux_amd64.go:1605 +0x7c
syscall.GetsockoptInt(0x9, 0x1, 0x4, 0x1, 0x0, 0x0)
    /usr/local/go/src/syscall/syscall_unix.go:245 +0x63
net.(*netFD).connect(0xc422722480, 0x857a00, 0xc42001c0d8, 0x0, 0x0, 0x853a80, 0xc422764100, 0x0, 0x0, 0x0, ...)
    /usr/local/go/src/net/fd_unix.go:160 +0x2f7
net.(*netFD).dial(0xc422722480, 0x857a00, 0xc42001c0d8, 0x858c00, 0x0, 0x858c00, 0xc422736510, 0xc42276b610, 0x54d3fe)
    /usr/local/go/src/net/sock_posix.go:142 +0xe9
net.socket(0x857a00, 0xc42001c0d8, 0x6fee50, 0x3, 0x2, 0x1, 0x0, 0x0, 0x858c00, 0x0, ...)
    /usr/local/go/src/net/sock_posix.go:93 +0x1a5
net.internetSocket(0x857a00, 0xc42001c0d8, 0x6fee50, 0x3, 0x858c00, 0x0, 0x858c00, 0xc422736510, 0x1, 0x0, ...)
    /usr/local/go/src/net/ipsock_posix.go:141 +0x129
net.doDialTCP(0x857a00, 0xc42001c0d8, 0x6fee50, 0x3, 0x0, 0xc422736510, 0xc42276b7e0, 0x0, 0xf2)
    /usr/local/go/src/net/tcpsock_posix.go:62 +0xb9
net.dialTCP(0x857a00, 0xc42001c0d8, 0x6fee50, 0x3, 0x0, 0xc422736510, 0x44a6b8, 0xad3bb25ed8, 0x2e6f67bd)
    /usr/local/go/src/net/tcpsock_posix.go:58 +0xe4
net.dialSingle(0x857a00, 0xc42001c0d8, 0xc422722400, 0x8559c0, 0xc422736510, 0x0, 0x0, 0x0, 0x0)
    /usr/local/go/src/net/dial.go:547 +0x3e2
net.dialSerial(0x857a00, 0xc42001c0d8, 0xc422722400, 0xc420010c80, 0x1, 0x1, 0x0, 0x0, 0x0, 0x0)
    /usr/local/go/src/net/dial.go:515 +0x247
net.(*Dialer).DialContext(0xc422766240, 0x857a00, 0xc42001c0d8, 0x6fee50, 0x3, 0x701517, 0xe, 0x0, 0x0, 0x0, ...)
    /usr/local/go/src/net/dial.go:397 +0x6ee
net.(*Dialer).Dial(0xc422766240, 0x6fee50, 0x3, 0x701517, 0xe, 0x10, 0x6b5de0, 0xc420082001, 0xc420010c40)
    /usr/local/go/src/net/dial.go:320 +0x75
net.(*Dialer).Dial-fm(0x6fee50, 0x3, 0x701517, 0xe, 0x3, 0xc42273cc50, 0x42a568, 0xc42273cc18)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:180 +0x52
github.com/gomodule/redigo/redis.Dial(0x6fee50, 0x3, 0x701517, 0xe, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, ...)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:183 +0x182
main.addRedisSender(0xc42005a180, 0xc42001e240)
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:133 +0x12e
created by main.RedisWriteHandler
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:119 +0x381

rax    0x5
rbx    0xf3e31
rcx    0x5
rdx    0xc42005a180
rdi    0x458f01
rsi    0x3
rbp    0xc42004dfc8
rsp    0xc42004dee8
r8     0x0
r9     0x0
r10    0x732f60
r11    0x30
r12    0x0
r13    0xf1
r14    0x11
r15    0x0
rip    0x65f199
rflags 0x246
cs     0x33
fs     0x0
gs     0x0

Goroutine 22 is somewhat interesting, as it's in [IO wait] whereas the other senders are runnable. I've not dealt with these statuses before. Is this where the issue lies?

The most up-to-date code is here: https://gitlab.com/samisagit/go-im-server/blob/changed-redis-handler-buggy/src/main.go

Edit with root cause!

One of the guys on the golang-nuts group suggested this might be a GC issue, and he was right (thanks, Michael)! When running the code with the line below

debug.SetGCPercent(-1)

it works as expected. This isn't a long-term solution, but it points to where the issue lies! If anyone has any clues as to why the GC is being so zealous with its duties, I'd be super appreciative!


  • duandan9680 2018-07-24 19:52

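    For future reference, the culprit turned out to be the outer for loop in RedisWriteHandler. Nothing in the loop blocks (a select, for example), so once the sender count matches what is needed, the loop spins flat out and never yields. On the Go versions of the time (before asynchronous preemption arrived in Go 1.14), a goroutine in such a tight loop cannot be interrupted, so a garbage collection that needs to pause every goroutine waits on it indefinitely while the senders stay paused. That fits the trace, where goroutine 6 is running inside RedisWriteHandler, and it explains why disabling the GC hid the problem. I thought I'd posted the answer here when I first found this, but evidently not.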
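One way to make the loop block is to re-evaluate the sender count on a timer and wait in a select between checks. The following is a minimal sketch under stated assumptions, not the project's actual fix: the start and stop callbacks are hypothetical stand-ins for launching and killing a Redis sender, the done channel and the check interval are additions, and the queue carries strings instead of Message to keep the example self-contained.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// neededSenders follows the question's scaling rule: the queue fill level
// scaled to maxSenders, rounded up, and never fewer than one sender.
func neededSenders(queued, capacity, maxSenders int) int {
	n := int(math.Ceil(float64(queued) * float64(maxSenders) / float64(capacity)))
	if n < 1 {
		return 1
	}
	return n
}

// scale adjusts the sender count once per tick instead of spinning.
// Waiting in select parks the goroutine between checks, so the scheduler
// and the garbage collector are never held up by this loop.
func scale(queue <-chan string, maxSenders int, interval time.Duration,
	done <-chan struct{}, start, stop func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	senders := 0
	for {
		want := neededSenders(len(queue), cap(queue), maxSenders)
		for ; senders < want; senders++ {
			start() // hypothetical: launch one sender goroutine
		}
		for ; senders > want; senders-- {
			stop() // hypothetical: signal one sender to exit
		}
		select {
		case <-ticker.C: // wake up for the next check
		case <-done:
			return
		}
	}
}

func main() {
	queue := make(chan string, 100)
	for i := 0; i < 60; i++ {
		queue <- "msg"
	}
	done := make(chan struct{})
	go func() {
		time.Sleep(50 * time.Millisecond)
		close(done)
	}()
	started := 0
	scale(queue, 5, 10*time.Millisecond, done, func() { started++ }, func() {})
	fmt.Println("senders started:", started)
}
```

The interval trades responsiveness for CPU time: a shorter tick reacts to queue growth sooner, while a longer one wakes the goroutine less often.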

    Selected as the best answer by the question's author.