雾.607 2022-01-11 10:12 采纳率: 66.7%
浏览 80
已结题

Flink连接MySQL报错,求解决方法

问题遇到的现象和发生背景

使用flink测试连接flink时报错:处理流运算符时出错,
不知道是不是程序写错了还是mysql有问题

问题相关代码,请勿粘贴截图
package flink_scala.apitest.sinktest

import java.sql.{Connection, DriverManager, PreparedStatement}

import flink_scala.apitest.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

object JdbcSinkTest {
  def main(args: Array[String]): Unit = {
    // 定义流式处理环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 0.读取数据
    val inputStream = env.readTextFile("C:\\Users\\86182\\IdeaProjects\\flink_scala\\src\\main\\resources\\sensor.txt")


    // 1.先转换成样例类类型(简单转换操作)
    val dataStream = inputStream
      .map( data =>{
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      } )

    dataStream.addSink( new MyJdbcSinkFunc() )

    env.execute("Jdbc sink test")

  }
}

class MyJdbcSinkFunc() extends RichSinkFunction[SensorReading]{
  // 定义连接、预编译语句
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://192.168.10.24:3306/test?useSSL=false", "root", "Password123$")
    insertStmt = conn.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)")
    updateStmt = conn.prepareStatement("update sensor_temp set temp = ? where id = ?")
  }

  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    // 先执行更新操作,查到就更新
    updateStmt.setDouble(1, value.temperature)
    updateStmt.setString(2, value.id)
    updateStmt.execute()
    // 如果更新没有查到数据,那么就插入
    if( updateStmt.getUpdateCount == 0 ){
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.execute()
    }
  }

  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
  }
}

运行结果及报错内容
我的解答思路和尝试过的方法
我想要达到的结果
  • 写回答

2条回答

      报告相同问题?

      相关推荐 更多相似问题

      问题事件

      • 系统已结题 1月19日
      • 已采纳回答 1月11日
      • 创建了问题 1月11日

      悬赏问题

      • ¥15 求一个超难的动态新增元素的 click 事件无效的解决办法。
      • ¥20 怎么修改mediawiki允许上传的文件大小?
      • ¥15 agrySEXPAYm 是毒吗
      • ¥50 Java实现注册登录实现数据库增删改查功能,数据库至少两个表
      • ¥20 求解R语言的数据分析问题
      • ¥20 求GD32F105和305解除读保护方法教程
      • ¥15 C++代码优化,复杂度太高,无法通过clang-tidy检查,用什么办法可以优=化
      • ¥15 关于跨链隐私保护方案
      • ¥15 node mongodb 根据id给子集合list添加对象 请问应该如何操作
      • ¥50 如何得到路径下的绝对路径并且回传到list上