I created the following methods that connects to RabbitMQ and uses a channel to notify my worker every time a message pops in.
It's working fine, but only to messages that arrive while the worker is running. I can't get pre-existing messages from the queue, they stand there forever, or until RabbitMQ destroys them.
package externalservices
import (
"../domain"
"encoding/json"
"github.com/streadway/amqp"
"os"
)
const (
catalogQueue = "catalog-queue"
)
// DequeueMessageCatalog is nice
func DequeueMessageCatalog(messageChannel chan domain.Catalog) {
message := make(chan []byte)
defer close(message)
for true {
go func() {
dequeue(catalogQueue, message)
}()
currCatalog := domain.Catalog{}
json.Unmarshal([]byte(<-message), &currCatalog)
messageChannel <- currCatalog
}
}
func openConnection() (*amqp.Connection, *amqp.Channel, error) {
connString := os.Getenv("RabbitMQConStr")
conn, err := amqp.Dial(connString)
if err != nil {
return nil, nil, err
}
ch, err := conn.Channel()
if err != nil {
conn.Close()
return nil, nil, err
}
return conn, ch, nil
}
func ensureQueueExists(queueName string, ch *amqp.Channel) (amqp.Queue, error) {
q, err := ch.QueueDeclare(
queueName, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
return q, err
}
func dequeue(queueName string, message chan []byte) error {
con, ch, err := openConnection()
if err != nil {
return err
}
defer con.Close()
defer ch.Close()
q, err := ensureQueueExists(queueName, ch)
if err != nil {
return err
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
true, // no-wait
nil, // args
)
if err != nil {
return err
}
for currMsg := range msgs {
message <- currMsg.Body
}
return nil
}
Is there any additional method or property I need to set so I can also get messages enqueued before my worker started running?