野生学鸠 2024-09-10 17:52 · Acceptance rate: 33.3%
13 views
Closed

How to use flink-sql-connector-rabbitmq

I'm deploying a data lake and want Flink to stream JSON data from RabbitMQ and write it into a MinIO bucket. I built a Maven project for the Flink job and packaged it as a JAR for submission, but the submission fails with an error saying, roughly, that the flink-sql-connector-rabbitmq connector cannot be found, even though the flink-sql-connector-rabbitmq dependency is already declared in my pom. What could be the cause, and what is the correct way to use flink-sql-connector-rabbitmq? The connector version I'm using doesn't match my Flink version, because the connector has no release built against a Flink version this new.

Below is my pom file, with the other dependencies removed; I kept only these two because the code builds without reporting errors even without the others. The official documentation, however, lists additional required dependencies, so that may also be part of the problem.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink_test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>10</maven.compiler.source>
        <maven.compiler.target>10</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.19.0</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-rabbitmq</artifactId>
            <version>1.15.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>10</source>
                    <target>10</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.example.FlinkRabbitMQToMinIO</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>alimaven2</id>
            <name>aliyun maven2</name>
            <url>https://repo1.maven.org/maven2/org/apache/flink/</url>
        </repository>
        <repository>
            <id>central</id>
            <url>https://maven.aliyun.com/repository/central</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>
</project>
```



Below is the functional part of the code; specific values have been replaced with placeholders.


```java
package org.example;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class FlinkRabbitMQToMinIO {

    public static void main(String[] args) throws Exception {
        runFlinkJob();
    }

    public static void runFlinkJob() throws Exception {
        // Create a TableEnvironment in streaming mode
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Create the RabbitMQ table as the source (receiving) table
        String createReceiveTableSQL = "CREATE TEMPORARY TABLE hudi_table ("
                + "unit STRING,"
                + "c_value DOUBLE,"
                + "c_time STRING,"
                + "c_damid INT,"
                + "point_name STRING"
                + ") WITH ("
                + "'connector' = 'rabbitmq', "
                + "'queue-name' = 'hudi_queue', "
                + "'host' = 'YOUR_RABBITMQ_HOST', " // 替换为 RabbitMQ 的主机名或 IP
                + "'port' = 'YOUR_RABBITMQ_PORT', " // 替换为 RabbitMQ 的端口
                + "'format' = 'json' "
                + ")";

        tableEnv.executeSql(createReceiveTableSQL);

        // Create the MinIO table as the sink (write) table
        String createWriteTableSQL = "CREATE TEMPORARY TABLE minio_table ("
                + "unit STRING,"
                + "c_value DOUBLE,"
                + "c_time STRING,"
                + "c_damid STRING,"
                + "point_name STRING"
                + ") WITH ("
                + "'connector' = 'iceberg', "
                + "'path' = 's3a://YOUR_BUCKET_PATH/', " // 替换为您 MinIO 的存储路径
                + "'format' = 'json', "
                + "'bucket' = 'YOUR_BUCKET_NAME', " // 替换为您的存储桶名称
                + "'access-key' = 'YOUR_ACCESS_KEY', " // 替换为您的 MinIO Access Key
                + "'secret-key' = 'YOUR_SECRET_KEY', " // 替换为您的 MinIO Secret Key
                + "'endpoint' = 'YOUR_MINIO_ENDPOINT', " // 替换为 MinIO 的端点
                + "'write-mode' = 'append'"  // 写入模式为追加
                + ")";

        tableEnv.executeSql(createWriteTableSQL);

        // Select from the source table and insert into the MinIO table.
        // executeSql submits the INSERT job on its own, so no env.execute() call is needed.
        String insertIntoMinioSQL = "INSERT INTO minio_table SELECT * FROM hudi_table";
        TableResult insertResult = tableEnv.executeSql(insertIntoMinioSQL);

        // Object.wait() would throw IllegalMonitorStateException here;
        // TableResult.await() is the method that blocks until the job terminates.
        insertResult.await();
    }
}



```

Below is the error reported on submission; key details have been replaced with placeholders.

```
Server Response Message:
org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:114)
    at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
    ... 2 more
Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
    ... 2 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:88)
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:108)
    ... 2 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to create a source for reading table 'default_catalog.default_database.hudi_table'.

Table options are:

'connector'='rabbitmq'
'format'='json'
'host'='YOUR_RABBITMQ_HOST' 
'port'='YOUR_RABBITMQ_PORT'
'queue-name'='hudi_queue'
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108)
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
    ... 4 more
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.hudi_table'.

Table options are:

'connector'='rabbitmq'
'format'='json'
'host'='YOUR_RABBITMQ_HOST' 
'port'='YOUR_RABBITMQ_PORT'
'queue-name'='hudi_queue'
   at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:228)
    at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:253)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:175)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:115)
    at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:4033)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2903)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2463)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2377)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2322)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:729)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:715)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3879)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:619)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:230)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:205)
    at org.apache.flink.table.planner.operations.SqlNodeConvertContext.toRelRoot(SqlNodeConvertContext.java:69)
    at org.apache.flink.table.planner.operations.converters.SqlQueryConverter.convertSqlNode(SqlQueryConverter.java:48)
    at org.apache.flink.table.planner.operations.converters.SqlNodeConverters.convertSqlNode(SqlNodeConverters.java:74)
    at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convertValidatedSqlNode(SqlNodeToOperationConversion.java:270)
    at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convertValidatedSqlNodeOrFail(SqlNodeToOperationConversion.java:386)
    at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convertSqlInsert(SqlNodeToOperationConversion.java:741)
    at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convertValidatedSqlNode(SqlNodeToOperationConversion.java:349)
    at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:260)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728)
    at org.example.FlinkRabbitMQToMinIO.runFlinkJob(FlinkRabbitMQToMinIO.java:61)
    at org.example.FlinkRabbitMQToMinIO.main(FlinkRabbitMQToMinIO.java:11)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    ... 7 more
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='rabbitmq'
    at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:807)
    at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:781)
    at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:224)
    ... 38 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'rabbitmq' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
print
    at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:617)
    at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:803)
    ... 40 more

```


18 answers

  • 专家-郭老师 (rising-star creator in the Java category) 2024-09-10 17:54

    🕒 Good afternoon, OP! Status: 🟢 [online]
    📢 This answer is based on Tongyi Qianwen and compiled by 郭老师!
    📣 Today's verse: "In life and in death, however far apart, to you I made my vow; I hold your hand, to grow old together with you."

    Based on the code and error message you provided, the problem may lie in the following areas:

    1. RabbitMQ connector version mismatch: you are using flink-sql-connector-rabbitmq 1.15.1, while your Flink version is 1.19.0. Make sure the connector version you use is compatible with your Flink version; check the official documentation or the Maven repository for the correct version (a pom sketch follows this list).

    2. Missing dependencies: although you have added the flink-sql-connector-rabbitmq dependency, other dependencies may still be required. Check the official documentation for the complete dependency list and make sure they are all present in your pom.xml.

    3. Configuration errors: verify that your RabbitMQ connection settings are correct, including the host name, port, and queue name, and that your MinIO connection settings are also correct.

    4. Runtime environment issues: make sure your Flink cluster is configured correctly and running, and check the network settings so that the application can reach both RabbitMQ and MinIO.
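
    A pom sketch along those lines is shown below. It is only a sketch: the connector version is a placeholder (per your own note, a release built against Flink 1.19 may not exist at all, so check Maven Central first), and the extra entries are the dependencies a Table API job of this shape typically needs, not an exhaustive list.

```xml
<properties>
    <flink.version>1.19.0</flink.version>
    <!-- Placeholder: pick a rabbitmq connector release that is built for your Flink version -->
    <rabbitmq.connector.version>REPLACE_WITH_A_MATCHING_RELEASE</rabbitmq.connector.version>
</properties>

<dependencies>
    <!-- Table API classes used by the job (TableEnvironment, EnvironmentSettings, ...);
         'provided' because the Flink distribution already ships them -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <!-- 'format' = 'json' is served by the flink-json format factory;
         only needed in the fat jar if it is not already in the cluster's lib/ directory -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- The connector itself must end up on the job's runtime classpath:
         either shaded into the fat jar or copied into Flink's lib/ on every node -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-rabbitmq</artifactId>
        <version>${rabbitmq.connector.version}</version>
    </dependency>
</dependencies>
```

    Keeping the Flink-provided artifacts at provided scope also keeps the fat jar small and avoids clashes with the classes the cluster already ships.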

    If you still run into problems, try the following steps:

    1. Update flink-sql-connector-rabbitmq to a version that is compatible with your Flink version.

    2. Check that all required dependencies are present in your pom.xml.

    3. Double-check your RabbitMQ and MinIO connection settings.

    4. Make sure your Flink cluster is running and that RabbitMQ and MinIO are reachable from the application.

    If the problem persists, consult the official Flink and RabbitMQ documentation, or ask the community for further help.
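
    The bottom of your stack trace is the key part: no factory with identifier 'rabbitmq' implementing DynamicTableFactory was found on the classpath, and only the built-in blackhole/datagen/filesystem/print factories are visible. A quick way to see what your job's classpath actually exposes is to run the same ServiceLoader lookup that Flink performs internally. The sketch below is only a diagnostic aid (the class name is made up for illustration; it needs flink-table-common on the classpath to compile):

```java
import java.util.ServiceLoader;

import org.apache.flink.table.factories.Factory;

// Illustrative helper; run it with the same classpath (fat jar + Flink lib/) as the job.
public class ListTableFactories {
    public static void main(String[] args) {
        // Flink discovers table connectors and formats via the standard Java ServiceLoader,
        // i.e. the entries in META-INF/services/org.apache.flink.table.factories.Factory.
        // If 'rabbitmq' is not printed here, the connector jar is missing from the classpath,
        // or its service file was dropped while building the fat jar (the
        // ServicesResourceTransformer you already configure in maven-shade-plugin is what
        // merges those files correctly).
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            System.out.println(factory.factoryIdentifier());
        }
    }
}
```

    If the printed list matches the "Available factory identifiers" in your error, the problem is packaging or deployment (the connector never reached the job's classpath), not the SQL itself.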



Question events

  • Closed on Sep 11
  • Question edited on Sep 10
  • Question created on Sep 10
