dtoqemais553654797 2017-11-14 21:10
浏览 162
已采纳

同时从一个写入多个csv文件,在Golang中的分区列上拆分

My objective is to read one or multiple csv files that share a common format, and write to separate files based on a partition column in the csv data. Please allow that the last column is the partition, that data is un-sorted, and a given partition can be found in multiple files. Example of one file:

fsdio,abc,def,2017,11,06,01
1sdf9,abc,def,2017,11,06,04
22df9,abc,def,2017,11,06,03
1d243,abc,def,2017,11,06,02

If this approach smells like the dreaded XY Problem, I'm happy to adjust.

What I've tried so far:

  • Read in the data set and iterate over each line
  • If the partition has not been seen yet, spin off a new worker goroutine (this will contain a file/csv writer). Send the line into a chan []string.
  • As each worker is a file writer, it should only receive lines for exactly one partition over its input channel.

This obviously doesn't work (yet), as I'm not aware of how to send a line to the correct worker based on the partition value seen on a given line.

I've given each worker an id string for each partition value, but am not aware how to select that worker to send to, if I should be creating a separate chan []string for each worker and send to that channel with a select, or if perhaps a struct should hold each worker with some sort of pool and routing functionality.

TLDR; I'm lost as to how to conditionally send data to a given goroutine or channel based on some categorical string value, where the number of unique values can be arbitrary, but likely does not exceed 24 unique partition values.

I will caveat by stating I've noticed questions like this do get down-voted, so if you feel this is counter-constructive or incomplete enough to down-vote, please comment with why so I can avoid repeating the offense.

Thanks for any help in advance!

Playground

Snippet:

  package main

    import (
        "encoding/csv"
        "fmt"
        "log"
        "strings"
        "time"
    )

    // main loads every record of csvFile1, starts one worker goroutine the first
    // time a partition value (column index 6) is seen, and pushes each record
    // onto a single channel.
    //
    // All workers receive from that same channel, so a record is handed to
    // whichever worker happens to be ready, not to the worker of its partition.
    // main also returns right after closing the channel without waiting for the
    // workers to finish.
    func main() {

        // CSV: read the whole input into memory up front.
        records, err := csv.NewReader(csvFile1).ReadAll()
        if err != nil {
            log.Fatalf("error reading all lines: %v", err)
        }

        // CHANNELS: one unbuffered channel shared by every worker.
        lineChan := make(chan []string)

        // TRACKER: partition values for which a worker was already started.
        var started []string

        for i := range records {
            rec := records[i]
            partition := rec[6]

            if known := stringInSlice(partition, started); !known {
                started = append(started, partition)
                go worker(partition, lineChan)
            }

            // How to send to the correct worker/channel?
            lineChan <- rec
        }
        close(lineChan)
    }

    // worker receives records from lineChan until the channel is closed. For
    // every record it prints a "started" message tagged with id, sleeps for one
    // second as a stand-in for real work, and then prints a "finished" message.
    func worker(id string, lineChan <-chan []string) {
        for {
            job, open := <-lineChan
            if !open {
                // Channel closed and drained: nothing left to do.
                return
            }
            fmt.Println("worker", id, "started  job", job)
            // Write to a new file here and wait for input over the channel
            time.Sleep(time.Second)
            fmt.Println("worker", id, "finished job", job)
        }
    }

    // stringInSlice reports whether str is equal to at least one element of
    // list. A nil or empty list never contains anything.
    func stringInSlice(str string, list []string) bool {
        found := false
        // Stop scanning as soon as a match has been seen.
        for i := 0; i < len(list) && !found; i++ {
            found = list[i] == str
        }
        return found
    }

    // DUMMY: csvFile1 is in-memory sample input standing in for a real CSV file.
    // Every record has seven columns and the last column is the partition value.
    // No row may carry trailing whitespace: the partition is compared as an exact
    // string, so "04 " would be treated as a different partition than "04".
var csvFile1 = strings.NewReader(`
12fy3,abc,def,2017,11,06,04
fsdio,abc,def,2017,11,06,01
11213,abc,def,2017,11,06,02
1sdf9,abc,def,2017,11,06,01
2123r,abc,def,2017,11,06,03
1v2t3,abc,def,2017,11,06,01
1r2r3,abc,def,2017,11,06,02
g1253,abc,def,2017,11,06,02
d1e23,abc,def,2017,11,06,02
a1d23,abc,def,2017,11,06,02
12jj3,abc,def,2017,11,06,03
t1r23,abc,def,2017,11,06,03
22123,abc,def,2017,11,06,03
14d23,abc,def,2017,11,06,04
1d243,abc,def,2017,11,06,01
1da23,abc,def,2017,11,06,04
a1523,abc,def,2017,11,06,01
12453,abc,def,2017,11,06,04`)
  • 写回答

1条回答 默认 最新

  • dongxi9326 2017-11-15 15:31
    关注

    Synchronous version first, without any Go concurrency magic (see the concurrent version below).

    package main
    
    import (
        "encoding/csv"
        "fmt"
        "io"
        "log"
        "strings"
    )
    
    // main reads csvFile1 one record at a time, groups the records by partition
    // in memory via process, and hands the groups to save_partitions once the
    // input is exhausted. Any read error other than end-of-input terminates the
    // program through log.Fatal.
    func main() {
        r := csv.NewReader(csvFile1)
        partitions := make(map[string][][]string)

        for {
            rec, err := r.Read()
            if err == io.EOF {
                // End of input is the normal way out of the loop.
                break
            }
            if err != nil {
                log.Fatal(err)
            }

            process(rec, partitions)
        }

        save_partitions(partitions)
    }
    
    // save_partitions prints every partition key on its own line, followed by
    // one line per record of that partition. Nothing is written to disk. The
    // partitions are visited in map iteration order, which Go does not fix.
    func save_partitions(partitions map[string][][]string) {
        for part := range partitions {
            fmt.Println(part)

            recs := partitions[part]
            for i := range recs {
                fmt.Println(recs[i])
            }
        }
    }
    
    // process files rec under its partition, which is the value of the record's
    // last column. Records of the same partition are kept in arrival order.
    // Instead of accumulating in memory, this could also write/append directly
    // to a file.
    //
    // partitions must be a non-nil map. A record without any columns has no
    // partition value and is ignored.
    func process(rec []string, partitions map[string][][]string) {
        if len(rec) == 0 {
            return
        }
        part := rec[len(rec)-1]

        // append on a missing key starts from a nil slice, so first and later
        // records of a partition need no separate handling.
        partitions[part] = append(partitions[part], rec)
    }
    
    // DUMMY: csvFile1 is an in-memory stand-in for a real CSV file. Every record
    // has seven columns; process reads the last one as the partition value.
    // NOTE(review): the rows appear indented inside the raw string here; if that
    // indentation is real, the spaces become part of each first field — confirm.
    var csvFile1 = strings.NewReader(`
    fsdio,abc,def,2017,11,06,01
    1sdf9,abc,def,2017,11,06,01
    1d243,abc,def,2017,11,06,01
    1v2t3,abc,def,2017,11,06,01
    a1523,abc,def,2017,11,06,01
    1r2r3,abc,def,2017,11,06,02
    11213,abc,def,2017,11,06,02
    g1253,abc,def,2017,11,06,02
    d1e23,abc,def,2017,11,06,02
    a1d23,abc,def,2017,11,06,02
    12jj3,abc,def,2017,11,06,03
    t1r23,abc,def,2017,11,06,03
    2123r,abc,def,2017,11,06,03
    22123,abc,def,2017,11,06,03
    14d23,abc,def,2017,11,06,04
    1da23,abc,def,2017,11,06,04
    12fy3,abc,def,2017,11,06,04
    12453,abc,def,2017,11,06,04`)
    

    https://play.golang.org/p/--iqZGzxCF

    And the concurrent version:

    package main
    
    import (
        "encoding/csv"
        "fmt"
        "io"
        "log"
        "strings"
        "sync"
    )
    
    // Package-level state shared by main, process, worker and savePartitions.
    var (
        // workers maps a partition value to the channel that feeds the worker
        // of that partition. It is only touched from the goroutine running
        // main, so it needs no mutex.
        workers = map[string]chan []string{}

        // wg lets main block until every worker has returned.
        wg sync.WaitGroup

        // mu serialises the workers' printing so their output does not
        // interleave; it plays no part in the routing logic.
        mu sync.Mutex
    )
    
    // main streams csvFile1 record by record into process. At end of input it
    // calls savePartitions to close every worker channel and then, through the
    // deferred wg.Wait, blocks until all workers have returned. Any other read
    // error ends the program via log.Fatal, which exits without running the
    // deferred wait.
    func main() {
        defer wg.Wait()

        reader := csv.NewReader(csvFile1)
        for {
            rec, err := reader.Read()
            switch {
            case err == io.EOF:
                savePartitions()
                return
            case err != nil:
                log.Fatal(err)
            }
            process(rec)
        }
    }
    
    func process(rec []string) {
        l := len(rec)
        part := rec[l-1]
    
        if c, ok := workers[part]; ok {
            // send rec to worker
            c <- rec
        } else {
            // if no worker for the partition
    
            // make a chan
            nc := make(chan []string)
            workers[part] = nc
    
            // start worker with this chan
            go worker(nc)
    
            // send rec to worker via chan
            nc <- rec
        }
    }
    
    func worker(c chan []string) {
    
        // wg.Done signals to main worker completion
        wg.Add(1)
        defer wg.Done()
    
        part := [][]string{}
        for {
            // wait for a rec or close(chan)
            rec, ok := <-c
            if ok {
                // save the rec
                // instead of accumulation in memory
                // this can be saved to file directly
                part = append(part, rec)
            } else {
                // channel closed on EOF
    
                // dump partition
                // locks ensures sequential printing
                // not a required for independent files
                mu.Lock()
                for _, p := range part {
                    fmt.Printf("%+v
    ", p)
                }
                mu.Unlock()
    
                return
            }
        }
    }
    
    // savePartitions closes the channel of every registered worker. It saves
    // nothing itself: the close is the signal on which each worker dumps what it
    // has collected and returns.
    func savePartitions() {
        for part := range workers {
            close(workers[part])
        }
    }
    
    // DUMMY: csvFile1 is an in-memory stand-in for a real CSV file. Every record
    // has seven columns; process reads the last one as the partition value.
    // NOTE(review): the rows appear indented inside the raw string here; if that
    // indentation is real, the spaces become part of each first field — confirm.
    var csvFile1 = strings.NewReader(`
    fsdio,abc,def,2017,11,06,01
    1sdf9,abc,def,2017,11,06,01
    1d243,abc,def,2017,11,06,01
    1v2t3,abc,def,2017,11,06,01
    a1523,abc,def,2017,11,06,01
    1r2r3,abc,def,2017,11,06,02
    11213,abc,def,2017,11,06,02
    g1253,abc,def,2017,11,06,02
    d1e23,abc,def,2017,11,06,02
    a1d23,abc,def,2017,11,06,02
    12jj3,abc,def,2017,11,06,03
    t1r23,abc,def,2017,11,06,03
    2123r,abc,def,2017,11,06,03
    22123,abc,def,2017,11,06,03
    14d23,abc,def,2017,11,06,04
    1da23,abc,def,2017,11,06,04
    12fy3,abc,def,2017,11,06,04
    12453,abc,def,2017,11,06,04`)
    

    https://play.golang.org/p/oBTPosy0yT

    Have fun!

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥15 安卓adb backup备份应用数据失败
  • ¥15 eclipse运行项目时遇到的问题
  • ¥15 关于#c##的问题:最近需要用CAT工具Trados进行一些开发
  • ¥15 南大pa1 小游戏没有界面,并且报了如下错误,尝试过换显卡驱动,但是好像不行
  • ¥15 没有证书,nginx怎么反向代理到只能接受https的公网网站
  • ¥50 成都蓉城足球俱乐部小程序抢票
  • ¥15 yolov7训练自己的数据集
  • ¥15 esp8266与51单片机连接问题(标签-单片机|关键词-串口)(相关搜索:51单片机|单片机|测试代码)
  • ¥15 电力市场出清matlab yalmip kkt 双层优化问题
  • ¥30 ros小车路径规划实现不了,如何解决?(操作系统-ubuntu)