doutangu4978 2018-04-01 08:43

Closing a channel from the receiver side: deadlock when accessing sync.Mutex from multiple goroutines

I'm trying to implement graceful channel closing from the receiver side.

Yes, I'm aware that this violates the channel closing rule:

...don't close a channel from the receiver side and don't close a channel if the channel has multiple concurrent senders.

But I still want to implement this logic. Unfortunately, in a number of cases I run into a deadlock: the application hangs indefinitely, trying to lock a Mutex that is already locked.

So, I have 2 goroutines:

  • one that will write into a channel and
  • another that will receive data + will close channel from the receiver side.

My channel is wrapped in a struct with a sync.Mutex and a closed boolean flag:

type Chan struct {
    sync.Mutex // can be replaced with deadlock.Mutex from "github.com/sasha-s/go-deadlock"
    data           chan int
    closed         bool
}

All Send(), Close(), and IsClosed() operations on this struct are guarded by the Mutex. To prevent double locking, each has a non-thread-safe counterpart (send(), close(), isClosed()).

The full source code:

package main

import (
    "log"
    "sync"

    "github.com/sasha-s/go-deadlock"
)

func main() {
    log.Println("Start")

    ch := New(0) // unbuffered channel to expose problem faster

    wg := sync.WaitGroup{}
    wg.Add(2)

    // send data:
    go func(ch *Chan) {
        for i := 0; i < 100; i++ {
            ch.Send(i)
        }
        wg.Done()
    }(ch)

    // receive data and close from receiver side:
    go func(ch *Chan) {
        for data := range ch.data {
            log.Printf("Received %d data", data)
            // Bad practice: I want to close the channel from receiver's side:
            if data > 50 {
                ch.Close()
                break
            }
        }
        wg.Done()
    }(ch)

    wg.Wait()
    log.Println("End")
}

type Chan struct {
    deadlock.Mutex //sync.Mutex
    data           chan int
    closed         bool
}

func New(size int) *Chan {
    defer func() {
        log.Printf("Channel was created")
    }()
    return &Chan{
        data: make(chan int, size),
    }
}

func (c *Chan) Send(data int) {
    c.Lock()
    c.send(data)
    c.Unlock()
}

func (c *Chan) Close() {
    c.Lock()
    c.close()
    c.Unlock()
}

func (c *Chan) IsClosed() bool {
    c.Lock()
    defer c.Unlock()
    return c.isClosed()
}

// send is internal non-threadsafe api.
func (c *Chan) send(data int) {
    if !c.closed {
        c.data <- data
        log.Printf("Data %d was sent", data)
    }
}

// close is internal non-threadsafe api.
func (c *Chan) close() {
    if !c.closed {
        close(c.data)
        c.closed = true
        log.Println("Channel was closed")
    } else {
        log.Println("Channel was already closed")
    }
}

// isClosed is internal non-threadsafe api.
func (c *Chan) isClosed() bool {
    return c.closed
}

You can run this program in the sandbox.

On my local machine, in a small number of runs, the output after 30 seconds is (using deadlock.Mutex instead of sync.Mutex):

2018/04/01 11:26:22 Data 50 was sent
2018/04/01 11:26:22 Received 50 data
2018/04/01 11:26:22 Data 51 was sent
2018/04/01 11:26:22 Received 51 data
POTENTIAL DEADLOCK:
Previous place where the lock was grabbed
goroutine 35 lock 0xc42015a040
close-from-receiver-side/closeFromReceiverSideIsBadPractice.go:71 main.(*Chan).Send { c.Lock() } <<<<<
close-from-receiver-side/closeFromReceiverSideIsBadPractice.go:30 main.main.func1 { ch.Send(i) }

Have been trying to lock it again for more than 30s
goroutine 36 lock 0xc42015a040
close-from-receiver-side/closeFromReceiverSideIsBadPractice.go:77 main.(*Chan).Close { c.Lock() } <<<<<
close-from-receiver-side/closeFromReceiverSideIsBadPractice.go:44 main.main.func2 { ch.Close() }

Why did this deadlock happen, and how can I fix this implementation to avoid deadlocks?


Closing the channel on the sender's side is not the answer I'm looking for, so this does not resolve my question: Example of closing channel from sender side.

2 answers

  • duanpao6163 2018-04-01 12:07

    You can try as hard as you like: you have to close the channel from the sender side.

    You might be able to get it working without a complete lockdown but you will leak goroutines. The sender will block forever and cannot be shut down. If the receiver wants to trigger a shutdown it has to tell the sender to shut the channel down. How you could tell the sender to shut down:

    • A boolean, as you suggest (needs another mutex)
    • A stop channel that, when closed, signals the sender to close the data channel (it cannot be closed multiple times)
    • A context.Context: calling the cancel() function signals the sender to stop (it can be cancelled multiple times without worry)
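The hang in the question comes from Send() holding the mutex while it is blocked on `c.data <- data`: once the receiver stops receiving and calls Close(), neither goroutine can make progress. A minimal sketch of the stop-channel option follows. The `stop` and `once` fields, the `Stop` method and the `run` helper are hypothetical names introduced here, not part of the question's code, and the `sync.Once` is an added assumption to make repeated stop requests safe.

```go
package main

import (
	"fmt"
	"sync"
)

// Chan pairs the data channel with a stop channel (hypothetical layout).
type Chan struct {
	data chan int
	stop chan struct{}
	once sync.Once
}

func New(size int) *Chan {
	return &Chan{data: make(chan int, size), stop: make(chan struct{})}
}

// Send reports false once the receiver has asked to stop. No lock is
// held while it blocks, so Stop can always proceed.
func (c *Chan) Send(v int) bool {
	select {
	case <-c.stop:
		return false
	case c.data <- v:
		return true
	}
}

// Stop may be called any number of times from any goroutine.
func (c *Chan) Stop() { c.once.Do(func() { close(c.stop) }) }

// run sends 0..n-1; the receiver stops after the first value > limit.
// It returns how many values the receiver got.
func run(n, limit int) int {
	ch := New(0)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(ch.data) // only the sender closes the data channel
		for i := 0; i < n; i++ {
			if !ch.Send(i) {
				return
			}
		}
	}()
	received := 0
	for v := range ch.data {
		received++
		if v > limit {
			ch.Stop() // receiver signals; it never closes ch.data itself
			break
		}
	}
	wg.Wait() // the sender has exited, so no goroutine is leaked
	return received
}

func main() {
	fmt.Println("received:", run(100, 50)) // received: 52
}
```

With a buffered data channel the same pattern should still terminate, but values left in the buffer after Stop() would simply be dropped.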

    (Only elaborating on Peter's correct answer)
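The context-based option could look roughly like the sketch below. The `produce` and `consume` helpers are hypothetical names chosen for illustration; the only library calls used are `context.WithCancel`, `context.Background` and `ctx.Done()`.

```go
package main

import (
	"context"
	"fmt"
)

// produce sends 0..n-1 on the returned channel until ctx is cancelled.
// The sending goroutine owns the channel and is the only one closing it.
func produce(ctx context.Context, n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; i < n; i++ {
			select {
			case <-ctx.Done():
				return
			case out <- i:
			}
		}
	}()
	return out
}

// consume reads until it sees a value greater than limit, then cancels
// the context. It returns the last value read, or -1 if there was none.
func consume(n, limit int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // calling cancel more than once is harmless
	last := -1
	for v := range produce(ctx, n) {
		last = v
		if v > limit {
			cancel() // ask the sender to stop
			break
		}
	}
	return last
}

func main() {
	fmt.Println("last value:", consume(100, 50)) // last value: 51
}
```

Here the receiver never touches the data channel's lifetime: it only cancels, and the sender notices the cancellation in its `select` and closes the channel on its way out.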

    This answer was selected by the asker as the best answer.
