dou8mwz5079
2015-12-21 11:52

How do I implement a timeout when using sync.WaitGroup.Wait?

I have run into a situation where I want several goroutines to synchronize at a specific point, for example once all the URLs have been fetched. Then we can collect the results and show them in a specific order.

I think this is where a barrier comes in. In Go that is sync.WaitGroup. However, in a real situation we cannot be sure that every fetch operation will succeed within a short time, so I want to introduce a timeout when waiting for the fetch operations.

I am new to Go, so can someone give me some advice?


What I am looking for is something like this:

   wg := &sync.WaitGroup{}
   select {
   case <-wg.Wait():
   // All done!
   case <-time.After(500 * time.Millisecond):
   // Hit timeout.
   }

I know that Wait does not return a channel.


4 answers

  • duanchen7703 2015-12-22 23:54
    Accepted

    If all you want is your neat select, you can easily turn a blocking function into a channel by spawning a goroutine that calls the function and then closes (or sends on) a channel once it is done.

    done := make(chan struct{})
    go func() {
       wg.Wait()
       close(done)
    }()
    
    select {
    case <-done:
    // All done!
    case <-time.After(500 * time.Millisecond):
    // Hit timeout.
    }
    
  • dongyi0210 2015-12-22 21:05

    Another way to do it is to monitor the timeout internally. Your question is short on detail, so I am going to assume you start your goroutines in a loop; even if you do not, you can refactor this to work for you. Here are two examples: the first times out each request individually, and the second times out the entire batch of requests and moves on if too much time has passed.

    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        success := make(chan struct{}, 1)
        go func() {
            // send your request and wait for a response
            // pretend response was received
            time.Sleep(5 * time.Second)
            success <- struct{}{}
            // goroutine will close gracefully after return     
            fmt.Println("Returned Gracefully")
        }()
    
        select {
        case <-success:
            break
        case <-time.After(1 * time.Second):
            break
        }
    
        wg.Done()
        // everything should be garbage collected and no longer take up space
    }()
    
    wg.Wait()
    
    // do whatever with what you got    
    fmt.Println("Done")
    time.Sleep(10 * time.Second)
    fmt.Println("Checking to make sure nothing throws errors after limbo goroutine is done")
    

    Or, if you just want a simple, general way to time out ALL requests, you could do something like this:

    var wg sync.WaitGroup
    waiter := make(chan int)
    wg.Add(1)
    go func() {
        success := make(chan struct{}, 1)
        go func() {
            // send your request and wait for a response
            // pretend response was received
            time.Sleep(5 * time.Second)
            success <- struct{}{}
            // goroutine will close gracefully after return     
            fmt.Println("Returned Gracefully")
        }()
    
        select {
        case <-success:
            break
        case <-time.After(1 * time.Second):
            // control the timeouts for each request individually to make sure that wg.Done gets called and will let the goroutine holding the .Wait close
            break
        }
        wg.Done()
        // everything should be garbage collected and no longer take up space
    }()
    
    // once makes sure only the first finisher sends on waiter,
    // without sharing an unsynchronized flag between goroutines
    var once sync.Once
    notify := func() { once.Do(func() { waiter <- 1 }) }
    
    go func() {
        // Unblock once every request has called wg.Done
        wg.Wait()
        notify()
        fmt.Println("Returned Two")
    }()
    
    go func() {
        // or unblock after the overall deadline, whichever comes first
        time.Sleep(time.Second * 5)
        notify()
        fmt.Println("Returned One")
    }()
    
    
    // block until it either times out or .Wait stops blocking
    <-waiter
    
    // do whatever with what you got    
    fmt.Println("Done")
    time.Sleep(10 * time.Second)
    fmt.Println("Checking to make sure nothing throws errors after limbo goroutine is done")
    

    This way your WaitGroup stays in sync and you won't have any goroutines left in limbo.

    You can try it here and change the variables around to see it behave differently: http://play.golang.org/p/g0J_qJ1BUT

    Edit: I'm on mobile. If anybody could fix the formatting, that would be great, thanks.

  • doujing6053 2015-12-22 21:19

    Send your results to a buffered channel large enough to hold all of them without blocking, and read them in a for-select loop in the main goroutine:

    func work(msg string, d time.Duration, ret chan<- string) {
        time.Sleep(d) // Work emulation.
        select {
        case ret <- msg:
        default:
        }
    }
    
    // ...
    
    const N = 2
    ch := make(chan string, N)
    
    go work("printed", 100*time.Millisecond, ch)
    go work("not printed", 1000*time.Millisecond, ch)
    
    timeout := time.After(500 * time.Millisecond)
    loop:
    for received := 0; received < N; received++ {
        select {
        case msg := <-ch:
            fmt.Println(msg)
        case <-timeout:
            fmt.Println("timeout!")
            break loop
        }
    }
    

    Playground: http://play.golang.org/p/PxeEEJo2dz.

    See also: Go Concurrency Patterns: Timing out, moving on.

  • duan_2000 2017-06-25 15:21

    If you would like to avoid mixing concurrency logic with business logic, I wrote this library https://github.com/shomali11/parallelizer to help you with that. It encapsulates the concurrency logic so you do not have to worry about it.

    So in your example:

    package main
    
    import (
        "fmt"
        "time"
    
        "github.com/shomali11/parallelizer"
    )
    
    func main() {
        urls := []string{ ... }
        // HttpResponse is assumed to be defined elsewhere in your program
        results := make([]*HttpResponse, len(urls))
    
        // Options, NewGroup, Add and Run are used as given in this answer;
        // check the library's README for the API of the version you install
        options := &parallelizer.Options{Timeout: time.Second}
        group := parallelizer.NewGroup(options)
        for index, url := range urls {
            // Pass index and url as arguments so each closure gets its own copy
            group.Add(func(index int, url string, results *[]*HttpResponse) func() {
                return func() {
                    ...
    
                    (*results)[index] = &HttpResponse{url, response, err}
                }
            }(index, url, &results))
        }
    
        err := group.Run()
    
        fmt.Println("Done")
        fmt.Printf("Results: %v\n", results)
        fmt.Printf("Error: %v\n", err) // nil if it completed, err if timed out
    }
    