dtrhd2850 2015-04-13 22:37

Worker pool for potentially recursive tasks (i.e., each job can enqueue other jobs)

I'm writing an application that the user starts with a number of "jobs" (URLs, actually). At the beginning (in the main routine), I add these URLs to a queue, then start x goroutines that work on them.

In special cases, the resource a URL points to may contain even more URLs, which have to be added to the queue. The workers (3, in my case) wait for new jobs to come in and process them. The problem is: once EVERY worker is waiting for a job (and none is producing any), the workers should stop altogether. So either all of them work or none works.

My current implementation looks something like this, and I don't think it's elegant. Unfortunately, I couldn't think of a better way that avoids race conditions, and I'm not entirely sure this implementation actually works as intended:

var queue // from somewhere
const WORKER_COUNT = 3
var done chan struct{} // initialized elsewhere (a nil channel would block the send forever)

func work(working chan int) {
  absent := make(chan struct{}, 1)
  // If more than one job in a row is popped, send only one struct to the "absent" channel.
  // This implementation also assumes that select evaluates its cases "in order"
  // (channel 2 only if channel 1 yields nothing). That assumption is wrong: per the
  // spec, only the channel operand expressions are evaluated in source order; if
  // several cases are ready, one is chosen uniformly at random.
  one := false
  for {
    select {
    case u, ok := <-queue.Pop():
      if !ok {
        close(absent)
        return
      }
      if !one {
        // I have started working (delta + 1)
        working <- 1
        absent <- struct{}{}
        one = true
      }
      // do work with u (which may lead to queue.Push(urls...))
    case <-absent: // no jobs at the moment. consume absent => wait
      one = false
      working <- -1
    }
  }
}

func Start() {
  working := make(chan int)
  for i := 0; i < WORKER_COUNT; i++ {
    go work(working)
  }
  // the amount of actually working workers...
  sum := 0
  for {
    delta := <-working
    sum += delta
    if sum == 0 {
      queue.Close() // close channel -> kill workers.
      done <- struct{}{}
      return
    }
  }
}

Is there a better way to tackle this problem?

1 answer

  • duandianwen1723 2015-04-13 23:17

    You can use a sync.WaitGroup (see docs) to control the lifetime of the workers, and use a non-blocking send so workers can't deadlock when they try to queue up more jobs:

    package main
    
    import "sync"
    
    const workers = 4
    
    type job struct{}
    
    func (j *job) do(enqueue func(job)) {
        // do the job, calling enqueue() for subtasks as needed
    }
    
    func main() {
        jobs, wg := make(chan job), new(sync.WaitGroup)
        var enqueue func(job)
    
        // workers
        for i := 0; i < workers; i++ {
            go func() {
                for j := range jobs {
                    j.do(enqueue)
                    wg.Done()
                }
            }()
        }
    
        // how to queue a job
        enqueue = func(j job) {
            wg.Add(1)
            select {
            case jobs <- j: // another worker took it
            default: // no free worker; do the job now
                j.do(enqueue)
                wg.Done()
            }
        }
    
        todo := make([]job, 1000)
        for _, j := range todo {
            enqueue(j)
        }
        wg.Wait()
        close(jobs)
    }
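
    Two details in the code above are worth calling out: wg.Add(1) happens inside enqueue before the job is handed off, so the counter never reaches zero while a job is still pending; and the default branch runs the job on the calling goroutine, which is what lets a busy worker make progress on its own subtasks instead of blocking on a send that no idle worker can service.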
    

    It might appear as though buffering the jobs channel would prevent deadlocks when adding jobs, but it wouldn't: the buffer could fill up, and then you're back where you started. Buffering is fine, and can be efficient in some scenarios; it's just neither necessary nor sufficient to prevent a deadlock.
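
    To make that concrete, here is a minimal sketch (my own, not part of the original answer; the job type, buffer size, and spawn function are invented for illustration) of the buffered variant failing. With a plain blocking send, the lone worker eventually blocks inside enqueue while the buffer is full, and the runtime aborts:

    package main
    
    import "sync"
    
    func main() {
        // Buffered variant with a plain (blocking) send: deadlocks by design.
        jobs := make(chan func(), 2) // small buffer so the failure shows up quickly
        wg := new(sync.WaitGroup)
    
        var enqueue func(func())
        enqueue = func(fn func()) {
            wg.Add(1)
            jobs <- fn // blocks once the buffer is full and no worker can receive
        }
    
        // A single worker whose jobs each spawn more jobs than the buffer can hold.
        go func() {
            for fn := range jobs {
                fn()
                wg.Done()
            }
        }()
    
        var spawn func()
        spawn = func() {
            enqueue(spawn)
            enqueue(spawn)
        }
        enqueue(spawn)
    
        // Never returns: the worker ends up stuck sending into a full buffer,
        // and the runtime aborts with "all goroutines are asleep - deadlock!".
        wg.Wait()
    }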

    I ran into this situation in a function that kicks off a parallel sort, which is recursive in the same way your URL fetching is. It's probably harder to read than the example above, since details specific to sorting, like treating small tasks differently from big ones, are mixed in.
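
    Since the question is about URLs, here is a toy sketch of the same pattern applied to recursive link-following (my own illustration, not the sort code; the links map and seen-set are invented stand-ins for real fetching and deduplication):

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    const workers = 4
    
    // links stands in for "fetch the page and extract its URLs".
    var links = map[string][]string{
        "a": {"b", "c"},
        "b": {"d"},
        "c": {"d", "e"},
    }
    
    func main() {
        jobs, wg := make(chan string), new(sync.WaitGroup)
        var mu sync.Mutex
        seen := map[string]bool{"a": true}
        var enqueue func(string)
    
        // process "fetches" a URL and enqueues any unseen URLs it contains.
        process := func(url string) {
            fmt.Println("fetching", url)
            for _, next := range links[url] {
                mu.Lock()
                first := !seen[next]
                seen[next] = true
                mu.Unlock()
                if first {
                    enqueue(next) // jobs queue further jobs, as in the question
                }
            }
        }
    
        for i := 0; i < workers; i++ {
            go func() {
                for u := range jobs {
                    process(u)
                    wg.Done()
                }
            }()
        }
    
        enqueue = func(u string) {
            wg.Add(1)
            select {
            case jobs <- u:
            default: // every worker is busy: do the job on this goroutine
                process(u)
                wg.Done()
            }
        }
    
        enqueue("a")
        wg.Wait()
        close(jobs)
    }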

    This answer was accepted by the asker.