dongyi7966 2014-01-07 18:11

Golang: producer/consumer concurrency model, but with serialized results

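package main

// Job, Result and doJob are placeholders standing in for the real types and work.
type Job struct{ ID int }
type Result struct{ JobID int }

func doJob(j *Job) *Result { return &Result{JobID: j.ID} }

func main() {
    job1, job2, job3 := &Job{ID: 1}, &Job{ID: 2}, &Job{ID: 3}
    jobs := []*Job{job1, job2, job3}
    numOfJobs := len(jobs)
    resultsChan := make(chan *Result, numOfJobs)
    jobChan := make(chan *Job, numOfJobs)
    go consume(numOfJobs, jobChan, resultsChan)
    for i := 0; i < numOfJobs; i++ {
        jobChan <- jobs[i]
    }
    close(jobChan)

    // Results are received in completion order, not in submission order.
    for i := 0; i < numOfJobs; i++ {
        <-resultsChan
    }
    close(resultsChan)
}

// consume starts one goroutine per job; each takes a single job off jobChan
// and sends its result to resultsChan.
func consume(num int, jobChan chan *Job, resultsChan chan *Result) {
    for i := 0; i < num; i++ {
        go func() {
            job := <-jobChan
            resultsChan <- doJob(job)
        }()
    }
}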

In the above example, jobs are pushed into jobChan, and goroutines pull them off jobChan, execute them concurrently, and push the results into resultsChan. We then pull the results out of resultsChan.

Question 1:

In my code, the results are not serialized/linearized. Although the jobs go in as job1, job2, job3, the results might come out as job3, job1, job2, depending on which one takes the longest.

I would still like to execute the jobs concurrently; however, I need to make sure that the results come out of resultsChan in the same order as the jobs went in.
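One common way to get ordered output while still running jobs concurrently is a "channel of futures": each job gets its own one-slot result channel, and those channels are queued in the order the jobs were submitted. The consumer drains the queue front to back, so it only ever waits on the oldest unfinished job. This is a minimal sketch under stated assumptions, not the asker's code: jobs and results are plain ints, and doJob, orderedMap and maxInFlight are hypothetical names.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// doJob is a hypothetical stand-in for the real work. The random sleep makes
// jobs finish out of order, which is the situation described in the question.
func doJob(n int) int {
	time.Sleep(time.Duration(rand.Intn(20)) * time.Millisecond)
	return n * 10
}

// orderedMap runs f on every input concurrently but emits results in input order.
// Each job writes into its own one-slot channel (a "future"); the futures are
// queued in submission order and drained front to back. The queue capacity
// (maxInFlight) also caps concurrency: at most maxInFlight+2 jobs run at once,
// instead of one goroutine per input being started up front.
func orderedMap(inputs []int, f func(int) int, maxInFlight int) <-chan int {
	futures := make(chan chan int, maxInFlight)
	go func() {
		for _, x := range inputs {
			fut := make(chan int, 1) // buffered so the job goroutine never blocks
			go func(n int) { fut <- f(n) }(x)
			futures <- fut // blocks once maxInFlight futures are queued
		}
		close(futures)
	}()

	out := make(chan int)
	go func() {
		for fut := range futures {
			out <- <-fut // wait for the oldest job, then pass its result on
		}
		close(out)
	}()
	return out
}

func main() {
	for r := range orderedMap([]int{1, 2, 3, 4, 5}, doJob, 3) {
		fmt.Println(r) // prints 10, 20, 30, 40, 50 (one per line) in input order
	}
}
```

Because the consumer always waits on the oldest job first, one slow job delays the delivery of every later result, and once the queue is full it also delays the start of new jobs; that is the inherent cost of in-order output.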

Question 2:

I have approximately 300k jobs, which means the code will generate up to 300k goroutines. Is it efficient to have so many goroutines, or would I be better off grouping the jobs into slices of 100 or so and having each goroutine work through 100 jobs rather than 1?
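Goroutines are much cheaper than OS threads, but each one still has its own stack, so 300k live goroutines cost real memory and scheduler work when only a handful of cores can run them. A fixed pool of workers avoids that, and handing each worker a batch of indices is one way to do the grouping described above. The sketch below assumes the jobs are already in a slice and the work is CPU-bound; Job, Result, doJob, runPool and the batch size of 100 are hypothetical or illustrative. Order is preserved by writing each result into the slot matching its job's index, so no reordering step is needed.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// Job and Result are hypothetical placeholder types.
type Job struct{ ID int }
type Result struct{ JobID, Value int }

// doJob is a hypothetical stand-in for the real work.
func doJob(j Job) Result { return Result{JobID: j.ID, Value: j.ID * 2} }

// span is a half-open range [start, end) of job indices handed to one worker.
type span struct{ start, end int }

// runPool processes jobs with a fixed number of workers, each taking batches of
// batchSize consecutive jobs. results[i] always holds the result of jobs[i].
// workers and batchSize must both be positive.
func runPool(jobs []Job, workers, batchSize int) []Result {
	results := make([]Result, len(jobs))
	spans := make(chan span)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for s := range spans {
				for i := s.start; i < s.end; i++ {
					results[i] = doJob(jobs[i]) // each index is written by one worker only
				}
			}
		}()
	}
	for start := 0; start < len(jobs); start += batchSize {
		end := start + batchSize
		if end > len(jobs) {
			end = len(jobs)
		}
		spans <- span{start, end}
	}
	close(spans)
	wg.Wait() // all writes to results happen before this returns
	return results
}

func main() {
	jobs := make([]Job, 300000)
	for i := range jobs {
		jobs[i] = Job{ID: i}
	}
	results := runPool(jobs, runtime.NumCPU(), 100)
	fmt.Println(len(results), results[0].JobID, results[len(results)-1].JobID) // prints "300000 0 299999"
}
```

runPool returns only after every job has finished; if results must be streamed out in order while work is still running, a scheme like the round-robin one in the answer below is needed instead.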


  • donkey199024 2014-01-07 19:33

    Here's a way I've handled serialization (and also limiting the number of workers). I set up some worker objects with input and output fields and synchronization channels, then go round-robin through them, picking up any work each one has finished and giving it a new job. Then I make one final pass through them to pick up any completed jobs that are left over. Note that you might want the worker count to exceed your core count somewhat: the dispatcher has to wait for a slow job before it can hand out anything else, so extra workers give the others more work to get ahead on and keep all resources busy for a while even when one job is unusually long. Code is at http://play.golang.org/p/PM9y4ieMxw and below.

    This is hairy (hairier than I remembered before sitting down to write an example!). I'd love to see what anyone else has, whether better implementations or a completely different way to accomplish your goal.

    package main
    
    import (
        "fmt"
        "math/rand"
        "runtime"
        "time"
    )
    
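    // Worker holds the input and output of the job it is currently running.
    type Worker struct {
        in     int
        out    int
        inited bool // true once the channels exist and listen() is running
    
        jobReady chan bool // dispatcher -> worker: w.in is set, start working
        done     chan bool // worker -> dispatcher: w.out is ready
    }
    
    // work simulates a job that takes a random time of up to one second.
    func (w *Worker) work() {
        time.Sleep(time.Duration(rand.Float32() * float32(time.Second)))
        w.out = w.in + 1000
    }
    // listen runs jobs until jobReady is closed (a receive then yields false).
    func (w *Worker) listen() {
        for <-w.jobReady {
            w.work()
            w.done <- true
        }
    }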
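    // doSerialJobs runs up to concurrency jobs at once but emits results in input
    // order: items go to the workers round-robin, and before a worker gets a new
    // item, its previous (always the oldest outstanding) result is collected.
    func doSerialJobs(in chan int, out chan int) {
        concurrency := 23
        workers := make([]Worker, concurrency)
        i := 0
        // feed in items and collect results
        for workItem := range in {
            w := &workers[i%concurrency]
            if w.inited {
                // wait for this worker's previous job and emit its result
                <-w.done
                out <- w.out
            } else {
                // first use of this worker: create its channels and start it
                w.jobReady = make(chan bool)
                w.done = make(chan bool)
                w.inited = true
                go w.listen()
            }
            w.in = workItem
            w.jobReady <- true
            i++
        }
        // Collect the results still in flight after the input ran out, oldest
        // first, and stop each started worker. Workers that were never started
        // (fewer items than workers) have nil channels, and closing a nil
        // channel panics, so only started workers are closed.
        for n := 0; n < concurrency; n++ {
            w := &workers[i%concurrency]
            if w.inited {
                <-w.done
                out <- w.out
                close(w.jobReady)
            }
            i++
        }
        close(out)
    }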
    func main() {
        runtime.GOMAXPROCS(10) // before Go 1.5 the default was 1; newer versions default to the number of CPUs
        in, out := make(chan int), make(chan int)
        allFinished := make(chan bool)
        go doSerialJobs(in, out)
        go func() {
            for result := range out {
                fmt.Println(result)
            }
            allFinished <- true
        }()
        for i := 0; i < 100; i++ {
            in <- i
        }
        close(in)
        <-allFinished
    }
    

    Note that only the in and out channels carry actual data in this example; all the other channels are used purely for synchronization.
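A variation on the same round-robin idea lets each worker's own channels carry the data, so the shared in/out fields, the inited flag and the bool channels are no longer needed. This is a sketch under assumptions, not the answerer's code: work, serialJobs and the worker count of 4 are hypothetical, and work simply adds 1000 as in the example above.

```go
package main

import "fmt"

// work is a hypothetical stand-in for the real job; it just adds 1000.
func work(x int) int { return x + 1000 }

// serialJobs runs up to n jobs at once and writes results to out in input order.
// Item i goes to worker i%n; before worker k gets a new item, the dispatcher
// collects k's previous result, which is always the oldest one outstanding.
func serialJobs(in <-chan int, out chan<- int, n int) {
	reqs := make([]chan int, n)  // dispatcher -> worker k: next item
	resps := make([]chan int, n) // worker k -> dispatcher: its result
	for k := 0; k < n; k++ {
		reqs[k], resps[k] = make(chan int), make(chan int)
		go func(req <-chan int, resp chan<- int) {
			for x := range req { // exits when req is closed
				resp <- work(x)
			}
		}(reqs[k], resps[k])
	}

	i := 0
	for item := range in {
		k := i % n
		if i >= n { // worker k still holds the result of item i-n
			out <- <-resps[k]
		}
		reqs[k] <- item
		i++
	}

	// Collect the jobs still in flight (items i-pending .. i-1), oldest first.
	pending := i
	if pending > n {
		pending = n
	}
	for j := i - pending; j < i; j++ {
		out <- <-resps[j%n]
	}
	for _, r := range reqs {
		close(r)
	}
	close(out)
}

func main() {
	in, out := make(chan int), make(chan int)
	go serialJobs(in, out, 4)
	go func() {
		for i := 0; i < 10; i++ {
			in <- i
		}
		close(in)
	}()
	for r := range out {
		fmt.Println(r) // prints 1000 through 1009, one per line, in order
	}
}
```

As in the original, one slow item holds up all later output until it finishes, so n should be somewhat larger than the core count.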

    The asker selected this answer as the best answer.