kafka集成storm出现异常

java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: No leader found for partition 1 at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) at backtype.storm.daemon.executor$fn__6579$fn__6594$fn__6623.invoke(executor.clj:565) at backtype.storm.util$async_loop$fn__459.invoke(util.clj:463) at clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.RuntimeException: java.lang.RuntimeException: No leader found for partition 1 at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:81) at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:79) ... 6 more Caused by: java.lang.RuntimeException: No leader found for partition 1 at storm.kafka.DynamicBrokersReader.getLeaderFor(DynamicBrokersReader.java:120) at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:68) ... 7 more

2

2个回答

kafka集群 每个 partition都有一台机器充当leader 如果这台机器挂掉了,集群会自动指定某个follower 转变成leader。

看报错最有可能是leader 挂掉了,没有出现新的leader。

估计是配置有问题

3
qq_32009167
qq_32009167
2 年多之前 回复

前言

在前面Storm系列之——基本概念一文中,提到过Storm的Spout应该是源源不断的取数据,不能间断。那么,很显然,消息队列系统、分布式内存系统或内存数据库是作为其数据源的很好的选择。本文就如何集成Kafka进行介绍。

Kafka的基本介绍: http://blog.csdn.net/xeseo/article/details/18311955

准......
答案就在这里:storm kafka集成
----------------------Hi,地球人,我是问答机器人小S,上面的内容就是我狂拽酷炫叼炸天的答案,除了赞同,你还有别的选择吗?

-1
Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
其他相关推荐
storm集成kafka实例
storm是流式计算框架,而kafka是一个消息队列,在生产环境中两者经常配合使用,kafka缓存消息,storm可以从kafka中读取消息。因为流式消息的产生可能是不均匀的,经过kafka缓存之后,可以将不均匀的消息流变为均匀的传给storm用于计算。 下面的代码实现了将kafka的“test”topic产生的消息传给storm,然后storm将输出导入kafka的“test2”topic,因为
storm集成kafka简单使用示例
KafkaStormSample.javapackage kafkaUse.kafkaUse.withStorm;import java.util.Properties; import java.util.UUID;import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm
storm安装与kafka整合
1.storm是什么 1.1storm的介绍 storm是twitter公司开源贡献给apache的一款实时流式处理的一个开源软件,主要用于解决数据的实时计算以及实时的处理等方面的问题 1.2storm的特点 Storm是一个开源的分布式实时计算系统,可以简单、可靠的处理大量的数据流。Storm有很多使用场景:如实时分析,在线机器学习,持续计算,分布式RPC,ETL等等。St...
storm集成kafka简单使用示例2
StormKafkaTopo.javapackage stormUse.stormUse;import java.util.Properties;import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.
大数据系列之实时处理Storm(五)Storm与Kafka集成
我们最常用的或许就是Storm从Kafka中读取数据转换成Tuple了,现在我们就将Storm与Kafka来进行整合。 1.pom.xml <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka</artifactId> <v...
Kafka分布式消息系统实战(与Java+Scala+Hadoop+Storm集成)
1.课程研发环境 Kafka的版本:kafka_2.9.2-0.8.1.1.tgz和kafka_2.11-0.10.0.0.tgz 开发工具: Linux;Eclipse;Scala IDE 2.内容简介 Kafka是分布式的消息队列,作为云计算服务的基石,它广泛的应用在实时数据流方面,是实时数据处理的数据中枢,广泛应用在很多互联网企业,例如:linkedin,facebook,腾讯,百度,阿里等。实时数据流是现在互联网公司、甚至拥有大规模数据的传统企业的主要模式, 实时数据(Real-time Activity Data)就是那些非交易,不需要秒级响应的数据, 但在后续的分析中产生极大作用,例如个性化推荐、运营服务监控、精细化营销、报表等 。
storm笔记 与kafka的集成
   storm与kafka的结合,即前端的采集程序将实时数据源源不断采集到队列中,而storm作为消费者拉取计算,是典型的应用场景。因此,storm的发布包中也包含了一个集成jar,支持从kafka读出数据,供storm应用使用。这里结合自己的应用做个简单总结。   由于storm已经提供了storm-kafka,因此可以直接使用,使用kafka的低级api读取数据。如果有需要的话,自己实...
STORM入门之(集成KafkaSpout)
此篇基于原有两篇文章基础上扩展 STORM入门之(集成KafkaBolt) 传送门:http://blog.csdn.net/yl3395017/article/details/77452604 KafkaSpout更新 主要是构建KafkaSpout基本配置操作 /** * 构建KafkaSpout */ private static void bui
kafka和storm的环境安装详解
kafka和storm集群的环境安装前言(参照的文章: http://www.panchengming.com/2018/01/26/pancm70/)storm和kafka集群安装是没有必然联系的,我将这两个写在一起,是因为他们都是由zookeeper进行管理的,也都依赖于JDK的环境,为了不重复再写一遍配置,所以我将这两个写在一起。若只需一个,只需挑选自己选择的阅读即可。这两者的依赖如下:St...
第 12 讲 SpringBoot集成Kafka消息中间件
第十二课 SpringBoot集成Kafka消息中间件 文章目录第十二课 SpringBoot集成Kafka消息中间件1. kafka简介2. linux下启动kafka3. SpringBoot集成kafka3.1 引入依赖: pom.xml3.2 配置kafka:application.yml3.3 编写消息生成类:KafkaProducerController3.4 编写消息消费类:Kafk...
Storm应用系列之——集成Kafka
本文系原创系列,转载请注明。 原帖地址:http://blog.csdn.net/xeseo 前言 在前面Storm系列之——基本概念一文中,提到过Storm的Spout应该是源源不断的取数据,不能间断。那么,很显然,消息队列系统、分布式内存系统或内存数据库是作为其数据源的很好的选择。本文就如何集成Kafka进行介绍。 Kafka的基本介绍:http://blog.csdn
kafka+storm整合代码
package com.ljt.stormandkafka.kafkaAndStorm;import org.apache.storm.kafka.KafkaSpout; import org.apache.storm.kafka.SpoutConfig; import org.apache.storm.kafka.ZkHosts;import backtype.storm.Config; i
Storm作为新消费者对接Kafka 0.10.x+版本
Storm应用场景—作为新消费者对接Kafka 0.10.x+版本(一) 00 背景 随着Kafka版本的升级,Storm作为消费者对接Kafka 0.10.x+版本的方式上,与之前存在差异,现将新的方式记录下来,为以后遇到使用Storm实时处理新版Kafka数据的场景时提供参考。 01 架构简介 架构如下图所示。 使用Flume组件采集数据时,采用双层架构,第一层的作用是采集,第二层的作用是聚...
storm+kafka整合异常处理
[摘要:1 拷贝kafka依附jar包到storm lib [root@hdmaster libs]# cp kafka_2.10-0.8.2.1.jar /opt/apache-storm-0.9.5/lib/ [root@hdmaster libs]# cp scala-library-2.10.4.ja]  1 拷贝kafka依赖jar包到storm lib [root@hdmast
storm集成kafka插demo.zip
storm集成kafka插件demo
Storm-Kafka之——Storm集成Kafka时遇见的问题
一、KafkaSpout 引起的 log4j 的问题问题描述:SLF4J: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. SLF4J: See also http://www.slf4j.org/codes.html#log4jD...
kafka、storm、zookeeper集群环境java代码编写部署
重要备注:1、整个java代码编写使用到的工具是 IntelliJ IDEA                2、前提是kafka、zookeeper、storm集群环境已经能够使用                3、特别注意本地pom文件中版本号一定要与集群环境的版本号对应代码编写:首先是pom文件:<?xml version="1.0" encoding="UTF-8"?> <...
Storm与Kafka集成用eclipse调试集群连接不上的问题
Exception in thread"main" org.apache.storm.utils.NimbusLeaderNotFoundException: Couldnot find leader nimbus from seed hosts [localhost]. Did you specify a validlist of nimbus hosts for config nimbus.s
flume+kafka+storm整合实现实时计算小案例
    我们做数据分析的时候常常会遇到这样两个场景,一个是统计历史数据,这个就是要分析历史保存的日志。我们会使用hadoop,具体框架可以设计为:1.flume收集日志;2.HDFS输入路径存储日志;3.MapReduce计算,将结果输出到HDFS输出路径;4.hive+sqoop实现将结果转储到mysql5.我们会使用crontab定时执行一个脚本来做具体这里就不展开来说了,我会在另一个帖子讲到...
storm 0.10.0 kafkaSpout 总是读取旧消息 offset丢失问题
Storm开发免不了本地测试,storm 0.9.6的版本一切正常,升级到0.10.0本地localCluster运行出现了问题。 kafkaSpout每次启动读取消息都是from-beginning。 经过各种测试,普通的kafka消费者没有这个错误,而且提交到集群也没问题。网上大量查找解决办法,终于找到了原文地址:http://www.bubuko.com/infodetail-672915
storm+kafka+jdbc整合实例
storm版本:1.0.1 kafka版本:0.8.2.2
storm bolt作为kafka消息队列生产者
项目的storm拓扑处理压缩包后,对正常的通过校验的压缩包里的交易文件。在bolt里读取交易文件的每条记录。把每一条的交易记录+压缩包名+交易文件名+服务商+批次号的等信息组成一个字符串当做消息发送到kafka消息队列。但是目前还不知道哪里定义消息队列。以及哪里初始化zookeeper节点信息,基本所有的配置信息都是在zookeeper里面的。
STORM整合kafka消费数据
参见我的git项目:https://github.com/jimingkang/StormTwo/tree/master/src/user_visit 项目文件: 1)package user_visit; import cloudy.spout.OrderBaseSpout; import com.ibf.base.spout.SourceSpout; import backtype.
Kafka和Storm的整合
主要难点在于实现一个KafkaSpout,用于Storm接收从Kafka传来的消息 //发送第一步,加入需要发送列表中 protected void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords) { for (TopicPartition tp : consumerRecords.par...
storm与kafka整合jar包
storm与kafka整合jar包。storm与kafka整合jar包storm与kafka整合jar包storm与kafka整合jar包
Storm-kafka集成——1.1.0版本storm中tuple取KafkaSpout数据详解
问题描述:KafkaSpout拉取kafka topic数据,下一级bolt从kafkaspout获取数据,tuple到底采用什么方法取出spout中的消息呢?KafkaSpout创建:/* *根据数据源topic和zk_id创建并返回kafkaSpout * */ public static KafkaSpout init(String spout_topic,String zk_id){ ...
【storm-kafka】storm和kafka结合处理流式数据
首先简单描述下storm Storm是一个免费开源、分布式、高容错的实时计算系统。Storm令持续不断的流计算变得容易,弥补了Hadoop批处理所不能满足的实时要求。Storm经常用于在实时分析、在线机器学习、持续计算、分布式远程调用和ETL等领域。Storm的部署管理非常简单,而且,在同类的流式计算工具,Storm的性能也是非常出众的。 关于kafka Kafka是一种高吞吐量的分布式发布
(三 )kafka-jstorm集群实时日志分析 之 ---------jstorm集成spring 续(代码)
本地模式启动的. package com.doctor.kafkajstrom; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.commons.lang.RandomStringUtils; import org.slf4j.Log
kafka+storm+influxdb的程序demo
程序架构包含了kafka收集,storm的实时处理,influxdb的数据存储
storm从kafka中读数据
========================================== 定义从kafka中读出的数据 import java.io.UnsupportedEncodingException; import java.util.List; import backtype.storm.spout.Scheme; import backtype.storm.tuple.F
104-storm 整合 kafka之保存MySQL数据库
整合Kafka+Storm,消息通过各种方式进入到Kafka消息中间件,比如通过使用Flume来收集的日志数据,然后暂由Kafka中的路由暂存,然后在由实时计算程序Storm做实时分析,这时候我们需要讲Storm中的Spout中读取Kafka中的消息,然后交由具体的Bolt组件分析处理。实际上在 apache-storm-0.9.3这个版本的Storm已经自带了一个集成Kafka的外部插件程序storm-kafka,可以直接使用,通过本章可以学习如何整合
flume-kafka-storm源程序
flume kafka storm集成源代码和文档介绍
使用flume-ng+kafka+storm+mysql 搭建实时日志处理平台
一、架构介绍         因为要采集的日志已经在线上,为了不侵入代码(主要也是其他产品不会因为我们搞这个日志监控而修改代码后重新上线),已经不能再规范日志化输出,也就是需要对老系统进行日志分析。对于不同的应用,不同的日志类型,比如nginx日志、tomcat日志、应用日志等都需要分别采集;调研了flume和Logstash,当然还有更轻量级的filebeta;最后选择了flume,有以下几个
Storm-kafka 的整合 官网文档翻译
Storm Kafka 提供的 storm core 和Trident spout 实现,用于从kafka 0.8.x中消费数据。 Spouts Spouts 支持Trident 和 core Storm spouts. 这两种spout的实现,通过BrokerHost接口来同步kafkaBroker host 和partition(分区)的映射关系和控制kafka相关参数的kafkaC...
基于Flume+Kafka+ Elasticsearch+Storm的海量日志实时分析平台
基于Flume+Kafka+ Elasticsearch+Storm的海量日志实时分析平台 原文网址:http://www.weixinnu.com/tag_article/1542492037 作者:互联网技术联盟 微信公众号 日期:2016年4月26日 袁晓亮 猎聘网 架构中间件负责人 互联网技术联盟 ITA1024讲师团成员 本篇文章整理自袁晓亮4月26日在
storm读kafka数据源保证消息不丢失的方法
ack设置成-1 unclean设置为false就不丢数了,除非所有集群都同时挂,磁盘缓存没刷新 再设置个mini isr=2更大保证一下 如果你们ack设置的默认为1,那么切换leader的时候丢数很正常 看你们的需求了,吞吐量跟性能做个平衡 还有topic级别两个配置 ack是生产者的 unclean、mini isr是top
【配置】Storm和Kafka的对接:KafkaSpout
前言:Strom从Kafka中读取数据,本质是实现一个Storm中的Spout,来读取Kafka中的数据;这个Spout,可以称为Kafka Spout。支持两种类型的Spout: Core storm spout; Trident spout;
Storm-Kafka使用笔记(一):Scheme和Mapper
一、背景因为是项目驱动的去了解Storm-kafka,所以会由于分工去先了解某一部分,顺序有点杂乱。二、Storm-Kafka介绍了解Storm的都知道,Storm拓扑中比较基本的就是Spout和Bolt,Spout作为数据发射源,可以从数据库或者其他地方取得数据然后发射出去,Bolt就是中间一个个运算处理的组件,最后一个Bolt可以担任数据处理结果汇总或者数据落地的角色。三、Storm-Kafka
简单Storm消费Kafka数据并存储到redis实例(订单信息处理)
maven依赖 <dependencies> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>0.9.5</ve
Storm-Kafka 一分钟例子
以下文章使用了kafka作为storm的sport数据源,依赖于docker-compose环境,如果自己本机已经有了zookeeper与kafka环境则可以使用自己的环境。 使用教程 docker-compose.yml version: '2' services: zookeeper: image: wurstmeister/zookeeper por...
文章热词 机器学习教程 Objective-C培训 交互设计视频教程 颜色模型 设计制作学习
相关热词 mysql关联查询两次本表 native底部 react extjs glyph 图标 storm大数据视频教程 大数据storm视频教程