doutangu4978 2018-04-01 08:43
浏览 88
已采纳

从接收方关闭通道:从多个goroutine访问sync.Mutex时出现死锁

I'm trying to implement graceful channel closing from receiver side.

Yes, I'm aware that this violates the channel closing rule:

...don't close a channel from the receiver side and don't close a channel if the channel has multiple concurrent senders.

But I want to implement such logic. Unfortunately, I fail into deadlock issue in the number of cases: the application just hangs for the unlimited time, trying to lock same locked Mutex again.

So, I have 2 goroutines:

  • one that will write into a channel and
  • another that will receive data + will close channel from the receiver side.

My channel wrapped in the struct with sync.Mutex and closed boolean flag:

type Chan struct {
    sync.Mutex // can be replaced with deadlock.Mutex from "github.com/sasha-s/go-deadlock"
    data           chan int
    closed         bool
}

All Send(), Close(), IsClosed() operations on this struct are guarded with Mutex and to prevent duplicate locking have the non-threadsafe method versions (send(), close(), isClosed()).

The full source code:

package main

import (
    "log"
    "net/http"
    "sync"
)

func main() {
    log.Println("Start")

    ch := New(0) // unbuffered channel to expose problem faster

    wg := sync.WaitGroup{}
    wg.Add(2)

    // send data:
    go func(ch *Chan) {
        for i := 0; i < 100; i++ {
            ch.Send(i)
        }
        wg.Done()
    }(ch)

    // receive data and close from receiver side:
    go func(ch *Chan) {
        for data := range ch.data {
            log.Printf("Received %d data", data)
            // Bad practice: I want to close the channel from receiver's side:
            if data > 50 {
                ch.Close()
                break
            }
        }
        wg.Done()
    }(ch)

    wg.Wait()
    log.Println("End")
}

type Chan struct {
    deadlock.Mutex //sync.Mutex
    data           chan int
    closed         bool
}

func New(size int) *Chan {
    defer func() {
        log.Printf("Channel was created")
    }()
    return &Chan{
        data: make(chan int, size),
    }
}

func (c *Chan) Send(data int) {
    c.Lock()
    c.send(data)
    c.Unlock()
}

func (c *Chan) Close() {
    c.Lock()
    c.close()
    c.Unlock()
}

func (c *Chan) IsClosed() bool {
    c.Lock()
    defer c.Unlock()
    return c.isClosed()
}

// send is internal non-threadsafe api.
func (c *Chan) send(data int) {
    if !c.closed {
        c.data <- data
        log.Printf("Data %d was sent", data)
    }
}

// close is internal non-threadsafe api.
func (c *Chan) close() {
    if !c.closed {
        close(c.data)
        c.closed = true
        log.Println("Channel was closed")
    } else {
        log.Println("Channel was already closed")
    }
}

// isClosed is internal non-threadsafe api.
func (c *Chan) isClosed() bool {
    return c.closed
}

You can run this program in the sandbox.

On the local machine, in small number of runs, after 30 seconds the output will be (using deadlock.Mutex instead of sync.Mutex):

2018/04/01 11:26:22 Data 50 was sent
2018/04/01 11:26:22 Received 50 data
2018/04/01 11:26:22 Data 51 was sent
2018/04/01 11:26:22 Received 51 data
POTENTIAL DEADLOCK:
Previous place where the lock was grabbed
goroutine 35 lock 0xc42015a040
close-from-receiver-side/closeFromReceiverSideIsBadPractice.go:71 main.(*Chan).Send { c.Lock() } <<<<<
close-from-receiver-side/closeFromReceiverSideIsBadPractice.go:30 main.main.func1 { ch.Send(i) }

Have been trying to lock it again for more than 30s
goroutine 36 lock 0xc42015a040
close-from-receiver-side/closeFromReceiverSideIsBadPractice.go:77 main.(*Chan).Close { c.Lock() } <<<<<
close-from-receiver-side/closeFromReceiverSideIsBadPractice.go:44 main.main.func2 { ch.Close() }

Why this deadlock happened and how to fix this implementation to avoid deadlocks?


Closing the channel on the sender's side is not the answer. So, this is not the fix for my question: Example of closing channel from sender side.

  • 写回答

2条回答 默认 最新

  • duanpao6163 2018-04-01 12:07
    关注

    You can try as hard as you like: you have to close the channel from sender side.

    You might be able to get it working without a complete lockdown but you will leak goroutines. The sender will block forever and cannot be shut down. If the receiver wants to trigger a shutdown it has to tell the sender to shut the channel down. How you could tell the sender to shut down:

    • A boolean as you suggest (needs another mutex)
    • A stop-channel that when closed signals the sender to close the data channel (cannot be closed multiple times)
    • a ctx.Context: calling the cancel() function will signal the sender to stop. (Can be cancelled multiple times without worry)

    (Only elaborating on Peters correct answer)

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(1条)

报告相同问题?

悬赏问题

  • ¥15 STM32驱动继电器
  • ¥15 Windows server update services
  • ¥15 关于#c语言#的问题:我现在在做一个墨水屏设计,2.9英寸的小屏怎么换4.2英寸大屏
  • ¥15 模糊pid与pid仿真结果几乎一样
  • ¥15 java的GUI的运用
  • ¥15 我想付费需要AKM公司DSP开发资料及相关开发。
  • ¥15 怎么配置广告联盟瀑布流
  • ¥15 Rstudio 保存代码闪退
  • ¥20 win系统的PYQT程序生成的数据如何放入云服务器阿里云window版?
  • ¥50 invest生境质量模块