当前位置:网站首页>Accumulateur Spark et variables de diffusion

Accumulateur Spark et variables de diffusion

2022-06-24 07:00:00 Angryshark 128.

Accumulateur

L'accumulateur est un peu similaireRedisCompteur pour,Mais plus puissant que le compteur,Non seulement pour le comptage,Peut également être utilisé pour additionner、Ajouter des éléments de fusion, etc..

Supposons que nous ayons unword.txtTexte,Nous voulons compter les mots dans ce texte“sheep”Nombre de lignes pour,Nous pouvons lire le texte directementfilterFiltrer et compter.

sc.textFile("word.txt").filter(_.contains("sheep")).count()

Supposons que nous voulions compter les mots dans le texte séparément"sheep""wolf"Nombre de lignes pour,Si deux calculs sont nécessaires selon la méthode ci - dessus

sc.textFile("word.txt").filter(_.contains("sheep")).count()
sc.textFile("word.txt").filter(_.contains("wolf")).count()

Si vous comptez séparément100Nombre de lignes de mots,Calculer100Une fois

Si un accumulateur est utilisé,Il suffit de le lire une fois.

val count1=sc.acccumlator(0)
val count2=sc.acccumlator(0)
...

def processLine(line:String):Unit{
    
   if(line.contains("sheep")){
       count1+=1
   }
   
   if(line.contains("wolf")){
       count2+=1
   }
   
   ...
}


sc.textFile("word.txt").foreach(processLine(_))

Non seulementInt Les types peuvent s'additionner ,Long、Double、Collection On peut aussi cumuler , Vous pouvez également personnaliser , Et cette variable peut être trouvée dans SparkDeWebUIL'interface voit.

Attention!:L'accumulateur ne peut être utilisé qu'àDriver Définition de fin et lecture ,Ça ne peut pas êtreExecutorLecture finale.

Variables de diffusion

Les variables de diffusion permettent de mettre en cache une variable en lecture seule sur chaque machine (worker)Là - haut, Pas toutes les missions (task) Enregistrer une copie de sauvegarde . L'utilisation de variables de diffusion permet d'attribuer plus efficacement une copie d'un ensemble d'entrées volumineuses à chaque noeud .

.Les variables de diffusion améliorent l'efficacité du partage des données de deux façons :

(1)Chaque noeud du cluster(Machines physiques)Une seule copie, La fermeture par défaut est une copie par tâche ;

(2) La diffusion se fait par BT Mise en œuvre du mode de téléchargement ,C'est - à - direP2PTélécharger, Quand il y a beaucoup de grappes , Le taux de transmission des données peut être considérablement augmenté . Après modification des variables de diffusion , Pas de rétroaction vers d'autres noeuds .

val list=sc.parallize(0 to 10)
val brdList=sc.broadcast(list)

sc.textFile("test.txt").filter(brdList.value.contains(_.toInt)).foreach(println)

Lors de l'utilisation,Attention!:

(1)Pour Distribution des petites variables , Pour le mouvement, des dizaines MVariable de, Chaque tâche envoie une fois la mémoire consommée ,Et perdre du temps

(2)Les variables de diffusion ne peuvent être affichées que surdriverDéfinition finale,InExecutorLecture finale,ExecutorImpossible de modifier

原网站

版权声明
本文为[Angryshark 128.]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/175/202206240050384857.html