Following the Flink examples, I put together a job that reads a stream from RabbitMQ and writes it into ClickHouse through a RichSinkFunction.
The first run works fine, and repeated writes all succeed. But after cancelling the job and re-submitting it, connecting to ClickHouse fails with a "driver not found" error.
From then on it stays broken; only after restarting the Flink TaskManager does a newly submitted job work again.
Then cancelling that job breaks things again, and the TaskManager has to be restarted once more.
The job code is as follows:
DataStreamSink<MQModel> rmqSource = env.addSource(
        new RMQSource<MQModel>(rmqConn, "Mysql2Flink", new AbstractDeserializationSchema<MQModel>() {
            @Override
            public MQModel deserialize(byte[] bytes) throws IOException {
                // Decode the message body from JSON into an MQModel
                Gson gson = new Gson();
                String str = new String(bytes);
                MQModel model = gson.fromJson(str, MQModel.class);
                return model;
            }
        })
).setParallelism(1).addSink(new TestSink()).name("ClickHouseSink");
The RichSinkFunction code is as follows:
public class TestSink extends RichSinkFunction<FraudDetectionJob.MQModel> {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private Connection connection = null;
    // NOTE: building SQL with String.format is injection-prone; a PreparedStatement
    // would be safer, but the template is kept as in the original.
    final String sqlTemplate = "insert into test_flink.test_1 values(%s,'!%s!')";

    public TestSink() {
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        String user = "default";
        String pwd = "12345678";
        connection = DriverManager.getConnection("jdbc:clickhouse://192.168.0.123:8123", user, pwd);
        logger.info("ClickHouse connection established");
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
        logger.info("ClickHouse connection closed");
        super.close();
    }

    @Override
    public void invoke(FraudDetectionJob.MQModel msg, Context context) throws Exception {
        long startTime = System.currentTimeMillis();
        int count = 0;
        // Write the record to ClickHouse
        String sql = String.format(sqlTemplate, msg.getId(), msg.getName());
        Statement statement = connection.createStatement();
        try {
            logger.info(sql);
            statement.execute(sql);
            logger.info("Record {} inserted.", msg.getId());
            connection.commit();
            count++;
        } catch (Exception ex) {
            logger.error("ClickHouse write failed:", ex);
            connection.rollback();
        } finally {
            statement.close();
        }
        long endTime = System.currentTimeMillis();
        logger.info("Batch insert finished in " + (endTime - startTime) + " ms -- records inserted: " + count);
    }
}
The key TaskManager logs are as follows:
[clickhouse-jdbc is loaded on startup]
2021-08-31 16:08:13,462 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Classpath: /soft/flink-1.13.1/lib/clickhouse-jdbc-0.3.1.jar:/soft/flink-1.13.1/lib/flink-csv-1.13.1.jar:/soft/flink-1.13.1/lib/flink-jdbc_2.11-1.10.1.jar:/soft/flink-1.13.1/lib/flink-jdbc_2.12-1.10.3.jar:/soft/flink-1.13.1/lib/flink-json-1.13.1.jar:/soft/flink-1.13.1/lib/flink-shaded-zookeeper-3.4.14.jar:/soft/flink-1.13.1/lib/flink-table_2.11-1.13.1.jar:/soft/flink-1.13.1/lib/flink-table-blink_2.11-1.13.1.jar:/soft/flink-1.13.1/lib/log4j-1.2-api-2.12.1.jar:/soft/flink-1.13.1/lib/log4j-api-2.12.1.jar:/soft/flink-1.13.1/lib/log4j-core-2.12.1.jar:/soft/flink-1.13.1/lib/log4j-slf4j-impl-2.12.1.jar:/soft/flink-1.13.1/lib/flink-dist_2.11-1.13.1.jar:
[First run: normal]
2021-08-31 16:08:50,055 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task Source: Custom Source -> Sink: ClickHouseSink (1/1)#0 (7d719a311c50b477d620d3cae17cc31f), deploy into slot with allocation id 0972334ef2e4fd3555143e856b2ded79.
2021-08-31 16:08:50,061 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: ClickHouseSink (1/1)#0 (7d719a311c50b477d620d3cae17cc31f) switched from CREATED to DEPLOYING.
2021-08-31 16:08:50,065 INFO org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task Source: Custom Source -> Sink: ClickHouseSink (1/1)#0 (7d719a311c50b477d620d3cae17cc31f) [DEPLOYING].
2021-08-31 16:08:50,066 INFO org.apache.flink.runtime.blob.BlobClient [] - Downloading a8d7727d7c2fd99efa9fbd2032793b42/p-7d60fbf4740e397406eb4485e3e0c1844c88b691-24f677820dcc4cb77fde38125d62255f from localhost/127.0.0.1:40443
2021-08-31 16:08:50,266 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@79662da9
2021-08-31 16:08:50,268 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Checkpoint storage is set to 'jobmanager'
2021-08-31 16:08:50,278 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: ClickHouseSink (1/1)#0 (7d719a311c50b477d620d3cae17cc31f) switched from DEPLOYING to INITIALIZING.
2021-08-31 16:08:50,382 INFO ru.yandex.clickhouse.ClickHouseDriver [] - Driver registered
2021-08-31 16:08:50,517 INFO spendreport.TestSink [] - ClickHouse connection established
2021-08-31 16:08:50,524 INFO org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase [] - No state to restore for the RMQSource.
2021-08-31 16:08:50,590 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: ClickHouseSink (1/1)#0 (7d719a311c50b477d620d3cae17cc31f) switched from INITIALIZING to RUNNING.
2021-08-31 16:15:13,827 INFO spendreport.TestSink [] - insert into test_hht_flink.test_1 values(-4,'!奇奇怪怪的flink!')
2021-08-31 16:15:13,834 INFO spendreport.TestSink [] - Record -4 inserted.
2021-08-31 16:15:13,835 INFO spendreport.TestSink [] - Batch insert finished in 9 ms -- records inserted: 1
[Logs from cancelling the job]
2021-09-03 10:18:25,097 INFO org.apache.flink.runtime.taskmanager.Task [] - Attempting to cancel task Source: Custom Source -> Sink: ClickHouseSink (1/1)#0 (7d719a311c50b477d620d3cae17cc31f).
2021-09-03 10:18:25,097 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: ClickHouseSink (1/1)#0 (7d719a311c50b477d620d3cae17cc31f) switched from RUNNING to CANCELING.
2021-09-03 10:18:25,097 INFO org.apache.flink.runtime.taskmanager.Task [] - Triggering cancellation of task code Source: Custom Source -> Sink: ClickHouseSink (1/1)#0 (7d719a311c50b477d620d3cae17cc31f).
2021-09-03 10:18:25,112 INFO spendreport.TestSink [] - ClickHouse connection closed
2021-09-03 10:18:25,120 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: ClickHouseSink (1/1)#0 (7d719a311c50b477d620d3cae17cc31f) switched from CANCELING to CANCELED.
2021-09-03 10:18:25,121 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: Custom Source -> Sink: ClickHouseSink (1/1)#0 (7d719a311c50b477d620d3cae17cc31f).
[Second submission: fails]
2021-09-03 10:19:07,514 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task Source: Custom Source -> Sink: ClickHouseSink (1/1)#0 (50aca424be851e58d127792a5dc1a1f4), deploy into slot with allocation id efe7da81d6989155e4736ea027874895.
2021-09-03 10:19:07,515 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: ClickHouseSink (1/1)#0 (50aca424be851e58d127792a5dc1a1f4) switched from CREATED to DEPLOYING.
2021-09-03 10:19:07,520 INFO org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task Source: Custom Source -> Sink: ClickHouseSink (1/1)#0 (50aca424be851e58d127792a5dc1a1f4) [DEPLOYING].
2021-09-03 10:19:07,520 INFO org.apache.flink.runtime.blob.BlobClient [] - Downloading ae3a86a6b894c4ffa46d9079ea358ed8/p-7d60fbf4740e397406eb4485e3e0c1844c88b691-8f8598d8948c00cb5845c062268317de from localhost/127.0.0.1:40443
2021-09-03 10:19:07,598 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@40480bc8
2021-09-03 10:19:07,598 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Checkpoint storage is set to 'jobmanager'
2021-09-03 10:19:07,598 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: ClickHouseSink (1/1)#0 (50aca424be851e58d127792a5dc1a1f4) switched from DEPLOYING to INITIALIZING.
2021-09-03 10:19:07,622 INFO ru.yandex.clickhouse.ClickHouseDriver [] - Driver registered
2021-09-03 10:19:07,623 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: ClickHouseSink (1/1)#0 (50aca424be851e58d127792a5dc1a1f4) switched from INITIALIZING to FAILED with failure cause: java.sql.SQLException: No suitable driver found for jdbc:clickhouse://192.168.0.123:8123
at java.sql.DriverManager.getConnection(DriverManager.java:689)
at java.sql.DriverManager.getConnection(DriverManager.java:247)
at spendreport.TestSink.open(TestSink.java:29)
No matter how many times I submit after that, it is always the same missing-driver error.
If the ClickHouse write code is placed inside a KeyedProcessFunction instead, everything works fine no matter how many times the job is cancelled and resubmitted.
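For context on where the error message comes from (this is general JDBC behavior, not something confirmed from the logs above): `java.sql.DriverManager` keeps a static registry of drivers and only hands out a driver whose class is still visible to the caller; when a driver effectively drops out of that registry (for example because the classloader that registered it, such as Flink's per-job user-code classloader, has been discarded), `getConnection` throws exactly the "No suitable driver found for …" error seen in the second run. The stdlib-only sketch below reproduces the mechanism with a made-up `FakeDriver` (both class names are hypothetical, invented for illustration):

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.DriverPropertyInfo;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.Properties;
import java.util.logging.Logger;

public class DriverRegistryDemo {

    // Hypothetical stand-in for a real JDBC driver; it exists only to show
    // how DriverManager's registry behaves.
    static class FakeDriver implements Driver {
        @Override
        public Connection connect(String url, Properties info) {
            if (!acceptsURL(url)) {
                return null; // JDBC contract: return null for URLs we don't handle
            }
            // Return a do-nothing Connection via a dynamic proxy so the demo
            // stays stdlib-only.
            return (Connection) Proxy.newProxyInstance(
                    FakeDriver.class.getClassLoader(),
                    new Class<?>[] {Connection.class},
                    (proxy, method, args) -> {
                        Class<?> rt = method.getReturnType();
                        if (rt == boolean.class) return false;
                        if (rt == int.class) return 0;
                        return null;
                    });
        }

        @Override
        public boolean acceptsURL(String url) {
            return url != null && url.startsWith("jdbc:fake:");
        }

        @Override
        public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) {
            return new DriverPropertyInfo[0];
        }

        @Override public int getMajorVersion() { return 1; }
        @Override public int getMinorVersion() { return 0; }
        @Override public boolean jdbcCompliant() { return false; }

        @Override
        public Logger getParentLogger() throws SQLFeatureNotSupportedException {
            throw new SQLFeatureNotSupportedException();
        }
    }

    public static void main(String[] args) throws Exception {
        Driver driver = new FakeDriver();

        // While the driver is in DriverManager's registry, getConnection works.
        DriverManager.registerDriver(driver);
        Connection c = DriverManager.getConnection("jdbc:fake:demo");
        System.out.println("registered: connection obtained = " + (c != null));

        // Once the driver is gone from the registry (loosely analogous to the
        // user-code classloader being thrown away), the same URL fails.
        DriverManager.deregisterDriver(driver);
        try {
            DriverManager.getConnection("jdbc:fake:demo");
        } catch (SQLException e) {
            System.out.println("deregistered: " + e.getMessage());
        }
    }
}
```

If this is indeed the mechanism at play, it would also be consistent with the workaround of restarting the TaskManager (a fresh JVM re-registers the driver) and with the error appearing only on the second submission.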