Problem symptoms and background
I am using Flink CDC to read data from MySQL and write it into PostgreSQL. The job runs for a while and then crashes.
The Flink checkpoint interval is 5 s, and the MySQL binlog retention is 1 day.
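For reference, the retention setting can be verified on the MySQL side. A minimal JDBC sketch, with a placeholder URL and credentials (MySQL 5.7 exposes expire_logs_days; 8.0 uses binlog_expire_logs_seconds instead):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class BinlogRetentionCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder JDBC URL and credentials; point these at the actual MySQL server.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://mysql-host:3306/", "user", "pwd");
             Statement stmt = conn.createStatement();
             // MySQL 5.7 exposes expire_logs_days; on 8.0, query binlog_expire_logs_seconds.
             ResultSet rs = stmt.executeQuery("SHOW VARIABLES LIKE 'expire_logs_days'")) {
            while (rs.next()) {
                System.out.println(rs.getString("Variable_name") + " = " + rs.getString("Value"));
            }
        }
    }
}

The full job code is below.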
package com.vanke.flink.bigdatadwd.writedwd;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.vanke.flink.bigdatadwd.bean.UserTrackTableDataDWD;
import com.vanke.flink.bigdatadwd.enitys.StaticEntity;
import com.vanke.flink.bigdatadwd.utils.GlobeDistance;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
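// Flink CDC job: stream biz_user_track change events from the MySQL binlog, filter out
// invalid records, apply a per-user distance rule to mark track points, and upsert the
// result into the PostgreSQL dwd layer.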
public class SynchronizeBizUserTrack_test {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Checkpoint every 5 s, exactly-once, stored on HDFS; retain checkpoints on cancellation.
        env.enableCheckpointing(5000);
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(StaticEntity.HDFS_URL + "/hudi-warehouse/dwd/ck/flink/dwd_biz_user_track_test/checkpoints");
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        SingleOutputStreamOperator<UserTrackTableDataDWD> odsData = readTableData(env);
        odsData.print();
        SingleOutputStreamOperator<UserTrackTableDataDWD> etlData = etlOdsData(odsData);
        SingleOutputStreamOperator<UserTrackTableDataDWD> result = useDistanceRule(etlData);
        writeToPGDWD(result);
        try {
            env.execute("SynchronizeBizUserTrack_test11");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private static void writeToPGDWD(SingleOutputStreamOperator<UserTrackTableDataDWD> result) {
        // Upsert into PostgreSQL: insert, or update all columns on id conflict.
        String sql = "insert into dwd.dwd_biz_user_track(id,project_id,source,check_type,user_id,name,longitude,latitude,create_time," +
                "track_time,tririga_user_code,label_id,dotted_flag,angle,speed,mark1,mark) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) " +
                "on conflict(id) do update set project_id=excluded.project_id,source=excluded.source,check_type=excluded.check_type,user_id=excluded.user_id," +
                "name=excluded.name,longitude=excluded.longitude,latitude=excluded.latitude,create_time=excluded.create_time,track_time=excluded.track_time," +
                "tririga_user_code=excluded.tririga_user_code,label_id=excluded.label_id,dotted_flag=excluded.dotted_flag,angle=excluded.angle,speed=excluded.speed," +
                "mark1=excluded.mark1,mark=excluded.mark;";
        result.addSink(JdbcSink.sink(
                sql,
                (statement, data) -> {
                    statement.setString(1, data.getId());
                    statement.setString(2, data.getProject_id());
                    statement.setString(3, data.getSource());
                    statement.setString(4, data.getCheck_type());
                    statement.setString(5, data.getUser_id());
                    statement.setString(6, data.getName());
                    statement.setDouble(7, data.getLongitude());
                    statement.setDouble(8, data.getLatitude());
                    statement.setTimestamp(9, data.getCreate_time());
                    statement.setTimestamp(10, data.getTrack_time());
                    statement.setString(11, data.getTririga_user_code());
                    statement.setString(12, data.getLabel_id());
                    statement.setString(13, data.getDotted_flag());
                    statement.setString(14, data.getAngle());
                    statement.setString(15, data.getSpeed());
                    statement.setString(16, data.getMark1());
                    statement.setString(17, data.getMark());
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(StaticEntity.PGSQL_URL)
                        .withDriverName("org.postgresql.Driver")
                        .withUsername(StaticEntity.PDBName)
                        .withPassword(StaticEntity.PDBPwssd)
                        .build()
        )).uid("track_sink").name("track_sink");
    }
    private static SingleOutputStreamOperator<UserTrackTableDataDWD> useDistanceRule(SingleOutputStreamOperator<UserTrackTableDataDWD> etlData) {
        return etlData.keyBy(pojo -> pojo.getUser_id())
                .process(new KeyedProcessFunction<String, UserTrackTableDataDWD, UserTrackTableDataDWD>() {
                    private ValueState<String> dateState;
                    private ValueState<Double> latiState;
                    private ValueState<Double> longiState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        longiState = getRuntimeContext().getState(new ValueStateDescriptor<Double>("longiState", Double.class));
                        latiState = getRuntimeContext().getState(new ValueStateDescriptor<Double>("latiState", Double.class));
                        dateState = getRuntimeContext().getState(new ValueStateDescriptor<String>("DateState", String.class));
                    }

                    @Override
                    public void processElement(UserTrackTableDataDWD data, Context ctx, Collector<UserTrackTableDataDWD> collector) throws Exception {
                        Double firstLongi = longiState.value();
                        Double firstLati = latiState.value();
                        String yesterday = dateState.value();
                        long createTime = data.getCreate_time().getTime();
                        long trackTime = data.getTrack_time().getTime();
                        data.setCreate_time(new Timestamp(createTime));
                        data.setTrack_time(new Timestamp(trackTime));
                        // track_time is used as the event time of the record
                        String today = new SimpleDateFormat("yyyy-MM-dd").format(trackTime);
                        // Verified during testing: the formatted time has no timezone issue
                        // System.out.println(today + ":" + data.getId() + ":" + data.getTrack_time() + ":" + trackTime);
                        // If the dates differ, this is the first record of the day (or the very first record)
                        if (!today.equals(yesterday)) {
                            dateState.update(today);
                            longiState.update(data.getLongitude());
                            latiState.update(data.getLatitude());
                            data.setMark1("1");
                            collector.collect(data);
                        } else {
                            // today equals yesterday, so this is not the first record: compare against
                            // the previous position to decide how to mark/filter it
                            Double secondLongi = data.getLongitude();
                            Double secondLati = data.getLatitude();
                            Double latLngDistance = GlobeDistance.getLatLngDistance(firstLongi, firstLati, secondLongi, secondLati);
                            data.setMark(latLngDistance.toString());
                            if (latLngDistance <= 50) {
                                data.setMark1("0");
                                collector.collect(data);
                            } else {
                                data.setMark1("2");
                                collector.collect(data);
                                longiState.update(data.getLongitude());
                                latiState.update(data.getLatitude());
                            }
                        }
                    }
                }).uid("track_process").name("track_process");
    }
    private static SingleOutputStreamOperator<UserTrackTableDataDWD> etlOdsData(SingleOutputStreamOperator<UserTrackTableDataDWD> odsData) {
        // Drop records with missing keys/time, "ZYT" check type, or zero coordinates.
        return odsData.filter(pojo -> pojo.getTrack_time() != null && pojo.getProject_id() != null
                        && pojo.getUser_id() != null && !"ZYT".equals(pojo.getCheck_type())
                        && pojo.getLongitude() != 0 && pojo.getLatitude() != 0)
                .uid("track_filter").name("track_filter");
    }
    private static SingleOutputStreamOperator<UserTrackTableDataDWD> readTableData(StreamExecutionEnvironment env) {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname(StaticEntity.MYSQL_URL)
                .port(StaticEntity.MYSQL_PORT)
                .databaseList(StaticEntity.MYSQL_DATABASE)
                .tableList(StaticEntity.MYSQL_DATABASE + ".biz_user_track")
                .username(StaticEntity.MYSQL_USERNAME)
                .password(StaticEntity.MYSQL_PASSWORD)
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
        return env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source").uid("track_source").name("track_source")
                .map(json -> {
                    // Parse the Debezium change event and keep only the "after" image
                    // (note: "after" is null for delete events).
                    JSONObject obj = JSON.parseObject(json);
                    return obj.getObject("after", UserTrackTableDataDWD.class);
                }).uid("track_map").name("track_map");
    }
}
Runtime result and error output
java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:223) ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154) ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116) ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:305) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342]
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:148) ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:103) ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_342]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_342]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_342]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_342]
... 1 more
Caused by: java.lang.IllegalStateException: The connector is trying to read binlog starting at Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1668071717703,db=,server_id=0,file=mysql-bin.096881,pos=53976934,row=0}, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.
at com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.loadStartingOffsetState(StatefulTaskContext.java:180) ~[?:?]
at com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.configure(StatefulTaskContext.java:113) ~[?:?]
at com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:93) ~[?:?]
at com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:65) ~[?:?]
at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.checkSplitOrStartNext(MySqlSplitReader.java:163) ~[?:?]
at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:73) ~[?:?]
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56) ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:140) ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:103) ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_342]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_342]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_342]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_342]
... 1 more
My approach and what I have tried
Nothing so far.
The result I want
The Flink job should keep running without crashing.
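Based on the error, my understanding is that the job tried to resume from a binlog offset that had already been purged (retention is only 1 day), and the message suggests reconfiguring the connector to take a snapshot when needed. A minimal sketch of what I am considering, assuming flink-connector-mysql-cdc 2.x (StartupOptions.initial() and the restart strategy are standard APIs of that stack; host names and credentials below are placeholders):

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartAndSnapshotSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        // Restart from the latest checkpoint instead of letting the whole job fail;
        // this only helps while the checkpointed binlog offset still exists on the server.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));

        // Placeholder host/credentials. StartupOptions.initial() (the default) takes a full
        // snapshot first and then switches to the binlog when the job starts without usable
        // state, which matches the "use a snapshot when needed" hint in the error message.
        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("mysql-host")
                .port(3306)
                .databaseList("mydb")
                .tableList("mydb.biz_user_track")
                .username("user")
                .password("pwd")
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source").print();
        env.execute("restart-sketch");
    }
}

Separately, raising the server-side retention (binlog_expire_logs_seconds on MySQL 8.0, expire_logs_days on 5.7) would widen the window in which a restart can still find its offset. Is this the right direction, or is there a better way to keep the job alive?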