I am using the recommending streadyway/amqp library to interact with RabbitMQ inside Go.
I want my service to gracefully fail when it cannot connect to RabbitMQ. This means noticing that it couldn't connect, waiting n
seconds, and trying to reconnect (loop forever).
I'm pretty sure I need use the Channel.NotifyClose method, but I can't work it out:
func (ch *Channel) NotifyClose(c chan *Error) chan *Error
NotifyClose registers a listener for when the server sends a channel or connection exception in the form of a Connection.Close or Channel.Close method. Connection exceptions will be broadcast to all open channels and all channels will be closed, where channel exceptions will only be broadcast to listeners to this channel.
The chan provided will be closed when the Channel is closed and on a graceful close, no error will be sent.
This is what I attempted:
graceful := make(chan *amqp.Error)
errs := channel.NotifyClose(graceful)
for {
case <-graceful:
fmt.Println("Graceful close!")
reconnect()
case <-errs:
fmt.Println("Not graceful close")
reconnect()
}
Sometimes, this works! Othertimes, after reconnecting, it will repeatedly print out:
2018/11/04 15:29:26 Other close
2018/11/04 15:29:26 Connected!
2018/11/04 15:29:26 Graceful close!
2018/11/04 15:29:26 Connected!
2018/11/04 15:29:26 Other close
2018/11/04 15:29:26 Connected!
2018/11/04 15:29:26 Graceful close!
...
Very rapidly.
I want to be able to run the service in one terminal, and rabbit in the other. I should be able to stop & restart rabbit whenever I like, with the service reconnecting consistently.
I'm a bit confused about the NotifyClose
method - is the c
chan just closed when the connection is closed? Why does it return another channel?
Cheers.
My entire code. This doesn't have a push
or pop
function, because this is a minimal example to demonstrate re-connecting when a connection fails. Push and pop implementations would depend upon how reconnection is implemented.
Any code review comments also welcome.
package main
import (
"github.com/streadway/amqp"
"io"
"log"
"sync"
"time"
)
// RabbitMQ ...
type RabbitMQ struct {
Logger *log.Logger
IsConnected bool
addr string
name string
connection *amqp.Connection
channel *amqp.Channel
queue *amqp.Queue
wg *sync.WaitGroup
done chan bool
}
const retryDelay = 5 * time.Second
// NewQueue creates a new queue instance.
func NewQueue(logOut io.Writer, name string, addr string) *RabbitMQ {
rabbit := RabbitMQ{
IsConnected: false,
addr: addr,
name: name,
wg: new(sync.WaitGroup),
done: make(chan bool),
Logger: log.New(logOut, "", log.LstdFlags),
}
rabbit.wg.Add(1)
rabbit.Connect()
go rabbit.reconnect()
return &rabbit
}
// reconnect waits to be notified about a connection
// error, and then attempts to reconnect to RabbitMQ.
func (rabbit *RabbitMQ) reconnect() {
defer rabbit.wg.Done()
graceful := make(chan *amqp.Error)
errs := rabbit.channel.NotifyClose(graceful)
for {
select {
case <-rabbit.done:
return
case <-graceful:
graceful = make(chan *amqp.Error)
rabbit.Logger.Println("Graceful close!")
rabbit.IsConnected = false
rabbit.Connect()
rabbit.IsConnected = true
errs = rabbit.channel.NotifyClose(graceful)
case <-errs:
graceful = make(chan *amqp.Error)
rabbit.Logger.Println("Normal close")
rabbit.IsConnected = false
rabbit.Connect()
errs = rabbit.channel.NotifyClose(graceful)
}
}
}
// Connect will block until a new connection to
// RabbitMQ is formed.
func (rabbit *RabbitMQ) Connect() {
for {
conn, err := amqp.Dial(rabbit.addr)
if err != nil {
rabbit.Logger.Println("Failed to establish connection")
time.Sleep(retryDelay)
continue
}
ch, err := conn.Channel()
if err != nil {
rabbit.Logger.Println("Failed to create a channel")
time.Sleep(retryDelay)
continue
}
queue, err := ch.QueueDeclare(
name,
false, // Durable
false, // Delete when unused
false, // Exclusive
false, // No-wait
nil, // Arguments
)
if err != nil {
rabbit.Logger.Println("Failed to publish a queue")
time.Sleep(retryDelay)
continue
}
rabbit.Logger.Println("Connected!")
rabbit.IsConnected = true
rabbit.connection = conn
rabbit.channel = ch
rabbit.queue = &queue
return
}
}
// Close the connection to RabbitMQ and stop
// checking for reconnections.
func (rabbit *RabbitMQ) Close() error {
close(rabbit.done)
rabbit.wg.Wait()
return rabbit.connection.Close()
}
And how this is used:
package main
import (
"fmt"
"os"
)
const (
name = "job_queue"
addr = "amqp://guest:guest@localhost:5672/"
)
func main() {
fmt.Println("Starting...")
NewQueue(os.Stdout, name, addr)
for {}
}