Scala110-combineByKey
2022-07-25 15:07:00 【Weigtang 406 team】
Intro
combineByKey operates on key-value (K, V) data and is used to compute per-key statistics. The examples below show how it works in practice.
Generate the data
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import scala.collection.mutable.{ListBuffer, ArrayBuffer}
val builder = SparkSession
  .builder()
  .appName("learningScala")
  .config("spark.executor.heartbeatInterval", "60s")
  .config("spark.network.timeout", "120s")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.kryoserializer.buffer.max", "512m")
  .config("spark.dynamicAllocation.enabled", false)
  .config("spark.sql.inMemoryColumnarStorage.compressed", true)
  .config("spark.sql.inMemoryColumnarStorage.batchSize", 10000)
  .config("spark.sql.broadcastTimeout", 600)
  .config("spark.sql.autoBroadcastJoinThreshold", -1)
  .config("spark.sql.crossJoin.enabled", true)
  .master("local[*]")
val spark = builder.getOrCreate()
import spark.implicits._
spark.sparkContext.setLogLevel("ERROR")
builder: org.apache.spark.sql.SparkSession.Builder = org.apache.spark.sql.SparkSession$Builder@...
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@...
import spark.implicits._
val df = Seq(
  (1, "male", "18", "2019-01-01 11:45:50"),
  (2, "female", "37", "2019-01-02 11:55:50"),
  (3, "male", "21", "2019-01-21 11:45:50"),
  (4, "female", "44", "2019-02-01 12:45:50"),
  (5, "male", "40", "2019-01-15 10:40:50")
).toDF("id", "sex", "age", "createtime_str")
df: org.apache.spark.sql.DataFrame = [id: int, sex: string ... 2 more fields]
df.show(truncate=false)
+---+------+---+-------------------+
|id |sex |age|createtime_str |
+---+------+---+-------------------+
|1 |male |18 |2019-01-01 11:45:50|
|2 |female|37 |2019-01-02 11:55:50|
|3 |male |21 |2019-01-21 11:45:50|
|4 |female|44 |2019-02-01 12:45:50|
|5 |male |40 |2019-01-15 10:40:50|
+---+------+---+-------------------+
df.rdd.map(r => {
  val sex = r.getAs[String]("sex")
  val age = r.getAs[String]("age")
  (sex, age.toInt)
}).foreach(println)
(male,18)
(female,37)
(male,21)
(female,44)
(male,40)
The map above turns each Row into (sex, age) key-value pairs.
combineByKey
def combineByKey[C](
    createCombiner: V => C,       // builds the initial combiner C from the first value seen for a key in a partition
    mergeValue: (C, V) => C,      // folds another value of the same key, within a partition, into the combiner
    mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {  // merges combiners of the same key across partitions
  combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}
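For context, reduceByKey and aggregateByKey are both implemented on top of combineByKeyWithClassTag. Below is a minimal sketch of the simplest possible case, using hypothetical sample data, to show what each of the three functions does:
// Hypothetical data: counting occurrences per key (word-count style).
val counts = spark.sparkContext
  .parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))
  .combineByKey(
    (v: Int) => v,                 // createCombiner: the first value starts the count
    (c: Int, v: Int) => c + v,     // mergeValue: within a partition
    (c1: Int, c2: Int) => c1 + c2  // mergeCombiners: across partitions
  )
counts.foreach(println)            // prints (a,2) and (b,1), in no particular order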
Average age by sex
Example: compute the average age of users of each sex.
df.rdd.map(r => {
  (r.getAs[String]("sex"), r.getAs[String]("age").toInt)}).combineByKey(
  (v) => (v, 1),                                                                   // createCombiner: the first value for a key becomes (value, count = 1)
  (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),                                // mergeValue: same key within a partition, add to sum and count
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)   // mergeCombiners: same key across partitions, merge sums and counts
).foreach(println)
(female,(81,2))
(male,(79,3))
- Step 1: the first time a key is seen in a partition, createCombiner turns its value into (value, 1); call this accumulator acc
- Step 2: when the same key appears again in the same partition, mergeValue runs: (acc._1 + value, acc._2 + 1), accumulating the sum and the count
- Step 3: across partitions, combiners for the same key are merged with mergeCombiners: the sums are added and the counts are added
- The intermediate result is a (sum, count) pair per key; one more map divides sum by count to get the final average
df.rdd.map(r => {
  (r.getAs[String]("sex"), r.getAs[String]("age").toInt)}).combineByKey(
  (v) => (v, 1),                                                                   // createCombiner
  (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),                                // mergeValue: same key within a partition
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)   // mergeCombiners: same key across partitions
).map(
  k => (k._1, k._2._1.toFloat / k._2._2)                                           // sum / count = average
).foreach(println)
(female,40.5)
(male,26.333334)
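For comparison, the same average can be computed directly with the DataFrame API, which has avg as a built-in aggregate; a minimal sketch using the df defined above:
// DataFrame-API equivalent of the combineByKey average.
df.groupBy("sex")
  .agg(avg($"age".cast("int")).as("avgAge"))
  .show(truncate = false)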
Collecting all ages per sex
df.rdd.map(r => {
  (r.getAs[String]("sex"), r.getAs[String]("age").toInt)})
  .combineByKey(
    (v) => ArrayBuffer(v),                                            // createCombiner: the first value starts an ArrayBuffer
    (acc: ArrayBuffer[Int], v) => acc += v,                           // mergeValue: same key within a partition, append the element
    (acc1: ArrayBuffer[Int], acc2: ArrayBuffer[Int]) => acc1 ++= acc2 // mergeCombiners: same key across partitions, concatenate the buffers
  ).foreach(println)
(female,ArrayBuffer(37, 44))
(male,ArrayBuffer(18, 21, 40))
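Collecting every value per key is essentially what groupByKey does, shown below for comparison. combineByKey pays off when the combiner is a compact summary such as (sum, count); when it collects every element, as here, it offers little advantage over groupByKey:
// groupByKey yields the same per-key collections, but as Iterables.
df.rdd.map(r => (r.getAs[String]("sex"), r.getAs[String]("age").toInt))
  .groupByKey()
  .foreach(println)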
As with the average above, one more map step computes the mean, and other statistics can be stacked on top:
df.rdd.map(r => {
  (r.getAs[String]("sex"), r.getAs[String]("age").toInt)})
  .combineByKey(
    (v) => ArrayBuffer(v),                                            // createCombiner
    (acc: ArrayBuffer[Int], v) => acc += v,                           // mergeValue: same key within a partition
    (acc1: ArrayBuffer[Int], acc2: ArrayBuffer[Int]) => acc1 ++= acc2 // mergeCombiners: same key across partitions
  ).map(k => (k._1, k._2.sum.toFloat / k._2.length, k._2.max)).foreach(println)
(female,40.5,44)
(male,26.333334,40)
The same result can be converted into a DataFrame with named columns:
df.rdd.map(r => {
  (r.getAs[String]("sex"), r.getAs[String]("age").toInt)})
  .combineByKey(
    (v) => ArrayBuffer(v),                                            // createCombiner
    (acc: ArrayBuffer[Int], v) => acc += v,                           // mergeValue: same key within a partition
    (acc1: ArrayBuffer[Int], acc2: ArrayBuffer[Int]) => acc1 ++= acc2 // mergeCombiners: same key across partitions
  ).map(k => (k._1, k._2.sum.toFloat / k._2.length, k._2.max)).toDF("sex", "avgAge", "MaxAge").show(truncate = false)
+------+---------+------+
|sex |avgAge |MaxAge|
+------+---------+------+
|male |26.333334|40 |
|female|40.5 |44 |
+------+---------+------+
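For reference, the same summary table can be produced with built-in DataFrame aggregates, which is simpler when the statistics are standard ones; a minimal sketch:
// avg and max come from org.apache.spark.sql.functions, imported above.
df.withColumn("age", $"age".cast("int"))
  .groupBy("sex")
  .agg(avg($"age").as("avgAge"), max($"age").as("MaxAge"))
  .show(truncate = false)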
2021-03-11 Jiulong lake, Jiangning District, Nanjing