养乐多滋滋滋 2024-05-17 08:07 采纳率: 79.2%
浏览 2
已结题

安卓实现rabbitmq接收功能不能正常实现

最近在用安卓实现rabbitmq,该配置的都配置了,为什么不能实现发收功能呢?以下是代码

package com.jl.mq;



import androidx.appcompat.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class MainActivity extends AppCompatActivity {

    private String userName = "guest" ;
    private String passWord = "guest" ;
    private String virtualHost = "/" ;
    private String hostName = " 192.168.10.207" ;
    private int portNum = 5672 ;
    private String queueName = "host_query" ;
    private String exchangeName = "text_host " ;
    private String rountingKey = "*.info" ;

    ConnectionFactory factory = new ConnectionFactory() ;



    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        setupConnectionFactory();
        new Thread(new Runnable() {
            @Override
            public void run() {
                basicPublish();
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                basicConsume();
            }
        }).start();

    }

    /**
     * Rabbit配置
     */
    private void setupConnectionFactory(){

        factory.setUsername(userName);
        factory.setPassword(passWord);
        factory.setHost(hostName);
        factory.setPort(portNum);

    }

    /**
     * 发消息
     */
    private void basicPublish(){

        try {
            //连接
            Connection connection = factory.newConnection() ;
            //通道
            Channel channel = connection.createChannel() ;
            //声明了一个交换和一个服务器命名的队列,然后将它们绑定在一起。
            channel.exchangeDeclare(exchangeName , "fanout" , true) ;
            String queueName = channel.queueDeclare().getQueue() ;
            channel.queueBind(queueName , exchangeName , rountingKey) ;
            //消息发布
            byte[] msg = "hello word!".getBytes() ;
            channel.basicPublish(exchangeName , rountingKey  ,null , msg);

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

    }

    /**
     * 收消息
     */
    private void basicConsume(){

        try {
            //连接
            Connection connection = factory.newConnection() ;
            //通道
            final Channel channel = connection.createChannel() ;
            //实现Consumer的最简单方法是将便捷类DefaultConsumer子类化。可以在basicConsume 调用上传递此子类的对象以设置订阅:
            channel.basicConsume(queueName , false , "administrator" , new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    super.handleDelivery(consumerTag, envelope, properties, body);

                    String rountingKey = envelope.getRoutingKey() ;
                    String contentType = properties.getContentType() ;
                    String msg = new String(body) ;
                    long deliveryTag = envelope.getDeliveryTag() ;

                    Log.e("TAG" , rountingKey+":rountingKey") ;
                    Log.e("TAG" , contentType+":contentType") ;
                    Log.e("TAG" , msg+":msg") ;
                    Log.e("TAG" , deliveryTag+":deliveryTag") ;

                    channel.basicAck(deliveryTag , false);
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

    }






}

  • 写回答

2条回答 默认 最新

  • 阿里嘎多学长 2024-05-17 08:07
    关注

    以下内容由CHATGPT及阿里嘎多学长共同生成、有用望采纳:


    您想要解决的问题

    您提问的目的是解决在安卓平台上实现RabbitMQ消息接收功能时遇到的问题,即尽管已经进行了相关配置,但消息的发送和接收功能无法正常工作。

    问题出现原因

    1. 网络配置问题:您提供的主机名hostName有一个多余的空格,这可能导致连接失败。
    2. 资源竞争:在basicPublishbasicConsume方法中,您分别创建了两个独立的连接和通道,这可能导致资源竞争或冲突。
    3. 线程管理:您使用了两个新线程来处理发布和消费消息,但没有对线程的生命周期进行管理,这可能导致应用程序在后台运行时出现不稳定。
    4. 异常处理:您的代码中使用了e.printStackTrace()来处理异常,这在生产环境中不是一个好的做法,因为它不会提供足够的错误信息。
    5. 消息确认:在消费消息时,您使用了channel.basicAck(deliveryTag, false);,这表明您希望手动确认消息,但您可能需要确保消息被正确处理后才进行确认。

    问题解决方案

    1. 修正主机名:移除hostName中的多余空格。
    2. 使用单个连接和通道:考虑在应用程序中使用单个连接和通道,以避免潜在的资源竞争。
    3. 改进异常处理:使用日志记录异常信息,而不是仅仅打印堆栈跟踪。
    4. 消息确认:确保消息在被正确处理后才进行确认,避免消息丢失。

    提供代码

    以下是修改后的代码示例,它包括了上述建议的修正:

    package com.jl.mq;
    
    // ... 其他导入 ...
    
    public class MainActivity extends AppCompatActivity {
    
        // ... 其他成员变量 ...
    
        private Connection connection; // 将连接提升为成员变量
    
        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_main);
    
            setupConnectionFactory();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        connection = factory.newConnection(); // 创建连接
                        basicPublish();
                        basicConsume();
                    } catch (IOException | TimeoutException e) {
                        Log.e("RabbitMQ", "Error during connection or operations", e);
                    }
                }
            }).start();
        }
    
        // ... 其他方法 ...
    
        /**
         * 修正后的接收消息方法
         */
        private void basicConsume() throws IOException {
            Channel channel = connection.createChannel();
            String queueName = channel.queueDeclare().getQueue();
            boolean autoAck = false; // 手动确认
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                Log.i("RabbitMQ", "Received: " + message);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            };
            channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> { });
        }
    
        // ... 其他方法 ...
    }
    

    代码运行方式

    • 确保您的安卓设备或模拟器可以连接到RabbitMQ服务器。
    • 将上述代码集成到您的安卓项目中。
    • 运行您的应用程序。

    代码预期运行结果

    如果一切配置正确,您应该能够在控制台或日志中看到接收到的消息。

    推荐相关链接

    请注意,由于安卓平台的限制,某些操作可能需要额外的配置,如权限请求等。

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论 编辑记录
查看更多回答(1条)

报告相同问题?

问题事件

  • 系统已结题 6月1日
  • 已采纳回答 5月24日
  • 修改了问题 5月17日
  • 创建了问题 5月17日