doushan6692 2018-08-18 19:08

How to process a queue that can grow without blocking

I'm trying to understand how to process a queue in Go if the queue can grow from the processing function itself. See the code below.

In this pseudocode, I want to limit the number of handlers to 10, so I create 10 handlers that process the queue. I then start the queue off with a URL.

My issue is that, according to the docs, a send on an unbuffered channel blocks until a receiver receives the data. In the code below, each process is a receiver that handles a new URL. However, it's easy to see that if a process sends 11 links to the queue, it can hand at most one link to each idle worker, and then it blocks until other workers finish their current URL and come back to receive. If each of those workers finds even 1 link, it will also block while sending that link to the queue. Since every worker is blocked sending and none is receiving, nothing finishes: the program deadlocks.
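The blocking rule above can be observed directly with a non-blocking send (`select` with a `default` case). The sketch below also shows that a buffered channel only postpones the problem: sends succeed until the buffer is full, then fail just like the unbuffered case. The helper `trySend` is hypothetical, written only for this illustration.

```go
package main

import "fmt"

// trySend reports whether v could be sent on ch right now without blocking.
// The select takes the default branch when no receiver is ready (unbuffered
// channel) or when the buffer is already full (buffered channel).
func trySend(ch chan string, v string) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan string)
	fmt.Println(trySend(unbuffered, "a")) // false: nobody is receiving

	buffered := make(chan string, 2)
	fmt.Println(trySend(buffered, "a")) // true: first buffer slot
	fmt.Println(trySend(buffered, "b")) // true: second buffer slot
	fmt.Println(trySend(buffered, "c")) // false: buffer is full
}
```

Because a crawler can discover more links than any fixed buffer holds, giving `queue` a capacity such as `make(chan string, 100)` does not remove the deadlock; it only raises the point at which it happens.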

I'm wondering what the general solution in Go is for processing a queue that can grow from the processing itself. I think I could do this with a lock on a slice called queue, but I'm trying to understand how it would be done with channels.

var queue = make(chan string)

func process() {
    for currentURL := range queue {
        links, _ := ... // some http call that gets links from currentURL
        for _, link := range links {
            queue <- link // blocks until another worker is free to receive
        }
    }
}

func main() {
    for i := 0; i < 10; i++ {
        go process()
    }

    queue <- "https://stackoverflow.com"
    ...
    // block until receive some quit message
    <-quit
}


  • douju5062 2018-08-18 19:31

    One simple method you could use is to move the code that adds the links to the channel into its own goroutine. This way, your main processing can continue, while the blocked channel write blocks a separate goroutine instead.

    func process() {
        for currentURL := range queue {
            links, _ := ... // some http call that gets links from currentURL
            for _, link := range links {
                // Pass link as an argument so each goroutine gets its own copy
                // (before Go 1.22 the loop reused a single link variable).
                go func(l string) {
                    queue <- l // we'll be blocked here...
                    // but the "parent" routine can still iterate through the channel
                    // which in turn un-blocks the write
                }(link)
            }
        }
    }
    

    Edit: here is a semaphore example (using golang.org/x/sync/semaphore) to limit the number of goroutines:

    import (
        "context"

        "golang.org/x/sync/semaphore"
    )

    func main() {
        maxWorkers := 5000
        sem := semaphore.NewWeighted(int64(maxWorkers))
        ctx := context.TODO()
        for i := 0; i < 10; i++ {
            go process(ctx, sem) // pass sem so process can use it
        }

        queue <- "https://stackoverflow.com"
        // block until receive some quit message
        <-quit
    }

    func process(ctx context.Context, sem *semaphore.Weighted) {
        for currentURL := range queue {
            links, _ := ... // some http call that gets links from currentURL
            for _, link := range links {
                // acquire a slot for a sender goroutine...
                // if we are at the limit, this line blocks until one is released
                if err := sem.Acquire(ctx, 1); err != nil {
                    return // ctx was cancelled
                }
                go func(l string) {
                    defer sem.Release(1)
                    queue <- l // we'll be blocked here...
                    // but the "parent" routine can still iterate through the channel
                    // which in turn un-blocks the write
                }(link)
            }
        }
    }
    

    This option could end up causing deadlocks, though. Once all semaphore slots have been claimed, the parent loops can block on sem.Acquire. Every slot is then held by a child goroutine stuck on its channel send, and since no worker is left receiving, those children never finish and therefore never execute the deferred sem.Release. Off the top of my head I'm struggling to come up with a nice way to deal with this. Perhaps an external in-memory queue rather than channels?

    This answer was selected by the asker as the best answer.