Golang: Processing 5 large files concurrently

I have 5 huge (4 million rows each) logfiles that I currently process in Perl, and I thought I might try to implement the same in Go with its concurrency features. So, being very inexperienced in Go, I was thinking of the approach below. Any comments on the approach will be greatly appreciated. Some rough pseudocode:

var wg1 sync.WaitGroup
var wg2 sync.WaitGroup

func processRow (r Row) {
    wg2.Add(1)
    defer wg2.Done()
    res = <process r>
    return res
}

func processFile(f File) {
    wg1.Add(1)
    open(newfile File)
    defer wg1.Done()
    line = <row from f>
    result = go processRow(line)
    newFile.Println(result) // Write new processed line to newFile
    wg2.Wait()
    newFile.Close()

}

func main() {

    for each f logfile {
        go processFile(f)
    }
    wg1.Wait()
}

So the idea is that I process these 5 files concurrently, and then all rows of each file will in turn also be processed concurrently.

Will that work?

dscs63759: No, the order doesn't matter at all.
(over 4 years ago)

dongxi4235: Do you need to output the row results in the same order as the input file?
(over 4 years ago)

duanchen7703: Yes, but all of that adds up (adding to the WaitGroup as well); if the processing is really simple, it isn't worth doing it this way.
(over 4 years ago)

douying0108: There will be 5 separate result files, one per source file. Likewise, "process r" is easy enough, but there is some processing to be done, e.g. regular expressions, converting from hex to decimal, outputting as CSV.
(over 4 years ago)

dqcuq4138: No, it isn't hard to implement, but I won't be in a dev environment until the middle of next week. Would love to hear what others think of the idea.
(over 4 years ago)

dsk49208: My understanding is that it would be pretty trivial. The overhead of a goroutine is very low.
(over 4 years ago)

dtf1111: What is "process r"? If it's something relatively "easy", it may not be worth doing in a separate goroutine - the overhead outweighs the benefit. Also, will you end up with one "log dataset" (i.e. all 5 files merged into one) or 5 separate result sets?
(over 4 years ago)

dongsuishou8039: For an I/O-bound task, you may not gain much from CPU concurrency. You might also want to look into channels.
(over 4 years ago)

1 Answer

You should definitely use channels to manage your processed rows. Alternatively you could also write another goroutine to handle your output.

var numGoWriters = 10

func processRow(r string, ch chan<- string) {
    res := process(r) // process does the real per-row work
    ch <- res
}

func writeRow(f *os.File, ch <-chan string, wg *sync.WaitGroup) {
    defer wg.Done()
    w := bufio.NewWriter(f)
    defer w.Flush() // push any buffered output before this writer exits
    for s := range ch {
        if _, err := w.WriteString(s + "\n"); err != nil {
            // handle it
        }
    }
}

func processFile(f *os.File) {
    outFile, err := os.Create("/path/to/file.out")
    if err != nil {
        // handle it
    }
    defer outFile.Close()

    ch := make(chan string, 10) // play with this number for performance

    // Start the writers first so they drain ch while rows are still
    // being processed. Note that with several writers the output lines
    // can land in the file in any order.
    var writerWG sync.WaitGroup
    for i := 0; i < numGoWriters; i++ {
        writerWG.Add(1)
        go writeRow(outFile, ch, &writerWG)
    }

    var rowWG sync.WaitGroup
    fScanner := bufio.NewScanner(f)
    for fScanner.Scan() {
        line := fScanner.Text() // capture the line; don't call Text() inside the goroutine
        rowWG.Add(1)
        go func() {
            defer rowWG.Done()
            processRow(line, ch)
        }()
    }
    rowWG.Wait()    // all rows processed...
    close(ch)       // ...so the writers' range loops terminate
    writerWG.Wait() // and the writers flush before outFile is closed
}

Here we have processRow doing all the processing (I assumed it produces a string), writeRow doing all the output I/O, and processFile tying each file together. Then all main has to do is hand off the files and spawn the goroutines, et voila.

func main() {
    var wg sync.WaitGroup

    filenames := [...]string{"here", "are", "some", "log", "paths"}
    for _, fname := range filenames {
        inFile, err := os.Open(fname)
        if err != nil {
            // handle it
        }
        wg.Add(1)
        go func(f *os.File) {
            defer wg.Done()
            defer f.Close()
            processFile(f)
        }(inFile)
    }
    wg.Wait()
}
duanliao3826: Thank you very much!
(over 4 years ago)