问题遇到的现象和发生背景
package chapter
import java.sql.PreparedStatement
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
object KafkaToFlinkAndSinkToMySql {
case class Event(user: String, url: String)
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
// Kafka配置
val properties = new Properties()
properties.setProperty("bootstrap.servers", "bigdata:9092")
properties.setProperty("group.id", "my_group")
// 创建Kafka数据流
val kafkaDS = env.addSource(new FlinkKafkaConsumer[String]("test2",new SimpleStringSchema(),properties).setStartFromEarliest())(TypeInformation.of(classOf[String]))
// 打印Kafka数据流内容
kafkaDS.print()
// 数据库连接配置
val jdbcConnectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://bigdata:3306/test")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("root")
.withPassword("123456")
.build()
// 创建事件数据流
val stream: DataStream[Event] = kafkaDS.map {
str =>
val fields = str.split(",")
Event(fields(0), fields(1))
}(TypeInformation.of(classOf[Event]))
// 将数据流写入MySQL
stream.addSink(JdbcSink.sink(
"insert into clicks (user, url) values (?, ?)",
new JdbcStatementBuilder[Event] {
override def accept(t: PreparedStatement, u: Event): Unit = {
t.setString(1, u.user)
t.setString(2, u.url)
}
},
jdbcConnectionOptions
))
// 执行Flink任务
env.execute()
}
}
输出结果其中kafka的test2的主题中的数据能取出来,但是无法存入MySQL
log4j:WARN No appenders could be found for logger (org.apache.flink.api.scala.ClosureCleaner$).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Sun Oct 15 14:59:28 CST 2023 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
"Bob", "./cart"
"Alice", "./cart"
"Mary", "./prod?id=1"
"Mary", "./prod?id=2"
"Mary", "./prod?id=3"
"Bob", "./cart"
"Alice", "./cart"
"Mary", "./prod?id=1"
如果将存入MySQL的demo单拎出来是可以存入的
package chapter
import java.sql.PreparedStatement
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
object SinkToMySqlTest {
case class Event(user: String, url: String)
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream:DataStream[Event] = env.fromElements(Event("Mary", "./home"),
Event("Bob", "./cart"),
Event("Bob", "./cart"),
Event("Alice", "./cart"),
Event("Mary", "./prod?id=1"),
Event("Mary", "./prod?id=2"),
Event("Mary", "./prod?id=3")
)
stream.addSink(JdbcSink.sink(
"insert into clicks (user, url) values (?, ?)",
new JdbcStatementBuilder[Event] {
override def accept(t: PreparedStatement, u: Event): Unit = {
t.setString(1, u.user)
t.setString(2, u.url)
}
},
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://bigdata:3306/test")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("root")
.withPassword("123456")
.build()
))
env.execute()
}
}
mysql> select * from clicks;
+-------+-------------+
| user | url |
+-------+-------------+
| Mary | ./home |
| Bob | ./cart |
| Bob | ./cart |
| Alice | ./cart |
| Mary | ./prod?id=1 |
| Mary | ./prod?id=2 |
| Mary | ./prod?id=3 |
+-------+-------------+
但是将从kafka取数据的demo和存入MySQL的demo整合到一起就无法存入MySQL,在上述的代码中,可以看到整合之后的程序是可以取到kafka的数据的,但是无法将数据存入MySQL。