duanmingting9544 2017-10-02 15:56

I want to split a file into equally sized “chunks” or slices and process them concurrently using goroutines

I have large log files that I process with Go. Currently I open each file, create a scanner with bufio.NewScanner, and loop over the lines with for scanner.Scan(). Each line is passed to a processing function that matches it against regular expressions and extracts data. I would like to process the file in chunks concurrently using goroutines, since I believe this may be quicker than looping through the whole file sequentially.

Processing can take a few seconds per file, and I'm wondering whether I can process a single file in, say, 10 pieces at a time. I can sacrifice memory if needed: I have about 3 GB available, and the biggest log file is roughly 75 MB.

I see that bufio.Scanner has a Split method, where you can provide a custom split function, but I wasn't able to find a good solution using it.

I've also tried creating a slice of slices, looping with scanner.Scan() and appending scanner.Text() to each slice in turn, e.g.:

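package main

import (
    "bufio"
    "fmt"
    "log"
    "os"
)

func main() {
    logInfo, err := os.Open("/path/to/file.log") // placeholder path
    if err != nil {
        log.Fatal(err)
    }
    defer logInfo.Close()

    scanner := bufio.NewScanner(logInfo)
    threads := make([][]string, 5) // five empty slices of lines
    for i := 0; scanner.Scan(); i++ {
        // Round-robin: wrap with modulo so the index stays within 0..4.
        threads[i%len(threads)] = append(threads[i%len(threads)], scanner.Text())
    }
    if err := scanner.Err(); err != nil {
        log.Fatal(err)
    }
    fmt.Println(threads)
}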

I'm new to Go and concerned about efficiency and performance. I want to learn how to write good Go code! Any help or advice is really appreciated.

  • douyanqu9722 2017-10-02 16:39

    Peter gives a good starting point. If you want something like a fan-out, fan-in pattern, you could do something like this:

    package main
    
    import (
        "bufio"
        "fmt"
        "log"
        "os"
        "sync"
    )
    
    func main() {
        file, err := os.Open("/path/to/file.txt")
        if err != nil {
            log.Fatal(err)
        }
        defer file.Close()
    
        lines := make(chan string)
        // start four workers to do the heavy lifting
        wc1 := startWorker(lines)
        wc2 := startWorker(lines)
        wc3 := startWorker(lines)
        wc4 := startWorker(lines)
        scanner := bufio.NewScanner(file)
        go func() {
            defer close(lines)
            for scanner.Scan() {
                lines <- scanner.Text()
            }
    
            if err := scanner.Err(); err != nil {
                log.Fatal(err)
            }
        }()
    
        merged := merge(wc1, wc2, wc3, wc4)
        for line := range merged {
            fmt.Println(line)
        }
    }
    
    func startWorker(lines <-chan string) <-chan string {
        finished := make(chan string)
        go func() {
            defer close(finished)
            for line := range lines {
                // Do your heavy work here
                finished <- line
            }
        }()
        return finished
    }
    
    func merge(cs ...<-chan string) <-chan string {
        var wg sync.WaitGroup
        out := make(chan string)
    
        // Start an output goroutine for each input channel in cs.  output
        // copies values from c to out until c is closed, then calls wg.Done.
        output := func(c <-chan string) {
            for n := range c {
                out <- n
            }
            wg.Done()
        }
        wg.Add(len(cs))
        for _, c := range cs {
            go output(c)
        }
    
        // Start a goroutine to close out once all the output goroutines are
        // done.  This must start after the wg.Add call.
        go func() {
            wg.Wait()
            close(out)
        }()
        return out
    }
    
    This answer was selected by the asker as the best answer.