drgzmmy6379 2015-07-23 17:22
Viewed 95 times

Algorithm to run at most one concurrent thread per unique ID

I have a Go web application that needs to execute a given section of code in at most one goroutine per unique ID. The scenario is that requests come in with various IDs, each representing a sort of transaction. A certain subset of the operations on these must be guaranteed to run only "one at a time" for a given ID (and other competing requests should block until the prior one working on/for that ID is done).

I can think of a few ways to do this, but the bookkeeping seems tricky: keep a global mutex to lock access to a map of which concurrent requests are in flight, use a per-ID mutex or counter from there, make sure it doesn't deadlock, and then garbage-collect (or carefully reference-count) old request entries. I can do this, but it sounds error-prone.

Is there a pattern or something in the standard library that can be easily used to good effect in this case? Didn't see anything obvious.

EDIT: One thing I think was confusing in my explanation above is the use of the word "transaction". In my case each of these does not need an explicit close - it's just an identifier to associate multiple operations with. Since I don't have an explicit "close" or "end" concept to these, I might receive 3 requests within the same second and each operation takes 2 seconds - and I need to serialize those because running them concurrently will wreak havoc; but then I might get a request a week later with that same ID and it would be referring to the same set of operations (the ID is just the PK on a table in a database).


3 Answers

  • dousa2794 2015-07-23 18:28

    You've got a good start with the locked global map. You can have a worker per "transaction" and handlers send requests to them over channels, using a locked map to keep track of the channels. Workers can close transactions when they receive a special request. You don't want dangling transactions to become a problem, so you should probably arrange to get an artificial close request sent after a timeout.

    That isn't the only way, though it might be convenient. If you only need to make certain requests wait while their transaction is being worked on elsewhere, there is probably a construction with a map of *sync.Mutexes, rather than channels talking to worker goroutines, that has better resource use. (There's now code for that approach, more or less, in bgp's answer.)
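
    For reference, here is a minimal sketch of that map-of-mutexes approach, with reference counting so entries can be garbage-collected once nobody holds or waits for them. The names (idLocks, refLock) are mine, not from any answer:

    ```go
    package main

    import (
        "fmt"
        "sync"
    )

    // idLocks serializes work per ID. Each entry is reference-counted
    // so it can be deleted from the map once no goroutine holds or
    // waits for it.
    type idLocks struct {
        mu    sync.Mutex
        locks map[int]*refLock
    }

    type refLock struct {
        mu   sync.Mutex
        refs int
    }

    func newIDLocks() *idLocks {
        return &idLocks{locks: map[int]*refLock{}}
    }

    // Lock blocks until the caller holds the lock for id.
    func (l *idLocks) Lock(id int) {
        l.mu.Lock()
        rl := l.locks[id]
        if rl == nil {
            rl = &refLock{}
            l.locks[id] = rl
        }
        rl.refs++
        l.mu.Unlock()
        rl.mu.Lock()
    }

    // Unlock releases the lock for id, removing the map entry when no
    // other goroutine is using it.
    func (l *idLocks) Unlock(id int) {
        l.mu.Lock()
        rl := l.locks[id]
        rl.refs--
        if rl.refs == 0 {
            delete(l.locks, id)
        }
        l.mu.Unlock()
        rl.mu.Unlock()
    }

    func main() {
        locks := newIDLocks()
        var wg sync.WaitGroup
        counter := 0
        for i := 0; i < 5; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                locks.Lock(42) // all goroutines contend on the same ID
                counter++      // safe: serialized per ID
                locks.Unlock(42)
            }()
        }
        wg.Wait()
        fmt.Println("counter:", counter)
        fmt.Println("entries left:", len(locks.locks))
    }
    ```

    Note the ordering in Unlock: the refcount is decremented and the entry deleted under the map lock *before* releasing the per-ID mutex, so a waiter that already incremented refs never sees its entry disappear, while a fresh caller after deletion simply creates a new one.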

    An example of the channel approach is below; besides serializing work within each transaction, it demonstrates how you might do graceful shutdown with close and a sync.WaitGroup for a setup like this, and timeouts. It's on the Playground.

    package main
    
    import (
        "fmt"
        "log"
        "sync"
        "time"
    )
    
    // Req represents a request. In real use, if there are many kinds of requests, it might be or contain an interface value that can point to one of several different concrete structs.
    type Req struct {
        id      int
        payload string // just for demo
        // ...
    }
    
    // Worker represents worker state.
    type Worker struct {
        id   int
        reqs chan *Req
        // ...
    }
    
    var tasks = map[int]chan *Req{}
    var tasksLock sync.Mutex
    
    const TimeoutDuration = 100 * time.Millisecond // to demonstrate; in reality higher
    
    // for graceful shutdown, you probably want to be able to wait on all workers to exit
    var tasksWg sync.WaitGroup
    
    func (w *Worker) Work() {
        defer func() {
            tasksLock.Lock()
            delete(tasks, w.id)
            if r := recover(); r != nil {
                log.Println("worker panic (continuing):", r)
            }
            tasksLock.Unlock()
            tasksWg.Done()
        }()
        for req := range w.reqs {
            // ...do work...
            fmt.Println("worker", w.id, "handling request", req)
            if req.payload == "close" {
                fmt.Println("worker", w.id, "quitting because of a close req")
                return
            }
        }
        fmt.Println("worker", w.id, "quitting since its channel was closed")
    }
    
    // Handle dispatches the Request to a Worker, creating one if needed.
    func (r *Req) Handle() {
        tasksLock.Lock()
        defer tasksLock.Unlock()
        id := r.id
        reqs := tasks[id]
        if reqs == nil {
        // making a buffered channel here would let you queue up
        // n tasks for a given ID before the Handle() call
        // blocks
            reqs = make(chan *Req)
            tasks[id] = reqs
            w := &Worker{
                id:   id,
                reqs: reqs,
            }
            tasksWg.Add(1)
            go w.Work()
            time.AfterFunc(TimeoutDuration, func() {
                tasksLock.Lock()
                if reqs := tasks[id]; reqs != nil {
                    close(reqs)
                    delete(tasks, id)
                }
                tasksLock.Unlock()
            })
        }
        // you could close(reqs) if you get a request that means
        // 'end the transaction' with no further info. I'm only
        // using close for graceful shutdown, though.
        reqs <- r
    }
    
    // Shutdown asks the workers to shut down and waits.
    func Shutdown() {
        tasksLock.Lock()
        for id, w := range tasks {
            close(w)
            // delete so timers, etc. won't see a ghost of a task
            delete(tasks, id)
        }
        // must unlock b/c workers can't finish shutdown
        // until they can remove themselves from maps
        tasksLock.Unlock()
        tasksWg.Wait()
    }
    
    func main() {
        fmt.Println("Hello, playground")
        reqs := []*Req{
            {id: 1, payload: "foo"},
            {id: 2, payload: "bar"},
            {id: 1, payload: "baz"},
            {id: 1, payload: "close"},
            // worker 2 will get closed because of timeout
        }
        for _, r := range reqs {
            r.Handle()
        }
        time.Sleep(75 * time.Millisecond)
        r := &Req{id: 3, payload: "quux"}
        r.Handle()
        fmt.Println("worker 2 should get closed by timeout")
        time.Sleep(75 * time.Millisecond)
        fmt.Println("worker 3 should get closed by shutdown")
        Shutdown()
    }
    
