package com.example.rocketmq.comg;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
//顺序消息
public class Comsumer4 {
public static void main(String[] args) throws MQClientException {
//创建消息组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Synccomsumer");
//连接
consumer.setNamesrvAddr("192.168.88.130:9876");
//拿到Simple,再过滤
consumer.subscribe("Simple","*");
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (int i=1;i<list.size();i++){
System.out.println("消息消费成功_"+new String(list.get(i).getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("消费者启动成功");
}
}
package com.example.rocketmq.comg;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.List;
//顺序消息public class OrderProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
//分组
DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
producer.setNamesrvAddr("192.168.88.130:9876");
producer.start();
for (int i=0;i<5;i++){
for (int j=0;j<10;j++){
//设置消息,有序发送
Message message = new Message("Order", "TagA", ("order_" + i+"_step_"+j).getBytes(StandardCharsets.UTF_8));
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
Integer id = (Integer) o;
int index = id % list.size();
return list.get(index);
}
}, i);
System.out.println("信息发送成功_"+sendResult);
}
}
producer.shutdown();
}
}