dongweng6241 2017-09-27 13:07

RabbitMQ multi-worker pattern

I'm trying to find a good method to consume asynchronously from an input queue, process the content using several workers and then publish to an output queue. So far I've tried a number of examples, most recently using the code from here and here as inspiration.

My current code doesn't appear to be doing what it should, however: increasing the number of workers doesn't increase performance (msg/s consumed or published), and the number of goroutines remains fairly static whilst running.

main:

func main() {
    maxWorkers := 10

    // channel for jobs
    in := make(chan []byte)
    out := make(chan []byte)

    // start workers
    wg := &sync.WaitGroup{}
    wg.Add(maxWorkers)
    for i := 1; i <= maxWorkers; i++ {
        log.Println(i)
        defer wg.Done()
        go processor(in, out)
    }

    // add jobs
    go collector(in)
    go sender(out)

    // wait for workers to complete
    wg.Wait()
}

The collector is basically the example from the RabbitMQ site with a goroutine that collects messages from the queue and places them on the 'in' channel:

forever := make(chan bool)
go func() {
    for d := range msgs {
        in <- d.Body
        d.Ack(false)
    }
}()
log.Printf("[*] Waiting for messages. To exit press CTRL+C")
<-forever

The processor receives an 'in' and 'out' channel, unmarshals JSON, performs a series of regexes and then places the output into the 'out' channel:

func processor(in chan []byte, out chan []byte) {

    var (
    // list of regexes declared here
    )

    for {
        body := <-in

        jsonIn := &Data{}
        err := json.Unmarshal(body, jsonIn)
        if err != nil {
            log.Fatalln("Failed to decode:", err)
        }

        content := jsonIn.Content

        //process regexes using:
        //jsonIn.a = r1.FindAllString(content, -1)

        jsonOut, _ := json.Marshal(jsonIn)

        out <- jsonOut
    }
}

And finally the sender is simply the code from the RabbitMQ site, setting up a connection, reading from the 'out' channel and then publishing to a RMQ queue:

for {
    jsonOut := <-out

    err = ch.Publish(
        "",     // exchange
        q.Name, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing{
            DeliveryMode: amqp.Persistent,
            ContentType:  "text/json",
            Body:         []byte(jsonOut),
        })
    failOnError(err, "Failed to publish a message")

}

This is a pattern that I'll be using quite a lot, so I'm spending a lot of time trying to find something that works correctly (and well) - any advice or help would be appreciated (and in case it isn't obvious, I'm new to Go).


1 answer

  • dreamer1231 2017-09-27 13:47

    There are a couple of things that jump out:

    Done within main function

    wg.Add(maxWorkers)
    for i := 1; i <= maxWorkers; i++ {
        log.Println(i)
        defer wg.Done()
        go processor(in, out)
    }
    

    The defer here only runs when main returns, so the counter is never decremented while the program is running and wg.Wait() ends up blocking forever; it's not actually indicating when processing is complete. I don't think this affects the performance profile of your program, though.

    To address this you could pass in wg *sync.WaitGroup to your processor so your processor can indicate when it's done.
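    As a minimal sketch of that fix (assuming the processor exits when the `in` channel is closed, so that Done has something meaningful to signal; `runPipeline` is a made-up name for illustration):

    ```go
    package main

    import (
    	"fmt"
    	"sync"
    )

    // processor drains `in` until it is closed, then signals the WaitGroup.
    func processor(wg *sync.WaitGroup, in <-chan []byte, out chan<- []byte) {
    	defer wg.Done()
    	for body := range in {
    		out <- body // the real JSON/regex work would go here
    	}
    }

    // runPipeline pushes msgs through maxWorkers processors and returns
    // everything that reached `out`.
    func runPipeline(maxWorkers int, msgs [][]byte) [][]byte {
    	in := make(chan []byte)
    	out := make(chan []byte, len(msgs))

    	var wg sync.WaitGroup
    	for i := 0; i < maxWorkers; i++ {
    		wg.Add(1)
    		go processor(&wg, in, out)
    	}
    	for _, m := range msgs {
    		in <- m
    	}
    	close(in) // each worker's range loop ends
    	wg.Wait() // now actually waits for the workers
    	close(out)

    	var results [][]byte
    	for m := range out {
    		results = append(results, m)
    	}
    	return results
    }

    func main() {
    	res := runPipeline(4, [][]byte{[]byte("a"), []byte("b"), []byte("c")})
    	fmt.Println(len(res)) // 3
    }
    ```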

    CPU Bound Processing

    Parsing messages and running regexes is a CPU-intensive workload. How many cores does your machine have? How is throughput affected if you run your program on two separate machines? Does it double? What if you double the number of cores? What about running your program with 1 worker vs 2 processor workers? Does that double throughput? Are you maxing out your local RabbitMQ instance? Is it the bottleneck?
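    To answer the core-count question quickly, the Go runtime will tell you (a trivial sketch, not specific to your program):

    ```go
    package main

    import (
    	"fmt"
    	"runtime"
    )

    func main() {
    	// For CPU-bound work like regex matching, worker counts beyond
    	// the core count rarely add throughput.
    	fmt.Println("cores:", runtime.NumCPU())
    	fmt.Println("GOMAXPROCS:", runtime.GOMAXPROCS(0))
    }
    ```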

    Setting up benchmarking and load-testing harnesses should let you run experiments to see where your bottlenecks are :)

    For queue-based services it's pretty easy to set up a test harness that fills RabbitMQ with a set backlog and benchmarks how fast you can process those messages, or to set up a load generator that sends x messages/second to RabbitMQ and observe whether you can keep up.
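    A sketch of the load-generator side, using an in-memory channel in place of RabbitMQ so it runs standalone (the rates and names here are illustrative only):

    ```go
    package main

    import (
    	"fmt"
    	"time"
    )

    // generate sends `total` messages into ch at roughly `rate` per second,
    // then closes the channel so the consumer knows the run is over.
    func generate(ch chan<- []byte, total, rate int) {
    	tick := time.NewTicker(time.Second / time.Duration(rate))
    	defer tick.Stop()
    	for i := 0; i < total; i++ {
    		<-tick.C
    		ch <- []byte(fmt.Sprintf("msg-%d", i))
    	}
    	close(ch)
    }

    func main() {
    	ch := make(chan []byte)
    	go generate(ch, 20, 100) // 20 messages at ~100 msg/s

    	start := time.Now()
    	n := 0
    	for range ch {
    		n++ // the real consumer would do its processing here
    	}
    	fmt.Printf("consumed %d messages in %v\n", n, time.Since(start))
    }
    ```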

    Does RabbitMQ give you good visibility into message-processing throughput? If not, I frequently add a counter to the Go code and then log the overall averaged throughput on an interval to get a rough idea of performance:

    start := time.Now()
    updateInterval := time.Tick(1 * time.Second)
    numIn := 0
    for {
        select {
        case <-updateInterval:
            log.Printf("IN - Count: %d", numIn)
            log.Printf("IN - Throughput: %.0f events/second",
                float64(numIn)/time.Since(start).Seconds())
        case d := <-msgs:
            numIn++
            in <- d.Body
            d.Ack(false)
        }
    }
    
