已知所有kafka里topic为固定格式的json,目前想用flink处理所有topic里的数据,并且写入第二个kafka,sink的topic和source的topic一致,如何实现?
1条回答 默认 最新
- 小明同学YYDS 2020-01-06 16:13关注
DEMO
def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val properties = new Properties() properties.setProperty("bootstrap.servers", "localhost:9092") properties.setProperty("group.id", "consumer-group") properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("auto.offset.reset", "latest") val inputStream = env.addSource(new FlinkKafkaConsumer010[String]("sensor", new SimpleStringSchema(), properties)) // Transform操作 val dataStream = sourceStream.map(data => data.toString) // sink dataStream.addSink( new FlinkKafkaProducer010[String]("mym-sink", new SimpleStringSchema(), properties)) dataStream.print("send to kafka") env.execute("kafka data process") }
本回答被题主选为最佳回答 , 对您是否有帮助呢?解决评论 打赏 举报无用 1
悬赏问题
- ¥15 用matlab 设计一个不动点迭代法求解非线性方程组的代码
- ¥15 牛顿斯科特系数表表示
- ¥15 arduino 步进电机
- ¥20 程序进入HardFault_Handler
- ¥15 oracle集群安装出bug
- ¥15 关于#python#的问题:自动化测试
- ¥20 问题请教!vue项目关于Nginx配置nonce安全策略的问题
- ¥15 教务系统账号被盗号如何追溯设备
- ¥20 delta降尺度方法,未来数据怎么降尺度
- ¥15 c# 使用NPOI快速将datatable数据导入excel中指定sheet,要求快速高效