
Pyspark on HPC (Continued): reasonable partitioning and merging output into a single file

2022-06-23 08:13:00 flavorfan

Starting jobs on HPC that run a custom Spark in local mode gives you a free choice of Spark/Python version combinations for processing data. By launching several such jobs, independent blocks of data are processed in parallel; as long as compute resources are sufficient, the only thing limiting speed is disk I/O. Data that would keep the local cluster busy for two weeks finishes in about two hours this way. An HPC cluster usually has no database, so for further BI presentation or processing the results must be pulled back to the local cluster; each data block (for example, one day) is saved as tsv.gz and copied back. A PySpark DataFrame's write interface offers a save method that can produce tsv.gz, but Spark writes in parallel by default, so it emits multiple files under the given output directory (outpath). At that point the individual tsv files need to be concatenated in order and compressed into a single gz file.
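A minimal sketch of the kind of local-mode session this workflow assumes; the core count, memory setting, and app name here are placeholders to match whatever resources the HPC scheduler granted the job:

  from pyspark.sql import SparkSession

  # Hypothetical session setup: local[8] pins Spark to 8 cores of the
  # allocated node; driver memory should match the job's memory request,
  # since in local mode the driver does all the work.
  spark = (
      SparkSession.builder
      .master("local[8]")
      .appName("hpc-partition-job")
      .config("spark.driver.memory", "16g")
      .getOrCreate()
  )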

1. process_to_tsv_path

  from pyspark.sql import SparkSession

  def process_to_tsv_path(spark, in_file, out_csv_path, tasks=8):
    result = (
        spark.read.csv(in_file, sep="\t", quote=None, header=True)
        .repartition(tasks)  # repartition right after the read, see note below
        .where(...)          # filter condition elided by the author
        .select(...)         # column selection elided by the author
        .write.format("com.databricks.spark.csv")
        .save(out_csv_path)  # one part file per partition lands in out_csv_path
    )
    return result

The repartition is applied right after reading the input file, with the partition count chosen according to the file size and the CPU and memory available to the job; this produces tasks csv part files under out_csv_path. If the repartition were instead placed after the processing, just before the write, the earlier stages would run on only as many partitions as there are input files (here, one), using a single CPU core and wasting the rest. In the author's comparative experiment, the difference was roughly 5x. A hypothetical illustration follows.
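To make the difference concrete, here is a hypothetical illustration (the input path and filter condition are made up). Note that a single gzipped input file is unsplittable, so Spark reads it as one partition:

  # Slow: everything before the shuffle runs on the read's partitions.
  df = spark.read.csv("day.tsv.gz", sep="\t", header=True)
  df.rdd.getNumPartitions()  # 1 for a single gzipped input file

  # Repartitioning just before the write parallelizes only the write itself;
  # the filtering above it still runs on a single core.
  df.where("status = 'ok'").repartition(8).write.csv("out_late", sep="\t")

  # The placement the article recommends: shuffle into 8 partitions first,
  # so the downstream transformations run on 8 cores in parallel.
  df.repartition(8).where("status = 'ok'").write.csv("out_early", sep="\t")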

2. tsv_path_to_gz

  import glob, gzip

  def tsv_path_to_gz(out_csv_path, tar_file):
    # Concatenate the part files in name order and gzip the result.
    interesting_files = sorted(glob.glob(f'{out_csv_path}/*.csv'))
    with gzip.open(tar_file, 'wb') as f_out:
        for file_name in interesting_files:
            # Stream each part file's raw bytes into the compressed output.
            with open(file_name, 'rb') as f_in:
                f_out.writelines(f_in)
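
Putting the two steps together, a minimal usage sketch (the paths, and the spark session from the earlier sketch, are assumptions):

  # Hypothetical paths: process one day's block, then merge it into one gz file.
  parts_dir = "/scratch/day_2022_01_01_parts"
  process_to_tsv_path(spark, "/scratch/day_2022_01_01.tsv", parts_dir, tasks=8)
  tsv_path_to_gz(parts_dir, "/scratch/day_2022_01_01.tsv.gz")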

Copyright notice
This article was written by [flavorfan]; please include a link to the original when reposting. Thanks.
https://yzsam.com/2022/01/202201121641113369.html