SummerReign 2019-02-22 08:41 采纳率: 0%
浏览 789

如何用java实现SparkSQL dataframe添加自增序号列?

用spark分页查询数据,普通的sql()的不支持分页的sql语句
在网上查资料说可以增加一个序列实现
但是基本都是scala语言,代码如下

// 在原Schema信息的基础上添加一列 “id”信息

val schema: StructType = dataframe.schema.add(StructField("id", LongType))

// DataFrame转RDD 然后调用 zipWithIndex

val dfRDD: RDD[(Row, Long)] = dataframe.rdd.zipWithIndex()

val rowRDD: RDD[Row] = dfRDD.map(tp => Row.merge(tp._1, Row(tp._2)))

// 将添加了索引的RDD 转化为DataFrame

val df2 = spark.createDataFrame(rowRDD, schema)

df2.show()

这段代码如何用java实现呢
或者说我想把dataframe的数据分行获取,有什么其他办法吗

  • 写回答

1条回答 默认 最新

  • CSDN-Ada助手 CSDN-AI 官方账号 2022-10-27 15:49
    关注
    不知道你这个问题是否已经解决, 如果还没有解决的话:
    • 看下这篇博客,也许你就懂了,链接:解决sparksql两个DataFrame合并后出现两列相同的情况
    • 除此之外, 这篇博客: SparkSQL之DataFrame 编程(创建DataFrame ,DataFrame数据运算操作 ,输出存储DataFrame)(11)中的 1  从RDD创建DataFrame(从一个已经存在的RDD进行转换) 部分也许能够解决你的问题, 你可以仔细阅读以下内容或者直接跳转源博客中阅读:

      1)  创建 sparkSession ,后续简称 spark ;

      2)  使用 spark 创建原始的 RDD ,对RDD里面的数据进行切割处理 ,将切割处理的数据封装到定义的一个样例类(bean对象)里面 ,返回一个新的 RDD ;

      3)  创建 DataFrame 的两种方法 : 

      第一种 :  spark 调用 createDataFrame ,将新的RDD 放进去

      第二种 : 导入隐式转换(import spark.implicits._) , 然后新的RDD调用 toDF 方法将 RDD 转换成 DataFrame .

      注意 : 如果切割处理的数据不封装到 bean对象里面 ,而是直接以 tuple(元组) 的方式返回生成新的RDD ,后续这个RDD转为 DataFrame 之后 ,其 ROW(行)字段的名字就不是元组里面的字段名字 ,框架从tuple元组结构中,对schema的推断,也是成功的,只是字段名是tuple中的数据访问索引。即 row 的描述信息没有被约束

      object SparkSqlTest3 {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder()
            .appName(this.getClass.getSimpleName)
            .master("local[*]")
            .getOrCreate()
      
          --创建RDD
          val lines: RDD[String] = spark.sparkContext.parallelize(List("lii,13,90.00", "yuu,14,91.09", "koo,12,90.00"))
          val userRDD: RDD[User2] = lines.map(line => {
            val fields = line.split(",")
            val name = fields(0)
            val age = fields(1).toInt
            val fv = fields(2).toDouble
            User2(name, age, fv)
          })
          --创建DataFrame-----第一种方法
          val userDF: DataFrame = spark.createDataFrame(userRDD)
          userDF.printSchema()
          /**
           * root
           * |-- name: string (nullable = true)
           * |-- age: integer (nullable = false)
           * |-- fv: double (nullable = false)
           */
          userDF.show()
          /**
           * +----+---+-----+
           * |name|age|   fv|
           * +----+---+-----+
           * | lii| 13| 90.0|
           * | yuu| 14|91.09|
           * | koo| 12| 90.0|
           * +----+---+-----+
           */
      
          --创建DataFrame的----第二种方法--导入隐式转换
          import spark.implicits._
          val userDF2: DataFrame = userRDD.toDF
          userDF2.show()
          /**
           * +----+---+-----+
           * |name|age|   fv|
           * +----+---+-----+
           * | lii| 13| 90.0|
           * | yuu| 14|91.09|
           * | koo| 12| 90.0|
           * +----+---+-----+
           */
        }
      }
      case class User2(name:String ,age:Int, fv:Double)

      利用框架提供的隐式转换可以直接调用toDF创建,并指定字段名(其实就是约束 row 的信息)

      object DataFrame03 {
        def main(args: Array[String]): Unit = {
          --创建sparksession
          val session = SparkSession.builder()
            .appName(this.getClass.getSimpleName)
            .master("local[*]")
            .getOrCreate()
          --使用 sparksession 创建RDD
          val lines: RDD[String] = session.sparkContext.parallelize(List("huu,12,98.00", "lii,13,99.09", "poo,14,98.09"))
          val rowRDD = lines.map(line => {
            val fields = line.split(",")
            val f0 = fields(0)
            val f1 = fields(1).toInt
            val f2 = fields(2).toDouble
            (f0, f1, f2)
          })
          --创建 DataFrame
          val dataFrame: DataFrame = session.createDataFrame(rowRDD)
          dataFrame.show()    --打印创建的dataF
          --  row 字段信息是元组的索引(字段名是tuple中的数据访问索引)
            +---+---+-----+
            | _1| _2|   _3|
            +---+---+-----+
            |huu| 12| 98.0|
            |lii| 13|99.09|
            |poo| 14|98.09|
            +---+---+-----+     
          --导入隐式转换
          import session.implicits._
          val dataFrame1: DataFrame = rowRDD.toDF("name", "age", "fv")  --对 row 的信息进行约束
          dataFrame1.show()
          --结果如下:
            +----+---+-----+
            |name|age|   fv|
            +----+---+-----+
            | huu| 12| 98.0|
            | lii| 13|99.09|
            | poo| 14|98.09|
            +----+---+-----+    
        }
      }

      将切割处理的数据封装到Spark系统自定义的Row实例类里面 ,这样就可以给row指定字段属性了 ,创建的RDD跟跟row约束的字段名进行关联

      --创建DataFrame = RDD+CaseClass ,然后调用RDD的toDF
      --创建DataFrame = RDD+StructType
      object DateFrame01 {
        def main(args: Array[String]): Unit = {
          --创建sparkSession ,简称 spark
          val spark: SparkSession = SparkSession.builder()
            .appName(this.getClass.getSimpleName)
            .master("local[*]")
            .getOrCreate()
      
          --创建RDD
          val lines: RDD[String] = spark.sparkContext.parallelize(List("HUU,13,98.0","YII,12,98.99","GRR,17,97.08"))
          --处理数据 ,这个 Row 是spark系统自定义的实例类
          val rowRDD: RDD[Row] = lines.map(line => {
            val fields: Array[String] = line.split(",")
            val f0 = fields(0)
            val f1 = fields(1).toInt
            val f2 = fields(2).toDouble
            Row(f0, f1, f2)
          })
          --对Row的描述信息 ,就是所谓的Schema
          val structType: StructType = StructType(List(
            StructField("name", StringType),          --该字段默认可以为空
            StructField("age", IntegerType, false),     --该字段不可以为空
            StructField("fv", DoubleType, false)
          ))
          --对RDD 和Schema 进行关联
          val df: DataFrame = spark.createDataFrame(rowRDD, structType)
          --创建视图
          df.createTempView("v_user")
          --查询数据
          spark.sql(
            """
              |select name,fv from v_user where age >= 13
              |""".stripMargin).show()
         ----结果如下
            +----+-----+
            |name|   fv|
            +----+-----+
            | HUU| 98.0|
            | GRR|97.08|
            +----+-----+
        }
      }

    如果你已经解决了该问题, 非常希望你能够分享一下解决方案, 写成博客, 将相关链接放在评论区, 以帮助更多的人 ^-^
    评论

报告相同问题?

悬赏问题

  • ¥15 使用Jdk8自带的算法,和Jdk11自带的加密结果会一样吗,不一样的话有什么解决方案,Jdk不能升级的情况
  • ¥60 SOL语句中Where查询中的 from to 语句能不能从小到大换成从大到小(标签-SQL)
  • ¥15 画两个图 python或R
  • ¥15 在线请求openmv与pixhawk 实现实时目标跟踪的具体通讯方法
  • ¥15 八路抢答器设计出现故障
  • ¥15 请教一下c语言的代码里有一个地方不懂
  • ¥15 opencv 无法读取视频
  • ¥15 用matlab 实现通信仿真
  • ¥15 按键修改电子时钟,C51单片机
  • ¥60 Java中实现如何实现张量类,并用于图像处理(不运用其他科学计算库和图像处理库))