养乐多滋滋滋 2024-05-16 14:00 采纳率: 79.2%
浏览 3
已结题

安卓实现rabbitmq接收功能不能正常实现

最近在用安卓实现rabbitmq,该配置的都配置了,为什么不能实现发收功能呢?以下是代码

package com.jl.mq;



import androidx.appcompat.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class MainActivity extends AppCompatActivity {

    private String userName = "guest" ;
    private String passWord = "guest" ;
    private String virtualHost = "/" ;
    private String hostName = " 192.168.10.207" ;
    private int portNum = 5672 ;
    private String queueName = "host_query" ;
    private String exchangeName = "text_host " ;
    private String rountingKey = "*.info" ;

    ConnectionFactory factory = new ConnectionFactory() ;



    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        setupConnectionFactory();
        new Thread(new Runnable() {
            @Override
            public void run() {
                basicPublish();
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                basicConsume();
            }
        }).start();

    }

    /**
     * Rabbit配置
     */
    private void setupConnectionFactory(){

        factory.setUsername(userName);
        factory.setPassword(passWord);
        factory.setHost(hostName);
        factory.setPort(portNum);

    }

    /**
     * 发消息
     */
    private void basicPublish(){

        try {
            //连接
            Connection connection = factory.newConnection() ;
            //通道
            Channel channel = connection.createChannel() ;
            //声明了一个交换和一个服务器命名的队列,然后将它们绑定在一起。
            channel.exchangeDeclare(exchangeName , "fanout" , true) ;
            String queueName = channel.queueDeclare().getQueue() ;
            channel.queueBind(queueName , exchangeName , rountingKey) ;
            //消息发布
            byte[] msg = "hello word!".getBytes() ;
            channel.basicPublish(exchangeName , rountingKey  ,null , msg);

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

    }

    /**
     * 收消息
     */
    private void basicConsume(){

        try {
            //连接
            Connection connection = factory.newConnection() ;
            //通道
            final Channel channel = connection.createChannel() ;
            //实现Consumer的最简单方法是将便捷类DefaultConsumer子类化。可以在basicConsume 调用上传递此子类的对象以设置订阅:
            channel.basicConsume(queueName , false , "administrator" , new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    super.handleDelivery(consumerTag, envelope, properties, body);

                    String rountingKey = envelope.getRoutingKey() ;
                    String contentType = properties.getContentType() ;
                    String msg = new String(body) ;
                    long deliveryTag = envelope.getDeliveryTag() ;

                    Log.e("TAG" , rountingKey+":rountingKey") ;
                    Log.e("TAG" , contentType+":contentType") ;
                    Log.e("TAG" , msg+":msg") ;
                    Log.e("TAG" , deliveryTag+":deliveryTag") ;

                    channel.basicAck(deliveryTag , false);
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

    }






}

  • 写回答

2条回答 默认 最新

  • 阿里嘎多学长 2024-05-16 14:01
    关注

    以下内容由CHATGPT及阿里嘎多学长共同生成、有用望采纳:


    问题概括

    您正在尝试在安卓应用中实现与RabbitMQ的交互,包括消息的发布和接收功能。您已经配置了必要的设置,但遇到了无法正常实现消息收发的问题。

    问题出现原因

    1. 网络配置hostName变量中包含一个空格,这可能导致连接失败。
    2. 资源管理:在basicPublishbasicConsume方法中,创建的ConnectionChannel对象没有被正确关闭,这可能导致资源泄露。
    3. 线程管理:您在onCreate方法中启动了两个新线程,但没有对线程的生命周期进行管理,这可能导致应用退出时线程仍然在运行。
    4. 错误处理:您的代码中使用了e.printStackTrace()来处理异常,这在安卓开发中通常不足以解决问题,因为它们不会在界面上显示。

    问题解决方案

    1. 修正网络配置:确保hostName变量没有前导或尾随空格。
    2. 管理资源:使用try-with-resources语句来自动关闭ConnectionChannel对象。
    3. 线程管理:确保线程在应用退出时能够正确结束。
    4. 改进错误处理:使用日志记录异常信息,以便更好地调试。

    提供代码

    以下是修改后的代码示例:

    // ...省略其他部分
    
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
    
        setupConnectionFactory();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    basicPublish();
                } catch (IOException | TimeoutException e) {
                    Log.e("RabbitMQ", "Publish error", e);
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    basicConsume();
                } catch (IOException | TimeoutException e) {
                    Log.e("RabbitMQ", "Consume error", e);
                }
            }
        }).start();
    }
    
    // ...省略其他部分
    
    private void basicPublish() {
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            // ...省略其他部分
            channel.basicPublish(exchangeName, rountingKey, null, msg);
        } catch (IOException | TimeoutException e) {
            Log.e("RabbitMQ", "Publish error", e);
        }
    }
    
    private void basicConsume() {
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            // ...省略其他部分
            channel.basicConsume(queueName, false, "administrator", new DefaultConsumer(channel) {
                // ...省略其他部分
            });
        } catch (IOException | TimeoutException e) {
            Log.e("RabbitMQ", "Consume error", e);
        }
    }
    
    // ...省略其他部分
    

    代码运行方式

    • 确保您的安卓设备或模拟器已连接到可以访问RabbitMQ服务器的网络。
    • 将上述代码集成到您的安卓项目中,并在MainActivity中运行。

    代码预期运行结果

    如果代码正确运行,您应该能够在日志中看到接收到的消息。

    推荐相关链接

    请注意,由于安卓应用通常不用于长时间运行的服务,因此在安卓应用中使用RabbitMQ可能不是最佳实践。您可能需要考虑使用服务端组件来处理长时间运行的RabbitMQ交互。

    评论 编辑记录

报告相同问题?

问题事件

  • 已结题 (查看结题原因) 5月17日
  • 创建了问题 5月16日