DA__AD 2021-10-13 14:06 采纳率: 100%
浏览 953
已结题

RabbitMQ怎样实现保持监听并且每次只消费一条消息

建立连接时,使用using 每次连接完毕即释放,这样导致了一个问题,除非是进程正在监听中,否则生产出来的消息,消费者接收不到
如果不使用using,保持channel活跃,这样可以将消息队列里面的消息挨个取出来,但是无法使用消息的手动确认
如果在连接服务器处也不使用using,保持连接活跃,这样会保持监听,但是会将所有的消息都取出来

以下代码为片段
//using (var connection = factory.CreateConnection())//连接服务器,即正在创建终结点。
//using (var channel = connection.CreateModel())
var connection = factory.CreateConnection();
                var channel = connection.CreateModel();

                channel.QueueDeclare(queue: config.QUEENNAME, durable: true, exclusive: false, autoDelete: false, arguments: null);
                        channel.ExchangeDeclare(exchange: config.EXCHANGENAME, type: "topic", durable: true);

                        channel.QueueBind(queue: config.QUEENNAME, exchange: config.EXCHANGENAME, routingKey: config.ROUNTINGKEY);

                        var consumer = new EventingBasicConsumer(channel);//消费者

                        Utility.WriteLog("连接至消费者");
                        //确认消息,noAck: false,手动确认
                        //channel.BasicConsume(queue: config.QUEENNAME, noAck: false, consumer: consumer);
                        Utility.WriteLog("next1");
                        //while (true)
                        //{
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body.ToArray();
                            var message = Encoding.UTF8.GetString(body);
                            
                            var routingKey = ea.RoutingKey;
                            Utility.WriteLog("W-messagr" + message);
                            //消息确认
                            try
                            {
                                Utility.WriteLog("W-BasicAck" + System.DateTime.Now.ToString());
                                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                                Utility.WriteLog("W-BasicAck-end" + System.DateTime.Now.ToString());
                            }
                            catch(Exception ex)
                            {
                                Utility.WriteLog("确认失败" + ex);
                            }
                            //channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                            Utility.WriteLog("返回ACK");
                        };

                        //}

                        //}
                        Utility.WriteLog("next2");
                        channel.BasicConsume(queue: config.QUEENNAME, noAck: false, consumer: consumer);
  • 写回答

1条回答 默认 最新

  • 凌风游 2021-10-13 17:47
    关注

    不需要using释放掉连接,消费端本来就是需要保持长久连接的。消息是可以手动确认的,每处理一条就手动确认一条,确认后队列就会移除该条消息。
    看我的代码示例

    //队列消费者
    public class ReceiveMq
        {
            private ConnectionFactory factory;
            public ReceiveMq() {
                factory = new ConnectionFactory() { HostName = "localhost" };
            }
    
            public void receive(Action<string> action) {
                var connection = factory.CreateConnection();
                var channel = connection.CreateModel();
    
                channel.QueueDeclare(queue: "helloaa",
                                             durable: false,
                                             exclusive: false,
                                             autoDelete: false,
                                             arguments: null);
    
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (sender, e) => {
                    var body = e.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);             
                    action(message);
                    channel.BasicAck(e.DeliveryTag, false); //确认消息
                };
             //channel.BasicConsume(queue: "helloaa", autoAck: true, consumer: consumer); //自动确认消息
            } 
        }
    
    //调用
    ReceiveMq receiveMq = new ReceiveMq();
    receiveMq.receive(msg => {
          Console.WriteLine(" [x] Received {0}", msg);
    });
    

    有帮助麻烦点个采纳【本回答右上角】,谢谢~~有其他问题可以继续交流~

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

问题事件

  • 系统已结题 10月21日
  • 已采纳回答 10月13日
  • 创建了问题 10月13日

悬赏问题

  • ¥20 需要步骤截图(标签-服务器|关键词-map)
  • ¥50 gki vendor hook
  • ¥15 centos7中sudo命令无法使用
  • ¥15 灰狼算法和蚁群算法如何结合
  • ¥15 这是一个利用ESP32自带按键和LED控制的录像代码,编译过程出现问题,请解决并且指出错误,指导如何处理 ,协助完成代码并上传代码
  • ¥20 stm32f103,hal库 hal_usart_receive函数接收不到数据。
  • ¥20 求结果和代码,sas利用OPTEX程序和D-efficiency生成正交集
  • ¥50 adb连接不到手机是怎么回事?
  • ¥20 抓取数据时发生错误: get_mooncake_data() missing 1 required positional argument: 'driver'的问题,怎么改出正确的爬虫代码?
  • ¥15 vs2022无法联网