dtudj42064 2017-12-07 10:50

Recursive concurrency in Golang

I'd like to distribute some load across a few goroutines. If the number of tasks is known beforehand, this is easy to organize. For example, I could fan out with a wait group.

// Fragment: assumes import "sync" and the helpers getTaskI and doSomethingWithTask.
nTasks := 100
nGoroutines := 10

// It is important that this channel is not buffered.
ch := make(chan *Task)
done := make(chan bool)
var w sync.WaitGroup

// Feed the channel until done.
go func() {
    for i := 0; i < nTasks; i++ {
        task := getTaskI(i)
        ch <- task
    }
    // As ch is not buffered, once every send has returned we know that all tasks have been received.
    for i := 0; i < nGoroutines; i++ {
        done <- false
    }
}()

for i := 0; i < nGoroutines; i++ {
    w.Add(1)
    go func() {
        defer w.Done()
        // Loop so that each worker keeps taking tasks until it is told to stop.
        for {
            select {
            case task := <-ch:
                doSomethingWithTask(task)
            case <-done:
                return
            }
        }
    }()
}
w.Wait()
// All tasks done, all goroutines closed

However, in my case each task returns more tasks to be done. Think of a crawler, where each crawled page yields all the links found on it. My initial hunch was to have a main loop that tracks the number of tasks still pending. When nothing is pending any more, I send a finish signal to all goroutines:

nGoroutines := 10
ch := make(chan *Task, nGoroutines)
feedBackChannel := make(chan *Task, nGoroutines)
done := make(chan bool)

for i := 0; i < nGoroutines; i++ {
    go func() {
        // Loop so that each worker handles more than one task.
        for {
            select {
            case task := <-ch:
                task.NextTasks = doSomethingWithTask(task)
                // Blocks once feedBackChannel is full and the main loop is not reading.
                feedBackChannel <- task
            case <-done:
                return
            }
        }
    }()
}

// seed first task
ch <- firstTask
nTasksRemaining := 1

for nTasksRemaining > 0 {
    task := <-feedBackChannel
    nTasksRemaining--
    for _, t := range task.NextTasks {
        // Blocks once ch is full and every worker is stuck writing feedback.
        ch <- t
        nTasksRemaining++
    }
}
for i := 0; i < nGoroutines; i++ {
    done <- false
}

However, this produces a deadlock. For example, if NextTasks holds more tasks than ch can buffer, the main loop stalls while writing to ch until some worker becomes free. But the workers cannot become free, because their writes to feedBackChannel block while the main loop is itself stuck writing instead of reading.

One "easy" way out of this is to post to the channel asynchronously: instead of doing feedBackChannel <- task, do go func () {feedBackChannel <- task}(). Now, this feels like an awful hack, especially since there might be hundreds of thousands of tasks.
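
For reference, here is a minimal sketch of that workaround. The Task fields, the fanout parameter, the body of doSomethingWithTask and the name runAsyncFeedback are all placeholders made up for this illustration (the real task logic is not shown in the question); each task simply produces fanout children until its depth reaches zero.

```go
package main

import "fmt"

// Task is a hypothetical stand-in for the real task type.
type Task struct {
	Depth     int
	NextTasks []*Task
}

// doSomethingWithTask is a placeholder: every task yields `fanout`
// children until Depth reaches 0.
func doSomethingWithTask(t *Task, fanout int) []*Task {
	if t.Depth == 0 {
		return nil
	}
	next := make([]*Task, fanout)
	for i := range next {
		next[i] = &Task{Depth: t.Depth - 1}
	}
	return next
}

// runAsyncFeedback walks the whole task tree and returns how many
// tasks were handled.
func runAsyncFeedback(first *Task, nGoroutines, fanout int) int {
	ch := make(chan *Task, nGoroutines)
	feedBackChannel := make(chan *Task, nGoroutines)
	done := make(chan bool)

	for i := 0; i < nGoroutines; i++ {
		go func() {
			for {
				select {
				case task := <-ch:
					task.NextTasks = doSomethingWithTask(task, fanout)
					// The hack: hand the result over from a throwaway
					// goroutine so the worker itself never blocks here.
					go func() { feedBackChannel <- task }()
				case <-done:
					return
				}
			}
		}()
	}

	ch <- first // seed first task
	nTasksRemaining := 1
	processed := 0
	for nTasksRemaining > 0 {
		task := <-feedBackChannel
		nTasksRemaining--
		processed++
		for _, t := range task.NextTasks {
			ch <- t
			nTasksRemaining++
		}
	}
	for i := 0; i < nGoroutines; i++ {
		done <- false
	}
	return processed
}

func main() {
	// Depth 3 with fanout 5 gives 1 + 5 + 25 + 125 tasks.
	fmt.Println(runAsyncFeedback(&Task{Depth: 3}, 4, 5))
}
```

It no longer deadlocks because a worker always returns to reading ch, but every finished task parks one extra goroutine until the main loop collects it, which is exactly the part that feels wasteful.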

What would be a nice way to avoid this deadlock? I've searched for concurrency patterns, but what I found mostly covers simpler cases such as fanning out, or pipelines where a later stage does not feed back into the earlier ones.

1 answer

  • dop2144 2017-12-10 13:50

    If I understand your problem correctly, your solution is more complex than it needs to be. Here are some points; I hope they help.

    • As people mentioned in the comments, launching a goroutine is cheap (both its memory footprint and the cost of switching between goroutines are much lower than for OS-level threads), and you could have hundreds of thousands of them. Still, let's assume that for some reason you want a fixed set of worker goroutines.
    • Instead of using the done channel you could just close ch, and instead of select the workers can simply range over the channel to receive tasks.
    • I don't see the point of separating ch and feedBackChannel. Just push every task you have into ch and increase its capacity.
    • As mentioned, you may get a deadlock when you try to enqueue a new task. My solution is pretty naive: just increase the capacity of ch until you are sure that it won't overflow (you could also log a warning if cap(ch) - len(ch) < threshold). A channel of pointers with a capacity of 1 million takes about 8 bytes * 1e6 ~= 8 MB of RAM on a 64-bit platform.
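
The points above could be put together roughly as follows. This is only a sketch under stated assumptions: the Task type, the fanout parameter, the body of doSomethingWithTask and the name runSingleChannel are invented for the illustration, and the sync.WaitGroup that counts pending tasks is my addition so that something knows when it is safe to close ch. As noted above, it still relies on the capacity being large enough; if ch fills up while every worker is trying to enqueue, it deadlocks.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Task is a hypothetical stand-in for the real task type.
type Task struct{ Depth int }

// doSomethingWithTask is a placeholder: every task yields `fanout`
// children until Depth reaches 0.
func doSomethingWithTask(t *Task, fanout int) []*Task {
	if t.Depth == 0 {
		return nil
	}
	next := make([]*Task, fanout)
	for i := range next {
		next[i] = &Task{Depth: t.Depth - 1}
	}
	return next
}

// runSingleChannel processes the whole task tree through one buffered
// channel and returns how many tasks were handled.
func runSingleChannel(first *Task, nGoroutines, fanout, capacity int) int64 {
	ch := make(chan *Task, capacity)
	var pending sync.WaitGroup // tasks enqueued but not yet finished
	var workers sync.WaitGroup // running worker goroutines
	var processed int64

	for i := 0; i < nGoroutines; i++ {
		workers.Add(1)
		go func() {
			defer workers.Done()
			// range ends once ch is closed and drained.
			for task := range ch {
				for _, t := range doSomethingWithTask(task, fanout) {
					pending.Add(1)
					ch <- t // would block if capacity were too small
				}
				atomic.AddInt64(&processed, 1)
				// Mark this task finished only after its children are queued,
				// so the pending count cannot reach zero too early.
				pending.Done()
			}
		}()
	}

	pending.Add(1)
	ch <- first

	pending.Wait() // no task is queued or in flight any more
	close(ch)      // lets every worker fall out of its range loop
	workers.Wait()
	return processed
}

func main() {
	// Depth 3 with fanout 5 gives 1 + 5 + 25 + 125 tasks.
	fmt.Println(runSingleChannel(&Task{Depth: 3}, 4, 5, 1024))
}
```

Here the capacity of 1024 comfortably exceeds the 156 tasks that can ever be queued; for a real crawler you would have to pick the bound from what you know about your workload.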