dongsu1539 2017-01-20 13:48

Deadlock with buffered channels

I have a job dispatcher that collates a large amount of data from many TCP sockets. The code grew out of the approach suggested in "Large number of transient objects - avoiding contention", and it largely works: CPU usage is down by a huge amount and locking is no longer an issue.

From time to time, however, the application locks up. The "Channel length" log line is the only thing that keeps repeating, because data is still coming in from my sockets, but the count stays at 5000 and no downstream processing takes place.

I think the issue might be a race condition, and the line it is possibly getting hung up on is channel <- msg within the select of jobDispatcher. The trouble is that I can't work out how to verify this.

I suspect that, because select picks among ready cases at random, the goroutine is returning and shutdownChan never gets a chance to be processed. Then data hits inboundFromTCP and the dispatcher blocks!

Someone might spot something really obviously wrong here and, hopefully, offer a solution!

var MessageQueue = make(chan *trackingPacket_v1, 5000)

func init() {
    go jobDispatcher(MessageQueue)
}

func addMessage(trackingPacket *trackingPacket_v1) {
    // Send the packet to the buffered queue!
    log.Println("Channel length:", len(MessageQueue))
    MessageQueue <- trackingPacket
}

func jobDispatcher(inboundFromTCP chan *trackingPacket_v1) {
    var channelMap = make(map[string]chan *trackingPacket_v1)

    // Channel that listens for the strings that want to exit
    shutdownChan := make(chan string)

    for {
        select {
        case msg := <-inboundFromTCP:
            log.Println("Got packet", msg.Avr)
            channel, ok := channelMap[msg.Avr]
            if !ok {
                packetChan := make(chan *trackingPacket_v1)

                channelMap[msg.Avr] = packetChan
                go processPackets(packetChan, shutdownChan, msg.Avr)
                packetChan <- msg
                continue
            }
            channel <- msg
        case shutdownString := <-shutdownChan:
            log.Println("Shutting down:", shutdownString)
            channel, ok := channelMap[shutdownString]
            if ok {
                delete(channelMap, shutdownString)
                close(channel)
            }
        }
    }
}

func processPackets(ch chan *trackingPacket_v1, shutdown chan string, id string) {
    var messages = []*trackingPacket_v1{}

    tickChan := time.NewTicker(time.Second * 1)
    defer tickChan.Stop()

    hasCheckedData := false

    for {
        select {
        case msg := <-ch:
            log.Println("Got a messages for", id)
            messages = append(messages, msg)
            hasCheckedData = false
        case <-tickChan.C:

            messages = cullChanMessages(messages)
            if len(messages) == 0 {
                messages = nil
                shutdown <- id
                return
            }

            // No point running checking when packets have not changed!!
            if hasCheckedData == false {
                processMLATCandidatesFromChan(messages)
                hasCheckedData = true
            }
        case <-time.After(time.Duration(time.Second * 60)):
            log.Println("This channel has been around for 60 seconds which is too much, kill it")
            messages = nil
            shutdown <- id
            return
        }
    }
}

Update 01/20/17

I tried reworking it with channelMap as a global protected by a mutex, but it still ended up deadlocking.


I tweaked the code slightly. It still locks up, but I don't see how this version can: https://play.golang.org/p/PGpISU4XBJ


Update 01/21/17

After some recommendations I put this into a standalone working example so people can see it: https://play.golang.org/p/88zT7hBLeD

It is a long-running process, so it needs to be run locally because the playground kills it. Hopefully this will help get to the bottom of it!

2 answers

  • duanbei3747 2017-01-21 00:37

    I'm guessing that your problem is the dispatcher getting stuck on channel <- msg at the same moment that the other goroutine is doing shutdown <- id.

    Since neither the channel nor the shutdown channel is buffered, each send blocks until a receiver is ready. Here each goroutine is the other's receiver, so the two can deadlock, each waiting for the other side to become available.
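The interleaving described above can be reproduced in isolation. The following is a minimal sketch, not the asker's code: the names bothSendsBlock, work and done are made up for illustration, and a timeout stands in for "stuck forever" so that the program can report the result instead of hanging.

```go
package main

import (
	"fmt"
	"time"
)

// timeout is an arbitrary illustrative value standing in for "forever".
const timeout = 200 * time.Millisecond

// bothSendsBlock reproduces the suspected interleaving with two unbuffered
// channels. It reports true if neither goroutine gets past its send before
// the timeout expires.
func bothSendsBlock() bool {
	work := make(chan int)        // stands in for the per-key packet channel
	shutdown := make(chan string) // stands in for shutdownChan
	done := make(chan string, 2)  // buffered so progress reports never block

	// Worker: it has decided to exit, so it sends its id and no longer
	// receives from work.
	go func() {
		shutdown <- "worker-1"
		done <- "worker"
	}()

	// Dispatcher: it is in the middle of forwarding a packet, so it cannot
	// get back to the point where it would receive from shutdown.
	go func() {
		work <- 42
		<-shutdown
		done <- "dispatcher"
	}()

	select {
	case <-done:
		return false // one of the sends went through
	case <-time.After(timeout):
		return true // both goroutines are parked on their sends
	}
}

func main() {
	fmt.Println("both sends blocked:", bothSendsBlock())
}
```

Neither goroutine ever reaches its report on done, because each send is waiting for a receive that only the other, equally blocked, goroutine could perform.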

    There are a couple of ways to fix it. You could declare both of those channels with a buffer of 1.
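As a rough sketch of the buffered variant, here is the same two-goroutine scenario with a capacity of 1 on both channels. The names dispatcherCompletes, work and done are hypothetical, and the timeout is only there so a failure would be reported rather than hang.

```go
package main

import (
	"fmt"
	"time"
)

// timeout is an arbitrary illustrative value.
const timeout = 200 * time.Millisecond

// dispatcherCompletes runs the worker/dispatcher interleaving with both
// channels given a buffer of 1 and reports whether the dispatcher got
// through its send and then received the shutdown request.
func dispatcherCompletes() bool {
	work := make(chan int, 1)        // buffered packet channel
	shutdown := make(chan string, 1) // buffered shutdown channel
	done := make(chan struct{})

	// Worker: parks its id in the buffer and exits without waiting.
	go func() {
		shutdown <- "worker-1"
	}()

	// Dispatcher: the send lands in the buffer even though nobody receives,
	// so the dispatcher is free to pick up the shutdown request afterwards.
	go func() {
		work <- 42
		<-shutdown
		close(done)
	}()

	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	fmt.Println("dispatcher completed:", dispatcherCompletes())
}
```

Two caveats, which are my reading rather than something the answer states: the packet left in the work buffer is never processed once the worker has gone, and a buffer only absorbs as many pending sends as its capacity, so with many workers shutting down at once a capacity of 1 may not be enough.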

    Or, instead of signalling by sending a shutdown message, you could do what Go's context package does and signal shutdown by closing a channel. Look at https://golang.org/pkg/context/, especially WithCancel, WithDeadline and the Done method.

    You might be able to use context to remove your own shutdown channel and timeout code.
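One way this could look, as a sketch only: the worker below stops when its context is done, and the sender also selects on ctx.Done() so it cannot block on a worker that has already gone. The function names worker and runWorker, the int payload and the one-second timeout are all assumptions made for the example, not taken from the original code.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// worker collects values from ch until ctx is cancelled or times out, then
// hands back what it collected. It never sends a shutdown message itself.
func worker(ctx context.Context, ch <-chan int, out chan<- []int) {
	var got []int
	for {
		select {
		case v := <-ch:
			got = append(got, v)
		case <-ctx.Done():
			out <- got
			return
		}
	}
}

// runWorker plays the dispatcher: it sends three values, cancels the
// context, and returns whatever the worker collected.
func runWorker() []int {
	// The timeout replaces a hand-written time.After case in the worker.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	ch := make(chan int)
	out := make(chan []int, 1) // buffered so the worker can always exit

	go worker(ctx, ch, out)

	for i := 1; i <= 3; i++ {
		select {
		case ch <- i:
		case <-ctx.Done():
			// The worker is gone or going; do not block on it.
		}
	}

	cancel() // closing Done is the shutdown signal
	return <-out
}

func main() {
	fmt.Println(runWorker())
}
```

Because cancellation closes the Done channel rather than sending on it, any number of goroutines can observe it without anyone having to be ready to receive a message.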

    And JimB has a point about shutting down the goroutine while the dispatcher might still be sending to it on the channel. What you should do is send the shutdown message (or close, or cancel the context) and then continue to process messages until your ch channel is closed, which you can detect with case msg, ok := <-ch:. The close happens once the sender has received the shutdown.

    That way you get all of the messages that were incoming until the shutdown actually happened, and should avoid a second deadlock.
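A sketch of that pattern, under some assumptions of mine: the worker asks for shutdown after a fixed number of packets (the limit parameter, purely to make the example deterministic), and it offers the request inside the same select that receives packets, using a nil channel to keep the send case disabled until it is needed. The names worker and dispatch are illustrative.

```go
package main

import "fmt"

// worker counts packets from ch. After limit packets it asks to be shut
// down, but it keeps receiving until the dispatcher closes ch.
func worker(id string, limit int, ch <-chan int, shutdown chan<- string, result chan<- int) {
	received := 0
	var req chan<- string // nil: a send on a nil channel is never selected
	for {
		select {
		case _, ok := <-ch:
			if !ok {
				result <- received // ch closed by the dispatcher: safe to exit
				return
			}
			received++
			if received == limit {
				req = shutdown // enable the shutdown request case
			}
		case req <- id:
			req = nil // request delivered; from now on only drain ch
		}
	}
}

// dispatch sends up to total packets and closes ch as soon as the worker
// asks to stop. It returns how many packets were sent and how many the
// worker reported receiving.
func dispatch(total, limit int) (sent, received int) {
	ch := make(chan int)
	shutdown := make(chan string)
	result := make(chan int, 1)

	go worker("w1", limit, ch, shutdown, result)

	for sent < total {
		select {
		case ch <- sent:
			sent++
		case <-shutdown:
			close(ch)
			return sent, <-result
		}
	}
	close(ch) // nothing left to send; the worker sees the close and exits
	return sent, <-result
}

func main() {
	sent, received := dispatch(10, 3)
	fmt.Println("no packets lost:", sent == received)
}
```

Since the worker is always willing to receive while its request is pending, the dispatcher's send can never be stranded, and every packet that was sent is counted before the worker returns.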

    The asker selected this answer as the best answer.