My objective is to read one or more CSV files that share a common format and write their rows to separate files based on a partition column in the CSV data. Please assume that the last column is the partition, that the data is unsorted, and that a given partition can appear in multiple files. Example of one file:
fsdio,abc,def,2017,11,06,01
1sdf9,abc,def,2017,11,06,04
22df9,abc,def,2017,11,06,03
1d243,abc,def,2017,11,06,02
If this approach smells like the dreaded XY Problem, I'm happy to adjust.
What I've tried so far:
- Read in the data set and iterate over each line.
- If the partition has not been seen yet, spin off a new worker goroutine (which will contain a file/CSV writer). Send the line into a chan []string.
- As each worker is a file writer, it should only receive lines for exactly one partition over its input channel.
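These steps can work if each worker gets its own channel instead of all workers sharing one. A minimal sketch, assuming the lines are already parsed into [][]string: a map from partition value to channel (route and chans are hypothetical names) does the routing with a plain lookup, so no select is needed. The worker body here only counts lines; a real one would write them to its file.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// route starts one goroutine per partition the first time that partition is
// seen and sends each line only to that partition's channel. The workers here
// just count lines; a real worker would write them to its own file.
func route(lines [][]string) map[string]int {
	chans := make(map[string]chan []string) // partition value -> worker input
	counts := make(map[string]int)
	var mu sync.Mutex // guards counts
	var wg sync.WaitGroup

	for _, line := range lines {
		key := line[len(line)-1] // last column is the partition
		ch, ok := chans[key]
		if !ok {
			ch = make(chan []string)
			chans[key] = ch
			wg.Add(1)
			go func(id string, in <-chan []string) {
				defer wg.Done()
				n := 0
				for range in {
					n++ // write the line to this partition's file here
				}
				mu.Lock()
				counts[id] = n
				mu.Unlock()
			}(key, ch)
		}
		ch <- line // the map lookup selects the right worker
	}
	for _, ch := range chans {
		close(ch) // ends each worker's range loop
	}
	wg.Wait()
	return counts
}

func main() {
	lines := [][]string{
		{"12fy3", "abc", "def", "2017", "11", "06", "04"},
		{"fsdio", "abc", "def", "2017", "11", "06", "01"},
		{"11213", "abc", "def", "2017", "11", "06", "02"},
		{"1sdf9", "abc", "def", "2017", "11", "06", "01"},
	}
	counts := route(lines)
	keys := make([]string, 0, len(counts))
	for k := range counts {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Println(k, counts[k]) // 01 2, then 02 1, then 04 1
	}
}
```

Closing every channel after the input is exhausted and then waiting on the WaitGroup is what lets main know all workers have finished.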
This obviously doesn't work (yet), because I don't know how to send a line to the correct worker based on that line's partition value.
I've given each worker an id string for its partition value, but I don't know how to select that worker to send to: whether I should create a separate chan []string for each worker and send to that channel with a select, or whether a struct should hold each worker with some sort of pool and routing functionality.
TL;DR: I'm lost as to how to conditionally send data to a given goroutine or channel based on a categorical string value, where the number of unique values can be arbitrary but likely does not exceed 24.
As a caveat, I've noticed that questions like this tend to get downvoted, so if you feel this one is unconstructive or incomplete enough to downvote, please comment with the reason so I can avoid repeating the mistake.
Thanks in advance for any help!
Snippet:
package main

import (
	"encoding/csv"
	"fmt"
	"log"
	"strings"
	"sync"
	"time"
)

func main() {
	// CSV
	r := csv.NewReader(csvFile1)
	lines, err := r.ReadAll()
	if err != nil {
		log.Fatalf("error reading all lines: %v", err)
	}

	// CHANNELS
	lineChan := make(chan []string)

	// WAITGROUP: lets main wait for the workers to finish before exiting
	var wg sync.WaitGroup

	// TRACKER
	var seenPartitions []string

	for _, line := range lines {
		// The partition is the last column.
		hour := line[len(line)-1]
		if !stringInSlice(hour, seenPartitions) {
			seenPartitions = append(seenPartitions, hour)
			wg.Add(1)
			go worker(hour, lineChan, &wg)
		}
		// How to send to the correct worker/channel?
		// As written, every worker reads from the same lineChan,
		// so whichever worker is free receives the line.
		lineChan <- line
	}
	close(lineChan)
	wg.Wait()
}

func worker(id string, lineChan <-chan []string, wg *sync.WaitGroup) {
	defer wg.Done()
	for j := range lineChan {
		fmt.Println("worker", id, "started job", j)
		// Write to a new file here and wait for input over the channel
		time.Sleep(time.Second)
		fmt.Println("worker", id, "finished job", j)
	}
}

// stringInSlice reports whether str is an element of list.
func stringInSlice(str string, list []string) bool {
	for _, v := range list {
		if v == str {
			return true
		}
	}
	return false
}
// DUMMY
var csvFile1 = strings.NewReader(`
12fy3,abc,def,2017,11,06,04
fsdio,abc,def,2017,11,06,01
11213,abc,def,2017,11,06,02
1sdf9,abc,def,2017,11,06,01
2123r,abc,def,2017,11,06,03
1v2t3,abc,def,2017,11,06,01
1r2r3,abc,def,2017,11,06,02
g1253,abc,def,2017,11,06,02
d1e23,abc,def,2017,11,06,02
a1d23,abc,def,2017,11,06,02
12jj3,abc,def,2017,11,06,03
t1r23,abc,def,2017,11,06,03
22123,abc,def,2017,11,06,03
14d23,abc,def,2017,11,06,04
1d243,abc,def,2017,11,06,01
1da23,abc,def,2017,11,06,04
a1523,abc,def,2017,11,06,01
12453,abc,def,2017,11,06,04`)