dongwu3596 2019-08-06 12:11
523 views

RabbitMQ consumer performance: prefetch vs. concurrency

I have a Go application processing events from a single RabbitMQ queue. I use the github.com/streadway/amqp RabbitMQ Client Library.

The Go application processes each message in ~2-3 seconds. It can process ~1000 or even more messages in parallel if I feed them from memory, but unfortunately throughput from RabbitMQ is worse. So I want to consume messages from the queue faster.

So, the question is: how do I consume messages in the most effective manner using github.com/streadway/amqp?

As far as I understand, there are two approaches:

  1. Set a high prefetch

    https://godoc.org/github.com/streadway/amqp#Channel.Qos.

    Use a single consumer goroutine.

    Example code:

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

err = ch.Qos(
    10000, // prefetch count
    0,     // prefetch size
    false, // global
)
failOnError(err, "Failed to set QoS")

msgs, err := ch.Consume(
    q.Name, // queue
    "",     // consumer
    false,  // NO auto-ack
    false,  // exclusive
    false,  // no-local
    false,  // no-wait
    nil,    // args
)
failOnError(err, "Failed to register a consumer")

for d := range msgs {
    log.Printf("Received a message: %s", d.Body)
    err := processMessage(d)
    if err != nil {
        log.Printf("%s : while consuming task", err)
        d.Nack(false, true)
    } else {
        d.Ack(false)
    }
}

But will processMessage be called in parallel here?

  2. Spawn many channels and use multiple consumers
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

for i := 0; i <= 100; i++ {
    go func() {
        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        err = ch.Qos(
            10,    // prefetch count
            0,     // prefetch size
            false, // global
        )
        failOnError(err, "Failed to set QoS")

        msgs, err := ch.Consume(
            q.Name, // queue
            "",     // consumer
            false,  // NO auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            err := processMessage(d)
            if err != nil {
                log.Printf("%s : while consuming task", err)
                d.Nack(false, true)
            } else {
                d.Ack(false)
            }
        }
    }()
}

But is this a RAM-friendly approach? Isn't spawning a new channel for each worker quite heavy for RabbitMQ?

So, the question is: which variant is better in terms of performance, memory usage, and so on?

What is the optimal way to use RabbitMQ here?

Update: I currently hit a case where my worker consumes all the RAM on the VPS and is OOM-killed. I was using the second approach for it. So "better" in my case means being able to keep the worker running without it being OOM-killed after a few minutes of work.

Update 2: nacking when the worker fails to process a message, and acking when it succeeds, is very important. All messages have to be processed (they are customer analytics), but sometimes a worker cannot process one, so it has to nack the message to pass it to another worker (currently, a 3rd-party API used to process messages sometimes simply returns a 503 status code; in that case the message should be passed to another worker or retried). So, using auto-ack is unfortunately not an option.
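For illustration, here is a rough sketch of the handler logic I mean; callThirdPartyAPI is a hypothetical stand-in for the real 3rd-party call, and the caller nacks the delivery with requeue on error exactly as in the snippets above:

// Hypothetical handler: on a 503 from the 3rd-party API it returns an error,
// so the caller nacks the delivery with requeue and another worker (or a
// later retry) gets the message.
func processMessage(d amqp.Delivery) error {
    status, err := callThirdPartyAPI(d.Body) // hypothetical helper
    if err != nil {
        return err
    }
    if status == http.StatusServiceUnavailable {
        return fmt.Errorf("3rd party API returned 503, message will be requeued")
    }
    return nil
}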


1 answer

  • doufuhao8085 2019-08-08 04:56

    I suppose each processMessage() should run in a new goroutine.

    Which variant is better?

    I prefer the first one, because opening/closing a channel is a little bit expensive (2 + 2 TCP packets). I don't think your OOM problem is related to having too many goroutines; goroutines are very lightweight, each costing only about 5 KB. So the problem is probably caused by your processMessage().

    I think the github.com/streadway/amqp channel consume operation is thread/goroutine-safe, so it is safe to share a channel between goroutines as long as you only perform consume operations.
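    A minimal sketch of this suggestion, assuming failOnError, processMessage, and the declared queue q from the question's snippets: one connection, one channel, a prefetch sized to the worker count, and a fixed pool of goroutines all ranging over the same deliveries channel (each delivery is received by exactly one goroutine).

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

const workers = 100

// Prefetch roughly matches the number of workers, so the broker keeps them
// busy without flooding the process with unacked messages.
err = ch.Qos(workers, 0, false)
failOnError(err, "Failed to set QoS")

msgs, err := ch.Consume(
    q.Name, // queue
    "",     // consumer
    false,  // NO auto-ack
    false,  // exclusive
    false,  // no-local
    false,  // no-wait
    nil,    // args
)
failOnError(err, "Failed to register a consumer")

var wg sync.WaitGroup
for i := 0; i < workers; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        for d := range msgs {
            if err := processMessage(d); err != nil {
                log.Printf("%s : while consuming task", err)
                d.Nack(false, true) // requeue so another worker can retry
            } else {
                d.Ack(false)
            }
        }
    }()
}
wg.Wait()

    This keeps the cheap single-channel setup of the first variant while still processing up to 100 messages concurrently.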
