doumu1873 2019-02-09 16:32

How to implement a pipeline with goroutines?

I need some help understanding how to use a pipeline to pass data from one goroutine to another.

I read the Go blog post on pipelines and understood it, but I couldn't put it into practice, so I thought I'd ask the community for help.

So far I have come up with this ugly code (Playground):

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    wg := sync.WaitGroup{}
    ch := make(chan int)
    for a := 0; a < 3; a++ {
        wg.Add(1)
        go func1(int(3-a), ch, &wg)
    }
    go func() {
        wg.Wait()
        close(ch)
    }()
    wg2 := sync.WaitGroup{}
    ch2 := make(chan string)
    for val := range ch {
        fmt.Println(val)
        wg2.Add(1)
        go func2(val, ch2, &wg2)
    }
    go func() {
        wg2.Wait()
        close(ch2)
    }()
    for val := range ch2 {
        fmt.Println(val)
    }
}

func func1(seconds int, ch chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    time.Sleep(time.Duration(seconds) * time.Second)
    ch <- seconds
}

func func2(seconds int, ch chan<- string, wg *sync.WaitGroup) {
    defer wg.Done()
    ch <- "hello"
}

Problem

I want to do this properly, using pipelines or whatever the idiomatic approach is.

Also, the pipeline shown in the blog post doesn't seem to be designed for functions that run in their own goroutines, so I can't work out how to do it myself.

In my real code, func1 and func2 fetch resources from the web, which is why each call runs in its own goroutine.

Thanks.
Temporarya
(A Go newbie)

P.S. Real-life examples of pipelines built with goroutines would be a great help too.

  • doukuang6795 2019-02-09 20:09

    The key pattern of that pipelines post is that you can view the contents of a channel as a stream of data, and write a set of cooperating goroutines that build up a data-processing stream graph. This can be a way to get some concurrency into a data-oriented application.
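    As a minimal sketch of that idea (in the style of the blog post's examples; gen and square are illustrative names, not from the question), each stage below starts a goroutine, returns a receive-only channel, and closes that channel once its input is exhausted, so stages compose like ordinary function calls:

```go
package main

import "fmt"

// gen is a source stage: it emits each number on a channel and closes
// the channel once every value has been sent.
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// square is a middle stage: it reads until its input is closed, then
// closes its own output so the next stage knows the stream has ended.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

func main() {
	// Each stage runs in its own goroutine; prints 1, 16, 81.
	for v := range square(square(gen(1, 2, 3))) {
		fmt.Println(v)
	}
}
```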

    In terms of design, you may also find it more helpful to build up blocks that aren't tied to the goroutine structure, and wrap them in channels. This makes it much easier to test the lower-level code, and if you change your mind about running things in a goroutine or not, it's easier to add or remove the wrapper.

    So in your example I'd start by refactoring the lowest-level tasks out into their own (synchronous) functions:

    func fetch(ms int) int {
        time.Sleep(time.Duration(ms) * time.Millisecond)
        return ms
    }
    
    func report(ms int) string {
        return fmt.Sprintf("Hello after %d ms", ms)
    }
    

    Since the second half of your example is fairly synchronous, it's easy to adapt to the pipeline pattern. We write a function that consumes all of its input stream and produces a complete output stream, closing it when it's done.

    func reportAll(mss <-chan int, out chan<- string) {
        for ms := range mss {
            out <- report(ms)
        }
        close(out)
    }
    

    The function that calls the asynchronous code is a little trickier. In the main loop of the function, every time you read a value, you need to launch a goroutine to process it. Then, after you've read everything from the input channel, you need to wait for all of those goroutines to finish before closing the output channel. A small anonymous function helps here.

    func fetchAll(mss <-chan int, out chan<- int) {
        var wg sync.WaitGroup
        for ms := range mss {
            wg.Add(1)
            go func(ms int) {
                out <- fetch(ms)
                wg.Done()
            }(ms)
        }
        wg.Wait()
        close(out)
    }
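    Note that fetchAll starts one goroutine per input value, so the number of concurrent fetches is unbounded and results arrive in completion order rather than input order. If you need to cap concurrency (for example, to limit simultaneous HTTP requests), a common alternative is a fixed pool of workers that all range over the same input channel. The sketch below assumes that approach; fetchAllN and the worker count of 2 are illustrative choices, and fetch is the same stand-in as above:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// fetch stands in for the real network call (same as in the answer).
func fetch(ms int) int {
	time.Sleep(time.Duration(ms) * time.Millisecond)
	return ms
}

// fetchAllN is a hypothetical variant of fetchAll that starts a fixed
// number of workers instead of one goroutine per input value. All
// workers read from the same input channel (fan-out) and write to the
// same output channel (fan-in); out is closed once every worker returns.
func fetchAllN(workers int, mss <-chan int, out chan<- int) {
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for ms := range mss {
				out <- fetch(ms)
			}
		}()
	}
	wg.Wait()
	close(out)
}

func main() {
	mss := make(chan int)
	fetched := make(chan int)
	go func() {
		defer close(mss)
		for _, ms := range []int{30, 10, 20, 5} {
			mss <- ms
		}
	}()
	go fetchAllN(2, mss, fetched)

	// Results arrive in completion order, not input order.
	for ms := range fetched {
		fmt.Println(ms)
	}
}
```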
    

    It's also helpful here (because a send on an unbuffered channel blocks until another goroutine receives it) to write another function to seed the input values.

    func produceInputs(mss chan<- int) {
        for ms := 1000; ms > 0; ms -= 300 {
            mss <- ms
        }
        close(mss)
    }
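    produceInputs needs its own goroutine because each send on an unbuffered channel waits for a receiver. When all inputs are known up front, a different approach (not the one used in this answer) is to buffer the channel so it can hold every value; the sends then never block, and the channel can be filled and closed before anything reads it. seedBuffered is a hypothetical name:

```go
package main

import "fmt"

// seedBuffered is a hypothetical alternative to produceInputs: when every
// input is known up front, a channel buffered to hold all of them can be
// filled and closed synchronously, because no send ever blocks.
func seedBuffered(values []int) <-chan int {
	ch := make(chan int, len(values))
	for _, v := range values {
		ch <- v // never blocks: the buffer has room for every value
	}
	close(ch) // receivers still drain the buffered values after close
	return ch
}

func main() {
	// Prints 1000, 700, 400, 100 (the same values produceInputs sends).
	for ms := range seedBuffered([]int{1000, 700, 400, 100}) {
		fmt.Println(ms)
	}
}
```

    This trades memory for simplicity, so it only suits small, finite input sets; an open-ended or very large source still needs a producer goroutine like produceInputs.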
    

    Now your main function needs to create the channels between these and run the final consumer.

    // main is the entry point to the program.
    //
    //                   mss        fetched       results
    //     produceInputs --> fetchAll --> reportAll --> main
    func main() {
        mss := make(chan int)
        fetched := make(chan int)
        results := make(chan string)
    
        go produceInputs(mss)
        go fetchAll(mss, fetched)
        go reportAll(fetched, results)
    
        for val := range results {
            fmt.Println(val)
        }
    }
    

    https://play.golang.org/p/V9Z7ECUVIJL is a complete example.

    I've avoided passing sync.WaitGroups around manually here, and I tend to avoid it in general. A function only needs a WaitGroup when it is itself the top-level function of a goroutine, so moving the WaitGroup management up to the caller that starts the goroutine makes the code more modular; my fetchAll function above is an example. So how do I know all of my goroutines have finished? We can trace through:

    • If I've reached the end of main, the results channel is closed.
    • The results channel is the output channel of reportAll; if it is closed, then that function has reached the end of its execution, and if that happened, the fetched channel is closed as well.
    • The fetched channel is the output channel of fetchAll; ...

    Another way to look at this is that as soon as the pipeline's source (produceInputs) closes its output channel and finishes, that "I'm done" signal flows down the pipeline and causes the downstream steps to close their output channels and finish too.
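    To make the earlier WaitGroup point concrete outside a pipeline: in the sketch below, the worker is ordinary synchronous code (compare the question's func1, which takes a *sync.WaitGroup parameter), and the function that starts the goroutines is the one that owns the WaitGroup. fetchOne and fetchConcurrently are hypothetical names:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// fetchOne is a hypothetical worker written as plain synchronous code:
// unlike the question's func1, it takes no channel and no *sync.WaitGroup.
func fetchOne(ms int) int {
	time.Sleep(time.Duration(ms) * time.Millisecond)
	return ms
}

// fetchConcurrently owns the WaitGroup: it is the code that starts the
// goroutines, so it is also the code that tracks when they finish.
func fetchConcurrently(mss []int) []int {
	results := make([]int, len(mss))
	var wg sync.WaitGroup
	for i, ms := range mss {
		wg.Add(1)
		go func(i, ms int) {
			defer wg.Done()
			// Each goroutine writes only its own slot, so there is no data race.
			results[i] = fetchOne(ms)
		}(i, ms)
	}
	wg.Wait()
	return results
}

func main() {
	fmt.Println(fetchConcurrently([]int{30, 10, 20})) // prints [30 10 20]
}
```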

    The blog post also uses a separate, explicit done channel to cancel a pipeline early. I haven't gone into that here at all. Since the post was written, though, the standard library has gained the context package, which is now the standard idiom for cancellation. You'd need a select statement in the body of the main loop, which makes the handling a little more complicated. This might look like:

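    func reportAllCtx(ctx context.Context, mss <-chan int, out chan<- string) {
        defer close(out) // runs on every return path below
        for {
            select {
            case <-ctx.Done():
                // return, not break: a bare break would only exit the select
                return
            case ms, ok := <-mss:
                if !ok {
                    return // input channel closed: no more work
                }
                // The send can block too, so it also watches for cancellation.
                select {
                case out <- report(ms):
                case <-ctx.Done():
                    return
                }
            }
        }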
    }
    