dongteng0748 2015-05-19 14:56

Coalescing items in a channel

I have a function that receives tasks and puts them into a channel. Every task has an ID, some properties, and a channel where the result will be placed. It looks like this:

task.Result = make(chan *TaskResult)
queue <- task
result := <-task.Result
sendResponse(result)

Another goroutine takes a task from the channel, processes it, and puts the result into the task's channel:

task := <-queue
task.Result <- doExpensiveComputation(task)

This code works fine. But now I want to coalesce tasks in the queue. Task processing is a very expensive operation, so I want to process all the tasks in the queue that share an ID only once. I see two ways of doing it.

The first is not to put tasks with duplicate IDs into the queue at all: when a task arrives whose ID is already queued, it waits for the existing copy to complete. Here is the pseudo-code:

if newTask in queue {
  existing := queue.getById(newTask.ID)
  existing.waitForComplete()
  sendResponse(existing.ProcessingResult)
} else {
  queue.enqueue(newTask)
}

So I can implement it using a Go channel, a map for random access, and some means of synchronization such as a mutex. What I don't like about this approach is that I have to carry both the map and the channel around the code and keep their contents synchronized.
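A minimal sketch of this first approach, assuming a map guarded by a mutex is acceptable. The names `Coalescer`, `call`, `Do`, and `demo` are hypothetical and not from the post; the queue itself is left out so that only the "wait for the existing copy" step is shown. The idea resembles what `golang.org/x/sync/singleflight` offers, but this is a hand-rolled illustration, not that package's code.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// call is one in-flight computation (hypothetical helper type).
type call struct {
	done   chan struct{} // closed once result is set
	result string
}

// Coalescer runs at most one computation per ID at a time.
type Coalescer struct {
	mu       sync.Mutex
	inflight map[int]*call
}

func NewCoalescer() *Coalescer {
	return &Coalescer{inflight: make(map[int]*call)}
}

// Do returns the result for id. If a computation for id is already running,
// it waits for that one instead of calling fn; shared reports that case.
func (c *Coalescer) Do(id int, fn func() string) (result string, shared bool) {
	c.mu.Lock()
	if existing, ok := c.inflight[id]; ok {
		c.mu.Unlock()
		<-existing.done // wait for the first caller to finish
		return existing.result, true
	}
	cl := &call{done: make(chan struct{})}
	c.inflight[id] = cl
	c.mu.Unlock()

	cl.result = fn()

	c.mu.Lock()
	delete(c.inflight, id) // later arrivals start a fresh computation
	c.mu.Unlock()
	close(cl.done) // wake every waiter
	return cl.result, false
}

// demo fires n concurrent requests for the same ID and reports how many
// computations ran, how many callers shared a result, and all results.
func demo(n int) (runs, shared int, results []string) {
	c := NewCoalescer()
	var ran, sharedCount int32
	results = make([]string, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			res, wasShared := c.Do(7, func() string {
				atomic.AddInt32(&ran, 1)
				return "result-7"
			})
			if wasShared {
				atomic.AddInt32(&sharedCount, 1)
			}
			results[i] = res
		}(i)
	}
	wg.Wait()
	return int(ran), int(sharedCount), results
}

func main() {
	runs, shared, results := demo(5)
	// Every caller either ran the computation or shared someone else's.
	fmt.Println(runs+shared == 5, results[0])
}
```

How many callers end up sharing depends on timing: a request that arrives after the first one has finished starts a new computation, which is the intended behaviour when nothing is cached. The sketch also assumes `fn` does not panic; if it did, the waiters would block forever.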

The second way is to put all the tasks into the queue, and then, when a result arrives, extract every other task with the same ID from the queue and send the result to all of them. Here is the pseudo-code:

someTask := queue.dequeue()
result := doExpensiveComputation(someTask)
someTask.Result <- result
moreTasks := queue.getAllWithID(someTask.ID)
for _, theSameTask := range moreTasks {
  theSameTask.Result <- result
}

I have an idea of how to implement this using chan + map + mutex, in the same way as above.

So here is the question: is there a built-in or existing data structure that I can use for this kind of problem? Are there other (better) ways of doing this?


  • douliao8318 2015-05-19 15:31

    If I understand the problem correctly, the simplest solution that comes to mind is adding a middle layer between the task senders (which put tasks into the queue) and the workers (which take tasks from the queue). This layer, probably a goroutine, would be responsible for storing the current tasks (by ID) and broadcasting each result to every matching task.

    Pseudo-code:

    go func() {
        active := make(map[TaskID][]Task)
    
        for {
            select {
            case task := <-queue:
                tasks := active[task.ID]
                // No tasks with such ID, start heavy work
                if len(tasks) == 0 {
                    worker <- task
                }
                // Save task for the result
                active[task.ID] = append(active[task.ID], task)
            case r := <-response:
                // Broadcast to all tasks
                for _, task := range active[r.ID] {
                    task.Result <- r.Result
                }
                // Forget the finished ID so a later task with this ID starts new work
                delete(active, r.ID)
            }
        }
    }()
    

    No mutexes are needed, and probably nothing has to be carried around either: the workers simply put all their results into this middle layer, which then routes the responses correctly. You could even add caching here quite easily if tasks with clashing IDs may arrive some time apart.

    Edit: I had this dream where the above code caused a deadlock. If you send a lot of requests at once and choke the worker channel, there's a serious problem: the middle-layer goroutine is stuck on worker <- task, waiting for a worker to become free, but all the workers will probably be blocked on sending to the response channel (because our goroutine can't collect the responses). Playable proof.

    One could think of adding buffers to the channels, but this is not a proper solution (unless you can design the system in such a way that the buffer never fills up). There are a few ways of solving this problem. For example, you can run a separate goroutine for collecting responses, but then you would need to protect the active map with a mutex. Doable. You could also put worker <- task into a select, which would try to send the task to a worker, receive a new task (if there is nothing to send), or collect a response. You can take advantage of the fact that a nil channel is never ready for communication (select ignores it), so you can alternate between receiving and sending tasks within a single select. Example:

    go func() {
        var next Task // received task which needs to be passed to a worker
        in := queue // incoming channel (new tasks) -- active
        var out chan Task // outgoing channel (to workers) -- inactive
        for {
            select {
            case t := <-in:
                next = t // store task, so we can pass to worker
                in, out = nil, worker // deactivate incoming channel, activate outgoing
            case out <- next:
                in, out = queue, nil // deactivate outgoing channel, activate incoming
            case r := <-response:
                // pass the response on for broadcasting (collect is assumed to be a channel read elsewhere)
                collect <- r
            }
        }
    }()
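Putting the two snippets together, here is a rough sketch of a middle layer that keeps the active map and uses the nil-channel trick, so it can always collect a response while it waits for a free worker. The `Task` and `Response` types, the function names `dispatch` and `runDemo`, and the buffered `Result` channels are assumptions made for this illustration. The demo worker is deliberately held back until every task has been queued, so the coalescing is visible.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Task and Response are assumed shapes; the post does not define them.
type Task struct {
	ID     int
	Result chan string // buffered (cap 1) so a broadcast never blocks
}

type Response struct {
	ID     int
	Result string
}

// dispatch is the middle layer: it starts work once per ID and
// broadcasts each response to every task waiting on that ID.
func dispatch(queue <-chan Task, worker chan<- Task, response <-chan Response) {
	active := make(map[int][]Task)
	var next Task        // task waiting to be handed to a worker
	in := queue          // incoming channel, active
	var out chan<- Task  // outgoing channel, inactive while nil
	for {
		select {
		case t := <-in:
			if len(active[t.ID]) == 0 {
				// First task with this ID: switch to sending mode.
				next = t
				in, out = nil, worker
			}
			active[t.ID] = append(active[t.ID], t)
		case out <- next:
			in, out = queue, nil // back to receiving mode
		case r := <-response:
			for _, t := range active[r.ID] {
				t.Result <- r.Result
			}
			delete(active, r.ID)
		}
	}
}

// runDemo queues tasks with IDs 1, 1, 1, 2 and returns the number of
// computations performed together with the results in task order.
func runDemo() (int, []string) {
	queue := make(chan Task)
	worker := make(chan Task)
	response := make(chan Response)
	release := make(chan struct{})
	var computations int32

	go dispatch(queue, worker, response)
	go func() { // a single worker
		for t := range worker {
			<-release // demo only: hold work until all tasks are queued
			atomic.AddInt32(&computations, 1)
			response <- Response{ID: t.ID, Result: fmt.Sprintf("result-%d", t.ID)}
		}
	}()

	ids := []int{1, 1, 1, 2}
	tasks := make([]Task, len(ids))
	for i, id := range ids {
		tasks[i] = Task{ID: id, Result: make(chan string, 1)}
		queue <- tasks[i]
	}
	close(release)

	results := make([]string, len(tasks))
	for i, t := range tasks {
		results[i] = <-t.Result
	}
	return int(atomic.LoadInt32(&computations)), results
}

func main() {
	n, results := runDemo()
	fmt.Println(n, results) // prints "2 [result-1 result-1 result-1 result-2]"
}
```

The goroutines are never shut down here, which is fine for a demo but would need a done channel or closed-channel handling in real code. With unbuffered `Result` channels, a requester that stops reading would stall the whole middle layer, which is why the sketch buffers them.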
    

