dongzhan0624 2017-02-01 23:43

How to detect a dead RabbitMQ connection

I have a RabbitMQ consumer script in Go (a simple script from the RabbitMQ tutorial that uses the streadway/amqp library).

The problem is that if rabbitmq-server is stopped, the consumer script does not exit, and when rabbitmq-server is started again, the consumer no longer receives messages.

Is there a way to detect that the consumer connection is dead and reconnect or at least terminate the consumer script?

I know the library sets a default 10-second heartbeat interval for the connection. Is it possible to use that somehow?

Thanks for any help

    func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        q, err := ch.QueueDeclare(
            "test_task_queue", // name
            true,         // durable
            false,        // delete when unused
            false,        // exclusive
            false,        // no-wait
            nil,          // arguments
        )
        failOnError(err, "Failed to declare a queue")

        err = ch.Qos(
            1,     // prefetch count
            0,     // prefetch size
            false, // global
        )
        failOnError(err, "Failed to set QoS")

        msgs, err := ch.Consume(
            q.Name, // queue
            "",     // consumer
            false,  // auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        forever := make(chan bool)

        go func() {
            for d := range msgs {
                log.Printf("Received a message: %s", d.Body)
                d.Ack(false)
                dot_count := bytes.Count(d.Body, []byte("."))
                t := time.Duration(dot_count)
                time.Sleep(t * time.Second)
                log.Printf("Done")
            }
        }()

        log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
        <-forever
    }

1 answer

  • droe9376 2017-02-02 01:23

    amqp.Connection has a NotifyClose() method that returns a channel signalling a transport or protocol error. So something like:

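    for { // reconnection loop
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") // setup
        if err != nil {
            time.Sleep(time.Second) // broker still down: wait (interval is arbitrary), then retry
            continue
        }
        notify := conn.NotifyClose(make(chan *amqp.Error)) // error channel
    ...
        ch, err := conn.Channel()
        msgs, err := ch.Consume(
    ...
    receive:
        for { // receive loop
            select { // check connection
            case err := <-notify:
                // work with error
                break receive // reconnect; a bare break would only leave the select
            case d, ok := <-msgs:
                if !ok {
                    break receive // delivery channel was closed, so reconnect
                }
                // work with message
            ...
            }
        }
    }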
    
