轻书北烟淡寒864 2023-10-15 15:08

A question about #flink#: after combining the read-from-Kafka demo and the write-to-MySQL demo into one program, the combined program can read data from Kafka but cannot store it into MySQL.

Observed behavior and background
package chapter

import java.sql.PreparedStatement
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object KafkaToFlinkAndSinkToMySql {
  case class Event(user: String, url: String)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Kafka consumer configuration
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "bigdata:9092")
    properties.setProperty("group.id", "my_group")

    // Create the Kafka source stream (topic "test2", starting from the earliest offset)
    val kafkaDS = env.addSource(
      new FlinkKafkaConsumer[String]("test2", new SimpleStringSchema(), properties).setStartFromEarliest()
    )(TypeInformation.of(classOf[String]))

    // Print the Kafka stream for debugging
    kafkaDS.print()

    // JDBC connection options
    val jdbcConnectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
      .withUrl("jdbc:mysql://bigdata:3306/test")
      .withDriverName("com.mysql.jdbc.Driver")
      .withUsername("root")
      .withPassword("123456")
      .build()

    // Parse each comma-separated line into an Event
    val stream: DataStream[Event] = kafkaDS.map {
      str =>
        val fields = str.split(",")
        Event(fields(0), fields(1))
    }(TypeInformation.of(classOf[Event]))

    // Write the stream to MySQL
    stream.addSink(JdbcSink.sink(
      "insert into clicks (user, url) values (?, ?)",
      new JdbcStatementBuilder[Event] {
        override def accept(t: PreparedStatement, u: Event): Unit = {
          t.setString(1, u.user)
          t.setString(2, u.url)
        }
      },
      jdbcConnectionOptions
    ))

    // Execute the Flink job
    env.execute()
  }
}
Output: the records in the Kafka topic test2 are read successfully, but nothing is stored in MySQL.
log4j:WARN No appenders could be found for logger (org.apache.flink.api.scala.ClosureCleaner$).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Sun Oct 15 14:59:28 CST 2023 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
"Bob", "./cart"
"Alice", "./cart"
"Mary", "./prod?id=1"
"Mary", "./prod?id=2"
"Mary", "./prod?id=3"
"Bob", "./cart"
"Alice", "./cart"
"Mary", "./prod?id=1"
If the write-to-MySQL demo is run on its own, the data is stored successfully:

package chapter

import java.sql.PreparedStatement

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object SinkToMySqlTest {
  case class Event(user: String, url: String)
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.fromElements(
      Event("Mary", "./home"),
      Event("Bob", "./cart"),
      Event("Bob", "./cart"),
      Event("Alice", "./cart"),
      Event("Mary", "./prod?id=1"),
      Event("Mary", "./prod?id=2"),
      Event("Mary", "./prod?id=3")
    )

    stream.addSink(JdbcSink.sink(
      "insert into clicks (user, url) values (?, ?)",
      new JdbcStatementBuilder[Event] {
        override def accept(t: PreparedStatement, u: Event): Unit = {
          t.setString(1, u.user)
          t.setString(2, u.url)
        }
      },
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:mysql://bigdata:3306/test")
        .withDriverName("com.mysql.jdbc.Driver")
        .withUsername("root")
        .withPassword("123456")
        .build()
    ))

    env.execute()
  }
}

mysql> select * from clicks;
+-------+-------------+
| user  | url         |
+-------+-------------+
| Mary  | ./home      |
| Bob   | ./cart      |
| Bob   | ./cart      |
| Alice | ./cart      |
| Mary  | ./prod?id=1 |
| Mary  | ./prod?id=2 |
| Mary  | ./prod?id=3 |
+-------+-------------+
But once the read-from-Kafka demo and the write-to-MySQL demo are combined into one program, nothing is stored in MySQL: as the code and output above show, the combined program does read the Kafka data, yet no rows appear in the clicks table.
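
One concrete difference between the two programs that may explain this: the fromElements job is bounded, so it finishes, the JDBC sink is closed, and whatever sits in its write buffer is flushed to MySQL. The Kafka job is unbounded, so the sink is never closed, and by default JdbcSink buffers rows and flushes them only when a batch fills up or a checkpoint is taken. With only a handful of records and no checkpointing enabled, the buffered rows may never reach MySQL. A minimal sketch of passing explicit JdbcExecutionOptions so records are flushed promptly (the batch size and interval below are illustrative values, not tuned recommendations):

import org.apache.flink.connector.jdbc.JdbcExecutionOptions

stream.addSink(JdbcSink.sink(
  "insert into clicks (user, url) values (?, ?)",
  new JdbcStatementBuilder[Event] {
    override def accept(t: PreparedStatement, u: Event): Unit = {
      t.setString(1, u.user)
      t.setString(2, u.url)
    }
  },
  JdbcExecutionOptions.builder()
    .withBatchSize(1)          // flush after every record (illustrative; raise for throughput)
    .withBatchIntervalMs(200)  // also flush at least every 200 ms
    .withMaxRetries(3)
    .build(),
  jdbcConnectionOptions
))

Alternatively, enabling checkpointing (as in the answer below) also forces the JDBC sink to flush its buffer on every checkpoint.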

1 answer

  • CSDN-Ada助手 (CSDN-AI official account) 2023-10-15 19:49

    [Related recommendations]



    • This article, Flink实战 —— 读取Kafka数据并与MySQL数据关联【附源码】, may have the answer you are looking for; take a look.
    • Beyond that, the "1.接收kafka的数据:Source" section of the blog post flink监听kafka的数据保证程序健壮执行且只执行一次,并将结果保存到mysql中 may also solve your problem. You can read the excerpt below or jump to the original post:
      // Create a Flink streaming execution environment
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      // Tell the job which brokers to read from, which consumer group to use, and to read from the beginning
      Properties properties = new Properties();
      properties.setProperty("bootstrap.servers", "doit01:9092,doit02:9092,doit03:9092");
      properties.setProperty("group.id", args[1]);
      properties.setProperty("auto.offset.reset", "earliest");
      // Create the Kafka consumer, passing the topic name and a string deserialization schema
      FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<String>(args[2], new SimpleStringSchema(), properties);
      // Whether to commit Kafka offsets back to Kafka on checkpoints (default is true); disabled here
      flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(false);
      // Add the Kafka consumer as one of the job's sources
      DataStreamSource<String> lines = env.addSource(flinkKafkaConsumer);

      The prerequisite for robust, exactly-once processing is enabling checkpointing:

      // Enable checkpointing: take a checkpoint every 30 seconds
      env.enableCheckpointing(30000);
      // Set the checkpointing mode (at-least-once here)
      env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
      // Retain checkpoint data even after the job is cancelled
      env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
      // Persist checkpoint state (including offsets) to this path to prevent data loss
      env.setStateBackend(new FsStateBackend(args[0]));
      // Restart strategy: on failure, restart at most 10 times with a 30-second delay
      env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 30000));
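
      The excerpt above is Java; applied to the Scala job from the question, the equivalent checkpoint setup would look roughly like this sketch (the state-backend path file:///tmp/flink-checkpoints is a placeholder, not taken from the question):

      import org.apache.flink.api.common.restartstrategy.RestartStrategies
      import org.apache.flink.runtime.state.filesystem.FsStateBackend
      import org.apache.flink.streaming.api.CheckpointingMode
      import org.apache.flink.streaming.api.environment.CheckpointConfig

      // Checkpoint every 30 seconds; the JDBC sink also flushes its batch on each checkpoint
      env.enableCheckpointing(30000)
      env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
      // Keep checkpoint data if the job is cancelled
      env.getCheckpointConfig.enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
      // Placeholder checkpoint path; use a durable filesystem such as HDFS in production
      env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"))
      // Restart at most 10 times, with a 30-second delay between attempts
      env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 30000))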

    If you have already solved this problem, we would very much like you to share your solution: write it up as a blog post and link it in the comments to help more people ^-^
