2017-02-01
I have a RabbitMQ consumer script in Go (a simple script from the RabbitMQ tutorial that uses the streadway/amqp library).

The problem is that if rabbitmq-server is stopped, the consumer script does not exit, and when rabbitmq-server is started again, the consumer no longer receives messages.

Is there a way to detect that the consumer connection is dead and reconnect or at least terminate the consumer script?

I know that the library sets a default 10-second heartbeat interval for the connection. Is it possible to use that somehow?

Thanks for any help

    package main

    import (
        "bytes"
        "log"
        "time"

        "github.com/streadway/amqp"
    )

    // failOnError is the helper used throughout the RabbitMQ tutorial.
    func failOnError(err error, msg string) {
        if err != nil {
            log.Fatalf("%s: %s", msg, err)
        }
    }

    func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        q, err := ch.QueueDeclare(
            "test_task_queue", // name
            true,              // durable
            false,             // delete when unused
            false,             // exclusive
            false,             // no-wait
            nil,               // arguments
        )
        failOnError(err, "Failed to declare a queue")

        err = ch.Qos(
            1,     // prefetch count
            0,     // prefetch size
            false, // global
        )
        failOnError(err, "Failed to set QoS")

        msgs, err := ch.Consume(
            q.Name, // queue
            "",     // consumer
            false,  // auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        forever := make(chan bool)

        go func() {
            for d := range msgs {
                log.Printf("Received a message: %s", d.Body)
                dot_count := bytes.Count(d.Body, []byte("."))
                t := time.Duration(dot_count)
                time.Sleep(t * time.Second)
                d.Ack(false) // auto-ack is off and prefetch is 1, so ack to get the next message
            }
        }()

        log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
        <-forever
    }

2017-02-02

amqp.Connection has a NotifyClose() method that returns a channel signalling a transport or protocol error. So something like:

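    for { // reconnection loop
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") // setup
        if err != nil {
            time.Sleep(5 * time.Second) // broker still down: wait (arbitrary delay) and retry
            continue
        }
        notify := conn.NotifyClose(make(chan *amqp.Error, 1)) // error channel
        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        msgs, err := ch.Consume("test_task_queue", "", false, false, false, false, nil)
        failOnError(err, "Failed to register a consumer")
    receive:
        for { // receive loop
            select { // check connection
            case <-notify: // connection lost; work with the received *amqp.Error here
                break receive // reconnect (a bare break would only leave the select)
            case d, ok := <-msgs:
                if !ok {
                    break receive // delivery channel closed, reconnect
                }
                d.Ack(false) // work with message, then acknowledge it
            }
        }
        conn.Close() // no-op if the connection is already gone
    }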
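
The control flow of that loop can be tried without a broker. The following is only a sketch using the standard library: the `session` type, `fakeSession`, `consumeUntilClosed`, `runWithReconnect` and `errLost` are hypothetical names made up for illustration and are not part of streadway/amqp. Here `notify` stands in for the channel returned by NotifyClose() and `msgs` for the channel returned by Consume().

```go
package main

import (
	"errors"
	"fmt"
)

// errLost is a made-up error standing in for a transport failure.
var errLost = errors.New("connection lost")

// session is a hypothetical stand-in for one broker connection.
type session struct {
	notify <-chan error  // plays the role of the NotifyClose channel
	msgs   <-chan string // plays the role of the delivery channel
}

// consumeUntilClosed handles messages until the session reports an error
// or its message channel is closed (nil is returned for a clean close).
func consumeUntilClosed(s session, handle func(string)) error {
	for {
		select {
		case err := <-s.notify:
			return err // return leaves both the select and the for loop
		case m, ok := <-s.msgs:
			if !ok {
				return nil
			}
			handle(m)
		}
	}
}

// runWithReconnect dials again after every failed session and stops on a
// clean close or after maxAttempts dials.
func runWithReconnect(dial func() (session, error), handle func(string), maxAttempts int) error {
	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		s, err := dial()
		if err != nil {
			lastErr = err
			continue // a real client would sleep or back off here
		}
		if lastErr = consumeUntilClosed(s, handle); lastErr == nil {
			return nil
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", maxAttempts, lastErr)
}

// fakeSession delivers msgs in order, then ends with an error on notify
// (end != nil) or by closing the message channel (end == nil).
func fakeSession(msgs []string, end error) session {
	notify := make(chan error, 1)
	out := make(chan string) // unbuffered: each message is received before the next step
	go func() {
		for _, m := range msgs {
			out <- m
		}
		if end != nil {
			notify <- end
			return
		}
		close(out)
	}()
	return session{notify: notify, msgs: out}
}

func main() {
	// The first connection drops after two messages, the second ends cleanly.
	sessions := []session{
		fakeSession([]string{"a", "b"}, errLost),
		fakeSession([]string{"c"}, nil),
	}
	dials := 0
	dial := func() (session, error) {
		s := sessions[dials]
		dials++
		return s, nil
	}
	var got []string
	err := runWithReconnect(dial, func(m string) { got = append(got, m) }, 5)
	fmt.Println(got, dials, err) // prints: [a b c] 2 <nil>
}
```

Moving the receive loop into its own function means a plain `return` is enough to get back to the reconnection loop, so no labeled break is needed. With the real library, `dial` would wrap amqp.Dial, conn.Channel and ch.Consume, and the retry branch would normally wait before dialing again.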



