dongmaxi6763 2015-12-20 08:52
Viewed 158 times
Accepted

Golang: processing 5 huge files concurrently

I have 5 huge (4 million rows each) logfiles that I currently process in Perl, and I thought I might try to implement the same in Go using its concurrency features. Being very inexperienced in Go, I was thinking of doing something like the below. Any comments on the approach would be greatly appreciated. Some rough pseudocode:

var wg1 sync.WaitGroup
var wg2 sync.WaitGroup

func processRow (r Row) {
    wg2.Add(1)
    defer wg2.Done()
    res = <process r>
    return res
}

func processFile(f File) {
    wg1.Add(1)
    open(newfile File)
    defer wg1.Done()
    line = <row from f>
    result = go processRow(line)
    newFile.Println(result) // Write new processed line to newFile
    wg2.Wait()
    newFile.Close()

}

func main() {

    for each f logfile {
        go processFile(f)
    }
    wg1.Wait()
}

So the idea is that I process these 5 files concurrently, and all rows of each file will in turn also be processed concurrently.

Will that work?

1 answer

  • duanju6788 2015-12-20 09:25

    You should definitely use channels to manage your processed rows. Alternatively you could also write another goroutine to handle your output.

    // Imports needed by this sketch:
    import (
        "bufio"
        "os"
        "sync"
    )
    
    var numGoWriters = 10
    
    func processRow(r string, ch chan<- string) {
        res := process(r) // process is your per-row logic ported from Perl
        ch <- res
    }
    
    func writeRow(w *bufio.Writer, mu *sync.Mutex, ch <-chan string, wg *sync.WaitGroup) {
        defer wg.Done()
        for s := range ch {
            mu.Lock()
            _, err := w.WriteString(s + "\n")
            mu.Unlock()
            if err != nil {
                // handle it
            }
        }
    }
    
    func processFile(f *os.File) {
        outFile, err := os.Create("/path/to/file.out")
        if err != nil {
            // handle it
        }
        defer outFile.Close()
    
        w := bufio.NewWriter(outFile)
        var mu sync.Mutex           // serializes writes so output lines don't interleave
        ch := make(chan string, 10) // play with this number for performance
    
        var writerWg sync.WaitGroup
        for i := 0; i < numGoWriters; i++ {
            writerWg.Add(1)
            go writeRow(w, &mu, ch, &writerWg)
        }
    
        var rowWg sync.WaitGroup
        fScanner := bufio.NewScanner(f)
        for fScanner.Scan() {
            line := fScanner.Text() // copy the line now; Text() is only valid until the next Scan
            rowWg.Add(1)
            go func() {
                defer rowWg.Done()
                processRow(line, ch)
            }()
        }
    
        rowWg.Wait()    // all rows processed...
        close(ch)       // ...so close the channel and let the writer goroutines exit
        writerWg.Wait() // all writers done
        w.Flush()       // flush buffered output before outFile is closed
    }
    

    Here we have processRow doing all the processing (I assumed it produces a string), writeRow doing all the output I/O, and processFile tying each file together. Then all main has to do is hand the files off and spawn the goroutines, et voilà.

    func main() {
        var wg sync.WaitGroup
    
        filenames := [...]string{"here", "are", "some", "log", "paths"}
        for _, fname := range filenames {
            inFile, err := os.Open(fname)
            if err != nil {
                // handle it
            }
            wg.Add(1)
            go func(f *os.File) {
                defer wg.Done()
                defer f.Close()
                processFile(f)
            }(inFile)
        }
        wg.Wait()
    }
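
    For completeness: process is left undefined in the sketch above because it is the per-row transformation being ported from Perl. A minimal placeholder, assuming each input line simply maps to one output string (the field splitting here is purely illustrative), could look like:

    // process is a stand-in for the real per-row work ported from Perl.
    // This illustrative version keeps the first three whitespace-separated
    // fields of the line and joins them with tabs.
    // (Add "strings" to the import block above.)
    func process(r string) string {
        fields := strings.Fields(r)
        if len(fields) > 3 {
            fields = fields[:3]
        }
        return strings.Join(fields, "\t")
    }

    With a real implementation of process filled in, the snippets above form one complete program; the remaining work is the error handling at the "// handle it" markers.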
    
    This answer was accepted as the best answer by the asker.
