weixin_45135483 2020-05-08 14:47 采纳率: 0%
浏览 617

使用ActiveMQ的Topic模式,生产者生产了两条消息,队列中有两条消息,消费者只消费一条就不再消费了

1、问题:使用ActiveMQ的Topic模式,生产者向Topic发送了两条消息(控制台显示已入队两条),但消费者只消费了一条之后就不再消费,原因是什么?
代码如下:
生产者代码:

package com.babi.common.utils;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * ActiveMQ producer that publishes upgrade commands as {@link MapMessage}s to
 * the {@code mopark-upgrade} topic of a broker reachable at
 * {@code tcp://127.0.0.1:61616}.
 *
 * <p>Every call to {@link #sendupgrade(String, String, String)} opens its own
 * connection and transacted session, publishes one message, commits and closes
 * the connection again.
 */
public class UpgradeQMSender {
    /** Name of the topic the upgrade commands are published to. */
    private static final String MESSAGE_QUEUE = "mopark-upgrade";

    /** Port the ActiveMQ broker is bound to (ActiveMQ's default). */
    private static final int PORT = 61616;

    /** Message time-to-live in milliseconds (10 minutes). */
    private static final long TIME_TO_LIVE_MILLIS = 1000 * 10 * 60;

    /**
     * Publishes one upgrade command to the topic.
     *
     * <p>The message is sent inside a transacted session. The transaction is
     * committed only when the message was built and sent successfully;
     * otherwise it is rolled back. Failures are printed and not propagated to
     * the caller.
     *
     * @param state     value stored under the {@code "state"} key of the message
     * @param deviceID  value stored under the {@code "deviceID"} key of the message
     * @param versionId value stored under the {@code "versionId"} key of the message
     */
    public static void sendupgrade(String state, String deviceID, String versionId) {
        // Connection factory backed by the ActiveMQ client implementation.
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:" + PORT);

        Connection connection = null;
        Session session = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();

            // Transacted session: nothing is published until commit().
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

            Topic topic = session.createTopic(MESSAGE_QUEUE);
            MessageProducer producer = session.createProducer(topic);

            // Persistent delivery (the previous comment wrongly said "non-persistent").
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            producer.setTimeToLive(TIME_TO_LIVE_MILLIS);

            CommonLog.info(UpgradeQMSender.class, "state:" + state);
            CommonLog.info(UpgradeQMSender.class, "deviceID:" + deviceID);

            MapMessage message = buildUpgradeMessage(session, state, deviceID, versionId);
            deliver(producer, message);

            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
            // Do not leave a half-finished transaction behind.
            rollbackQuietly(session);
        } finally {
            try {
                if (null != connection) {
                    // Closing the connection also closes its sessions and producers.
                    connection.close();
                }
            } catch (Exception ignored) {
                // Best effort: the message outcome is already decided at this point.
            }
        }
    }

    /**
     * Builds the upgrade message and sends it through the given producer.
     *
     * <p>Failures are printed and not propagated. If the message cannot be
     * built, nothing is sent. This method neither commits nor rolls back the
     * session; that is left to the caller.
     *
     * @param session   session used to create the message
     * @param producer  producer used to send the message
     * @param state     value stored under the {@code "state"} key
     * @param deviceID  value stored under the {@code "deviceID"} key
     * @param versionId value stored under the {@code "versionId"} key
     */
    public static void sendupgradeMessage(Session session, MessageProducer producer, String state, String deviceID,
            String versionId) {
        MapMessage message;
        try {
            message = buildUpgradeMessage(session, state, deviceID, versionId);
        } catch (Exception e) {
            e.printStackTrace();
            // Without a message there is nothing to send.
            return;
        }

        try {
            deliver(producer, message);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /** Creates the {@link MapMessage} carrying the three upgrade fields. */
    private static MapMessage buildUpgradeMessage(Session session, String state, String deviceID, String versionId)
            throws JMSException {
        MapMessage message = session.createMapMessage();
        message.setString("state", state);
        message.setString("deviceID", deviceID);
        message.setString("versionId", versionId);
        return message;
    }

    /** Prints the message, sends it to the producer's destination and logs it. */
    private static void deliver(MessageProducer producer, MapMessage message) throws JMSException {
        System.out.println(message);
        producer.send(message);
        CommonLog.info(UpgradeQMSender.class, "消息内容:" + message);
    }

    /** Rolls back the session's transaction, ignoring a missing session or a failing rollback. */
    private static void rollbackQuietly(Session session) {
        if (session == null) {
            return;
        }
        try {
            session.rollback();
        } catch (Exception ignored) {
            // Best effort: the connection is closed right afterwards anyway.
        }
    }
}

消费者代码:

package com.babi.lockservice;

import java.io.File;
import java.io.RandomAccessFile;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.springframework.jdbc.support.rowset.SqlRowSet;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

import com.babi.bean.Device;
import com.babi.common.CommonLog;
import com.babi.common.Params;
import com.babi.common.FileUpload.FileUploadFile;
import com.babi.common.FileUpload.FileUploadServer;
import com.babi.common.FileUpload.Filespilt;
import com.babi.common.FileUpload.RemoteFile;
import com.babi.dao.JedisUtils;
import com.babi.dao.LockDAO;

import io.netty.channel.ChannelHandlerContext;
import redis.clients.jedis.Jedis;

/**
 * JMS consumer that listens on the topic named by {@code Params.UPGRADEMQ_NAME}
 * and, for upgrade commands, looks up the firmware download address, downloads
 * the file and hands it to {@link Filespilt} together with the target's channel.
 */
public class UpgradeConsumer {
    /** Key built as {@code "LOCK:" + deviceID} from the most recently handled message. */
    String LOCKKEY = null;
    /** Database access object. */
    LockDAO lockdao = new LockDAO();
    /** Lock bean. */
    Device device = new Device();
    /** Redis helper. */
    JedisUtils jedisUtils = new JedisUtils();
    public RandomAccessFile randomAccessFile;
    FileUploadFile fileUploadFile;

    /**
     * Connects to the broker, subscribes to the upgrade topic and registers
     * the listener that processes incoming messages.
     *
     * <p>NOTE(review): {@code createConsumer(topic)} creates a non-durable
     * subscription, so messages published while this consumer is not connected
     * are not expected to be delivered later; the broker console's "enqueued"
     * counter still counts them. Confirm against the broker configuration.
     *
     * @throws RuntimeException if the connection, session or consumer cannot
     *                          be created, or the listener cannot be
     *                          registered; the original exception is attached
     *                          as the cause
     * @throws JMSException     declared for callers; setup failures are
     *                          reported as {@code RuntimeException}
     */
    public void QMConsumer() throws JMSException {
        Connection connection = null;
        MessageConsumer consumer;

        CommonLog.info(this.getClass(), "MQ消费者启动中......");
        try {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD, Params.MQ_HOST + ":" + Params.MQ_PORT);
            connection = connectionFactory.createConnection();

            connection.start();
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Topic topic = session.createTopic(Params.UPGRADEMQ_NAME);
            consumer = session.createConsumer(topic);
        } catch (Exception e) {
            // Do not leak a half-initialised connection.
            closeQuietly(connection);
            throw new RuntimeException("创建MQ连接时发生了错误!", e);
        }

        try {
            // Once registered, the listener is invoked for every delivered message.
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    // NOTE(review): a JMS session delivers messages to its listeners
                    // one at a time, so the next message is not delivered until this
                    // method returns. If the download or file transfer below blocks,
                    // the consumer appears to stop after one message -- confirm, and
                    // consider handing the work to a separate thread.
                    try {
                        if (!(message instanceof MapMessage)) {
                            // Covers null as well; such messages are left unacknowledged.
                            CommonLog.info(this.getClass(), "Ignoring message that is not a MapMessage: " + message);
                            return;
                        }
                        MapMessage mapMessage = (MapMessage) message;

                        // Acknowledged before processing: a failure below does not
                        // cause the command to be redelivered.
                        message.acknowledge();

                        String action = mapMessage.getString("state");
                        String LOCKID = mapMessage.getString("deviceID");
                        String VersionId = mapMessage.getString("versionId");
                        LOCKKEY = "LOCK:" + LOCKID;

                        if (LOCKID == null) {
                            return;
                        }

                        // Build the line once so the DAO is queried a single time.
                        String commandLine = "来自API的下行指令:action=" + action + ", LOCKID=" + LOCKID + ", imei="
                                + lockdao.findImeiByLockId(LOCKID);
                        System.out.println(commandLine);
                        CommonLog.info(this.getClass(), commandLine);

                        jedisUtils.setutils(LOCKKEY, "LOCKACK", "FALSE");
                        jedisUtils.setutils(LOCKKEY, "CMDSTATUS", " ");
                        ChannelHandlerContext ctx = OSMap2Node.channelMap.get(LOCKID);

                        // Constant-first comparison: a missing "state" entry is simply not an upgrade.
                        String upgradeAction = Integer.toString(SERVERCMD.LOCKUPGRADE.getIndex());
                        if (upgradeAction.equals(action)) {
                            // Look up the firmware download address for this version.
                            String FileUrl = download(VersionId);
                            System.out.println("FileUrl" + FileUrl);
                            // Path under which the downloaded file was stored.
                            String file_name = RemoteFile.downRemoteFile(FileUrl, "firmwareVersion");
                            sendFileToLock(ctx, file_name);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        } catch (Exception e) {
            closeQuietly(connection);
            throw new RuntimeException("MQ接收消息并处理时发生了错误!", e);
        }
    }

    /** Closes the connection if there is one, ignoring any failure while closing. */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (Exception ignored) {
            // Best effort: the original setup failure is what gets reported.
        }
    }

    /**
     * Hands the file {@code "./" + file_name} to {@link Filespilt#split} together
     * with the given channel context.
     *
     * @param ctx       channel context passed on to {@code Filespilt.split}
     * @param file_name file name, resolved relative to the working directory
     * @throws Exception if splitting/sending fails
     */
    protected void sendFileToLock(ChannelHandlerContext ctx, String file_name) throws Exception {
        System.out.println("sendFileToLock" + file_name);
        File file = new File("./" + file_name);
        new Filespilt().split(file, ctx);
    }

    /**
     * Looks up the download address for a version.
     *
     * @param VersionId version identifier passed to {@code LockDAO.findAddressByVersion}
     * @return the {@code download_address} column of the first row, or
     *         {@code null} when the query returns no row
     * @throws Exception if the lookup fails
     */
    public String download(String VersionId) throws Exception {
        SqlRowSet rowSet = lockdao.findAddressByVersion(VersionId);
        if (rowSet.next()) {
            return rowSet.getString("download_address");
        }
        return null;
    }
}
  • 写回答

1条回答 默认 最新

  • dabocaiqq 2020-09-25 17:55
    关注
    评论

报告相同问题?

悬赏问题

  • ¥15 有赏,i卡绘世画不出
  • ¥15 如何用stata画出文献中常见的安慰剂检验图
  • ¥15 c语言链表结构体数据插入
  • ¥40 使用MATLAB解答线性代数问题
  • ¥15 COCOS的问题COCOS的问题
  • ¥15 FPGA-SRIO初始化失败
  • ¥15 MapReduce实现倒排索引失败
  • ¥15 ZABBIX6.0L连接数据库报错,如何解决?(操作系统-centos)
  • ¥15 找一位技术过硬的游戏pj程序员
  • ¥15 matlab生成电测深三层曲线模型代码