I have a Go application processing events from a single RabbitMQ queue, using the github.com/streadway/amqp RabbitMQ client library.

The application processes each message in ~2-3 seconds, and it can handle ~1000 or more messages in parallel if I feed them from memory. Unfortunately, throughput with RabbitMQ is much worse, so I want to consume messages from the queue faster.

So the question is: what is the most effective way to consume messages using github.com/streadway/amqp?
As far as I understand, there are two approaches:

- Set a high prefetch (https://godoc.org/github.com/streadway/amqp#Channel.Qos) and use a single consumer goroutine.
Example code:

```go
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

err = ch.Qos(
    10000, // prefetch count
    0,     // prefetch size
    false, // global
)
failOnError(err, "Failed to set QoS")

msgs, err := ch.Consume(
    q.Name, // queue
    "",     // consumer
    false,  // NO auto-ack
    false,  // exclusive
    false,  // no-local
    false,  // no-wait
    nil,    // args
)
failOnError(err, "Failed to register a consumer")

for d := range msgs {
    log.Printf("Received a message: %s", d.Body)
    if err := processMessage(d); err != nil {
        log.Printf("%s : while consuming task", err)
        d.Nack(false, true) // requeue on failure
    } else {
        d.Ack(false)
    }
}
```
But will processMessage actually be called in parallel here?
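For reference, here is a sketch of how deliveries from a single consumer channel could be fanned out to a pool of worker goroutines. It uses a plain channel of a stand-in `Delivery` type instead of `amqp.Delivery`, so only the dispatch pattern is shown; the worker count and the ack/nack placement are illustrative, not part of the library:

```go
package main

import (
	"fmt"
	"sync"
)

// Delivery stands in for amqp.Delivery in this sketch.
type Delivery struct{ Body []byte }

// consumeParallel fans deliveries out to nWorkers goroutines and
// returns how many messages were processed successfully.
func consumeParallel(msgs <-chan Delivery, nWorkers int, process func(Delivery) error) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	processed := 0
	for i := 0; i < nWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for d := range msgs {
				if err := process(d); err != nil {
					// with amqp this is where d.Nack(false, true) would go
					continue
				}
				// with amqp this is where d.Ack(false) would go
				mu.Lock()
				processed++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return processed
}

func main() {
	msgs := make(chan Delivery, 100)
	for i := 0; i < 100; i++ {
		msgs <- Delivery{Body: []byte(fmt.Sprintf("msg %d", i))}
	}
	close(msgs)
	n := consumeParallel(msgs, 10, func(d Delivery) error { return nil })
	fmt.Println(n) // 100
}
```

With this pattern a single AMQP channel and a high prefetch feed many workers, so processMessage does run in parallel even though there is only one consumer.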
- Spawn many channels and use multiple consumers.
```go
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

for i := 0; i < 100; i++ {
    go func() {
        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        err = ch.Qos(
            10,    // prefetch count
            0,     // prefetch size
            false, // global
        )
        failOnError(err, "Failed to set QoS")

        msgs, err := ch.Consume(
            q.Name, // queue
            "",     // consumer
            false,  // NO auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            if err := processMessage(d); err != nil {
                log.Printf("%s : while consuming task", err)
                d.Nack(false, true) // requeue on failure
            } else {
                d.Ack(false)
            }
        }
    }()
}
```
But is this a RAM-friendly approach? And isn't opening a new AMQP channel for each worker quite heavy for RabbitMQ?
So, the question is: which variant is better in terms of performance, memory usage, etc.? What is the optimal way to use RabbitMQ here?
Update: I've currently hit a case where my worker consumes all RAM on the VPS and is OOM-killed after a few minutes of work. I was using the second approach. So "better" in my case also means being able to keep the worker running without it being OOM-killed.
Update 2: nack-ing when the worker fails to process a message and ack-ing when it succeeds is very important. All messages have to be processed (it's customer analytics), but sometimes a worker cannot process one, so it has to nack the message to pass it to another worker (currently, the 3rd-party API used to process messages sometimes simply returns a 503 status code; in that case the message should be passed to another worker or retried). So, using auto-ack is unfortunately not an option.
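The ack/nack decision described above can be sketched as a small helper inside processMessage. The function name and the status-code rule are illustrative assumptions, not part of the amqp library:

```go
package main

import (
	"fmt"
	"net/http"
)

// shouldRequeue reports whether a message should be nack'd with
// requeue=true, based on the status code returned by the 3rd-party API.
// 503 (and other 5xx) are treated as transient failures, so the message
// goes back to the queue for another worker to retry.
func shouldRequeue(status int) bool {
	return status >= 500
}

func main() {
	for _, status := range []int{http.StatusOK, http.StatusServiceUnavailable} {
		if shouldRequeue(status) {
			fmt.Printf("%d -> d.Nack(false, true)\n", status)
		} else {
			fmt.Printf("%d -> d.Ack(false)\n", status)
		}
	}
}
```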