dtpd58676 2013-12-18 07:39
21 views
Accepted

Inconsistent results using golang channels

I have a task written in Go to get a unique list from a bunch of text files. I added some parallelization using channels and am now getting inconsistent results - a variance of about 5 records appearing or not appearing in the output between runs with the same input files.

I am testing it with go run process.go | wc -l on Fedora x86_64, go1.1.2, 8-core AMD.

The code is:

package main

import (
    "fmt"
    "os"
    "io"    
    "encoding/csv"
    "regexp"
    "log"
)

var (
    cleanRe *regexp.Regexp = regexp.MustCompile("[^0-9]+")
    comma rune ='\t'
    fieldsPerRecord=-1
)

func clean(s string) string {
    clean:=cleanRe.ReplaceAllLiteralString(s,"")
    if len(clean)<6 {return ""}
    return clean
}

func uniqueChannel(inputChan chan []string, controlChan chan string) {
    defer func(){controlChan<-"Input digester."}()
    uniq:=make(map[string]map[string]bool)
    i:=0
    for record:= range inputChan {
        i++
        id,v:=record[0],record[1]
        if uniq[id]==nil {
            uniq[id]=make(map[string]bool)
        } else if !uniq[id][v] {
            uniq[id][v]=true
            fmt.Println(id,string(comma),v)
        }
    }
    log.Println("digest ", i)
}

func processFile(fileName string, outputChan chan []string, controlChan chan string) {
    defer func(){controlChan<-fileName}()
    f,err:=os.Open(fileName)
    if err!=nil{log.Fatal(err)}
    r:=csv.NewReader(f)
    r.FieldsPerRecord = fieldsPerRecord
    r.Comma = comma

    //  Process the records
    i:=0
    for record,err:=r.Read();err!=io.EOF;record,err=r.Read() {
        if err!=nil{continue}
        id:=record[0]
        for _,v:=range record[1:] {
            if cleanV:=clean(v);cleanV!=""{
                i++
                outputChan<-[]string{id,cleanV}
            }
        }
    }
    log.Println(fileName,i)
}


func main() {
    inputs:=[]string{}
    recordChan:=make(chan []string,100)
    processesLeft:=len(inputs)+1
    controlChan:=make(chan string,processesLeft)

    //  Ingest the inputs
    for _,fName:=range inputs {
        go processFile(fName,recordChan,controlChan)
    }

    //  This is the loop to ensure it's all unique
    go uniqueChannel(recordChan,controlChan)

    //  Make sure all the channels close up
    for processesLeft>0 {
        if processesLeft==1{
            close(recordChan)
        }
        c:=<-controlChan
        log.Println(c)
        processesLeft--
    }
    close(controlChan)
}

It seems like it closes the channel before it's empty and quits. Without the closing mechanism I was getting deadlocks - I'm out of ideas.
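
The deadlock without the close is easy to reproduce in isolation. A minimal sketch (not the program above, just the pattern): range over a channel that nobody ever closes, and once the buffer is drained the receive blocks forever.

package main

import "fmt"

func main() {
    ch := make(chan int, 100)
    for i := 0; i < 3; i++ {
        ch <- i
    }
    // No close(ch) anywhere: after the three buffered values are printed,
    // the range blocks on an empty channel with no other goroutines left,
    // and the runtime aborts with "all goroutines are asleep - deadlock!".
    for v := range ch {
        fmt.Println(v)
    }
}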


1 answer

  • dongyongkui6329 2013-12-21 15:48

    You could ditch the control channel and use a sync.WaitGroup:

    package main
    
    import (
        "encoding/csv"
        "fmt"
        "io"
        "log"
        "os"
        "regexp"
        "sync"
    )
    
    var (
        cleanRe         *regexp.Regexp = regexp.MustCompile("[^0-9]+")
        comma           rune           = '\t'
        fieldsPerRecord                = -1
    )
    
    func clean(s string) string {
        clean := cleanRe.ReplaceAllLiteralString(s, "")
        if len(clean) < 6 {
            return ""
        }
        return clean
    }
    
    func uniqueChannel(inputChan chan []string) {
        uniq := make(map[string]map[string]bool)
        i := 0
        for record := range inputChan {
            i++
            id, v := record[0], record[1]
            if uniq[id] == nil {
                uniq[id] = make(map[string]bool)
            } else if !uniq[id][v] {
                uniq[id][v] = true
                fmt.Println(id, string(comma), v)
            }
        }
        log.Println("digest ", i)
    }
    
    func processFile(fileName string, outputChan chan []string) {
        f, err := os.Open(fileName)
        if err != nil {
            log.Fatal(err)
        }
        r := csv.NewReader(f)
        r.FieldsPerRecord = fieldsPerRecord
        r.Comma = comma
    
        //  Process the records
        for record, err := r.Read(); err != io.EOF; record, err = r.Read() {
            if err != nil {
                continue
            }
            id := record[0]
            for _, v := range record[1:] {
                if cleanV := clean(v); cleanV != "" {
                    outputChan <- []string{id, cleanV}
                }
            }
        }
    }
    
    func main() {
        inputs := []string{"ex.tsv"}
        recordChan := make(chan []string)
    
        var wg sync.WaitGroup
        //  Ingest the inputs
        for _, fName := range inputs {
            wg.Add(1)
            go func(fName string) {
                // fName is passed as an argument so each goroutine gets
                // its own copy of the loop variable
                processFile(fName, recordChan)
                wg.Done()
            }(fName)
        }
        go func() {
            wg.Wait()
            close(recordChan)
        }()
    
        //  This is the loop to ensure it's all unique
        uniqueChannel(recordChan)
    }
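
    The key point is the small goroutine that closes recordChan: close is called only after wg.Wait() returns, i.e. after every producer has finished sending, so the consumer's range loop drains whatever is left and then terminates. A minimal sketch of just that pattern (hypothetical producers and consumer, not tied to the code above):

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        ch := make(chan int)
        var wg sync.WaitGroup

        // Three producers; each signals the WaitGroup when done sending.
        for i := 0; i < 3; i++ {
            wg.Add(1)
            go func(id int) {
                defer wg.Done()
                for j := 0; j < 5; j++ {
                    ch <- id*10 + j
                }
            }(i)
        }

        // Close the channel only after every producer has returned,
        // so the consumer's range loop below can terminate cleanly.
        go func() {
            wg.Wait()
            close(ch)
        }()

        // Single consumer drains the channel until it is closed.
        for v := range ch {
            fmt.Println(v)
        }
    }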
    
    Accepted by the asker as the best answer.
