野生学鸠 2024-09-10 17:52 Acceptance rate: 33.3%

Using flink-sql-connector-rabbitmq

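For a data lake deployment, I need Flink to stream JSON data from RabbitMQ and write it into a MinIO bucket. I built the Flink job as a Maven project and packaged it as a jar for submission. The error roughly says that the flink-sql-connector-rabbitmq connector cannot be found, even though I have declared the flink-sql-connector-rabbitmq dependency in my pom file. What could be the cause? How exactly is flink-sql-connector-rabbitmq meant to be used? The connector version does not match my Flink version because no connector release targets that Flink version, so I used the highest version available.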

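Below is my pom file. I removed the other dependencies and kept only these two, because the code compiles without errors even without the others. However, the usage shown on the official site requires additional dependencies, which may also be part of the problem.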

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink_test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>10</maven.compiler.source>
        <maven.compiler.target>10</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.19.0</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-rabbitmq</artifactId>
            <version>1.15.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>10</source>
                    <target>10</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.example.FlinkRabbitMQToMinIO</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>alimaven2</id>
            <name>aliyun maven2</name>
            <url>https://repo1.maven.org/maven2/org/apache/flink/</url>
        </repository>
        <repository>
            <id>central</id>
            <url>https://maven.aliyun.com/repository/central</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>
</project>
```



Below is the functional part of the code; specific details have been replaced with placeholders.


```java
package org.example;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class FlinkRabbitMQToMinIO {

    public static void main(String[] args) throws Exception {
        runFlinkJob();
    }

    public static void runFlinkJob() throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode() // use streaming mode
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Create the RabbitMQ table as the source table
        String createReceiveTableSQL = "CREATE TEMPORARY TABLE hudi_table ("
                + "unit STRING,"
                + "c_value DOUBLE,"
                + "c_time STRING,"
                + "c_damid INT,"
                + "point_name STRING"
                + ") WITH ("
                + "'connector' = 'rabbitmq', "
                + "'queue-name' = 'hudi_queue', "
                + "'host' = 'YOUR_RABBITMQ_HOST', " // replace with the RabbitMQ host name or IP
                + "'port' = 'YOUR_RABBITMQ_PORT', " // replace with the RabbitMQ port
                + "'format' = 'json' "
                + ")";

        tableEnv.executeSql(createReceiveTableSQL);

        // Create the MinIO table as the sink table
        String createWriteTableSQL = "CREATE TEMPORARY TABLE minio_table ("
                + "unit STRING,"
                + "c_value DOUBLE,"
                + "c_time STRING,"
                + "c_damid STRING,"
                + "point_name STRING"
                + ") WITH ("
                + "'connector' = 'iceberg', "
                + "'path' = 's3a://YOUR_BUCKET_PATH/', " // replace with your MinIO storage path
                + "'format' = 'json', "
                + "'bucket' = 'YOUR_BUCKET_NAME', " // replace with your bucket name
                + "'access-key' = 'YOUR_ACCESS_KEY', " // replace with your MinIO access key
                + "'secret-key' = 'YOUR_SECRET_KEY', " // replace with your MinIO secret key
                + "'endpoint' = 'YOUR_MINIO_ENDPOINT', " // replace with the MinIO endpoint
                + "'write-mode' = 'append'"  // append write mode
                + ")";

        tableEnv.executeSql(createWriteTableSQL);

        // Select from the source table and insert into the MinIO table
        String insertIntoMinioSQL = "INSERT INTO minio_table SELECT * FROM hudi_table";
        TableResult insertResult = tableEnv.executeSql(insertIntoMinioSQL);
        insertResult.wait();

        // Start the Flink job
        env.execute("Flink RabbitMQ to MinIO Example");
    }
}
```

Below is the error reported on submission; sensitive details have been replaced with placeholders.

```text
Server Response Message:
org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:114)
    at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
    ... 2 more
Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
    ... 2 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:88)
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:108)
    ... 2 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to create a source for reading table 'default_catalog.default_database.hudi_table'.

Table options are:

'connector'='rabbitmq'
'format'='json'
'host'='YOUR_RABBITMQ_HOST' 
'port'='YOUR_RABBITMQ_PORT'
'queue-name'='hudi_queue'
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108)
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
    ... 4 more
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.hudi_table'.

Table options are:

'connector'='rabbitmq'
'format'='json'
'host'='YOUR_RABBITMQ_HOST' 
'port'='YOUR_RABBITMQ_PORT'
'queue-name'='hudi_queue'
   at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:228)
    at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:253)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:175)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:115)
    at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:4033)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2903)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2463)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2377)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2322)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:729)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:715)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3879)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:619)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:230)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:205)
    at org.apache.flink.table.planner.operations.SqlNodeConvertContext.toRelRoot(SqlNodeConvertContext.java:69)
    at org.apache.flink.table.planner.operations.converters.SqlQueryConverter.convertSqlNode(SqlQueryConverter.java:48)
    at org.apache.flink.table.planner.operations.converters.SqlNodeConverters.convertSqlNode(SqlNodeConverters.java:74)
    at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convertValidatedSqlNode(SqlNodeToOperationConversion.java:270)
    at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convertValidatedSqlNodeOrFail(SqlNodeToOperationConversion.java:386)
    at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convertSqlInsert(SqlNodeToOperationConversion.java:741)
    at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convertValidatedSqlNode(SqlNodeToOperationConversion.java:349)
    at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:260)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728)
    at org.example.FlinkRabbitMQToMinIO.runFlinkJob(FlinkRabbitMQToMinIO.java:61)
    at org.example.FlinkRabbitMQToMinIO.main(FlinkRabbitMQToMinIO.java:11)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    ... 7 more
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='rabbitmq'
    at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:807)
    at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:781)
    at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:224)
    ... 38 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'rabbitmq' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
print
    at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:617)
    at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:803)
    ... 40 more

```

18 answers

  • 专家-郭老师 (Rising Star Creator, Java) 2024-09-10 17:54

    Based on output from Tongyi Qianwen, compiled by 郭老师.

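    Based on the code and error message you provided, the likely causes are:

    1. RabbitMQ connector version mismatch: your pom declares flink-sql-connector-rabbitmq 1.15.1, while your Flink version is 1.19.0. Make sure the connector version is compatible with your Flink version. The official documentation or the Maven repository lists the available versions.

    2. Missing dependencies: although you have added the flink-sql-connector-rabbitmq dependency, other dependencies may be required. The stack trace lists only blackhole, datagen, filesystem, and print as available factory identifiers, so no factory for 'rabbitmq' was found on the classpath when the job was submitted. Check the official documentation for the full dependency list and make sure every entry is in your pom.xml.

    3. Configuration errors: check that your RabbitMQ connection settings are correct, including the host name, port, and queue name, and that your MinIO connection settings are correct as well. Note that the error shown is raised during connector discovery, before any connection is attempted, so these settings matter only once the connector can be found.

    4. Runtime environment: make sure your Flink cluster is configured correctly and running normally. Also check your network settings so that the application can reach RabbitMQ and MinIO.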
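The version comparison in point 1 can be made concrete with a minimal sketch in Java. It assumes that matching the Flink major.minor line is a reasonable first check, and that connector versions come either in the Flink-aligned style (such as 1.15.1) or in the `<connector version>-<Flink minor>` style used by externalized connectors. The class name, the method names, and the value 3.0.1-1.17 are illustrative assumptions, not recommendations.

```java
final class ConnectorVersionCheck {

    /** Extracts the "major.minor" line from a version such as "1.19.0". */
    static String minorLine(String version) {
        String[] parts = version.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Unexpected version: " + version);
        }
        return parts[0] + "." + parts[1];
    }

    /**
     * Returns the Flink line a connector version targets.
     * "1.15.1" is treated as Flink-aligned; "3.0.1-1.17" is assumed to mean
     * "connector 3.0.1 built for Flink 1.17".
     */
    static String targetFlinkLine(String connectorVersion) {
        int dash = connectorVersion.indexOf('-');
        String flinkPart = dash >= 0
                ? connectorVersion.substring(dash + 1)
                : connectorVersion;
        return minorLine(flinkPart);
    }

    /** True when the connector targets the same Flink major.minor line. */
    static boolean matches(String flinkVersion, String connectorVersion) {
        return minorLine(flinkVersion).equals(targetFlinkLine(connectorVersion));
    }

    public static void main(String[] args) {
        // Values taken from the pom in the question.
        System.out.println(matches("1.19.0", "1.15.1")); // prints "false"
    }
}
```

A matching line does not by itself guarantee compatibility, and a mismatch does not always break a job, so treat the result as a hint about where to look rather than a verdict.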

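    If the problem persists, try the following steps:

    1. Update flink-sql-connector-rabbitmq to a version that is compatible with your Flink version.

    2. Check your pom.xml and add every required dependency.

    3. Double-check your RabbitMQ and MinIO connection settings.

    4. Confirm that your Flink cluster is running and that the application can reach RabbitMQ and MinIO.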
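One way to carry out step 2 is to inspect the packaged jar directly. Flink discovers table connectors through `java.util.ServiceLoader`, which reads the entry `META-INF/services/org.apache.flink.table.factories.Factory` from each jar on the classpath. The sketch below uses only the JDK and lists the factory classes a jar registers. The class name `FactoryServiceFileCheck` is hypothetical, and the jar path is whatever you pass on the command line.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

final class FactoryServiceFileCheck {

    // Service file that Flink's table factory discovery reads via ServiceLoader.
    static final String SERVICE_ENTRY =
            "META-INF/services/org.apache.flink.table.factories.Factory";

    /** Returns the factory class names registered in the jar, or an empty list. */
    static List<String> registeredFactories(String jarPath) throws IOException {
        List<String> classNames = new ArrayList<>();
        try (JarFile jar = new JarFile(jarPath)) {
            JarEntry entry = jar.getJarEntry(SERVICE_ENTRY);
            if (entry == null) {
                return classNames; // the jar registers no table factories
            }
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                    jar.getInputStream(entry), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    String trimmed = line.trim();
                    // Skip blank lines and comment lines in the service file.
                    if (!trimmed.isEmpty() && !trimmed.startsWith("#")) {
                        classNames.add(trimmed);
                    }
                }
            }
        }
        return classNames;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java FactoryServiceFileCheck.java <path-to-jar>");
            return;
        }
        for (String name : registeredFactories(args[0])) {
            System.out.println(name);
        }
    }
}
```

Running it against the shaded job jar, or against the connector jar in the local Maven repository, shows which factories that jar registers. If no RabbitMQ factory appears, the artifact does not provide a SQL connector for 'connector' = 'rabbitmq', and declaring it in the pom will not resolve the discovery error. As far as I know, the Flink documentation describes the RabbitMQ connector under the DataStream connectors, so that outcome would not be surprising.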

    If the problem still persists, consult the official Flink and RabbitMQ documentation or ask the community for further help.


Question events

  • Closed on September 11
  • Question edited on September 10
  • Question created on September 10