duanqianmou4661 2018-11-04 04:31
浏览 851

断开连接后重新连接到RabbitMQ

I am using the recommending streadyway/amqp library to interact with RabbitMQ inside Go.

I want my service to gracefully fail when it cannot connect to RabbitMQ. This means noticing that it couldn't connect, waiting n seconds, and trying to reconnect (loop forever).

I'm pretty sure I need use the Channel.NotifyClose method, but I can't work it out:

func (ch *Channel) NotifyClose(c chan *Error) chan *Error

NotifyClose registers a listener for when the server sends a channel or connection exception in the form of a Connection.Close or Channel.Close method. Connection exceptions will be broadcast to all open channels and all channels will be closed, where channel exceptions will only be broadcast to listeners to this channel.

The chan provided will be closed when the Channel is closed and on a graceful close, no error will be sent.

This is what I attempted:

graceful := make(chan *amqp.Error)
errs := channel.NotifyClose(graceful)
for {
    case <-graceful:
        fmt.Println("Graceful close!")
        reconnect()
    case <-errs:
        fmt.Println("Not graceful close")
        reconnect()
}

Sometimes, this works! Othertimes, after reconnecting, it will repeatedly print out:

2018/11/04 15:29:26 Other close
2018/11/04 15:29:26 Connected!
2018/11/04 15:29:26 Graceful close!
2018/11/04 15:29:26 Connected!
2018/11/04 15:29:26 Other close
2018/11/04 15:29:26 Connected!
2018/11/04 15:29:26 Graceful close!
...

Very rapidly.

I want to be able to run the service in one terminal, and rabbit in the other. I should be able to stop & restart rabbit whenever I like, with the service reconnecting consistently.

I'm a bit confused about the NotifyClose method - is the c chan just closed when the connection is closed? Why does it return another channel?

Cheers.


My entire code. This doesn't have a push or pop function, because this is a minimal example to demonstrate re-connecting when a connection fails. Push and pop implementations would depend upon how reconnection is implemented.

Any code review comments also welcome.

package main

import (
    "github.com/streadway/amqp"
    "io"
    "log"
    "sync"
    "time"
)

// RabbitMQ ...
type RabbitMQ struct {
    Logger      *log.Logger
    IsConnected bool
    addr        string
    name        string
    connection  *amqp.Connection
    channel     *amqp.Channel
    queue       *amqp.Queue
    wg          *sync.WaitGroup
    done        chan bool
}

const retryDelay = 5 * time.Second

// NewQueue creates a new queue instance.
func NewQueue(logOut io.Writer, name string, addr string) *RabbitMQ {
    rabbit := RabbitMQ{
        IsConnected: false,
        addr:        addr,
        name:        name,
        wg:          new(sync.WaitGroup),
        done:        make(chan bool),
        Logger:      log.New(logOut, "", log.LstdFlags),
    }
    rabbit.wg.Add(1)
    rabbit.Connect()
    go rabbit.reconnect()
    return &rabbit
}

// reconnect waits to be notified about a connection
// error, and then attempts to reconnect to RabbitMQ.
func (rabbit *RabbitMQ) reconnect() {
    defer rabbit.wg.Done()
    graceful := make(chan *amqp.Error)
    errs := rabbit.channel.NotifyClose(graceful)
    for {
        select {
        case <-rabbit.done:
            return
        case <-graceful:
            graceful = make(chan *amqp.Error)
            rabbit.Logger.Println("Graceful close!")
            rabbit.IsConnected = false
            rabbit.Connect()
            rabbit.IsConnected = true
            errs = rabbit.channel.NotifyClose(graceful)
        case <-errs:
            graceful = make(chan *amqp.Error)
            rabbit.Logger.Println("Normal close")
            rabbit.IsConnected = false
            rabbit.Connect()
            errs = rabbit.channel.NotifyClose(graceful)
        }
    }
}

// Connect will block until a new connection to
// RabbitMQ is formed.
func (rabbit *RabbitMQ) Connect() {
    for {
        conn, err := amqp.Dial(rabbit.addr)
        if err != nil {
            rabbit.Logger.Println("Failed to establish connection")
            time.Sleep(retryDelay)
            continue
        }
        ch, err := conn.Channel()
        if err != nil {
            rabbit.Logger.Println("Failed to create a channel")
            time.Sleep(retryDelay)
            continue
        }
        queue, err := ch.QueueDeclare(
            name,
            false, // Durable
            false, // Delete when unused
            false, // Exclusive
            false, // No-wait
            nil,   // Arguments
        )
        if err != nil {
            rabbit.Logger.Println("Failed to publish a queue")
            time.Sleep(retryDelay)
            continue
        }
        rabbit.Logger.Println("Connected!")
        rabbit.IsConnected = true
        rabbit.connection = conn
        rabbit.channel = ch
        rabbit.queue = &queue
        return
    }
}

// Close the connection to RabbitMQ and stop
// checking for reconnections.
func (rabbit *RabbitMQ) Close() error {
    close(rabbit.done)
    rabbit.wg.Wait()
    return rabbit.connection.Close()
}

And how this is used:

package main

import (
    "fmt"
    "os"
)

const (
    name = "job_queue"
    addr = "amqp://guest:guest@localhost:5672/"
)

func main() {
    fmt.Println("Starting...")
    NewQueue(os.Stdout, name, addr)
    for {}
}
  • 写回答

0条回答 默认 最新

    报告相同问题?

    悬赏问题

    • ¥15 Arduino红外遥控代码有问题
    • ¥15 数值计算离散正交多项式
    • ¥30 数值计算均差系数编程
    • ¥15 redis-full-check比较 两个集群的数据出错
    • ¥15 Matlab编程问题
    • ¥15 训练的多模态特征融合模型准确度很低怎么办
    • ¥15 kylin启动报错log4j类冲突
    • ¥15 超声波模块测距控制点灯,灯的闪烁很不稳定,经过调试发现测的距离偏大
    • ¥15 import arcpy出现importing _arcgisscripting 找不到相关程序
    • ¥15 onvif+openssl,vs2022编译openssl64