需要有RocketMQ的Flink Connector
目前因为有接入外部门RocketMQ数据的需求,目前关于Flink并没有官方的RocketMQ,同时为了兼容未来大数据平台的需求,需要有DataStream和FlinkSQL两种方式。
目前找了一个开源的库,并安装到本场Maven仓库并引入了。
Flink RocketMQ Connector的GitHub地址:https://github.com/apache/rocketmq-flink。
目前我把这个资源安装到本地的仓库。通过POM引用。
代码
public class RocketMqSourceTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setParallelism(1);
// enable checkpoint
env.enableCheckpointing(3000);
Properties consumerProps = new Properties();
consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "srvbd59.net.cn:9876");
consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, "group1");
consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, "rocketTopic");
RocketMQSourceFunction<String> source = new RocketMQSourceFunction<String>(new SimpleStringDeserializationSchema(), consumerProps);
source.setStartFromGroupOffsets(OffsetResetStrategy.EARLIEST);
SingleOutputStreamOperator<String> map = env.addSource(source).map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
return value;
}
});
map.print();
env.execute("rocketmq-flink-example1");
}
}
运行结果及详细报错内容
我向Topic投递消息之后,普通的RocketMQConsumer可以消费到数据。但通过Flink RocketMQ Connector(即上面的代码)没有消费打印的数据,也没有抛出错误。
解答思路
控制台也打印了一些报错。但是是和日志相关的错误。代码相关异常并没有,因此无从排查。个人感觉可以通过日志框架看看,是否可以通过日志框架打印可以排查的信息。
我想要达到的结果。
目前我的Flink版本是1.14.5。希望基于这个版本。尝试引入Flink RocketMQ Connector。
1:需要DataStream 和FlinkSql两种都可以实现。
2:资源可以安装到本地仓库。通过POM的方式引用。