doufubu2518 2017-12-18 16:06
浏览 79
已采纳

AMQP Golang优先级不起作用

I am currently testing rabbitmq with golang (github.com/streadway/amqp), and I have three programs, two of them sending messages to the queue with different priorities and one reading from the queue.
And the problem I am having is that after sending a few messages with the two programs I then proceed to launch the program that will read from the queue and when it starts to read from the queue it output the result like a FIFO.
I expected to output the high priority first followed by the lower priority message.
Did I misunderstand how rabbitmq works or am I doing something wrong?

Sending to queues package main

import (
        "log"

        "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        }
}

func main() {
        args := make(amqp.Table)
        args["x-max-priority"] = int64(9)

        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        q, err := ch.QueueDeclare(
                "test", // name                                                                                                                                                                                     
                false,   // durable                                                                                                                                                                                 
                false,   // delete when unused                                                                                                                                                                      
                false,   // exclusive                                                                                                                                                                               
                false,   // no-wait                                                                                                                                                                                 
                args,     // arguments                                                                                                                                                                              
        )
        failOnError(err, "Failed to declare a queue")

        body := "low"
        err = ch.Publish(
                "",     // exchange                                                                                                                                                                                 
                q.Name, // routing key                                                                                                                                                                              
                false,  // mandatory                                                                                                                                                                                
                false,  // immediate                                                                                                                                                                                
                amqp.Publishing{
                        Headers:         amqp.Table{},
                        ContentType:     "text/plain",
                        ContentEncoding: "",
                        Body:            []byte(body),
                        DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent                                                                                                                          
                        Priority: 0,
                })
        log.Printf(" [x] Sent %s", body)
        failOnError(err, "Failed to publish a message")

        body = "low"
        err = ch.Publish(
                "",     // exchange                                                                                                                                                                                 
                q.Name, // routing key                                                                                                                                                                              
                false,  // mandatory                                                                                                                                                                                
                false,  // immediate                                                                                                                                                                                
                amqp.Publishing{
                        Headers:         amqp.Table{},
                        ContentType:     "text/plain",
                        ContentEncoding: "",
                        Body:            []byte(body),
                        DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent                                                                                                                          
                        Priority: 0,
                })
        log.Printf(" [x] Sent %s", body)
        failOnError(err, "Failed to publish a message")

        body = "low"
        err = ch.Publish(
                "",     // exchange                                                                                                                                                                                 
                q.Name, // routing key                                                                                                                                                                              
                false,  // mandatory                                                                                                                                                                                
                false,  // immediate                                                                                                                                                                                
                amqp.Publishing{
                        Headers:         amqp.Table{},
                        ContentType:     "text/plain",
                        ContentEncoding: "",
                        Body:            []byte(body),
                        DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent                                                                                                                          
                        Priority: 0,
                })
        log.Printf(" [x] Sent %s", body)
        failOnError(err, "Failed to publish a message")


        body = "high"
        err = ch.Publish(
                "",     // exchange                                                                                                                                                                                 
                q.Name, // routing key                                                                                                                                                                              
                false,  // mandatory                                                                                                                                                                                
                false,  // immediate                                                                                                                                                                                
                amqp.Publishing{
                        Headers:         amqp.Table{},
                        ContentType:     "text/plain",
                        ContentEncoding: "",
                        Body:            []byte(body),
                        DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent                                                                                                                          
                        Priority: 9,
                })
        log.Printf(" [x] Sent %s", body)
        failOnError(err, "Failed to publish a message")


        body = "low"
        err = ch.Publish(
                "",     // exchange                                                                                                                                                                                 
                q.Name, // routing key                                                                                                                                                                              
                false,  // mandatory                                                                                                                                                                                
                false,  // immediate                                                                                                                                                                                
                amqp.Publishing{
                        Headers:         amqp.Table{},
                        ContentType:     "text/plain",
                        ContentEncoding: "",
                        Body:            []byte(body),
                        DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent                                                                                                                          
                        Priority: 0,
                })
        log.Printf(" [x] Sent %s", body)
        failOnError(err, "Failed to publish a message")


        body = "low"
        err = ch.Publish(
                "",     // exchange                                                                                                                                                                                 
                q.Name, // routing key                                                                                                                                                                              
                false,  // mandatory                                                                                                                                                                                
                false,  // immediate                                                                                                                                                                                
                amqp.Publishing{
                        Headers:         amqp.Table{},
                        ContentType:     "text/plain",
                        ContentEncoding: "",
                        Body:            []byte(body),
                        DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent                                                                                                                          
                        Priority: 0,
                })
        log.Printf(" [x] Sent %s", body)
        failOnError(err, "Failed to publish a message")


        body = "low"
        err = ch.Publish(
                "",     // exchange                                                                                                                                                                                 
                q.Name, // routing key                                                                                                                                                                              
                false,  // mandatory                                                                                                                                                                                
                false,  // immediate                                                                                                                                                                                
                amqp.Publishing{
                        Headers:         amqp.Table{},
                        ContentType:     "text/plain",
                        ContentEncoding: "",
                        Body:            []byte(body),
                        DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent                                                                                                                          
                        Priority: 0,
                })
        log.Printf(" [x] Sent %s", body)
        failOnError(err, "Failed to publish a message")

}

Reading queue:

package main

import (
        "log"

        "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        }
}

func main() {
        args := make(amqp.Table)
        args["x-max-priority"] = int64(9)

        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")

        q, err := ch.QueueDeclare(
                "test", // name                                                                                                                                                                                     
                false,   // durable                                                                                                                                                                                 
                false,   // delete when unused                                                                                                                                                                      
                false,   // exclusive                                                                                                                                                                               
                false,   // no-wait                                                                                                                                                                                 
                args,     // arguments                                                                                                                                                                              
        )
        failOnError(err, "Failed to declare a queue")

        msgs, err := ch.Consume(
                q.Name, // queue                                                                                                                                                                                    
                "",     // consumer                                                                                                                                                                                 
                true,   // auto-ack                                                                                                                                                                                 
                false,  // exclusive                                                                                                                                                                                
                false,  // no-local                                                                                                                                                                                 
                false,  // no-wait                                                                                                                                                                                  
                args,    // args                                                                                                                                                                                    
        )
        failOnError(err, "Failed to register a consumer")

        defer ch.Close()
        defer conn.Close()

        forever := make(chan bool)

        go func() {
                for d := range msgs {
                        log.Printf("Received a message: %s", d.Body)
                }
        }()

        log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
        <-forever
}

First I launch the code sending the messages to the queue the following values:

  • low
  • low
  • low
  • high
  • low
  • low
  • low

With the low messages having a priority of 0 and the high a priority of 9.
Then I launch the program that will receive the queue.
Expected output:

  • high
  • low
  • low
  • low
  • low
  • low
  • low

Actual output:

  • low
  • low
  • low
  • high
  • low
  • low
  • low

Thank you

  • 写回答

2条回答 默认 最新

  • dougan1465 2017-12-20 16:15
    关注

    I updated to the most recent rabbitmq and it worked, I do not know if this is the only solution.

    As well as callign ch.Qos(1, 0, false) I also needed to remove the auto-ack and acknowledge the message manually afterwards so that it doesn't acknowledge all the messages in the queue.

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(1条)

报告相同问题?

悬赏问题

  • ¥15 CVRP 图论 物流运输优化
  • ¥15 Tableau online 嵌入ppt失败
  • ¥100 支付宝网页转账系统不识别账号
  • ¥15 基于单片机的靶位控制系统
  • ¥15 真我手机蓝牙传输进度消息被关闭了,怎么打开?(关键词-消息通知)
  • ¥15 下图接收小电路,谁知道原理
  • ¥15 装 pytorch 的时候出了好多问题,遇到这种情况怎么处理?
  • ¥20 IOS游览器某宝手机网页版自动立即购买JavaScript脚本
  • ¥15 手机接入宽带网线,如何释放宽带全部速度
  • ¥30 关于#r语言#的问题:如何对R语言中mfgarch包中构建的garch-midas模型进行样本内长期波动率预测和样本外长期波动率预测