哦豁641 2023-09-10 15:22 采纳率: 0%
浏览 4

Maya无法正常安装!

目前卡在许可环节
Adsk文件版本与pj版本符合,杀毒软件也关了,但是仍然会弹出FlexNet窗口,显示网络许可不通过……

  • 写回答

1条回答 默认 最新

  • 专家-赤兔[在线] 全栈领域优质创作者 2023-09-10 16:21
    关注

    引用 皆我百晓生 小程序回复内容作答:
    在 IntelliJ IDEA 上编写 Scala 代码使用 Flink 消费 Kafka 数据并存储到 Redis 中,可以按照以下步骤进行操作:

    步骤 1:设置项目依赖项

    1. 打开 IntelliJ IDEA,并创建一个新的 Scala 项目。
    2. 在项目的 build.sbt 文件中添加以下依赖项:
    name := "FlinkKafkaRedisExample"
    version := "1.0"
    scalaVersion := "2.12.8"
    
    libraryDependencies ++= Seq(
      "org.apache.flink" %% "flink-scala" % "1.10.0",
      "org.apache.flink" %% "flink-streaming-scala" % "1.10.0",
      "org.apache.flink" %% "flink-connector-kafka" % "1.10.0",
      "org.apache.flink" %% "flink-connector-redis" % "1.10.0",
      "org.apache.kafka" % "kafka-clients" % "2.4.1",
      "redis.clients" % "jedis" % "3.3.0",
      "org.apache.flink" %% "flink-json" % "1.10.0"
    )
    

    步骤 2:编写 Scala 代码
    在 src/main/scala 目录下创建一个名为 Main.scala 的文件,并将以下代码复制到文件中:

    import org.apache.flink.api.common.restartstrategy.RestartStrategies
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
    import org.apache.flink.streaming.connectors.redis.RedisSink
    import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper
    
    object Main {
      case class SensorReading(id: String, timestamp: Long, temperature: Double)
    
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.getConfig.setAutoWatermarkInterval(1000L)
        env.enableCheckpointing(60000L)
        env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L))
    
        val brokers = "localhost:9092"
        val topic = "sensor_data"
    
        val kafkaProperties = new java.util.Properties()
        kafkaProperties.setProperty("bootstrap.servers", brokers)
        kafkaProperties.setProperty("group.id", "flink_group")
    
        val kafkaConsumer = new FlinkKafkaConsumer010[String](topic, new SimpleStringSchema(), kafkaProperties)
        kafkaConsumer.setStartFromEarliest()
    
        val stream = env.addSource(kafkaConsumer)
    
        val redisConfig = new FlinkJedisConfigBase {
          override def getHost: String = "localhost"
          override def getPort: Int = 6379
        }
    
        stream
          .map(data => {
            val dataArray = data.split(",")
            SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
          })
          .addSink(new RedisSink[SensorReading](redisConfig, new RedisExampleMapper))
    
        env.execute("Flink Kafka Redis Example")
      }
    
      class RedisExampleMapper extends RedisMapper[SensorReading] {
        override def getCommandDescription: RedisCommandDescription = {
          new RedisCommandDescription(RedisCommand.HSET, "sensor_temperatures")
        }
    
        override def getKeyFromData(data: SensorReading): String = {
          data.id
        }
    
        override def getValueFromData(data: SensorReading): String = {
          data.temperature.toString
        }
      }
    }
    

    步骤 3:运行代码
    保存并运行代码,在 IntelliJ IDEA 中点击运行按钮或使用 sbt run 命令执行代码。

    以上代码会从名为 "sensor_data" 的 Kafka 主题中获取数据,将数据转换为 SensorReading 实例,并将其存储到 Redis 的名为 "sensor_temperatures" 的哈希集合中。

    请确保 Kafka 和 Redis 运行在本地,并且已按照指定的端口启动。

    希望这可以帮助您在 IntelliJ IDEA 上编写 Scala 代码使用 Flink 消费 Kafka 中的数据并存储到 Redis 中。

    评论

报告相同问题?

问题事件

  • 创建了问题 9月10日

悬赏问题

  • ¥15 Windows Script Host 无法找到脚本文件"C:\ProgramData\Player800\Cotrl.vbs”
  • ¥15 matlab自定义损失函数
  • ¥15 35114 SVAC视频验签的问题
  • ¥15 impedancepy
  • ¥15 求往届大挑得奖作品(ppt…)
  • ¥15 如何在vue.config.js中读取到public文件夹下window.APP_CONFIG.API_BASE_URL的值
  • ¥50 浦育平台scratch图形化编程
  • ¥20 求这个的原理图 只要原理图
  • ¥15 vue2项目中,如何配置环境,可以在打完包之后修改请求的服务器地址
  • ¥20 微信的店铺小程序如何修改背景图