I have a function which receives tasks and puts them into a channel. Every task has ID, some properties and a channel where result will be placed. It looks like this
task.Result = make(chan *TaskResult)
queue <- task
result := <-task.Result
sendReponse(result)
Another goroutine takes a task from the channel, processes it and puts the result into task's channel
task := <-queue
task.Result <- doExpensiveComputation(task)
This code works fine. But now I want to coalesce tasks in the queue
. Task processing is a very expensive operation, so I want process all the tasks in the queue with the same IDs once. I see two ways of doing it.
First one is not to put tasks with the same IDs to the queue, so when existing task arrives it will wait for it's copy to complete. Here is pseudo-code
if newTask in queue {
existing := queue.getById(newTask.ID)
existing.waitForComplete()
sendResponse(existing.ProcessingResult)
} else {
queue.enqueue(newTask)
}
So, I can implement it using go channel and map for random access + some synchronization means like mutex. What I don't like about this way is that I have to carry both map and channel around the code and keep their contents synchronized.
The second way is to put all the tasks into queue, but to extract task and all the tasks with the same IDs from the queue when result arrives, then send result to all the tasks. Here is pseudo-code
someTask := queue.dequeue()
result := doExpensiveComputation(someTask)
someTask.Result <- result
moreTasks := queue.getAllWithID(someTask.ID)
for _,theSameTask := range moreTasks {
theSameTask.Result <- result
}
And I have an idea how to implement this using chan + map + mutex in the same way as above.
And here is the question: is there some builtin/existing data structures which I can use for such a problem? Are there another (better) ways of doing this?