雾.607 2022-01-11 10:12 采纳率: 66.7%
浏览 133
已结题

Flink连接MySQL报错,求解决方法

问题遇到的现象和发生背景

使用flink测试连接flink时报错:处理流运算符时出错,
不知道是不是程序写错了还是mysql有问题

问题相关代码,请勿粘贴截图
package flink_scala.apitest.sinktest

import java.sql.{Connection, DriverManager, PreparedStatement}

import flink_scala.apitest.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

object JdbcSinkTest {
  def main(args: Array[String]): Unit = {
    // 定义流式处理环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 0.读取数据
    val inputStream = env.readTextFile("C:\\Users\\86182\\IdeaProjects\\flink_scala\\src\\main\\resources\\sensor.txt")


    // 1.先转换成样例类类型(简单转换操作)
    val dataStream = inputStream
      .map( data =>{
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      } )

    dataStream.addSink( new MyJdbcSinkFunc() )

    env.execute("Jdbc sink test")

  }
}

class MyJdbcSinkFunc() extends RichSinkFunction[SensorReading]{
  // 定义连接、预编译语句
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://192.168.10.24:3306/test?useSSL=false", "root", "Password123$")
    insertStmt = conn.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)")
    updateStmt = conn.prepareStatement("update sensor_temp set temp = ? where id = ?")
  }

  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    // 先执行更新操作,查到就更新
    updateStmt.setDouble(1, value.temperature)
    updateStmt.setString(2, value.id)
    updateStmt.execute()
    // 如果更新没有查到数据,那么就插入
    if( updateStmt.getUpdateCount == 0 ){
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.execute()
    }
  }

  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
  }
}

运行结果及报错内容
我的解答思路和尝试过的方法
我想要达到的结果
  • 写回答

2条回答 默认 最新

  • CSDN专家-微编程 2022-01-11 11:07
    关注

    这是空指针异常,发生在读取数据那里,仔细检查一下数据是否有问题,估计是数据格式出现了问题或者路径输出了问题导致没有读取到自己想要的数据,无法被获取到,所以报了空异常

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论 编辑记录
查看更多回答(1条)

报告相同问题?

问题事件

  • 系统已结题 1月19日
  • 已采纳回答 1月11日
  • 创建了问题 1月11日

悬赏问题

  • ¥15 训练的多模态特征融合模型准确度很低怎么办
  • ¥15 kylin启动报错log4j类冲突
  • ¥15 超声波模块测距控制点灯,灯的闪烁很不稳定,经过调试发现测的距离偏大
  • ¥15 import arcpy出现importing _arcgisscripting 找不到相关程序
  • ¥15 onvif+openssl,vs2022编译openssl64
  • ¥15 iOS 自定义输入法-第三方输入法
  • ¥15 很想要一个很好的答案或提示
  • ¥15 扫描项目中发现AndroidOS.Agent、Android/SmsThief.LI!tr
  • ¥15 怀疑手机被监控,请问怎么解决和防止
  • ¥15 Qt下使用tcp获取数据的详细操作