love_xiaoshaung 2019-01-09 20:11 采纳率: 0%
浏览 4570

Flink任务在由不可预测异常导致任务挂掉,然后自动重新提交之后会重复消费kafka里的数据怎么解决?

如代码所示,同一个flink任务里有多个处理不同数据流的flatMap,从kafka不同的topic中取数据进行处理,其中stream1有不可预测的异常没有抓住,或者因为环境问题导致整个任务被cancel,然后自动重新启动,重启任务之后都会概率性出现有数条kafka里的数据被重复处理,有木有大神知道怎么解决啊?

StreamExecutionEnvitoment env = StreamExecutionEnvitoment.getExecutionEnvitoment();
env.enableCheckpointing(5000);

FlinkKafkaConsumer011<String> myConsumer1 = new FlinkKafkaConsumer011<>(topic1, Charset.forName("ISO8859-1"), prop);
myConsumer1.setStartFromLatest();
DataStream<String> stream1 = env.addSource(myConsumer).name(topic);
stream1.flatMap(new Handle1()).setParallelism(1).setMaxParallelism(1).addSink(new Sink1());

FlinkKafkaConsumer011<String> myConsumer2 = new FlinkKafkaConsumer011<>(topic2, Charset.forName("ISO8859-1"), prop);
myConsumer2.setStartFromLatest();
DataStream<String> stream2 = env.addSource(myConsumer).name(topic);
stream2.flatMap(new Handle2()).setParallelism(1).setMaxParallelism(1).addSink(new Sink2());
  • 写回答

1条回答

  • bella20100531 2019-06-06 18:04
    关注

    flink支持失败点恢复机制

    评论

报告相同问题?

悬赏问题

  • ¥15 关于#java#的问题,请各位专家解答!
  • ¥15 急matlab编程仿真二阶震荡系统
  • ¥20 TEC-9的数据通路实验
  • ¥15 ue5 .3之前好好的现在只要是激活关卡就会崩溃
  • ¥50 MATLAB实现圆柱体容器内球形颗粒堆积
  • ¥15 python如何将动态的多个子列表,拼接后进行集合的交集
  • ¥20 vitis-ai量化基于pytorch框架下的yolov5模型
  • ¥15 如何实现H5在QQ平台上的二次分享卡片效果?
  • ¥30 求解达问题(有红包)
  • ¥15 请解包一个pak文件