Using parallelStream to improve efficiency, and the problems it caused
2022-07-25 13:17:00 【Encounter in the evening wind】
1. Preface
I recently took over a new task that calls a third-party interface for tens of millions of records, so the performance requirements are extremely high. My first reaction was: wouldn't parallelStream solve the data-processing problem? The idea was beautiful, but reality knocked me flat and rubbed my face in the ground.
2. Scenario
Let's look at the code first
List<Business> getinfo = mapper.getinfo(createDate.toString("yyyy-MM-dd"));
// BUG: ArrayList is not thread-safe, but the parallel forEach below calls add() from several threads
List<Business> infos = new ArrayList<>();
getinfo.parallelStream().forEach(o -> {
    // Data processing
    infos.add(o);
});
It looks fine, but running it shows that the result list does not contain the expected number of elements, sometimes contains null elements, and occasionally throws an array index out of bounds exception:
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:598)
at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
at java.util.stream.ForEachOps$ForEachOp$OfInt.evaluateParallel(ForEachOps.java:189)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.IntPipeline.forEach(IntPipeline.java:404)
at jit.wxs.disruptor.stream.StreamTest.main(StreamTest.java:15)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 15
at java.util.ArrayList.add(ArrayList.java:463)
at java.util.stream.ForEachOps$ForEachOp$OfInt.accept(ForEachOps.java:205)
at java.util.stream.IntPipeline$3$1.accept(IntPipeline.java:233)
at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.execLocalTasks(ForkJoinPool.java:1040)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1058)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
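The trace above comes from a small test that adds the values of an IntStream to an ArrayList in parallel. The class below is a minimal sketch of such a reproduction using only the JDK; the class and method names are hypothetical. Because this is a race, each run can end differently: the expected size, fewer elements, or the exception.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

// Minimal reproduction: ForkJoinPool threads call ArrayList.add concurrently.
class UnsafeParallelAdd {

    /**
     * Adds 0..n-1 to a plain ArrayList from a parallel stream.
     * Returns the final size, or -1 if ArrayIndexOutOfBoundsException was thrown.
     */
    static int unsafeAdd(int n) {
        List<Integer> list = new ArrayList<>();
        try {
            // Intentionally unsafe: ArrayList is not thread-safe
            IntStream.range(0, n).parallel().forEach(list::add);
        } catch (ArrayIndexOutOfBoundsException e) {
            return -1;
        }
        return list.size();
    }

    public static void main(String[] args) {
        int n = 100_000;
        for (int run = 0; run < 5; run++) {
            int size = unsafeAdd(n);
            System.out.println(size == -1
                    ? "run " + run + ": ArrayIndexOutOfBoundsException"
                    : "run " + run + ": expected " + n + ", got " + size);
        }
    }
}
```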
3. Analysis
The cause is simple. A parallel stream executes on a ForkJoinPool (by default the shared common pool), so the lambda passed to forEach runs concurrently on several threads, and ArrayList is not thread-safe. The following subsections analyze each failure mode in turn.
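To see the concurrency directly, this sketch (hypothetical class name) records which threads execute the lambda. On a multi-core machine you would typically see the calling thread plus several ForkJoinPool.commonPool-worker threads; the exact set varies from run to run.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

class WhoRunsTheLambda {

    /** Collects the names of all threads that execute the forEach lambda. */
    static Set<String> workerNames(int n) {
        // A concurrent set is safe to add to from many threads at once
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, n).parallel()
                 .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        System.out.println("common pool parallelism: "
                + ForkJoinPool.commonPool().getParallelism());
        // Typically the caller (e.g. "main") plus commonPool worker threads
        System.out.println("threads used: " + workerNames(100_000));
    }
}
```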
3.1 Missing elements
// java.util.ArrayList#add(E)
public boolean add(E e) {
ensureCapacityInternal(size + 1); // Increments modCount!!
elementData[size++] = e;
return true;
}
The root cause is the line elementData[size++] = e in ArrayList's add() method. It is not an atomic operation and can be broken down into:
Step 1: read the value of size
Step 2: store e at index size, i.e. elementData[size] = e
Step 3: size++
This is a race condition, made worse by memory visibility. Thread A reads size from memory, stores e, adds 1 to size, and writes it back. In the meantime thread B may read the same size and write its own update. Both threads then store to the same index, so one element overwrites the other, and whichever write of size lands last wipes out the other thread's increment. This explains why the result list ends up shorter than the source list (missing elements).
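The lost update can be replayed deterministically on a single thread by spelling out the interleaving. This sketch models the simplified three-step decomposition above with stand-in fields (hypothetical, not ArrayList's real code); the JIT-compiled add() may interleave differently, but the effect is the same. Two adds leave only one element, so the program prints [b].

```java
import java.util.Arrays;

class LostUpdateReplay {

    // Hypothetical stand-ins for ArrayList's private fields
    private final Object[] elementData = new Object[10];
    private int size = 0;

    /** "Threads" A and B each add one element; returns the visible contents. */
    Object[] replay() {
        int seenByA = size;            // A, step 1: reads size = 0
        int seenByB = size;            // B, step 1: also reads size = 0
        elementData[seenByA] = "a";    // A, step 2: stores "a" at index 0
        elementData[seenByB] = "b";    // B, step 2: stores "b" at index 0, overwriting "a"
        size = seenByA + 1;            // A, step 3: writes size = 1
        size = seenByB + 1;            // B, step 3: writes size = 1, A's increment is lost
        return Arrays.copyOf(elementData, size);
    }

    public static void main(String[] args) {
        // Two adds, but only one element survives
        System.out.println(Arrays.toString(new LostUpdateReplay().replay()));
    }
}
```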
3.2 null elements
null elements arise in a similar way to missing elements, again because elementData[size++] = e is not atomic. Suppose three threads (thread 1, thread 2 and thread 3) start at the same time, the initial size is 1, and all three read size = 1.
Thread 1 completes all three steps, so size is updated to 2.
Thread 2 stores its e at index 1 (the value it read) and then its time slice runs out. When it resumes with step 3, size++ reads thread 1's update, so size becomes 3. (Threads 1 and 2 both wrote index 1, so one of their elements is lost.)
Thread 3's time slice runs out right after it reads size = 1. When it resumes with step 2, it reads thread 2's update (size = 3), stores e at index 3, and then increments size to 4. Index 2 is skipped, so elementData[2] stays null.
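The same replay technique applied to the three-thread scenario, again using the simplified three-step model and hypothetical stand-in fields. It prints [x, t2, null, t3]: four slots are counted, "t1" has been overwritten, and index 2 was never written.

```java
import java.util.Arrays;

// Single-threaded replay of the three-thread interleaving (simplified model).
class NullSlotReplay {

    // Hypothetical stand-ins for ArrayList's private fields
    private final Object[] elementData = new Object[10];
    private int size;

    NullSlotReplay() {
        elementData[0] = "x";       // one existing element, so size starts at 1
        size = 1;
    }

    Object[] replay() {
        // Step 1 for threads 1 and 2: both read size = 1
        // (thread 3 also reads 1, but that value goes stale before it is used)
        int readBy1 = size;
        int readBy2 = size;

        // Thread 1 runs steps 2 and 3 to completion
        elementData[readBy1] = "t1";
        size = readBy1 + 1;         // size = 2

        // Thread 2, step 2: stores with its stale read, overwriting "t1" at index 1
        elementData[readBy2] = "t2";
        // Thread 2, step 3: size++ now sees thread 1's update
        size = size + 1;            // size = 3

        // Thread 3, step 2: re-reads size and sees thread 2's update
        elementData[size] = "t3";   // index 3, so index 2 is skipped
        // Thread 3, step 3
        size = size + 1;            // size = 4

        return Arrays.copyOf(elementData, size);
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(new NullSlotReplay().replay()));
    }
}
```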
3.3 Array index out of bounds
The out-of-bounds exception happens right at the boundary before the array is expanded. Suppose the backing array has room for exactly one more element, and two threads call ensureCapacityInternal(size + 1) at the same time. Both read the same size, and since size + 1 still fits, neither of them triggers an expansion.
After leaving ensureCapacityInternal, both threads go on to execute elementData[size++] = e. Thread B finishes first, so size now equals the array's capacity. If thread A then reads B's updated size, it writes to index size, one past the end of the array, and size is incremented beyond the capacity. This is where ArrayIndexOutOfBoundsException is thrown.
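The out-of-bounds case can be replayed the same way, with a two-slot array that has room for exactly one more element. The ensureCapacity method here is a simplified, hypothetical stand-in for ArrayList's internal ensureCapacityInternal, not the real method.

```java
import java.util.Arrays;

// Single-threaded replay of the race at the resize boundary.
class OutOfBoundsReplay {

    private Object[] elementData = new Object[2];
    private int size = 1;           // room for exactly one more element

    // Hypothetical stand-in: grows only when minCapacity exceeds the array length
    private void ensureCapacity(int minCapacity) {
        if (minCapacity > elementData.length) {
            elementData = Arrays.copyOf(elementData, elementData.length * 2);
        }
    }

    /** Returns true if the replayed interleaving throws ArrayIndexOutOfBoundsException. */
    boolean replay() {
        // Threads A and B both read size = 1 and check for capacity 2: no resize
        ensureCapacity(size + 1);   // thread A
        ensureCapacity(size + 1);   // thread B

        // Thread B completes elementData[size++] = e: index 1, size becomes 2
        elementData[size++] = "b";

        try {
            // Thread A now reads B's update: index 2 on a length-2 array
            elementData[size++] = "a";
            return false;
        } catch (ArrayIndexOutOfBoundsException e) {
            // size is already 3 here: the index expression (size++) is evaluated
            // before the failing array store, so size exceeds the capacity
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("threw: " + new OutOfBoundsReplay().replay());
    }
}
```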
4. Solution
The fix is also simple: make the result collection thread-safe. There are two common options:
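// Option 1: copy-on-write list; every add copies the backing array, so it suits read-mostly use
List<Business> infos = new CopyOnWriteArrayList<>();
// or
// Option 2: an ArrayList wrapper whose methods are all synchronized
List<Business> infos = Collections.synchronizedList(new ArrayList<>());
5. Addendum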
A parallel stream does not guarantee the order of elements: even with a synchronized collection you only get the right elements, not their original order. Oracle's official documentation says the same, and advises against lambda expressions with side effects. So what is a side effect? The official tutorial (https://docs.oracle.com/javase/tutorial/collections/streams/parallelism.html) explains that an expression has a side effect if, besides producing a value, it also modifies state, and it warns in particular against stateful lambda expressions, whose result depends on state that may change while the pipeline is executing. Calling infos.add(o) inside forEach is exactly this kind of side effect.
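A common alternative not covered above is to avoid the shared list entirely and let the stream build the result with collect(Collectors.toList()). The library gives each worker its own intermediate container and merges them, so there is no side effect to race on, and for an ordered source the result keeps encounter order. The sketch below (hypothetical class name) contrasts it with a synchronized list, which yields the right elements in an unpredictable order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class SafeParallelCollect {

    /** Thread-safe: every add() is synchronized, but element order depends on scheduling. */
    static List<Integer> viaSynchronizedList(int n) {
        List<Integer> out = Collections.synchronizedList(new ArrayList<>());
        IntStream.range(0, n).parallel().forEach(out::add);
        return out;
    }

    /** No shared mutable state: the collector merges per-thread results in encounter order. */
    static List<Integer> viaCollect(int n) {
        return IntStream.range(0, n).parallel()
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        int n = 20;
        System.out.println("synchronizedList: " + viaSynchronizedList(n));
        System.out.println("collect:          " + viaCollect(n));
    }
}
```

On Java 16 and later, Stream.toList() is a shorter option that returns an unmodifiable list.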