Spark - Understanding Parquet
2022-06-26 03:59:00 【BIT_666】
I. Introduction
Parquet files are common in big data scenarios such as Spark, Hive, Streaming and MapReduce. They achieve efficient data storage and retrieval through columnar storage and metadata storage. This article focuses on how Parquet files are stored and read in Spark, and the pitfalls you may run into when using them.

II. Parquet Loading Methods
1. SparkSession.read.parquet
SparkSession lives in the org.apache.spark.sql.SparkSession class. Besides reading Parquet columnar files, SparkSession also supports reading ORC columnar files; see: Spark Read ORC File.
val conf = new SparkConf()
  .setAppName("ParquetInfo")
  .setMaster("local")
val spark = SparkSession
  .builder
  .config(conf)
  .getOrCreate()
spark.read.parquet(path).foreach(row => {
  val head = row.getString(0)
  println(head)
})
Reading returns a sql.DataFrame, which supports the common SQL-style operations. If you do not want to use SQL, you can also call .rdd to get an RDD[Row] and then traverse the Iterator[Row] under each partition.
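A minimal sketch of that .rdd route, assuming the same spark and path as above:

import org.apache.spark.sql.Row

val rowRdd = spark.read.parquet(path).rdd          // RDD[Row]
rowRdd.foreachPartition { iter: Iterator[Row] =>   // one Iterator[Row] per partition
  iter.foreach(row => println(row.getString(0)))   // read the first column of each row
}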
Tips:
You can run SQL on the result afterwards. Initializing a SQLContext and calling its sql method is also supported, but SparkSession alone is enough:
val parquetFileDF = spark.read.parquet("path")
parquetFileDF.createOrReplaceTempView("tableName")
val resultDf = spark.sql("SELECT * FROM tableName")

val sqlContext = new SQLContext(sc)
sqlContext.sql("xxx")

2. SparkContext.hadoopFile
When reading with hadoopFile you need to specify the corresponding key/value types and the InputFormat. For Parquet files the key/value pair is Void / ArrayWritable and the InputFormat is org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat; once you have the ArrayWritable you can access the underlying Writable values.
val sc = spark.sparkContext
sc.setLogLevel("error")
val parquetInfo = sc.hadoopFile(path, classOf[MapredParquetInputFormat], classOf[Void], classOf[ArrayWritable])
parquetInfo.take(5).foreach(info => {
  val writable = info._2.get()
  val head = writable(0)
  println(writable.length + "\t" + head)
})
Tips:
You need to add the serializer configuration to SparkConf, otherwise the hadoopFile call will throw an error:

.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")writable The specific content can only be obtained through deserialization , So it's recommended to use SparkSession The official api Read , But you can RcFile SparkSession Direct reading is not supported for the time being , So it can be used sc.hadoopRdd Method to read the data stored in the same column RcFile Format file , You can refer to : Spark Read RcFile
III. Parquet Storage
1. Static conversion
Parquet -> Parquet: read the Parquet file into a sql.DataFrame and write it back out, similar to an RDD transform:
spark.read.parquet(path)
  .write.mode(SaveMode.Overwrite)
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .format("parquet")
  .save("/split")

2. RDD[T*] transformation
An ordinary RDD can be implicitly converted to a sql.DataFrame by importing sqlContext.implicits._ and then written out as Parquet. Below is an example that converts a PairRDD to a DataFrame and stores it:
import sqlContext.implicits._
val commonStringRdd = sc.emptyRDD[(String, String)].toDF()
commonStringRdd.write
  .mode(SaveMode.Overwrite)
  .format("parquet")
  .save("")
Tips:
SaveMode has four modes: Append, Overwrite, ErrorIfExists and Ignore. The first two are self-explanatory; ErrorIfExists throws an error if the target path already exists, while Ignore skips the write and leaves the existing data untouched if the path already exists. SaveMode is implemented as a Java Enum:
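Since org.apache.spark.sql.SaveMode is a plain Java enum, its values can be listed directly from Scala, for example:

import org.apache.spark.sql.SaveMode

// The four modes as a quick reference; name() prints Append, Overwrite, ErrorIfExists, Ignore
SaveMode.values().foreach(m => println(m.name()))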

For details on converting between RDD, Row and sql.DataFrame, see: Spark - RDD / Row / sql.DataFrame Conversion.
3. RDD[Row] transformation
If you already have an RDD[Row], you can call sqlContext directly to convert the RDD into a DataFrame. Here TABLE_SCHEME can be seen as a description of each column of data, similar to Hive column metadata: mainly the field name and type, with room for extra information. sqlContext matches the column attributes to the Row fields one by one; if the schema is shorter than the total number of columns in the Row, the remaining fields can only be read back as null.
val sqlContext = new SQLContext(sc)
val TABLE_SCHEME = StructType(Array(
  StructField("A", StringType),
  StructField("B", StringType),
  StructField("C", StringType),
  StructField("D", StringType),
  StructField("E", StringType),
  StructField("F", StringType),
  StructField("G", StringType),
  StructField("H", StringType)
))
val commonRowRdd = sc.emptyRDD[Row]
sqlContext.createDataFrame(commonRowRdd, TABLE_SCHEME)
  .write.mode(SaveMode.Overwrite)
  .format("parquet")
  .save("/split")
Tips:

Using the syntax above may produce the error "Illegal pattern component: XXX". This is caused by the internal date format parsing; adding .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ") to the code fixes it:
spark.read.option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").parquet(path)

IV. A Brief Look at Parquet
Parquet is open source, supports multiple platforms and systems, and uses efficient storage and encoding schemes, which makes it very well suited to task development in big data scenarios. Let's take a brief look at two of its characteristics: columnar storage and metadata storage.
1. Columnar Storage - Less IO
CSV is the most common row-oriented format. For scenarios that only need a single feature or column, a CSV file has to be read line by line and split to finally reach the target element, while Parquet's columnar layout allows individual columns to be accessed directly, which improves execution efficiency and reduces data IO.
CSV:     A,B,C,D,E -> Split(",")[col]
Parquet: A B C D E -> getString(col)

2. Metadata Storage - Higher Compression Ratio
Parquet uses several encodings to keep data storage efficient and compact:
A. Run-Length Encoding
Run-length encoding: when a column contains long runs of repeated values, they can be recorded as "X repeated N times", which reduces the recording cost; even if N is very large, the storage cost stays small:
[1,2,1,1,1,1,2] -> 1-1,2-1,1-4,2-1
B. Dictionary Encoding
Dictionary encoding: as the name suggests, a mapping is used to store values that repeat very often, for example "0" -> "LongString":
[LongString, LongString, LongString] -> [0, 0, 0]
C. Delta Encoding
Delta encoding: well suited to Unix timestamps, which record the number of seconds since January 1, 1970. When storing timestamps, the baseline timestamp can be subtracted out to reduce storage; for example, using 1577808000 as the baseline saves a lot of space:
[1577808000, 1577808004, 1577808008] -> [0, 4, 8]
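Parquet applies these encodings internally; purely for intuition, a toy Scala sketch of the three ideas (not Parquet's actual implementation) could look like this:

// Run-length: collapse runs of equal values into (value, runLength) pairs
def runLength[T](xs: Seq[T]): Seq[(T, Int)] =
  if (xs.isEmpty) Seq.empty
  else xs.tail.foldLeft(Vector((xs.head, 1))) { (acc, x) =>
    val (v, n) = acc.last
    if (v == x) acc.init :+ ((v, n + 1)) else acc :+ ((x, 1))
  }

// Dictionary: map distinct values to small integer codes
def dictionary[T](xs: Seq[T]): (Map[T, Int], Seq[Int]) = {
  val dict = xs.distinct.zipWithIndex.toMap
  (dict, xs.map(dict))
}

// Delta: store the first value plus the differences from it
def delta(xs: Seq[Long]): (Long, Seq[Long]) =
  (xs.head, xs.map(_ - xs.head))

println(runLength(Seq(1, 2, 1, 1, 1, 1, 2)))                       // (1,1), (2,1), (1,4), (2,1)
println(dictionary(Seq("LongString", "LongString", "LongString"))) // Map(LongString -> 0) and 0, 0, 0
println(delta(Seq(1577808000L, 1577808004L, 1577808008L)))         // 1577808000 and 0, 4, 8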
3. Storage - Compression comparison
import org.apache.hadoop.io.compress.GzipCodec

val st = System.currentTimeMillis()
val pairInfo = (0 to 1000000).zipWithIndex.toArray
val format = "csv" // csv, json, parquet
sc.parallelize(pairInfo).toDF("A", "B")
  .write
  .mode(SaveMode.Overwrite)
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .format(format)
  .save(s"./output/$format")
val saveType = "gzip" // text, gzip
sc.parallelize(pairInfo).saveAsTextFile(s"./output/$saveType", classOf[GzipCodec])
val cost = System.currentTimeMillis() - st
println(s"Time cost: $cost")
Use the two approaches above to save the array of 0 to 1000000 to the corresponding files and compare the storage sizes (a small helper for checking the local output size is sketched below, followed by the results):

| type | Text | Gzip | Parquet | CSV | JSON |
| --- | --- | --- | --- | --- | --- |
| size (MB) | 15.8 | 4.6 | 8 | 13.8 | 23.8 |
Compared with CSV and JSON storage of tabular data, Parquet offers a much higher compression ratio. Amazon's comparison of CSV and Parquet on S3 found that Parquet cut storage size by 87%, sped up queries by 34x and reduced cost by 99.7%. Parquet is therefore a very good fit for scenarios with large, growing data volumes and frequent operations on individual columns.
4. Read - Efficiency comparison
Now read each of the files above:
val csv = spark.read.csv(path + "/output/csv").rdd.count()
val parquet = spark.read.parquet(path + "/output/parquet").rdd.count()
val json = spark.read.json(path + "/output/json").count()
val common = sc.textFile(path + "/output/common").count()
val gz = sc.textFile(path + "/output/gzip").count()

| type | Text | Gzip | Parquet | CSV | JSON |
| --- | --- | --- | --- | --- | --- |
| Time cost (ms) | 1417 | 1448 | 4952 | 6870 | 6766 |
Compared with CSV and JSON, Parquet has an advantage, but compared with the row-oriented Text and Gzip files, a row-count operation like count is clearly not the strength of a columnar format, hence the large gap. For statistics over one or a few fields on big data, however, Parquet gives better performance than row-oriented files.
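For example, an aggregation over a single field, where Parquet's column pruning pays off, might look like the following sketch (reusing the A/B columns written earlier):

import org.apache.spark.sql.functions.{avg, max}

spark.read.parquet(path + "/output/parquet")
  .agg(max("A"), avg("A"))   // only column A needs to be read; the other columns are skipped
  .show()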
5. selectExpr

Besides reading the raw field contents of a Parquet file, you can use selectExpr to derive additional information. The expressions it accepts correspond to the functions in org.apache.spark.sql.functions, which include aggregations like collect_list, statistics like count, as well as max, min, isnull and so on.
spark.read.parquet(path).selectExpr("count('_c1')").rdd.foreach(row => {
  println(row.getLong(0))
})
The above uses selectExpr to get count(_c1), the number of values of that feature; the count result here is 5383.
Here _c1 is one of the default schema column names of the sql.DataFrame read from Parquet. The default schema can be inspected as follows:
val schema = spark.read.parquet(path).schema
println(schema)
Part of the output is shown by that println: the feature names start at _c0 and increase, so the defaults are _c0, _c1, and so on. If you define your own schema StructFields and read with spark.read.schema().parquet(), the column names used in selectExpr must be the names you defined; for example, if _c1 is defined as age, the expression above becomes count('age'), and using _c1 again will throw an error. For more detailed schema operations, see: Parquet specify schema.
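A minimal sketch of that custom-schema variant, following the description above; the name/age field names are just illustrative, and whether renamed columns line up with the file depends on how it was written:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical field names; the example above renames _c1 to age
val mySchema = StructType(Array(
  StructField("name", StringType),
  StructField("age", StringType)
))

spark.read.schema(mySchema).parquet(path)
  .selectExpr("count('age')")   // use the defined name instead of _c1, as noted above
  .show()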
V. Summary
That covers the usual ways we use Parquet with Spark. SparkSession's built-in API for reading parquet and orc is very convenient, so when possible it is recommended to read through that API rather than hadoopRDD / hadoopFile. Finally, the name parquet is fun: a parquet is a wooden floor, and columns of varying length stored side by side do look a bit like floorboards when laid out flat.
