All the topics in a Kafka cluster hold JSON in a fixed format. I want to use Flink to process the data from every topic and write it to a second Kafka cluster, with each sink topic named the same as its source topic. How can this be implemented?
1 answer
- 小明同学YYDS 2020-01-06 16:13
DEMO
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}

object KafkaDemo {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("auto.offset.reset", "latest")
    // key/value deserialization is handled by SimpleStringSchema below,
    // so no Kafka deserializer properties are needed

    // source: consume the "sensor" topic as strings
    val inputStream = env.addSource(
      new FlinkKafkaConsumer010[String]("sensor", new SimpleStringSchema(), properties))

    // transform (placeholder: pass each record through unchanged)
    val dataStream = inputStream.map(data => data.toString)

    // sink: write the result to the "mym-sink" topic
    dataStream.addSink(
      new FlinkKafkaProducer010[String]("mym-sink", new SimpleStringSchema(), properties))
    dataStream.print("send to kafka")

    env.execute("kafka data process")
  }
}
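Note that this demo reads from one hardcoded topic ("sensor") and writes to another ("mym-sink"), while the question asks to consume all topics and keep each sink topic identical to its source topic. With the Flink 1.x connectors used above, that can be done by consuming with a KeyedDeserializationSchema, which receives the source topic name for every record, and producing with a KeyedSerializationSchema, whose getTargetTopic return value overrides the producer's default topic per record. Below is a minimal sketch of that approach; the topic names, broker addresses, the TopicRecord case class, and the two schema class names are illustrative assumptions, not part of the original answer.

import java.util.{Arrays => JArrays, Properties}

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.streaming.util.serialization.{KeyedDeserializationSchema, KeyedSerializationSchema}

// hypothetical record type carrying the payload together with its source topic
case class TopicRecord(topic: String, value: String)

// keeps the source topic alongside the raw JSON payload while consuming
class TopicAwareDeserializer extends KeyedDeserializationSchema[TopicRecord] {
  override def deserialize(messageKey: Array[Byte], message: Array[Byte],
                           topic: String, partition: Int, offset: Long): TopicRecord =
    TopicRecord(topic, new String(message, "UTF-8"))
  override def isEndOfStream(nextElement: TopicRecord): Boolean = false
  override def getProducedType: TypeInformation[TopicRecord] =
    createTypeInformation[TopicRecord]
}

// routes each record back to a topic with the same name on the sink cluster
class TopicAwareSerializer extends KeyedSerializationSchema[TopicRecord] {
  override def serializeKey(element: TopicRecord): Array[Byte] = null
  override def serializeValue(element: TopicRecord): Array[Byte] =
    element.value.getBytes("UTF-8")
  // a non-null return value here overrides the producer's default topic
  override def getTargetTopic(element: TopicRecord): String = element.topic
}

object TopicMirrorJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val sourceProps = new Properties()
    sourceProps.setProperty("bootstrap.servers", "localhost:9092") // source cluster
    sourceProps.setProperty("group.id", "consumer-group")

    val sinkProps = new Properties()
    sinkProps.setProperty("bootstrap.servers", "localhost:9093")   // second cluster (assumed address)

    // subscribe to every topic that should be mirrored (hypothetical names)
    val topics = JArrays.asList("sensor", "orders", "clicks")
    val stream = env.addSource(
      new FlinkKafkaConsumer010[TopicRecord](topics, new TopicAwareDeserializer, sourceProps))

    // JSON processing of each record's value field would go here

    // "default-topic" is only a fallback; getTargetTopic picks the real topic per record
    stream.addSink(
      new FlinkKafkaProducer010[TopicRecord]("default-topic", new TopicAwareSerializer, sinkProps))

    env.execute("mirror kafka topics through flink")
  }
}

One caveat: the subscription list is fixed at job start, so topics created later are not picked up unless partition/topic discovery is configured (later Flink versions also accept a regex topic pattern instead of a fixed list).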
This answer was accepted by the asker as the best answer.