dongpo7467 2019-09-13 11:02
35 views

Apache Beam Go SDK - Dataflow fails to auto-scale correctly (parallelize steps)

I have a Beam batch pipeline written in Go that takes a .csv file of 20 million rows (around 600 MB of data), performs basic transformation steps such as SumPerKey, and writes the output back to GCS.

When running the pipeline on Dataflow, it spins up a pool of only 1 worker!

I was expecting Dataflow to parallelize the job across multiple workers for this amount of data. Am I missing something?

Here's my code:

func main() {
    flag.Parse()

    beam.Init()

    p, s := beam.NewPipelineWithRoot()

    ctx := context.Background()

    log.Infof(ctx, "Started pipeline on scope: %s", s)

    /* [TEST PIPELINE START ]*/

    sr := csvio.Read(s, *input, reflect.TypeOf(Rating{}))

    pwo := beam.ParDo(s.Scope("Pair Key With One"),
        func(x Rating, emit func(int, int)) {
            emit(x.UserId, 1)
        }, sr)

    spk := stats.SumPerKey(s, pwo)

    mp := beam.ParDo(s.Scope("Map KV To Struct"),
        func(k int, v int, emit func(UserRatings)) {
            emit(UserRatings{
                UserId:  k,
                Ratings: v,
            })
        }, spk)

    t := top.Largest(s, mp, 1000, func(x, y UserRatings) bool { return x.Ratings < y.Ratings })

    o := beam.ParDo(s, func(x []UserRatings) string {
        if data, err := json.MarshalIndent(x, "", ""); err != nil {
            return fmt.Sprintf("[Err]: %v", err)
        } else {
            return fmt.Sprintf("Output: %s", data)
        }
    }, t)

    textio.Write(s, *output, o)

    /* [TEST PIPELINE END ]*/

    if err := beamx.Run(ctx, p); err != nil {
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }
}

Full Code Here

I deploy the pipeline via this command line:

go run main.go \
  --runner dataflow \
  --max_num_workers 10 \
  --file gs://${BUCKET?}/ratings.csv \
  --output gs://${BUCKET?}/reporting.txt \
  --project ${PROJECT?} \
  --temp_location gs://${BUCKET?}/tmp/ \
  --staging_location gs://${BUCKET?}/binaries/ \
  --worker_harness_container_image=gcr.io/drawndom-app/beam/go:latest

Note: when I set --num_workers to 5, it starts 5 workers, but I want Dataflow to scale up automatically.
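
For reference: Dataflow's batch autoscaling is throughput-based and enabled by default, so no extra flag should be required. To pin it explicitly, the Dataflow service accepts an autoscaling-algorithm setting; assuming the Go SDK's Dataflow runner exposes it as --autoscaling_algorithm (an assumption for this SDK version; check your runner's flags), it would be appended to the command above:

  --autoscaling_algorithm THROUGHPUT_BASED \

The more likely cause of a single-worker run, though, is fusion: Dataflow fuses the CSV read with the downstream ParDos into one stage, and until a shuffle boundary breaks that fused stage, all elements stay on the worker that performed the read.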


1 Answer

  • doupiai5597 2019-09-13 14:43

    Update:

    I've added a Reshuffle step right after the .csv read (thanks to this lib), and Dataflow was able to auto-scale by adding 1 more worker.

    I still need to understand how to optimise parallelism in my pipelines.

    (Worker history screenshot)

    Code used:

    func Reshuffle(s beam.Scope, col beam.PCollection) beam.PCollection {
        s = s.Scope("Reshuffle")
    
        // Assign a random key to every element so the following GroupByKey
        // redistributes the elements across workers.
        col = beam.ParDo(s, func(x beam.X) (int, beam.X) {
            return rand.Int(), x
        }, col)
        // GroupByKey forces a shuffle (materialization) boundary, which
        // breaks fusion between the producer and consumer stages.
        col = beam.GroupByKey(s, col)
        // Drop the temporary keys and re-emit the bare values.
        return beam.ParDo(s, func(key int, values func(*beam.X) bool, emit func(beam.X)) {
            var x beam.X
            for values(&x) {
                emit(x)
            }
        }, col)
    }
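
    A minimal sketch of wiring this into the pipeline from the question (assuming the same csvio reader, *input flag, and Rating type as above); the GroupByKey inside Reshuffle creates a shuffle boundary that breaks fusion between the read and the downstream steps, so Dataflow can redistribute elements across workers:

        sr := csvio.Read(s, *input, reflect.TypeOf(Rating{}))
        // Shuffle boundary: lets Dataflow rebalance the 20M rows across
        // workers instead of keeping the fused read+transform stage on one.
        sr = Reshuffle(s, sr)

        pwo := beam.ParDo(s.Scope("Pair Key With One"),
            func(x Rating, emit func(int, int)) {
                emit(x.UserId, 1)
            }, sr)

    Newer versions of the Go SDK also ship a built-in beam.Reshuffle that can replace this hand-rolled version.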
    

