ActiveMQ point-to-point:
Producer:
public class AMQSend{
public static void send(String filePath, String newFilename, String contentType, String queueName) throws Exception{
try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
// Session used for sending messages (non-transacted, auto-acknowledge)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(queueName);
// Create the producer and enable message persistence
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// Copy the file contents into a StringBuffer (note: readLine() drops the line terminators)
StringBuffer sb = new StringBuffer();
Reader reader = null;
BufferedReader br = null;
try {
reader = new FileReader(filePath);
br = new BufferedReader(reader);
String data = null;
while ((data = br.readLine()) != null) {
sb.append(data);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if(reader != null) {
try {
reader.close();
}catch (Exception e){
e.printStackTrace();
}
if(br != null) {
try {
br.close();
}catch (Exception e){
e.printStackTrace();
}
}
}
}
TextMessage message;
message = session.createTextMessage(sb.toString());
producer.send(message);
producer.close();
session.close();
connection.close();
}catch(Exception e){
System.out.println("Upload error:\r\n" + e.getMessage());
}
}
}
Consumer:
public class AMQReci{
public static void Reci(String queueName) throws Exception {
try{
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
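// A Destination is the object a client uses to specify where the messages it produces go and where the messages it consumes come from; here it is a point-to-point queue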
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try{
if(message instanceof TextMessage) {
TextMessage tm = (TextMessage) message;
String text = tm.getText();
System.out.println(text);
}
}catch(JMSException e){
e.printStackTrace();
}
}
});
consumer.close();
session.close();
connection.close();
}catch(Exception e){
e.printStackTrace();
}
}
}
Test code:
import java.io.File;
public class AMQTest {
@SuppressWarnings("static-access")
public static void main(String[] args) throws Exception {
String fileName = "a";
String contentType = "txt";
String queueName = "queue.test1";
AMQSend amqSend = new AMQSend();
AMQReci amqReci = new AMQReci();
File file = new File("E:\\test");
File[] tempList = file.listFiles();
System.out.println("Number of entries in this directory: " + tempList.length);
try{
for (int i = 0; i < tempList.length; i++) {
if (tempList[i].isFile()) {
amqSend.send(tempList[i].getAbsolutePath(), fileName, contentType, queueName);
amqReci.Reci(queueName);
}
}
}catch(Exception e){
e.printStackTrace();
}
}
}
Test result (the folder contains two txt files):
Number of entries in this directory: 2
This is t2
This is t1
This is t2
This is t1
This is t2
This is t1
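On any given run the consumer may receive nothing; the next run then picks up
the messages left in the queue from the previous run, but possibly out of order.
How can I make sure that every call sends its message and that the message is received reliably?
PS: Sorry if the question is too long. I hope someone experienced can help; this task is urgent.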
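
A note on the likely cause, offered as a guess rather than a confirmed diagnosis: Reci() registers a MessageListener and then immediately calls consumer.close(), session.close() and connection.close(). The listener is invoked asynchronously, so the close calls can win the race and the message stays in the queue until a later run. One common remedy is to block until the listener has signalled that it is done, and only then clean up. The sketch below shows that pattern with the standard library only: a background thread stands in for the broker's delivery thread, and the names ListenerWaitDemo, deliverAsync and receiveAll are hypothetical, not part of ActiveMQ or JMS.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class ListenerWaitDemo {

    // Hypothetical stand-in for a broker: hands each message to the listener
    // on a background thread, similar to how an asynchronous listener is
    // called off the caller's thread.
    static void deliverAsync(ExecutorService broker, List<String> pending,
                             Consumer<String> listener) {
        for (String msg : pending) {
            broker.submit(() -> listener.accept(msg));
        }
    }

    // Registers a listener, waits until every pending message has arrived
    // or the timeout expires, and only then shuts everything down.
    static List<String> receiveAll(List<String> pending, long timeoutMillis)
            throws InterruptedException {
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(pending.size());
        ExecutorService broker = Executors.newSingleThreadExecutor();
        try {
            deliverAsync(broker, pending, msg -> {
                received.add(msg);
                done.countDown(); // signal that one message was handled
            });
            // Without this wait, the cleanup below could run before any callback.
            done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            // Plays the role of consumer.close(), session.close(), connection.close().
            broker.shutdownNow();
        }
        return new ArrayList<>(received);
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> got = receiveAll(Arrays.asList("This is t1", "This is t2"), 2000);
        got.forEach(System.out::println);
    }
}
```

In the JMS code itself, the same idea would mean counting down a latch inside onMessage() and awaiting it before the close calls. A simpler alternative is to drop the listener and call consumer.receive(timeoutMillis), which blocks until a message arrives or the timeout elapses. On ordering: File.listFiles() makes no guarantee about the order of the returned array, so sorting tempList before sending may be needed if the send order matters.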