Kafka集群在虚拟机上,能正常通信能互相ping本地consumer消费者程序正常消费,flink程序不能消费不能打印数据,也没有错误提示
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Cityweather {
public static void main(String[] args) throws Exception {
// 创建流执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置checkpoint
env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);
// 创建 KafkaSource
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("hadoop100:9092")
.setTopics("POC_test")
.setGroupId("my-groupsdads")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
// 从 Kafka 读取数据并创建 DataStream
DataStreamSource<String> weathertest = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "weather_test");
// System.out.println(weatherTest);
weathertest.map(value -> {
System.out.println(value);
Thread.sleep(10000);
return value;
});
// 打印数据流
// weathertest.print();
// 执行任务
env.execute("CityWeather");
}
}