dongsu1539 2017-01-20 13:48
58 views
Accepted

Deadlock with buffered channels

I have some code that acts as a job dispatcher, collating a large amount of data from lots of TCP sockets. It grew out of the approach in "Large number of transient objects - avoiding contention", and it largely works: CPU usage is down a huge amount and locking is no longer an issue either.

From time to time my application locks up, and the "Channel length" log is the only thing that keeps repeating as data continues to come in from my sockets. However, the count stays at 5000 and no downstream processing takes place.

I think the issue might be a race condition, and the line it is possibly getting hung up on is channel <- msg inside the select of jobDispatcher. Trouble is, I can't work out how to verify this.

I suspect that, since select can take cases at random, the worker goroutine is on its way out but its send on shutdownChan doesn't get a chance to be processed. Then data for that id hits inboundFromTCP and the dispatcher blocks!
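
To illustrate the shape I think is blocking, here is a stripped-down, hypothetical sketch (not my real code, just two unbuffered sends pointed at each other):

package main

func main() {
    work := make(chan int)     // stands in for packetChan (unbuffered)
    shutdown := make(chan int) // stands in for shutdownChan (unbuffered)

    go func() {
        // The worker decides to exit and blocks on its shutdown send.
        shutdown <- 1
    }()

    // Meanwhile the dispatcher blocks sending work to that same worker.
    // Neither send can complete: "all goroutines are asleep - deadlock!"
    work <- 1
}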

Hopefully someone can spot something really obviously wrong here and offer a solution!?

var MessageQueue = make(chan *trackingPacket_v1, 5000)

func init() {
    go jobDispatcher(MessageQueue)
}

func addMessage(trackingPacket *trackingPacket_v1) {
    // Send the packet to the buffered queue!
    log.Println("Channel length:", len(MessageQueue))
    MessageQueue <- trackingPacket
}

func jobDispatcher(inboundFromTCP chan *trackingPacket_v1) {
    var channelMap = make(map[string]chan *trackingPacket_v1)

    // Channel that listens for the strings that want to exit
    shutdownChan := make(chan string)

    for {
        select {
        case msg := <-inboundFromTCP:
            log.Println("Got packet", msg.Avr)
            channel, ok := channelMap[msg.Avr]
            if !ok {
                packetChan := make(chan *trackingPacket_v1)

                channelMap[msg.Avr] = packetChan
                go processPackets(packetChan, shutdownChan, msg.Avr)
                packetChan <- msg
                continue
            }
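            // Unbuffered send: blocks here until the worker's select
            // is ready to receive.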
            channel <- msg
        case shutdownString := <-shutdownChan:
            log.Println("Shutting down:", shutdownString)
            channel, ok := channelMap[shutdownString]
            if ok {
                delete(channelMap, shutdownString)
                close(channel)
            }
        }
    }
}

func processPackets(ch chan *trackingPacket_v1, shutdown chan string, id string) {
    var messages = []*trackingPacket_v1{}

    tickChan := time.NewTicker(time.Second * 1)
    defer tickChan.Stop()

    hasCheckedData := false

    for {
        select {
        case msg := <-ch:
            log.Println("Got a messages for", id)
            messages = append(messages, msg)
            hasCheckedData = false
        case <-tickChan.C:

            messages = cullChanMessages(messages)
            if len(messages) == 0 {
                messages = nil
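                // Unbuffered send: blocks until jobDispatcher receives
                // on shutdownChan.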
                shutdown <- id
                return
            }

            // No point running checks when the packets have not changed!!
            if !hasCheckedData {
                processMLATCandidatesFromChan(messages)
                hasCheckedData = true
            }
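        // Note: time.After creates a brand-new 60-second timer on every
        // pass through the loop, so this case only fires if no other
        // case is ready for a full 60 seconds; with the 1-second ticker
        // above, that effectively never happens.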
        case <-time.After(time.Second * 60):
            log.Println("This channel has been around for 60 seconds which is too much, kill it")
            messages = nil
            shutdown <- id
            return
        }
    }
}

Update 01/20/17

I tried reworking the code with channelMap as a global protected by a mutex, but it still ended up deadlocking.


I slightly tweaked the code; it still locks up, but I don't see how this version does!! https://play.golang.org/p/PGpISU4XBJ


Update 01/21/17

After some recommendations I put this into a standalone working example so people can see it: https://play.golang.org/p/88zT7hBLeD

It is a long-running process, so it will need to be run locally on a machine, as the playground kills it. Hopefully this will help get to the bottom of it!


2 Answers

  • duanbei3747 2017-01-21 00:37

    I'm guessing that your problem is the dispatcher getting stuck on channel <- msg at the same time as the worker goroutine is stuck on shutdown <- id.

    Since neither packetChan nor shutdownChan is buffered, a send on either one blocks until a receiver is ready, and the two goroutines can deadlock waiting for each other.

    There are a couple of ways to fix it. You could declare both of those channels with a buffer of 1.
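
    For example, a minimal sketch (toy names, not your real types) showing that a buffer of 1 lets both sends complete even when nobody is receiving yet:

    package main

    import "fmt"

    func main() {
        work := make(chan int, 1)     // buffered: one pending send is OK
        shutdown := make(chan int, 1) // buffered: the worker can exit

        go func() { shutdown <- 1 }() // completes without a receiver

        work <- 1 // also completes immediately
        fmt.Println(<-shutdown, <-work)
    }

    With the buffer, the worker's shutdown <- id completes immediately, so the dispatcher stays free to finish its own channel <- msg.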

    Or, instead of signalling by sending a shutdown message, you could do what Google's context package does and signal shutdown by closing a channel. Look at https://golang.org/pkg/context/, especially WithCancel, WithDeadline, and the Done method.

    You might be able to use context to remove your own shutdown channel and timeout code.
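
    A minimal sketch of that cancellation signal (a toy example; your real code would hang a context off each worker):

    package main

    import (
        "context"
        "fmt"
    )

    func main() {
        ctx, cancel := context.WithCancel(context.Background())

        finished := make(chan struct{})
        go func() {
            <-ctx.Done() // blocks until cancel is called
            fmt.Println("worker saw cancellation:", ctx.Err())
            close(finished)
        }()

        // Unlike a send on an unbuffered channel, cancel never blocks,
        // and every goroutine selecting on ctx.Done() sees the signal.
        cancel()
        <-finished
    }

    context.WithTimeout could likewise replace your 60-second time.After case; the worker would just add case <-ctx.Done(): to its select.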

    And JimB has a point about shutting down the goroutine while it might still be receiving on the channel. What you should do is send the shutdown message (or close the channel, or cancel the context) and then keep processing messages until your ch channel is closed (detect that with case msg, ok := <-ch:), which happens only after the shutdown has been received by the dispatcher, the sender on ch.

    That way you get all of the messages that were incoming until the shutdown actually happened, and should avoid a second deadlock.
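
    A sketch of that drain pattern (simplified, hypothetical types; the real code would use *trackingPacket_v1):

    package main

    import "fmt"

    func worker(ch chan int, finished chan struct{}) {
        defer close(finished)
        for {
            msg, ok := <-ch
            if !ok {
                // The dispatcher closed ch after removing it from
                // channelMap; every earlier message has been received.
                return
            }
            fmt.Println("processed", msg)
        }
    }

    func main() {
        ch := make(chan int, 5)
        finished := make(chan struct{})
        go worker(ch, finished)

        for i := 0; i < 3; i++ {
            ch <- i
        }
        close(ch) // dispatcher side: close, never send, after shutdown
        <-finished
    }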

    Accepted by the asker as the best answer.

