Spark SQL
Spark SQL is an ETL (Extract, Transform, Load) tool built on top of Spark RDDs (much like Hive 1.x is built on top of MapReduce). The difference from the raw Spark RDD API is that Spark SQL's API gives the Spark engine extra information about the computation (the structure of the data and the operators applied), which the engine can use to optimize the underlying jobs. So far Spark SQL offers two styles of interactive API: the Dataset API and SQL scripts.
Dataset API: an enhanced version of the RDD operations. Besides map, flatMap, filter and so on, it also supports select, where, groupBy, cube, etc. The user must know the type of the data being operated on, so this style is usually called the strongly typed (strong-type) API.
SQL scripts: the user does not need to touch the underlying Dataset API; every operation is expressed against named columns, so the user does not have to know the concrete type of the data. This style is therefore usually called untyped.
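For a quick feel of the difference between the two styles, here is a minimal sketch that expresses the same filter both ways (the SparkSession, the User case class and the sample data are assumptions made up for illustration):
import org.apache.spark.sql.{Dataset, SparkSession}
case class User(name: String, age: Int) // hypothetical sample type
val spark = SparkSession.builder().appName("typedVsUntyped").master("local[*]").getOrCreate()
import spark.implicits._
val userDS: Dataset[User] = List(User("zs", 17), User("ls", 23)).toDS()
// strong-type: the lambda receives User objects, so field access is checked at compile time
userDS.filter(u => u.age >= 18).show()
// untyped: the same condition expressed against the named column "age"
userDS.toDF().where("age >= 18").show()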
The main objects of study in Spark SQL are Dataset and DataFrame (enhanced versions of the RDD). A Dataset is a distributed collection of data introduced as a new interface in Spark 1.6. It offers the advantages of RDDs (strong typing, powerful lambda functions) together with the benefits of the Spark SQL execution engine. A Dataset can be constructed from JVM objects and then manipulated with functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java; Python's support for Datasets is still incomplete.
A DataFrame is a Dataset organized into named columns, a special kind of Dataset that is conceptually equivalent to a table in a relational database. A DataFrame can be constructed from many sources, such as structured data files (JSON, CSV, etc.), Hive tables (when integrating with legacy systems), or external databases. It is backed by a Dataset[Row], so a DataFrame can simply be understood as a Dataset[Row].
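To make that relationship concrete, here is a minimal sketch (the names are assumptions for illustration); in Spark's sources DataFrame is literally declared as a type alias for Dataset[Row]:
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
case class User(id: Int, name: String) // hypothetical sample type
val spark = SparkSession.builder().appName("dfVsDs").master("local[*]").getOrCreate()
import spark.implicits._
val ds: Dataset[User] = List(User(1, "zs"), User(2, "ls")).toDS() // typed view
val df: DataFrame = ds.toDF()          // untyped view
val rows: Dataset[Row] = df            // compiles because DataFrame = Dataset[Row]
val back: Dataset[User] = df.as[User]  // recover the typed view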
1. Quick Start
Add the dependency
<properties>
<spark.version>2.4.3</spark.version>
<scala.version>2.11</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
SQL script
//1. Create the SparkSession
val spark = SparkSession.builder().appName("wordCount").master("local[6]").getOrCreate()
//2. Import the implicit conversions that enable RDD-to-DataFrame conversion
import spark.implicits._
spark.sparkContext.setLogLevel("FATAL")
//3. Create the RDD
val wordRDD:RDD[(String,Int)] = spark.sparkContext.textFile("file:///F:/demo/words")
.flatMap(_.split("\\s+"))
.map((_,1))
//4. Convert the RDD to a DataFrame and operate on it
val dataFrame = wordRDD.toDF("word","count")
dataFrame.createOrReplaceTempView("t_words")
spark.sql("select word,count(count) count from t_words group by word order by count desc").show()
//5. Stop the SparkSession
spark.stop()
Named-column API
//1. Create the SparkSession
val spark = SparkSession.builder().appName("wordCount").master("local[6]").getOrCreate()
//2. Import the implicit conversions that enable RDD-to-DataFrame conversion
import spark.implicits._
spark.sparkContext.setLogLevel("FATAL")
//3. Create the RDD
val wordRDD:RDD[(String,Int)] = spark.sparkContext.textFile("file:///F:/demo/words")
.flatMap(_.split("\\s+"))
.map((_,1))
//4. Convert the RDD to a DataFrame and operate on it
import org.apache.spark.sql.functions._
wordRDD.toDF("word","num")
.groupBy("word")
.agg(sum($"num") as "total")
.orderBy($"total" desc)
.show()
//5. Stop the SparkSession
spark.stop()
Strongly typed (typed)
//1. Create the SparkSession
val spark = SparkSession.builder().appName("wordCount").master("local[6]").getOrCreate()
//2. Import the implicit conversions that enable RDD-to-DataFrame conversion
import spark.implicits._
spark.sparkContext.setLogLevel("FATAL")
//3. Create the RDD
val wordRDD:RDD[(String,Int)] = spark.sparkContext.textFile("file:///F:/demo/words")
.flatMap(_.split("\\s+"))
.map((_,1))
//4. Convert to a Dataset and aggregate with strongly typed operators
import org.apache.spark.sql.expressions.scalalang.typed
wordRDD.toDS().groupByKey(pair => pair._1)
.agg(typed.sum[(String,Int)](t => t._2).name("total"))
.rdd.sortBy(_._2,false,3)
.toDF("word","num")
.show()
//5. Stop the SparkSession
spark.stop()
2. Dataset & DataFrame
(1) Creating a Dataset
A Dataset is a special (enhanced) kind of RDD. Unlike a plain RDD, Spark SQL maintains its own serialization and deserialization scheme, which avoids the repeated serialization cost that would otherwise hit the compute nodes during a job. Because Spark SQL favors untyped operations, the user does not have to care about the element types and only supplies operators over named columns, which further improves the performance of the compute nodes.
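One way to see the serialization scheme Spark SQL maintains is to inspect the Encoder it derives for a type (a small sketch; the User case class is just an assumption for illustration):
import org.apache.spark.sql.Encoders
case class User(id: Int, name: String, age: Int)
// Spark SQL derives an Encoder for the case class: a compact binary format plus a schema,
// instead of the generic Java serialization that plain RDDs fall back to.
val encoder = Encoders.product[User]
println(encoder.schema) // prints the StructType that Spark derived for User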
case-class
//First define a case class
case class User(id:Int,name:String,age:Int)
//Show it as a Dataset
List(User(1,"zs",22),User(2,"ls",21)).toDS().show()
//Result
+---+----+---+
| id|name|age|
+---+----+---+
| 1| zs| 22|
| 2| ls| 21|
+---+----+---+
Tuple
List((1,"ls",21),(2,"ww",12)).toDS().show()
//Result
+---+---+---+
| _1| _2| _3|
+---+---+---+
| 1| ls| 21|
| 2| ww| 12|
+---+---+---+
json
Create a JSON file for testing
{"id":1,"name":"zhanhsan","age":15}
{"id":2,"name":"lisi","age":22}
//The case class cannot use Int for these fields; use Long (bigint), otherwise:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Cannot up cast `id` from bigint to int as it may truncate
case class User(id:Long,name:String,age:Long)
val spark = SparkSession.builder().master("local[6]").appName("wordCount").getOrCreate()
spark.read.json("file:///F:/demo/json").as[User].show()
//Result
+---+---+--------+
|age| id| name|
+---+---+--------+
| 15| 1|zhanhsan|
| 22| 2| lisi|
+---+---+--------+
rdd
Elements of the RDD must be tuples or case classes to be converted directly with toDS
- Tuple
val userRDD = spark.sparkContext.makeRDD(List((1,"张三",true,18,15000.0)))
userRDD.toDS().show()
// Result
+---+----+----+---+-------+
| _1| _2| _3| _4| _5|
+---+----+----+---+-------+
| 1|张三|true| 18|15000.0|
+---+----+----+---+-------+
- case-class
val userRDD = spark.sparkContext.makeRDD(List(Person(1,"张三",18,true)))
userRDD.toDS().show()
// Result
+---+----+---+----+
| id|name|age| sex|
+---+----+---+----+
| 1|张三| 18|true|
+---+----+---+----+
(2) Creating a DataFrame
A DataFrame is a dataset of named columns and is conceptually equivalent to a table in a relational database. A DataFrame can be built from many sources, such as structured data files, Hive tables, or external databases. It is backed by a Dataset[Row], so a DataFrame can simply be understood as a Dataset[Row].
JSON file
Create a JSON file for testing
{"id":1,"name":"zhanhsan","age":15}
{"id":2,"name":"lisi","age":22}
val spark = SparkSession.builder().master("local[6]").appName("wordCount").getOrCreate()
spark.read.json("file:///F:/demo/json").show()
spark.stop()
//Result
+---+---+--------+
|age| id| name|
+---+---+--------+
| 15| 1|zhanhsan|
| 22| 2| lisi|
+---+---+--------+
case-class
List(User(1,"zhangsan",21),User(2,"lisi",22)).toDF().show()
//Result
+---+--------+---+
| id| name|age|
+---+--------+---+
| 1|zhangsan| 21|
| 2| lisi| 22|
+---+--------+---+
Tuple
List((1,"zs",21),(2,"ls",11)).toDF("id","name","age").show()
//Result
+---+----+---+
| id|name|age|
+---+----+---+
| 1| zs| 21|
| 2| ls| 11|
+---+----+---+
Creating from an RDD
- RDD[Row]
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
val spark = SparkSession.builder().master("local[6]").appName("wordCount").getOrCreate()
val wordPairRDD:RDD[Row] = spark.sparkContext
.makeRDD(List((1,"zz",22),(2,"ss",32)))
.map( t => Row(t._1,t._2,t._3))
val schema = new StructType().add("id",IntegerType).add("name",StringType).add("age",IntegerType)
spark.createDataFrame(wordPairRDD,schema).show()
- RDD[javaBean]
public class Student implements Serializable {
private Integer id;
private String name;
private Integer age;
public Integer getId() {
return id;
}
public Student(Integer id, String name, Integer age) {
this.id = id;
this.name = name;
this.age = age;
}
...set/get
}
val spark = SparkSession.builder().master("local[6]").appName("wordCount").getOrCreate()
val studentRDD:RDD[Student] = spark.sparkContext
.makeRDD(List((1,"zz",22),(2,"ss",32)))
.map( t => new Student(t._1,t._2,t._3))
spark.createDataFrame(studentRDD,classOf[Student]).show()
- RDD[Tuple]
val spark = SparkSession.builder().master("local[6]").appName("wordCount").getOrCreate()
val userRDD:RDD[(Int,String,Int)] = spark.sparkContext.makeRDD(List((1, "zs", 18), (2, "ls", 18)))
spark.createDataFrame(userRDD).toDF("id","name","age").show()
- RDD[case-class]
case class User(id:Int,name:String,age:Int)
val spark = SparkSession.builder().master("local[6]").appName("wordCount").getOrCreate()
val userRDD:RDD[User] = spark.sparkContext
.makeRDD(List((1, "zs", 18), (2, "ls", 18)))
.map(t => User(t._1, t._2, t._3))
spark.createDataFrame(userRDD)
.show()
3. Untyped Dataset Operations (a.k.a. DataFrame Operations)
Prepare sample data
id name sex age salary dept
---------------------------
1,Michael,false,29,2000,001
2,Andy,true,30,5000,001
3,Justin,true,19,1000,002
4,Kaine,false,20,5000,003
5,Lisa,false,19,1000,002
case class Employee(id:Int,name:String,sex:Boolean,age:Int,salary:Double,dept:String)
val spark = SparkSession.builder().appName("test").master("local[6]").getOrCreate()
spark.sparkContext.setLogLevel("FATAL")//turn off the Spark logs
val EmpRDD:RDD[Employee] = spark.sparkContext.textFile("file:///F:/demo/employee")
.map(_.split(","))
.map(ts => Employee(ts(0).toInt,ts(1),ts(2).toBoolean,ts(3).toInt,ts(4).toDouble,ts(5)))
import spark.implicits._
val EmpDF = EmpRDD.toDF()
printSchema()
Prints the schema of the current table
EmpDF.printSchema()
//Result
root
|-- id: integer (nullable = false)
|-- name: string (nullable = true)
|-- sex: boolean (nullable = false)
|-- age: integer (nullable = false)
|-- salary: double (nullable = false)
|-- dept: string (nullable = true)
select
Projection query: selects only the listed columns
EmpDF.select("id","name","age").show()
//Result
+---+-------+---+
| id| name|age|
+---+-------+---+
| 1|Michael| 29|
| 2| Andy| 30|
| 3| Justin| 19|
| 4| Kaine| 20|
| 5| Lisa| 19|
+---+-------+---+
If you want to compute with the columns:
EmpDF.select($"id",$"name",$"salary",$"salary"*12 as "年薪" ).show()
//Result
+---+-------+------+-------+
| id| name|salary| 年薪|
+---+-------+------+-------+
| 1|Michael|2000.0|24000.0|
| 2| Andy|5000.0|60000.0|
| 3| Justin|1000.0|12000.0|
| 4| Kaine|5000.0|60000.0|
| 5| Lisa|1000.0|12000.0|
+---+-------+------+-------+
selectExpr
Allows SQL expressions to be used for the columns
EmpDF.selectExpr("id","name","salary * 12 as annual_salary").show()
//Result
+---+-------+-------------+
| id| name|annual_salary|
+---+-------+-------------+
| 1|Michael| 24000.0|
| 2| Andy| 60000.0|
| 3| Justin| 12000.0|
| 4| Kaine| 60000.0|
| 5| Lisa| 12000.0|
+---+-------+-------------+
Note: the alias after as must not contain Chinese characters here, otherwise an error occurs
where
Equivalent to the filter operator: keeps only the records that satisfy the condition
EmpDF.selectExpr("id","name","age","salary","salary * 12 as annual_salary")
.where("name like '&st&' or annual_salary==12000.0")
.show()
//Result
+---+------+---+------+-------------+
| id| name|age|salary|annual_salary|
+---+------+---+------+-------------+
| 3|Justin| 19|1000.0| 12000.0|
| 5| Lisa| 19|1000.0| 12000.0|
+---+------+---+------+-------------+
withColumn
Adds a new column to the result
EmpDF.selectExpr("id","name","age","salary","dept")
.where("name like '&st&' or age > 20")
.withColumn("annual_salary",$"salary"*12)
.withColumn("dept_as",$"dept")
.show()
//Result
+---+-------+---+------+----+-------------+-------+
| id| name|age|salary|dept|annual_salary|dept_as|
+---+-------+---+------+----+-------------+-------+
| 1|Michael| 29|2000.0| 001| 24000.0| 001|
| 2| Andy| 30|5000.0| 001| 60000.0| 001|
+---+-------+---+------+----+-------------+-------+
withColumnRenamed
Renames a column
EmpDF.selectExpr("id","name","age","salary","dept")
.where("name like '&st&' or age >= 20")
.withColumnRenamed("id","eid")
.show()
//Result
+---+-------+---+------+----+
|eid| name|age|salary|dept|
+---+-------+---+------+----+
| 1|Michael| 29|2000.0| 001|
| 2| Andy| 30|5000.0| 001|
| 4| Kaine| 20|5000.0| 003|
+---+-------+---+------+----+
groupBy
Similar to the GROUP BY clause in SQL; it must be used together with aggregate functions.
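groupBy by itself returns a RelationalGroupedDataset rather than a DataFrame, which is why an aggregation has to follow; the simplest one is count. A small sketch using the EmpDF prepared above:
EmpDF.groupBy("dept").count().show() // number of employees per dept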
EmpDF.selectExpr("id","name","age","salary","dept")
.groupBy("dept")
.sum("salary")
.show()
//Result
+----+-----------+
|dept|sum(salary)|
+----+-----------+
| 003| 5000.0|
| 001| 7000.0|
| 002| 2000.0|
+----+-----------+
agg
Combines multiple aggregate operators, such as count, max, min, and avg, into one composite aggregation.
// Option 1
EmpDF.select("id","name","age","salary","dept")
.groupBy("dept")
.agg(sum("salary") as "total_salary",avg("salary") as "avg_salary",max("salary") as "max_salary")
.show()
//Result
+----+------------+----------+----------+
|dept|total_salary|avg_salary|max_salary|
+----+------------+----------+----------+
| 003| 5000.0| 5000.0| 5000.0|
| 001| 7000.0| 3500.0| 5000.0|
| 002| 2000.0| 1000.0| 1000.0|
+----+------------+----------+----------+
// Option 2
EmpDF.select("id","name","age","salary","dept")
.groupBy("dept")
.agg(("salary","sum" ) ,("salary","avg"),("salary","max"))
.show()
//Result
+----+-----------+-----------+-----------+
|dept|sum(salary)|avg(salary)|max(salary)|
+----+-----------+-----------+-----------+
| 003| 5000.0| 5000.0| 5000.0|
| 001| 7000.0| 3500.0| 5000.0|
| 002| 2000.0| 1000.0| 1000.0|
+----+-----------+-----------+-----------+
// Option 3
EmpDF.select("id","name","age","salary","dept")
.groupBy("dept")
.agg(Map("age" -> "avg","salary"->"avg"))
.show()
//Result
+----+--------+-----------+
|dept|avg(age)|avg(salary)|
+----+--------+-----------+
| 003| 20.0| 5000.0|
| 001| 29.5| 3500.0|
| 002| 19.0| 1000.0|
+----+--------+-----------+
Option 3 passes a Map, so a duplicated key overwrites the earlier entry:
EmpDF.select("id","name","age","salary","dept")
.groupBy("dept")
.agg(Map("age" -> "avg","salary"->"avg","salary"->"max"))
.show()
//Result
+----+--------+-----------+
|dept|avg(age)|max(salary)|
+----+--------+-----------+
| 003| 20.0| 5000.0|
| 001| 29.5| 5000.0|
| 002| 19.0| 1000.0|
+----+--------+-----------+
pivot
Turns rows into columns
Prepare sample data first
stuno name course score
-----------------------
001 zhangsan math 80
001 zhangsan chinese 70
001 zhangsan english 70
001 zhangsan history 90
002 wangw math 90
002 wangw chinese 80
case class CourseScore(stuno:String,name:String,course:String,score:Double)
val spark = SparkSession.builder().appName("CourseScore").master("local[6]").getOrCreate()
spark.sparkContext.setLogLevel("FATAL")
val CourseScoreRDD:RDD[CourseScore] = spark.sparkContext.textFile("file:///F:/demo/students")
.map(_.split("\\s+"))
.map(ts => CourseScore(ts(0), ts(1), ts(2), ts(3).toDouble))
val courses = spark.sparkContext.textFile("file:///F:/demo/students")
.map(_.split("\\s+")(2))
.distinct()
.collect()//{"math","history","english","chinese"}
import spark.implicits._
import org.apache.spark.sql.functions._
val courseDF = CourseScoreRDD.toDF()
courseDF.groupBy("stuno","name")
.pivot($"course",courses)
.agg(("score","sum"))
.show()
spark.stop()
//Result
+-----+--------+----+-------+-------+-------+
|stuno| name|math|history|english|chinese|
+-----+--------+----+-------+-------+-------+
| 001|zhangsan|80.0| 90.0| 70.0| 70.0|
| 002| wangw|90.0| null| null| 80.0|
+-----+--------+----+-------+-------+-------+
na
Handles null values, either with drop or with fill
- drop: removes records that do not satisfy the condition
courseDF.groupBy("stuno","name")
.pivot($"course",courses)
.agg(("score","sum"))
.na.drop(5) //keep only records where at least 5 fields are non-null
.show()
//Result
+-----+--------+----+-------+-------+-------+
|stuno| name|math|history|english|chinese|
+-----+--------+----+-------+-------+-------+
| 001|zhangsan|80.0| 90.0| 70.0| 70.0|
+-----+--------+----+-------+-------+-------+
- fill: replaces null with the given default values
courseDF.groupBy("stuno","name")
.pivot($"course",courses)
.agg(("score","sum"))
.na.fill(Map("history" -> -1,"english" -> 0)) // fill(0) would fill every null with 0.0
.show()
//Result
+-----+--------+----+-------+-------+-------+
|stuno| name|math|history|english|chinese|
+-----+--------+----+-------+-------+-------+
| 001|zhangsan|80.0| 90.0| 70.0| 70.0|
| 002| wangw|90.0| -1.0| 0.0| 80.0|
+-----+--------+----+-------+-------+-------+
cube
A multi-dimensional analysis tool, commonly used in data analysis; it is more flexible than groupBy because it aggregates over every combination of the grouping columns (rows with null in a grouping column are subtotals, and the all-null row is the grand total).
height weight eq iq
80 23 72 85
80 23 70 85
80 25 70 85
82 30 80 70
case class Kinds(height:Int ,weight:Int, eq:Int, iq:Int)
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object KindsTest {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("kinds").master("local[6]").getOrCreate()
spark.sparkContext.setLogLevel("FATAL")
val KindsRDD:RDD[Kinds] = spark.sparkContext.textFile("file:///F:/demo/kinds")
.map(_.split("\\s+"))
.map(ts => Kinds(ts(0).toInt, ts(1).toInt, ts(2).toInt, ts(3).toInt))
import spark.implicits._
val KindsDF = KindsRDD.toDF()
import org.apache.spark.sql.functions._
KindsDF.cube("height","weight")
.agg(avg("eq") as "avg_eq",avg("iq") as "avg_iq")
.show()
spark.stop()
}
}
//Result
+------+------+-----------------+------+
|height|weight| avg_eq|avg_iq|
+------+------+-----------------+------+
| null| 23| 71.0| 85.0|
| null| null| 73.0| 81.25|
| null| 30| 80.0| 70.0|
| 80| null|70.66666666666667| 85.0|
| null| 25| 70.0| 85.0|
| 80| 25| 70.0| 85.0|
| 82| 30| 80.0| 70.0|
| 82| null| 80.0| 70.0|
| 80| 23| 71.0| 85.0|
+------+------+-----------------+------+
over
Used together with aggregate operators to compute over a window (a local partition of the data)
Prepare the data
id name job salary dept
------------------------
1 zhangsan sale 8000.00 1
2 wangw sale 8500.00 1
3 lis clicker 6000.00 2
4 zhaol manager 10000.00 1
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
object EmpTestOver {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("over").master("local[6]").getOrCreate()
spark.sparkContext.setLogLevel("FATAL")
//id name job salary dept
val EmpRDD:RDD[(Int,String,String,Double,String)] = spark.sparkContext.textFile("file:///F:/demo/emp")
.map(_.split("\\s+"))
.map(ts => (ts(0).toInt, ts(1), ts(2), ts(3).toDouble, ts(4)))
import spark.implicits._
val empDF = EmpRDD.toDF("id","name","job","salary","dept")
val w = Window.partitionBy("dept")//all records in the same dept as the current record
.orderBy($"salary" desc)//sort by salary descending; negative offsets <-- currentRow(0) --> positive offsets
.rowsBetween(Window.unboundedPreceding,Window.currentRow)// the frame from the start of the partition up to the current row
import org.apache.spark.sql.functions._
empDF.select("id","name","salary","dept")
.withColumn("rank",count($"salary") over(w))
.withColumn(colName = "avg_salary",avg($"salary") over(Window.partitionBy("dept")))
.withColumn(colName = "max_salary",max($"salary") over(Window.partitionBy("dept")))
.withColumn(colName = "min_salary",min($"salary") over(Window.partitionBy("dept")))
.show()
spark.stop()
}
}
//Result
+---+--------+-------+----+----+-----------------+----------+----------+
| id| name| salary|dept|rank| avg_salary|max_salary|min_salary|
+---+--------+-------+----+----+-----------------+----------+----------+
| 4| zhaol|10000.0| 1| 1|8833.333333333334| 10000.0| 8000.0|
| 2| wangw| 8500.0| 1| 2|8833.333333333334| 10000.0| 8000.0|
| 1|zhangsan| 8000.0| 1| 3|8833.333333333334| 10000.0| 8000.0|
| 3| lis| 6000.0| 2| 1| 6000.0| 6000.0| 6000.0|
+---+--------+-------+----+----+-----------------+----------+----------+
join
Used for table joins
val empDF = EmpRDD.toDF("id","name","job","salary","dept")
val empFrame = empDF.select("id","name","salary","dept")
val deptDF = List((1,"销售部"),(2,"运营部"),(3,"研发部")).toDF("deptid","deptname")
empFrame.join(deptDF,$"dept"===$"deptid","left_outer").show()
//Result
+---+--------+-------+----+------+--------+
| id| name| salary|dept|deptid|deptname|
+---+--------+-------+----+------+--------+
| 1|zhangsan| 8000.0| 1| 1| 销售部|
| 2| wangw| 8500.0| 1| 1| 销售部|
| 3| lis| 6000.0| 2| 2| 运营部|
| 4| zhaol|10000.0| 1| 1| 销售部|
+---+--------+-------+----+------+--------+
Or, if the join column has the same name in both tables:
val empDF = EmpRDD.toDF("id","name","job","salary","dept")
val empFrame = empDF.select("id","name","salary","dept")
val deptDF = List((1,"销售部"),(2,"运营部"),(3,"研发部")).toDF("dept","deptname")
empFrame.join(deptDF,"dept").show()
//Result
+----+---+--------+-------+--------+
|dept| id| name| salary|deptname|
+----+---+--------+-------+--------+
| 1| 1|zhangsan| 8000.0| 销售部|
| 1| 2| wangw| 8500.0| 销售部|
| 2| 3| lis| 6000.0| 运营部|
| 1| 4| zhaol|10000.0| 销售部|
+----+---+--------+-------+--------+
drop
val empDF= empRDD.toDF("id","name","job","salary","dept").select("id","name","salary","dept")
val deptDF=List((1,"销售部"),(2,"运营部")).toDF("deptid","deptname")
empDF.join(deptDF,$"dept" === $"deptid","left_outer")
.drop("deptid")
.show()
//Result
+---+--------+-------+----+--------+
| id| name| salary|dept|deptname|
+---+--------+-------+----+--------+
| 1|zhangsan| 8000.0| 1| 销售部|
| 2| wangw| 8500.0| 1| 销售部|
| 3| lis| 6000.0| 2| 运营部|
| 4| zhaol|10000.0| 1| 销售部|
| 5| win7|10000.0| 3| null|
+---+--------+-------+----+--------+
dropDuplicates
Removes duplicate records
val empDF= empRDD.toDF("id","name","job","salary","dept").select("id","name","salary","dept")
val deptDF=List((1,"销售部"),(2,"运营部")).toDF("deptid","deptname")
empDF.join(deptDF,$"dept" === $"deptid","left_outer")
.drop("deptid")
.dropDuplicates("deptname")
.show()
//Result
+---+--------+-------+----+--------+
| id| name| salary|dept|deptname|
+---+--------+-------+----+--------+
| 1|zhangsan| 8000.0| 1| 销售部|
| 5| win7|10000.0| 3| null|
| 3| lis| 6000.0| 2| 运营部|
+---+--------+-------+----+--------+
orderBy
val empDF= empRDD.toDF("id","name","job","salary","dept").select("id","name","salary","dept")
val deptDF=List((1,"销售部"),(2,"运营部")).toDF("deptid","deptname")
empDF.join(deptDF,$"dept" === $"deptid","left_outer")
.orderBy($"dept" desc,$"salary" desc)
.show()
//Result
+---+--------+-------+----+------+--------+
| id| name| salary|dept|deptid|deptname|
+---+--------+-------+----+------+--------+
| 5| win7|10000.0| 3| null| null|
| 3| lis| 6000.0| 2| 2| 运营部|
| 4| zhaol|10000.0| 1| 1| 销售部|
| 2| wangw| 8500.0| 1| 1| 销售部|
| 1|zhangsan| 8000.0| 1| 1| 销售部|
+---+--------+-------+----+------+--------+
limit
Returns the first n rows, similar to take(n)
val empDF= empRDD.toDF("id","name","job","salary","dept").select("id","name","salary","dept")
val deptDF=List((1,"销售部"),(2,"运营部")).toDF("deptid","deptname")
empDF.join(deptDF,$"dept" === $"deptid","left_outer")
.orderBy($"dept" desc,$"salary" desc)
.limit(3) //take the first 3 rows
.show()
//Result
+---+--------+------+----+
| id| name|salary|dept|
+---+--------+------+----+
| 1|zhangsan|8000.0| 1|
| 2| wangw|8500.0| 1|
+---+--------+------+----+
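One nuance worth noting: limit(n) is a transformation that still returns a distributed DataFrame (evaluated lazily), while take(n) is an action that brings the rows back to the driver. A minimal contrast, assuming the empDF used above:
import org.apache.spark.sql.Row
val top3DF = empDF.orderBy($"salary".desc).limit(3)               // still a DataFrame
val top3Local: Array[Row] = empDF.orderBy($"salary".desc).take(3) // Array[Row] on the driver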
4. Dataset/DataFrame SQL
Prepare the sample data first
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object DataSetSql {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[6]").appName("sql").getOrCreate()
spark.sparkContext.setLogLevel("FATAL")
val studentRDD:RDD[CourseScore] = spark.sparkContext.textFile("file:///F:/demo/students")
.map(_.split("\\s+"))
.map(ts => CourseScore(ts(0), ts(1), ts(2), ts(3).toDouble))
val empRDD:RDD[Employee] = spark.sparkContext.textFile("file:///F:/demo/employee")
.map(_.split(","))
.map(t => Employee(t(0).toInt, t(1), t(2).toBoolean, t(3).toInt, t(4).toDouble, t(5)))
import spark.implicits._
val studentDF = studentRDD.toDF()
val empDF = empRDD.toDF()
val deptDF = List(("001","销售部"),("002","运营部"),("003","行政部")).toDF("deptid","deptname")
studentDF.createOrReplaceTempView("t_student")
empDF.createOrReplaceTempView("t_emp")
deptDF.createOrReplaceTempView("t_dept")
val sql = "" // 主要是研究sql语句
spark.sql(sql).show()
spark.stop()
}
}
The three tables:
+-----+--------+-------+-----+
|stuno| name| course|score|
+-----+--------+-------+-----+
| 001|zhangsan| math| 80.0|
| 001|zhangsan|chinese| 70.0|
| 001|zhangsan|english| 70.0|
| 001|zhangsan|history| 90.0|
| 002| wangw| math| 90.0|
| 002| wangw|chinese| 80.0|
+-----+--------+-------+-----+
+---+-------+-----+---+------+----+
| id| name| sex|age|salary|dept|
+---+-------+-----+---+------+----+
| 1|Michael|false| 29|2000.0| 001|
| 2| Andy| true| 30|5000.0| 001|
| 3| Justin| true| 19|1000.0| 002|
| 4| Kaine|false| 20|5000.0| 003|
| 5| Lisa|false| 19|1000.0| 002|
+---+-------+-----+---+------+----+
+------+--------+
|deptid|deptname|
+------+--------+
| 001| 销售部|
| 002| 运营部|
| 003| 行政部|
+------+--------+
Single-table query
select * from t_employee
//Result
+---+-------+-----+---+------+----+
| id| name| sex|age|salary|dept|
+---+-------+-----+---+------+----+
| 1|Michael|false| 29|2000.0| 001|
| 2| Andy| true| 30|5000.0| 001|
| 3| Justin| true| 19|1000.0| 002|
| 4| Kaine|false| 20|5000.0| 003|
| 5| Lisa|false| 19|1000.0| 002|
+---+-------+-----+---+------+----+
Filtering with a where clause
select * from t_employee where name = 'Michael' or age >=29
//Result
+---+-------+-----+---+------+----+
| id| name| sex|age|salary|dept|
+---+-------+-----+---+------+----+
| 1|Michael|false| 29|2000.0| 001|
| 2| Andy| true| 30|5000.0| 001|
+---+-------+-----+---+------+----+
Fuzzy (LIKE) query
val sql = "select * from t_emp where age >= 20 or name like '%sa%' "
//Result
+---+-------+-----+---+------+----+
| id| name| sex|age|salary|dept|
+---+-------+-----+---+------+----+
| 1|Michael|false| 29|2000.0| 001|
| 2| Andy| true| 30|5000.0| 001|
| 4| Kaine|false| 20|5000.0| 003|
| 5| Lisa|false| 19|1000.0| 002|
+---+-------+-----+---+------+----+
Grouped aggregation
Equivalent to the agg operation
val sql = "select dept,avg(salary),max(salary),sum(salary) from t_emp group by dept"
//Result
+----+-----------+-----------+-----------+
|dept|avg(salary)|max(salary)|sum(salary)|
+----+-----------+-----------+-----------+
| 003| 5000.0| 5000.0| 5000.0|
| 001| 3500.0| 5000.0| 7000.0|
| 002| 1000.0| 1000.0| 2000.0|
+----+-----------+-----------+-----------+
Filtering groups (having)
val sql = "select dept,avg(salary) as avg_salary,max(salary),sum(salary) from t_emp group by dept having avg_salary>=3500.0"
//Result
+----+----------+-----------+-----------+
|dept|avg_salary|max(salary)|sum(salary)|
+----+----------+-----------+-----------+
| 003| 5000.0| 5000.0| 5000.0|
| 001| 3500.0| 5000.0| 7000.0|
+----+----------+-----------+-----------+
case when
val sql =
"""
select stuno,name,
max(case course when 'math' then score else 0 end) as math,
max(case course when 'english' then score else 0 end) as english,
max(case course when 'chinese' then score else 0 end) as chinese,
max(case course when 'history' then score else 0 end) as history
from t_student
group by stuno,name
"""
//Result
+-----+--------+----+-------+-------+-------+
|stuno| name|math|english|chinese|history|
+-----+--------+----+-------+-------+-------+
| 001|zhangsan|80.0| 70.0| 70.0| 90.0|
| 002| wangw|90.0| 0.0| 80.0| 0.0|
+-----+--------+----+-------+-------+-------+
pivot
val sql =
"""
select * from t_student pivot(max(score) for course in('math','english','chinese','history'))
"""
//Result
+-----+--------+----+-------+-------+-------+
|stuno| name|math|english|chinese|history|
+-----+--------+----+-------+-------+-------+
| 001|zhangsan|80.0| 70.0| 70.0| 90.0|
| 002| wangw|90.0| null| 80.0| null|
+-----+--------+----+-------+-------+-------+
cube
val kindsRDD:RDD[Kinds] = spark.sparkContext.textFile("file:///F:/demo/kinds")
.map(_.split("\\s+"))
.map(t => Kinds(t(0).toInt,t(1).toInt,t(2).toInt,t(3).toInt))
import spark.implicits._
val kindDF = kindsRDD.toDF()
kindDF.createOrReplaceTempView("t_kinds")
val sql =
"""
select height,weight,avg(eq),avg(iq) from t_kinds group by height,weight with cube
"""
//Result
+------+------+-----------------+-------+
|height|weight| avg(eq)|avg(iq)|
+------+------+-----------------+-------+
| null| 23| 71.0| 85.0|
| null| null| 73.0| 81.25|
| null| 30| 80.0| 70.0|
| 80| null|70.66666666666667| 85.0|
| null| 25| 70.0| 85.0|
| 80| 25| 70.0| 85.0|
| 82| 30| 80.0| 70.0|
| 82| null| 80.0| 70.0|
| 80| 23| 71.0| 85.0|
+------+------+-----------------+-------+
Table join
Equivalent to the join operation
// join each employee with its department information
select e.*,d.* from t_employee e left join t_dept d on d.deptid = e.dept
//Result
+---+-------+-----+---+------+----+------+--------+
| id| name| sex|age|salary|dept|deptid|deptname|
+---+-------+-----+---+------+----+------+--------+
| 1|Michael|false| 29|2000.0| 001| 001| 销售部|
| 2| Andy| true| 30|5000.0| 001| 001| 销售部|
| 3| Justin| true| 19|1000.0| 002| 002| 运营部|
| 4| Kaine|false| 20|5000.0| 003| 003| 行政部|
| 5| Lisa|false| 19|1000.0| 002| 002| 运营部|
+---+-------+-----+---+------+----+------+--------+
Subquery
//query each employee's info along with the head count of the employee's department
val sql =
"""
select id,name,salary,dept,(select count(*) from t_emp e2 where e2.dept=e1.dept) from t_emp e1
"""
//Result
+---+-------+------+----+--------------------+
| id| name|salary|dept|scalarsubquery(dept)|
+---+-------+------+----+--------------------+
| 4| Kaine|5000.0| 003| 1|
| 1|Michael|2000.0| 001| 2|
| 2| Andy|5000.0| 001| 2|
| 3| Justin|1000.0| 002| 2|
| 5| Lisa|1000.0| 002| 2|
+---+-------+------+----+--------------------+
Window functions
- Rank employees' salaries within each department
val sql =
"""
select id,name,salary,dept,
count(*) over (partition by dept order by salary desc rows between unbounded preceding and current row) rank
from t_emp
order by dept desc
"""
//Result
+---+-------+------+----+----+
| id| name|salary|dept|rank|
+---+-------+------+----+----+
| 4| Kaine|5000.0| 003| 1|
| 3| Justin|1000.0| 002| 1|
| 5| Lisa|1000.0| 002| 2|
| 2| Andy|5000.0| 001| 1|
| 1|Michael|2000.0| 001| 2|
+---+-------+------+----+----+
If two employees have the same salary, this count-based ranking is problematic.
- Rank salaries within each department with the rank function; ties share a rank and the ranking is not continuous (gaps appear)
val sql =
"""
select id,name,salary,dept,
rank() over (partition by dept order by salary desc rows between unbounded preceding and current row) rank
from t_emp
order by dept desc
"""
//Result
+---+-------+------+----+-----+
| id| name|salary|dept|count|
+---+-------+------+----+-----+
| 1|Michael|2000.0| 001| 2|
| 2| Andy|5000.0| 001| 1|
| 6| wbd|1500.0| 002| 2|
| 7| lyb|3000.0| 002| 1|
| 3| Justin|1000.0| 002| 3|
| 5| Lisa|1000.0| 002| 3|
| 4| Kaine|5000.0| 003| 1|
+---+-------+------+----+-----+
- Rank salaries within each department with dense_rank, which keeps the ranking continuous
select id,name,salary,dept,
dense_rank() over(partition by dept order by salary desc) count
from t_employee
order by dept desc
//Result
+---+-------+------+----+-----+
| id| name|salary|dept|count|
+---+-------+------+----+-----+
| 4| Kaine|5000.0| 003| 1|
| 3| Justin|1000.0| 002| 1|
| 5| Lisa|1000.0| 002| 1|
| 2| Andy|5000.0| 001| 1|
| 6| Lily|2000.0| 001| 2|
| 1|Michael|2000.0| 001| 2|
| 7| Lucy|1000.0| 001| 3|
+---+-------+------+----+-----+
Paged query
Only the first n rows can be fetched:
select id,name,salary,dept,dense_rank() over(partition by dept order by salary desc) count
from t_employee
order by dept desc limit 3
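Since limit only returns the first n rows, a common workaround for real pagination is to number the rows with row_number() and then filter on that number. A sketch (not from the original text; the page boundaries are arbitrary):
val sql =
"""
select id,name,salary,dept from (
select id,name,salary,dept,
row_number() over(order by salary desc) as rn
from t_emp
) t
where rn between 4 and 6
"""
spark.sql(sql).show() // rows 4..6, i.e. "page 2" with a page size of 3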
5. User-Defined Functions
Single-row function (UDF)
package com.txb.demo04
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object ConsumerTest {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("consumer").master("local[6]").getOrCreate()
spark.sparkContext.setLogLevel("FATAL")
import spark.implicits._
//1,Michael,false,29,2000,001
val EmpRDD:RDD[Employee] = spark.sparkContext.textFile("file:///F:/demo/employee")
.map(_.split(","))
//id:Int,name:String,sex:Boolean,age:Int,salary:Double,dept:String
.map(t => Employee(t(0).toInt,t(1),t(2).toBoolean,t(3).toInt,t(4).toDouble,t(5)))
val empDF = EmpRDD.toDF()
empDF.createOrReplaceTempView("t_emp")
spark.udf.register("annual_salary", (age:Int) =>{
if (age >= 22){
"big than 22"
}else{
"small than 22"
}
})
val sql = "select id,name,age,annual_salary(age) from t_emp "
spark.sql(sql).show()
spark.stop()
}
}
//Result
+---+-------+---+----------------------+
| id| name|age|UDF:annual_salary(age)|
+---+-------+---+----------------------+
| 1|Michael| 29| big than 22|
| 2| Andy| 30| big than 22|
| 3| Justin| 19| small than 22|
| 4| Kaine| 20| small than 22|
| 5| Lisa| 19| small than 22|
| 6| wbd| 22| big than 22|
| 7| lyb| 22| big than 22|
+---+-------+---+----------------------+
Untyped user-defined aggregate functions (for reference)
- The user needs to extend the UserDefinedAggregateFunction abstract class; for example, a custom averaging aggregate:
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, StructType}
object MyAverage extends UserDefinedAggregateFunction {
//the type of the input field(s)
override def inputSchema: StructType = {
//the field name here does not matter, but the type must be correct
new StructType().add("salary",DoubleType)
}
//intermediate state of the computation: the running total and a counter
override def bufferSchema: StructType = {
new StructType().add("total",DoubleType)
.add("count",IntegerType)
}
//return type of the final aggregated value
override def dataType: DataType = DoubleType
//whether the function is deterministic (same input, same output); usually true
override def deterministic: Boolean = true
//initialize the intermediate buffer
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0.0 //running total
buffer(1) = 0 //count
}
//fold each input row into the buffer
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if(!input.isNullAt(0)){
val salary = input.getAs[Double](0)
buffer(0)=buffer.getDouble(0)+salary
buffer(1)=buffer.getInt(1)+1
}
}
//merge two partial buffers; note the result must be written into buffer1
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0)=buffer1.getDouble(0)+buffer2.getDouble(0) //running total
buffer1(1)=buffer1.getInt(1)+buffer2.getInt(1) //total count
}
//compute and return the final result
override def evaluate(buffer: Row): Any = {
buffer.getDouble(0)/buffer.getInt(1)
}
}
- Register the user-defined aggregate function
spark.udf.register("myavg",MyAverage)
- Use the aggregate
select dept,myavg(salary) from t_employee group by dept
//Result
+----+------------------+
|dept|myaverage$(salary)|
+----+------------------+
| 003| 5000.0|
| 001| 2500.0|
| 002| 1000.0|
+----+------------------+
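Besides registering it for SQL, a UserDefinedAggregateFunction can also be applied directly as a column expression in the DataFrame API (a sketch, assuming the empDF, MyAverage and spark.implicits._ from the examples above):
empDF.groupBy("dept")
.agg(MyAverage($"salary") as "avg_salary") // the UDAF used as a Column, no SQL needed
.show()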
6. Data Import/Export
(1) File systems
JSON
Read / Write
spark.read
.format("json")
.load("file:///F:/demo/json") //读到的json格式数据
.write
.json("file:///F:/write/json")//导出
CSV
Write
import com.txb.demo04.Employee
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object DataSources {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[6]").appName("datasource").getOrCreate()
spark.sparkContext.setLogLevel("FATAL")
import spark.implicits._
//1,Michael,false,29,2000,001
val EmpRDD:RDD[Employee] = spark.sparkContext.textFile("file:///F:/demo/employee")
.map(_.split(","))
//id:Int,name:String,sex:Boolean,age:Int,salary:Double,dept:String
.map(t => Employee(t(0).toInt,t(1),t(2).toBoolean,t(3).toInt,t(4).toDouble,t(5)))
val empDF = EmpRDD.toDF()
empDF.createOrReplaceTempView("t_emp")
empDF.select("id","name","age","salary")
.write
.format("csv")
.option("seq",",") //指定分隔符
.option("header","true")//是否添加表头
.save("file:///F:/demo/csv")
spark.stop()
}
}
Read
spark.read
.option("seq",",")
.option("header","true") // 如果源文件中有表头 必须为true
.csv("file:///F:/demo/csv")
.show()
//结果
+---+-------+---+------+
| id| name|age|salary|
+---+-------+---+------+
| 1|Michael| 29|2000.0|
| 2| Andy| 30|5000.0|
| 3| Justin| 19|1000.0|
| 4| Kaine| 20|5000.0|
| 5| Lisa| 19|1000.0|
| 6| wbd| 22|1500.0|
| 7| lyb| 22|3000.0|
+---+-------+---+------+
RDBMS databases
- JDBC
Write
spark.read
.option("seq",",")
.option("header","true")
.csv("file:///F:/demo/csv")
.write
.format("jdbc")
.option("url","jdbc:mysql://localhost:3306/test")
.option("driver","com.mysql.jdbc.Driver")
.option("dbtable","t_emp")
.option("user","root") // 不能写出username 只能些user
.option("password","root")
.mode(SaveMode.Overwrite)
.save()
Read
spark.read
.format("jdbc")
.option("url","jdbc:mysql://localhost:3306/test")
.option("driver","com.mysql.jdbc.Driver")
.option("query","select * from t_emp")
.option("user","root")
.option("password","root")
.load()
.show()
//Result
+---+-------+---+------+
| id| name|age|salary|
+---+-------+---+------+
| 1|Michael| 29|2000.0|
| 5| Lisa| 19|1000.0|
| 2| Andy| 30|5000.0|
| 6| wbd| 22|1500.0|
| 3| Justin| 19|1000.0|
| 4| Kaine| 20|5000.0|
| 7| lyb| 22|3000.0|
+---+-------+---+------+
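For larger tables, the JDBC source can also read in parallel if you tell it how to split the data; the partitioning options below are standard Spark JDBC options, but the column and bounds are assumptions about the t_emp table (a sketch):
spark.read
.format("jdbc")
.option("url","jdbc:mysql://localhost:3306/test")
.option("driver","com.mysql.jdbc.Driver")
.option("dbtable","t_emp")
.option("user","root")
.option("password","root")
.option("partitionColumn","id") // numeric column used to split the read
.option("lowerBound","1")
.option("upperBound","7")
.option("numPartitions","3")    // up to 3 concurrent JDBC connections
.load()
.show()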
Arbitrary external systems
To write data out to a third-party system, use foreachPartition to do the writing.
- Writing to Redis
A custom RedisSink
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
object SinkRedis {
var config: JedisPoolConfig = null
def getPool():Jedis = { // create a Jedis pool configuration object
config = new JedisPoolConfig
// maximum number of connections in the pool
config.setMaxTotal(8)
// maximum number of idle connections in the pool
config.setMaxIdle(8)
// minimum number of idle connections in the pool
config.setMinIdle(0)
// maximum wait time in milliseconds
config.setMaxWaitMillis(3000)
val jedisPool = new JedisPool(config, "192.168.60.134", 6379)
// obtain a Jedis connection from the pool
jedisPool.getResource
}
val jedis: Jedis = getPool()
Runtime.getRuntime.addShutdownHook(new Thread(){
override def run(): Unit = {
println("=-===close=====")
jedis.close()
}
})
}
Load the data read from the database into Redis
import org.apache.spark.sql.SparkSession
object DataSources {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[6]").appName("datasource").getOrCreate()
spark.sparkContext.setLogLevel("FATAL")
spark.read
.format("jdbc")
.option("url","jdbc:mysql://localhost:3306/test")
.option("driver","com.mysql.jdbc.Driver")
.option("dbtable","t_emp")
.option("user","root")
.option("password","root")
.load()
.foreachPartition(rows => {
val jedis = SinkRedis.getPool()
rows.foreach(
row => {
jedis.set(row.getAs[String]("name"),row.toString)
}
)
})
spark.stop()
}
}
Result:
127.0.0.1:6379> KEYS *
- "Lisa"
- "wbd"
- "Kaine"
- "Michael"
- "Justin"
- "lyb"
- "Andy"
127.0.0.1:6379> get Andy
"[2,Andy,30,5000.0]"
Reposted from: https://blog.csdn.net/txbdahaoren/article/details/102451071