Spark - Understanding Parquet
2022-06-26 03:59:00 【BIT_666】
I. Introduction
Parquet files are common in Spark, Hive, Streaming, MapReduce, and other big data scenarios. Through columnar storage and metadata storage they achieve efficient data storage and retrieval. This article focuses on how Parquet files are stored and read in Spark, and the pitfalls you may run into along the way.
II. Parquet Loading Methods
1. SparkSession.read.parquet
SparkSession lives in org.apache.spark.sql. Besides reading Parquet columnar files, SparkSession also supports reading ORC columnar files; see: Spark Read ORC File.
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
  .setAppName("ParquetInfo")
  .setMaster("local")
val spark = SparkSession
  .builder
  .config(conf)
  .getOrCreate()

// Print the first column of every row
spark.read.parquet(path).foreach(row => {
  val head = row.getString(0)
  println(head)
})
Reading yields a sql.DataFrame, which supports common SQL-style operations. If you'd rather not use SQL, you can call .rdd to get an RDD[Row] and then iterate over the Iterator[Row] of each partition, as sketched below.
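A minimal sketch of the .rdd route, assuming the same spark and path as above:

val rows = spark.read.parquet(path).rdd // RDD[Row]
rows.foreachPartition { iter =>
  // iter is the Iterator[Row] of one partition
  iter.foreach(row => println(row.getString(0)))
}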
Tips:
SQL operations can be executed afterwards. Initializing a SQLContext and calling its sql method is also supported, but SparkSession alone is enough:
val parquetFileDF = spark.read.parquet("path")
parquetFileDF.createOrReplaceTempView("tableName")
val resultDf = spark.sql("SELECT * FROM tableName")

// The older SQLContext route also works
val sqlContext = new SQLContext(sc)
sqlContext.sql("xxx")
2. SparkContext.hadoopFile
When reading with hadoopFile you must specify the corresponding key/value classes and the InputFormat. For Parquet files the key/value pair is Void/ArrayWritable, and the InputFormat is org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat. Once you have the ArrayWritable you can extract the individual Writable values.
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
import org.apache.hadoop.io.ArrayWritable

val sc = spark.sparkContext
sc.setLogLevel("error")
// Key class is Void, value class is ArrayWritable for Parquet
val parquetInfo = sc.hadoopFile(path, classOf[MapredParquetInputFormat], classOf[Void], classOf[ArrayWritable])
parquetInfo.take(5).foreach(info => {
  val writable = info._2.get() // Array[Writable], one entry per column
  val head = writable(0)
  println(writable.length + "\t" + head)
})
Tips:
A serializer must be added to the SparkConf, otherwise the hadoopFile method will throw an error:
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
The concrete content of the writable can only be recovered by deserializing it, so reading through the official SparkSession API is recommended. RcFile, however, is not yet readable directly through SparkSession, so you can use the sc.hadoopRDD method to read columnar data stored in RcFile format; see: Spark Read RcFile.
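For reference, a minimal sketch of that hadoopRDD route for RcFile, assuming hive-exec is on the classpath and using its RCFileInputFormat (the input path here is a placeholder):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
import org.apache.hadoop.hive.ql.io.RCFileInputFormat
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable

val jobConf = new JobConf(sc.hadoopConfiguration)
FileInputFormat.addInputPath(jobConf, new Path("/path/to/rcfile"))
// Keys are row offsets; each value holds the serialized columns of one row
val rcRdd = sc.hadoopRDD(
  jobConf,
  classOf[RCFileInputFormat[LongWritable, BytesRefArrayWritable]],
  classOf[LongWritable],
  classOf[BytesRefArrayWritable])
rcRdd.take(5).foreach { case (_, cols) => println(cols.size()) }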
III. Parquet Storage
1. Static conversion
Parquet -> Parquet: read a Parquet file into a sql.DataFrame and write it back out, similar to an RDD transform:
spark.read.parquet(path)
  .write.mode(SaveMode.Overwrite)
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .format("parquet")
  .save("/split")
2. RDD[T] conversion
An ordinary data RDD can be converted to a sql.DataFrame via the implicit conversions brought in by import sqlContext.implicits._, and then written out as Parquet. Below, an empty PairRDD is converted to a DataFrame and stored:
import sqlContext.implicits._

val commonStringRdd = sc.emptyRDD[(String, String)].toDF()
commonStringRdd.write
  .mode(SaveMode.Overwrite)
  .format("parquet")
  .save("")
Tips:
SaveMode has four modes: Append, Overwrite, ErrorIfExists, and Ignore. The first two are self-explanatory; ErrorIfExists throws an error if the target path already exists, while Ignore silently skips the write and leaves the existing data untouched. SaveMode is implemented as a Java enum in org.apache.spark.sql.
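A minimal sketch of the four modes in use (df stands for any sql.DataFrame):

import org.apache.spark.sql.SaveMode

df.write.mode(SaveMode.Append).parquet(path)        // append to existing data
df.write.mode(SaveMode.Overwrite).parquet(path)     // replace existing data
df.write.mode(SaveMode.ErrorIfExists).parquet(path) // throw if the path already exists (default)
df.write.mode(SaveMode.Ignore).parquet(path)        // skip the write if the path already exists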
For details of converting an RDD to a sql.DataFrame, see: Spark - RDD / Row / sql.DataFrame Conversion.
3. RDD[Row] conversion
If you already have an RDD[Row], you can call sqlContext directly to convert it to a DataFrame. Here TABLE_SCHEMA describes each data column, similar to Hive's column information: mainly field name and type, with optional extra metadata. sqlContext matches the column attributes one-to-one against each Row; if the schema's length does not cover the Row's total column count, the trailing fields can only be read as null.
import org.apache.spark.sql.{Row, SQLContext, SaveMode}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val sqlContext = new SQLContext(sc)
// Schema: one StructField per column, here all strings
val TABLE_SCHEMA = StructType(Array(
  StructField("A", StringType),
  StructField("B", StringType),
  StructField("C", StringType),
  StructField("D", StringType),
  StructField("E", StringType),
  StructField("F", StringType),
  StructField("G", StringType),
  StructField("H", StringType)
))
val commonRowRdd = sc.emptyRDD[Row]
sqlContext.createDataFrame(commonRowRdd, TABLE_SCHEMA)
  .write.mode(SaveMode.Overwrite)
  .format("parquet")
  .save("/split")
Tips:
Using the syntax above may report the error: Illegal pattern component: XXX. This comes from the internal timestamp-format parsing; adding .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ") to the code fixes it:
spark.read.option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").parquet(path)
IV. A Brief Look at Parquet
Parquet is open source, supports multiple platforms and systems, and ships efficient storage and encoding schemes, which makes it well suited to task development in big data scenarios. Let's briefly look at two of its characteristics: columnar storage and metadata storage.
1. Columnar storage - Smaller IO
CSV is the most common row-oriented storage. In scenarios that only need individual features or columns, a CSV file must be traversed line by line and split to finally reach the target element, whereas Parquet's columnar layout allows individual features to be accessed directly, improving execution efficiency and reducing data IO.
CSV:     A,B,C,D,E -> split(",")(col)
Parquet: A B C D E -> getString(col)
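A minimal sketch of the difference (the paths and column position are illustrative):

// CSV: read and split every full line just to reach one column
val csvCol = sc.textFile("/data/input.csv").map(_.split(",")(2))
// Parquet: only the requested column is read
val parquetCol = spark.read.parquet("/data/input.parquet").select("C")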
2. Metadata storage - Higher compression ratio
Parquet uses multiple encodings to keep data storage efficient and compact:
A. Run-length encoding
Run-length encoding: when a column contains long runs of repeated values, they can be recorded as "X repeated N times", greatly reducing the recording cost. Even if N is very large, the storage cost stays small:
[1,2,1,1,1,1,2] -> 1-1,2-1,1-4,2-1
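A tiny sketch of the idea in Scala (illustrative only, not Parquet's actual implementation):

// Collapse runs of equal values into (value, runLength) pairs
def runLengthEncode[T](xs: Seq[T]): Seq[(T, Int)] =
  xs.foldLeft(List.empty[(T, Int)]) {
    case ((v, n) :: rest, x) if v == x => (v, n + 1) :: rest
    case (acc, x) => (x, 1) :: acc
  }.reverse

// runLengthEncode(Seq(1, 2, 1, 1, 1, 1, 2)) == List((1,1), (2,1), (1,4), (2,1))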
B. Dictionary encoding
Dictionary encoding: as the name suggests, values with many duplicates are saved through a mapping, e.g. "0" -> "LongString":
[LongString, LongString, LongString] -> [0, 0, 0]
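The same idea as a sketch (again illustrative only):

// Map each distinct value to a small integer id; store ids instead of values
def dictionaryEncode[T](xs: Seq[T]): (Map[T, Int], Seq[Int]) = {
  val dict = xs.distinct.zipWithIndex.toMap
  (dict, xs.map(dict))
}

// dictionaryEncode(Seq("LongString", "LongString", "LongString"))
//   == (Map("LongString" -> 0), List(0, 0, 0))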
C. Delta encoding
Delta (incremental) encoding: well suited to unix timestamps, which record the number of seconds since January 1, 1970. When storing timestamps, the initial timestamp can be subtracted out to reduce storage; with 1577808000 as the baseline, a lot of space can be saved:
[1577808000, 1577808004, 1577808008] -> [0, 4, 8]
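And a sketch of delta encoding against the first value as baseline (illustrative only):

// Store the base timestamp plus small offsets instead of full timestamps
def deltaEncode(xs: Seq[Long]): (Long, Seq[Long]) =
  if (xs.isEmpty) (0L, Seq.empty)
  else (xs.head, xs.map(_ - xs.head))

// deltaEncode(Seq(1577808000L, 1577808004L, 1577808008L))
//   == (1577808000L, List(0L, 4L, 8L))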
3. Storage - Compression comparison
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.SaveMode
import spark.implicits._

val st = System.currentTimeMillis()
val pairInfo = (0 to 1000000).zipWithIndex.toArray

// DataFrame route: csv, json or parquet
val format = "csv"
sc.parallelize(pairInfo).toDF("A", "B")
  .write
  .mode(SaveMode.Overwrite)
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .format(format)
  .save(s"./output/$format")

// RDD route: plain text or gzip-compressed text
val saveType = "gzip" // text or gzip
sc.parallelize(pairInfo).saveAsTextFile(s"./output/$saveType", classOf[GzipCodec])

val cost = System.currentTimeMillis() - st
println(s"Elapsed: $cost ms")
The two approaches above save the array of pairs from 0 to 1000000 into the corresponding files. The resulting sizes:
Type | Text | Gzip | Parquet | CSV | JSON
Size (MB) | 15.8 | 4.6 | 8 | 13.8 | 23.8
Compared with CSV and JSON storage of tabular data, Parquet provides a much higher compression ratio. Amazon has compared the efficiency of CSV and Parquet on S3: Parquet cut file size by 87%, sped up queries 34x, and reduced cost by 99.7%. So in scenarios with large, ever-growing datasets that often need operations on individual columns, Parquet fits very well.
4. Read - Efficiency comparison
Next, read each of the files above:
val csv = spark.read.csv(path + "/output/csv").rdd.count()
val parquet = spark.read.parquet(path + "/output/parquet").rdd.count()
val json = spark.read.json(path + "/output/json").count()
val common = sc.textFile(path + "/output/text").count()
val gz = sc.textFile(path + "/output/gzip").count()
Type | Text | Gzip | Parquet | CSV | JSON
Time (ms) | 1417 | 1448 | 4952 | 6870 | 6766
Parquet has the advantage over CSV and JSON, but compared with the row-oriented Text and Gzip files, a count-style row-count operation is clearly not columnar storage's strength, hence the big gap. For big-data statistics over one or a few fields, however, Parquet delivers higher performance than row-oriented files.
5. selectExpr
Besides fetching the raw field contents of a Parquet file, you can use selectExpr to obtain extra information. The expression functions live in org.apache.spark.sql.functions and include aggregations like collect_list, statistics like count, plus max, min, isnull, and so on.
spark.read.parquet(path).selectExpr("count('_c1')").rdd.foreach(row => {
  println(row.getLong(0))
})
The call above uses selectExpr to get count(_c1), the number of values of that feature; the count result here was 5383.
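The same pattern extends to the other expressions mentioned above (a sketch; results depend on your data):

spark.read.parquet(path)
  .selectExpr("max(_c0)", "min(_c0)")
  .show()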
Here _c1 comes from the default schema of the sql.DataFrame read from Parquet. The default schema information can be obtained like this:
val schema = spark.read.parquet(path).schema
println(schema)
Part of the output is shown above: feature names start from _c0 and count upwards, so the defaults are _c0, _c1, and so on. If you define your own schema of StructFields and read with spark.read.schema().parquet(), the column names used in selectExpr must be changed to your own defined names. For example, if _c1 is defined as age, the expression above becomes count('age'), and using _c1 again will raise an error. For more detailed schema operations, see: Parquet specifying schema.
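A minimal sketch of reading with a self-defined schema (the field names here are illustrative):

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val mySchema = StructType(Array(
  StructField("name", StringType),
  StructField("age", IntegerType)
))
spark.read.schema(mySchema).parquet(path)
  .selectExpr("count('age')")
  .show()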
V. Summary
That covers the Spark Parquet usage we rely on day to day. The APIs that SparkSession integrates for reading parquet and orc are very convenient; where possible, read through those APIs directly instead of hadoopRDD / hadoopFile. Finally, the naming of parquet is genuinely fun: parquet means wood-block flooring, and columns of variable-length names stored side by side do look a bit like floorboards.