当前位置:网站首页>Spark RDD Programming Guide(2.4.3)

Spark RDD Programming Guide(2.4.3)

2022-06-22 23:03:00 M_ O_

Original address :
https://spark.apache.org/docs/latest/rdd-programming-guide.html

Overview

From a high-level point of view , spark The application consists of a driver ( Run user's main function ) And executing various concurrent operations on the cluster .spark The main abstraction of is Elastic distributed data sets (RDD), It is a collection of elements across cluster nodes , Can be operated concurrently .RDD It's from Hadoop file system ( Or other Hadoop Supported file systems ) Or existing in the driver Scala Set created by , And transform it . Users can also make spark cache RDD To the memory , So that it can be reused efficiently during concurrent operations . Last , RDD It will automatically recover when the node fails .

spark The second abstraction in is that it can be used by concurrent operations Shared variables . By default , When spark When executing functions on different nodes in the form of task sets , It sends a copy of each variable required by the function to each task . occasionally , A variable needs to be shared between tasks , Or share between tasks and drivers .spark Two types of shared variables are supported : Broadcast variables , Can be used to cache values in memory on all nodes ; accumulator , They are only used for “ add to ” The variable of , Such as counters and summators .

Initializing Spark

The first thing to do , To create a JavaSparkContext object , It tells spark How to access a cluster . In order to create JavaSparkContext, You first need to create a SparkConf object .

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);

appName The parameter is the application name , It will be UI The interface shows .master Is the primary node address of the cluster , Use local Specify to execute locally . In fact, when running in a cluster , There's no need to hard code master Address , Instead, package the application , adopt spark-submit The command is passed to the cluster for execution . This parameter is usually set to use local Mode to test .

Resilient Distributed Datasets (RDDs)

Spark Around a ** Elastic distributed data sets (RDD)** To expand the concept of ,RDD Is a set of fault-tolerant elements that can be operated in parallel . establish RDD There are two ways : Parallelization Existing collection in the driver , Or reference datasets from external storage systems , For example, shared file systems 、HDFS、HBase Or provide Hadoop Any data source for input format .

Parallelized Collections

Parallel collections are created by calling on existing collections in the driver program javasParallelize Method to create . Copy the elements of the collection to form a distributed dataset that can be operated in parallel . for example , Here's how to create a containing number 1 To 5 Parallel collection of :

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

Once created , Distributed datasets (distdata) You can operate in parallel . for example , We can call distdata.reduce((a,b)->a+b) To accumulate the elements in the list . We will describe operations on distributed datasets later .

An important parameter of a parallel set is Partition number To cut the dataset into .spark A task will be run for each partition of the cluster . Usually , You need to create a cluster for each CPU Distribute 2-4 Zones . Usually ,spark The number of partitions will be automatically set according to the cluster . however , You can also pass it as the second parameter to parallelize( for example sc.parallelize(data,10)) To set it up manually . Be careful : The term is used in some parts of the code slices( A synonym for partition ) To maintain backward compatibility .

External Datasets

Spark It can be downloaded from Hadoop Any storage source supported for creating distributed datasets , Including local file system 、HDFS、Cassandra、HBase、Amazon S3 etc. .Spark Support for text files 、 Sequence files and any other Hadoop Input format .

have access to sparkContext Of text file Method to create a text file RDD. This method uses URI( Local path on the computer or hdfs:/s3a:/ etc. URI) access files , And read it as a row set . Here's an example call :

JavaRDD<String> distFile = sc.textFile("data.txt");

Once created ,distfile You can operate through dataset operations . for example , We can use map and reduce operation , Add the sizes of all rows , As shown below :distfile.map(s->s.length()).reduce((a,b)->a+b)

stay spark Some considerations for reading files in :

  • If the path is on the local file system , The file must also be accessible on the same path on the work node . Copy files to all staff or use a network shared file system .

  • Spark All file based input methods , Include ‘textfile’, Both are supported in the catalog 、 Run on compressed files and wildcards . for example , have access to textfile(“/my/directory”)textfile(“/my/directory/*.txt”) and textfile(“/my/directory/*.gz”)

  • textfile Method also accepts an optional second parameter , Used to control the number of partitions of a file . By default ,spark Create a partition for each block of the file (HDFS The default block in is 128MB), But you can also request more partitions by passing larger values . Please note that , Partition cannot be less than blocks .

In addition to text files ,spark Of Java API It also supports several other data formats :

  • For serialized files , Use SparkContext Of sequenceFile[K, V] Method , K and V It's in the file key and values The type of . They have to be Hadoop Of Writeable Subclass of interface , for example IntWriteable and Text.

  • For others Hadoop Input format , have access to JavaSparkContext.hadoopRDD Method , The method is arbitrary JobConf And input format class 、key Classes and value class . For... With input source Hadoop Homework , Use the same settings . When the input format is based on the new MapReduce API (org.apache.hadoop.mapreduce) when , You can also use  JavaSparkContext.newAPIHadoopRDD .

  • JavaRDD.saveAsObjectFile  and JavaSparkContext.objectFile Support for simple serialization java Object format save RDD. Although this is not as good as avro This special format is valid , But it provides a way to preserve any RDD The easy way to .

RDD Operations

RDDs There are two types of operations :

  • transformations: Create a new dataset using the original dataset
  • actions: After operating the data set, return the result to the driver (driver program)
    for example map It's a transformations operation , It converts each element in the original dataset into a new dataset .
    for example reduce It's a action operation , It aggregates all the elements in the dataset through a function , And return the final result to the driver .

be-all transformations The operation is inert (Lazy) Of , In other words, it will not be calculated immediately .spark Just record these operations , These operations are calculated when a action When returning results to the driver . This mechanism of delaying computation makes spark More efficient , such as map The operation is followed by a reduce operation , This will return a simple result to the driver , Instead of going back map Big data set after operation .

By default , Every transformation Operation at each run action Will be double counted , But you can put RDD Persist into memory ( Use persist perhaps cache Method ), such spark Will keep the elements in the cluster , So that the next visit will be faster .spark It also supports saving RDD To the hard disk or in multiple nodes .

Example

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);

The first line defines a basic... Using an external file RDD, This data set is not loaded into memory , lines Variables only point to files .
The second line defines lineLengths As map Results of operation , It is a new data set , Of course not immediately calculated .
On the third line, we run reduce operation , It's a action, Now spark Start performing calculations on different machines , And summarize the data to the driver .

If we want to use again lineLengths, We can do it in reduce Before adding

lineLengths.persist(StorageLevel.MEMORY_ONLY());

This allows you to lineLengths Cache into memory , When it was first calculated .

Passing Functions to Spark

Transfer function to spark
spark Of api Very dependent on passing functions from the driver to the cluster . stay Java in , The function refers to the implementation of org.apache.spark.api.java.function The class of the interface in the package . There are two ways to create these classes :

  • Implement these interfaces in custom classes , Whether named or anonymous classes , Then pass an instance of the class to spark.
  • Use lambda Expression creation function
    Example :
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
  public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
  public Integer call(Integer a, Integer b) { return a + b; }
});
class GetLength implements Function<String, Integer> {
  public Integer call(String s) { return s.length(); }
}
class Sum implements Function2<Integer, Integer, Integer> {
  public Integer call(Integer a, Integer b) { return a + b; }
}

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());

Be careful , stay Java Anonymous classes in can access... In the scope final Variable .spark The copy of these variables will be copied to each work node .

Understand the scope

spark One of the more difficult things in is to understand the scope and life cycle of variables and methods , When executing code in a cluster . A common mistake is RDD Modify variables outside their scope . Here is a use foreach() To add one counter Example of the value of .

Consider the following summation code , According to whether they are in the same JVM On the implementation , The result will be different . A common example is , This code runs in local mode ’local’ And running in cluster mode ’cluster’ when , The results returned are different .

int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);

// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);

println("Counter value: " + counter);

Local vs. cluster modes

The behavior of the above code is unknown , May not work according to your idea . To carry out jobs, spark take RDD The operation of is divided into tasks, Every task By a executor perform . Before implementation , spark Calculation task Of Closure (closure). Closure means executor stay RDD Variables and methods that must be seen when performing operations on ( In the example foreach()). This closure is serialized and sent to each executor.

The variables in the closure are copied to each executor, So it was ’foreach’ Function reference counter Variable , No longer point to... In the driver counter Variable . There is still one in the driver counter Variable , But it's important for executors It's invisible , executor Only the copied variables are visible . therefore , final Variable counter Still 0, Because all right counter The operations of are all references to the one serialized in the closure .

When running in local mode , In some cases , 'foreach’ Run on the same JVM in , At this time, the accessed variable is the original one , that counter Will be updated .

In order to correctly implement the above requirements , One way is to use Accumulator. spark Medium Accumulators Provides a mechanism for safely updating variables , When execution is sliced to cluster execution .

Usually , Closure - A structure similar to a loop or a local method , The global state should not be modified . spark There is no defined or defined modification behavior for objects outside the closure . Some code that does this may work properly when executed locally , But in a distributed environment, it fails . When global joint statistics is required , Please use accumulator.

Printing elements of an RDD

Print RDD The elements in
Another common way to write is to try to use ’rdd.foreach(println)' perhaps ’rdd.map(println)' To print RDD The elements in . But in cluster In mode , writes stdout The data in will be written to executor Of stdout In the pipeline , Not a driver stdout, So the printing of these elements can not be seen . If you need to print all the elements on the driver side , have access to collect() Method , This method can transform RDD The data in is transferred to the driver side , Like this rdd.collect().foreach(println), because collect() Will be able to RDD All the data in the are transmitted , It's too big , We only need to print a small part , have access to  rdd.take(100).foreach(println).

Working with Key-Value Pairs

Most of the spark Of RDD Operations can be of any type , Some of them are only used for key-value Right RDD On . One of the most common operations is shuffle, It is used according to key Polymeric elements .

stay java in , key-value Yes, it comes from Scala Standard library scala.Tuple2 Class implements the , You can simply call new Tuple2(a,b) To create a key value pair , And then use tuple._1() and tuple._2() To visit them .

key-value Of RDDs It's using JavaPairRDD Class implements the . You can use certain types of map operation ( image mapToPair and flatMapToPair) from JavaRDDs establish JavaPairRDDs.JavaPairRDD Have universal RDD Operation and specific key-value operation .

for instance , The following code uses reduceByKey Operation to calculate the number of occurrences of each line in the file :

JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);

We can also use  counts.sortByKey(), For example, sort key value pairs in dictionary order , And then use counts.collect() Send the data back to the driver side .

Be careful : When using custom objects in key value pairs , Make sure equals() Functions and hashCode() The functions are consistent .

Transformations

The following table lists some of the common transformations supported by Spark.
The following table is spark Commonly supported transformations operation

TransformationMeaning
map(func) adopt func Transform each element , Returns a new distributed data set
filter(func) adopt func Test each element , Will result in true Put the element of into the new data set and return
flatMap(func) Be similar to map operation , The difference is that each element can be mapped to 0~N Output results ( therefore func It returns a sequence, not a single element )
mapPartitions(func) Be similar to map operation , But at run time, it is based on RDD Each partition executes , Transfer all the data in the partition to at one time func in , therefore func You need to use this type Iterator => Iterator, T It's the original type , U It's a new type , If there's not enough memory , This operation is possible OOM
mapPartitionsWithIndex(func) Be similar to mapPartitions operation , but func An integer parameter is added to represent the index value of the partition , therefore func You need to use this type (Int, Iterator) => Iterator
sample(withReplacementfractionseed) Sample the data ,withReplacement Indicates whether an element can participate in sampling repeatedly , fraction Is the adoption rate , The value is [0,1], seed It's a random seed
union(otherDataset) Returns the union of two data sets
intersection(otherDataset) Returns the intersection of two data sets
distinct([numPartitions])) Returns the data after the data set is de duplicated , numPartitions Specify the number of tasks
groupByKey([numPartitions]) Aggregate data sets for key values (K,V)=>(K,Interable). Note if you're trying to do something for everyone key Find the aggregation result ( For example, sum or average ), Use reduceByKey  or  aggregateByKey It will be more efficient . By default , Concurrency level depends on RDD The number of partitions , But it can go through numPartitions Appoint
reduceByKey(func, [numPartitions]) Press... On the health value pair Key polymerization (K,V)=>(K,U), U Yes for each. key The value of is passed func The result of aggregation , here U The type and V Of the same type
aggregateByKey(zeroValue)(seqOpcombOp, [numPartitions]) Press the key value pair Key Aggregate (K,V)=>(K,U), here U The type and V The types of can be different , zeroValue Is the initial value , seqOp yes (U,V)=>U Transformation of , combOp yes (U,U)=>U Transformation of
sortByKey([ascending], [numPartitions]) Press Key Sort ,ascending Specifies whether to sort in ascending order
join(otherDataset, [numPartitions]) Merged data set (K, V) and (K, W) => (K, (V, W)) , Allied join Operation has leftOuterJoinrightOuterJoin and  fullOuterJoin.
cogroup(otherDataset, [numPartitions]) Merge datasets and press Key polymerization (K, V) , (K, W) => (K, (Iterable, Iterable)), This operation can also be done with groupWith.
cartesian(otherDataset) Merge data sets as Cartesian Products T and U, Returns a key value pair dataset (T, U).
pipe(command[envVars]) Yes RDD Each partition executes shell Order or Perl Script , RDD The elements in are from stdin Input , After the script is executed, it passes stdout As strings Output to RDD, And back to
coalesce(numPartitions) Reduce RDD The number of partitions to numPartitions, Usually in filter The operation is more effective after large data sets
repartition(numPartitions) take RDD Repartition of data in , To balance the load
repartitionAndSortWithinPartitions(partitioner) take RDD Repartition and sorting of data in

Actions

The following list shows spark Commonly used action.

ActionMeaning
reduce(func) The result of aggregating elements in a dataset , adopt func function ( Combine the two elements into one ), func To meet the exchange rate and combination rate
collect() Returns all elements in the data set to the driver
count() Returns the total number of elements in the dataset
first() Returns the first element in the dataset , Equivalent to take(1)
take(n) Before returning to the dataset n Elements
takeSample(withReplacementnum, [seed]) Return to the data set after sampling num Elements
takeOrdered(n[ordering]) Before fetching data set n Elements , Sort them and return
saveAsTextFile(path) Write the elements in the dataset to a text file , This text file can be stored on the local file system ,HDFS Or other Hadoop Supported file systems , spark Is the calling element toString Method to convert an element to text, And put it on a line in the file
saveAsSequenceFile(path) Save the elements in the dataset to the local file system /HDFS Or other support Hadoop File system directory , As a Hadoop SequenceFile, This operation is only implemented for Hadoop Of Writeable The key value of the interface is valid for the dataset ( Or key value pairs are basic data types , spark Implicit conversion is implemented by default )
saveAsObjectFile(path) Save elements to a file serially , Can pass SparkContext.objectFile() Load the data back
countByKey() For key value pairs only RDD It works , Press Key Number of calculated values , return hashmap<K,Int>
foreach(func) Execution of each element func operation , Usually used to update a Accumulator, No return value

spark RDD API Some operations of have asynchronous versions , image foreach The asynchronous version of foreachAsync, Immediately return to a FutureAction To the caller , Instead of blocking here .

Shuffle operations

some spark Will be triggered shuffle event , shuffle yes spark A mechanism of , Used to reassign data by group to different partitions . This usually affects executor And the machine , This makes shuffle Is a complex and time-consuming operation .

Background

To understand shuffle What happened at this stage , Let's take a look at one reduceByKey Examples of operations . reduceByKey Operation produces a new RDD, One key And all of it value Merged into one tuple, value It's through func To merge . The difficulty here is , key All of the value Not necessarily in the same partition , Not even the same machine , But they must be in the same place to calculate the result .

stay spark in , Data is usually not distributed between partitions , Instead, they are distributed where a particular operation requires . When calculating , A single task is executed on a single partition – therefore , To organize reduceByKey Data required for a single task , spark Need to execute a all-to-all The operation of . This operation needs to read all partitions to find all keys All of the value, Then put the corresponding key All of the value Put it together to calculate the final result , This is it. shuffle.

although shuffle The elements in each partition of the generated data are determined , The order of partitions itself is also determined , But the order of the elements is not . If you want to suffle The order of the following elements is determined , Then you can use :

  • Use mapPartitions To sort the data of each partition , for example .sorted
  • Use repartitionAndSortWithinPartitions Efficient sorting while repartition .
  • Use sortBy Create globally ordered RDD

Trigger shuffle The operation of the event includes :

  • repartition operation , Such as repartition
  • ByKey operation , Such as groupByKey, reduceByKey
  • join operation , Such as cogroup, join

Performance Impact

Shuffle It's a very expensive operation , Because it affects the disk IO, Data serialization , The Internet IO. In order to organize data shuffle, spark Produce a lot of map Task to organize data , There are also many reduce Task to aggregate data . The name comes from MapReduce, Not directly and directly spark Of map,reduce Operation related .

A separate map The results of the task are kept in memory , Until there is not enough memory . then , Sort them according to the target partition , Write to a single file . stay reduce End , The task reads the related sorted data blocks .

some shuffle The operation will significantly consume heap memory , Because they use memory data structures to organize data before and after data transmission . say concretely ,  reduceByKey  and  aggregateByKey stay map End create their structure , and ByKey Operation in reduce End create structure . When the memory cannot meet the data size , spark Will write data to disk , This leads to additional disks IO And garbage collection operations .

Shuffle A large number of temporary files will be generated on the disk . from spark1.3 Start , These files will be retained , Until the corresponding RDD When it is recycled because it is no longer used . To do so , For recalculation , No need to recreate shuffle file . If you keep these RDD A reference to or GC Not often , Garbage collection may take a long time . This means running for a long time spark Tasks consume a lot of disk space . This temporary storage directory consists of spark.local.dir Configuration parameter assignment .

You can use a variety of configuration parameters to tune Shuffle act . see Spark Configuration Guide.

RDD Persistence

spark One of the most important features , Between multiple operations , Set data to persistence ( cache ) To the memory . When you persist a RDD when , Each node stores partition data that they compute in memory , And access these datasets elsewhere ( Or their derived datasets ) Reuse this data on the operation of . This can make the following operations faster ( Usually 10x Times the speed ). Caching is a key tool for iterative algorithms and fast interactive use .

have access to persist() perhaps cache() Method to cache RDD. When RDD When it was first calculated , It will be saved to the memory of the nodes .spark The cache is fault tolerant – If RDD Any partitions of are missing , It will automatically use the original transformations Work it out .

in addition , Every RDD You can use different Storage level To persist , such as , Persistent datasets to disk , Persist to memory , Copy to other nodes . These levels are achieved by passing parameters StorageLevel Object to persist() Realized .cache The method is to use the default storage level (StorageLevel.MEMORY_ONLY) A shortcut to . Here are all the storage levels :

Storage LevelMeaning
MEMORY_ONLY Default level , Reverse the sequence of RDD Stored in JVM in . If there's not enough memory , Some partitions will not be cached , It will be recalculated when necessary
MEMORY_AND_DISK Reverse the sequence of RDD Stored in JVM in . If there's not enough memory , Store partition on disk , Read it from the disk when necessary
MEMORY_ONLY_SER Serialized RDD Store in JVM in , This usually saves memory space , But the reading time will be slower , And consume some CPU
MEMORY_AND_DISK_SER Be similar to MEMORY_ONLY_SER, But if there is not enough memory , More data will be saved to disk
DISK_ONLY Save partition to disk
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. Same as above , But the data will be copied to two cluster nodes
OFF_HEAP (experimental) Be similar to MEMORY_ONLY_SER, But the data is saved to off-heap In the memory

spark It will also automatically save some temporary data , Doing it shuffle When ( Such as reduceByKey), Even if the user does not call persist. This is done to avoid being shuffle When the operation node fails, the entire input must be recalculated . It is still recommended to reuse in the plan RDD when , Manual call persist.

Which Storage Level to Choose

spark The storage level is designed to improve memory utilization and CPU There are different tradeoffs between efficiency . We recommend using the following process to select :

  • If your RDD Suitable for the default storage level (MEMORY_ONLY), Then don't modify it . This is the most promising CPU Efficiency options , Give Way RDD The operation on is the fastest .
  • If RDD Not suitable for , Try to use MEMORY_ONLY_SER, And choose a quick serialization Library , This usually saves space , It's also faster .
  • Generally, do not overflow data to the disk , Unless the calculated data is very time-consuming , Or the data is filtered from a very large data set . generally speaking , Recalculating data is about as fast as reading from disk .
  • If you need fast error recovery ( For example, using spark To serve web application ), You can use replication data levels . All storage levels are fault tolerant ( By recalculating ), However, the replication level can continue to perform tasks without waiting for recalculation .

Removing Data

spark Automatically monitor cache usage on each node , And use LRU( Recently at least use ) Policy removes old data partitions . If you want to delete the cache manually , Use RDD.unpersist() Method .

Shared Variables

Usually , Pass a function to spark operation ( image map or reduce) Is executed on the remote cluster node , It works on separate copies of all variables used by the function . These variables are copied to each machine , And no updates will be sent back to the driver from the remote machine . It is inefficient to support the sharing of common read and write variables between tasks . however ,spark Indeed, two limited types of shared variables are provided for two common usage patterns : broadcast variables and accumulators.

Broadcast Variables

Broadcast variables allow developers to keep a read-only variable on each machine , Instead of making a copy and sending it to each task . for example , They can effectively provide a large data set for each node .spark We also try to use more efficient broadcast algorithm to allocate broadcast variables , To save communication costs .

spark actions Is executed by "shuffle" The operation is divided into several stages .spark Automatically broadcast the shared data required by each stage task . The broadcast data is cached in a serialized manner , And deserialize between each task run . It means , Show that creating broadcast variables is only possible if the task uses the same data across multiple phases , Or when it is important to store data in deserialization , It works .

Using variables v Create a broadcast variable call SparkContext.broadcast(v). Broadcast variables are wrapped in v Inside , You can call value Method to access its value , The code is as follows :

Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});
broadcastVar.value();
// returns [1, 2, 3]

Once the broadcast variable is created , Any function running on the cluster should use this variable instead of v, To avoid v It is transmitted to the same node many times . in addition , object v It should not be modified after broadcasting , To ensure that all nodes get the same value ( For example, variables are later transferred to the new node ).

Accumulators

An accumulator is a variable , Increase through correlation and exchange operations , Therefore, it can efficiently support concurrency . They are used to implement counters ( For example MapReduce in ) Or sum .spark It supports digital type accumulators , At the same time, developers can also support new types by themselves .

As the user , You can create named or anonymous accumulators . Like the picture below , A command accumulator ( This is called counter) Will be in web The page shows the modification of values in different stages . spark stay “tasks” The table shows the value of each accumulator modified by the task .

Accumulators in the Spark UI
stay UI The upper trace accumulator is very useful for understanding the runtime ( Pay attention to Python I don't support ).

Create a numeric accumulator , You can call SparkContext.longAccumulator() perhaps SparkContext.doubleAccumulator(), To add up Long perhaps Double Cumulative value of type . Then tasks running on the cluster can call add Method to increase it . However , They cannot read the value of the accumulator . Only the driver can read the value of the accumulator , Use value Method .

The following code shows an accumulator used to accumulate all the elements in the array

LongAccumulator accum = jsc.sc().longAccumulator();

sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

accum.value();
// returns 10

The above code uses the built-in accumulator type Long, Developers can inherit AccumulatorV2 Class to create your own type . virtual base class AccumulatorV2 There are several methods that need to be rewritten : reset Used to clear the value of the accumulator to 0, add Used to accumulate another value to the accumulator , merge Used to combine the values of accumulators of the same type into one . Other methods that need to be covered are API In the document API documentation. for instance , We have a class MyVector Realized data vector , We can write this way :

class VectorAccumulatorV2 implements AccumulatorV2<MyVector, MyVector> {

  private MyVector myVector = MyVector.createZeroVector();

  public void reset() {
    myVector.reset();
  }

  public void add(MyVector v) {
    myVector.add(v);
  }
  ...
}

// Then, create an Accumulator of this type:
VectorAccumulatorV2 myVectorAcc = new VectorAccumulatorV2();
// Then, register it into spark context:
jsc.sc().register(myVectorAcc, "MyVectorAcc1");

Be careful , When developers define their own AccumulatorV2 Type , The generated type may not be the same as the added element type .

For only action Accumulator updated during operation , spark Ensure that the accumulation is performed only once , That is, the restart task does not update the value of the accumulator . stay transformation In operation , Users need to be clear about , If the task or work phase is re executed , Updates for each task will be applied multiple times .

The accumulator does not change spark The inert model of . If the accumulator is RDD Updated in the operation of , Their values are only in RDD Updated once when calculated . therefore , In image map() And so on , Updates to accumulators are not performed . The following code demonstrates this feature :

LongAccumulator accum = jsc.sc().longAccumulator();
data.map(x -> { accum.add(x); return f(x); });
// Here, accum is still 0 because no actions have caused the `map` to be computed.
原网站

版权声明
本文为[M_ O_]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/173/202206222045409743.html