当前位置:网站首页>pyspark学习笔记
pyspark学习笔记
2022-07-23 05:38:00 【我是女生,我不做程序媛】
spark框架:MapReduce

【注:reduce的个数不一定和key的个数相等,可能n个key对应m个reduce】
shuffle:处于一个宽依赖,可以实现类似混洗的功能,将相同的 Key 分发至同一个 Reducer上进行处理。
搭建sc环境:
from pyspark.sql import SparkSession
from pyspark.sql import Row
spark dataframe
spark dataframe与pandas的dataframe不同,是两种不同的数据类型,具有不同的函数和使用方法。
- 建立spark dataframe: df=spark_session.sql(‘sql’)
- 将spark dataframe转化为二维列表: df.collect()
外层数组是每一行数据(Row),里层数组是一行中每一列(Column)的数据
spark rdd
rdd是spark中的基础数据单元,每个rdd被分为多个分区,可以包含Python、Java、Scala中任意类型的对象。
创建rdd
- 读取外部数据集
lines = sc.textFile(“README.md”) - 分发驱动器程序中的对象集合(如list、set)
lst=[(‘Alice’,1),(‘Bob’,2)]
rdd = sc.parallelize( lst )
3. 从spark dataframe转化
rdd = spark.createDataFrame( lst ,[‘name’,‘age’] ).rdd
rdd基本操作
转化操作transformation
转化操作不进行实际计算和存储,只是记录计算的步骤(即惰性计算):
① map:将函数作用在rdd的每个元素中,函数返回结果作为结果rdd的值。
nums = sc.parallelize([1,2,3,4])
squared = nums.map(lambda x: x*x).collect()
② filter:将函数作用在每个rdd元素,返回符合函数条件的rdd
pythonlines = lines.filter(lambda line: “Python” in line)
③ flatMap:将函数作用在迭代器rdd上,将所有迭代器的返回值塞到同一个迭代器中返回
lines = sc.parallelize([”hello world“,“hi”])
words = lines.flatMap(lambda line: line.split(" "))
words.first() #返回“hello”行动操作action
行动操作进行实际计算,得出结果返回到驱动器程序中或者并存储到外部存储(如HDFS)中:
①reduce:接收两个同类型元素并将计算结果返回
sum = rdd.reduce(lambda x,y: x+y)
② pythonlines.first()
③ rdd.take(2) #取rdd的2个值,以列表的形式返回[(‘Alice’,1),(‘Bob’,2)]
④pythonlines.collect() #取全部值,会把整个rdd拉取到driver上,慎用,最好用.first(), take(n)等取代
⑤ pythonlines.count() #计数持久化
Spark rdd惰性求值,每次调用行动操作时都会将前面的依赖重新计算一边,为了避免重复计算,可以将rdd持久化。
result = nums.map(lambda x: x*x)
result.persist() # 可以选择持久化级别:MEMORY_ONLY,MEMORY_ONLY_SER,MEMORY_AND_DISK,MEMORY_AND_DISK_SER,DISK_ONLY
print(result.count())
print(result.first())
result.unpersist()遍历dataframe形式的rdd
rdd是不可iterable的类型,不可以用for循环遍历
①rdd_result = rdd.map(function)
②rdd_result = rdd.mapPartitions(function)
def function(iter):
from multiprocessing.pool import ThreadPool*
results = []
def f (r ):
nonlocal results
results += [ something ]
# 如果r是row类型,即Row(id=' ',set=[]),可以取r['id'],r['set']等
# 如果r是tuple类型,即(Row(id=' ',set=[ ]),Row(id=' ',set=[ ])),可以取r[0],r[1],r[0]['id'],r[0]['set']等
with ThreadPool(8) as p:
p.map(f, [r for r in iter])
# r是rdd的一行
return results
map和mapPartition的区别:
对于有3个元素,1个分区的rdd:
map是对rdd中的每一个元素进行操作,操作一次的结果作为一个Row,操作3次;
mapPartitions则是对rdd中的每个分区的迭代器进行操作,操作一次的结果为3个Row,操作1次。
上例中,map的返回结果为[Row(_1=324, _2=[‘a’, ‘b’, ‘c’]), Row(_1=100, _2=[‘e’, ‘g’]), Row(_1=555, _2=[‘a’, ‘e’, ‘m’])]
mapPartitions的返回结果为[Row(id=324, set=[‘a’, ‘b’, ‘c’]), Row(id=100, set=[‘e’, ‘g’]), Row(id=555, set=[‘a’, ‘e’, ‘m’])]
向spark传递函数
python
- lambda函数
word = rdd.filter(lambda s : “error” in s) - 定义局部函数
#自定义query函数,rdd作为参数传入
def function(iter):
pass
#将返回结果转化为df
df = rdd.mapPartitions(query).toDF()
注意1:若在rdd函数中传递了某个对象的成员,spark会把成员所在整个对象都序列化发到工作节点上,传递的东西会比想象中大得多,如:
rdd.filter(lambda x: self.query in x) #会把整个self都保存到局部变量
query=self.query()
rdd.filter(lambda x: query in x)
注意2:在function中定义的print语句不会输出到主机终端,会在其他工作节点输出,因此不要在调用函数内部输出调试。
Scala
java
边栏推荐
- 項目部署(簡版)
- check the manual that corresponds to your MySQL server version for the right syntax to use near ‘ord
- 6. Barycentric coordinate interpolation and graphics rendering pipeline
- Xssgame games (XSS learning) level1-15
- pygame实现飞机大战游戏
- 十年架构五年生活-01毕业之初
- Murata power maintenance switch server power maintenance and main functional features
- 3.Flask 中的线程
- 2. Analysis of the return value of the startup function
- 超级简单的人脸识别api 只需几行代码就可以实现人脸识别
猜你喜欢

开发必备之Idea使用

Huawei executives talk about the 35 year old crisis. How can programmers overcome the worry of age?

SPR:SUPERVISED PERSONALIZED RANKING BASED ON PRIOR KNOWLEDGE FOR RECOMMENDATION

An accident caused by MySQL misoperation, and "high availability" is not working well
![[Anaconda environmental management and package management]](/img/cc/9f15282eabf0eee5e9f28b4f62f1e2.png)
[Anaconda environmental management and package management]

JDBC的學習以及簡單封裝

Heidelberg CP2000 circuit board maintenance printer host controller operation and maintenance precautions

Master slave synchronization step read / write separation + self encountered error sharing

JDBC Learning and simple Encapsulation

【Pyradiomics】提取的影像组学特征值不正常(很多0和1)
随机推荐
分页、过滤
Cadence learning path (VIII) PCB placement components
Filter in MATLAB
Markdown common syntax records
CountDownLatch的用法
With only 5000 lines of code, AI renders 100 million landscape paintings on v853
使用cmd安装pygame
js中拼接字符串,注意传参,若为字符串,则需要加转义字符
【无标题】
使用聚类分析 构建信用卡高风险客户识别模型
USCD行人异常数据集使用指南 | 快速下载
DWI图像 从DICOM Tag识别 b value 的方法
JDBC数据库连接池
3DMAX first skin brush weights, then attach merge
达人专栏 | 还不会用 Apache Dolphinscheduler?大佬用时一个月写出的最全入门教程
【无标题】
【无标题】
TS type gymnastics intermediate type gymnastics challenge closing battle
Notifier Nordic fire engine power supply maintenance and daily maintenance
【Anaconda 环境管理与包管理】