dqteh7347 2013-06-01 13:03 采纳率: 100%
浏览 32
已采纳

例程从for循环开始-一个或多个通道?

I would like to load some JSON files (".json") using goroutines launched from a for loop. I'd like the loading to be parallelized (processing the first files while the remaining files are still being loaded).

Q1. Since the number of files may vary (new ones may be added), I would use a list of file names (the names are only autogenerated in this example), so I'd like to use a for loop. Is that optimal?

Q2. What would be the most effective use of channel(s)?

Q3. How would I define the channel(s) if a separate channel is needed for each load operation (as in the example code below)?

Example code (to be compacted & capable of loading the files using a list of file names):


// load_json is a placeholder for loading the file "filename" + s + ".json".
// When the (not yet implemented) load is finished it signals completion by
// sending a single zero byte on aChan; with an unbuffered channel that send
// blocks until the waiting side receives it.
func load_json(aChan chan byte, s string) {
    // TODO: actually read "filename" + s + ".json" here.
    var doneSignal byte // the zero value is the completion token
    aChan <- doneSignal
}

// do_stuff is a placeholder for the processing that should happen once a
// JSON file has been loaded. It currently does nothing.
func do_stuff() {
    // TODO: work with the newly loaded JSON.
}

// Main starts one load_json goroutine per class suffix, each with its own
// unbuffered completion channel, then waits for the loads in launch order
// (A, B, C, D) before printing "Done.".
//
// Driving everything from the suffixes slice means adding a file only
// requires adding its suffix; one channel per load is kept so each result can
// be handled as soon as that particular file is ready.
func Main() {
    suffixes := []string{"_classA", "_classB", "_classC", "_classD"}

    signals := make([]chan byte, 0, len(suffixes))
    for _, suffix := range suffixes {
        sig := make(chan byte)
        signals = append(signals, sig)
        go load_json(sig, suffix)
    }

    for _, sig := range signals {
        <-sig
        // Now, do stuff with the class whose load just completed.
    }
    fmt.Println("Done.")
}

EDIT: I designed a simplified test solution based on the ideas suggested by "Tom" (see below). In my case I split the task into three phases, using one channel per phase to control the execution. However, I tend to get deadlocks with this code (see the execution results and the note below the code).

Run this code on the Playground.

How can I avoid the deadlocks in this code?:

type (
    // TJsonFileInfo is the item that travels through the load/unmarshal/process
    // pipeline; for now it only carries the file's path.
    TJsonFileInfo struct {
        FileName string
    }

    // TChannelTracer counts how many times each phase has run
    // (A = load, B = unmarshal, C = process) so the counts can be displayed.
    // It has no synchronization of its own.
    TChannelTracer struct {
        A, B, C int
    }
)

var (
    // ChannelTracer is the single, package-wide phase counter.
    ChannelTracer TChannelTracer

    // jsonFileList lists the input files fed into the pipeline.
    jsonFileList = []string{
        "./files/classA.json",
        "./files/classB.json",
        "./files/classC.json",
    }
)

func LoadJsonFiles(aFileName string, aResultQueueChan chan *TJsonFileInfo) {
    var newFileInfo TJsonFileInfo
    newFileInfo.FileName = aFileName
    // file, e := ioutil.ReadFile(newFileInfo.FileName)...
    ChannelTracer.A += 1
    fmt.Printf("A. Loaded file: %s
", newFileInfo.FileName)
    aResultQueueChan <- &newFileInfo
}

func UnmarshalFile(aWorkQueueChan chan *TJsonFileInfo, aResultQueueChan chan *TJsonFileInfo) {
    FileInfo := <-aWorkQueueChan
    ChannelTracer.B += 1
    fmt.Printf("B. Marshalled file: %s
", FileInfo.FileName)
    aResultQueueChan <- FileInfo
}

func ProcessWork(aWorkQueueChan chan *TJsonFileInfo, aDoneQueueChan chan *TJsonFileInfo) {
    FileInfo := <-aWorkQueueChan
    ChannelTracer.C += 1
    fmt.Printf("C. Processed file: %s 
", FileInfo.FileName)
    aDoneQueueChan <- FileInfo
}

func main() {
    marshalChan := make(chan *TJsonFileInfo)
    processChan := make(chan *TJsonFileInfo)
    doneProcessingChan := make(chan *TJsonFileInfo)

    for _, fileName := range jsonFileList {
        go LoadJsonFiles(fileName, marshalChan)
        go UnmarshalFile(marshalChan, processChan)
        go ProcessWork(processChan, doneProcessingChan)
    }

    for {
        select {
        case result := <-marshalChan:
            result.FileName = result.FileName // dummy use
        case result := <-processChan:
            result.FileName = result.FileName // dummy use
        case result := <-doneProcessingChan:
            result.FileName = result.FileName // dummy use
            fmt.Printf("Done%s Channels visited: %v
", ".", ChannelTracer)
        }
    }
}

/**
RESULTS (for phases A, B and C):

A. Loaded file: ./files/classA.json
A. Loaded file: ./files/classB.json
A. Loaded file: ./files/classC.json
B. Marshalled file: ./files/classB.json
B. Marshalled file: ./files/classC.json
C. Processed file: ./files/classB.json 
C. Processed file: ./files/classC.json 
Done. Channels visited: {3 2 2}     // ChannelTracer for phase A, B and C
Done. Channels visited: {3 2 2}
fatal error: all goroutines are asleep - deadlock!
*/

Note that this code doesn't access the file system, so it should run on the Playground.

EDIT2: Apart from the unsafe "ChannelTracer", I can avoid deadlocks only by receiving from doneProcessingChan the same number of times as there are file tasks.
Run the code here: Playground

func main() {
    marshalChan := make(chan *TJsonFileInfo)
    processChan := make(chan *TJsonFileInfo)
    doneProcessingChan := make(chan *TJsonFileInfo)

    go UnmarshalFiles(marshalChan, processChan)
    go ProcessWork(processChan, doneProcessingChan)

    for _, fileName := range jsonFileList {
        go LoadJsonFiles(fileName, marshalChan)
    }

    //  Read doneProcessingChan equal number of times
    //  as the spawned tasks (files) above :
    for i := 0; i < len(jsonFileList); i++ {
        <-doneProcessingChan
        fmt.Printf("Done%s Channels visited: %v
", ".", ChannelTracer)
    }
}

// RIL

  • 写回答

2条回答 默认 最新

  • doukan4039 2013-06-01 17:31
    关注

    Building on the answer by @BraveNewCurrency, I have composed a simple example program for you:

    package main
    
    import (
        "encoding/json"
        "fmt"
        "os"
        "sync"
    )
    
    // Result is the value each input JSON file is decoded into. All fields are
    // exported and untagged, so encoding/json matches them by field name.
    type Result struct {
        Some, Another string
        AndAn         int
    }
    
    // generateWork opens each input file in turn and hands the open *os.File to
    // the workers over work. It closes work after the last file has been sent,
    // which lets the workers' range loops (and therefore main) finish.
    // It panics if a file cannot be opened.
    func generateWork(work chan *os.File) {
        defer close(work)
        files := []string{
            "/home/foo/a.json",
            "/home/foo/b.json",
            "/home/foo/c.json",
        }
        for _, path := range files {
            file, e := os.Open(path)
            if e != nil {
                panic(e)
            }
            work <- file
        }
    }

    // processWork decodes every file it receives from work into a Result and
    // sends it on done, until work is closed. Each file is closed after
    // decoding. A file that fails to decode is reported on stderr and skipped,
    // rather than being sent as a zero or partial Result.
    func processWork(work chan *os.File, done chan Result) {
        for file := range work {
            result := Result{}
            err := json.NewDecoder(file).Decode(&result)
            file.Close() // the worker owns the file once it has received it
            if err != nil {
                fmt.Fprintf(os.Stderr, "decoding %s: %v\n", file.Name(), err)
                continue
            }
            done <- result
        }
    }

    // main wires the pipeline: one producer, a fixed pool of workers, and the
    // printing loop below. done is closed only after every worker has returned,
    // so ranging over it ends cleanly. Looping forever instead would end in the
    // runtime's "all goroutines are asleep - deadlock!" crash.
    func main() {
        const numWorkers = 100

        work := make(chan *os.File)
        done := make(chan Result)
        go generateWork(work)

        var wg sync.WaitGroup
        for i := 0; i < numWorkers; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                processWork(work, done)
            }()
        }
        go func() {
            wg.Wait()
            close(done)
        }()

        for result := range done {
            // a result is available
            fmt.Println(result)
        }
    }
    

    Note that this program won't work on the playground because file-system access is disallowed there.

    Edit:

    To answer the edit to your question, I've taken the code and changed a few small things:

    package main
    
    import (
        _ "encoding/json"
        "fmt"
        _ "io/ioutil"
        _ "os"
    )
    
    type (
        // TJsonMetaInfo holds metadata about a JSON file. It is declared but
        // not used anywhere else in this program.
        TJsonMetaInfo struct {
            MetaSystem string
        }

        // TJsonFileInfo is the item passed between the pipeline stages.
        TJsonFileInfo struct {
            FileName string
        }

        // TChannelTracer counts how often each phase has run
        // (A = load, B = unmarshal, C = process). It is not safe for
        // concurrent use: the fields are plain ints with no locking.
        TChannelTracer struct {
            A, B, C int
        }
    )

    var (
        // ChannelTracer is the package-wide phase counter.
        ChannelTracer TChannelTracer

        // jsonFileList names the input files fed into the pipeline.
        jsonFileList = []string{
            "./files/classA.json",
            "./files/classB.json",
            "./files/classC.json",
        }
    )
    
    func LoadJsonFiles(aFileName string, aResultQueueChan chan *TJsonFileInfo) {
        newFileInfo := TJsonFileInfo{aFileName}
        // file, e := ioutil.ReadFile(newFileInfo.FileName)
        // etc...
        ChannelTracer.A += 1
        fmt.Printf("A. Loaded file: %s
    ", newFileInfo.FileName)
        aResultQueueChan <- &newFileInfo
    }
    
    func UnmarshalFiles(aWorkQueueChan chan *TJsonFileInfo, aResultQueueChan chan *TJsonFileInfo) {
        for {
            FileInfo := <-aWorkQueueChan
            ChannelTracer.B += 1
            fmt.Printf("B. Unmarshalled file: %s
    ", FileInfo.FileName)
            aResultQueueChan <- FileInfo
        }
    }
    
    func ProcessWork(aWorkQueueChan chan *TJsonFileInfo, aDoneQueueChan chan *TJsonFileInfo) {
        for {
            FileInfo := <-aWorkQueueChan
            ChannelTracer.C += 1
            fmt.Printf("C. Processed file: %s 
    ", FileInfo.FileName)
            aDoneQueueChan <- FileInfo
    
        }
    }
    
    func main() {
        marshalChan := make(chan *TJsonFileInfo)
        processChan := make(chan *TJsonFileInfo)
        doneProcessingChan := make(chan *TJsonFileInfo)
    
        go UnmarshalFiles(marshalChan, processChan)
        go ProcessWork(processChan, doneProcessingChan)
    
        for _, fileName := range jsonFileList {
            go LoadJsonFiles(fileName, marshalChan)
        }
    
        for {
            select {
            case result := <-doneProcessingChan:
                result.FileName = result.FileName // dummy use
                fmt.Printf("Done%s Channels visited: %v
    ", ".", ChannelTracer)
            }
        }
    }
    

    Note that this code still deadlocks, but only at the end: once all work is complete, main() blocks forever in its final for loop waiting on doneProcessingChan, and the runtime reports that all goroutines are asleep.

    Note also that these lines:

    ChannelTracer.A += 1
    ChannelTracer.B += 1
    ChannelTracer.C += 1
    

    are not concurrency-safe. In a multi-threaded environment, two goroutines might try to increment the same counter at the same time, resulting in a wrong count. To get around this issue, take a look at the `sync` package (e.g. `sync.Mutex`) and the `sync/atomic` package.

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(1条)

报告相同问题?

悬赏问题

  • ¥60 求一个简单的网页(标签-安全|关键词-上传)
  • ¥35 lstm时间序列共享单车预测,loss值优化,参数优化算法
  • ¥15 基于卷积神经网络的声纹识别
  • ¥15 Python中的request,如何使用ssr节点,通过代理requests网页。本人在泰国,需要用大陆ip才能玩网页游戏,合法合规。
  • ¥100 为什么这个恒流源电路不能恒流?
  • ¥15 有偿求跨组件数据流路径图
  • ¥15 写一个方法checkPerson,入参实体类Person,出参布尔值
  • ¥15 我想咨询一下路面纹理三维点云数据处理的一些问题,上传的坐标文件里是怎么对无序点进行编号的,以及xy坐标在处理的时候是进行整体模型分片处理的吗
  • ¥15 一直显示正在等待HID—ISP
  • ¥15 Python turtle 画图