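While querying MySQL through Spark I ran into an error. After some checking it turned out to be caused by the SQL statement, yet the same statement runs fine in MySQL itself and only fails when it is executed through Spark. The SQL statements are as follows:
# Find the 5 merchant IDs with the largest total amount for the alipay payment channel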
select pay_channel,oid,sum(money) from pay where pay_channel = 'alipay' group by oid order by sum(money) desc limit 5 ;
select pay_channel,oid,sum(money) from pay where pay_channel = 'alipay' group by oid,pay_channel order by sum(money) desc limit 5 ;
The working code is below. It uses the second SQL statement; using the first one raises an error:
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

object Test23 {
  def main(args: Array[String]): Unit = {
    // Use SparkSession.builder in place of SQLContext
    val sqlContext = SparkSession.builder
      .master("local[*]")
      .appName("TestMysql")
      .getOrCreate()
    val url = "jdbc:mysql://hadoop01:3306/spark?characterEncoding=UTF-8"
    val table = "pay"
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "123456")
    // Requires the MySQL URL, the table name, and properties (database user name and password)
    val df = sqlContext.read.jdbc(url, table, properties)
    // Register the JDBC table as a temporary view so it can be queried with Spark SQL
    df.createOrReplaceTempView("pay")
    val frame: DataFrame = sqlContext.sql("select pay_channel,oid,sum(money) from pay where pay_channel = 'alipay' group by oid,pay_channel order by sum(money) desc limit 5 ")
    val rdd = frame.rdd
    rdd.foreach(println(_))
  }
}
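Both SQL statements return results normally in MySQL. The only difference between them is that the second one also groups by the pay_channel column. But I have already pinned pay_channel to a single value in the WHERE clause, so why does it still have to appear in GROUP BY to avoid the error? The error output is as follows: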
Exception in thread "main" org.apache.spark.sql.AnalysisException: expression 'pay.`pay_channel`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
GlobalLimit 5
+- LocalLimit 5
   +- Project [pay_channel#3, oid#0, sum(money)#23]
      +- Sort [sum(money)#23 DESC NULLS LAST], true
         +- Aggregate [oid#0], [pay_channel#3, oid#0, sum(money#6) AS sum(money)#23]
            +- Filter (pay_channel#3 = alipay)
               +- SubqueryAlias pay
                  +- Relation[oid#0,pos_name#1,order_num#2,pay_channel#3,pay_method#4,posId#5,money#6,pay_time#7,ord_status#8,rec_state#9] JDBCRelation(pay) [numPartitions=1]
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:91)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$class$$anonfun$$checkValidAggregateExpression$1(CheckAnalysis.scala:247)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$9.apply(CheckAnalysis.scala:280)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$9.apply(CheckAnalysis.scala:280)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:280)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:78)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:78)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:66)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:623)
at com.czxy.exercise05.Test23$.main(Test23.scala:31)
at com.czxy.exercise05.Test23.main(Test23.scala)
Is this just a difference in syntax? Could someone explain the reason?
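For reference, the error message itself hints at another way to keep grouping by oid only: wrap pay_channel in first(). The plan in the error shows the Aggregate node grouping on oid alone while still selecting pay_channel, and Spark's analyzer rejects that without taking the WHERE filter into account. Below is a minimal sketch of that variant. It does not connect to MySQL; the sample rows, the object name GroupByFirstSketch, the helper topAlipayMerchants, and the alias total are all made up for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object GroupByFirstSketch {

  // Hypothetical helper: registers a small in-memory stand-in for the `pay`
  // table and runs the "group by oid" query with pay_channel wrapped in first().
  def topAlipayMerchants(spark: SparkSession): DataFrame = {
    import spark.implicits._

    // Made-up sample rows; the real table is read from MySQL over JDBC.
    Seq(
      ("alipay", "m1", 10.0),
      ("alipay", "m1", 5.0),
      ("alipay", "m2", 7.0),
      ("wechat", "m3", 99.0)
    ).toDF("pay_channel", "oid", "money").createOrReplaceTempView("pay")

    // pay_channel is no longer a bare column in the select list, so the
    // analyzer accepts grouping by oid alone.
    spark.sql(
      """select first(pay_channel) as pay_channel, oid, sum(money) as total
        |from pay
        |where pay_channel = 'alipay'
        |group by oid
        |order by total desc
        |limit 5""".stripMargin)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("GroupByFirstSketch")
      .getOrCreate()
    topAlipayMerchants(spark).show()
    spark.stop()
  }
}
```

Because the WHERE clause already restricts every row to 'alipay', first(pay_channel) can only return that value here, so the result should match the second SQL statement above.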