你好按钮啊 2023-01-17 14:16 Acceptance rate: 40%
Views: 120
Closed

Flink RocketMQ connector

Looking for a Flink connector for RocketMQ

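We need to ingest RocketMQ data owned by another department. Flink itself does not ship an official RocketMQ connector, and to stay compatible with the future requirements of our big-data platform, the integration must support both the DataStream API and Flink SQL.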

I found an open-source library, installed it into my local Maven repository, and referenced it from the POM.

The Flink RocketMQ connector's GitHub repository: https://github.com/apache/rocketmq-flink

Code

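import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.rocketmq.flink.legacy.RocketMQConfig;
import org.apache.rocketmq.flink.legacy.RocketMQSourceFunction;
import org.apache.rocketmq.flink.legacy.common.config.OffsetResetStrategy;
import org.apache.rocketmq.flink.legacy.common.serialization.SimpleStringDeserializationSchema;

public class RocketMqSourceTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(1);
        // enable checkpoint
        env.enableCheckpointing(3000);
        Properties consumerProps = new Properties();
        consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "srvbd59.net.cn:9876");
        consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, "group1");
        consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, "rocketTopic");
        RocketMQSourceFunction<String> source = new RocketMQSourceFunction<String>(new SimpleStringDeserializationSchema(), consumerProps);
        // resume from the group's committed offsets; EARLIEST is the reset strategy
        source.setStartFromGroupOffsets(OffsetResetStrategy.EARLIEST);
        SingleOutputStreamOperator<String> map = env.addSource(source).map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return value; // pass-through, only so the records reach print()
            }
        });
        map.print();
        env.execute("rocketmq-flink-example1");
    }
}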

Run result and detailed error output

After I publish messages to the topic, a plain RocketMQ consumer receives them. The Flink RocketMQ connector (the code above), however, prints nothing and throws no error.
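One frequent cause of exactly this symptom (no records, no exception) is where the source starts reading rather than the connection itself. The code above calls setStartFromGroupOffsets(OffsetResetStrategy.EARLIEST), which, as the name suggests, resumes from the offsets already committed for consumer group group1 and falls back to EARLIEST only when the group has none; please confirm the exact fallback in the connector source for your version. If the plain RocketMQ consumer also ran under group1 (an assumption, the question does not say), the group's committed offset may already sit at the end of each queue, and in clustering mode live consumers of one group also split the queues between them. The JDK-only model below is illustrative, not the connector's code (Reset and startOffset are hypothetical names); it shows why giving the Flink job its own consumer group is worth trying.

```java
import java.util.OptionalLong;

public class StartOffsetSketch {

    /** Hypothetical stand-in for the connector's EARLIEST/LATEST reset choice. */
    enum Reset { EARLIEST, LATEST }

    /**
     * Where a consumer group starts reading one queue under a
     * "start from group offsets" mode: a committed offset always wins,
     * and the reset strategy only matters when the group has none.
     */
    static long startOffset(OptionalLong committed, long minOffset, long maxOffset, Reset reset) {
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        return reset == Reset.EARLIEST ? minOffset : maxOffset;
    }

    public static void main(String[] args) {
        long min = 0;   // oldest retained message in the queue
        long max = 100; // next offset to be written: 100 messages already stored

        // New group + EARLIEST: replays all stored messages.
        System.out.println(startOffset(OptionalLong.empty(), min, max, Reset.EARLIEST)); // 0
        // New group + LATEST: skips everything sent before the job started.
        System.out.println(startOffset(OptionalLong.empty(), min, max, Reset.LATEST));   // 100
        // Group already committed up to 100 (e.g. another consumer shared the group):
        // EARLIEST is ignored, so only messages sent after this point arrive.
        System.out.println(startOffset(OptionalLong.of(100), min, max, Reset.EARLIEST)); // 100
    }
}
```

The same reasoning applies to the answer's example further down, which sets CONSUMER_OFFSET_LATEST: with a fresh group it would only print messages published after the job is running.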

My troubleshooting so far

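The console does print some errors, but they are all logging-related; there are no exceptions from the code itself, so I have nothing to go on. My guess is that a properly configured logging framework might print information that would help with troubleshooting.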
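If those logging errors are SLF4J's "Failed to load class org.slf4j.impl.StaticLoggerBinder" warning (an assumption, since the messages were not posted), SLF4J 1.x falls back to a no-operation logger and silently discards every log line, including Flink's own output and, most likely, the connector's. That would explain seeing "no errors". Flink's Java quickstart archetype adds log4j-slf4j-impl, log4j-api and log4j-core at runtime scope, plus a log4j2.properties under src/main/resources, for this reason. The class below is a hypothetical JDK-only helper; run it from the same module as the job to see which of these pieces are actually on the classpath.

```java
public class LoggingBindingCheck {

    /** True if the named class can be loaded (without running its static initializers). */
    static boolean onClasspath(String className) {
        try {
            Class.forName(className, false, LoggingBindingCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // SLF4J API, through which Flink logs.
        System.out.println("slf4j-api: " + onClasspath("org.slf4j.LoggerFactory"));
        // SLF4J 1.7.x locates its backend through this class; when it is missing,
        // SLF4J prints a one-time warning and then drops every log statement.
        System.out.println("slf4j 1.x binding: " + onClasspath("org.slf4j.impl.StaticLoggerBinder"));
        // Log4j 2 core, the backend Flink 1.14 uses by default.
        System.out.println("log4j-core: " + onClasspath("org.apache.logging.log4j.core.LoggerContext"));
        // Log4j 2 configuration file on the classpath (prints null if absent).
        System.out.println("log4j2.properties: "
                + LoggingBindingCheck.class.getClassLoader().getResource("log4j2.properties"));
    }
}
```

If the binding or log4j-core shows false, adding those runtime dependencies and a log4j2.properties at INFO level is usually enough to make connection or offset problems visible in the console.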

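The result I want to achieve

My Flink version is 1.14.5, and I would like to integrate the Flink RocketMQ connector on top of that version.
1: Both the DataStream API and Flink SQL must work.
2: The artifact should be installable into the local repository and referenced via the POM.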
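For requirement 1, the Flink SQL route goes through a CREATE TABLE statement whose WITH clause selects the connector. The option keys used here ('connector' = 'rocketmq', 'topic', 'consumerGroup', 'nameServerAddress') are my reading of the rocketmq-flink README and are assumptions; check them against the table factory of the connector version you build for Flink 1.14.5. The single body STRING column and the group name flink_sql_group are hypothetical. The sketch only assembles the DDL string with the JDK, doubling single quotes as SQL string literals require.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class RocketMqDdlSketch {

    /** Wraps a value in a SQL string literal, escaping ' as ''. */
    static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    /** Builds "CREATE TABLE name (columns) WITH ('k' = 'v', ...)" in option insertion order. */
    static String buildSourceDdl(String table, String columns, Map<String, String> options) {
        String with = options.entrySet().stream()
                .map(e -> "  " + quote(e.getKey()) + " = " + quote(e.getValue()))
                .collect(Collectors.joining(",\n"));
        return "CREATE TABLE " + table + " (\n" + columns + "\n) WITH (\n" + with + "\n)";
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>(); // keeps the printed order stable
        options.put("connector", "rocketmq");                  // assumed factory identifier
        options.put("topic", "rocketTopic");
        options.put("consumerGroup", "flink_sql_group");       // hypothetical dedicated group
        options.put("nameServerAddress", "srvbd59.net.cn:9876");
        // `body` is a hypothetical column holding the message body as text.
        System.out.println(buildSourceDdl("rocketmq_source", "  `body` STRING", options));
    }
}
```

With flink-table on the classpath, the printed string would be passed to TableEnvironment#executeSql, after which tableEnv.executeSql("SELECT * FROM rocketmq_source").print() should show incoming messages. The DataStream route remains the RocketMQSourceFunction code from the question.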


3 answers

  • 阳光宅男xxb 2023-01-17 15:04

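    First check whether broker.conf under rmq/conf is configured correctly, then check whether anything is missing from your code. If you have no leads at all, go back to the example or tutorial you based your program on and compare the two line by line to see whether something is wrong or missing. For comparison, here is the connector's example class RocketMQFlinkExample: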

    package org.apache.rocketmq.flink.legacy.example;

    import org.apache.rocketmq.flink.legacy.RocketMQConfig;
    import org.apache.rocketmq.flink.legacy.RocketMQSink;
    import org.apache.rocketmq.flink.legacy.RocketMQSourceFunction;
    import org.apache.rocketmq.flink.legacy.common.serialization.SimpleTupleDeserializationSchema;
    import org.apache.rocketmq.flink.legacy.function.SinkMapFunction;
    import org.apache.rocketmq.flink.legacy.function.SourceMapFunction;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import java.util.Properties;

    import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_LATEST;
    import static org.apache.rocketmq.flink.legacy.RocketMQConfig.DEFAULT_CONSUMER_TAG;

    public class RocketMQFlinkExample {

        /**
         * Source config.
         *
         * @return consumer properties
         */
        private static Properties getConsumerProps() {
            Properties consumerProps = new Properties();
            consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "127.0.0.1:9876");
            consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, "flink_consumer_test");
            consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, "SOURCE_TOPIC");
            consumerProps.setProperty(RocketMQConfig.CONSUMER_TAG, DEFAULT_CONSUMER_TAG);
            consumerProps.setProperty(RocketMQConfig.CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST);
            return consumerProps;
        }

        /**
         * Sink config.
         *
         * @return producer properties
         */
        private static Properties getProducerProps() {
            Properties producerProps = new Properties();
            producerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "127.0.0.1:9876");
            producerProps.setProperty(RocketMQConfig.PRODUCER_GROUP, "flink_produce_test");
            return producerProps;
        }

        public static void main(String[] args) throws Exception {
            // final ParameterTool params = ParameterTool.fromArgs(args);

            // for local
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

            // for cluster
            // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // env.getConfig().setGlobalJobParameters(params);
            env.setStateBackend(new MemoryStateBackend());
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            // start a checkpoint every 10s
            env.enableCheckpointing(10000);
            // advanced options:
            // set mode to exactly-once (this is the default)
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            // checkpoints have to complete within one minute, or are discarded
            env.getCheckpointConfig().setCheckpointTimeout(60000);
            // make sure 500 ms of progress happen between checkpoints
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
            // allow only one checkpoint to be in progress at the same time
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
            // enable externalized checkpoints which are retained after job cancellation
            env.getCheckpointConfig()
                    .enableExternalizedCheckpoints(
                            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

            Properties consumerProps = getConsumerProps();
            Properties producerProps = getProducerProps();

            SimpleTupleDeserializationSchema schema = new SimpleTupleDeserializationSchema();

            DataStreamSource<Tuple2<String, String>> source =
                    env.addSource(new RocketMQSourceFunction<>(schema, consumerProps))
                            .setParallelism(2);

            source.print();
            source.process(new SourceMapFunction())
                    .process(new SinkMapFunction("SINK_TOPIC", "*"))
                    .addSink(
                            new RocketMQSink(producerProps)
                                    .withBatchFlushOnCheckpoint(true)
                                    .withBatchSize(32)
                                    .withAsync(true))
                    .setParallelism(2);

            env.execute("rocketmq-connect-flink");
        }
    }
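Following up on the broker.conf suggestion: a RocketMQ client first contacts the name server, which hands back the address the broker registered (brokerIP1 and listenPort in broker.conf). If the machine that runs the Flink job can reach the name server but not that broker address, the consumer may keep retrying without ever surfacing an exception in the job. A plain TCP probe from that machine is a cheap way to rule this out. The class below is a hypothetical JDK-only sketch; it checks TCP reachability only, not RocketMQ protocol health, and the broker address you pass it is whatever your broker.conf actually declares.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class NameServerProbe {

    /** Parses one "host:port" entry, e.g. the value given to NAME_SERVER_ADDR. */
    static InetSocketAddress parse(String hostPort) {
        int colon = hostPort.lastIndexOf(':');
        if (colon <= 0 || colon == hostPort.length() - 1) {
            throw new IllegalArgumentException("expected host:port but got: " + hostPort);
        }
        String host = hostPort.substring(0, colon).trim();
        int port = Integer.parseInt(hostPort.substring(colon + 1).trim());
        return new InetSocketAddress(host, port); // tries to resolve the host name
    }

    /** True if a TCP connection can be opened within the timeout. */
    static boolean reachable(InetSocketAddress address, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(address, timeoutMs); // unresolved hosts throw UnknownHostException
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Default: the name server from the question. Pass the broker's
        // brokerIP1:listenPort as an extra argument to probe it as well.
        String[] targets = args.length > 0 ? args : new String[] {"srvbd59.net.cn:9876"};
        for (String target : targets) {
            System.out.println(target + " reachable: " + reachable(parse(target), 3000));
        }
    }
}
```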



Question timeline

  • Closed by the system on January 25
  • Question created on January 17