douchixu3686 2015-04-04 11:55

Concurrency with multiple producers/multiple consumers

I'm probably missing or misunderstanding something about how Go handles concurrency (or about concurrency itself). I've written a small program to understand a multiple producer/multiple consumer setup.

This is the code:

package main

import (
    "fmt"
    "time"
    // "math/rand"
    "sync"
)

var seq uint64 = 0
var generatorChan chan uint64
var requestChan chan uint64

func makeTimestamp() int64 {
    return time.Now().UnixNano() / int64(time.Millisecond)
}

func generateStuff(genId int) {
    var crap uint64
    for {
        crap = <-requestChan
        // <- requestChan
        seq = seq+1
        fmt.Println("Gen ", genId, " - From : ", crap, " @", makeTimestamp())
        generatorChan <- uint64(seq)
    }
}

func concurrentPrint(id int, work *sync.WaitGroup) {
    defer work.Done()

    for i := 0; i < 5; i++ {
        requestChan<-uint64(id)
        fmt.Println("Conc", id, ": ", <-generatorChan)
    }
}

func main() {
    generatorChan = make(chan uint64)
    requestChan = make(chan uint64)
    var wg sync.WaitGroup
    for i := 0; i < 20; i++ {
        go generateStuff(i)
    }
    maximumWorker := 200
    wg.Add(maximumWorker)
    for i := 0; i < maximumWorker; i++ {
        go concurrentPrint(i, &wg)
    }
    wg.Wait()
}

When run, it prints (mostly in order) all the numbers from 1 to 1000 (200 consumers getting a number 5 times each). I would have expected some consumers to print the exact same number, but requestChan seems to work like a barrier that prevents this, even though 20 goroutines run generateStuff and each one generates the number by incrementing a global variable.

What am I getting wrong about Go or concurrency in general?

I would have expected a situation in which two generateStuff goroutines wake up together and increment seq at the same time, so that two consumers end up printing the same number.

EDIT: Code on the Go Playground: http://play.golang.org/p/eRzNXjdxtZ


  • duanjuebin2519 2015-04-04 13:36

    You have multiple workers that can all run at the same time and all try to make requests at the same time. Since requestChan is unbuffered, they all block waiting for a reader to synchronize with and take their request.

    You have multiple generators that will synchronize with a requester via requestChan, produce a result, and then block on the unbuffered generatorChan until a worker reads the result. Note it may be a different worker.
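    The blocking behavior of an unbuffered channel can be seen in isolation with a small sketch. The helper trySend and the channel name requests are made up for this illustration and are not part of the code in the question; the sketch only assumes the standard select/default idiom for a non-blocking send.

```go
package main

import "fmt"

// trySend (hypothetical helper) attempts a non-blocking send and reports
// whether the value could be handed over at that moment.
func trySend(ch chan<- uint64, v uint64) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	requests := make(chan uint64) // unbuffered, like requestChan

	// No goroutine is receiving yet, so the send cannot proceed.
	fmt.Println("sent without a receiver:", trySend(requests, 1))

	done := make(chan struct{})
	go func() {
		fmt.Println("received:", <-requests)
		close(done)
	}()

	// A plain send blocks until the goroutine above is ready to receive.
	requests <- 2
	<-done
}
```

    On an unbuffered channel the first attempt fails because nobody is receiving, while the plain send simply waits for the receiver. That hand-off is the only synchronization the original program has.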

    There is no additional synchronization so everything else is non-deterministic.

    • One generator could field all the requests.
    • A generator could grab a request and get through incrementing seq before any other generator happens to get a chance to run. With only one processor this may even be likely.
    • All the generators could grab requests and all end up wanting to increment seq at exactly the same time causing all kinds of problems.
    • The workers can get responses from the same generator they happened to send to or from a completely different one.

    In general, without adding synchronization to force one of these behaviors there is no way you can ensure any of these actually happen.

    Note that the data race is itself another source of non-determinism. It's possible to get arbitrary values, program crashes, etc. It's not safe to assume that under race conditions the value will merely be off by one or show some similarly innocuous result.

    For experimenting, the best you may be able to do is crank up GOMAXPROCS, either through an environment variable (e.g. something like env GOMAXPROCS=16 go run foo.go, or env GOMAXPROCS=16 ./foo after go build) or by calling runtime.GOMAXPROCS(16) from your program. Before Go 1.5 the default was 1, which means that data races or other "strange" behavior may be hidden; since Go 1.5 the default is the number of available CPUs.

    You can also influence things a little by adding calls to runtime.Gosched or time.Sleep at various points.

    You can also see the data race if you use the race detector (e.g. with go run -race foo.go or go build -race). Not only should the program show "Found 1 data race(s)" on exit, but it should also dump out a lot of details with stack traces when the race is first detected.

    Here is a "cleaned up" version of your code for experimentation:

    package main
    
    import (
        "log"
        "sync"
        "sync/atomic"
    )
    
    var seq uint64 = 0
    var generatorChan = make(chan uint64)
    var requestChan = make(chan uint64)
    
    func generator(genID int) {
        for reqID := range requestChan {
            // If you want to see a data race:
            //seq = seq + 1
            // Else:
            s := atomic.AddUint64(&seq, 1)
            log.Printf("Gen: %2d, from %3d", genID, reqID)
            generatorChan <- s
        }
    }
    
    func worker(id int, work *sync.WaitGroup) {
        defer work.Done()
    
        for i := 0; i < 5; i++ {
            requestChan <- uint64(id)
            log.Printf("\t\t\tWorker: %3d got %4d", id, <-generatorChan)
        }
    }
    
    func main() {
        log.SetFlags(log.Lmicroseconds)
        const (
            numGen    = 20
            numWorker = 200
        )
        var wg sync.WaitGroup
        for i := 0; i < numGen; i++ {
            go generator(i)
        }
        wg.Add(numWorker)
        for i := 0; i < numWorker; i++ {
            go worker(i, &wg)
        }
        wg.Wait()
        close(requestChan)
    }
    

    Playground (but note that the timestamps on the playground won't be useful and calling runtime.GOMAXPROCS may not do anything). Further note that the playground caches results, so re-running the exact same program will always show the same output; you need to make some small change or just run it on your own machine.

    The changes are largely small: shutting down the generators, using log instead of fmt since the former makes concurrency guarantees, removing the data race, making the output look nicer, etc.

    This answer was selected by the asker as the best answer.