问题遇到的现象和发生背景
用Code Server调试Maven工程代码时,打包正常,运行单个Java文件的时候就一直卡住了。
问题相关代码,请勿粘贴截图
import com.alibaba.fastjson.JSON;
import com.bigdata.flink.entity.NewsBaseInfo;
import com.bigdata.flink.common.config.MyProperties;
import com.bigdata.flink.serializer.SimpleAvroSchemaFlink;
import com.hundsun.rcmd.bean.avro.GeneralNewsInfo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Properties;
/**
 * Flink streaming job that consumes Avro-encoded {@code GeneralNewsInfo} records from a Kafka
 * topic, extracts the base news fields into {@code NewsBaseInfo}, and writes the result as JSON
 * strings to another Kafka topic.
 *
 * <p>All connection settings (topics, brokers, consumer group, optional Kerberos paths) are
 * loaded once from an external properties file via {@code MyProperties.load()}.
 */
public class GeneralNewsInfoConsumerFlink {

    private static final Properties properties = MyProperties.load();
    private static final String readTopic = properties.getProperty("read.topic");
    private static final String writeTopic = properties.getProperty("write.topic");
    private static final String bootstrapServers = properties.getProperty("bootstrap.servers");
    private static final String zookeeperServers = properties.getProperty("zookeeper.servers");
    private static final String groupId = properties.getProperty("group.id");
    private static final String krb5Conf = properties.getProperty("java.security.krb5.conf");
    private static final String loginConfig = properties.getProperty("java.security.auth.login.config");

    /**
     * Job entry point: configures the execution environment, the Kafka source/sink and the
     * mapping stage, then submits the job.
     *
     * @param args unused command-line arguments
     * @throws Exception propagated from {@code env.execute} if the job fails to start
     */
    public static void main(String[] args) throws Exception {
        // 1. Optional Kerberos setup. BUG FIX: the two property VALUES were swapped in the
        // original code (krb5Conf was assigned to "java.security.auth.login.config" and
        // loginConfig to "java.security.krb5.conf"). A wrong krb5.conf / JAAS path can make
        // the Kafka GSSAPI handshake block indefinitely, which would appear as a hang.
        if (krb5Conf != null && !krb5Conf.isEmpty()) {
            System.setProperty("java.security.krb5.conf", krb5Conf);
            System.setProperty("java.security.auth.login.config", loginConfig);
        }

        // 2. Build the stream execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3,      // number of restart attempts
                30000L  // delay between attempts, in ms
        ));
        env.setParallelism(1);
        // Trigger a checkpoint every 60 000 ms.
        env.enableCheckpointing(60000);
        // Require at least 500 ms of progress between two checkpoints.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // At most one checkpoint in flight at a time.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // A checkpoint must complete within one minute or it is discarded (checkpoint timeout).
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Use exactly-once checkpointing semantics.
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // 3. Kafka client configuration (shared by consumer and producer).
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("zookeeper.connect", zookeeperServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Custom Avro deserializer for the record value.
        props.put("value.deserializer", SimpleAvroSchemaFlink.class.getName());
        props.put("auto.offset.reset", "latest");
        if (krb5Conf != null && !krb5Conf.isEmpty()) {
            // Kerberos (SASL/GSSAPI) security settings.
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "GSSAPI");
            props.put("sasl.kerberos.service.name", "kafka");
        }

        // 4. Build the Kafka source connector and start from the latest offsets.
        FlinkKafkaConsumer<GeneralNewsInfo> consumer =
                new FlinkKafkaConsumer<>(readTopic, new SimpleAvroSchemaFlink(), props);
        consumer.setStartFromLatest();

        // 5. Create the source stream.
        DataStream<GeneralNewsInfo> newsData = env.addSource(consumer);

        // 6. Extract the base news fields and serialize them as JSON.
        DataStream<String> baseNewsData = newsData.map(new MapFunction<GeneralNewsInfo, String>() {
            @Override
            public String map(GeneralNewsInfo generalNewsInfo) throws Exception {
                NewsBaseInfo newsBaseInfo = new NewsBaseInfo();
                newsBaseInfo.setRowkey(generalNewsInfo.getRowkey());
                newsBaseInfo.setTitle(generalNewsInfo.getTitle());
                newsBaseInfo.setDate(generalNewsInfo.getPublishTime().toString());
                newsBaseInfo.setId(generalNewsInfo.getNewsId());
                newsBaseInfo.setRepositoryId(generalNewsInfo.getNewsRepositoryId());
                newsBaseInfo.setContent(generalNewsInfo.getContent());
                return JSON.toJSONString(newsBaseInfo);
            }
        });

        // 7. Sink the JSON strings to the output topic.
        FlinkKafkaProducer<String> producer =
                new FlinkKafkaProducer<>(writeTopic, new SimpleStringSchema(), props);
        baseNewsData.addSink(producer).name("SinkKafka");

        // 8. Submit the job.
        env.execute("GeneralNewsInfoConsumerFlink");
    }
}
运行结果及报错内容
bed553f9 Importing Maven project(s) [Done]
f121eb6c Building [Done]
23073d02 Building [Done]
33c657bb Building [Done]
我想要达到的结果
如何解决这个一直处于 Building 状态、运行卡住的问题呢?