douchui7332 2019-07-15 11:09
浏览 67

如何在RabbitMQ上阅读现有消息

I created the following methods that connects to RabbitMQ and uses a channel to notify my worker every time a message pops in.

It's working fine, but only to messages that arrive while the worker is running. I can't get pre-existing messages from the queue, they stand there forever, or until RabbitMQ destroys them.

package externalservices

import (
    "../domain"
    "encoding/json"
    "github.com/streadway/amqp"
    "os"
)

const (
    catalogQueue = "catalog-queue"
)

// DequeueMessageCatalog is nice
func DequeueMessageCatalog(messageChannel chan domain.Catalog) {

    message := make(chan []byte)
    defer close(message)

    for true {
        go func() {
            dequeue(catalogQueue, message)
        }()

        currCatalog := domain.Catalog{}
        json.Unmarshal([]byte(<-message), &currCatalog)
        messageChannel <- currCatalog
    }
}

func openConnection() (*amqp.Connection, *amqp.Channel, error) {
    connString := os.Getenv("RabbitMQConStr")
    conn, err := amqp.Dial(connString)
    if err != nil {
        return nil, nil, err
    }

    ch, err := conn.Channel()
    if err != nil {
        conn.Close()
        return nil, nil, err
    }
    return conn, ch, nil
}

func ensureQueueExists(queueName string, ch *amqp.Channel) (amqp.Queue, error) {
    q, err := ch.QueueDeclare(
        queueName, // name
        false,     // durable
        false,     // delete when unused
        false,     // exclusive
        false,     // no-wait
        nil,       // arguments
    )

    return q, err
}

func dequeue(queueName string, message chan []byte) error {
    con, ch, err := openConnection()
    if err != nil {
        return err
    }

    defer con.Close()
    defer ch.Close()

    q, err := ensureQueueExists(queueName, ch)
    if err != nil {
        return err
    }
    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        true,  // no-wait
        nil,    // args
    )
    if err != nil {
        return err
    }
    for currMsg := range msgs {
        message <- currMsg.Body
    }
    return nil
}

Is there any additional method or property I need to set so I can also get messages enqueued before my worker started running?

  • 写回答

0条回答 默认 最新

    报告相同问题?

    悬赏问题

    • ¥15 求差集那个函数有问题,有无佬可以解决
    • ¥15 MATLAB动图问题
    • ¥15 【提问】基于Invest的水源涵养
    • ¥20 微信网友居然可以通过vx号找到我绑的手机号
    • ¥15 寻一个支付宝扫码远程授权登录的软件助手app
    • ¥15 解riccati方程组
    • ¥15 display:none;样式在嵌套结构中的已设置了display样式的元素上不起作用?
    • ¥15 使用rabbitMQ 消息队列作为url源进行多线程爬取时,总有几个url没有处理的问题。
    • ¥15 Ubuntu在安装序列比对软件STAR时出现报错如何解决
    • ¥50 树莓派安卓APK系统签名