雾.607 2022-01-11 10:12 采纳率: 66.7%
浏览 133
已结题

Flink连接MySQL报错,求解决方法

问题遇到的现象和发生背景

使用flink测试连接flink时报错:处理流运算符时出错,
不知道是不是程序写错了还是mysql有问题

问题相关代码,请勿粘贴截图
package flink_scala.apitest.sinktest

import java.sql.{Connection, DriverManager, PreparedStatement}

import flink_scala.apitest.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

object JdbcSinkTest {
  def main(args: Array[String]): Unit = {
    // 定义流式处理环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 0.读取数据
    val inputStream = env.readTextFile("C:\\Users\\86182\\IdeaProjects\\flink_scala\\src\\main\\resources\\sensor.txt")


    // 1.先转换成样例类类型(简单转换操作)
    val dataStream = inputStream
      .map( data =>{
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      } )

    dataStream.addSink( new MyJdbcSinkFunc() )

    env.execute("Jdbc sink test")

  }
}

class MyJdbcSinkFunc() extends RichSinkFunction[SensorReading]{
  // 定义连接、预编译语句
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://192.168.10.24:3306/test?useSSL=false", "root", "Password123$")
    insertStmt = conn.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)")
    updateStmt = conn.prepareStatement("update sensor_temp set temp = ? where id = ?")
  }

  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    // 先执行更新操作,查到就更新
    updateStmt.setDouble(1, value.temperature)
    updateStmt.setString(2, value.id)
    updateStmt.execute()
    // 如果更新没有查到数据,那么就插入
    if( updateStmt.getUpdateCount == 0 ){
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.execute()
    }
  }

  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
  }
}

运行结果及报错内容
我的解答思路和尝试过的方法
我想要达到的结果
  • 写回答

2条回答 默认 最新

  • CSDN专家-微编程 2022-01-11 11:07
    关注

    这是空指针异常,发生在读取数据那里,仔细检查一下数据是否有问题,估计是数据格式出现了问题或者路径输出了问题导致没有读取到自己想要的数据,无法被获取到,所以报了空异常

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论 编辑记录
查看更多回答(1条)

报告相同问题?

问题事件

  • 系统已结题 1月19日
  • 已采纳回答 1月11日
  • 创建了问题 1月11日

悬赏问题

  • ¥20 docker里部署springboot项目,访问不到扬声器
  • ¥15 netty整合springboot之后自动重连失效
  • ¥15 悬赏!微信开发者工具报错,求帮改
  • ¥20 wireshark抓不到vlan
  • ¥20 关于#stm32#的问题:需要指导自动酸碱滴定仪的原理图程序代码及仿真
  • ¥20 设计一款异域新娘的视频相亲软件需要哪些技术支持
  • ¥15 stata安慰剂检验作图但是真实值不出现在图上
  • ¥15 c程序不知道为什么得不到结果
  • ¥15 键盘指令混乱情况下的启动盘系统重装
  • ¥40 复杂的限制性的商函数处理