// Open a connection to the broker and a channel on it.
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// Prefetch count of 1: the broker delivers at most one unacknowledged
// message at a time to a consumer on this channel.
channel.basicQos(1);

// Optional queue arguments: "x-max-priority" turns the queue into a
// priority queue whose highest supported priority is 100.
Map<String, Object> argsx = new HashMap<>();
argsx.put("x-max-priority", 100);

// Declare the queue.
channel.queueDeclare(
        QUEUE_2019,
        true,   // durable
        false,  // exclusive
        false,  // autoDelete
        argsx);

// Declare the exchange (direct type: routes on an exact routing-key match).
channel.exchangeDeclare(Exchange_2019, "direct");

// Bind the queue to the exchange under routing key "key2019".
channel.queueBind(QUEUE_2019, Exchange_2019, "key2019");
// Consumer that prints every delivered message and settles it explicitly
// (the queue is consumed with manual acknowledgements).
Consumer consumer = new DefaultConsumer(channel) {
    /**
     * Handles one delivered message: decodes the body as UTF-8 and prints it.
     * The delivery is acknowledged only after it was processed successfully;
     * if processing throws, the delivery is negatively acknowledged without
     * requeue so that a failed message is neither reported as handled nor
     * redelivered in an endless loop.
     *
     * @param consumerTag tag of the consumer the message was delivered to
     * @param envelope    delivery metadata; supplies the delivery tag
     * @param properties  AMQP message properties (unused here)
     * @param body        raw message payload
     * @throws IOException if sending the ack/nack to the broker fails
     */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        long deliveryTag = envelope.getDeliveryTag();
        try {
            // Charset constant instead of a charset name: no lookup, no checked exception.
            String message = new String(body, java.nio.charset.StandardCharsets.UTF_8);
            System.out.println("Customer Received '" + message + "'");
        } catch (RuntimeException e) {
            e.printStackTrace();
            // multiple=false: only this delivery; requeue=false: do not redeliver.
            getChannel().basicNack(deliveryTag, false, false);
            return;
        }
        // multiple=false: acknowledge only this delivery.
        getChannel().basicAck(deliveryTag, false);
    }

    /**
     * Called when the consumer is cancelled by something other than an
     * explicit basicCancel on this channel.
     */
    @Override
    public void handleCancel(String consumerTag) throws IOException {
        // consumer has been cancelled unexpectedly
        System.out.println("handleCancel");
    }

    /**
     * Called when the broker confirms a basicCancel issued for this consumer.
     */
    @Override
    public void handleCancelOk(String consumerTag) {
        System.out.println("收到来自消息中间件代理的basic.cancel-ok回复,consumerTag=" + consumerTag);
    }
};
// Start consuming with manual acknowledgements (autoAck=false) and keep the
// consumer tag so the subscription can be cancelled later.
String ctag = channel.basicConsume(QUEUE_2019, false, consumer);
System.out.println("绑定消费者完毕,输入任何字符串终止消费者" + ctag);
// Wait for a full line of input. A single System.in.read() consumes only one
// byte, so the rest of the typed line (at least the newline) would stay
// buffered and make the next prompt return immediately.
for (int ch = System.in.read(); ch != -1 && ch != '\n'; ch = System.in.read()) {
    // discard the remainder of the line
}
channel.basicCancel(ctag);
System.out.println("输入任何字符串开启消费者");
// Again wait for a complete line before re-subscribing.
for (int ch = System.in.read(); ch != -1 && ch != '\n'; ch = System.in.read()) {
    // discard the remainder of the line
}
// Re-register the same consumer instance; the broker assigns a new consumer tag.
channel.basicConsume(QUEUE_2019, false, consumer);