douzhigan1687 2019-03-24 13:27

Reading from an unbuffered channel

I am trying to understand unbuffered channels, so I have written a small application that iterates over an array of user input, does some work, places the result on an unbuffered channel, and then reads it back. However, I'm not able to read from the channels. This is my code:

toProcess := os.Args[1:]

var wg sync.WaitGroup
results := make(chan string)
errs := make(chan error)

for _, t := range toProcess {
    wg.Add(1)
    go Worker(t, "text", results, errs, &wg)
}


go func() {
    for err := range errs {
        if err != nil {
            fmt.Println(err)
        }
    }
}()


go func() {
    for res := range results {
        fmt.Println(res)
    }
}()

What am I not understanding about unbuffered channels? I thought I should place information on the channel and have another goroutine read from it.

EDIT: using two goroutines solves the issue, but I still get the following when there are errors:

open /Users/roosingh/go/src/github.com/nonbuff/files/22.txt: no such file or directory
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc42001416c)
    /usr/local/Cellar/go/1.10.2/libexec/src/runtime/sema.go:56 +0x39
sync.(*WaitGroup).Wait(0xc420014160)
    /usr/local/Cellar/go/1.10.2/libexec/src/sync/waitgroup.go:129 +0x72
main.main()
    /Users/roosingh/go/src/github.com/nonbuff/main.go:39 +0x207

goroutine 6 [chan receive]:
main.main.func1(0xc4200780c0)
    /Users/roosingh/go/src/github.com/nonbuff/main.go:25 +0x41
created by main.main
    /Users/roosingh/go/src/github.com/nonbuff/main.go:24 +0x1d4

goroutine 7 [chan receive]:
main.main.func2(0xc420078060)
    /Users/roosingh/go/src/github.com/nonbuff/main.go:34 +0xb2
created by main.main
    /Users/roosingh/go/src/github.com/nonbuff/main.go:33 +0x1f6

So it is able to print out the error message. My worker code is as follows:

func Worker(fn string, text string, results chan string, errs chan error, wg *sync.WaitGroup) {
    file, err := os.Open(fn)
    if err != nil {
        errs <- err
        return
    }
    defer func() {
        file.Close()
        wg.Done()
    }()

    reader := bufio.NewReader(file)


    for {
        var buffer bytes.Buffer

        var l []byte
        var isPrefix bool
        for {
            l, isPrefix, err = reader.ReadLine()
            buffer.Write(l)

            if !isPrefix {
                break
            }

            if err != nil {
                errs <- err
                return

            }
        }

        if err == io.EOF {
            return
        }

        line := buffer.String()

        results <- fmt.Sprintf("%s, %s", line, text)

    }

    if err != io.EOF {
        errs <- err
        return
    }

    return
}

1 answer

  • dqroktbn005028 2019-03-24 14:15

    You seem to understand the concept of unbuffered channels: they pass messages between goroutines but cannot hold any. A send on an unbuffered channel therefore blocks until another goroutine receives from it, and a receive blocks until another goroutine sends.

    In your case, you seem to want to read from two channels in the same goroutine. Because of the way channels work, you cannot range over a channel that never gets closed and then, further down in the same goroutine, range over another channel: unless the first channel is closed, the first loop never ends and you never reach the second range.
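A small sketch of that point, with a hypothetical helper `drainInOrder` and a producer invented for illustration: the second range is reached only because the producer closes the first channel.

```go
package main

import "fmt"

// drainInOrder ranges over first and then over second in a single goroutine.
// It reaches the second loop only after first has been closed.
func drainInOrder(first, second <-chan int) (a, b []int) {
	for v := range first { // ends only when first is closed
		a = append(a, v)
	}
	for v := range second {
		b = append(b, v)
	}
	return a, b
}

func main() {
	first, second := make(chan int), make(chan int)

	go func() {
		first <- 1
		first <- 2
		close(first) // without this close, the consumer never leaves its first loop
		second <- 3
		close(second)
	}()

	a, b := drainInOrder(first, second)
	fmt.Println(a, b) // prints "[1 2] [3]"
}
```

This only works because the producer finishes with the first channel before touching the second. Workers that send on both channels in arbitrary order, as in the question, would block on the channel nobody is reading yet.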

    But that doesn't mean it's impossible! This is where the select statement comes in.

    The select statement lets you wait on several channels at once: it blocks until one of its cases can proceed and then runs that case. If several cases are ready at the same time, it picks one of them at random.

    With that in mind, you can combine for with select and rewrite your goroutine this way:

    go func() {
        for {
            select {
            case err := <-errs: // you got an error
                fmt.Println(err)
            case res := <-results: // you got a result
                fmt.Println(res)
            }
        }
    }()
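The loop above never exits, and nothing in the question closes the channels. Note also that the question's Worker returns on an os.Open failure before its deferred wg.Done() is registered, which matches goroutine 1 being stuck in WaitGroup.Wait in the trace. The following is one possible sketch of a version that terminates, not the only way to do it. The names `worker` and `process` and the in-memory input are assumptions made so the example runs without files.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// worker is a hypothetical stand-in for the question's Worker: it sends one
// result per line, or a single error when the name is empty.
func worker(name string, lines []string, results chan<- string, errs chan<- error, wg *sync.WaitGroup) {
	defer wg.Done() // registered first, so every return path calls it
	if name == "" {
		errs <- errors.New("empty file name")
		return
	}
	for _, l := range lines {
		results <- fmt.Sprintf("%s: %s", name, l)
	}
}

// process starts one worker per name and collects everything they send.
func process(names []string, lines []string) (out []string, failed []error) {
	results := make(chan string)
	errs := make(chan error)
	var wg sync.WaitGroup

	for _, n := range names {
		wg.Add(1)
		go worker(n, lines, results, errs, &wg)
	}

	// Close both channels once every worker has returned.
	go func(r chan string, e chan error) {
		wg.Wait()
		close(r)
		close(e)
	}(results, errs)

	// Receiving from a nil channel blocks forever, so setting a closed
	// channel's variable to nil takes its case out of the select.
	var resIn <-chan string = results
	var errIn <-chan error = errs
	for resIn != nil || errIn != nil {
		select {
		case res, ok := <-resIn:
			if !ok {
				resIn = nil
				continue
			}
			out = append(out, res)
		case err, ok := <-errIn:
			if !ok {
				errIn = nil
				continue
			}
			failed = append(failed, err)
		}
	}
	return out, failed
}

func main() {
	out, failed := process([]string{"a.txt", "", "b.txt"}, []string{"x", "y"})
	fmt.Println(len(out), "results,", len(failed), "errors")
}
```

With the sample input in main, two workers send two results each and one worker sends an error, so the program reports four results and one error. The order of the individual results is not deterministic.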
    

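    Also, you don't strictly need a WaitGroup here. Because you know how many workers you start, you can count the messages you receive and stop once the count reaches the number of workers. Note that this only works if each worker sends exactly one message, either a result or an error; the Worker in the question sends one result per line, so it would have to be adapted to signal completion exactly once.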

    Example:

    go func() {
        var i int
        for {
            select {
            case err := <-errs: // you got an error
                fmt.Println(err)
                i++
            case res := <-results: // you got a result
                fmt.Println(res)
                i++
            }
            // all our workers are done
            if i == len(toProcess) {
                return
            }
        }
    }()
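The counting loop above assumes one message per worker, whereas the Worker in the question sends one result per line. A possible adaptation, sketched here with hypothetical names (`countingWorker`, `collect`, and a `done` channel that the original code does not have), is to count completions rather than messages: each worker sends any number of results and then exactly one value on `done`.

```go
package main

import (
	"errors"
	"fmt"
)

// countingWorker is a hypothetical worker: it may send any number of results,
// then sends exactly one value on done (nil on success, an error otherwise).
func countingWorker(name string, n int, results chan<- string, done chan<- error) {
	if name == "" {
		done <- errors.New("worker got an empty name")
		return
	}
	for i := 0; i < n; i++ {
		results <- fmt.Sprintf("%s line %d", name, i)
	}
	done <- nil
}

// collect runs one worker per name and returns once all of them have
// reported on done. No WaitGroup and no close calls are needed.
func collect(names []string, linesPerWorker int) (out []string, failed []error) {
	results := make(chan string)
	done := make(chan error)

	for _, n := range names {
		go countingWorker(n, linesPerWorker, results, done)
	}

	for finished := 0; finished < len(names); {
		select {
		case res := <-results:
			out = append(out, res)
		case err := <-done:
			finished++
			if err != nil {
				failed = append(failed, err)
			}
		}
	}
	return out, failed
}

func main() {
	out, failed := collect([]string{"a", "", "b"}, 3)
	fmt.Println(len(out), "results,", len(failed), "errors")
}
```

Because the channels are unbuffered, a worker can only reach its send on `done` after all of its results have been received, so no result is lost when the loop stops. With the sample input, the program reports six results and one error.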
    