Flink任务在由不可预测异常导致任务挂掉,然后自动重新提交之后会重复消费kafka里的数据怎么解决?

如代码所示,同一个flink任务里有多个处理不同数据流的flatMap,从kafka不同的topic中取数据进行处理,其中stream1有不可预测的异常没有抓住,或者因为环境问题导致整个任务被cancel,然后自动重新启动,重启任务之后都会概率性出现有数条kafka里的数据被重复处理,有木有大神知道怎么解决啊?

StreamExecutionEnvitoment env = StreamExecutionEnvitoment.getExecutionEnvitoment();
env.enableCheckpointing(5000);

FlinkKafkaConsumer011<String> myConsumer1 = new FlinkKafkaConsumer011<>(topic1, Charset.forName("ISO8859-1"), prop);
myConsumer1.setStartFromLatest();
DataStream<String> stream1 = env.addSource(myConsumer).name(topic);
stream1.flatMap(new Handle1()).setParallelism(1).setMaxParallelism(1).addSink(new Sink1());

FlinkKafkaConsumer011<String> myConsumer2 = new FlinkKafkaConsumer011<>(topic2, Charset.forName("ISO8859-1"), prop);
myConsumer2.setStartFromLatest();
DataStream<String> stream2 = env.addSource(myConsumer).name(topic);
stream2.flatMap(new Handle2()).setParallelism(1).setMaxParallelism(1).addSink(new Sink2());

1个回答

flink支持失败点恢复机制

Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
其他相关推荐
Flink任务提交模式
local模式本地运行,不需要集群环境IDE开发时,local模式方便本地测试standalone需要搭建flink集群提交命令flink run -m artemis-02:6123 -c com.test.WordCount2 ./Flink.jar hdfs://artemis-02:9000/tmp/lvxw/tmp/logs/words hdfs://artemis-02:9000/t...
flink任务提交
flink安装启动后,默认端口为8081。 浏览器启动 http://ip:8081,可以进入可视化界面。可以看到正在运行的任务及状态。 若要提交新的任务,点击“Submit new Job” -- “Add New+”,上传编译好的jar包工程 点击“upload”,等待上传完成:   点击新上传的jar包,可进行提交操作,&quot;submit&quot;:   提交后可看到任务运行及...
flink消费kafka数据
maven配置 &lt;dependency&gt; &lt;groupId&gt;org.apache.flink&lt;/groupId&gt; &lt;artifactId&gt;flink-scala_2.11&lt;/artifactId&gt; &lt;version&gt;1.7.2&lt;/version&gt; &lt;/dependency&gt; ...
flink消费kafka 数据
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.fli...
Flink提交任务(第一篇)——打包任务程序
文章目录1. 引言2. buildProgram的执行逻辑2.1. 如果没有指定入口类名称,那么就从jar包里找出来2.2. 提取任务jar文件中所有依赖的JAR包2.3. 通过入口类名称来真正加载执行入口类 针对flink-1.7-release 1. 引言 Flink客户端通过命令行的形式提交任务时,即执行./flink run xxxx.jar时,真正执行的run逻辑如下: protecte...
Kafka之重新消费数据
文章目录kafka版本声明修改`offset(偏移量)`通过使用不同的`groupId` kafka版本声明 使用的是kafka 0.10.0.1版本 修改offset(偏移量) 在使用consumer消费的时候,每个topic会产生一个offset(偏移量),offset(偏移量)是在kafka服务端__consumer__offsets这个Topic下存储的,我们修改这个offset(...
Flink提交任务至yarn
在flink on yarn模式中,flink yarn-session的两种提交方式 1.公用一个yarn-session 在yarn中初始化一个flink集群,开辟指定的资源,以后提交任务都向这里提交。这个flink集群会常驻在yarn集群中,除非手工停止。 2.每个job提供一个yarn-session     每次提交都会创建一个新的flink集群,任务之间互相独立,互不影...
kafka之重复消费数据
为什么80%的码农都做不了架构师?&gt;&gt;&gt; ...
flink消费kafka消息
package testMaven.testMaven; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streami
kafka连接flink流计算,实现flink消费kafka的数据
一、启动Kafka集群和flink集群 环境变量配置(注:kafka 3台都需要设置,flink仅master设置就好) [root@master ~]# vim /etc/profile 配置完执行命令: [root@master ~]# source /etc/profile 2.创建执行文件,添加启动服务 [root@master ~]# vim start_kafka....
Kafka重复消费数据问题
  kafka重复消费的问题,主要的原因还是在指定的时间内,没有进行kafka的位移提交,导致根据上一次的位移重新poll出新的数据,而这个数据就是上一次没有消费处理完全的(即没有进行offset提交的),这也是导致kafka重复数据的原因. 改为代码中就是,代码中会指定一个session-time来进行kafka数据的poll,供consumer进行消费处理..一次进行poll的数据量由ma...
Kafka消费数据重复解决方案
YupDB内通过kafka传递进来的数据有重复现象(Kafka消费数据重复) 遇到这种问题,基本上是心跳或offset更新不及时导致。 在kafka环境中,有以下几个参数对于数据重复有很好的效果。 auto.commit.interval.ms consumer向zookeeper提交offset的频率,单位是秒,默认60*1000 此值太大会导致数据重复消费,将其调小可避免重复数据。建议值100...
spark任务提交端口占用异常
错误提示: 7/05/05 15:51:07 WARN AbstractLifeCycle: FAILED org.spark-project.jetty.server.Server@3c8bdd5b: java.net.BindException: Address already in use java.net.BindException: Address already in use ...
spark提交任务端口占用异常
当在同一台机器上提交多个spark任务时 并且是以client的方式提交,会报端口占用错误 17/05/05 15:51:07 WARN AbstractLifeCycle: FAILED org.spark-project.jetty.server.Server@3c8bdd5b: java.net.BindException: Address already in use java.net.
flink从安装到提交任务
本操作全部在Windows下环境进行操作,linux环境下更为简单 下载: flink这东西安装要启动local模式还是比较简单的,首先从官网上下载 http://flink.apache.org/downloads.html   我下载的是最新版本1.1.3, 解压flink: 下载完后对其进行解压,我解压所在的目录在D:\flink-1.1.3-bin-hadoop26-sca
多线程向flink集群提交任务失败
Caused by: org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Could not upload the program's JAR files to the JobManager.n at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:454)n at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:99)n at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)n at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:76)n at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:345)n ... 14 common frames omittednCaused by: org.apache.flink.runtime.client.JobSubmissionException: Could not upload the program's JAR files to the JobManager.n at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:410)n at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:451)n ... 19 common frames omittednCaused by: java.io.IOException: Could not retrieve the JobManager's blob port.n at org.apache.flink.runtime.blob.BlobClient.uploadJarFiles(BlobClient.java:745)n at org.apache.flink.runtime.jobgraph.JobGraph.uploadUserJars(JobGraph.java:565)n at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:407)n ... 20 common frames omittednCaused by: java.io.IOException: PUT operation failed: Could not transfer error messagen at org.apache.flink.runtime.blob.BlobClient.putInputStream(BlobClient.java:512)n at org.apache.flink.runtime.blob.BlobClient.put(BlobClient.java:374)n at org.apache.flink.runtime.blob.BlobClient.uploadJarFiles(BlobClient.java:771)n at org.apache.flink.runtime.blob.BlobClient.uploadJarFiles(BlobClient.java:740)n ... 22 common frames omittednCaused by: java.io.IOException: Could not transfer error messagen at org.apache.flink.runtime.blob.BlobClient.readExceptionFromStream(BlobClient.java:799)n at org.apache.flink.runtime.blob.BlobClient.receivePutResponseAndCompare(BlobClient.java:537)n at org.apache.flink.runtime.blob.BlobClient.putInputStream(BlobClient.java:508)n ... 25 common frames omittednCaused by: java.lang.ClassNotFoundException: org.apache.hadoop.ipc.RemoteExceptionn at java.net.URLClassLoader.findClass(URLClassLoader.java:381)n at java.lang.ClassLoader.loadClass(ClassLoader.java:424)n at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)n at java.lang.ClassLoader.loadClass(ClassLoader.java:357)n at java.lang.Class.forName0(Native Method)n at java.lang.Class.forName(Class.java:348)n at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)n at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)n at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)n at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)n at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)n at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)n at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501)n at java.lang.Throwable.readObject(Throwable.java:914)n at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)n at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)n at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)n at java.lang.reflect.Method.invoke(Method.java:497)n at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)n at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)n at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)n at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)n at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)n at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)n at org.apache.flink.runtime.blob.BlobClient.readExceptionFromStream(BlobClient.java:795)n ... 27 common frames omitted
十三、Flink源码阅读--Client提交任务过程
我们以flink中自带的wordcount例子作为提交任务,查看在客户端Flink提交任务的详细源码步骤。 入口分析 首先我们看一下提交命令如下: bin/flink run examples/batch/WordCount.jar 接着会在bin/flink shell脚本中找到提交的主类org.apache.flink.client.cli.CliFrontend。我们从CliFron...
Flink从kafka中消费数据--解析
一、情况介绍: 基于scala语言的Flink从kafka中消费数据,然后使用protobuf解析,当然默认是使用string解析的,此处需要指定接收的数据格式 package cetc.kakfa2flink import java.io.IOException import java.util.Properties import com.hxy.protobuf.DSFusion impo...
flink消费kafka数据直接到hdfs
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.fli...
kafka超时导致的重复消费的问题
今天看到群友遇到个问题: 问题的表象是超时导致autoCommit失败,从而导致重复消费 错误内容是: 2018-08-21 00:00:00.938 [kudu-0-C-1] WARN o.a.k.c.consumer.internals.ConsumerCoordinator - Auto offset commit failed for group sm: Commit canno...
Flink流处理之迭代任务
前面我们分析过Flink对迭代在流图中的特殊处理,使得迭代中的反馈环得以转化为普通的DAG模型。这一篇我们将剖析运行时的流处理迭代任务的执行机制。这里涉及到两个任务类:
Flink消费Kafka初步了解
这两年一直听说Flink是新一代的流式处理框架,并且今年阿里巴巴还把自己改造后的Blink贡献给Flink,于是研究起了Flink,将工作原Spark看能否重构为Flink。 Flink的基本介绍这里就不做过多赘述,自行百度或Flink官网查看。 遇到的问题: 作为流处理框架,避免不了和各种消息中间件打交道,Kafka就是其中重要的一个。Flink自己提供了一套消费生产者API共基本使...
kafka重新消费
kafka重新消费的两种方式 低级API AUTO_OFFSET_RESET_CONFIG 方式一:低级API public class MylowerConsumer { public static void main(String[] args) { //1.brokers节点 ArrayList&amp;lt;String&amp;gt; list = new A...
Flink消费kafka消息实战
本次实战的内容是开发Flink应用,消费来自kafka的消息,进行实时计算
通用任务消费系统设计
yaf是PHP框架中运行速度最快的MVC框架,是由鸟哥用C编写,同时yar也是由鸟哥用C编写的微服务框架,鸟哥框架特点:简单易用、性能好。本课程共分为以下几个部分来讲解:yaf框架深入剖析,搭建企业内部私有composer库,自己编写ORM框架,console应用,yar微服务框架及案例,redis、yac通用二级缓存,基于yaf、redis、yar框架生产者/消费者模式通用任务消费系统及案例。
Kafka重复消费
目录 1.出现的情况 2.解决方案出现的情况1.crash来不及提交Offset
kafka重复消费问题
问题描述 采用kafka读取消息进行处理时,consumer会重复读取afka队列中的数据。问题原因 kafka的consumer消费数据时首先会从broker里读取一批消息数据进行处理,处理完成后再提交offset。而我们项目中的consumer消费能力比较低,导致取出的一批数据在session.timeout.ms时间内没有处理完成,自动提交offset失败,然后kafka会重新分配part
kafka重复消费解决方案
重复消费场景: 1.消费端消费能力比较低,处理消息速度慢 2.根据kafka消费特性,消费者在每个partion上的位置都是一个整数,即消费下一条消息的偏移量。这个状态可以定期检查点,使得消息的确认变得非常的方便,消费者可以倒退回旧的偏移量,重新消费。 3.消息处理完之后提交下一个消费的offset,而在session-time-out前,消息还没有处理完,但是已经超时被kafka视为消费失败...
kafka消费命令异常
[root@master conf]# ^C [root@master conf]# kafka-console-consumer.sh –zookeeper master:2181,node1:2181,node2:2181 –from-beginning –topic BROWSER_DATA [2017-07-25 11:16:19,698] ERROR Error processing
关于kafka重新消费数据问题
我们在使用consumer消费数据时,有些情况下我们需要对已经消费过的数据进行重新消费,这里介绍kafka中两种重新消费数据的方法。   1. 修改offset 我们在使用consumer消费的时候,每个topic会产生一个偏移量,这个偏移量保证我们消费的消息顺序且不重复。Offest是在zookeeper中存储的,我们可以设置consumer实时或定时的注册offest到zookeeper...
kafka添加 partion导致 spark 实时任务数据减少
场景: kafka原有两个分区,添加一个分区后,一直运行的实时的spark任务  数据减少约3分之一   分析: spark 将kafka 的offsite维护在checkpoint 里面,当spark任务运行的时候,给kafka添加新的分区,原来的checkpoint只维护原来的两个分区的 offsite,新的分区的offsite 无法维护(checkpoint 只在第一次创建的时候,创建新
自动任务
开发十年,就只剩下这套Java开发体系了 &amp;gt;&amp;gt;&amp;gt;    ...
Spark本地化策略导致的任务执行时间异常
本地化执行时间异常 这两天遇到一个很奇怪的问题,利用receiver和direct两种方式来消费kafka中的数据,官网说direct形式的效率更高但是实际效果却不是很好没有那么快。 这是因为经过查看运行时间表发现,每个批次都等待了3秒的时间。因此联想到spark.locally.wait的默认等待时间是三秒因此可能是为了等待数据的本地化导致的时间延长,因此把时间调低来解决。本地化策略是为了减...
完成预测分类任务
<p>rn <p>rn 20周年限定:唐宇迪老师一卡通!<span style="color:#337FE5;">可学唐宇迪博士全部课程</span>,仅售799元(原价10374元),<span style="color:#E53333;">还送漫威授权机械键盘+CSDN 20周年限量版T恤+智能编程助手!</span>rn </p>rn <p>rn 点此链接购买:rn </p>rn <table>rn <tbody>rn <tr>rn <td>rn <a href="https://edu.csdn.net/topic/teachercard?utm_source=jsk20xqy" target="_blank"><span style="color:#337FE5;">https://edu.csdn.net/topic/teachercard?utm_source=jsk20xqy</span></a>rn </td>rn </tr>rn </tbody>rn </table>rn</p>rn购买课程后,可扫码进入学习群,获取唐宇迪老师答疑rn<p style="font-family:&quot;color:#3D3D3D;font-size:16px;background-color:#FFFFFF;">rn <img src="https://img-bss.csdn.net/201908070554499608.jpg" alt="" /><span style="font-family:&quot;"></span> rn</p>rn<p>rn <br />rn</p>rn<p>rn 深度学习系列课程从基本的神经网络开始讲起,逐步过渡到当下流行的卷积与递归神经网络架构。课程风格通俗易懂,方便大家掌握深度学习的原理。课程以实战为导向,结合当下热门的Tensorflow框架进行案例实战,让同学们上手建模实战。对深度学习经典项目,从数据处理开始一步步带领大家完成多个项目实战任务!rn</p>
预测任务下次执行时间
预测任务下次执行时间 Date date = new Date(); //表达式 CronTrigger trigger = new CronTrigger(&amp;quot;0 0 8 * * ?&amp;quot;); //未来十次执行时间 for(int i=1;i&amp;amp;lt;11;i++){ SimpleTriggerCon...
房价预测任务概述
<p>rn <span> </span> rn</p>rn<p>rn <p>rn 20周年限定:唐宇迪老师一卡通!<span style="color:#337FE5;">可学唐宇迪博士全部课程</span>,仅售799元(原价10374元),<span style="color:#E53333;">还送漫威正版授权机械键盘+CSDN 20周年限量版T恤+智能编程助手!</span>rn </p>rn <p>rn 点此链接购买:rn </p>rn <table>rn <tbody>rn <tr>rn <td>rn <span style="color:#337FE5;"><a href="https://edu.csdn.net/topic/teachercard?utm_source=jsk20xqy" target="_blank">https://edu.csdn.net/topic/teachercard?utm_source=jsk20xqy</a><br />rn</span>rn </td>rn </tr>rn </tbody>rn </table>rn<span>&nbsp;</span>rn</p>rn购买课程后,可扫码进入学习群<span style="font-family:&quot;">,获取唐宇迪老师答疑</span>rn<p>rn <img src="https://img-bss.csdn.net/201908070602566897.jpg" alt="" /> rn</p>rn<p>rn Python数据挖掘技术系列视频培训教程基于真实数据集进行案例实战,使用Python数据科学库从数据预处理开始一步步进行数据建模。对于每个案例首先进行流程解读与数据分析,建立特征工程,详细解读其中每一步原理与应用。该课程共有十一大章节,88课时,从泰坦尼克号获救预测进行数据分析作为第一章节,后边依次是用户画像、Xgboost实战、京东用户购买意向预测、Kaggle数据科学调查、房价预测、论文与BenchMark的意义、Python实现音乐推荐系统、fbprophet时间序列预测、用电敏感客户分类、数据特征。rn</p>
人口普查预测任务概述
<p>rn <br />rn</p>rn<p>rn <p>rn 20周年限定:唐宇迪老师一卡通!<span style="color:#337FE5;">可学唐宇迪博士全部课程</span>,仅售799元(原价10374元),<span style="color:#E53333;">还送漫威正版授权机械键盘+CSDN 20周年限量版T恤+智能编程助手!</span>rn </p>rn <p>rn 点此链接购买:rn </p>rn <table>rn <tbody>rn <tr>rn <td>rn <span style="color:#337FE5;"><a href="https://edu.csdn.net/topic/teachercard?utm_source=jsk20xqy" target="_blank">https://edu.csdn.net/topic/teachercard?utm_source=jsk20xqy</a><br />rn</span>rn </td>rn </tr>rn </tbody>rn </table>rn&nbsp;rn</p>rn购买课程后,可扫码进入学习群<span>,获取唐宇迪老师答疑</span> rn<p>rn <br />rn</p>rn<p>rn <span id="__kindeditor_bookmark_end_1__"></span><img src="https://img-bss.csdn.net/201908070549586910.jpg" alt="" /> rn</p>rn<p>rn 进阶实战课程旨在帮助同学们掌握机器学习进阶算法原理并应用Python工具包进行实战任务,学习过程中建议大家先掌握机器学习经典算法再加入进阶实战课程中。课程整体风格通俗易懂,用最接地气的方式带大家轻松入门机器学习各大高深算法并结合真实数据集进行项目实战。rn</p>
Kafka重复消费和丢失数据研究
Kafka重复消费原因、数据丢失 底层根本原因:已经消费了数据,但是offset没提交。 原因1:强行kill线程,导致消费后的数据,offset没有提交。 原因2:设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费。例如: try { consumer.un
Kafka重复消费同一Topic数据
在 高级API 中,消费者要从头开始消费某个 topic 的全量数据,需要满足2个条件: (1)使用一个全新的"group.id"(就是之前没有被任何消费者使用过); (2)指定"auto.offset.reset"参数的值为earliest; “What to do when there is no initial offset in Kafka or if the current of...
Linux 自动任务生成数据
/* * 自动任务跑数据 * 订单组日工作量报表数据写入 * * $start_date 0000-00-00 00:00:00 开始时间 * $email_start_date 0000-00-00 00:00:00 开始时间 * $end_dates 0000-00-00 00:00:
相关热词 c# 标准差 计算 c#siki第五季 c#入门推荐书 c# 解码海康数据流 c# xml的遍历循环 c# 取 查看源码没有的 c#解决高并发 委托 c#日期转化为字符串 c# 显示问号 c# 字典对象池