duanqianmou4661 2018-11-04 04:31
浏览 851

断开连接后重新连接到RabbitMQ

I am using the recommending streadyway/amqp library to interact with RabbitMQ inside Go.

I want my service to gracefully fail when it cannot connect to RabbitMQ. This means noticing that it couldn't connect, waiting n seconds, and trying to reconnect (loop forever).

I'm pretty sure I need use the Channel.NotifyClose method, but I can't work it out:

func (ch *Channel) NotifyClose(c chan *Error) chan *Error

NotifyClose registers a listener for when the server sends a channel or connection exception in the form of a Connection.Close or Channel.Close method. Connection exceptions will be broadcast to all open channels and all channels will be closed, where channel exceptions will only be broadcast to listeners to this channel.

The chan provided will be closed when the Channel is closed and on a graceful close, no error will be sent.

This is what I attempted:

graceful := make(chan *amqp.Error)
errs := channel.NotifyClose(graceful)
for {
    case <-graceful:
        fmt.Println("Graceful close!")
        reconnect()
    case <-errs:
        fmt.Println("Not graceful close")
        reconnect()
}

Sometimes, this works! Othertimes, after reconnecting, it will repeatedly print out:

2018/11/04 15:29:26 Other close
2018/11/04 15:29:26 Connected!
2018/11/04 15:29:26 Graceful close!
2018/11/04 15:29:26 Connected!
2018/11/04 15:29:26 Other close
2018/11/04 15:29:26 Connected!
2018/11/04 15:29:26 Graceful close!
...

Very rapidly.

I want to be able to run the service in one terminal, and rabbit in the other. I should be able to stop & restart rabbit whenever I like, with the service reconnecting consistently.

I'm a bit confused about the NotifyClose method - is the c chan just closed when the connection is closed? Why does it return another channel?

Cheers.


My entire code. This doesn't have a push or pop function, because this is a minimal example to demonstrate re-connecting when a connection fails. Push and pop implementations would depend upon how reconnection is implemented.

Any code review comments also welcome.

package main

import (
    "github.com/streadway/amqp"
    "io"
    "log"
    "sync"
    "time"
)

// RabbitMQ ...
type RabbitMQ struct {
    Logger      *log.Logger
    IsConnected bool
    addr        string
    name        string
    connection  *amqp.Connection
    channel     *amqp.Channel
    queue       *amqp.Queue
    wg          *sync.WaitGroup
    done        chan bool
}

const retryDelay = 5 * time.Second

// NewQueue creates a new queue instance.
func NewQueue(logOut io.Writer, name string, addr string) *RabbitMQ {
    rabbit := RabbitMQ{
        IsConnected: false,
        addr:        addr,
        name:        name,
        wg:          new(sync.WaitGroup),
        done:        make(chan bool),
        Logger:      log.New(logOut, "", log.LstdFlags),
    }
    rabbit.wg.Add(1)
    rabbit.Connect()
    go rabbit.reconnect()
    return &rabbit
}

// reconnect waits to be notified about a connection
// error, and then attempts to reconnect to RabbitMQ.
func (rabbit *RabbitMQ) reconnect() {
    defer rabbit.wg.Done()
    graceful := make(chan *amqp.Error)
    errs := rabbit.channel.NotifyClose(graceful)
    for {
        select {
        case <-rabbit.done:
            return
        case <-graceful:
            graceful = make(chan *amqp.Error)
            rabbit.Logger.Println("Graceful close!")
            rabbit.IsConnected = false
            rabbit.Connect()
            rabbit.IsConnected = true
            errs = rabbit.channel.NotifyClose(graceful)
        case <-errs:
            graceful = make(chan *amqp.Error)
            rabbit.Logger.Println("Normal close")
            rabbit.IsConnected = false
            rabbit.Connect()
            errs = rabbit.channel.NotifyClose(graceful)
        }
    }
}

// Connect will block until a new connection to
// RabbitMQ is formed.
func (rabbit *RabbitMQ) Connect() {
    for {
        conn, err := amqp.Dial(rabbit.addr)
        if err != nil {
            rabbit.Logger.Println("Failed to establish connection")
            time.Sleep(retryDelay)
            continue
        }
        ch, err := conn.Channel()
        if err != nil {
            rabbit.Logger.Println("Failed to create a channel")
            time.Sleep(retryDelay)
            continue
        }
        queue, err := ch.QueueDeclare(
            name,
            false, // Durable
            false, // Delete when unused
            false, // Exclusive
            false, // No-wait
            nil,   // Arguments
        )
        if err != nil {
            rabbit.Logger.Println("Failed to publish a queue")
            time.Sleep(retryDelay)
            continue
        }
        rabbit.Logger.Println("Connected!")
        rabbit.IsConnected = true
        rabbit.connection = conn
        rabbit.channel = ch
        rabbit.queue = &queue
        return
    }
}

// Close the connection to RabbitMQ and stop
// checking for reconnections.
func (rabbit *RabbitMQ) Close() error {
    close(rabbit.done)
    rabbit.wg.Wait()
    return rabbit.connection.Close()
}

And how this is used:

package main

import (
    "fmt"
    "os"
)

const (
    name = "job_queue"
    addr = "amqp://guest:guest@localhost:5672/"
)

func main() {
    fmt.Println("Starting...")
    NewQueue(os.Stdout, name, addr)
    for {}
}
  • 写回答

0条回答 默认 最新

    报告相同问题?

    悬赏问题

    • ¥15 执行 virtuoso 命令后,界面没有,cadence 启动不起来
    • ¥50 comfyui下连接animatediff节点生成视频质量非常差的原因
    • ¥20 有关区间dp的问题求解
    • ¥15 多电路系统共用电源的串扰问题
    • ¥15 slam rangenet++配置
    • ¥15 有没有研究水声通信方面的帮我改俩matlab代码
    • ¥15 ubuntu子系统密码忘记
    • ¥15 信号傅里叶变换在matlab上遇到的小问题请求帮助
    • ¥15 保护模式-系统加载-段寄存器
    • ¥15 电脑桌面设定一个区域禁止鼠标操作