douju1928 2015-04-28 22:10
浏览 56
已采纳

如何将多个goroutine同步到所选goroutine的终止(即Thread.join())

I asked this in a previous question, but some people felt that my original question was not detailed enough ("Why would you ever want a timed condition wait??") so here is a more specific one.

I have a goroutine running, call it server. It is already started, will execute for some amount of time, and do its thing. Then, it will exit since it is done.

During its execution some large number of other goroutines start. Call them "client" threads if you like. They run step A, and step B. Then, they must wait for the "server" goroutine to finish for a specified amount of time, and exit with status if "server is not finished, and say run step C if it finishes.

(Please do not tell me how to restructure this workflow. It is hypothetical and a given. It cannot be changed.)

A normal, sane way to do this is to have the server thread signal a condition variable with a selectAll or Broadcast function, and have the other threads in a timed wait state monitoring the condition variable.

func (s *Server) Join(timeMillis int) error {
  s.mux.Lock()
  defer s.mux.Unlock()
  while !s.isFinished {
     err = s.cond.Wait(timeMillis)
     if err != nil {
        stepC()
     }
  }
  return err
}

Where the server will enter a state where isFinished becomes true and broadcast signal the condition variable atomically with respect to the mutex. Except this is impoosible, since Go does not support timed condition waits. (But there is a Broadcast())

So, what is the "Go-centric" way to do this? I've reall all of the Go blogs and documentation, and this pattern or its equivalent, despite its obviousness, never comes up, nor any equivalent "reframing" of the basic problem - which is that IPC style channels are between one routine and one other routine. Yes, there is fan-in/fan-out, but remember these threads are constantly appearing and vanishing. This should be simple - and crucially /not leave thousands of "wait-state" goroutines hanging around waiting for the server to die when the other "branch" of the mux channel (the timer) has signalled/.

Note that some of the "client" above might be started before the server goroutine has started (this is when channel is usually created), some might appear during, and some might appear after... in all cases they should run stepC if and only if the server has run and exited after timeMillis milliseconds post entering the Join() function...

In general the channels facility seems sorely lacking when there's more than one consumer. "First build a registry of channels to which listeners are mapped" and "there's this really nifty recursive data structure which sends itself over a channel it holds as field" are so.not.ok as replacements to the nice, reliable, friendly, obvious: wait(forSomeTime)

  • 写回答

1条回答 默认 最新

  • dongnu4254 2015-04-28 23:15
    关注

    I think what you want can be done by selecting on a single shared channel, and then having the server close it when it's done.

    Say we create a global "Exit channel", that's shared across all goroutines. It can be created before the "server" goroutine is created. The important part is that the server goroutine never sends anything down the channel, but simply closes it.

    Now the client goroutines, simply do this:

    select {
        case <- ch:
        fmt.Println("Channel closed, server is done!")
        case <-time.After(time.Second):
        fmt.Println("Timed out. do recovery stuff")
    
    }
    

    and the server goroutine just does:

    close(ch)
    

    More complete example:

    package main
    
    import(
        "fmt"
        "time"
    
    )
    
    
    func waiter(ch chan struct{}) {
        fmt.Println("Doing stuff")
    
        fmt.Println("Waiting...")
    
        select {
            case <- ch:
            fmt.Println("Channel closed")
            case <-time.After(time.Second):
            fmt.Println("Timed out. do recovery stuff")
    
        }
    }
    
    
    func main(){
    
        ch := make(chan struct{})
    
        go waiter(ch)
        go waiter(ch)
        time.Sleep(100*time.Millisecond)
        fmt.Println("Closing channel")
        close(ch)
    
        time.Sleep(time.Second)
    
    }
    

    This can be abstracted as the following utility API:

    type TimedCondition struct {
        ch chan struct{}
    }
    
    func NewTimedCondition()*TimedCondition {
        return &TimedCondition {
            ch: make(chan struct{}),
        }
    }
    
    func (c *TimedCondition)Broadcast() {
        close(c.ch)
    }
    
    func (c *TimedCondition)Wait(t time.Duration) error {
        select {
            // channel closed, meaning broadcast was called
            case <- c.ch:
                return nil
            case <-time.After(t):
                return errors.New("Time out")   
        }
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥15 划分vlan后不通了
  • ¥15 GDI处理通道视频时总是带有白色锯齿
  • ¥20 用雷电模拟器安装百达屋apk一直闪退
  • ¥15 算能科技20240506咨询(拒绝大模型回答)
  • ¥15 自适应 AR 模型 参数估计Matlab程序
  • ¥100 角动量包络面如何用MATLAB绘制
  • ¥15 merge函数占用内存过大
  • ¥15 Revit2020下载问题
  • ¥15 使用EMD去噪处理RML2016数据集时候的原理
  • ¥15 神经网络预测均方误差很小 但是图像上看着差别太大