doutangxi2144 2018-02-06 03:35

How to order the results of workers as if there were no workers?

Suppose that I have the following code, which reads lines, multiplies each line by 2, and prints the results one by one.

I'd like to use N workers, where each worker takes M lines at a time and processes them. More importantly, I'd like the output to be printed in the same order as the input. However, the worker-pool example below does not guarantee that the output is printed in the same order as the input.

https://gobyexample.com/worker-pools

The following URL also shows some examples, but I don't think they fit my requirement. The problem is that the input can be arbitrarily long, so there is no way to hold everything in memory before it is printed. There must be a way to collect output from the workers, determine whether a worker's output is ready to be printed, and print it if so. It sounds like there should be a master goroutine to do this, but I am not sure how to implement it efficiently, since this master goroutine could easily become a bottleneck when N is large.

How to collect values from N goroutines executed in a specific order?

Could anybody show an example program that orders the results from the workers and prints each result as early as it can be printed?

$ cat main.go
#!/usr/bin/env gorun
// vim: set noexpandtab tabstop=2:

package main

import (
    "bufio"
    "fmt"
    "io"
    "log"
    "os"
    "strconv"
)

func main() {
    stdin := bufio.NewReader(os.Stdin)

    for {
        // Read one line, including its trailing '\n' (if any).
        line, err := stdin.ReadString('\n')

        if err == io.EOF {
            // The last line may lack a trailing newline.
            if len(line) != 0 {
                i, _ := strconv.Atoi(line)
                fmt.Println(i * 2)
            }
            break
        } else if err != nil {
            log.Fatal(err)
        }

        // Strip the trailing '\n' before parsing; parse errors are ignored.
        i, _ := strconv.Atoi(line[:len(line)-1])
        fmt.Println(i * 2)
    }
}

  • dtx9763 2018-02-06 07:25

    If workers know the initial order (for example, they are told each line's number), let them preserve that information (just the line number) and send it back along with each result on the results channel. The code that aggregates results from that channel can then order them by the original sequence before any further processing (e.g. printing).

    Below is a quick modification of one of the examples you linked.

    package main

    import (
        "fmt"
        "time"
    )

    type Result struct {
        Data, Seq int
    }

    type Job struct {
        Data string
        Seq  int
    }

    func worker(id int, jobs <-chan Job, results chan<- Result) {
        for j := range jobs {
            fmt.Println("worker", id, "started  job", j)
            time.Sleep(time.Second)
            fmt.Println("worker", id, "finished job", j)
            // Carry the job's sequence number over to its result.
            results <- Result{len(j.Data), j.Seq}
        }
    }

    func main() {
        workload := 5

        // main sends every job before receiving any result, so a workload
        // much larger than these buffers would deadlock.
        jobs := make(chan Job, 100)
        results := make(chan Result, 100)

        output := make([]Result, workload)

        for w := 1; w <= 3; w++ {
            go worker(w, jobs, results)
        }

        for j := 0; j < workload; j++ {
            jobs <- Job{ // explicit to make it clear
                Data: fmt.Sprintf("blah blah blah %d", j),
                Seq:  j,
            }
        }
        close(jobs)

        // receive results; store each one at the index given by its Seq
        for a := 1; a <= workload; a++ {
            res := <-results
            output[res.Seq] = res

            // uncomment to see unordered
            // fmt.Printf("received: %#v\n", res)
        }

        for _, out := range output {
            fmt.Printf("output %#v\n", out)
        }
    }
    

    BTW: this does not work well if you do not know your workload in advance. In that case, the code that receives results needs to be a little smarter and process the part that has already been received and is in order (homework) :). Essentially, wait for line 0, then either wait for the next line or print whatever has already been received in sequence.

    Have fun!
