首席IT民工 2019-10-30 14:39 采纳率: 100%
浏览 2795
已采纳

Flink如何将kafka里的消息写入到对应的topic

已知所有kafka里topic为固定格式的json,目前想用flink处理所有topic里的数据,并且写入第二个kafka,sink的topic和source的topic一致,如何实现?

  • 写回答

1条回答 默认 最新

  • 小明同学YYDS 2020-01-06 16:13
    关注

    DEMO

      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        val properties = new Properties()
        properties.setProperty("bootstrap.servers", "localhost:9092")
        properties.setProperty("group.id", "consumer-group")
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        properties.setProperty("auto.offset.reset", "latest")
    
        val inputStream = env.addSource(new FlinkKafkaConsumer010[String]("sensor", new SimpleStringSchema(), properties))
        // Transform操作
        val dataStream = sourceStream.map(data => data.toString)
    
        // sink
        dataStream.addSink( new FlinkKafkaProducer010[String]("mym-sink", new SimpleStringSchema(), properties))
        dataStream.print("send to kafka")
    
        env.execute("kafka data process")
      }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

问题事件

  • 已采纳回答 5月5日

悬赏问题

  • ¥15 DIFY API Endpoint 问题。
  • ¥20 sub地址DHCP问题
  • ¥15 delta降尺度计算的一些细节,有偿
  • ¥15 Arduino红外遥控代码有问题
  • ¥15 数值计算离散正交多项式
  • ¥30 数值计算均差系数编程
  • ¥15 redis-full-check比较 两个集群的数据出错
  • ¥15 Matlab编程问题
  • ¥15 训练的多模态特征融合模型准确度很低怎么办
  • ¥15 kylin启动报错log4j类冲突