我测试了使用flink消费kafka的数据,然后将消费的kafka解析为特定的bean类,然后对流数据进行窗口的指定,这里我使用的是timeWindowAll,但是当数据进入了这个方法里面之后,kafka消费的数据就不见了。但是如果我用countWindowAll窗口的话,数据是可以正常进入MySQL的自定义Sink的,而且两个的代码是一样的,所以不知道问题出在了哪里,想找大神求教。pom的flink版本试过1.9.1和1.11.1版本,现在贴上主类的代码。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(1000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// configure Kafka consumer
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", KAFKA_BROKER);
properties.setProperty("auto.offset.reset", "earliest");
properties.setProperty("enable.auto.commit", "false");
properties.setProperty("group.id", "test001");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>("test_topics", new SimpleStringSchema(), properties);
DataStreamSource<String> kafkaStream = env.addSource(consumer);
kafkaStream
.map(new MapFunction<String, EarlyWarningMysql>() {
public EarlyWarningMysql map(String s) throws Exception {
return JSONObject.parseObject(s, EarlyWarningMysql.class);
}
})
//这个窗口不可以使用
// .timeWindowAll(Time.seconds(1)).apply(new AllWindowFunction, TimeWindow>() {
// public void apply(TimeWindow window, Iterable values, Collector> out) throws Exception {
// ArrayList list = Lists.newArrayList(values);
// out.collect(list);
// }
// })
//这个窗口是可以使用的
.countWindowAll(10).apply(new AllWindowFunction, GlobalWindow>() {
public void apply(GlobalWindow window, Iterable values, Collector> out) throws Exception {
ArrayList list = Lists.newArrayList(values);
out.collect(list);
}
})
.addSink(new SinkToMysql());
env.execute();
}