雷雷飘过 2020-08-25 14:51 采纳率: 0%
浏览 1177

flink的timeWindowAll流无法输出数据的问题

我测试了使用flink消费kafka的数据,然后将消费的kafka解析为特定的bean类,然后对流数据进行窗口的指定,这里我使用的是timeWindowAll,但是当数据进入了这个方法里面之后,kafka消费的数据就不见了。但是如果我用countWindowAll窗口的话,数据是可以正常进入MySQL的自定义Sink的,而且两个的代码是一样的,所以不知道问题出在了哪里,想找大神求教。pom的flink版本试过1.9.1和1.11.1版本,现在贴上主类的代码。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(1000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

    // configure Kafka consumer
    Properties properties = new Properties();

    properties.setProperty("bootstrap.servers", KAFKA_BROKER);
    properties.setProperty("auto.offset.reset", "earliest");
    properties.setProperty("enable.auto.commit", "false");
    properties.setProperty("group.id", "test001");
    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>("test_topics", new SimpleStringSchema(), properties);

    DataStreamSource<String> kafkaStream = env.addSource(consumer);
    kafkaStream
            .map(new MapFunction<String, EarlyWarningMysql>() {
                public EarlyWarningMysql map(String s) throws Exception {
                    return JSONObject.parseObject(s, EarlyWarningMysql.class);
                }
            })
                            //这个窗口不可以使用

// .timeWindowAll(Time.seconds(1)).apply(new AllWindowFunction, TimeWindow>() {
// public void apply(TimeWindow window, Iterable values, Collector> out) throws Exception {
// ArrayList list = Lists.newArrayList(values);
// out.collect(list);
// }
// })
//这个窗口是可以使用的
.countWindowAll(10).apply(new AllWindowFunction, GlobalWindow>() {
public void apply(GlobalWindow window, Iterable values, Collector> out) throws Exception {
ArrayList list = Lists.newArrayList(values);
out.collect(list);
}
})
.addSink(new SinkToMysql());

    env.execute();
}
  • 写回答

2条回答 默认 最新

  • 关注
    评论

报告相同问题?

悬赏问题

  • ¥15 有兄弟姐妹会用word插图功能制作类似citespace的图片吗?
  • ¥15 请教:如何用postman调用本地虚拟机区块链接上的合约?
  • ¥15 为什么使用javacv转封装rtsp为rtmp时出现如下问题:[h264 @ 000000004faf7500]no frame?
  • ¥15 乘性高斯噪声在深度学习网络中的应用
  • ¥15 关于docker部署flink集成hadoop的yarn,请教个问题 flink启动yarn-session.sh连不上hadoop,这个整了好几天一直不行,求帮忙看一下怎么解决
  • ¥15 深度学习根据CNN网络模型,搭建BP模型并训练MNIST数据集
  • ¥15 C++ 头文件/宏冲突问题解决
  • ¥15 用comsol模拟大气湍流通过底部加热(温度不同)的腔体
  • ¥50 安卓adb backup备份子用户应用数据失败
  • ¥20 有人能用聚类分析帮我分析一下文本内容嘛