I have a Beam batch pipeline written in Go that reads a .csv file of 20 million rows (around 600 MB of data), performs basic transformations such as SumPerKey, and writes the output back to GCS.
When I run the pipeline on Dataflow, it starts a worker pool with only a single worker!
I was expecting Dataflow to parallelize the job across multiple workers for this amount of data. Am I missing something?
Here's my code:
func main() {
	flag.Parse()
	beam.Init()

	p, s := beam.NewPipelineWithRoot()
	ctx := context.Background()
	log.Infof(ctx, "Started pipeline on scope: %s", s)

	/* [TEST PIPELINE START] */
	// Read each CSV row into a Rating struct.
	sr := csvio.Read(s, *input, reflect.TypeOf(Rating{}))

	// Emit (UserId, 1) for every rating.
	pwo := beam.ParDo(s.Scope("Pair Key With One"),
		func(x Rating, emit func(int, int)) {
			emit(x.UserId, 1)
		}, sr)

	// Count the ratings per user.
	spk := stats.SumPerKey(s, pwo)

	// Convert each (UserId, count) pair into a UserRatings struct.
	mp := beam.ParDo(s.Scope("Map KV To Struct"),
		func(k int, v int, emit func(UserRatings)) {
			emit(UserRatings{
				UserId:  k,
				Ratings: v,
			})
		}, spk)

	// Keep the 1000 users with the most ratings.
	t := top.Largest(s, mp, 1000, func(x, y UserRatings) bool { return x.Ratings < y.Ratings })

	// Format the top-1000 list as JSON text.
	o := beam.ParDo(s, func(x []UserRatings) string {
		data, err := json.MarshalIndent(x, "", "")
		if err != nil {
			return fmt.Sprintf("[Err]: %v", err)
		}
		return fmt.Sprintf("Output: %s", data)
	}, t)

	textio.Write(s, *output, o)
	/* [TEST PIPELINE END] */

	if err := beamx.Run(ctx, p); err != nil {
		// Include the error in the log message (the format verb previously had no argument).
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}
}
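For reference, the logic the pipeline computes (pair each rating with 1, sum per user, keep the top N by count) can be sketched in plain Go without Beam. This is only a local illustration, assuming `Rating` has an `int` field `UserId`; the helper name `topUsers` is hypothetical and not part of the original code.

```go
package main

import (
	"fmt"
	"sort"
)

// Rating is assumed to carry at least a UserId field, as used by the pipeline.
type Rating struct {
	UserId int
}

// UserRatings matches the struct emitted by the "Map KV To Struct" step.
type UserRatings struct {
	UserId  int
	Ratings int
}

// topUsers (hypothetical helper) counts ratings per user, like
// "Pair Key With One" + SumPerKey, then keeps the n users with the
// highest counts, largest first, similar to top.Largest.
func topUsers(rows []Rating, n int) []UserRatings {
	counts := make(map[int]int)
	for _, r := range rows {
		counts[r.UserId]++
	}

	out := make([]UserRatings, 0, len(counts))
	for id, c := range counts {
		out = append(out, UserRatings{UserId: id, Ratings: c})
	}

	// Sort by count descending; break ties by UserId so the result is deterministic.
	sort.Slice(out, func(i, j int) bool {
		if out[i].Ratings != out[j].Ratings {
			return out[i].Ratings > out[j].Ratings
		}
		return out[i].UserId < out[j].UserId
	})

	if len(out) > n {
		out = out[:n]
	}
	return out
}

func main() {
	rows := []Rating{{1}, {2}, {1}, {3}, {1}, {2}}
	fmt.Println(topUsers(rows, 2)) // prints [{1 3} {2 2}]
}
```

The tie-breaking by `UserId` is an addition for deterministic local output; the pipeline's `top.Largest` comparator only orders by `Ratings`.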
I deploy the pipeline via this command line:
go run main.go \
--runner dataflow \
--max_num_workers 10 \
--file gs://${BUCKET?}/ratings.csv \
--output gs://${BUCKET?}/reporting.txt \
--project ${PROJECT?} \
--temp_location gs://${BUCKET?}/tmp/ \
--staging_location gs://${BUCKET?}/binaries/ \
--worker_harness_container_image=gcr.io/drawndom-app/beam/go:latest
Note: when I set --num_workers to 5, it starts 5 workers, but I want Dataflow to scale the number of workers automatically.