Spark_DSL
2022-08-02 14:05:00 【大学生爱编程】
1.Spark-SQL
Reads data from CSV, JSON, and JDBC sources.
Fully compatible with both HQL and the DSL.
DataFrame: a table structure built on top of RDDs; execution is ultimately translated back into RDD operations.
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

//1. SparkSession is the unified entry point in newer versions of Spark
val spark: SparkSession = SparkSession.builder()
  .master("local")
  .appName("sql")
  .getOrCreate()
//2. Read the data and build a DataFrame; a DF is analogous to a table
val linesDF = spark
  .read
  .format("csv")              //specify the input format
  .schema("lines STRING")     //specify the column name and type
  .option("sep", "\t")        //specify the delimiter
  .load("data/words.txt")
//3. Register the DF as a view; only then can SQL be written against it
linesDF.createOrReplaceTempView("lines")
linesDF.printSchema()         //print the schema
//4. Write SQL; fully compatible with HQL
val resultDF: DataFrame = spark.sql(
  """
    |select word, count(1) as num from (
    |  select explode(split(lines, ',')) as word from lines
    |) as t1
    |group by word
    |""".stripMargin)
resultDF.show()               //show() displays only the first rows
//5. Save the result to HDFS
resultDF
  .write
  .format("csv")
  .option("sep", "\t")
  .mode(SaveMode.Overwrite)
  .save("data/wc")
2.DSL Example
DSL style: the DSL is written directly on a DataFrame, top to bottom, following the flow of the code rather than nested SQL.
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

//build the Spark entry point
val spark: SparkSession = SparkSession.builder()
  .master("local")
  .appName("sql")
  .getOrCreate()
//read the data and build a DataFrame (analogous to a table)
val linesDF: DataFrame = spark
  .read
  .format("csv")
  .schema("lines STRING")
  .option("sep", "\t")
  .load("data/words.txt")
//the DSL needs no view registration; it is a SQL-like style that sits between plain code and SQL
import org.apache.spark.sql.functions._
import spark.implicits._
//equivalent to a select on linesDF; the arguments are column objects, written $"columnName"
val resultDF: DataFrame = linesDF
  .select(explode(split($"lines", ",")) as "word")
  .groupBy($"word")
  .agg(count($"word") as "c")   //count the occurrences of each word
resultDF.show()
//save the result
resultDF
  .write
  .format("csv")
  .option("sep", "\t")
  .mode(SaveMode.Overwrite)
  .save("data/wc1")
3.Parsing JSON and CSV Files with the DSL
Add the dependency:
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.79</version>
</dependency>
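The fastjson dependency above is not required by Spark's built-in JSON reader used below; it is only useful when JSON strings have to be parsed by hand inside RDD code. A minimal, purely illustrative sketch of that manual approach (the file path, field names, and the use of the spark session created below are assumptions, not from the original):

import com.alibaba.fastjson.JSON

//hypothetical example: parse each JSON line manually inside an RDD
//assumes the SparkSession named spark created just below
val stuTupleRDD = spark.sparkContext
  .textFile("data/students1.json")                //assumed path, same file as below
  .map(line => {
    val obj = JSON.parseObject(line)              //fastjson parses one JSON object per line
    (obj.getString("id"), obj.getString("name"))  //field names are assumptions
  })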
Parse JSON-formatted data directly.
Create the Spark SQL environment:
val spark: SparkSession = SparkSession
  .builder()
  .master("local")
  .appName("dsl")
  .config("spark.sql.shuffle.partitions", 1)
  .getOrCreate()
//read a JSON file; no delimiter is needed, just the format and the path, and it is parsed directly
val studentDF: DataFrame = spark.read
  .format("json")
  .load("data/students1.json")
//read a CSV file
val scoreDF: DataFrame = spark
  .read
  .format("csv")
  .option("sep", ",")
  .schema("sid STRING,cid STRING,sco DOUBLE")   //specify column names and types
  .load("data/score.txt")
1. printSchema(): prints the schema
root
|-- age: long (nullable = true)
|-- clazz: string (nullable = true)
|-- gender: string (nullable = true)
|-- id: string (nullable = true)
|-- name: string (nullable = true)
2. studentDF.show(100): show() displays only 20 rows by default
3. studentDF.show(false): when some values are too long, print each column in full (without truncation)
4.DSL Functions
1. Selecting data is analogous to a transformation operator on an RDD; the return value is a DataFrame
2. Aggregate functions cannot be used directly in select; they must be written inside agg
studentDF.select("id","age").show(150) //直接抽出来两列展示
studentDF.selectExpr("name","age+1 as age1").show(50) //传一个表达式可以对列进行操作
+------+----+
| name|age1|
+------+----+
|施笑槐| 23|
|吕金鹏| 25|
|单乐蕊| 23|
3. Import the implicit conversions to work with column objects
import spark.implicits._
studentDF.select($"id",$"age").show()
4. Import the Spark SQL functions to call methods on column objects
import org.apache.spark.sql.functions._
studentDF.select($"id",substring($"clazz",0,2) as "cc").show()
5. where: filters rows
studentDF.where("gender='女' and age=23").show()
+---+--------+------+----------+------+
|age| clazz|gender| id| name|
+---+--------+------+----------+------+
| 23|文科六班| 女|1500100007|尚孤风|
| 23|文科一班| 女|1500100016|潘访烟|
| 23|理科二班| 女|1500100052|居初兰|
6. Group and aggregate: groupBy(), with the aggregate functions written inside agg()
Grouping and aggregation are used together; the result contains only the grouping columns and the aggregated columns.
studentDF.groupBy($"clazz")
.agg(count($"clazz") as "num",round(avg($"age")) as "avgAge") //count里传字段
.show(50)
7. Sorting: orderBy()
Count the number of students per class and sort in descending order:
studentDF
  .groupBy($"clazz")
  .agg(count($"clazz") as "num")
  .orderBy($"num".desc)
  .show()
8. Table joins: join
val joinDF: DataFrame = studentDF.join(scoreDF, $"id" === $"sid", "inner")
9. withColumn("newColumnName", row_number() over Window.partitionBy(...).orderBy(...))
/**
 * Find the top ten students in each class.
 * First compute each student's total score.
 * withColumn adds a new column on top of the DF; Window requires an import.
 */
import org.apache.spark.sql.expressions.Window
joinDF
  .groupBy($"id", $"clazz")
  .agg(sum($"sco") as "sumSco")
  .withColumn("row", row_number() over Window.partitionBy($"clazz").orderBy($"sumSco".desc))
  .where($"row" <= 10)
  .show()
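A related choice in this query is the ranking function: row_number always produces a strict 1..N numbering, while rank and dense_rank (also in org.apache.spark.sql.functions) treat tied totals differently. A small, purely illustrative sketch of the same top-N query with that variation:

//dense_rank gives tied totals the same rank and does not skip the following rank
joinDF
  .groupBy($"id", $"clazz")
  .agg(sum($"sco") as "sumSco")
  .withColumn("rk", dense_rank() over Window.partitionBy($"clazz").orderBy($"sumSco".desc))
  .where($"rk" <= 10)   //students tied at the cut-off are all kept
  .show()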
4.DataSource
4.1 CSV: column names and types must be specified manually
val spark: SparkSession = SparkSession
  .builder()
  .master("local")
  .appName("sql")
  .config("spark.sql.shuffle.partitions", 1)
  .getOrCreate()
val csvDF: DataFrame = spark
  .read
  .format("csv")
  .schema("id STRING,name STRING,age INT,gender STRING,clazz STRING")
  .option("sep", ",")
  .load("data/students.txt")
csvDF.show()
import spark.implicits._
import org.apache.spark.sql.functions._
//count the number of students per class and save the result to a file
csvDF
  .groupBy($"clazz")
  .agg(count($"clazz") as "num")
  .write
  .format("csv")
  .option("sep", ",")
  .mode(SaveMode.Overwrite)
  .save("data/clazzNum1")
4.2 Reading and writing the JSON and Parquet formats
/**
 * Read JSON data; column names do not need to be specified when parsing.
 * The file must be formatted so that every line is a complete JSON object.
 * Because the column names are stored with the data, the files take more space,
 * but a record missing one field does not affect the other records.
 */
val jsonDF: DataFrame = spark
  .read
  .format("json")
  .load("data/students1.json")
//count the number of students per gender and save the result in JSON format
jsonDF
  .groupBy($"gender")
  .agg(count($"gender") as "g")
  .write
  .format("json")
  .mode(SaveMode.Overwrite)
  .save("data/gender_num")
---------------------------------------------------------------
//save the data above in Parquet format (a compressed format that carries the schema; the compression ratio depends on the information entropy)
//effectively not human-readable; trades CPU time for storage space
jsonDF
  .write
  .format("parquet")
  .mode(SaveMode.Overwrite)
  .save("data/students")
//read Parquet data; the schema travels with the data, so columns need not be specified manually
val parquetDF: DataFrame = spark
  .read
  .format("parquet")
  .load("data/students")
parquetDF.printSchema()
parquetDF.show(30)
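Parquet output can also be laid out by a column value, which later lets Spark prune whole directories when filtering on that column. A brief sketch, assuming the student data from above and an output path chosen here only for illustration:

//write one sub-directory per class, e.g. .../clazz=理科一班/part-*.parquet
jsonDF
  .write
  .partitionBy("clazz")             //directory-level partitioning by column value
  .format("parquet")
  .mode(SaveMode.Overwrite)
  .save("data/students_by_clazz")   //hypothetical output path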
4.3 Reading data via JDBC
//bring in the MySQL dependency, then specify the format, the driver, the database and table, and the username and password
val jdbcDF: DataFrame = spark
  .read
  .format("jdbc")
  .option("url", "jdbc:mysql://master:3306")
  .option("dbtable", "bigdata.students")
  .option("user", "root")
  .option("password", "123456")
  .load()
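Writing a DataFrame back to MySQL goes through the same jdbc source. A hedged sketch, reusing the connection details from the read above; the target table name is an assumption:

//sketch: persist a DataFrame back to MySQL over JDBC
jdbcDF
  .write
  .format("jdbc")
  .option("url", "jdbc:mysql://master:3306")
  .option("dbtable", "bigdata.students_copy")   //hypothetical target table
  .option("user", "root")
  .option("password", "123456")
  .mode(SaveMode.Overwrite)
  .save()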
5.Converting between RDD and DataFrame
//Spark entry point
val spark: SparkSession = SparkSession
  .builder()
  .appName("rdd")
  .master("local")
  .getOrCreate()
val sc: SparkContext = spark.sparkContext   //obtain the SparkContext
//read the data and convert each line into a tuple
val studentsRDD: RDD[(String, String, String, String, String)] = sc.textFile("data/students.txt")
  .map(line => line.split(","))
  .map {
    case Array(id: String, name: String, age: String, gender: String, clazz: String) =>
      (id, name, age, gender, clazz)
  }
//import the implicit conversions, then call toDF
import spark.implicits._
val studentsDF: DataFrame = studentsRDD.toDF("id", "name", "age", "gender", "clazz")
studentsDF.show(30)
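toDF relies on the implicits and names the columns positionally; the schema can also be stated explicitly with spark.createDataFrame over an RDD[Row]. A sketch shown here only as an alternative, not part of the original:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

//explicit schema: the same five string columns
val schema: StructType = StructType(Seq(
  StructField("id", StringType),
  StructField("name", StringType),
  StructField("age", StringType),
  StructField("gender", StringType),
  StructField("clazz", StringType)
))
val rowRDD: RDD[Row] = studentsRDD.map {
  case (id, name, age, gender, clazz) => Row(id, name, age, gender, clazz)
}
val studentsDF2: DataFrame = spark.createDataFrame(rowRDD, schema)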
Converting a DF to an RDD
//SparkSession environment
val spark: SparkSession = SparkSession
  .builder()
  .appName("rdd")
  .master("local")
  .getOrCreate()
//read the file
val studentDF: DataFrame = spark
  .read
  .format("json")
  .load("data/students.json")
//convert the DF to an RDD
import spark.implicits._
val studentRDD: RDD[Row] = studentDF.rdd
//access the fields by column name
val stuRDD: RDD[(String, String, Long, String, String)] = studentRDD.map((row: Row) => {
  val id: String = row.getAs[String]("id")
  val name: String = row.getAs[String]("name")
  val age: Long = row.getAs[Long]("age")
  val gender: String = row.getAs[String]("gender")
  val clazz: String = row.getAs[String]("clazz")
  (id, name, age, gender, clazz)
})
//extract the fields with pattern matching
val caseRDD: RDD[(String, String, Long, String, String)] = studentRDD.map {
  //note: the match must follow the field order of the Row
  case Row(age: Long, clazz: String, gender: String, id: String, name: String) =>
    (id, name, age, gender, clazz)
}
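When the row structure is known up front, converting to a typed Dataset with a case class avoids the untyped Row access entirely. A short sketch; the case class name is chosen here for illustration:

//hypothetical case class matching the JSON fields
//define it at the top level of the file so Spark can derive an encoder for it
case class Student(id: String, name: String, age: Long, gender: String, clazz: String)

import spark.implicits._
val studentDS = studentDF.as[Student]   //typed Dataset[Student]
val tupleRDD = studentDS.rdd
  .map(s => (s.id, s.name, s.age, s.gender, s.clazz))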
6.Window Functions
row_number
rank
sum
count
avg
lag
lead
Group-by aggregation shrinks the result to fewer columns, while a window over a partition adds one new column and leaves the other columns unchanged.
6.1 Two ways of using ordering with sum over
//compute the information of the top ten students in the grade
//adding orderBy inside the partition turns the sum into a running (cumulative) total
val joinDF: DataFrame = studentDF.join(scoreDF, $"id" === $"sid")
joinDF
  .withColumn("sumSco", sum($"sco") over Window.partitionBy($"id").orderBy($"sco"))
  .show()
+----------+------+---+------+--------+----------+-------+-----+------+
| id| name|age|gender| clazz| sid| cid| sco|sumSco|
+----------+------+---+------+--------+----------+-------+-----+------+
|1500100001|施笑槐| 22| 女|文科六班|1500100001|1000003| 0.0| 0.0|
|1500100001|施笑槐| 22| 女|文科六班|1500100001|1000002| 5.0| 5.0|
|1500100001|施笑槐| 22| 女|文科六班|1500100001|1000004| 29.0| 34.0|
|1500100001|施笑槐| 22| 女|文科六班|1500100001|1000006| 52.0| 86.0|
//sort after applying the window
val joinDF: DataFrame = studentDF.join(scoreDF, $"id" === $"sid")
joinDF
  .withColumn("sumSco", sum($"sco") over Window.partitionBy($"id"))
  .orderBy($"sumSco".desc)
  .limit(60)
+----------+------+---+------+--------+----------+-------+-----+------+
| id| name|age|gender| clazz| sid| cid| sco|sumSco|
+----------+------+---+------+--------+----------+-------+-----+------+
|1500100929|满慕易| 21| 女|理科三班|1500100929|1000001|144.0| 630.0|
|1500100929|满慕易| 21| 女|理科三班|1500100929|1000002|138.0| 630.0|
|1500100929|满慕易| 21| 女|理科三班|1500100929|1000003| 88.0| 630.0|
|1500100929|满慕易| 21| 女|理科三班|1500100929|1000007| 91.0| 630.0|
|1500100929|满慕易| 21| 女|理科三班|1500100929|1000008| 99.0| 630.0|
|1500100929|满慕易| 21| 女|理科三班|1500100929|1000009| 70.0| 630.0|
|1500100080|巫景彰| 21| 男|理科五班|1500100080|1000001|142.0| 628.0|
|1500100080|巫景彰| 21| 男|理科五班|1500100080|1000002|149.0| 628.0|
|1500100080|巫景彰| 21| 男|理科五班|1500100080|1000003|123.0| 628.0|
6.2 count
//find the students who passed every subject; count with orderBy behaves like row_number: partition by sid, order by cid, and count sid cumulatively; the final accumulated value is the total number of rows in the partition
scoreDF
  //join with the subject table
  .join(subjectDF, "cid")
  //filter out failing scores (below 60% of the full score)
  .where($"sco" >= $"ssco" * 0.6)
  //count the number of passing subjects for each student
  .withColumn("jige", count($"sid") over Window.partitionBy($"sid"))
  //keep only the students who passed all subjects
  .where($"jige" === 6)
  //.show(100)
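subjectDF is used above but never built in this section; a plausible way to load it, assuming a subject.txt with a subject id, a subject name, and the subject's full score (the path and column names are assumptions):

//hypothetical subject table: cid, subject name, full score of the subject
val subjectDF: DataFrame = spark
  .read
  .format("csv")
  .option("sep", ",")
  .schema("cid STRING,sname STRING,ssco DOUBLE")
  .load("data/subject.txt")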
6.3 avg
//find the students whose total score is above the grade average (how to compute it, and how to window it)
First compute each student's total score, then open a window partitioned by arts/science track (the first two characters of clazz) to compute the grade average.
joinDF
  .withColumn("sumSco", sum($"sco") over Window.partitionBy($"id"))
  .withColumn("avgSco", avg($"sumSco") over Window.partitionBy(substring($"clazz", 0, 2)))
  .show(6000)
|1500100999|钟绮晴| 23| 女|文科五班|1500100999|1000004| 48.0| 371.0|374.00766283524905|
|1500100999|钟绮晴| 23| 女|文科五班|1500100999|1000005| 41.0| 371.0|374.00766283524905|
|1500100999|钟绮晴| 23| 女|文科五班|1500100999|1000006| 10.0| 371.0|374.00766283524905|
|1500100003|单乐蕊| 22| 女|理科六班|1500100003|1000001| 48.0| 359.0| 370.9769392033543|
|1500100003|单乐蕊| 22| 女|理科六班|1500100003|1000002|132.0| 359.0| 370.9769392033543|
|1500100003|单乐蕊| 22| 女|理科六班|1500100003|1000003| 41.0| 359.0| 370.9769392033543|
6.4 lag: returns the value from a row a given number of rows before the current one; the window must be partitioned and ordered
joinDF
  .groupBy($"id", $"clazz")   //group-by aggregation keeps only the columns that appear, so clazz is added to the grouping here
  .agg(sum($"sco") as "sumSco")
  .withColumn("rm", row_number() over Window.partitionBy($"clazz").orderBy($"sumSco".desc))   //note where the descending order is applied
  .show(1000)
+----------+--------+------+---+
| id| clazz|sumSco| rm|
+----------+--------+------+---+
|1500100308|文科一班| 628.0| 1|
|1500100875|文科一班| 595.0| 2|
|1500100943|文科一班| 580.0| 3|
|1500100871|文科一班| 569.0| 4|
//group-by aggregation keeps only the columns that appear, so clazz is added to the grouping here
//**the lag function requires the window to be partitioned and ordered**
joinDF
  .groupBy($"id", $"clazz")
  .agg(sum($"sco") as "sumSco")
  .withColumn("rm", row_number() over Window.partitionBy($"clazz").orderBy($"sumSco".desc))
  .withColumn("headSumSco", lag($"sumSco", 1, 750) over Window.partitionBy($"clazz").orderBy($"sumSco".desc))
  .withColumn("cha", $"sumSco" - $"headSumSco")
  .show(1000)
+----------+--------+------+---+----------+------+
| id| clazz|sumSco| rm|headSumSco| cha|
+----------+--------+------+---+----------+------+
|1500100308|文科一班| 628.0| 1| 750.0|-122.0|
|1500100875|文科一班| 595.0| 2| 628.0| -33.0|
|1500100943|文科一班| 580.0| 3| 595.0| -15.0|
|1500100871|文科一班| 569.0| 4| 580.0| -11.0|
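lead, listed at the start of this section, is the mirror of lag: it reads a row a given number of rows after the current one within the same ordered partition. It is not demonstrated in the original; a symmetric, illustrative sketch:

//difference to the total score of the student ranked one place below (default 0 for the last row)
joinDF
  .groupBy($"id", $"clazz")
  .agg(sum($"sco") as "sumSco")
  .withColumn("nextSumSco", lead($"sumSco", 1, 0) over Window.partitionBy($"clazz").orderBy($"sumSco".desc))
  .withColumn("cha", $"sumSco" - $"nextSumSco")
  .show(1000)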