laijunlin_data 2022-11-11 14:34 · acceptance rate: 58.3%
371 views
Closed

Flink CDC reads from MySQL and writes to PostgreSQL; the job dies after running for a while

Symptoms and background

The job uses Flink CDC to read MySQL data and write it into a PostgreSQL database; after running for a while, it crashes.


The Flink checkpoint interval is 5 s, and the MySQL binlog retention is 1 day.
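For context, the retention can be verified on the MySQL server itself. A minimal JDBC sketch, assuming placeholder connection details (on MySQL 5.7 the variable is expire_logs_days; on 8.0 it is binlog_expire_logs_seconds):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CheckBinlogRetention {
    public static void main(String[] args) throws Exception {
        // Placeholder URL and credentials -- substitute the real server.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/", "root", "password");
             Statement st = conn.createStatement();
             // Matches expire_logs_days (5.7) and binlog_expire_logs_seconds (8.0)
             ResultSet rs = st.executeQuery("SHOW VARIABLES LIKE '%expire_logs%'")) {
            while (rs.next()) {
                System.out.println(rs.getString(1) + " = " + rs.getString(2));
            }
        }
    }
}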

package com.vanke.flink.bigdatadwd.writedwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.vanke.flink.bigdatadwd.bean.UserTrackTableDataDWD;
import com.vanke.flink.bigdatadwd.enitys.StaticEntity;
import com.vanke.flink.bigdatadwd.utils.GlobeDistance;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.text.SimpleDateFormat;



public class SynchronizeBizUserTrack_test {
    public static void main(String[] args) throws InterruptedException {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.enableCheckpointing(5000);
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(StaticEntity.HDFS_URL + "/hudi-warehouse/dwd/ck/flink/dwd_biz_user_track_test/checkpoints");
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        SingleOutputStreamOperator<UserTrackTableDataDWD> odsData = readTableData(env);
        odsData.print();
        SingleOutputStreamOperator<UserTrackTableDataDWD> etlData = etlOdsData(odsData);

        SingleOutputStreamOperator<UserTrackTableDataDWD> result = useDistanceRule(etlData);

        writeToPGDWD(result);

        try {
            env.execute("SynchronizeBizUserTrack_test11");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void writeToPGDWD(SingleOutputStreamOperator<UserTrackTableDataDWD> result) {

        String sql = "insert into dwd.dwd_biz_user_track(id,project_id,source,check_type,user_id,name,longitude,latitude,create_Time," +
                "track_time,tririga_user_code,label_id,dotted_flag,angle,speed,mark1,mark) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) " +
                "on conflict(id) do update set project_id=excluded.project_id,source=excluded.source,check_type=excluded.check_type,user_id=excluded.user_id," +
                "name=excluded.name,longitude=excluded.longitude,latitude=excluded.latitude,create_Time=excluded.create_Time,track_time=excluded.track_time," +
                "tririga_user_code=excluded.tririga_user_code,label_id=excluded.label_id,dotted_flag=excluded.dotted_flag,angle=excluded.angle,speed=excluded.speed," +
                "mark1=excluded.mark1,mark=excluded.mark;";
        result.addSink(JdbcSink.sink(
                sql,
                (statement,data) ->{
                    statement.setString(1,data.getId());
                    statement.setString(2,data.getProject_id());
                    statement.setString(3,data.getSource());
                    statement.setString(4,data.getCheck_type());
                    statement.setString(5,data.getUser_id());
                    statement.setString(6,data.getName());
                    statement.setDouble(7,data.getLongitude());
                    statement.setDouble(8,data.getLatitude());
                    statement.setTimestamp(9,data.getCreate_time());
                    statement.setTimestamp(10,data.getTrack_time());
                    statement.setString(11,data.getTririga_user_code());
                    statement.setString(12,data.getLabel_id());
                    statement.setString(13,data.getDotted_flag());
                    statement.setString(14,data.getAngle());
                    statement.setString(15,data.getSpeed());
                    statement.setString(16,data.getMark1());
                    statement.setString(17,data.getMark());
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(StaticEntity.PGSQL_URL)
                        .withDriverName("org.postgresql.Driver")
                        .withUsername(StaticEntity.PDBName)
                        .withPassword(StaticEntity.PDBPwssd)
                        .build()
        )).uid("track_sink").name("track_sink");
    }


    private static SingleOutputStreamOperator<UserTrackTableDataDWD> useDistanceRule(SingleOutputStreamOperator<UserTrackTableDataDWD> etlData) {
        return etlData.keyBy(pojo -> pojo.getUser_id())
                .process(new KeyedProcessFunction<String, UserTrackTableDataDWD, UserTrackTableDataDWD>() {

                    private ValueState<String> dateState;
                    private ValueState<Double> latiState;
                    private ValueState<Double> longiState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        longiState = getRuntimeContext().getState(new ValueStateDescriptor<Double>("longiState", Double.class));
                        latiState = getRuntimeContext().getState(new ValueStateDescriptor<Double>("latiState", Double.class));
                        dateState = getRuntimeContext().getState(new ValueStateDescriptor<String>("DateState", String.class));
                    }

                    @Override
                    public void processElement(UserTrackTableDataDWD data, Context ctx, Collector<UserTrackTableDataDWD> collector) throws Exception {
                        Double firstLongi = longiState.value();
                        Double firstLati = latiState.value();
                        String yesterday = dateState.value();


                        long createTime = data.getCreate_time().getTime();
                        long trackTime = data.getTrack_time().getTime();
                        data.setCreate_time(new Timestamp(createTime));
                        data.setTrack_time(new Timestamp(trackTime));
                        // track_time is used as the event time for the data
                        String today = new SimpleDateFormat("yyyy-MM-dd").format(trackTime);
                        // verified in testing: no timezone issue with these timestamps
//                        System.out.println(today + ":" + data.getId() + ":" + data.getTrack_time() + ":" + trackTime);

                        // If they differ, this is the first record of the day (or the very first record for this key)
                        if(!today.equals(yesterday)){
                            dateState.update(today);
                            longiState.update(data.getLongitude());
                            latiState.update(data.getLatitude());
                            data.setMark1("1");
                            collector.collect(data);
                        }else{
                            // today equals yesterday, so this is not the day's first record; compare with the previous one to decide whether to filter it
                            Double secondLongi = data.getLongitude();
                            Double secondLati = data.getLatitude();

                            Double latLngDistance = GlobeDistance.getLatLngDistance(firstLongi, firstLati, secondLongi, secondLati);


                            data.setMark(latLngDistance.toString());
                            if (latLngDistance <= 50){
                                data.setMark1("0");
                                collector.collect(data);
                            }else{
                                data.setMark1("2");
                                collector.collect(data);
                                longiState.update(data.getLongitude());
                                latiState.update(data.getLatitude());
                            }

                        }
                    }

                }).uid("track_process").name("track_process");

    }

    private static SingleOutputStreamOperator<UserTrackTableDataDWD> etlOdsData(SingleOutputStreamOperator<UserTrackTableDataDWD> odsData) {
        return odsData.filter(pojo -> {
            // Guard against null: delete events carry no "after" field, so the
            // upstream map can emit null.
            return pojo != null && pojo.getTrack_time() != null && pojo.getProject_id() != null
                    && pojo.getUser_id() != null && !"ZYT".equals(pojo.getCheck_type())
                    && pojo.getLongitude() != 0 && pojo.getLatitude() != 0;
        }).uid("track_filter").name("track_filter");
    }

    private static SingleOutputStreamOperator<UserTrackTableDataDWD> readTableData(StreamExecutionEnvironment env) {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname(StaticEntity.MYSQL_URL)
                .port(StaticEntity.MYSQL_PORT)
                .databaseList(StaticEntity.MYSQL_DATABASE)
                .tableList(StaticEntity.MYSQL_DATABASE + ".biz_user_track")
                .username(StaticEntity.MYSQL_USERNAME)
                .password(StaticEntity.MYSQL_PASSWORD)
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        return env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source").uid("track_source").name("track_source")
                .map(json -> {
                    JSONObject obj = JSON.parseObject(json);
                    return obj.getObject("after", UserTrackTableDataDWD.class);
                }).uid("track_map").name("track_map");
    }
}

Run result and error output
java.lang.RuntimeException: One or more fetchers have encountered exception
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:223) ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154) ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116) ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:305) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342]
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:148) ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:103) ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_342]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_342]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_342]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_342]
    ... 1 more
Caused by: java.lang.IllegalStateException: The connector is trying to read binlog starting at Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1668071717703,db=,server_id=0,file=mysql-bin.096881,pos=53976934,row=0}, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.
    at com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.loadStartingOffsetState(StatefulTaskContext.java:180) ~[?:?]
    at com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.configure(StatefulTaskContext.java:113) ~[?:?]
    at com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:93) ~[?:?]
    at com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:65) ~[?:?]
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.checkSplitOrStartNext(MySqlSplitReader.java:163) ~[?:?]
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:73) ~[?:?]
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56) ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:140) ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:103) ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_342]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_342]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_342]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_342]
    ... 1 more
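
The last cause is explicit: the binlog offset stored in the checkpoint (file mysql-bin.096881) has already been purged from the server, so the job can no longer resume from its state. With only one day of binlog retention, any outage or backlog longer than that window makes every restore fail. As the message itself suggests, one way out is to start over from a fresh snapshot; a sketch of the source rebuilt with flink-cdc's StartupOptions, assuming a full re-read of the table is acceptable (it only takes effect when the job is started without the old checkpoint, because an offset restored from state takes precedence):

import com.ververica.cdc.connectors.mysql.table.StartupOptions;

// Sketch: take a full snapshot first, then continue from the binlog,
// instead of resuming from the purged offset in the checkpoint.
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname(StaticEntity.MYSQL_URL)
        .port(StaticEntity.MYSQL_PORT)
        .databaseList(StaticEntity.MYSQL_DATABASE)
        .tableList(StaticEntity.MYSQL_DATABASE + ".biz_user_track")
        .username(StaticEntity.MYSQL_USERNAME)
        .password(StaticEntity.MYSQL_PASSWORD)
        .startupOptions(StartupOptions.initial()) // snapshot, then binlog
        .deserializer(new JsonDebeziumDeserializationSchema())
        .build();

Because the sink is an idempotent upsert (on conflict (id) do update), re-reading the table does not corrupt the PostgreSQL target.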

My approach and what I have tried

Nothing yet

The result I want

The Flink job should keep running
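
Toward that goal, an explicit restart strategy lets the job survive transient failures (with checkpointing enabled, Flink 1.13 already defaults to fixed-delay restarts); a sketch with illustrative numbers:

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import java.util.concurrent.TimeUnit;

// Retry up to 10 times with 30 s between attempts before failing the job.
// The numbers are assumptions to tune. Note that restarts only help while
// the required binlog position still exists on the MySQL server.
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        10, Time.of(30, TimeUnit.SECONDS)));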


1 answer

  • 三千烦恼丝xzh 2022-11-12 14:18

    I think you may have located the wrong log. From the logic, only the dedup operator holds state; if that state grows too large it can cause checkpoint timeouts and a crash-and-restart, and after the restart the log would look just like yours.
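
    If oversized keyed state is indeed the trigger, a state TTL would bound it: the process function keeps three ValueStates per user_id but only ever compares against state written the same day, so old entries can safely expire. A sketch of the descriptors in open(), assuming a two-day TTL:

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.time.Time;

    // Sketch: expire each user's state two days after the last write so the
    // keyed state (and checkpoint size) stays bounded. The 2-day window is
    // an assumption; the dedup logic only reads same-day state.
    StateTtlConfig ttl = StateTtlConfig.newBuilder(Time.days(2))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();

    ValueStateDescriptor<Double> longiDesc = new ValueStateDescriptor<>("longiState", Double.class);
    longiDesc.enableTimeToLive(ttl);
    longiState = getRuntimeContext().getState(longiDesc);
    // ...apply the same TTL to the latiState and dateState descriptors.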

    Accepted by the asker as the best answer.


Question events

  • Closed by the system on Nov 26
  • Answer accepted on Nov 18
  • Question created on Nov 11
