duanfen2008 2017-09-09 12:55

Closing Go channels and synchronizing goroutines

I'm unable to get my WaitGroup to finish in Go, and consequently I can't exit the range loop. Can anybody tell me why? Or is there a better way of limiting the number of goroutines while still being able to exit when the channel is closed?

Most examples I have seen assume a fixed channel length, but here the number of values on the channel changes dynamically as a result of other processes.

The print statement ("done !") in the example is printed, showing that testValProducer runs the right number of times, but the code never reaches "--EXIT --", which means wg.Wait is still blocking somehow.

type TestValContainer chan string

func StartFunc() {
    testValContainer := make(TestValContainer)
    go func() { testValContainer <- "string val 1" }()
    go func() { testValContainer <- "string val 2" }()
    go func() { testValContainer <- "string val 3" }()
    go func() { testValContainer <- "string val 4" }()
    go func() { testValContainer <- "string val 5" }()
    go func() { testValContainer <- "string val 6" }()
    go func() { testValContainer <- "string val 7" }()
    wg := sync.WaitGroup{}

    // limit the number of worker goroutines
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func() {
            v := i
            fmt.Printf("launching %v", i)
            for str := range testValContainer {
                testValProducer(str, &wg)
            }
            fmt.Println(v, "--EXIT --") // never called
        }()
    }

    wg.Wait()
    close(testValContainer)
}


func get(url string){
    http.Get(url)
    ch <- getUnvisited()
}


func testValProducer(testStr string, wg *sync.WaitGroup){
    doSomething(testStr)
    fmt.Println("done !") // called
    wg.Done() // NO EFFECT??
}

2 answers

  • dqzow3859 2017-09-10 18:08

    I might do something like this; it keeps everything easy to follow. I define a struct that holds a WaitGroup, a buffered channel used as a semaphore to cap the number of active goroutines, and a data channel that I can read values from as they come in.

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    type TestValContainer struct {
        wg   sync.WaitGroup
        sema chan struct{}
        data chan int
    }
    
    func doSomething(number int) {
        fmt.Println(number)
    }
    
    func main() {
        // semaphore: limit to 10 goroutines at a time
        tvc := TestValContainer{
            sema: make(chan struct{}, 10),
            data: make(chan int),
        }
    
        for i := 0; i <= 100; i++ {
            tvc.wg.Add(1)
            go func(i int) {
                tvc.sema <- struct{}{}
                defer func() {
                    <-tvc.sema
                    tvc.wg.Done()
                }()
    
                tvc.data <- i
            }(i)
        }
        // wait in the background so that waiting and closing the channel don't
        // block the for loop below
        go func() {
            tvc.wg.Wait()
            close(tvc.data)
        }()
        // get channel results
        for res := range tvc.data {
            doSomething(res)
        }
    
    }
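
For comparison, a variant closer to the question's fixed pool of three workers might look like the sketch below. It assumes the root issue is that the question calls wg.Add once per worker but wg.Done once per processed string, and closes the channel only after wg.Wait, while the workers' range loops can only end once the channel has been closed. The names runWorkers, process, jobs and results are hypothetical, and process merely stands in for doSomething.

```go
package main

import (
	"fmt"
	"sync"
)

// process is a hypothetical stand-in for the question's doSomething.
func process(s string) string {
	return "processed " + s
}

// runWorkers feeds inputs to a fixed number of workers and collects every result.
func runWorkers(inputs []string, workers int) []string {
	jobs := make(chan string)
	results := make(chan string)

	// Producer: send every input, then close jobs so the workers' range loops end.
	go func() {
		defer close(jobs)
		for _, in := range inputs {
			jobs <- in
		}
	}()

	// Workers: one Add per worker and one Done per worker (not per item).
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for s := range jobs {
				results <- process(s)
			}
		}()
	}

	// Close results once all workers have returned, so the loop below can finish.
	go func() {
		wg.Wait()
		close(results)
	}()

	var out []string
	for r := range results {
		out = append(out, r)
	}
	return out
}

func main() {
	inputs := []string{"string val 1", "string val 2", "string val 3", "string val 4"}
	out := runWorkers(inputs, 3)
	fmt.Println(len(out), "results") // the order of the results is not deterministic
}
```

Here the sender closes the channel, which is what lets the receivers leave their range loops. With several independent producers, as in the question, a second WaitGroup tracking the producers could decide when it is safe to close jobs.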
    
    Selected as the best answer by the asker.