看不懂源码,但是打断点发现好像是ack方法执行了两次,不知道为什么会执行两次ack方法
这是配置代码
#rabbit配置
spring:
rabbitmq:
# rabbit地址
host: 127.0.0.1
# rabbit端口号
port: 5672
# 用户账号和密码
username: guest
password: guest
#rabbit项目名,每个virtualHost的队列是隔离的,相当于数据库
virtual-host: /rabbit
#开启Publisher Confirms 模式,消息发送到交换器后触发回调。
#publisher-confirms: true
#开启PublisherReturn 模式,交换机将消息发送到对应队列失败时触发
#publisher-returns: true
listener:
direct:
#设置监听为手动答应模式
acknowledge-mode: manual
这是绑定交换机代码
@Configuration
public class RabbitConfig {
/**
* 将队列绑定到交换机中,并绑定路由
* @return
*/
@Bean
public Binding bindingExchange() {
return BindingBuilder.bind(queue()).to(directExchange()).with(RabbitEnum.RABBIT.getRoutingKey());
}
/**
* 创建队列
* @return
*/
@Bean
public Queue queue() {
return new Queue(RabbitEnum.RABBIT.getQueue(),true);
}
/**
* 创建交换机
* @return
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange(RabbitEnum.RABBIT.getDirectExchange(), true, false);
}
}
这是生产者发布消息和消费者消费消息的代码
@Service
public class PushService {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 推送方法
*/
public void push(){
System.out.println("开始推送");
//将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
rabbitTemplate.convertAndSend(RabbitEnum.RABBIT.getDirectExchange(), RabbitEnum.RABBIT.getRoutingKey(),"helloWord", new CorrelationData("1"));
System.out.println("结束推送");
}
/**
* 推送队列的监听
* 注解中为监听的队列
* @param
* @param channel
* @param tag
* @throws IOException
*/
@RabbitListener(queues = "rabbit.direct.queue.push")
public void processDirect(String str, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
System.out.println("收到消息");
System.out.println(str);
channel.basicAck(tag,true);
}
}
这是消费完成后报的异常
ERROR 12656 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)