Five Ways to Create a DataFrame in Scala Spark

1. Creating from an RDD[Row] and a StructType

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
 * Create a DataFrame from an RDD[Row] and a StructType schema
 */
object DataFrameDemo {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.OFF)
    // Create the Spark entry point via SparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    println("1. 通过RDD[Row]和StructType创建")
    //1. 通过RDD[Row]和StructType创建
    val sparkRdd1: RDD[String] = spark.sparkContext.parallelize(List("X,22,M", "y,21,W", "N,22,M"))
    // Map each line of text to a Row
    val rowRdd: RDD[Row] = sparkRdd1.map(t => {
      val per: Array[String] = t.split(",")
      Row(per(0), per(1).toInt, per(2))
    })
    // Build a StructType instance with field names and types
    val schema: StructType = StructType(
      List(
        StructField("name", StringType),
        StructField("age", IntegerType),
        StructField("sex", StringType)
      )
    )
    // Create the DataFrame
    val dataFrame: DataFrame = spark.createDataFrame(rowRdd, schema)
    // Show the data
    dataFrame.show()
    // Release resources
    spark.stop()
  }
}

Output

1. Creating from an RDD[Row] and a StructType
+----+---+---+
|name|age|sex|
+----+---+---+
|   X| 22|  M|
|   y| 21|  W|
|   N| 22|  M|
+----+---+---+
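
If the field list is long, the schema can also be parsed from a DDL-style string instead of building each StructField by hand. A minimal sketch (assuming the spark session and rowRdd from the example above; StructType.fromDDL is available in Spark 2.3 and later, and ddlSchema/ddlFrame are illustrative names):

import org.apache.spark.sql.types.StructType

// Parse the schema from a DDL-style string (equivalent to the StructType above)
val ddlSchema: StructType = StructType.fromDDL("name STRING, age INT, sex STRING")
val ddlFrame = spark.createDataFrame(rowRdd, ddlSchema)
ddlFrame.show()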

2. Creating directly with RDD.toDF()

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Create a DataFrame directly with RDD.toDF
 */
object DataFrameDemo2 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.OFF)
    // Create the Spark entry point via SparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    println("2. 直接通过Rdd.toDF()创建DataFrame")
    // 创建RDD
    val sparkRdd3: RDD[String] = spark.sparkContext.parallelize(List("X,22,M", "y,21,W", "N,22,M"))
    // Map each line of text to a tuple
    val toDFRdd: RDD[(String, Int, String)] = sparkRdd3.map(t => {
      val per: Array[String] = t.split(",")
      (per(0), per(1).toInt, per(2))
    })
    import org.apache.spark.sql.functions._
    import spark.implicits._
    // Create the DataFrame, setting the schema field names
    // Variant 1: pass the names directly
    //val frame: DataFrame = toDFRdd.toDF("name", "age", "sex")
    // Variant 2: expand an array of names as varargs
    val array = Array("name", "age", "sex")
    val frame: DataFrame = toDFRdd.toDF(array: _*)

    // Aggregate: sum the age column and show the result
    frame.agg(sum("age") as "sum_age").show()
    // Release resources
    spark.stop()
  }

}

Output

2. Creating directly with RDD.toDF()
+-------+
|sum_age|
+-------+
|     65|
+-------+
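
For small test data there is no need for an RDD at all: with spark.implicits._ in scope, toDF can be called directly on a local Seq of tuples. A minimal sketch (assuming a SparkSession named spark as above; localDF is an illustrative name):

import spark.implicits._

// Build a DataFrame straight from a local collection of tuples
val localDF = Seq(("X", 22, "M"), ("y", 21, "W"), ("N", 22, "M"))
  .toDF("name", "age", "sex")
localDF.show()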

3. Creating from an RDD and a Scala case class

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Create a DataFrame from an RDD and a Scala case class
 */
object DataFrameDemo3 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.OFF)
    // Create the Spark entry point via SparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    println("3. 通过RDD和ScalaBean创建DataFrame")
    val sparkRdd: RDD[String] = spark.sparkContext.parallelize(List("X,22,M", "y,21,W", "N,22,M"))
    // Map each line to a case class instance
    val beanRdd: RDD[Per] = sparkRdd.map(t => {
      val per: Array[String] = t.split(",")
      Per(per(0), per(1).toInt, per(2))
    })
    // Implicit conversions must be imported to use .toDF
    import spark.implicits._
    // Create the DataFrame
    val df: DataFrame = beanRdd.toDF
    // Register a temporary view
    df.createTempView("t_per")
    // Query the data with SQL
    val res: DataFrame = spark.sql("SELECT name,age FROM t_per ORDER BY age")
    // Show the data
    res.show()
    // Release resources
    spark.stop()
  }

  // Scala case class
  case class Per(name: String, age: Int, sex: String)
}

Output

3. Creating from an RDD and a Scala case class
+----+---+
|name|age|
+----+---+
|   y| 21|
|   N| 22|
|   X| 22|
+----+---+
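
Since Per is a case class, the same RDD can also be converted to a strongly typed Dataset[Per] with .toDS, so field access is checked at compile time. A minimal sketch (assuming beanRdd and the spark.implicits._ import from the example above; perDS is an illustrative name):

import org.apache.spark.sql.Dataset

// Typed alternative: Dataset[Per] instead of an untyped DataFrame
val perDS: Dataset[Per] = beanRdd.toDS()
perDS.filter(p => p.age >= 22).show()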

4. Creating from an RDD and a JavaBean

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Create a DataFrame from an RDD and a JavaBean
 */
object DataFrameDemo4 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.OFF)
    // Create the Spark entry point via SparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    println("4. 通过RDD联合JavaBean创建DataFrame")
    // 创建RDD
    val sparkRdd4: RDD[String] = spark.sparkContext.parallelize(List("X,22,M", "y,21,W", "N,22,M"))
    // Map each line to a Person JavaBean
    val javaBeanRdd: RDD[Person] = sparkRdd4.map(t => {
      val per: Array[String] = t.split(",")
      new Person(per(0), per(1).toInt, per(2))
    })
    // Create the DataFrame; columns come from the bean's getters, ordered alphabetically
    val frame1: DataFrame = spark.createDataFrame(javaBeanRdd, classOf[Person])
    // Show the data
    frame1.show()
    // Release resources
    spark.stop()
  }
}

JavaBean class

public class Person {
    private String name;
    private int age;
    private String sex;

    public Person(String name, int age, String sex) {
        this.name = name;
        this.age = age;
        this.sex = sex;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }
}

Output

4. Creating from an RDD and a JavaBean
+---+----+---+
|age|name|sex|
+---+----+---+
| 22|   X|  M|
| 21|   y|  W|
| 22|   N|  M|
+---+----+---+
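
createDataFrame(rdd, classOf[Person]) inspects the bean's getters and orders the columns alphabetically, which is why age comes first above. To keep the bean type and get a typed Dataset[Person] instead, Spark's bean encoder can be used. A minimal sketch (assuming javaBeanRdd from the example above; personDS is an illustrative name):

import org.apache.spark.sql.{Dataset, Encoders}

// Encode the JavaBean explicitly to get a typed Dataset[Person]
val personDS: Dataset[Person] = spark.createDataset(javaBeanRdd)(Encoders.bean(classOf[Person]))
personDS.show()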

5. A hand-written variant: from an RDD and a JavaBean

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
 * Create a DataFrame from an RDD and a JavaBean (hand-written variant)
 */
object DataFrameDemo5 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.OFF)
    // Create the Spark entry point via SparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    println("5、手写的示例5")
    val rdd11 = spark.sparkContext.parallelize(Seq(
      Row(8, "bat"),
      Row(64, "mouse"),
      Row(-27, "horse")
    ))
    rdd11.foreach(println)
    // Convert each Row to a NumberWord bean, adding 100 to the number
    val newRDD: RDD[NumberWord] = rdd11.map(x => {
      val rdto = convertNumberWordDTO(x.getInt(0) + 100, x.getString(1))
      rdto
    })
    val ds = spark.createDataFrame(newRDD, classOf[NumberWord])
    ds.show(100, false)

    println(ds.schema) // StructType(StructField(number,IntegerType,false), StructField(word,StringType,true))
    // Call ds.schema.printTreeString() to print the schema tree:
    ds.schema.printTreeString()
    //root
    // |-- number: integer (nullable = false)
    // |-- word: string (nullable = true)

    // Release resources
    spark.stop()
  }
  def convertNumberWordDTO(number: Int, word: String): NumberWord = {
    val rdto = new NumberWord()
    rdto.setNumber(number)
    rdto.setWord(word)
    rdto
  }
}

JavaBean class

public class NumberWord {
    private int number;
    private String word;

    public NumberWord() {
    }

    public NumberWord(int number, String word) {
        this.number = number;
        this.word = word;
    }

    public int getNumber() {
        return number;
    }

    public void setNumber(int number) {
        this.number = number;
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }
}

Output

5. Hand-written example
[8,bat]
[64,mouse]
[-27,horse]
+------+-----+
|number|word |
+------+-----+
|108   |bat  |
|164   |mouse|
|73    |horse|
+------+-----+

StructType(StructField(number,IntegerType,false), StructField(word,StringType,true))
root
 |-- number: integer (nullable = false)
 |-- word: string (nullable = true)
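
The bean indirection is not strictly necessary here: mapping each Row to a tuple and calling toDF gives the same table. A minimal sketch (assuming rdd11, the spark session, and spark.implicits._ from the example above; tupleDF is an illustrative name):

import spark.implicits._

// Equivalent result without the JavaBean: map Row -> tuple, then toDF
val tupleDF = rdd11
  .map(r => (r.getInt(0) + 100, r.getString(1)))
  .toDF("number", "word")
tupleDF.show(100, false)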

 
