Spring Boot 集成 Kafka
实现发布订阅,代码如下:
问题:KafkaConsumer 客户端类的 process 方法能否实时收到消息并进行处理?
发布生产类
/**
 * REST controller that publishes request bodies to the Kafka topic {@code topic-test}.
 *
 * <p>The message is stamped with the current time, serialized to JSON and handed to the
 * {@link KafkaTemplate}. The send is asynchronous: the HTTP response is returned right
 * away and the outcome of the send is only reported through the log callbacks.
 */
@RestController
@RequestMapping("/kafka/producer")
public class KafkaProducerController {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerController.class);
    private static final String TOPIC = "topic-test";

    // Injected once, through the constructor. The previous @Resource field injection
    // duplicated the constructor injection and prevented the field from being final.
    private final KafkaTemplate<String, String> kafkaTemplate;

    /**
     * @param kafkaTemplate template used to publish the JSON payloads
     */
    public KafkaProducerController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Stamps the message with the current time and publishes it as JSON to {@code topic-test}.
     *
     * @param message request body to publish; its send time is overwritten here
     * @return HTTP 200 with body {@code "success"}; returned before the broker acknowledges
     *         the record, so it does not indicate that delivery succeeded
     */
    @PostMapping("/push")
    public ResponseEntity<String> pushMessage(@RequestBody Message message) {
        Date time = new Date();
        message.setSendTime(time);
        kafkaTemplate.send(TOPIC, JSON.toJSONString(message)).addCallback(
                success -> LOGGER.info("{}-生产者发送消息成功:{},时间:{}", TOPIC, success, time),
                // The format string has two placeholders: topic first, then the error text.
                // The throwable goes last so the logger also records the stack trace.
                failure -> LOGGER.error("{}-生产者发送消息失败:{}", TOPIC, failure.getMessage(), failure));
        return new ResponseEntity<>("success", HttpStatus.OK);
    }
}
消费类
订阅类
/**
 * Kafka subscriber for the {@code topic-test} topic.
 *
 * <p>Received records are printed to standard output. The class also holds a private
 * helper that publishes a small JSON document back to Kafka.
 */
@Component
public class KafkaConsumer {
    // Must be an instance field: Spring does not autowire static fields, so the former
    // "private static" declaration stayed null and sendKafka failed with a
    // NullPointerException.
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Subscription to the single topic {@code topic-test}.
     *
     * <p>NOTE(review): a {@code List<ConsumerRecord>} parameter is the batch-listener form.
     * Presumably the listener container factory is configured for batch listening
     * (for example {@code spring.kafka.listener.type=batch}); confirm this, because a
     * record-mode container cannot pass a single record to this signature.
     *
     * @param listRecord records delivered by the listener container
     */
    @KafkaListener(topics = "topic-test")
    public void onDeviceSubStatusMessageDevice(List<ConsumerRecord<String, ?>> listRecord) {
        process(listRecord);
    }

    /**
     * Prints a banner line followed by the key and value of every record.
     *
     * @param listRecord records to print
     */
    private void process(List<ConsumerRecord<String, ?>> listRecord) {
        System.out.println("订阅者接收到消息");
        for (ConsumerRecord<String, ?> record : listRecord) {
            System.out.println(record.key() + record.value());
        }
    }

    /**
     * Sends a JSON document containing the current time and the two given values to Kafka.
     *
     * <p>NOTE(review): this publishes to {@code "test-topic"}, which differs from the
     * {@code "topic-test"} topic subscribed to above; confirm that this is intentional.
     *
     * @param abcd value stored under the key {@code "abcd"}
     * @param efg value stored under the key {@code "efg"}
     */
    private void sendKafka(String abcd, String efg) {
        Map<String, Object> body = new HashMap<>(8);
        body.put("time", System.currentTimeMillis());
        body.put("abcd", abcd);
        body.put("efg", efg);
        kafkaTemplate.send("test-topic", JSON.toJSONString(body));
    }
}