springboot整合kafka,kafka已正常加载,但是consumer的listner不执行。

<?xml version="1.0" encoding="UTF-8"?>
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0

org.springframework.boot
spring-boot-starter-parent
2.1.2.RELEASE
<!-- lookup parent from repository -->

com.unitdream.kafka
consumer
0.0.1-SNAPSHOT
consumer
Demo project for Spring Boot

<properties>
    <java.version>1.8</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.mybatis.spring.boot</groupId>
        <artifactId>mybatis-spring-boot-starter</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.54</version>
    </dependency>
    <!-- kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>


    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>

</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>

package com.unitdream.kafka.consumer.kafkaconfig;

import com.unitdream.kafka.consumer.kafkaconfig.listener.KafkaConsumerListener;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    public Map<String, Object> consumerProperties() {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "n1:9092,n2:9092,n3:9092,n4:9092,es1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "alarm");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<String, String>(consumerProperties());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);

        return factory;
    }
    @Bean
    public KafkaConsumerListener kafkaConsumerListener(){

        return new KafkaConsumerListener();
    }
}

package com.unitdream.kafka.consumer.kafkaconfig.listener;

import com.unitdream.kafka.consumer.services.MqKafkaAnalysisService;
import com.unitdream.kafka.consumer.services.MqKafkaMataService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;

public class KafkaConsumerListener {
private static final Logger loger = LoggerFactory.getLogger(KafkaConsumerListener.class);
@Autowired
private MqKafkaMataService mqKafkaMataService;

@Autowired
private MqKafkaAnalysisService mqKafkaAnalysisService;


@KafkaListener(topics = {"alarm"})
public void listener(ConsumerRecord<String, String> record) {
    loger.info("kafka消费[{}-{}]-{}的数据--------------------start",record.topic(),record.partition(),record.offset());
    if (mqKafkaMataService.insertOne(record) > 0) {

        mqKafkaAnalysisService.insertOne(record);
    }
    loger.info("kafka消费[{}-{}]-{}的数据--------------------end",record.topic(),record.partition(),record.offset());
}

}

Xson9589
Xson9589 https://blog.csdn.net/russle/article/details/81258590问题已解决,参见这个链接的帖子
8 个月之前 回复

1个回答

Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!