geniushht
2021-09-03 11:07
Acceptance rate: 0%
Views: 18
Closed

A Flink job fails to find the ClickHouse driver on its second run

Following one of Flink's examples, I read a stream from RabbitMQ and write it to ClickHouse through a RichSinkFunction.
The first run works fine, including repeated writes. But after I cancel the job and submit it again, connecting to ClickHouse fails with a "driver not found" error.
From then on every run fails; only after restarting the Flink taskManager does a newly submitted job work again.
Cancelling that job breaks it once more, and the taskManager has to be restarted again.

The job code is as follows:

DataStreamSink<MQModel> rmqSource = env.addSource(
        new RMQSource<MQModel>(rmqConn, "Mysql2Flink", new AbstractDeserializationSchema<MQModel>() {
            @Override
            public MQModel deserialize(byte[] bytes) throws IOException {
                Gson gson = new Gson();
                String str = new String(bytes);
                return gson.fromJson(str, MQModel.class);
            }
        })
).setParallelism(1).addSink(new TestSink()).name("ClickHouseSink");

The RichSinkFunction code is as follows:

public class TestSink extends RichSinkFunction<FraudDetectionJob.MQModel> {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    Connection connection = null;
    final String sqlTemplate = "insert into test_flink.test_1 values(%s,'!%s!')";

    public TestSink() {
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        String user = "default";
        String pwd = "12345678";
        connection = DriverManager.getConnection("jdbc:clickhouse://192.168.0.123:8123", user, pwd);
        logger.info("ClickHouse connection established");
    }

    @Override
    public void close() throws Exception {
        connection.close();
        logger.info("ClickHouse connection closed");
        super.close();
    }

    @Override
    public void invoke(FraudDetectionJob.MQModel msg, Context context) throws Exception {
        long startTime = System.currentTimeMillis();
        int count = 0;

        // write the record to ClickHouse
        String sql = String.format(sqlTemplate, msg.getId(), msg.getName());
        Statement statement = connection.createStatement();
        try {
            logger.info(sql);
            statement.execute(sql);
            logger.info("Record {} inserted.", msg.getId());
            connection.commit();
            count++;
        }
        catch (Exception ex) {
            logger.error("ClickHouse write failed:", ex);
            connection.rollback();
        }
        finally {
            statement.close();
        }
        long endTime = System.currentTimeMillis();
        logger.info("Insert finished in " + (endTime - startTime) + " ms -- records inserted: " + count);
    }
}
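If the root cause turns out to be that DriverManager no longer sees a driver usable from the new task's classloader (a known pitfall with JDBC drivers and Flink's per-job user-code classloaders), one workaround is to skip DriverManager's registry entirely and instantiate the driver in open() yourself. The sketch below is a minimal, hedged version: DirectDriverConnect and StubDriver are hypothetical names, and StubDriver merely stands in for ru.yandex.clickhouse.ClickHouseDriver so the example runs without the ClickHouse jar.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverPropertyInfo;
import java.sql.SQLException;
import java.util.Properties;
import java.util.logging.Logger;

public class DirectDriverConnect {

    // Load the driver class through the given classloader and connect directly,
    // bypassing DriverManager's registry and its caller-classloader check.
    public static Connection connect(String driverClass, ClassLoader cl,
                                     String url, Properties props) throws Exception {
        Driver driver = (Driver) Class.forName(driverClass, true, cl)
                .getDeclaredConstructor().newInstance();
        Connection conn = driver.connect(url, props); // null means "URL not mine"
        if (conn == null) {
            throw new SQLException(driverClass + " did not accept " + url);
        }
        return conn;
    }

    // Hypothetical stand-in for ru.yandex.clickhouse.ClickHouseDriver so the
    // sketch runs without the ClickHouse jar; it hands back an inert Connection proxy.
    public static class StubDriver implements Driver {
        @Override
        public Connection connect(String url, Properties info) {
            if (!acceptsURL(url)) return null;
            return (Connection) Proxy.newProxyInstance(
                    StubDriver.class.getClassLoader(),
                    new Class<?>[]{Connection.class},
                    (proxy, method, args) -> null);
        }
        @Override public boolean acceptsURL(String url) { return url.startsWith("jdbc:stub:"); }
        @Override public DriverPropertyInfo[] getPropertyInfo(String u, Properties p) { return new DriverPropertyInfo[0]; }
        @Override public int getMajorVersion() { return 1; }
        @Override public int getMinorVersion() { return 0; }
        @Override public boolean jdbcCompliant() { return false; }
        @Override public Logger getParentLogger() { return Logger.getGlobal(); }
    }

    public static void main(String[] args) throws Exception {
        Connection c = connect(StubDriver.class.getName(),
                DirectDriverConnect.class.getClassLoader(),
                "jdbc:stub:demo", new Properties());
        System.out.println(c != null); // prints "true"
    }
}
```

Under the same assumption, TestSink.open() would call something like `connection = DirectDriverConnect.connect("ru.yandex.clickhouse.ClickHouseDriver", Thread.currentThread().getContextClassLoader(), "jdbc:clickhouse://192.168.0.123:8123", props)` with user and password set in props.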

The key taskManager logs are below:
[clickhouse-jdbc is on the classpath at startup]
2021-08-31 16:08:13,462 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Classpath: /soft/flink-1.13.1/lib/clickhouse-jdbc-0.3.1.jar:/soft/flink-1.13.1/lib/flink-csv-1.13.1.jar:/soft/flink-1.13.1/lib/flink-jdbc_2.11-1.10.1.jar:/soft/flink-1.13.1/lib/flink-jdbc_2.12-1.10.3.jar:/soft/flink-1.13.1/lib/flink-json-1.13.1.jar:/soft/flink-1.13.1/lib/flink-shaded-zookeeper-3.4.14.jar:/soft/flink-1.13.1/lib/flink-table_2.11-1.13.1.jar:/soft/flink-1.13.1/lib/flink-table-blink_2.11-1.13.1.jar:/soft/flink-1.13.1/lib/log4j-1.2-api-2.12.1.jar:/soft/flink-1.13.1/lib/log4j-api-2.12.1.jar:/soft/flink-1.13.1/lib/log4j-core-2.12.1.jar:/soft/flink-1.13.1/lib/log4j-slf4j-impl-2.12.1.jar:/soft/flink-1.13.1/lib/flink-dist_2.11-1.13.1.jar:
[First run: works]
2021-08-31 16:08:50,055 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task Source: Custom Source -> Sink: ClickHouseSink (1/1)#0 (7d719a311c50b477d620d3cae17cc31f), deploy into slot with allocation id 0972334ef2e4fd3555143e856b2ded79.
2021-08-31 16:08:50,061 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: ClickHouseSink (1/1)#0 (7d719a311c50b477d620d3cae17cc31f) switched from CREATED to DEPLOYING.
2021-08-31 16:08:50,065 INFO org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task Source: Custom Source -> Sink: ClickHouseSink (1/1)#0 (7d719a311c50b477d620d3cae17cc31f) [DEPLOYING].
2021-08-31 16:08:50,066 INFO org.apache.flink.runtime.blob.BlobClient [] - Downloading a8d7727d7c2fd99efa9fbd2032793b42/p-7d60fbf4740e397406eb4485e3e0c1844c88b691-24f677820dcc4cb77fde38125d62255f from localhost/127.0.0.1:40443
2021-08-31 16:08:50,266 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@79662da9
2021-08-31 16:08:50,268 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Checkpoint storage is set to 'jobmanager'
2021-08-31 16:08:50,278 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: ClickHouseSink (1/1)#0 (7d719a311c50b477d620d3cae17cc31f) switched from DEPLOYING to INITIALIZING.
2021-08-31 16:08:50,382 INFO ru.yandex.clickhouse.ClickHouseDriver [] - Driver registered
2021-08-31 16:08:50,517 INFO spendreport.TestSink [] - ClickHouse connection established
2021-08-31 16:08:50,524 INFO org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase [] - No state to restore for the RMQSource.
2021-08-31 16:08:50,590 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: ClickHouseSink (1/1)#0 (7d719a311c50b477d620d3cae17cc31f) switched from INITIALIZING to RUNNING.
2021-08-31 16:15:13,827 INFO spendreport.TestSink [] - insert into test_hht_flink.test_1 values(-4,'!奇奇怪怪的flink!')
2021-08-31 16:15:13,834 INFO spendreport.TestSink [] - Record -4 inserted.
2021-08-31 16:15:13,835 INFO spendreport.TestSink [] - Insert finished in 9 ms -- records inserted: 1
[Logs from cancelling the job]
2021-09-03 10:18:25,097 INFO org.apache.flink.runtime.taskmanager.Task [] - Attempting to cancel task Source: Custom Source -> Sink: ClickHouseSink (1/1)#0 (7d719a311c50b477d620d3cae17cc31f).
2021-09-03 10:18:25,097 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: ClickHouseSink (1/1)#0 (7d719a311c50b477d620d3cae17cc31f) switched from RUNNING to CANCELING.
2021-09-03 10:18:25,097 INFO org.apache.flink.runtime.taskmanager.Task [] - Triggering cancellation of task code Source: Custom Source -> Sink: ClickHouseSink (1/1)#0 (7d719a311c50b477d620d3cae17cc31f).
2021-09-03 10:18:25,112 INFO spendreport.TestSink [] - ClickHouse connection closed
2021-09-03 10:18:25,120 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: ClickHouseSink (1/1)#0 (7d719a311c50b477d620d3cae17cc31f) switched from CANCELING to CANCELED.
2021-09-03 10:18:25,121 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: Custom Source -> Sink: ClickHouseSink (1/1)#0 (7d719a311c50b477d620d3cae17cc31f).
[Second submission: fails]
2021-09-03 10:19:07,514 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task Source: Custom Source -> Sink: ClickHouseSink (1/1)#0 (50aca424be851e58d127792a5dc1a1f4), deploy into slot with allocation id efe7da81d6989155e4736ea027874895.
2021-09-03 10:19:07,515 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: ClickHouseSink (1/1)#0 (50aca424be851e58d127792a5dc1a1f4) switched from CREATED to DEPLOYING.
2021-09-03 10:19:07,520 INFO org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task Source: Custom Source -> Sink: ClickHouseSink (1/1)#0 (50aca424be851e58d127792a5dc1a1f4) [DEPLOYING].
2021-09-03 10:19:07,520 INFO org.apache.flink.runtime.blob.BlobClient [] - Downloading ae3a86a6b894c4ffa46d9079ea358ed8/p-7d60fbf4740e397406eb4485e3e0c1844c88b691-8f8598d8948c00cb5845c062268317de from localhost/127.0.0.1:40443
2021-09-03 10:19:07,598 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@40480bc8
2021-09-03 10:19:07,598 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Checkpoint storage is set to 'jobmanager'
2021-09-03 10:19:07,598 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: ClickHouseSink (1/1)#0 (50aca424be851e58d127792a5dc1a1f4) switched from DEPLOYING to INITIALIZING.
2021-09-03 10:19:07,622 INFO ru.yandex.clickhouse.ClickHouseDriver [] - Driver registered
2021-09-03 10:19:07,623 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: ClickHouseSink (1/1)#0 (50aca424be851e58d127792a5dc1a1f4) switched from INITIALIZING to FAILED with failure cause: java.sql.SQLException: No suitable driver found for jdbc:clickhouse://192.168.0.123:8123
at java.sql.DriverManager.getConnection(DriverManager.java:689)
at java.sql.DriverManager.getConnection(DriverManager.java:247)
at spendreport.TestSink.open(TestSink.java:29)

No matter how many times I submit after that, the driver is still reported missing.
If I put the ClickHouse write code inside a KeyedProcessFunction instead, everything works no matter what I do.
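For reference, "No suitable driver found" is the message DriverManager raises whenever no registered driver both accepts the URL and is visible from the calling code's classloader. Since the jar is clearly on the classpath and "Driver registered" still appears on the second run, the symptom points at a registration/classloader mismatch rather than a missing jar. The exact message can be reproduced with the stdlib alone, with no ClickHouse involved:

```java
import java.sql.DriverManager;
import java.sql.SQLException;

public class NoDriverDemo {
    public static void main(String[] args) {
        try {
            // No driver for this URL is visible to this classloader,
            // so DriverManager gives up with the same error as the failed run.
            DriverManager.getConnection("jdbc:clickhouse://192.168.0.123:8123");
        } catch (SQLException e) {
            System.out.println(e.getMessage());
            // No suitable driver found for jdbc:clickhouse://192.168.0.123:8123
        }
    }
}
```

Typical remedies discussed for this pattern are keeping the driver jar only in flink/lib (not bundled into the job jar as well), or loading the driver class explicitly in open(); which one applies here depends on how the job jar is packaged, so treat both as hypotheses to test.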
