红尘丶世界 2020-04-09 08:02

Spark SQL syntax question, beginner looking for pointers

While querying MySQL through Spark I hit an error. After some checking, I traced it to the SQL statement itself: the statement returns results just fine in MySQL, but the same query fails when run through Spark. The SQL statements are as follows:

# Find the top 5 merchant IDs (oid) by total amount for the alipay pay channel
select pay_channel, oid, sum(money) from pay where pay_channel = 'alipay' group by oid order by sum(money) desc limit 5;

select pay_channel, oid, sum(money) from pay where pay_channel = 'alipay' group by oid, pay_channel order by sum(money) desc limit 5;
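
(For context: MySQL only accepts the first statement when the ONLY_FULL_GROUP_BY SQL mode is off, which was the default before MySQL 5.7.5. Turning the mode on should make MySQL reject the statement much like Spark does; a quick check, assuming a MySQL 5.7+ session:)

SET SESSION sql_mode = CONCAT(@@sql_mode, ',ONLY_FULL_GROUP_BY');
-- rerunning the first statement should now fail with ERROR 1055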

The working code is shown below; it uses the second SQL statement. Swapping in the first statement raises an error:

import java.util.Properties

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}


object Test23 {
  def main(args: Array[String]): Unit = {
    // Use SparkSession.builder in place of the old SQLContext entry point
    val sqlContext = SparkSession.builder.
      master("local[*]")
      .appName("TestMysql")
      .getOrCreate()
    val url = "jdbc:mysql://hadoop01:3306/spark?characterEncoding=UTF-8"
    val table = "pay"
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "123456")
    // Pass in the MySQL URL, the table name, and a Properties object holding the DB username and password
    val df = sqlContext.read.jdbc(url, table, properties)
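    // Register the DataFrame as a temporary view so it can be queried with Spark SQL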
    df.createOrReplaceTempView("pay")
    val frame: DataFrame = sqlContext.sql("select pay_channel,oid,sum(money) from pay where pay_channel = 'alipay' group by oid,pay_channel order by sum(money) desc limit 5 ")
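    // Pull the result back as an RDD and print each row; with master local[*] the output shows up in the driver console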
    val rdd = frame.rdd
    rdd.foreach(println(_))

  }
}
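
For reference, the same aggregation can also be expressed through the DataFrame API instead of a SQL string. A minimal sketch, equivalent to the second (working) statement and reusing the df from the code above; the names top5 and total_money are illustrative:

import org.apache.spark.sql.functions.{col, desc, sum}

// Filter first, then group by both columns and aggregate, mirroring the SQL
val top5 = df.filter(col("pay_channel") === "alipay")
  .groupBy("oid", "pay_channel")
  .agg(sum("money").as("total_money"))
  .orderBy(desc("total_money"))
  .limit(5)
top5.show()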

Both statements return results in MySQL without any problem. The only difference between them is that the second one additionally groups by the pay_channel column. But I have already pinned pay_channel to a single value in the WHERE clause, so why must it still appear in the GROUP BY? Leaving it out produces the following error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: expression 'pay.`pay_channel`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
GlobalLimit 5
+- LocalLimit 5
   +- Project [pay_channel#3, oid#0, sum(money)#23]
      +- Sort [sum(money)#23 DESC NULLS LAST], true
         +- Aggregate [oid#0], [pay_channel#3, oid#0, sum(money#6) AS sum(money)#23]
            +- Filter (pay_channel#3 = alipay)
               +- SubqueryAlias pay
                  +- Relation[oid#0,pos_name#1,order_num#2,pay_channel#3,pay_method#4,posId#5,money#6,pay_time#7,ord_status#8,rec_state#9] JDBCRelation(pay) [numPartitions=1]

    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:91)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$class$$anonfun$$checkValidAggregateExpression$1(CheckAnalysis.scala:247)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$9.apply(CheckAnalysis.scala:280)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$9.apply(CheckAnalysis.scala:280)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:280)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:78)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:78)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:66)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:623)
    at com.czxy.exercise05.Test23$.main(Test23.scala:31)
    at com.czxy.exercise05.Test23.main(Test23.scala)
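
(The analyzer message above already names the two alternatives to grouping by pay_channel; a sketch of both, assuming the same pay table:)

-- wrap the non-grouped column in first(), as the message suggests
select first(pay_channel), oid, sum(money) from pay where pay_channel = 'alipay' group by oid order by sum(money) desc limit 5;

-- or drop pay_channel from the select list, since the WHERE clause already fixes its value
select oid, sum(money) from pay where pay_channel = 'alipay' group by oid order by sum(money) desc limit 5;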

Is this just a difference in syntax rules? Could someone explain why?
