dongsu1539 2017-01-20 13:48

Deadlock with buffered channel

I have a job dispatcher that collates a large amount of data from many TCP sockets. The code grew out of the approach in "Large number of transient objects - avoiding contention", and it largely works: CPU usage is down a huge amount and locking is no longer an issue.

From time to time my application locks up. The "Channel length" log line is the only thing that keeps repeating, because data is still coming in from my sockets, but the count stays at 5000 and no downstream processing takes place.

I think the issue might be a race condition, and the line it is possibly getting hung up on is channel <- msg within the select of the jobDispatcher. The trouble is that I can't work out how to verify this.

I suspect that, because select picks among ready cases at random, the worker goroutine returns before the shutdownChan case has had a chance to run. Then data arrives on inboundFromTCP and the send blocks!

Someone might spot something really obviously wrong here, and hopefully offer a solution!

var MessageQueue = make(chan *trackingPacket_v1, 5000)

func init() {
    go jobDispatcher(MessageQueue)
}

func addMessage(trackingPacket *trackingPacket_v1) {
    // Send the packet to the buffered queue!
    log.Println("Channel length:", len(MessageQueue))
    MessageQueue <- trackingPacket
}

func jobDispatcher(inboundFromTCP chan *trackingPacket_v1) {
    var channelMap = make(map[string]chan *trackingPacket_v1)

    // Channel that listens for the strings that want to exit
    shutdownChan := make(chan string)

    for {
        select {
        case msg := <-inboundFromTCP:
            log.Println("Got packet", msg.Avr)
            channel, ok := channelMap[msg.Avr]
            if !ok {
                packetChan := make(chan *trackingPacket_v1)

                channelMap[msg.Avr] = packetChan
                go processPackets(packetChan, shutdownChan, msg.Avr)
                packetChan <- msg
                continue
            }
            channel <- msg
        case shutdownString := <-shutdownChan:
            log.Println("Shutting down:", shutdownString)
            channel, ok := channelMap[shutdownString]
            if ok {
                delete(channelMap, shutdownString)
                close(channel)
            }
        }
    }
}

func processPackets(ch chan *trackingPacket_v1, shutdown chan string, id string) {
    var messages = []*trackingPacket_v1{}

    tickChan := time.NewTicker(time.Second * 1)
    defer tickChan.Stop()

    hasCheckedData := false

    for {
        select {
        case msg := <-ch:
            log.Println("Got a messages for", id)
            messages = append(messages, msg)
            hasCheckedData = false
        case <-tickChan.C:

            messages = cullChanMessages(messages)
            if len(messages) == 0 {
                messages = nil
                shutdown <- id
                return
            }

            // No point running checking when packets have not changed!!
            if hasCheckedData == false {
                processMLATCandidatesFromChan(messages)
                hasCheckedData = true
            }
        case <-time.After(time.Duration(time.Second * 60)):
            log.Println("This channel has been around for 60 seconds which is too much, kill it")
            messages = nil
            shutdown <- id
            return
        }
    }
}

Update 01/20/17

I tried to rework with the channelMap as a global with some mutex locking but it ended up deadlocking still.


Slightly tweaked the code, still locks but I don't see how this one does!! https://play.golang.org/p/PGpISU4XBJ


Update 01/21/17

After some recommendations I put this into a standalone working example so people can see: https://play.golang.org/p/88zT7hBLeD

It is a long-running process, so it needs to be run locally because the playground kills it. Hopefully this will help get to the bottom of it!


2 answers

  • duanbei3747 2017-01-21 00:37

    I'm guessing that your problem is getting stuck doing this channel <- msg at the same time as the other goroutine is doing shutdown <- id.
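
    One way to check a guess like this is to dump the stacks of all goroutines while the program is hung and look for goroutines parked in a channel send. The sketch below uses runtime/pprof for that; the helper names goroutineDump and hasBlockedSender are made up for illustration, and the stuck goroutine only stands in for the dispatcher. On Unix systems, sending SIGQUIT to a Go process prints a similar dump before it exits.

```go
package main

import (
	"bytes"
	"fmt"
	"runtime/pprof"
	"strings"
	"time"
)

// goroutineDump returns the stack traces of all goroutines as text.
// (Hypothetical helper, not part of the code in the question.)
func goroutineDump() string {
	var buf bytes.Buffer
	// debug=2 prints each goroutine with its wait state, in the same
	// form as the trace of an unrecovered panic.
	if err := pprof.Lookup("goroutine").WriteTo(&buf, 2); err != nil {
		return ""
	}
	return buf.String()
}

// hasBlockedSender parks one goroutine on an unbuffered send and reports
// whether the dump shows a goroutine in the "chan send" state.
func hasBlockedSender() bool {
	stuck := make(chan int) // unbuffered and never received from
	go func() { stuck <- 1 }()

	// Give the goroutine time to reach the blocking send.
	time.Sleep(50 * time.Millisecond)

	return strings.Contains(goroutineDump(), "chan send")
}

func main() {
	fmt.Println("blocked sender found:", hasBlockedSender())
}
```

    In the real program you would call something like goroutineDump when the "Channel length" log stays at 5000 and then read the stack frames to see which line each goroutine is waiting on.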

    Since neither channel nor shutdown is buffered, each send blocks until a receiver is ready. The two goroutines can therefore deadlock, each waiting for the other to receive.

    There are a couple of ways to fix it. You could declare both of those channels with a buffer of 1.
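
    To make the mutual blocking and the effect of a one-slot buffer concrete, here is a minimal sketch. It is not the code from the question: dispatcherSendBlocks, the int payload and the timeouts are assumptions chosen to keep the example small. The worker stops receiving and sends its id on shutdown, while the dispatcher tries to send on ch instead of reading shutdown.

```go
package main

import (
	"fmt"
	"time"
)

// dispatcherSendBlocks reports whether a dispatcher-style send on ch is
// still blocked after a short timeout while the worker is busy sending its
// shutdown id. buf is the capacity of both channels (0 means unbuffered).
// (Hypothetical helper for illustration only.)
func dispatcherSendBlocks(buf int) bool {
	ch := make(chan int, buf)
	shutdown := make(chan string, buf)

	// Worker: no longer receives from ch, only announces its exit.
	go func() {
		shutdown <- "worker-1" // blocks if unbuffered and nobody receives
	}()
	// Release the worker at the end so the demo does not leak it.
	defer func() { <-shutdown }()

	// Give the worker time to reach its send.
	time.Sleep(50 * time.Millisecond)

	// Dispatcher: hands a message to the worker instead of reading shutdown.
	select {
	case ch <- 42:
		return false // the send went through
	case <-time.After(200 * time.Millisecond):
		return true // still blocked: this is the suspected deadlock
	}
}

func main() {
	fmt.Println("unbuffered send blocks:", dispatcherSendBlocks(0))
	fmt.Println("buffer of 1 send blocks:", dispatcherSendBlocks(1))
}
```

    Note that a one-slot buffer only absorbs a single pending value. If a second message for the same id arrives before the dispatcher handles the shutdown, the send could block again, so buffering probably narrows the window rather than closing it.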

    Or, instead of signalling by sending a shutdown message, you could do what Go's standard context package does and signal shutdown by closing the shutdown channel. Look at https://golang.org/pkg/context/, especially WithCancel, WithDeadline and the Done method.

    You might be able to use context to remove your own shutdown channel and timeout code.
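
    A rough sketch of what that could look like, assuming one context per worker. The names worker, trySend and runDemo, the string payload and the done channel are all made up for the example. The worker never sends a shutdown message, and the dispatcher's send gives up as soon as the worker's context is done.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// worker counts messages until its context is cancelled or times out.
// It never sends a shutdown message, so it cannot block on exit.
func worker(ctx context.Context, ch <-chan string, done chan<- int) {
	count := 0
	for {
		select {
		case <-ch:
			count++
		case <-ctx.Done():
			done <- count // done is buffered, so this never blocks
			return
		}
	}
}

// trySend delivers msg unless the worker's context is already done.
func trySend(ctx context.Context, ch chan<- string, msg string) bool {
	select {
	case ch <- msg:
		return true
	case <-ctx.Done():
		return false
	}
}

// runDemo sends two messages, cancels the worker, then tries a third send.
func runDemo() (delivered int, sentAfterCancel bool) {
	// The 60 second limit mirrors the lifetime check in the question.
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	ch := make(chan string)
	done := make(chan int, 1)
	go worker(ctx, ch, done)

	trySend(ctx, ch, "a")
	trySend(ctx, ch, "b")

	cancel()           // closes ctx.Done()
	delivered = <-done // wait until the worker has exited

	// The worker is gone, but the send returns instead of blocking.
	sentAfterCancel = trySend(ctx, ch, "c")
	return delivered, sentAfterCancel
}

func main() {
	delivered, sent := runDemo()
	fmt.Println("delivered:", delivered, "sent after cancel:", sent)
}
```

    In a dispatcher like the one in the question, a false result from trySend could be the cue to delete the map entry and start a fresh worker for that id. That part is left out here.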

    And JimB has a point about shutting down the goroutine while it might still be receiving on the channel. What you should do is send the shutdown message (or close, or cancel the context) and continue to process messages until your ch channel is closed (detect that with case msg, ok := <-ch:), which would happen after the shutdown is received by the sender.

    That way you get all of the messages that were incoming until the shutdown actually happened, and should avoid a second deadlock.
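
    As a sketch of that pattern (again with made-up names and an int payload, not the question's types): the worker offers its shutdown request inside the same select that receives from ch, and only returns once ch has been closed by the dispatcher.

```go
package main

import "fmt"

// worker asks to be shut down, but keeps receiving until ch is closed.
func worker(ch <-chan int, shutdown chan<- string, id string, result chan<- []int) {
	var got []int
	out := shutdown // set to nil once the request has been delivered
	for {
		select {
		case out <- id:
			// Request delivered. A nil channel disables this case.
			out = nil
		case msg, ok := <-ch:
			if !ok {
				// The dispatcher closed ch: now it is safe to exit.
				result <- got
				return
			}
			got = append(got, msg)
		}
	}
}

// dispatch sends msgs while the worker is already asking to shut down,
// then honours the request by closing ch.
func dispatch(msgs []int) []int {
	ch := make(chan int)
	shutdown := make(chan string)
	result := make(chan []int, 1)
	go worker(ch, shutdown, "w1", result)

	for _, m := range msgs {
		ch <- m // cannot deadlock: the worker is still receiving
	}

	<-shutdown // receive the worker's shutdown request
	close(ch)  // tell the worker that no more messages will come
	return <-result
}

func main() {
	fmt.Println(dispatch([]int{1, 2, 3}))
}
```

    The point of the design is that the worker is never in a state where it has stopped receiving while the dispatcher may still send to it.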

    This answer was selected by the asker as the best answer.
  • douba8048 2017-01-21 00:11

    I'm new to Go but in this code here

    case msg := <-inboundFromTCP:
            log.Println("Got packet", msg.Avr)
            channel, ok := channelMap[msg.Avr]
            if !ok {
                packetChan := make(chan *trackingPacket_v1)
    
                channelMap[msg.Avr] = packetChan
                go processPackets(packetChan, shutdownChan, msg.Avr)
                packetChan <- msg
                continue
            }
            channel <- msg
    

    Aren't you putting something in channel (unbuffered?) here

    channel, ok := channelMap[msg.Avr]
    

    So wouldn't you need to empty out that channel before you can add the msg here?

    channel <- msg
    

    Like I said, I'm new to Go so I hope I'm not being goofy. :)
