主要想实现用spark(scala)从mysql里取存到hive,在网上找了好久的教程还是报错,求大神们指教
以下是执行命令
cd $SPARK_HOME
bin/spark-submit --class LIANXI.spark存取 /jar/1.jar
截取的报错信息,其他日志没有截取
21/11/16 11:16:04 WARN metastore.ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Tue Nov 16 11:16:04 CST 2021 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
21/11/16 11:16:06 INFO execution.SparkSqlParser: Parsing command: Check
21/11/16 11:16:06 INFO execution.SparkSqlParser: Parsing command:
insert overwrite table ods.customer partition(etldate='20211115')
Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException:
no viable alternative at input '<EOF>'(line 3, pos 0)
== SQL ==
insert overwrite table ods.customer partition(etldate='20211115')
^^^
at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:197)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:99)
at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:45)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:53)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:592)
at LIANXI.spark存取$.main(spark存取.scala:34)
at LIANXI.spark存取.main(spark存取.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
以下是我写的源码
package LIANXI
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object spark存取 {
def main(args: Array[String]): Unit = {
//创建上下文环境配置对象
val conf=new SparkConf().setMaster("local[*]").setAppName("Connect")
//创建sparksession对象
val spark=SparkSession
.builder()
.appName("Connect_mysql_hive")
.config(conf)
.enableHiveSupport()
.getOrCreate()
//设置隐式转换
import spark.implicits._
//使用load方法读取
val df=spark.read.format("jdbc")
.option("driver","com.mysql.jdbc.Driver")
.option("url","jdbc:mysql://192.168.1.113:3306/shtd_store")
.option("user","root")
.option("password",123456)
.option("dbtable","CUSTOMER")
.option("dbtable","NATION").load()
//创建视图
df.createTempView("Check")
spark.sql(
"""
|insert overwrite table ods.customer partition(etldate='20211115')
|""".stripMargin)
spark.stop()
}
}