你好按钮啊 2023-01-17 14:16 采纳率: 40%
浏览 98
已结题

Flink RocketMQ连接器

需要有RocketMQ的Flink Connector

   目前因为有接入外部门RocketMQ数据的需求,目前关于Flink并没有官方的RocketMQ,同时为了兼容未来大数据平台的需求,需要有DataStream和FlinkSQL两种方式。

目前找了一个开源的库,并安装到本场Maven仓库并引入了。

   Flink RocketMQ Connector的GitHub地址:https://github.com/apache/rocketmq-flink。
   目前我把这个资源安装到本地的仓库。通过POM引用。

代码

public class RocketMqSourceTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(1);
        // enable checkpoint
        env.enableCheckpointing(3000);
        Properties consumerProps = new Properties();
        consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "srvbd59.net.cn:9876");
        consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, "group1");
        consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, "rocketTopic");
        RocketMQSourceFunction<String> source = new RocketMQSourceFunction<String>(new SimpleStringDeserializationSchema(), consumerProps);
        source.setStartFromGroupOffsets(OffsetResetStrategy.EARLIEST);
        SingleOutputStreamOperator<String> map = env.addSource(source).map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return value;
            }
        });
        map.print();
        env.execute("rocketmq-flink-example1");
    }
}

运行结果及详细报错内容

我向Topic投递消息之后,普通的RocketMQConsumer可以消费到数据。但通过Flink RocketMQ Connector(即上面的代码)没有消费打印的数据,也没有抛出错误。

解答思路

控制台也打印了一些报错。但是是和日志相关的错误。代码相关异常并没有,因此无从排查。个人感觉可以通过日志框架看看,是否可以通过日志框架打印可以排查的信息。

我想要达到的结果。

目前我的Flink版本是1.14.5。希望基于这个版本。尝试引入Flink RocketMQ Connector。
1:需要DataStream 和FlinkSql两种都可以实现。
2:资源可以安装到本地仓库。通过POM的方式引用。

  • 写回答

3条回答 默认 最新

  • heart_6662 2023-01-17 14:53
    关注

    望采纳!!!点击回答右侧采纳即可!!
    我猜测可能是因为以下几个原因,可以检测一下:

    Topic 配置问题: 请确保 RocketMQ 中的 topic 名称和 consumerProps 中配置的 topic 名称一致。

    NameServer 配置问题: 请确保 consumerProps 中 NameServer 地址配置正确。

    RocketMQ 版本问题: 请确保 RocketMQ Connector 和您使用的 RocketMQ 版本兼容。

    ConsumerGroup 配置问题: 请确保 consumerProps 中 consumerGroup 配置正确,如果没有配置请确保消费组与生产者的消息对应。

    评论

报告相同问题?

问题事件

  • 系统已结题 1月25日
  • 创建了问题 1月17日

悬赏问题

  • ¥15 有偿求苍穹外卖环境配置
  • ¥15 代码在keil5里变成了这样怎么办啊,文件图像也变了,
  • ¥20 Ue4.26打包win64bit报错,如何解决?(语言-c++)
  • ¥15 clousx6整点报时指令怎么写
  • ¥30 远程帮我安装软件及库文件
  • ¥15 关于#自动化#的问题:如何通过电脑控制多相机同步拍照或摄影(相机或者摄影模组数量大于60),并将所有采集的照片或视频以一定编码规则存放至规定电脑文件夹内
  • ¥20 深信服vpn-2050这台设备如何配置才能成功联网?
  • ¥15 Arduino的wifi连接,如何关闭低功耗模式?
  • ¥15 Android studio 无法定位adb是什么问题?
  • ¥15 C#连接不上服务器,