问题遇到的现象和发生背景
使用 Flink 测试连接 MySQL 时报错:处理流运算符时出错,
不知道是程序写错了,还是 MySQL 有问题
问题相关代码,请勿粘贴截图
package flink_scala.apitest.sinktest
import java.sql.{Connection, DriverManager, PreparedStatement}
import flink_scala.apitest.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._
/** Small Flink streaming job: reads comma-separated sensor lines from a text file,
  * converts each line to a [[SensorReading]] and writes it out through [[MyJdbcSinkFunc]].
  */
object JdbcSinkTest {

  /** Input file used when no path is passed on the command line. */
  private val DefaultInputPath: String =
    "C:\\Users\\86182\\IdeaProjects\\flink_scala\\src\\main\\resources\\sensor.txt"

  /** Builds and runs the job.
    *
    * @param args optional; when present, `args(0)` is used as the input file path
    *             instead of [[DefaultInputPath]]
    */
  def main(args: Array[String]): Unit = {
    // Set up the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 0. Read the raw lines.
    val inputPath = args.headOption.getOrElse(DefaultInputPath)
    val inputStream = env.readTextFile(inputPath)

    // 1. Convert each line into the case class. Every line must hold three
    //    comma-separated fields (String, Long, Double). Fields are trimmed so that
    //    "a, 1, 2.0" parses the same as "a,1,2.0"; without the trim, toLong throws
    //    NumberFormatException on the leading space.
    val dataStream = inputStream.map { data =>
      val arr = data.split(",").map(_.trim)
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    }

    dataStream.addSink(new MyJdbcSinkFunc())

    env.execute("Jdbc sink test")
  }
}
/** Flink sink that writes each [[SensorReading]] into the `sensor_temp` table over JDBC.
  *
  * For every record it first runs an UPDATE keyed on `id`; when that UPDATE reports
  * zero rows it runs an INSERT instead.
  *
  * The defaults reproduce the previously hard-coded connection settings, so
  * `new MyJdbcSinkFunc()` behaves as before.
  * NOTE(review): credentials in source code are a security risk; prefer passing them
  * in from configuration.
  *
  * @param url      JDBC connection URL
  * @param user     database user name
  * @param password database password
  */
class MyJdbcSinkFunc(
    url: String = "jdbc:mysql://192.168.10.24:3306/test?useSSL=false",
    user: String = "root",
    password: String = "Password123$"
) extends RichSinkFunction[SensorReading] {

  // Connection and prepared statements; created in open(), released in close().
  // They stay null until open() has run (or if open() fails part-way).
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  /** Opens the JDBC connection and prepares the INSERT and UPDATE statements.
    *
    * @param parameters Flink configuration passed by the runtime (unused here)
    */
  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection(url, user, password)
    insertStmt = conn.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)")
    updateStmt = conn.prepareStatement("update sensor_temp set temp = ? where id = ?")
  }

  /** Writes one reading: update the row for `value.id`, insert it if nothing was updated.
    *
    * @param value   the reading to store
    * @param context sink context supplied by Flink (unused here)
    */
  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    // Try the update first.
    updateStmt.setDouble(1, value.temperature)
    updateStmt.setString(2, value.id)
    // executeUpdate() returns the row count directly, replacing execute() + getUpdateCount.
    // NOTE(review): whether an UPDATE that leaves the value unchanged counts as a row
    // depends on driver settings (e.g. Connector/J `useAffectedRows`) - confirm.
    val updatedRows = updateStmt.executeUpdate()

    // Nothing was updated, so insert a new row.
    if (updatedRows == 0) {
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.executeUpdate()
    }
  }

  /** Releases the statements and the connection.
    *
    * Each resource is null-checked because open() may have failed before assigning it
    * (for example when the connection is refused); an unconditional close would then
    * throw a NullPointerException. The nested try/finally ensures the connection is
    * still closed when closing a statement throws.
    */
  override def close(): Unit = {
    try Option(insertStmt).foreach(_.close())
    finally {
      try Option(updateStmt).foreach(_.close())
      finally Option(conn).foreach(_.close())
    }
  }
}