1、问题:使用ActiveMQ的Topic模式,消息队列中有两条消息,消费者只消费一条消息就不再消费了?
代码如下:
生产者代码
package com.babi.common.utils;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/*
* ACtiveMQ生产者
*
*/
public class UpgradeQMSender {
// MQname
private static final String MESSAGE_QUEUE = "mopark-upgrade";
// 默认的ActiveMQ服务器端绑定的端口。
private static final int PORT = 61616;
/**
* MQ发送消息
*
* @param state
* @param deviceID
* @param versionId
*/
public static void sendupgrade(String state, String deviceID, String versionId) {
// ConnectionFactory :连接工厂,JMS 创建连接
ConnectionFactory connectionFactory;
// Connection :JMS 客户端到JMS Provider 的连接
Connection connection = null;
// Session: 一个发送或接收消息的线程
Session session;
// Destination :消息的目的地
Topic topic;
// MessageProducer:消息生产者。
MessageProducer producer;
// 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:" + PORT);
try {
// 构造从工厂得到连接对象
connection = connectionFactory.createConnection();
// 启动
connection.start();
// 获取操作连接
session = connection.createSession(Boolean.TRUE.booleanValue(), Session.AUTO_ACKNOWLEDGE);
// 获取session注意参数值 liwenhui 是一个服务器的queue,须在在ActiveMq的console配置
// destination = session.createQueue(MESSAGE_QUEUE);
// 创建消息
topic = session.createTopic(MESSAGE_QUEUE);
// 得到消息生成者,发送者
producer = session.createProducer(topic);
// 设置不持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 设置消息过期时间
producer.setTimeToLive(1000 * 10 * 60);
CommonLog.info(UpgradeQMSender.class, "state:" + state);
CommonLog.info(UpgradeQMSender.class, "deviceID:" + deviceID);
// 发送消息。
sendupgradeMessage(session, producer, state, deviceID, versionId);
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
/**
* MQ调用方法
*
* @param session
* @param producer
* @param state
* @param deviceID
* @param versionId
*/
public static void sendupgradeMessage(Session session, MessageProducer producer, String state, String deviceID,
String versionId) {
MapMessage message = null;
try {
message = session.createMapMessage();
message.setString("state", state);
message.setString("deviceID", deviceID);
message.setString("versionId", versionId);
} catch (Exception e) {
e.printStackTrace();
}
// 发送消息到目的地方
try {
System.out.println(message);
producer.send(message);
CommonLog.info(UpgradeQMSender.class, "消息内容:" + message);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
消费者代码:
package com.babi.lockservice;
import java.io.File;
import java.io.RandomAccessFile;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.springframework.jdbc.support.rowset.SqlRowSet;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import com.babi.bean.Device;
import com.babi.common.CommonLog;
import com.babi.common.Params;
import com.babi.common.FileUpload.FileUploadFile;
import com.babi.common.FileUpload.FileUploadServer;
import com.babi.common.FileUpload.Filespilt;
import com.babi.common.FileUpload.RemoteFile;
import com.babi.dao.JedisUtils;
import com.babi.dao.LockDAO;
import io.netty.channel.ChannelHandlerContext;
import redis.clients.jedis.Jedis;
public class UpgradeConsumer {
String LOCKKEY = null; // lockID:lockID
LockDAO lockdao = new LockDAO(); // 数据库
Device device = new Device(); // 车锁bean
JedisUtils jedisUtils = new JedisUtils(); // redis工具类
public RandomAccessFile randomAccessFile;
FileUploadFile fileUploadFile;
public void QMConsumer() throws JMSException {
Connection connection = null; // 连接
Session session = null; // 会话:接受或者发送消息的线程
MessageConsumer consumer = null; // 消息接收者
CommonLog.info(this.getClass(), "MQ消费者启动中......");
try {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, Params.MQ_HOST + ":" + Params.MQ_PORT);
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Topic topic = session.createTopic(Params.UPGRADEMQ_NAME);
consumer = session.createConsumer(topic);
} catch (Exception e) {
throw new RuntimeException("创建MQ连接时发生了错误!");
}
try {
// 注册监听器,注册后,列队的消息变化会自动触发监听器,接收消息并处理
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
MapMessage mapMessage = (MapMessage) message;
if (mapMessage != null) {
message.acknowledge();
String action = mapMessage.getString("state");
String LOCKID = mapMessage.getString("deviceID");
String VersionId = mapMessage.getString("versionId");
LOCKKEY = "LOCK:" + LOCKID;
// 处理MQ发来的消息
if (LOCKID != null) {
System.out.println("来自API的下行指令:action=" + action + ", LOCKID=" + LOCKID + ", imei="
+ lockdao.findImeiByLockId(LOCKID));
CommonLog.info(this.getClass(), "来自API的下行指令:action=" + action + ", LOCKID=" + LOCKID
+ ", imei=" + lockdao.findImeiByLockId(LOCKID));
jedisUtils.setutils(LOCKKEY, "LOCKACK", "FALSE");
jedisUtils.setutils(LOCKKEY, "CMDSTATUS", " ");
ChannelHandlerContext ctx = OSMap2Node.channelMap.get(LOCKID);
if (action.equals(Integer.toString(SERVERCMD.LOCKUPGRADE.getIndex()))) {
String FileUrl = download(VersionId);// 根据VersionId查找固件下载地址
System.out.println("FileUrl" + FileUrl);
String file_name = RemoteFile.downRemoteFile(FileUrl, "firmwareVersion");// 获取下载之后文件的保存的路径
sendFileToLock(ctx, file_name);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
// }
} catch (Exception e) {
throw new RuntimeException("MQ接收消息并处理时发生了错误!");
}
}
/**
* 发送文件
*
* @param ctx
* @param file_name
* @throws Exception
*/
protected void sendFileToLock(ChannelHandlerContext ctx, String file_name) throws Exception {
System.out.println("sendFileToLock" + file_name);
File file = new File("./" + file_name);
new Filespilt().split(file, ctx);
}
/**
* 根据版本Id查找下载地址
*
* @param VersionId
* @return
* @throws Exception
*/
public String download(String VersionId) throws Exception {
SqlRowSet rowSet = lockdao.findAddressByVersion(VersionId);
String url = null;
if (rowSet.next()) {
url = rowSet.getString("download_address");
}
return url;
}
}