Xson9589 2019-02-15 11:37 采纳率: 0%
浏览 7429

springboot整合kafka,kafka已正常加载,但是consumer的listner不执行。

<?xml version="1.0" encoding="UTF-8"?>
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0

org.springframework.boot
spring-boot-starter-parent
2.1.2.RELEASE
<!-- lookup parent from repository -->

com.unitdream.kafka
consumer
0.0.1-SNAPSHOT
consumer
Demo project for Spring Boot

<properties>
    <java.version>1.8</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.mybatis.spring.boot</groupId>
        <artifactId>mybatis-spring-boot-starter</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.54</version>
    </dependency>
    <!-- kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>


    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>

</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>

package com.unitdream.kafka.consumer.kafkaconfig;

import com.unitdream.kafka.consumer.kafkaconfig.listener.KafkaConsumerListener;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    public Map<String, Object> consumerProperties() {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "n1:9092,n2:9092,n3:9092,n4:9092,es1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "alarm");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<String, String>(consumerProperties());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);

        return factory;
    }
    @Bean
    public KafkaConsumerListener kafkaConsumerListener(){

        return new KafkaConsumerListener();
    }
}

package com.unitdream.kafka.consumer.kafkaconfig.listener;

import com.unitdream.kafka.consumer.services.MqKafkaAnalysisService;
import com.unitdream.kafka.consumer.services.MqKafkaMataService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;

public class KafkaConsumerListener {
private static final Logger loger = LoggerFactory.getLogger(KafkaConsumerListener.class);
@Autowired
private MqKafkaMataService mqKafkaMataService;

@Autowired
private MqKafkaAnalysisService mqKafkaAnalysisService;


@KafkaListener(topics = {"alarm"})
public void listener(ConsumerRecord<String, String> record) {
    loger.info("kafka消费[{}-{}]-{}的数据--------------------start",record.topic(),record.partition(),record.offset());
    if (mqKafkaMataService.insertOne(record) > 0) {

        mqKafkaAnalysisService.insertOne(record);
    }
    loger.info("kafka消费[{}-{}]-{}的数据--------------------end",record.topic(),record.partition(),record.offset());
}

}

  • 写回答

1条回答 默认 最新

  • HappyWin 2019-02-15 14:16
    关注
    评论

报告相同问题?

悬赏问题

  • ¥20 双层网络上信息-疾病传播
  • ¥50 paddlepaddle pinn
  • ¥15 Stata 面板数据模型选择
  • ¥20 idea运行测试代码报错问题
  • ¥15 网络监控:网络故障告警通知
  • ¥15 django项目运行报编码错误
  • ¥15 请问这个是什么意思?
  • ¥15 STM32驱动继电器
  • ¥15 Windows server update services
  • ¥15 关于#c语言#的问题:我现在在做一个墨水屏设计,2.9英寸的小屏怎么换4.2英寸大屏
  • ¥50 大二 微机原理 课程design 有偿