doufubu2518
doufubu2518
2017-12-18 16:06

AMQP Golang优先级不起作用

已采纳

I am currently testing rabbitmq with golang (github.com/streadway/amqp), and I have three programs, two of them sending messages to the queue with different priorities and one reading from the queue.
And the problem I am having is that after sending a few messages with the two programs I then proceed to launch the program that will read from the queue and when it starts to read from the queue it output the result like a FIFO.
I expected to output the high priority first followed by the lower priority message.
Did I misunderstand how rabbitmq works or am I doing something wrong?

Sending to queues package main

import (
        "log"

        "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        }
}

func main() {
        args := make(amqp.Table)
        args["x-max-priority"] = int64(9)

        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        q, err := ch.QueueDeclare(
                "test", // name                                                                                                                                                                                     
                false,   // durable                                                                                                                                                                                 
                false,   // delete when unused                                                                                                                                                                      
                false,   // exclusive                                                                                                                                                                               
                false,   // no-wait                                                                                                                                                                                 
                args,     // arguments                                                                                                                                                                              
        )
        failOnError(err, "Failed to declare a queue")

        body := "low"
        err = ch.Publish(
                "",     // exchange                                                                                                                                                                                 
                q.Name, // routing key                                                                                                                                                                              
                false,  // mandatory                                                                                                                                                                                
                false,  // immediate                                                                                                                                                                                
                amqp.Publishing{
                        Headers:         amqp.Table{},
                        ContentType:     "text/plain",
                        ContentEncoding: "",
                        Body:            []byte(body),
                        DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent                                                                                                                          
                        Priority: 0,
                })
        log.Printf(" [x] Sent %s", body)
        failOnError(err, "Failed to publish a message")

        body = "low"
        err = ch.Publish(
                "",     // exchange                                                                                                                                                                                 
                q.Name, // routing key                                                                                                                                                                              
                false,  // mandatory                                                                                                                                                                                
                false,  // immediate                                                                                                                                                                                
                amqp.Publishing{
                        Headers:         amqp.Table{},
                        ContentType:     "text/plain",
                        ContentEncoding: "",
                        Body:            []byte(body),
                        DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent                                                                                                                          
                        Priority: 0,
                })
        log.Printf(" [x] Sent %s", body)
        failOnError(err, "Failed to publish a message")

        body = "low"
        err = ch.Publish(
                "",     // exchange                                                                                                                                                                                 
                q.Name, // routing key                                                                                                                                                                              
                false,  // mandatory                                                                                                                                                                                
                false,  // immediate                                                                                                                                                                                
                amqp.Publishing{
                        Headers:         amqp.Table{},
                        ContentType:     "text/plain",
                        ContentEncoding: "",
                        Body:            []byte(body),
                        DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent                                                                                                                          
                        Priority: 0,
                })
        log.Printf(" [x] Sent %s", body)
        failOnError(err, "Failed to publish a message")


        body = "high"
        err = ch.Publish(
                "",     // exchange                                                                                                                                                                                 
                q.Name, // routing key                                                                                                                                                                              
                false,  // mandatory                                                                                                                                                                                
                false,  // immediate                                                                                                                                                                                
                amqp.Publishing{
                        Headers:         amqp.Table{},
                        ContentType:     "text/plain",
                        ContentEncoding: "",
                        Body:            []byte(body),
                        DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent                                                                                                                          
                        Priority: 9,
                })
        log.Printf(" [x] Sent %s", body)
        failOnError(err, "Failed to publish a message")


        body = "low"
        err = ch.Publish(
                "",     // exchange                                                                                                                                                                                 
                q.Name, // routing key                                                                                                                                                                              
                false,  // mandatory                                                                                                                                                                                
                false,  // immediate                                                                                                                                                                                
                amqp.Publishing{
                        Headers:         amqp.Table{},
                        ContentType:     "text/plain",
                        ContentEncoding: "",
                        Body:            []byte(body),
                        DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent                                                                                                                          
                        Priority: 0,
                })
        log.Printf(" [x] Sent %s", body)
        failOnError(err, "Failed to publish a message")


        body = "low"
        err = ch.Publish(
                "",     // exchange                                                                                                                                                                                 
                q.Name, // routing key                                                                                                                                                                              
                false,  // mandatory                                                                                                                                                                                
                false,  // immediate                                                                                                                                                                                
                amqp.Publishing{
                        Headers:         amqp.Table{},
                        ContentType:     "text/plain",
                        ContentEncoding: "",
                        Body:            []byte(body),
                        DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent                                                                                                                          
                        Priority: 0,
                })
        log.Printf(" [x] Sent %s", body)
        failOnError(err, "Failed to publish a message")


        body = "low"
        err = ch.Publish(
                "",     // exchange                                                                                                                                                                                 
                q.Name, // routing key                                                                                                                                                                              
                false,  // mandatory                                                                                                                                                                                
                false,  // immediate                                                                                                                                                                                
                amqp.Publishing{
                        Headers:         amqp.Table{},
                        ContentType:     "text/plain",
                        ContentEncoding: "",
                        Body:            []byte(body),
                        DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent                                                                                                                          
                        Priority: 0,
                })
        log.Printf(" [x] Sent %s", body)
        failOnError(err, "Failed to publish a message")

}

Reading queue:

package main

import (
        "log"

        "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        }
}

func main() {
        args := make(amqp.Table)
        args["x-max-priority"] = int64(9)

        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")

        q, err := ch.QueueDeclare(
                "test", // name                                                                                                                                                                                     
                false,   // durable                                                                                                                                                                                 
                false,   // delete when unused                                                                                                                                                                      
                false,   // exclusive                                                                                                                                                                               
                false,   // no-wait                                                                                                                                                                                 
                args,     // arguments                                                                                                                                                                              
        )
        failOnError(err, "Failed to declare a queue")

        msgs, err := ch.Consume(
                q.Name, // queue                                                                                                                                                                                    
                "",     // consumer                                                                                                                                                                                 
                true,   // auto-ack                                                                                                                                                                                 
                false,  // exclusive                                                                                                                                                                                
                false,  // no-local                                                                                                                                                                                 
                false,  // no-wait                                                                                                                                                                                  
                args,    // args                                                                                                                                                                                    
        )
        failOnError(err, "Failed to register a consumer")

        defer ch.Close()
        defer conn.Close()

        forever := make(chan bool)

        go func() {
                for d := range msgs {
                        log.Printf("Received a message: %s", d.Body)
                }
        }()

        log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
        <-forever
}

First I launch the code sending the messages to the queue the following values:

  • low
  • low
  • low
  • high
  • low
  • low
  • low

With the low messages having a priority of 0 and the high a priority of 9.
Then I launch the program that will receive the queue.
Expected output:

  • high
  • low
  • low
  • low
  • low
  • low
  • low

Actual output:

  • low
  • low
  • low
  • high
  • low
  • low
  • low

Thank you

  • 点赞
  • 写回答
  • 关注问题
  • 收藏
  • 复制链接分享
  • 邀请回答

2条回答

  • dougan1465 dougan1465 4年前

    I updated to the most recent rabbitmq and it worked, I do not know if this is the only solution.

    As well as callign ch.Qos(1, 0, false) I also needed to remove the auto-ack and acknowledge the message manually afterwards so that it doesn't acknowledge all the messages in the queue.

    点赞 评论 复制链接分享
  • duanou9758 duanou9758 4年前

    This is what I can figure out from your code, and the description of the steps you follow.

    It seems the issue is you do not create the queue the same way in each piece of code, the low and high priority producers create the queue withouth x-max-priority argument.

    As a queue is defined as a priority queue upon creation, and only at that time, you actually created a "standard" queue.

    You should be able to easily confirm this checking the created queue and its arguments.

    点赞 评论 复制链接分享

相关推荐