当前位置:网站首页>Spark Streaming checkpoint的问题与恢复
Spark Streaming checkpoint的问题与恢复
2022-06-22 15:23:00 【ZH519080】
/**虽然checkpoint是对Spark Streaming运行过程中的元数据和每次RDD的数据状态 * 保存到一个持久化系统中,实现高可用性。 * 即使 * /**当程序修改后打包成新程序后,可能会报错,若删除checkpoint的开头文件,只保留数据文件: * hadoop dfs -rmr /checkpoint/checkpoint* * 但是新程序虽然能重新启动,但是不会读取上次的数据文件,而是重新开始计算, * 这样仍然会丢失数据 * */ * 但checkpoint的弊端: * 若流式程序代码或配置改变,则先停掉旧的spark Streaming程序,然后把新的程序打包编译后重新执行, * 会造成两种情况: * 1、启动报错,反序列化异常 * 2、启动正常,但可能运行的代码是上一次的旧程序代码 * 为何有如此情况??? * 这是因为checkpoint第一次持久化时会把整个相关程序的jar包给序列化成一个二进制文件, * 每次重启都会从checkpoint目录中恢复,即使把新的程序打包序列化后加载的仍然是旧的序列化二进制文件, * 会导致报错或者依旧执行旧代码程序。 * 若直接把上次的checkpoint删除,当启动新的程序时,只能从kafka的smallest或largest(默认是最新的)的偏移量消费, * 若配置成smallest则会导致数据重复,若配置成largest则会导致数据丢失。 * 针对以上问题,有两种解决方案: * 1、旧程序不关闭,新程序启动,两个程序并存一段时间执行消费 * 2、在旧程序关闭时记录其偏移量,当新程序启动时可直接从偏移量出开始消费。 * * * 但是若不使用checkpoint功能,像类似upstatebykey等有状态的函数如何使用?????*/ /**启动预写日志机制 * 预写日志机制(Write Ahead Log,WAL),若启动该机制,Receiver接收到的所有数据都会 * 被写入配置的checkpoint目录中,driver恢复数据时避免数据丢失。 * 调用StreamingContext.checkpoint配置一个检查点目录,然后 * spark.streaming.receiver.writeAheadLog.enable设置为true*/ //**在Spark Streaming应用程序挂掉后,若重新编译Spark Streaming应用程序再运行,是不能从 //* 挂掉的位置恢复的,因为重新编译会导致不能回反序列化Checkpoint /**从Driver故障中重启并恢复应用的条件: * 1、若应用程序首次启动,将创建一个新的StreamingContext实例,设置一个目录保存Checkpoint数据 * 2、若从Driver失败中重启并恢复,则必须从Checkpoint目录中导入Checkpoint数据来 * 重新创建StreamingContext实例。 * * 上面两点可通过StreamingContext.getOrCreate方法实现*/
def exeStreamSV: Unit ={
/**这种方式不能从任意位置恢复*/
val ssc = StreamingContext.getOrCreate(checkpointDir,streamingSV _)
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
val checkpointDir = "hdfs://zhumaster:8020/data/checkpoint"
def stateValues(values: Seq[Int], state: Option[Int]): Option[Int] ={
Some(values.size + state.getOrElse(0))
}
def streamingSV: StreamingContext ={
val sparkConf = new SparkConf().setAppName("checkpoint recover")
.setMaster("local[*]")
val ssc = new StreamingContext(sparkConf,Duration(500))
ssc.checkpoint(checkpointDir)
ssc.socketTextStream("zhumaster",9999).flatMap(_.split(" ")).map((_,1))
.updateStateByKey(stateValues _)
.checkpoint(Duration(200))
.print()
ssc
}
边栏推荐
- 直播无顶流:董宇辉这么火,还有人看刘畊宏吗?
- Test for API
- SAP ABAP 中的模块化:宏、子程序和功能模块 -04
- NiO uses writable events to handle the situation of one-time write incompleteness
- Test for API
- 面试知识点
- 什么是RESTful,REST api设计时应该遵守什么样的规则?
- 机器学习笔记 - HaGRID—手势识别图像数据集简介
- SAP ABAP data dictionary tutorial se11: tables, locked objects, views, and structures-03
- SAP web service 无法使用 SOAMANAGER 登陆到SOA管理页面
猜你喜欢

SAP script tutorial: se71, se78, SCC1, vf03, so10-013

jsp學習之(二)---------jsp脚本元素和指令

如何为政企移动办公加上一道“安全锁”?

Summary of safari compatibility issues

SAP ABAP sub screen tutorial: call sub screen -010 in SAP
![[C language] deeply analyze the relationship between pointer and array](/img/f3/432eeee17034033361e05dde67aac3.jpg)
[C language] deeply analyze the relationship between pointer and array

Pod type

C语言贪吃蛇

SAP ABAP data dictionary tutorial se11: tables, locked objects, views, and structures-03

【微信小程序封装底部弹出框二】
随机推荐
面对默认导入失败的情况
JS method for judging data type of interview questions
SAP script tutorial: se71, se78, SCC1, vf03, so10-013
为什么要买增额终身寿险?增额终身寿险安全可靠吗?
Adding an unknown type of MCU to jflash
Bidirectional data binding V-model and v-decorator
In case of default import failure
异步IO的简单理解
Default function control =default and =delete
6.gui (graphics, filling)
学习量子纠缠的可解释表示,该深度生成模型可直接应用于其他物理系统
Safari兼容性问题总结
Summary of safari compatibility issues
Iterators and generators
Conversion between numeric types and strings
Jsp Learning (2) - - jsp script Elements and instructions
JS获取数据类型方法总结
5 modes of IO model
Implementation classes with similar execution logic use the template pattern
SAP ABAP report programming-08