springboot项目 RabbitMQ一启动消费(接收)者进程就挂了
bug背景:
我期望实现下面的效果:
下面是我的4个文件源码(已经省略包和import,这些没有问题),分别是RabbitMqConfig ,Tut3Receiver,和Tut3Sender
/**
* @Author:xsr
* @Date:2023/10/26 14:26
* 模拟 发布/订阅 一个消息被广播到所有的消费者
*/
@Profile({"tut3","publish-subscribe"})
@Configuration
public class RabbitMqConfig {
//装载一个 exchange bean,名为
@Bean
public FanoutExchange fanout() {
return new FanoutExchange("myFanout 交换机");
}
//关于Receiver配置
@Profile("receiver")
private static class ReceiverConfig {
//装载一个Temporary queues
@Bean
public Queue autoDeleteQueue1() {
System.out.println("queue1");
return new AnonymousQueue();
}
//再装载一个Temporary queues
@Bean
public Queue autoDeleteQueue2() {
System.out.println("queue2");
return new AnonymousQueue();
}
//装载binging bean,使得exchange-queue绑定
@Bean
public Binding binding1(FanoutExchange fanout,
Queue autoDeleteQueue1) {
System.out.println("正在bind exchange和queue1");
return BindingBuilder.bind(autoDeleteQueue1).to(fanout);
}
//装载binging bean,使得exchange-queue绑定
@Bean
public Binding binding2(FanoutExchange fanout,
Queue autoDeleteQueue2) {
System.out.println("正在bind exchange和queue2");
return BindingBuilder.bind(autoDeleteQueue2).to(fanout);
}
@Bean
public Tut3Receiver receiver() {
return new Tut3Receiver();
}
}
//根据profile 装载一个 sender bean
@Profile("sender")
@Bean
public Tut3Sender sender() {
return new Tut3Sender();
}
}
Tut3Receiver.java
/**
* @Author:xsr
* @Date:2023/10/26 14:36
* tut3 消费者 模拟一个消费者群,分别监听autoDeleteQueue1和autoDeleteQueue2
*/
public class Tut3Receiver {
@RabbitListener(queues = "#{autoDeleteQueue2.name}")
public void receive1(String in) throws InterruptedException {
System.out.println("receive1");
receive(in, 1);
}
@RabbitListener(queues = "#{autoDeleteQueue2.name}")
public void receive2(String in) throws InterruptedException {
System.out.println("receive2");
receive(in, 2);
}
@RabbitHandler
public void receive(String message,int instance) throws InterruptedException {
// 对执行任务进行记时间
StopWatch watch = new StopWatch();
watch.start();
//执行任务
doWork(message);
watch.stop();
double totalTime = watch.getTotalTimeSeconds();
System.out.println("receive " + instance +
" [x] Done in " + totalTime + "s message is "+message);
}
private void doWork(String task) throws InterruptedException {
// 每个点将占一秒钟的“工作”
for (char c:task.toCharArray()){
if(c=='.'){
Thread.sleep(1000);
}
}
}
}
Tut3Sender.java
/**
* @Author:xsr
* @Date:2023/10/26 14:34
* tut3 发布者
*/
public class Tut3Sender {
@Autowired
private RabbitTemplate template;
@Autowired
private FanoutExchange fanoutExchange;
AtomicInteger dots = new AtomicInteger(0);
AtomicInteger count = new AtomicInteger(0);
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void send() {
String message = "Hello World!";
// 用RabbitTemplate指定发送的queue和message
StringBuilder builder = new StringBuilder(message);
if(dots.incrementAndGet()==4){
dots.set(1);
}
for (int i = 0; i < dots.get(); i++) {
builder.append('.');
}
builder.append(count.incrementAndGet());
String appendMessage = builder.toString();
this.template.convertAndSend(fanoutExchange.getName(), appendMessage);
System.out.println("Sender 发送了一条message:"+appendMessage);
}
}
在启动项目时,先运行receiver,然后运行sender,(开启2个springboot实例)
profiles:
active: tut3,publish-subscribe,receiver//先
profiles:
active: tut3,publish-subscribe,sender//后
我已经尝试了在javaSE下模拟上述例子,已经成功运行并没有报错