m0_38085042 2022-07-18 18:17 采纳率: 0%
浏览 14

Code Server调试代码卡住

问题遇到的现象和发生背景

用Code Server调试Maven工程代码时,打包正常,运行单个Java文件的时候就一直卡住了。

问题相关代码,请勿粘贴截图

import com.alibaba.fastjson.JSON;
import com.bigdata.flink.entity.NewsBaseInfo;
import com.bigdata.flink.common.config.MyProperties;
import com.bigdata.flink.serializer.SimpleAvroSchemaFlink;
import com.hundsun.rcmd.bean.avro.GeneralNewsInfo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

/**
 * Flink streaming job that consumes Avro-encoded {@code GeneralNewsInfo} records from a
 * Kafka topic, extracts the core fields into a {@code NewsBaseInfo}, serializes the result
 * to JSON, and writes it to a second Kafka topic.
 *
 * <p>Configuration is read from an external properties file via {@code MyProperties.load()}.
 * Kerberos (SASL/GSSAPI) authentication is enabled only when the
 * {@code java.security.krb5.conf} property is present and non-empty.
 */
public class GeneralNewsInfoConsumerFlink {
    private static Properties properties = MyProperties.load();
    private static final String readTopic = properties.getProperty("read.topic");
    private static final String writeTopic = properties.getProperty("write.topic");
    private static final String bootstrapServers = properties.getProperty("bootstrap.servers");
    private static final String zookeeperServers = properties.getProperty("zookeeper.servers");
    private static final String groupId = properties.getProperty("group.id");
    private static final String krb5Conf = properties.getProperty("java.security.krb5.conf");
    private static final String loginConfig = properties.getProperty("java.security.auth.login.config");

    public static void main(String[] args) throws Exception {
        // 1. Configure Kerberos system properties when present.
        // BUG FIX: the original code swapped the two values — it assigned the krb5.conf
        // path to java.security.auth.login.config and the JAAS login-config path to
        // java.security.krb5.conf, which breaks SASL/GSSAPI authentication.
        if (krb5Conf != null && !krb5Conf.isEmpty()) {
            System.setProperty("java.security.krb5.conf", krb5Conf);
            System.setProperty("java.security.auth.login.config", loginConfig);
        }

        // 2. Build the stream execution environment with a fixed-delay restart strategy.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3,      // number of restart attempts
                30000L  // delay between attempts (ms)
        ));

        env.setParallelism(1);
        // Trigger a checkpoint every 60 s.
        env.enableCheckpointing(60000);
        // Require at least 500 ms of progress between consecutive checkpoints.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // At most one checkpoint in flight at a time.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // A checkpoint must complete within one minute or it is discarded.
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Exactly-once checkpointing semantics.
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // 3. Kafka client configuration (shared by consumer and producer; the
        // deserializer entries are ignored by the producer).
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("zookeeper.connect", zookeeperServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", SimpleAvroSchemaFlink.class.getName());
        props.put("auto.offset.reset", "latest");
        if (krb5Conf != null && !krb5Conf.isEmpty()) {
            // Kerberos security settings for SASL_PLAINTEXT/GSSAPI.
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "GSSAPI");
            props.put("sasl.kerberos.service.name", "kafka");
        }

        // 4. Build the Kafka consumer with the custom Avro deserialization schema.
        FlinkKafkaConsumer<GeneralNewsInfo> consumer =
                new FlinkKafkaConsumer<>(readTopic, new SimpleAvroSchemaFlink(), props);

        // 5. Start consuming from the latest offsets (ignores committed offsets).
        consumer.setStartFromLatest();

        // 6. Create the source stream from Kafka.
        DataStream<GeneralNewsInfo> newsData = env.addSource(consumer);

        // 7. Project the relevant fields into NewsBaseInfo and serialize to JSON.
        DataStream<String> baseNewsData = newsData.map(new MapFunction<GeneralNewsInfo, String>() {
            @Override
            public String map(GeneralNewsInfo generalNewsInfo) throws Exception {
                NewsBaseInfo newsBaseInfo = new NewsBaseInfo();
                newsBaseInfo.setRowkey(generalNewsInfo.getRowkey());
                newsBaseInfo.setTitle(generalNewsInfo.getTitle());
                newsBaseInfo.setDate(generalNewsInfo.getPublishTime().toString());
                newsBaseInfo.setId(generalNewsInfo.getNewsId());
                newsBaseInfo.setRepositoryId(generalNewsInfo.getNewsRepositoryId());
                newsBaseInfo.setContent(generalNewsInfo.getContent());
                return JSON.toJSONString(newsBaseInfo);
            }
        });

        // 8. Sink the JSON strings to the output Kafka topic.
        FlinkKafkaProducer<String> producer =
                new FlinkKafkaProducer<>(writeTopic, new SimpleStringSchema(), props);
        baseNewsData.addSink(producer).name("SinkKafka");

        // 9. Launch the job.
        env.execute("GeneralNewsInfoConsumerFlink");
    }
}
运行结果及报错内容

bed553f9 Importing Maven project(s) [Done]
f121eb6c Building [Done]
23073d02 Building [Done]
33c657bb Building [Done]

我想要达到的结果

如何解决这个一直Building 呢

  • 写回答

1条回答 默认 最新

  • Kwan的解忧杂货铺 Java领域优质创作者 2022-07-19 09:51
    关注

    把项目里面的debug断点全部删除试下

    评论

报告相同问题?

问题事件

  • 创建了问题 7月18日

悬赏问题

  • ¥15 sqlite 附加(attach database)加密数据库时,返回26是什么原因呢?
  • ¥88 找成都本地经验丰富懂小程序开发的技术大咖
  • ¥15 如何处理复杂数据表格的除法运算
  • ¥15 如何用stc8h1k08的片子做485数据透传的功能?(关键词-串口)
  • ¥15 有兄弟姐妹会用word插图功能制作类似citespace的图片吗?
  • ¥200 uniapp长期运行卡死问题解决
  • ¥15 latex怎么处理论文引理引用参考文献
  • ¥15 请教:如何用postman调用本地虚拟机区块链接上的合约?
  • ¥15 为什么使用javacv转封装rtsp为rtmp时出现如下问题:[h264 @ 000000004faf7500]no frame?
  • ¥15 乘性高斯噪声在深度学习网络中的应用