douzhiji2020 2018-07-02 07:41
Views: 32
Accepted

What is the correct way to safely finish goroutines in my code?

I am writing a simple TCP server, and the goroutine model is very straightforward:

One goroutine is responsible for accepting new connections; for every new connection, three goroutines are started:

  1. one to read
  2. one to process and handle application logic
  3. one to write

Currently a single server serves no more than 1000 users, so I don't try to limit the number of goroutines.

for {
    conn, err := listener.Accept()
    // ....
    connHandler := connHandler{
        conn:      conn,
        done:      make(chan struct{}),
        readChan:  make(chan string, 100),
        writeChan: make(chan string, 100),
    }
    // ....
    go connHandler.readAll()    
    go connHandler.processAll() 
    go connHandler.writeAll()   
}

I use the done channel to notify all three goroutines to finish. When a user logs out or a permanent network error happens, the done channel is closed (using sync.Once to make sure the close happens only once):

func (connHandler *connHandler) Close() {
    connHandler.doOnce.Do(func() {
        connHandler.isClosed = true
        close(connHandler.done)
    })
}

Below is the code of writeAll() method:

func (connHandler *connHandler) writeAll() {
    writer := bufio.NewWriter(connHandler.conn)

    for {
        select {
        case <-connHandler.done:
            connHandler.conn.Close()
            return
        case msg := <-connHandler.writeChan:
            connHandler.writeOne(msg, writer)
        }
    }
}

There is a Send method that sends a message to a user by putting the string on the write channel:

func (connHandler *connHandler) Send(msg string) {
    connHandler.writeChan <- msg
}

The Send method is called mainly from the processAll() goroutine, but also from many other goroutines, because different users need to communicate with each other.

Now here is the problem: if userA logs out or the network fails, and userB then sends a message to userA, userB's goroutine may block permanently once the channel buffer is full, because nothing will ever receive from the channel again.

My solution:

My first thought was to use a boolean value to make sure connHandler is not closed before sending to it:

func (connHandler *connHandler) Send(msg string) {
    if !connHandler.isClosed {
        connHandler.writeChan <- msg
    }
}

But I think connHandler.writeChan <- msg and close(done) can still happen at the same time, so the possibility of blocking still exists. So I have to add a timeout:

func (connHandler *connHandler) Send(msg string) {
    if !connHandler.isClosed {
        timer := time.NewTimer(10 * time.Second)
        defer timer.Stop()
        select {
        case connHandler.writeChan <- msg:
        case <-timer.C:
            log.Warning(connHandler.Addr() + " send msg timeout:" + msg)
        }
    }

}

Now I feel the code is safe, but also ugly, and starting a timer every time a message is sent feels like unnecessary overhead.

Then I read this article: https://go101.org/article/channel-closing.html. My problem looks like the second example in the article:

One receiver, N senders, the receiver says "please stop sending more" by closing an additional signal channel

But I think this solution can't eliminate the possibility of blocking in my situation.

Maybe the easiest solution is to just close the write channel, let the Send method panic, and then use recover to handle the panic? But this looks ugly, too.
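
For illustration, the close-and-recover idea could look roughly like the sketch below. The safeSend helper is hypothetical (it is not part of the code above); it relies only on the fact that sending on a closed channel panics and that a deferred recover can turn that panic into a return value.

```go
package main

import "fmt"

// safeSend is a hypothetical helper: it tries to send msg on ch and reports
// whether the send happened. Sending on a closed channel panics, so the
// deferred recover converts that panic into a false return value.
func safeSend(ch chan string, msg string) (sent bool) {
	defer func() {
		if r := recover(); r != nil {
			sent = false
		}
	}()
	ch <- msg // panics with "send on closed channel" if ch is closed
	return true
}

func main() {
	ch := make(chan string, 1)
	fmt.Println(safeSend(ch, "hello")) // buffer has room, so the send succeeds
	close(ch)
	fmt.Println(safeSend(ch, "world")) // channel is closed, the panic is recovered
}
```

Note that this sketch still blocks if the channel is open but full with no receiver, and closing a channel while another goroutine is sending on it is a data race in general, so it is probably not the approach to prefer.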

So is there a simple and straightforward way to accomplish what I want to do?

(My English is not good, so if anything is ambiguous, please point it out. Thanks.)

1 answer

  • dream_high1026 2018-07-02 08:20

    Your example looks pretty good, and I think you've got 90% of what you need.

    I think the problem you are seeing is with sending when the handler might already be "done".

    You can use the "done" channel to notify all the goroutines that you've finished. A receive from a closed channel never blocks: it returns the zero value immediately. This means that you can update your Send(msg) method to also watch the done channel.

    func (connHandler *connHandler) Send(msg string) {
        select {
        case connHandler.writeChan <- msg:
        case <- connHandler.done:
            log.Debug("connHandler is done, exiting Send without sending.")
        case <-time.After(10 * time.Second):
            log.Warning(connHandler.Addr() + " send msg timeout:" + msg)
        }
    }
    

    One of three things will now happen in this select:

    1. msg is sent to writeChan.
    2. close(done) has been called elsewhere, so the done channel is closed. The receive from done succeeds and the select returns.
    3. time.After(...) fires after 10 seconds, and the send times out.
    This answer was selected by the asker as the best answer.