For a data lake deployment, I want Flink to stream JSON data from RabbitMQ and write it into a MinIO bucket for storage. I built the Flink job as a Maven project, packaged it into a jar, and submitted it; the error roughly says the flink-sql-connector-rabbitmq connector cannot be found, even though I have already declared the flink-sql-connector-rabbitmq dependency in my pom file. What could be the cause, and what is the correct way to use flink-sql-connector-rabbitmq? The connector version I use (1.15.1) does not match my Flink version (1.19.0), because the connector's releases do not go up to my Flink version.

Below is my pom file. I stripped out the other dependency declarations and kept only these two, since the code still builds without them and the error stays the same; however, the official documentation lists additional required dependencies, so that may also be part of the problem.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>flink_test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>10</maven.compiler.source>
        <maven.compiler.target>10</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.19.0</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-rabbitmq</artifactId>
            <version>1.15.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>10</source>
                    <target>10</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.example.FlinkRabbitMQToMinIO</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>alimaven2</id>
            <name>aliyun maven2</name>
            <url>https://repo1.maven.org/maven2/org/apache/flink/</url>
        </repository>
        <repository>
            <id>central</id>
            <url>https://maven.aliyun.com/repository/central</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>
</project>
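As I understand it, Flink SQL discovers connectors through Java's ServiceLoader mechanism: the submitted jar must contain a `META-INF/services/org.apache.flink.table.factories.Factory` file that lists the connector's factory class (this is the file the `ServicesResourceTransformer` in my shade configuration is supposed to merge). One sanity check I can run is to look for that entry inside the shaded jar; below is a small sketch of that check. The helper name, `demo.jar`, and `DemoDynamicTableFactory` are made up for illustration; only the SPI path is the real one Flink uses.

```python
import zipfile

# The SPI file Flink's factory discovery reads from the classpath.
SPI_PATH = "META-INF/services/org.apache.flink.table.factories.Factory"

def list_table_factories(jar_path):
    """Return the factory classes a jar registers for Flink SQL connector discovery."""
    with zipfile.ZipFile(jar_path) as jar:
        if SPI_PATH not in jar.namelist():
            return []
        return jar.read(SPI_PATH).decode().split()

# Demo on a synthetic jar (a jar is just a zip archive); in practice the argument
# would be the shaded jar, e.g. target/flink_test-1.0-SNAPSHOT.jar.
with zipfile.ZipFile("demo.jar", "w") as jar:
    jar.writestr(SPI_PATH, "org.example.DemoDynamicTableFactory\n")
print(list_table_factories("demo.jar"))  # → ['org.example.DemoDynamicTableFactory']
```

If the shaded jar has no `rabbitmq` factory class in this file, the "Could not find any factory for identifier 'rabbitmq'" error would follow regardless of the pom dependency.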
Below is the functional part of the code; environment-specific values have been replaced with placeholders.
```java
package org.example;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class FlinkRabbitMQToMinIO {
    public static void main(String[] args) throws Exception {
        runFlinkJob();
    }

    public static void runFlinkJob() throws Exception {
        // Create the table environment in streaming mode.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Create the RabbitMQ table as the source.
        String createReceiveTableSQL = "CREATE TEMPORARY TABLE hudi_table ("
                + "unit STRING,"
                + "c_value DOUBLE,"
                + "c_time STRING,"
                + "c_damid INT,"
                + "point_name STRING"
                + ") WITH ("
                + "'connector' = 'rabbitmq', "
                + "'queue-name' = 'hudi_queue', "
                + "'host' = 'YOUR_RABBITMQ_HOST', " // replace with the RabbitMQ host name or IP
                + "'port' = 'YOUR_RABBITMQ_PORT', " // replace with the RabbitMQ port
                + "'format' = 'json' "
                + ")";
        tableEnv.executeSql(createReceiveTableSQL);

        // Create the MinIO table as the sink.
        String createWriteTableSQL = "CREATE TEMPORARY TABLE minio_table ("
                + "unit STRING,"
                + "c_value DOUBLE,"
                + "c_time STRING,"
                + "c_damid STRING,"
                + "point_name STRING"
                + ") WITH ("
                + "'connector' = 'iceberg', "
                + "'path' = 's3a://YOUR_BUCKET_PATH/', " // replace with the MinIO storage path
                + "'format' = 'json', "
                + "'bucket' = 'YOUR_BUCKET_NAME', "      // replace with the bucket name
                + "'access-key' = 'YOUR_ACCESS_KEY', "   // replace with the MinIO access key
                + "'secret-key' = 'YOUR_SECRET_KEY', "   // replace with the MinIO secret key
                + "'endpoint' = 'YOUR_MINIO_ENDPOINT', " // replace with the MinIO endpoint
                + "'write-mode' = 'append'"              // append write mode
                + ")";
        tableEnv.executeSql(createWriteTableSQL);

        // Select from the source table and insert into the MinIO table.
        // executeSql() already submits the INSERT job; await() blocks until it finishes.
        String insertIntoMinioSQL = "INSERT INTO minio_table SELECT * FROM hudi_table";
        TableResult insertResult = tableEnv.executeSql(insertIntoMinioSQL);
        insertResult.await();
    }
}
```

Below is the error returned on submission; key identifiers have been replaced with placeholders.

```
Server Response Message:
org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:114)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
... 2 more
Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
... 2 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:88)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:108)
... 2 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to create a source for reading table 'default_catalog.default_database.hudi_table'.
Table options are:
'connector'='rabbitmq'
'format'='json'
'host'='YOUR_RABBITMQ_HOST'
'port'='YOUR_RABBITMQ_PORT'
'queue-name'='hudi_queue'
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
... 4 more
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.hudi_table'.
Table options are:
'connector'='rabbitmq'
'format'='json'
'host'='YOUR_RABBITMQ_HOST'
'port'='YOUR_RABBITMQ_PORT'
'queue-name'='hudi_queue'
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:228)
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:253)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:175)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:115)
at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:4033)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2903)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2463)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2377)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2322)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:729)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:715)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3879)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:619)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:230)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:205)
at org.apache.flink.table.planner.operations.SqlNodeConvertContext.toRelRoot(SqlNodeConvertContext.java:69)
at org.apache.flink.table.planner.operations.converters.SqlQueryConverter.convertSqlNode(SqlQueryConverter.java:48)
at org.apache.flink.table.planner.operations.converters.SqlNodeConverters.convertSqlNode(SqlNodeConverters.java:74)
at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convertValidatedSqlNode(SqlNodeToOperationConversion.java:270)
at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convertValidatedSqlNodeOrFail(SqlNodeToOperationConversion.java:386)
at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convertSqlInsert(SqlNodeToOperationConversion.java:741)
at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convertValidatedSqlNode(SqlNodeToOperationConversion.java:349)
at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:260)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728)
at org.example.FlinkRabbitMQToMinIO.runFlinkJob(FlinkRabbitMQToMinIO.java:61)
at org.example.FlinkRabbitMQToMinIO.main(FlinkRabbitMQToMinIO.java:11)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
... 7 more
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='rabbitmq'
at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:807)
at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:781)
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:224)
... 38 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'rabbitmq' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Available factory identifiers are:
blackhole
datagen
filesystem
print
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:617)
at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:803)
... 40 more
```
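For completeness, the JSON messages published to hudi_queue have the shape sketched below. The field names follow the hudi_table DDL above; the concrete values are made up:

```python
import json

# Example payload matching the hudi_table schema
# (unit STRING, c_value DOUBLE, c_time STRING, c_damid INT, point_name STRING).
# All values are illustrative only.
message = {
    "unit": "mm",
    "c_value": 3.14,
    "c_time": "2024-05-01 12:00:00",
    "c_damid": 7,
    "point_name": "P001",
}
print(json.dumps(message))
```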