当前位置:网站首页>为提高效率使用ParallelStream竟出现各种问题
为提高效率使用ParallelStream竟出现各种问题
2022-07-25 13:05:00 【邂逅于晚风】
一、前言
小编最近新接手一个任务需要对千万级数据量访问第三方接口可想而知对性能要求极高,当时我第一反应就是请求第三方接口时使用ParallelStream对数据进行操作这不就解决了吗。想象是美好的可结果直接将我按在地上摩擦..........
二、场景
我们先看代码
List<Business> getinfo = mapper.getinfo(createDate.toString("yyy-MM-dd"));
List<Business> infos = new ArrayList<>();
gdispx_data.parallelStream().forEach(o -> {
//数据处理
infos.add(o);
});
看似没有问题,执行发现结果集元素个数不等于期望元素个数,且其中存在 null 元素,而且有几率出现数组下标越界错误。
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:598)
at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
at java.util.stream.ForEachOps$ForEachOp$OfInt.evaluateParallel(ForEachOps.java:189)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.IntPipeline.forEach(IntPipeline.java:404)
at jit.wxs.disruptor.stream.StreamTest.main(StreamTest.java:15)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 15
at java.util.ArrayList.add(ArrayList.java:463)
at java.util.stream.ForEachOps$ForEachOp$OfInt.accept(ForEachOps.java:205)
at java.util.stream.IntPipeline$3$1.accept(IntPipeline.java:233)
at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.execLocalTasks(ForkJoinPool.java:1040)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1058)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
三、分析
问题原因也很简单,Parallel Stream 其内部采用 ForkJoinPool 线程池进行执行,也就是说存在线程安全问题,而 ArrayList 是线程不安全的。下面依次来分析产生各种异常情况的原因。
3.1 元素数量丢失
// java.util.ArrayList#add(E)
public boolean add(E e) {
ensureCapacityInternal(size + 1); // Increments modCount!!
elementData[size++] = e;
return true;
}
导致数组下标越界的原因是 ArrayList 的 add() 方法中的 elementData[size++] = e,这行代码不是原子操作,可以拆解为:
读取 size 值
将 e 添加到 size 的位置,即 elementData[size] = e
size++
这里存在内存可见性问题,当线程 A 从内存读取 size 后,设置 e 值,将 size 加 1,然后写入内存。过程中可能有线程 B 也修改了 size 并写入内存,那么线程 A 写入内存的值就会丢失线程 B 的更新。这解释了会出现数组长度比原始数组要小(元素丢失)的情况。
3.2 null 元素
null 元素产生跟元素数据丢失类似,也是由于 elementData[size++] = e 不是原子操作导致的。假设存在三个线程,线程 1、线程 2、线程 3。三个线程同时开始执行,初始 size 值为 1。
线程 1 全部执行完毕,此时 size 被更新为 2。
线程 2 一开始读取 size 值 = 1、将 e 添加到 size 位置后时间片就用完了,轮到执行第三步 size++ 读取到了线程 1 的更新,size 直接被更新成了 3。【注:此处线程 2 的 e 值也丢失了,被线程 1 覆盖】
线程3 一开始读取 size 值 = 1 后时间片就用完了,轮到执行第二步将 e 添加到 size 位置读取到了线程 2 的更新,size 变成了 3。size = 2 的位置就被跳过了,因此 elementData[2] 为 null 了。
3.3 数组下标越界
数组越界异常则主要发生在数组扩容前的临界点。假设当前数组刚好只能添加一个元素,两个线程同时准备执行ensureCapacityInternal(size + 1),同时读取的 size 值,加 1 后进入ensureCapacityInternal都不会导致扩容。
退出 ensureCapacityInternal 后,两个线程同时执行 elementData[size] = e,线程 B 的 size++ 先完成,假设此刻线程 A 读取到了线程 B 的更新,线程 A 再执行 size++,此时 size 的实际值就会大于数组的容量,这样就会发生数组越界异常。
四、解决
解决问题也很简单,分两种,一种是把结果集合变成线程安全的即可。
List<Integer> list = new CopyOnWriteArrayList<>();
// or
List<Integer> list = Collections.synchronizedList(new ArrayList<>());五、补充
在使用并行流的时候是无法保证元素的顺序的,也就是即使你用了同步集合也只能保证元素都正确但无法保证其中的顺序,这个在Oracle官方文档也有说明,不要使用有副作用的lambda表达式,那么什么是有副作用的lambda表达式呢?官方文档说https://docs.oracle.com/javase/tutorial/collections/streams/parallelism.html就是说这种lambda表达式的结果会在管道执行的过程中发生变化
边栏推荐
- Introduction to web security UDP testing and defense
- CONDA common commands: install, update, create, activate, close, view, uninstall, delete, clean, rename, change source, problem
- 0717RHCSA
- 需求规格说明书模板
- Shell common script: check whether a domain name and IP address are connected
- 卷积神经网络模型之——GoogLeNet网络结构与代码实现
- 【视频】马尔可夫链蒙特卡罗方法MCMC原理与R语言实现|数据分享
- Substance Designer 2021软件安装包下载及安装教程
- 6W+字记录实验全过程 | 探索Alluxio经济化数据存储策略
- [机器学习] 实验笔记 – 表情识别(emotion recognition)
猜你喜欢
详解浮点数的精度问题

Mid 2022 review | latest progress of large model technology Lanzhou Technology

【GCN-RS】Learning Explicit User Interest Boundary for Recommendation (WWW‘22)

Shell常用脚本:获取网卡IP地址
![[Video] visual interpretation of Markov chain principle and Mrs example of R language region conversion | data sharing](/img/6e/9e0abf8db5ec93080033bd89605ac2.jpg)
[Video] visual interpretation of Markov chain principle and Mrs example of R language region conversion | data sharing

G027-OP-INS-RHEL-04 RedHat OpenStack 创建自定义的QCOW2格式镜像
[email protected](using password:YES)"/>全网最简单解决方式1045-Access denied for user [email protected](using password:YES)

【OpenCV 例程 300篇】239. Harris 角点检测之精确定位(cornerSubPix)

深度学习的训练、预测过程详解【以LeNet模型和CIFAR10数据集为例】

机器学习强基计划0-4:通俗理解奥卡姆剃刀与没有免费午餐定理
随机推荐
Connotation and application of industrial Internet
Eccv2022 | transclassp class level grab posture migration
如何理解Keras中的指标Metrics
0719RHCSA
深度学习的训练、预测过程详解【以LeNet模型和CIFAR10数据集为例】
迁移PaloAlto HA高可用防火墙到Panorama
Go: Gin custom log output format
外围系统调用SAP的WebAPI接口
【视频】马尔可夫链蒙特卡罗方法MCMC原理与R语言实现|数据分享
[Video] Markov chain Monte Carlo method MCMC principle and R language implementation | data sharing
一味地做大元宇宙的规模,已经背离了元宇宙本该有的发展逻辑
全网最简单解决方式1045-Access denied for user [email protected](using password:YES)
TCP的拥塞控制
卷积神经网络模型之——LeNet网络结构与代码实现
massCode 一款优秀的开源代码片段管理器
0720RHCSA
R language GLM generalized linear model: logistic regression, Poisson regression fitting mouse clinical trial data (dose and response) examples and self-test questions
EMQX Cloud 更新:日志分析增加更多参数,监控运维更省心
R语言GLM广义线性模型:逻辑回归、泊松回归拟合小鼠临床试验数据(剂量和反应)示例和自测题
【AI4Code】《Unified Pre-training for Program Understanding and Generation》 NAACL 2021