雾.607 2022-01-11 10:12 采纳率: 66.7%
浏览 103
已结题

Flink连接MySQL报错,求解决方法

问题遇到的现象和发生背景

使用flink测试连接flink时报错:处理流运算符时出错,
不知道是不是程序写错了还是mysql有问题

问题相关代码,请勿粘贴截图
package flink_scala.apitest.sinktest

import java.sql.{Connection, DriverManager, PreparedStatement}

import flink_scala.apitest.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

object JdbcSinkTest {
  def main(args: Array[String]): Unit = {
    // 定义流式处理环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 0.读取数据
    val inputStream = env.readTextFile("C:\\Users\\86182\\IdeaProjects\\flink_scala\\src\\main\\resources\\sensor.txt")


    // 1.先转换成样例类类型(简单转换操作)
    val dataStream = inputStream
      .map( data =>{
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      } )

    dataStream.addSink( new MyJdbcSinkFunc() )

    env.execute("Jdbc sink test")

  }
}

class MyJdbcSinkFunc() extends RichSinkFunction[SensorReading]{
  // 定义连接、预编译语句
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://192.168.10.24:3306/test?useSSL=false", "root", "Password123$")
    insertStmt = conn.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)")
    updateStmt = conn.prepareStatement("update sensor_temp set temp = ? where id = ?")
  }

  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    // 先执行更新操作,查到就更新
    updateStmt.setDouble(1, value.temperature)
    updateStmt.setString(2, value.id)
    updateStmt.execute()
    // 如果更新没有查到数据,那么就插入
    if( updateStmt.getUpdateCount == 0 ){
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.execute()
    }
  }

  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
  }
}

运行结果及报错内容
我的解答思路和尝试过的方法
我想要达到的结果
  • 写回答

2条回答 默认 最新

      报告相同问题?

      相关推荐 更多相似问题

      问题事件

      • 系统已结题 1月19日
      • 已采纳回答 1月11日
      • 创建了问题 1月11日

      悬赏问题

      • ¥15 香农解码的代码问题,无法输出解码结果
      • ¥15 内网同一网段设备和wifi隔离
      • ¥15 Python操作注册表
      • ¥45 入门级别的一段VUE前端拍照像后端发送请求的代码,帮排错
      • ¥15 anaconda打开spyder后一直闪退,不知道怎么办
      • ¥15 解决迷宫问题中无法运行的问题
      • ¥15 关于aspnetcore中使用mqttnet库的entire
      • ¥15 关于#python#的问题,请各位专家解答!
      • ¥100 关于远控软件的两个问题
      • ¥15 基于STM32的AD8232心电采集装置设计