Spark accumulators and broadcast variables

2022-06-24 07:00:00 Angryshark_128

Accumulators

An accumulator is somewhat like a Redis counter, but more powerful: besides counting, it can also be used to sum values, merge elements into a collection, and so on.

Suppose we have a text file, word.txt, and want to count the lines that contain the word "sheep". We can read the file, filter it, and count the result directly.

sc.textFile("word.txt").filter(_.contains("sheep")).count()

Now suppose we want separate line counts for "sheep" and "wolf". With the approach above, the file has to be processed twice:

sc.textFile("word.txt").filter(_.contains("sheep")).count()
sc.textFile("word.txt").filter(_.contains("wolf")).count()

Counting lines for 100 different words this way would take 100 passes over the file.

With accumulators, the file only needs to be read once:

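// One named accumulator per word (longAccumulator replaces the older sc.accumulator(0))
val count1 = sc.longAccumulator("sheep")
val count2 = sc.longAccumulator("wolf")
...

// Called once per line on the executors; only adds to the accumulators
def processLine(line: String): Unit = {

   if (line.contains("sheep")) {
       count1.add(1)
   }

   if (line.contains("wolf")) {
       count2.add(1)
   }

   ...
}


// A single pass over the file updates every accumulator
sc.textFile("word.txt").foreach(processLine(_))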

Accumulators are not limited to Int: Long, Double, and collection values can also be accumulated, and you can define custom accumulator types. An accumulator that has been given a name can also be viewed in the Spark web UI.
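As a rough sketch of the custom route, the word-counting scenario could be folded into one accumulator that tracks every keyword at once. The class name KeywordCounter, the sample lines, and the local master setting are all assumptions made for illustration; only AccumulatorV2 and sc.register come from Spark itself.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable

// Hypothetical custom accumulator: for each keyword, counts the lines containing it
class KeywordCounter(keywords: Seq[String])
    extends AccumulatorV2[String, Map[String, Long]] {

  private val counts = mutable.Map.empty[String, Long]

  override def isZero: Boolean = counts.isEmpty

  override def copy(): KeywordCounter = {
    val c = new KeywordCounter(keywords)
    c.counts ++= counts
    c
  }

  override def reset(): Unit = counts.clear()

  // Called inside tasks, once per input line
  override def add(line: String): Unit =
    keywords.foreach { k =>
      if (line.contains(k)) counts(k) = counts.getOrElse(k, 0L) + 1L
    }

  // Called on the driver to fold in the partial counts of each task
  override def merge(other: AccumulatorV2[String, Map[String, Long]]): Unit =
    other.value.foreach { case (k, n) => counts(k) = counts.getOrElse(k, 0L) + n }

  override def value: Map[String, Long] = counts.toMap
}

object KeywordCounterSketch {
  def run(): Map[String, Long] = {
    // Local session for illustration only (assumed setup)
    val spark = SparkSession.builder().master("local[*]").appName("keyword-counter").getOrCreate()
    val sc = spark.sparkContext
    try {
      val acc = new KeywordCounter(Seq("sheep", "wolf"))
      sc.register(acc, "keywordCounts") // custom accumulators must be registered before use

      // Hypothetical in-memory lines standing in for word.txt
      val lines = sc.parallelize(Seq("a sheep", "a wolf", "sheep and wolf", "a goat"))
      lines.foreach(line => acc.add(line))

      acc.value // read on the driver after the action has finished
    } finally spark.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

With this shape, adding a 100th keyword means extending the keyword list rather than declaring another accumulator variable.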

Note: an accumulator is defined on the driver, and its merged value can only be read on the driver. Executors can only add to it; they cannot read the overall value.
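That driver/executor split can be sketched with two of the built-in accumulator types. The object name, the sample lines, and the local master setting below are assumptions for illustration; sc.doubleAccumulator and sc.collectionAccumulator are the Spark calls being shown.

```scala
import org.apache.spark.sql.SparkSession

object BuiltInAccumulators {
  // Returns (total characters seen, number of lines collected)
  def run(): (Double, Int) = {
    // Local session for illustration only (assumed setup)
    val spark = SparkSession.builder().master("local[*]").appName("builtin-accumulators").getOrCreate()
    val sc = spark.sparkContext
    try {
      // Defined on the driver
      val totalChars = sc.doubleAccumulator("totalChars")
      val sheepLines = sc.collectionAccumulator[String]("sheepLines")

      // Hypothetical in-memory lines standing in for word.txt
      val lines = sc.parallelize(Seq("one sheep", "two wolf", "three sheep"))

      // Inside tasks, the accumulators are only added to
      lines.foreach { line =>
        totalChars.add(line.length.toDouble)
        if (line.contains("sheep")) sheepLines.add(line)
      }

      // Read back on the driver once the action has finished
      (totalChars.value.doubleValue, sheepLines.value.size)
    } finally spark.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```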

Broadcast variables

Broadcast variables let you cache a read-only variable on each machine (worker) instead of shipping a copy with every task. They can give every node a copy of a large input dataset in a more efficient way.

Broadcast variables improve data-sharing efficiency in two ways:

(1) Each node (physical machine) in the cluster holds only one copy, whereas a variable captured in a closure is copied once per task by default;

(2) Broadcast data is transferred in a BitTorrent-like, peer-to-peer (P2P) fashion, which can greatly improve transfer speed when the cluster has many nodes. Note that if a broadcast variable's value is modified on one node, the change is not propagated to the other nodes.

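// Broadcast a local collection; an RDD cannot be broadcast directly
val list = (0 to 10).toList
val brdList = sc.broadcast(list)

// Keep only the lines whose integer value appears in the broadcast list
sc.textFile("test.txt").filter(line => brdList.value.contains(line.toInt)).foreach(println)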

Points to note when using broadcast variables:

(1) They suit variables that are small enough to fit in memory on each node. For a variable that can easily reach tens of MB, sending a copy with every task wastes both memory and time, which is the cost broadcasting avoids.

(2) A broadcast variable can only be defined on the driver and read on the executors; executors must not modify it.
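The points above can be sketched end to end as follows. The object name, the sample input, and the local master setting are assumptions for illustration; the broadcast value is a plain local Set defined on the driver and only read inside the tasks.

```scala
import org.apache.spark.sql.SparkSession

object BroadcastSketch {
  def keepAllowed(): Array[Int] = {
    // Local session for illustration only (assumed setup)
    val spark = SparkSession.builder().master("local[*]").appName("broadcast-sketch").getOrCreate()
    val sc = spark.sparkContext
    try {
      // Defined on the driver: a local collection, not an RDD
      val allowed = sc.broadcast((0 to 10).toSet)

      // Hypothetical input standing in for test.txt, one integer per line
      val lines = sc.parallelize(Seq("3", "42", "7", "11"))

      // Tasks only read allowed.value; they never modify it
      lines.map(_.toInt).filter(n => allowed.value.contains(n)).collect().sorted
    } finally spark.stop()
  }

  def main(args: Array[String]): Unit =
    println(keepAllowed().mkString(", ")) // prints "3, 7"
}
```

A Set is used here instead of a List so that each membership check is a hash lookup; that is a design choice of this sketch, not something the original snippet requires.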

Original site

Copyright notice
This article was written by [Angryshark_128]. Please include the original link when reposting. Thanks.
https://yzsam.com/2022/175/202206240050384857.html