I integrated Spring Boot with Kafka, and sending a message keeps failing with this error:
2018-05-20 18:56:26.920 ERROR 4428 --- [ main] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='myTest--------1' to topic myTest:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
Could anyone tell me how to fix this?
The relevant code is below; it is adapted from examples I found online:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.9.RELEASE</version>
</parent>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
</dependency>
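Kafka version and configuration:
kafka_2.11-1.1.0
Configuration:
port = 9092
host.name = Alibaba Cloud internal IP
advertised.host.name = xxx.xxx.xx (Alibaba Cloud public IP)
All other settings are left at their defaults.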
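Because the broker advertises the Alibaba Cloud public IP, one thing that may be worth ruling out is plain network reachability. A Kafka client contacts the bootstrap server first and then connects to the address the broker advertises, so port 9092 on that address has to be reachable from the machine running the test. Below is a minimal sketch using only the JDK. The class name `BrokerReachability` and the helper `canConnect` are made up for illustration, and the host and port are passed as arguments because the real addresses are not shown in this post.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Kafka or Spring: checks raw TCP reachability only.
public class BrokerReachability {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException | IllegalArgumentException e) {
            // Refused, timed out, unresolvable host, or invalid port number.
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.out.println("usage: java BrokerReachability <host> <port>");
            return;
        }
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        System.out.println(host + ":" + port + " reachable = " + canConnect(host, port, 5000));
    }
}
```

Run it from the client machine against both the bootstrap address and the advertised address. If it prints `false`, a security group rule or firewall could be what blocks the metadata request. A successful connection only shows that the port is open; it does not prove that the broker configuration is correct.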
Then the Spring Boot configuration:
kafka:
  bootstrap-servers: xxx.xxx.xxx.xxx:9092
  listener.concurrency: 3
  producer.batch-size: 1000
  consumer.group-id: test-consumer-group
@Configuration
@EnableKafka
public class KafkaConfiguration {
}
@Component
public class MsgProducer {

    private Logger log = LoggerFactory.getLogger(MsgProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topicName, String jsonData) {
        log.info("Pushing data to Kafka: [{}]", jsonData);
        try {
            kafkaTemplate.send(topicName, jsonData);
        } catch (Exception e) {
            System.out.println("Error sending data!!! " + topicName + "," + jsonData);
            System.out.println("Error sending data =====>" + e);
            log.error("Error sending data!!! {}{}", topicName, jsonData);
            log.error("Error sending data =====>", e);
        }
        // Producer listener, used as a callback that reports the send result
        kafkaTemplate.setProducerListener(new ProducerListener<String, String>() {
            @Override
            public void onSuccess(String topic, Integer partition, String key, String value, RecordMetadata recordMetadata) {
                System.out.println("topic-----" + topic);
            }

            @Override
            public void onError(String topic, Integer partition, String key, String value, Exception exception) {
            }

            @Override
            public boolean isInterestedInSuccess() {
                log.info("Data sent");
                System.out.println("Data sent");
                return false;
            }
        });
    }
}
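A note on the try/catch above: `KafkaTemplate.send` is asynchronous and returns a future, so a send failure is normally reported through that future or the producer listener and does not reach the catch block. That would match the log above, where the error is printed by `LoggingProducerListener`. The JDK-only sketch below illustrates the general pattern. `AsyncSendSketch` and `fakeSend` are hypothetical stand-ins, not spring-kafka APIs, and the failure message is invented for the demonstration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AsyncSendSketch {

    /** Hypothetical stand-in for an asynchronous send: the failure travels inside the future. */
    public static CompletableFuture<String> fakeSend(String payload) {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("metadata not available"));
        return future;
    }

    /** Calls fakeSend inside try/catch without waiting; returns true if the catch block ran. */
    public static boolean caughtWithoutWaiting(String payload) {
        try {
            fakeSend(payload); // returns immediately, nothing is thrown here
            return false;
        } catch (Exception e) {
            return true;
        }
    }

    /** Waits on the future; returns the failure message, or null if the send succeeded. */
    public static String failureWhenWaiting(String payload) {
        try {
            fakeSend(payload).get(10, TimeUnit.SECONDS);
            return null;
        } catch (ExecutionException e) {
            return e.getCause().getMessage(); // the original failure is the cause
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (TimeoutException e) {
            return "timed out";
        }
    }

    public static void main(String[] args) {
        System.out.println("caught without waiting: " + caughtWithoutWaiting("myTest--------1"));
        System.out.println("failure when waiting: " + failureWhenWaiting("myTest--------1"));
    }
}
```

Applied to the producer in this post, the equivalent would presumably be to call `get` with a timeout on the future returned by `kafkaTemplate.send(topicName, jsonData)`. In a unit test this also keeps the test method from finishing before the send has completed or failed.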
@Component
public class MsgConsumer {

    @KafkaListener(topics = {"myTest"})
    public void processMessage(String content) {
        System.out.println("Message consumed: " + content);
    }
}
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestKafka {

    @Autowired
    private MsgProducer msgProducer;

    @Test
    public void test() throws Exception {
        msgProducer.sendMessage("myTest", "myTest--------1");
    }
}
That is the whole setup; running the test always produces the error above.