实例化 2018-05-20 11:08 采纳率: 0%
浏览 7140
已结题

spring boot 整合kafka报错怎么解决?

整合了一下spring boot跟kafka老是一直报错

 2018-05-20 18:56:26.920 ERROR 4428 --- [           main] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='myTest--------1' to topic myTest:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

请问一下各位大佬怎么解决的?

相应代码如下,也是参考了网上的:

 <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.9.RELEASE</version>
<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.2.0</version>
        </dependency>

kafka版本及配置
kafka_2.11-1.1.0
配置:

 port = 9092
host.name = 阿里内网ip
advertised.host.name = xxx.xxx.xx 阿里外网ip

其他配置默认了
然后spring boot配置

 kafka: 
   bootstrap-servers: xxx.xxx.xxx.xxx:9092 
   listener.concurrency: 3 
   producer.batch-size: 1000 
   consumer.group-id: test-consumer-group 


@Configuration
@EnableKafka
public class KafkaConfiguration {

}


@Component
public class MsgProducer {

    private Logger log = LoggerFactory.getLogger(MsgProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topicName, String jsonData) {
        log.info("向kafka推送数据:[{}]", jsonData);
        try {
            kafkaTemplate.send(topicName, jsonData);
        } catch (Exception e) {
            System.out.println("发送数据出错!!!"+topicName+","+jsonData);
            System.out.println("发送数据出错=====>"+e);
            log.error("发送数据出错!!!{}{}", topicName, jsonData);
            log.error("发送数据出错=====>", e);
        }

        //消息发送的监听器,用于回调返回信息
        kafkaTemplate.setProducerListener(new ProducerListener<String, String>() {
            @Override
            public void onSuccess(String topic, Integer partition, String key, String value, RecordMetadata recordMetadata) {
                System.out.println("topic-----"+topic);
            }

            @Override
            public void onError(String topic, Integer partition, String key, String value, Exception exception) {
            }

            @Override
            public boolean isInterestedInSuccess() {
                log.info("数据发送完毕");
                System.out.println("数据发送完毕");
                return false;
            }
        });
    }
}
@Component
public class MsgConsumer {

        @KafkaListener(topics = {"myTest"})
        public void processMessage(String content) {
            System.out.println("消息被消费"+content);
        }
}

@RunWith(SpringRunner.class)
@SpringBootTest
public class TestKafka {

    @Autowired
    private MsgProducer msgProducer;

    @Test
    public void test() throws Exception {
        msgProducer.sendMessage("myTest", "myTest--------1");
    }
}

就是这样;运行test老是一直这个错误

  • 写回答

2条回答 默认 最新

  • devmiao 2018-05-20 11:46
    关注
    评论

报告相同问题?

悬赏问题

  • ¥15 执行 virtuoso 命令后,界面没有,cadence 启动不起来
  • ¥50 comfyui下连接animatediff节点生成视频质量非常差的原因
  • ¥20 有关区间dp的问题求解
  • ¥15 多电路系统共用电源的串扰问题
  • ¥15 slam rangenet++配置
  • ¥15 有没有研究水声通信方面的帮我改俩matlab代码
  • ¥15 ubuntu子系统密码忘记
  • ¥15 保护模式-系统加载-段寄存器
  • ¥15 电脑桌面设定一个区域禁止鼠标操作
  • ¥15 求NPF226060磁芯的详细资料