Java极客技术 2016-09-29 06:44 采纳率: 0%
浏览 9690
已结题

启动kafka 消费者有时获取不到消息

kafka消费者启动的时候有时候不能获取到消息,但是重启后就可以了,有时候还要重启好多次。。。不知道是为什么,希望大神能指导一下。

[ INFO ] [2016-09-29 14:34:53] org.hibernate.validator.internal.util.Version [30] - HV000001: Hibernate Validator 5.2.4.Final
[ INFO ] [2016-09-29 14:34:53] com.coocaa.salad.stat.ApplicationMain [48] - Starting ApplicationMain on zhuxiang with PID 1740 (D:\IdeaProjects\green-salad\adx-stat\target\classes started by zhuxiang in D:\IdeaProjects\green-salad)
[ INFO ] [2016-09-29 14:34:53] com.coocaa.salad.stat.ApplicationMain [663] - The following profiles are active: dev
[ INFO ] [2016-09-29 14:34:54] org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext [581] - Refreshing org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@5754de72: startup date [Thu Sep 29 14:34:54 CST 2016]; root of context hierarchy
2016-09-29 14:34:55 JRebel: Monitoring Spring bean definitions in 'D:\IdeaProjects\green-salad\adx-stat\target\classes\spring-integration-consumer.xml'.
[ INFO ] [2016-09-29 14:34:55] org.springframework.beans.factory.xml.XmlBeanDefinitionReader [317] - Loading XML bean definitions from URL [file:/D:/IdeaProjects/green-salad/adx-stat/target/classes/spring-integration-consumer.xml]
[ INFO ] [2016-09-29 14:34:56] org.springframework.beans.factory.config.PropertiesFactoryBean [172] - Loading properties file from URL [jar:file:/D:/maven-repo2/org/springframework/integration/spring-integration-core/4.3.1.RELEASE/spring-integration-core-4.3.1.RELEASE.jar!/META-INF/spring.integration.default.properties]
2016-09-29 14:34:56 JRebel: Monitoring properties in 'jar:file:/D:/maven-repo2/org/springframework/integration/spring-integration-core/4.3.1.RELEASE/spring-integration-core-4.3.1.RELEASE.jar!/META-INF/spring.integration.default.properties'.
[ INFO ] [2016-09-29 14:34:56] org.springframework.integration.config.IntegrationRegistrar [330] - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
[ INFO ] [2016-09-29 14:34:56] org.springframework.beans.factory.support.DefaultListableBeanFactory [843] - Overriding bean definition for bean 'kafkaConsumerService' with a different definition: replacing [Generic bean: class [com.coocaa.salad.stat.service.KafkaConsumerService]; scope=singleton; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null; defined in file [D:\IdeaProjects\green-salad\adx-stat\target\classes\com\coocaa\salad\stat\service\KafkaConsumerService.class]] with [Generic bean: class [com.coocaa.salad.stat.service.KafkaConsumerService]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null; defined in URL [file:/D:/IdeaProjects/green-salad/adx-stat/target/classes/spring-integration-consumer.xml]]
[ INFO ] [2016-09-29 14:34:57] org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor [130] - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
[ INFO ] [2016-09-29 14:34:57] org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor [158] - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
[ INFO ] [2016-09-29 14:34:57] org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker [328] - Bean 'org.springframework.transaction.annotation.ProxyTransactionManagementConfiguration' of type [class org.springframework.transaction.annotation.ProxyTransactionManagementConfiguration$$EnhancerBySpringCGLIB$$3dea2e76] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
[ INFO ] [2016-09-29 14:34:58] org.springframework.beans.factory.config.PropertiesFactoryBean [172] - Loading properties file from URL [jar:file:/D:/maven-repo2/org/springframework/integration/spring-integration-core/4.3.1.RELEASE/spring-integration-core-4.3.1.RELEASE.jar!/META-INF/spring.integration.default.properties]
2016-09-29 14:34:58 JRebel: Monitoring properties in 'jar:file:/D:/maven-repo2/org/springframework/integration/spring-integration-core/4.3.1.RELEASE/spring-integration-core-4.3.1.RELEASE.jar!/META-INF/spring.integration.default.properties'.
[ INFO ] [2016-09-29 14:34:58] org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker [328] - Bean 'integrationGlobalProperties' of type [class org.springframework.beans.factory.config.PropertiesFactoryBean] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
[ INFO ] [2016-09-29 14:34:58] org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker [328] - Bean 'integrationGlobalProperties' of type [class java.util.Properties] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
[ INFO ] [2016-09-29 14:34:59] org.springframework.boot.context.embedded.tomcat.TomcatEmbeddedServletContainer [88] - Tomcat initialized with port(s): 8081 (http)

spring-integration-consumer.xml内容如下:
<?xml version="1.0" encoding="UTF-8"?>
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka-1.0.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

<!-- topic test conf -->
<int:channel id="inputFromKafka">
    <int:dispatcher task-executor="kafkaMessageExecutor"/>
</int:channel>
<!-- zookeeper配置 可以配置多个 -->
<int-kafka:zookeeper-connect id="zookeeperConnect"
                             zk-connect="172.20.135.95:2181,172.20.135.95:2182" zk-connection-timeout="10000"
                             zk-session-timeout="10000" zk-sync-time="2000"/>

<!-- channel配置 auto-startup="true"  否则接收不发数据 -->
<int-kafka:inbound-channel-adapter
        id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="consumerContext"
        auto-startup="true" channel="inputFromKafka">
    <int:poller fixed-delay="1" time-unit="MILLISECONDS"/>
</int-kafka:inbound-channel-adapter>
<task:executor id="kafkaMessageExecutor" pool-size="8" keep-alive="120" queue-capacity="500"/>
<bean id="kafkaDecoder"
      class="org.springframework.integration.kafka.serializer.common.StringDecoder"/>

<bean id="consumerProperties"
      class="org.springframework.beans.factory.config.PropertiesFactoryBean">
    <property name="properties">
        <props>
            <prop key="auto.offset.reset">smallest</prop>
            <prop key="socket.receive.buffer.bytes">10485760</prop> <!-- 10M -->
            <prop key="fetch.message.max.bytes">5242880</prop>
            <prop key="auto.commit.interval.ms">1000</prop>
            <prop key="auto.commit.enables">true</prop>
        </props>
    </property>
</bean>
<!-- 消息接收的BEEN -->
<bean id="kafkaConsumerService" class="com.coocaa.salad.stat.service.KafkaConsumerService"/>
<!-- 指定接收的方法 -->
<int:outbound-channel-adapter channel="inputFromKafka"
                              ref="kafkaConsumerService" method="processMessage"/>

<int-kafka:consumer-context id="consumerContext"
                            consumer-timeout="1000" zookeeper-connect="zookeeperConnect"
                            consumer-properties="consumerProperties">
    <int-kafka:consumer-configurations>
        <int-kafka:consumer-configuration
                group-id="group-4" value-decoder="kafkaDecoder" key-decoder="kafkaDecoder"
                max-messages="5000">
            <!-- 两个TOPIC配置 -->
            <int-kafka:topic id="clientsRequests2" streams="4"/>
            <!--<int-kafka:topic id="sunneytopic" streams="4" />-->
        </int-kafka:consumer-configuration>
    </int-kafka:consumer-configurations>
</int-kafka:consumer-context>

kafka版本0.10的

  • 写回答

1条回答 默认 最新

  • dabocaiqq 2016-10-03 15:51
    关注
    评论

报告相同问题?

悬赏问题

  • ¥35 平滑拟合曲线该如何生成
  • ¥100 c语言,请帮蒟蒻写一个题的范例作参考
  • ¥15 名为“Product”的列已属于此 DataTable
  • ¥15 安卓adb backup备份应用数据失败
  • ¥15 eclipse运行项目时遇到的问题
  • ¥15 关于#c##的问题:最近需要用CAT工具Trados进行一些开发
  • ¥15 南大pa1 小游戏没有界面,并且报了如下错误,尝试过换显卡驱动,但是好像不行
  • ¥15 自己瞎改改,结果现在又运行不了了
  • ¥15 链式存储应该如何解决
  • ¥15 没有证书,nginx怎么反向代理到只能接受https的公网网站