You've got a good start with the locked global map. You can have a worker per "transaction" and handlers send requests to them over channels, using a locked map to keep track of the channels. Workers can close transactions when they receive a special request. You don't want dangling transactions to become a problem, so you should probably arrange to get an artificial close request sent after a timeout.
That isn't the only way, though it might be convenient. If you only need to make certain requests wait while their transaction is being worked on elsewhere, there is probably a construction with a map of *sync.Mutexes, rather than channels talking to worker goroutines, that has better resource use. (There's now code for that approach, more or less, in bgp's answer.)
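A minimal sketch of that mutex-map alternative might look like the following. The names (`lockFor`, `handle`) are mine, not from bgp's answer, and for brevity this version never evicts mutexes from the map; a real implementation would want to free entries for finished transactions:

```go
package main

import (
	"fmt"
	"sync"
)

// mapLock guards the locks map itself; each entry serializes one transaction.
var (
	mapLock sync.Mutex
	locks   = map[int]*sync.Mutex{}
)

// lockFor returns the mutex for a transaction id, creating it on first use.
func lockFor(id int) *sync.Mutex {
	mapLock.Lock()
	defer mapLock.Unlock()
	mu := locks[id]
	if mu == nil {
		mu = &sync.Mutex{}
		locks[id] = mu
	}
	return mu
}

func handle(id int, payload string) {
	mu := lockFor(id)
	mu.Lock()         // requests for the same id serialize here...
	defer mu.Unlock() // ...while other ids proceed in parallel
	fmt.Println("handling", id, payload)
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			handle(1, fmt.Sprintf("req %d", i))
		}(i)
	}
	wg.Wait()
}
```

No goroutine idles per transaction here; a blocked request costs only a parked goroutine waiting on a mutex, which is why the resource use can be better.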
An example of the channel approach is below; besides serializing work within each transaction, it demonstrates timeouts and how you might do graceful shutdown with close and a sync.WaitGroup in a setup like this. It's on the Playground.
package main

import (
	"fmt"
	"log"
	"sync"
	"time"
)

// Req represents a request. In real use, if there are many kinds of
// requests, it might be or contain an interface value that can point
// to one of several different concrete structs.
type Req struct {
	id      int
	payload string // just for demo
	// ...
}

// Worker represents worker state.
type Worker struct {
	id   int
	reqs chan *Req
	// ...
}

var tasks = map[int]chan *Req{}
var tasksLock sync.Mutex

const TimeoutDuration = 100 * time.Millisecond // to demonstrate; in reality higher

// for graceful shutdown, you probably want to be able to wait on all
// workers to exit
var tasksWg sync.WaitGroup

func (w *Worker) Work() {
	defer func() {
		tasksLock.Lock()
		delete(tasks, w.id)
		if r := recover(); r != nil {
			log.Println("worker panic (continuing):", r)
		}
		tasksLock.Unlock()
		tasksWg.Done()
	}()
	for req := range w.reqs {
		// ...do work...
		fmt.Println("worker", w.id, "handling request", req)
		if req.payload == "close" {
			fmt.Println("worker", w.id, "quitting because of a close req")
			return
		}
	}
	fmt.Println("worker", w.id, "quitting since its channel was closed")
}

// Handle dispatches the Req to a Worker, creating one if needed.
func (r *Req) Handle() {
	tasksLock.Lock()
	defer tasksLock.Unlock()
	id := r.id
	reqs := tasks[id]
	if reqs == nil {
		// making a buffered channel here would let you queue up
		// n tasks for a given ID before the Handle() call blocks
		reqs = make(chan *Req)
		tasks[id] = reqs
		w := &Worker{
			id:   id,
			reqs: reqs,
		}
		tasksWg.Add(1)
		go w.Work()
		time.AfterFunc(TimeoutDuration, func() {
			tasksLock.Lock()
			if reqs := tasks[id]; reqs != nil {
				close(reqs)
				delete(tasks, id)
			}
			tasksLock.Unlock()
		})
	}
	// you could close(reqs) if you get a request that means
	// 'end the transaction' with no further info. I'm only
	// using close for graceful shutdown, though.
	reqs <- r
}

// Shutdown asks the workers to shut down and waits.
func Shutdown() {
	tasksLock.Lock()
	for id, reqs := range tasks {
		close(reqs)
		// delete so timers, etc. won't see a ghost of a task
		delete(tasks, id)
	}
	// must unlock b/c workers can't finish shutdown
	// until they can remove themselves from the map
	tasksLock.Unlock()
	tasksWg.Wait()
}

func main() {
	fmt.Println("Hello, playground")
	reqs := []*Req{
		{id: 1, payload: "foo"},
		{id: 2, payload: "bar"},
		{id: 1, payload: "baz"},
		{id: 1, payload: "close"},
		// worker 2 will get closed because of timeout
	}
	for _, r := range reqs {
		r.Handle()
	}
	time.Sleep(75 * time.Millisecond)
	r := &Req{id: 3, payload: "quux"}
	r.Handle()
	fmt.Println("worker 2 should get closed by timeout")
	time.Sleep(75 * time.Millisecond)
	fmt.Println("worker 3 should get closed by shutdown")
	Shutdown()
}