是这样的,现在我要从来自Kafka的20多个topic中消费出数据,每个topic对应Clickhouse里面的一个表。
但是Clickhouse的JDBC批量插入只支持预编译SQL,即每个 PrepareStatement对象只能批量插入一个表的数据。如下:
Connection connection = getConnection();
PrepareStatement ps = connection.prepareStatement("insert into xxx values (?, ?, ?, ?)");
ps.setObject(1, xxx);
ps.setObject(2, xxx);
ps.setObject(3, xxx);
ps.addBatch();
ps.executeBatch();
ps.clearBatch();
// ......
所以,我在入库程序把每个表的入库分为不同的线程,分别维护不同的PrepareStatement对象,
入库不同的表。比如现在有20个表,我设定每个表3个线程,那么总共就有60个入库线程。
但是这样子做的话,我无法保证入库的速率稳定,因为有的表数据量大,有的因为业务开启有时较大,而分配的入库线程是固定的。各位盆友有什么解决办法吗?
ps:入库程序用的flume,用的官方的KafkaSource,然后写了一个Clickhouse的Sink,每个sink就是对应一个入库clickhouse的线程。Channel用的文件内存通道。当Kafka数据量大时,入库速率远远小于消费速率,可能导致Channel通道满,堆积大量磁盘文件,读写磁盘操作又进一步影响sink取数据,然后越来越慢。。最后Kafka都报一堆问题。
再ps:Clickhouse是6个节点的集群,三个分片,一个副本的配置。我批量插入设置150000条一次批量插入。