Spark - understand parquet

2022-06-26 03:59:00 BIT_666

One. Introduction

Parquet files are common in big-data scenarios such as Spark, Hive, Streaming, and MapReduce. They achieve efficient data storage and retrieval through columnar storage and metadata storage. This article focuses on how Parquet files are stored, read, and used in Spark, and on the pitfalls you may run into.

Two. Loading Parquet

1. SparkSession.read.parquet

SparkSession is the class org.apache.spark.sql.SparkSession. Besides reading Parquet columnar files, SparkSession also supports reading ORC columnar files; see: Spark Read ORC File

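    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    // Local-mode configuration for a quick test run
    val conf = new SparkConf()
      .setAppName("ParquetInfo")
      .setMaster("local")

    val spark = SparkSession
      .builder
      .config(conf)
      .getOrCreate()

    // path: location of the Parquet file(s) to read
    spark.read.parquet(path).foreach(row => {
      // Assumes the first column is a string
      val head = row.getString(0)
      println(head)
    })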

The read returns a sql.DataFrame, which supports the usual SQL operations. If you would rather not use SQL, call .rdd to get an RDD[Row] and then iterate over the Iterator[Row] of each partition.

Tips:

You can then run SQL on the result. Creating a SQLContext and calling its sql method is also supported, but SparkSession alone is enough.

    val parquetFileDF = spark.read.parquet("path")
    parquetFileDF.createOrReplaceTempView("tableName")
    val resultDf = spark.sql("SELECT * FROM tableName")

    // Alternative: go through a SQLContext (sc is the SparkContext, e.g. spark.sparkContext)
    val sqlContext = new SQLContext(sc)
    sqlContext.sql("xxx")

2. SparkContext.hadoopFile

Reading with hadoopFile requires specifying the key and value types as well as the InputFormat. For Parquet files the key-value types are Void and ArrayWritable, and the InputFormat is org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat. Once you have the ArrayWritable, calling get() on it returns the underlying array of Writable values.

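    import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
    import org.apache.hadoop.io.ArrayWritable

    val sc = spark.sparkContext
    sc.setLogLevel("error")

    // Key type is Void, value type is ArrayWritable
    val parquetInfo = sc.hadoopFile(path, classOf[MapredParquetInputFormat], classOf[Void], classOf[ArrayWritable])
    parquetInfo.take(5).foreach(info => {
      val writable = info._2.get() // Array[Writable]
      val head = writable(0)
      println(writable.length + "\t" + head)
    })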

Tips:

A serializer setting must be added to SparkConf, otherwise the hadoopFile approach reports an error:

    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

The actual content of each Writable can only be obtained by deserializing it, so reading through the official SparkSession API is recommended. SparkSession does not yet support reading RCFile directly, however, so sc.hadoopRDD can be used to read RCFile, which is also a columnar format; see: Spark Read RcFile

Three. Storing Parquet

1. Static conversion

Parquet -> Parquet: read Parquet into a sql.DataFrame and write it back out, similar to a transformation on an RDD:

    spark.read.parquet(path)
      .write.mode(SaveMode.Overwrite)
      .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
      .format("parquet")
      .save("/split")

2. RDD[T*] conversion

An ordinary RDD can be converted to a sql.DataFrame through the implicit conversions brought in by import sqlContext.implicits._, and then stored as Parquet. Here is how to convert a PairRDD to a DataFrame and store it:

    import sqlContext.implicits._
    val commonStringRdd = sc.emptyRDD[(String, String)].toDF()
    commonStringRdd.write
      .mode(SaveMode.Overwrite)
      .format("parquet")
      .save("")

Tips:

SaveMode has four modes: Append, Overwrite, ErrorIfExists, and Ignore. The first two are self-explanatory. Of the latter two, ErrorIfExists reports an error if the target path already exists, while Ignore skips the write and leaves the existing data untouched if the path already exists. SaveMode is implemented as an enum.

For details on converting an RDD to a sql.DataFrame, see: Spark - RDD / ROW / sql.DataFrame conversion.
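
The behaviour of the four modes can be modelled in plain Scala without Spark. This is only a sketch of the semantics described above: SaveModeDemo, Mode, and save are hypothetical names for illustration and are not part of the Spark API.

```scala
object SaveModeDemo {
  // Hypothetical stand-ins for Spark's four save modes
  sealed trait Mode
  case object Append extends Mode
  case object Overwrite extends Mode
  case object ErrorIfExists extends Mode
  case object Ignore extends Mode

  // existing: data already at the target path, if any
  // incoming: data being written
  // Returns the data at the path after the write, or an error message.
  def save(existing: Option[Seq[String]],
           incoming: Seq[String],
           mode: Mode): Either[String, Seq[String]] =
    (existing, mode) match {
      case (None, _)                => Right(incoming)        // nothing there yet: every mode writes
      case (Some(old), Append)      => Right(old ++ incoming) // add to what is there
      case (Some(_), Overwrite)     => Right(incoming)        // replace what is there
      case (Some(_), ErrorIfExists) => Left("path already exists")
      case (Some(old), Ignore)      => Right(old)             // skip the write, keep old data
    }
}
```

For example, SaveModeDemo.save(Some(Seq("a")), Seq("b"), SaveModeDemo.Ignore) leaves the existing data as it was.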

3. RDD[Row] conversion

If you already have an RDD[Row], you can call sqlContext directly to convert the RDD to a DataFrame. Here TABLE_SCHEME can be seen as a description of each column of data, similar to the column information in Hive: mainly the field name and type, though additional information can be added too. sqlContext matches the column attributes against the Row one by one. If the schema has fewer fields than the Row has columns, the remaining fields can only be read as null.

    val sqlContext = new SQLContext(sc)
    
    final val TABLE_SCHEME = StructType(Array(
      StructField("A", StringType),
      StructField("B", StringType),
      StructField("C", StringType),
      StructField("D", StringType),
      StructField("E", StringType),
      StructField("F", StringType),
      StructField("G", StringType),
      StructField("H", StringType)
    ))

    val commonRowRdd = sc.emptyRDD[Row]
    sqlContext.createDataFrame(commonRowRdd, TABLE_SCHEME)
      .write.mode(SaveMode.Overwrite)
      .format("parquet")
      .save("/split")

Tips:

Errors may be reported when using the above syntax: Illegal pattern component: XXX. This is caused by a problem in the internal date-format parsing; adding .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ") to the code fixes it.

    spark.read.option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").parquet(path)

Four. A brief look at Parquet

Because Parquet is open source, supports multiple platforms and systems, and offers efficient storage and encoding schemes, it is well suited to job development in big-data scenarios. Let's take a brief look at two of its characteristics, columnar storage and metadata storage:

1. Columnar storage - smaller IO

CSV is the most common row-oriented format. In scenarios that need a single feature or column, a CSV file requires traversing and splitting every whole line to finally reach the target element. Parquet, being column-oriented, lets you access an individual feature directly, which improves execution efficiency and reduces data IO.

CSV: A,B,C,D,E -> Split(",")[col]
Parquet: A B C D E -> getString(col)
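
The difference can be sketched in plain Scala without Spark. The names below (LayoutDemo, csvLines, columns) are made up for illustration, and the in-memory vectors only mimic the idea of a columnar layout; they do not reflect how Parquet arranges bytes on disk.

```scala
object LayoutDemo {
  // Row layout: each record is one delimited line, as in CSV
  val csvLines: Seq[String] = Seq("a1,b1,c1", "a2,b2,c2", "a3,b3,c3")

  // Columnar layout: all values of one column are kept together
  val columns: Vector[Vector[String]] = Vector(
    Vector("a1", "a2", "a3"),
    Vector("b1", "b2", "b3"),
    Vector("c1", "c2", "c3")
  )

  // Row layout: every line has to be read and split to reach one field
  def columnFromRows(lines: Seq[String], col: Int): Seq[String] =
    lines.map(_.split(",")(col))

  // Columnar layout: the wanted column is addressed directly
  def columnFromColumns(cols: Vector[Vector[String]], col: Int): Seq[String] =
    cols(col)
}
```

Both functions return the same column, but the row version touches every field of every record to get there.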

2. Metadata storage - higher compression ratio

Parquet uses several encodings to keep data storage efficient and compact:

A. Run-length encoding

When a column contains long runs of repeated values, each run can be recorded as "X repeated N times", which lowers the cost of recording. N may be very large, but the storage cost stays small:

[1,2,1,1,1,1,2] -> 1-1,2-1,1-4,2-1
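
A minimal sketch of the idea in plain Scala, assuming runs are stored as (value, count) pairs. RunLengthDemo is a hypothetical name, and this is not Parquet's actual on-disk format, only the principle behind it.

```scala
object RunLengthDemo {
  // Collapse consecutive equal values into (value, runLength) pairs
  def encode[A](values: Seq[A]): List[(A, Int)] =
    values.foldLeft(List.empty[(A, Int)]) {
      // Same value as the current run: extend that run
      case ((v, n) :: rest, x) if v == x => (v, n + 1) :: rest
      // Different value (or first element): start a new run
      case (acc, x) => (x, 1) :: acc
    }.reverse

  // Expand the (value, runLength) pairs back into the original sequence
  def decode[A](runs: Seq[(A, Int)]): List[A] =
    runs.toList.flatMap { case (v, n) => List.fill(n)(v) }
}
```

Applied to the sequence above, RunLengthDemo.encode(Seq(1, 2, 1, 1, 1, 1, 2)) gives List((1,1), (2,1), (1,4), (2,1)).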

B. Dictionary encoding

Dictionary encoding, as the name suggests, stores heavily repeated data through a mapping, for example "0" -> "LongString":

[LongString, LongString, LongString] -> [0, 0, 0]
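
As a rough sketch in plain Scala, each distinct value is stored once in a dictionary and the data itself becomes a list of small integer codes. DictionaryDemo is a hypothetical name and the layout is simplified for illustration.

```scala
object DictionaryDemo {
  // Returns the dictionary (distinct values in first-seen order)
  // and, for every input value, its index into that dictionary
  def encode(values: Seq[String]): (Vector[String], Seq[Int]) = {
    val dictionary = values.distinct.toVector
    val indexOf = dictionary.zipWithIndex.toMap
    (dictionary, values.map(indexOf))
  }

  // Look every code up in the dictionary to restore the original values
  def decode(dictionary: Vector[String], codes: Seq[Int]): Seq[String] =
    codes.map(dictionary)
}
```

The long string is kept once in the dictionary, and the three occurrences shrink to the codes 0, 0, 0.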

C. Delta encoding

Delta (incremental) encoding suits Unix timestamps. A timestamp records the number of seconds since January 1, 1970, so when storing timestamps the initial timestamp can simply be subtracted to reduce storage. With 1577808000 as the baseline, for example, a lot of space can be saved:

[1577808000, 1577808004, 1577808008] -> [0, 4, 8]
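
The baseline subtraction described above can be sketched in plain Scala as follows. DeltaDemo is a hypothetical name, and the sketch follows this article's description (offsets from the first value); it should not be read as Parquet's actual delta encoding.

```scala
object DeltaDemo {
  // Keep the first value as the baseline and store every value as an offset from it
  def encode(values: Seq[Long]): (Long, Seq[Long]) = {
    require(values.nonEmpty, "need at least one value")
    val base = values.head
    (base, values.map(_ - base))
  }

  // Add the baseline back to every offset to restore the original values
  def decode(base: Long, offsets: Seq[Long]): Seq[Long] =
    offsets.map(_ + base)
}
```

The offsets stay small as long as the values are close together, which is why the approach works well for timestamps recorded over a short period.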

3. Storage - compression comparison

    val st = System.currentTimeMillis()
    val pairInfo = (0 to 1000000).zipWithIndex.toArray
    val format = "csv" // csv, json, or parquet
    sc.parallelize(pairInfo).toDF("A", "B")
      .write
      .mode(SaveMode.Overwrite)
      .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
      .format(format) // without this, save() uses the default source instead of $format
      .save(s"./output/$format")

    val saveType = "gzip" // text or gzip
    // The codec argument is for the gzip run; omit it to write plain text
    sc.parallelize(pairInfo).saveAsTextFile(s"./output/$saveType", classOf[GzipCodec])
    val cost = System.currentTimeMillis() - st
    println(s"Elapsed (ms): $cost")

Use the two methods above to save the array of 0 to 1000000 to the corresponding files, then compare the storage sizes:

type | Text | Gzip | Parquet | CSV | JSON
size (MB) | 15.8 | | 4.68 | 13.8 | 23.8

Compared with storing tabular data as CSV or JSON, Parquet provides a higher compression ratio. A comparison of CSV and Parquet on Amazon S3 found that Parquet cut the size by 87%, made queries 34 times faster, and saved 99.7% of the cost. Parquet is therefore a very good fit when large amounts of data keep being added and operations often target individual columns.

4. Read - Efficiency comparison

Then read each of the files above:

    val csv = spark.read.csv(path + "/output/csv").rdd.count()
    val parquet = spark.read.parquet(path + "/output/parquet").rdd.count()
    val json = spark.read.json(path + "/output/json").count()
    val common = sc.textFile(path + "/output/common").count()
    val gz = sc.textFile(path + "/output/gzip").count()

type | Text | Gzip | Parquet | CSV | JSON
time (ms) | 1417 | 1448 | 4952 | 6870 | 6766

Parquet has the advantage over CSV and JSON. Compared with the row-oriented Text and Gzip files, however, a row-counting operation such as count is clearly not where a columnar file shines, hence the large gap. When computing statistics on one or a few fields over large data, Parquet will deliver higher performance than row-oriented files.

5. selectExpr

Besides fetching the raw field contents, reading a Parquet file also lets you derive additional information through selectExpr. selectExpr is called on the DataFrame and accepts SQL expressions, so the functions found in org.apache.spark.sql.functions can be used in it: aggregations such as collect_list, statistics such as count, as well as max, min, isnull, and so on.

      // Unquoted column name: '_c1' in quotes would be a string literal, not the column
      spark.read.parquet(path).selectExpr("count(_c1)").rdd.foreach(row => {
        println(row.getLong(0))
      })

The operation above uses selectExpr to get count(_c1), the number of values of that feature. count result: 5383.

Here _c1 is a column name from the default schema of the sql.DataFrame read from Parquet. The default schema can be obtained as follows:

      val schema = spark.read.parquet(path).schema
      println(schema)

In the schema output, feature names count up from _c0: the defaults are _c0, _c1, and so on. If you define your own StructFields and read with spark.read.schema().parquet(), the column names used in selectExpr on the resulting sql.DataFrame must change to the names you defined. For example, if _c1 is defined as age, the expression above becomes count(age), and using _c1 again reports an error. For more detailed schema operations, see: Parquet: specifying a schema

Five. Summary

That covers the common uses of Parquet in Spark. The SparkSession API for reading Parquet and ORC is very convenient, so when you need to read these files it is recommended to use the API directly rather than hadoopRDD / hadoopFile. Finally, the name parquet is rather fun: parquet means wood-block flooring, and with columns of varying lengths stored side by side, the data laid out flat does look a bit like a parquet floor.

Copyright notice
This article was written by [BIT_666]. Please include the original link when reposting. Thanks.
https://yzsam.com/2022/177/202206260352284546.html