Source code analysis of ThreadPoolExecutor
2022-07-23 18:59:00 【Notes sharing and perception of programming Xiaobai】
Contents
- 1. Using ThreadPoolExecutor
- 2. ThreadPoolExecutor core parameters
- 3. ThreadPoolExecutor execution flow
- 4. ThreadPoolExecutor states
- 5. The execute method
- 6. The addWorker method
- 7. The Worker class (worker thread class) in brief
- 8. The runWorker method
- 9. The getTask method
- 10. The processWorkerExit method
1. Using ThreadPoolExecutor
Why use a thread pool instead of raw threads
In Java, creating a new thread for every incoming request is expensive. A server can end up spending more time and system resources on creating and destroying threads than on handling the actual user requests. Beyond the creation and destruction overhead, live threads also consume system resources: if too many threads are created in one JVM, the system may run out of memory or thrash from excessive context switching. To avoid exhausting resources, a server application needs to limit how many requests it processes at any given moment, keep thread creation and destruction to a minimum (especially for threads that are costly to set up), and reuse existing objects wherever possible. That is the motivation behind resource pooling.
A thread pool addresses both the thread life-cycle cost and the resource shortage. Because threads are reused across many tasks, the cost of creating a thread is amortized over those tasks, and because a thread already exists when a request arrives, the delay of creating one is eliminated. Requests can be served immediately, so the application responds faster. In addition, tuning the number of threads in the pool helps prevent resource exhaustion.
Why does the Alibaba coding guideline require ThreadPoolExecutor instead of Executors?
Creating the thread pool yourself with new is a rule in the Alibaba guideline. If you do not control the core parameters yourself, you cannot tune the pool's performance, because the pools provided by Executors have their parameters decided in advance.
Those presets may not suit the current workload or the CPU hardware: some workloads are IO-intensive and others are CPU-intensive.
(In short: the parameters of the pools created by the Executors utility are fixed, so the resulting pool does not necessarily fit our workload or hardware resources.)
Creating the pool through ThreadPoolExecutor lets you customize it to fit your own workload and CPU resources.
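As a minimal sketch of what "customizing" means here, the pool below is built directly with the ThreadPoolExecutor constructor. The sizes (2 core threads, 4 maximum, a queue of 100, 30 seconds keep-alive) and the class name PoolConfigDemo are assumptions chosen only for illustration; real values depend on the workload. For comparison, Executors.newFixedThreadPool uses an unbounded LinkedBlockingQueue and Executors.newCachedThreadPool allows up to Integer.MAX_VALUE threads.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolConfigDemo {
    // Hypothetical sizing, chosen only for illustration
    static ThreadPoolExecutor newBoundedPool() {
        return new ThreadPoolExecutor(
                2,                                   // corePoolSize
                4,                                   // maximumPoolSize
                30L, TimeUnit.SECONDS,               // keep-alive for idle non-core threads
                new ArrayBlockingQueue<>(100),       // bounded work queue
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newBoundedPool();
        System.out.println("core=" + pool.getCorePoolSize()
                + " max=" + pool.getMaximumPoolSize()
                + " queueCapacity=" + pool.getQueue().remainingCapacity());
        pool.shutdown();
    }
}
```

Every limit is explicit here, so queue growth and thread count are both bounded by values you chose.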
2. ThreadPoolExecutor core parameters
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}

- corePoolSize: number of core threads
- maximumPoolSize: maximum number of threads; maximumPoolSize minus corePoolSize is the number of emergency (non-core) threads
- keepAliveTime: how long an idle emergency thread is kept alive
- unit: time unit of keepAliveTime
- workQueue: work queue that holds tasks waiting to be executed
- threadFactory: thread factory (the overload shown above falls back to Executors.defaultThreadFactory())
- handler: rejection policy
The four rejection policies shipped with the JDK
- AbortPolicy: throws RejectedExecutionException to the caller; this is the default policy
- CallerRunsPolicy: the calling thread runs the task itself
- DiscardPolicy: silently drops the task
- DiscardOldestPolicy: drops the oldest task in the queue and retries submitting the current one
You can also implement the RejectedExecutionHandler interface to define a custom rejection policy
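A custom policy only needs to implement rejectedExecution. The sketch below is one possible handler, not a recommended one: CountingHandler and submitAndCountRejections are hypothetical names, and the handler simply counts rejected tasks and drops them. The pool has one thread and a queue of one slot, so every task beyond the second is rejected while the first is still running.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CountingRejectDemo {
    // Hypothetical handler: counts rejected tasks and drops them
    static class CountingHandler implements RejectedExecutionHandler {
        final AtomicInteger rejected = new AtomicInteger();

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            rejected.incrementAndGet();
        }
    }

    static int submitAndCountRejections(int tasks) {
        CountingHandler handler = new CountingHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        // Each task blocks until released, so the single thread stays busy
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        for (int i = 0; i < tasks; i++) {
            pool.execute(blocker);
        }
        release.countDown();
        pool.shutdown();
        return handler.rejected.get();
    }

    public static void main(String[] args) {
        // One task runs, one waits in the queue, the remaining three are rejected
        System.out.println(submitAndCountRejections(5)); // prints 3
    }
}
```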
The difference between execute and submit
execute only accepts a Runnable task
submit accepts either a Runnable or a Callable and returns a Future; with a Callable, the result of the task can be read from that Future
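The difference can be seen in a few lines. This is a sketch with a hypothetical helper, sumViaSubmit: execute takes a Runnable and returns nothing, while submit takes a Callable and hands back a Future whose get() blocks until the result is available.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteVsSubmitDemo {
    static int sumViaSubmit(int a, int b) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        try {
            // execute: fire and forget, nothing is returned
            pool.execute(() -> System.out.println("running via execute"));
            // submit: the Callable's return value travels back through the Future
            Callable<Integer> task = () -> a + b;
            Future<Integer> future = pool.submit(task);
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumViaSubmit(2, 3)); // prints 5 after the execute message
    }
}
```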
3. ThreadPoolExecutor execution flow
We follow the execute method to see how the thread pool processes a task

4. ThreadPoolExecutor states
4.1 The core attribute of the thread pool: ctl
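ctl is a single int that packs two values: the high 3 bits hold the pool state and the low 29 bits hold the worker count. The sketch below mirrors the constants and helper methods as they appear in the JDK 8 source that this article walks through (runStateOf, workerCountOf, ctlOf); the class name CtlSketch is hypothetical and the code is a standalone illustration, not the JDK class itself.

```java
class CtlSketch {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29 bits for the worker count
    static final int CAPACITY = (1 << COUNT_BITS) - 1;    // maximum representable worker count

    // Run states live in the high 3 bits; numerically RUNNING is the smallest
    static final int RUNNING = -1 << COUNT_BITS;
    static final int SHUTDOWN = 0 << COUNT_BITS;
    static final int STOP = 1 << COUNT_BITS;
    static final int TIDYING = 2 << COUNT_BITS;
    static final int TERMINATED = 3 << COUNT_BITS;

    static int runStateOf(int c) { return c & ~CAPACITY; }    // keep the high 3 bits
    static int workerCountOf(int c) { return c & CAPACITY; }  // keep the low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }      // pack both into one int

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(runStateOf(c) == RUNNING); // prints true
        System.out.println(workerCountOf(c));         // prints 5
    }
}
```

Because the states are ordered numerically, checks such as rs >= SHUTDOWN in the listings below mean "the pool is no longer RUNNING".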


4.2 State transition of thread pool
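According to the ThreadPoolExecutor documentation, the pool moves from RUNNING to SHUTDOWN on shutdown(), from RUNNING or SHUTDOWN to STOP on shutdownNow(), then to TIDYING once no workers remain (and, for SHUTDOWN, the queue is empty), and finally to TERMINATED after the terminated() hook has run. The states themselves are private, so the sketch below only observes them through public methods; LifecycleDemo and observe are hypothetical names.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class LifecycleDemo {
    static String observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        StringBuilder sb = new StringBuilder();
        sb.append("isShutdown=").append(pool.isShutdown());   // RUNNING
        pool.shutdown();                                       // RUNNING -> SHUTDOWN
        sb.append(" -> ").append(pool.isShutdown());
        boolean rejected = false;
        try {
            pool.execute(() -> { });                           // new tasks are refused now
        } catch (RejectedExecutionException e) {
            rejected = true;
        }
        sb.append(", rejected=").append(rejected);
        boolean terminated = false;
        try {
            // No workers and an empty queue: the pool reaches TERMINATED
            terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        sb.append(", terminated=").append(terminated);
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(observe());
    }
}
```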




5. The execute method
Key point 1: use the execute method to follow the overall execution flow of the thread pool, including the checks that guard against concurrent changes
Key point 2: why does the thread pool add a non-core (emergency) thread with a null task?
(To handle the case where the worker count is 0 but tasks are still waiting in the work queue)
public void execute(Runnable command) {
// Judge not empty
if (command == null)
throw new NullPointerException();
// obtain ctl Value
int c = ctl.get();
// Whether the number of worker threads is less than the number of core threads
if (workerCountOf(c) < corePoolSize) {
// Call addWorker to create a core thread that runs command
if (addWorker(command, true))
// The core thread was added successfully (addWorker returned true), so return
return;
// Adding the core thread failed, typically because of a concurrent change, so re-read ctl
c = ctl.get();
}
// No core thread was created (the core threads are full, or addWorker failed)
// Check whether the pool is in the RUNNING state
// If it is RUNNING, call offer to add the task to the work queue
if (isRunning(c) && workQueue.offer(command)) {
// Adding task to work queue succeeded
// Get... Again ctl value
int recheck = ctl.get();
// Judge whether the current thread pool state is RUNNING state , If not , You need to remove tasks from the work queue
if (! isRunning(recheck) && remove(command))
// Execute reject strategy ( The thread pool state is incorrect , Execute reject strategy )
reject(command);
// Determine whether the number of worker threads is 0
else if (workerCountOf(recheck) == 0)
// The number of worker threads is 0, But there are tasks in the work queue
// Add an emergency thread with a null task so that it processes the tasks waiting in the work queue
addWorker(null, false);
}
// The task could not be queued; try to add an emergency thread to run it
else if (!addWorker(command, false))
// Adding the emergency thread failed, so execute the rejection policy
reject(command);
}
1. First check whether the task is null; if it is, throw NullPointerException directly
2. Check whether the core threads are used up. If a core slot is still free, try to hand the task to a new core thread; if that succeeds, return directly
The word "try" matters here: under concurrency the remaining core slots can be taken by other callers in an instant
3. If the core threads are full, or the slot was taken by another caller while the task was being handed over, re-read ctl, use its high 3 bits to check whether the pool is RUNNING, and if so try to put the task into the queue
4. If the pool is not RUNNING, or the queue is full, try to hand the task to an emergency thread; if that fails, execute the rejection policy
5. If the pool is RUNNING and the task was queued successfully, read ctl again and re-check whether the pool is still RUNNING. If it is not, try to remove the task that was just added and, if the removal succeeds, execute the rejection policy
6. If the pool is still RUNNING, or it is no longer RUNNING but removing the task failed, check whether the pool has any worker threads. If there are none, create an emergency thread with a null task to drain the work queue
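The steps above can be observed from the outside with a deliberately tiny pool. This is a sketch under assumed sizes (1 core thread, 2 maximum, a queue of 1) and with hypothetical names (ExecuteFlowDemo, trace); every task blocks on a latch so nothing finishes while the four tasks are submitted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteFlowDemo {
    static String trace() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder sb = new StringBuilder();
        for (int i = 1; i <= 4; i++) {
            try {
                pool.execute(blocker);
                sb.append("task").append(i)
                  .append(": pool=").append(pool.getPoolSize())
                  .append(" queue=").append(pool.getQueue().size()).append('\n');
            } catch (RejectedExecutionException e) {
                // Default AbortPolicy: queue full and maximumPoolSize reached
                sb.append("task").append(i).append(": rejected\n");
            }
        }
        release.countDown();
        pool.shutdown();
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(trace());
        // task1: pool=1 queue=0   (core thread created)
        // task2: pool=1 queue=1   (queued)
        // task3: pool=2 queue=1   (queue full, emergency thread created)
        // task4: rejected         (queue full, maximumPoolSize reached)
    }
}
```

Note that task3 starts running before task2: the emergency thread takes the task that triggered its creation, while task2 keeps waiting in the queue.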
6. The addWorker method
Adds a worker thread
private boolean addWorker(Runnable firstTask, boolean core) {
// Judgment of thread pool state , And the number of working threads
// Outer layer for Identification of the loop
retry:
for (;;) {
// obtain ctl Value
int c = ctl.get();
// Get the status of the thread pool
int rs = runStateOf(c);
// Return false if the pool is not RUNNING, unless the state is SHUTDOWN, the task is null and the queue is not empty (the addWorker(null, false) case)
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))
return false;
for (;;) {
// Gets the current number of worker threads
int wc = workerCountOf(c);
// Check whether the worker count has reached its limit
if (wc >= CAPACITY ||
// For a core thread the limit is corePoolSize; for an emergency thread it is maximumPoolSize
wc >= (core ? corePoolSize : maximumPoolSize))
// The worker count has reached the limit
return false;
// Increment the worker count with CAS; on success, break out of the outer for loop
if (compareAndIncrementWorkerCount(c))
break retry;
// Recapture ctl value
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs) // Based on newly acquired ctl Get the thread pool status , Judgment and previous rs Whether the status is consistent
continue retry; // It indicates that concurrent operations cause changes in the state of the thread pool , Need to re judge the status
}
}
// Add a worker thread and start the worker thread
// Whether the worker thread starts
boolean workerStarted = false;
// Whether the worker thread is added successfully
boolean workerAdded = false;
//Worker It's the worker thread
Worker w = null;
try {
// Create the Worker and hand it the task
w = new Worker(firstTask);
// Get the Thread bound to this Worker
final Thread t = w.thread;
if (t != null) {
// t should never be null here; this is a defensive check
// Take the lock, since another thread may call shutdown concurrently
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Re-read ctl and get the pool state
int rs = runStateOf(ctl.get());
// If the pool is RUNNING, add the worker
if (rs < SHUTDOWN ||
// or if the pool is SHUTDOWN and the incoming task is null (the addWorker(null, false) case)
(rs == SHUTDOWN && firstTask == null)) {
// Add the worker
// The thread must not have been started yet (defensive check)
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// What will be constructed Worker Object added to workers(HashSet) in
workers.add(w);
// Get the number of worker threads
int s = workers.size();
// If the current worker count exceeds the historical maximum, record it in largestPoolSize
if (s > largestPoolSize)
largestPoolSize = s;
// Mark the worker as added
workerAdded = true;
}
} finally {
// Release the lock
mainLock.unlock();
}
if (workerAdded) {
// The worker was added successfully, so start its thread
t.start();
// Mark the worker as started
workerStarted = true;
}
}
} finally {
// If the worker failed to start (the pool state may have changed in the meantime)
if (! workerStarted)
// Remove it from workers, decrement the worker count, and try to move the pool towards termination
addWorkerFailed(w);
}
return workerStarted;
}
// After starting the thread fails , Remedial actions done
private void addWorkerFailed(Worker w) {
// Lock
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Check whether the Worker object was actually created
if (w != null)
// If it was, remove it from workers
workers.remove(w);
// Decrement the worker count
decrementWorkerCount();
// Try to move the pool towards termination
tryTerminate();
} finally {
mainLock.unlock();
}
}
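The inner loop of addWorker is a classic CAS retry loop: read the current value, check the limit, attempt the increment, and start over if another thread changed the value in between. The sketch below shows the same pattern on a plain AtomicInteger. BoundedCounter and tryIncrement are hypothetical names, and the pool-state re-check from addWorker is intentionally left out.

```java
import java.util.concurrent.atomic.AtomicInteger;

class BoundedCounter {
    private final AtomicInteger count = new AtomicInteger();
    private final int limit;

    BoundedCounter(int limit) {
        this.limit = limit;
    }

    // Returns true if a slot was reserved, false if the limit is already reached
    boolean tryIncrement() {
        for (;;) {
            int c = count.get();
            if (c >= limit) {
                return false;               // like wc >= corePoolSize / maximumPoolSize
            }
            if (count.compareAndSet(c, c + 1)) {
                return true;                // like compareAndIncrementWorkerCount(c)
            }
            // CAS lost a race with another thread: re-read and retry
        }
    }

    int get() {
        return count.get();
    }

    public static void main(String[] args) {
        BoundedCounter counter = new BoundedCounter(3);
        for (int i = 0; i < 5; i++) {
            System.out.println(counter.tryIncrement()); // true three times, then false twice
        }
    }
}
```

Reserving the slot in the counter before creating the thread is what lets addWorker reject excess callers without holding mainLock.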
7. The Worker class (worker thread class) in brief
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
// The Thread this worker runs in, created during construction
final Thread thread;
// The first task to run (may be null)
Runnable firstTask;
volatile long completedTasks;
// A freshly constructed worker must not be interrupted
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
// Store the task passed to the constructor as firstTask
this.firstTask = firstTask;
// Build the Thread for this Worker through the thread factory
this.thread = getThreadFactory().newThread(this);
}
// t.start() ends up calling this run method
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() {
acquire(1); }
public boolean tryLock() {
return tryAcquire(1); }
public void unlock() {
release(1); }
public boolean isLocked() {
return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
1. The Worker class extends AbstractQueuedSynchronizer and implements Runnable
2. Why does Worker implement its own AQS lock?
The lock must not be reentrant. A worker holds its lock while it runs a task, so when another thread calls shutdown, which only interrupts workers whose lock it can acquire, a busy worker is not interrupted; shutdownNow (the STOP state) interrupts workers regardless
Interrupting a thread does not stop it immediately; it only sets the thread's interrupt flag to true
The source below also shows that the Worker lock does not support reentry
public void lock() {
acquire(1); }
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
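To see the non-reentrant behaviour in isolation, the sketch below builds a lock on AbstractQueuedSynchronizer with the same tryAcquire and tryRelease logic as Worker and compares it with ReentrantLock. SimpleLock and demo are hypothetical names; this is an illustration, not the Worker class.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.ReentrantLock;

class NonReentrantLockDemo {
    // Same acquire/release logic as Worker: state 0 = unlocked, 1 = locked
    static final class SimpleLock extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false; // fails even if the current thread already holds the lock
        }

        @Override
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        boolean tryLock() { return tryAcquire(1); }
        void unlock() { release(1); }
    }

    // Returns {first, second} tryLock results for SimpleLock, then for ReentrantLock
    static boolean[] demo() {
        SimpleLock simple = new SimpleLock();
        boolean s1 = simple.tryLock();
        boolean s2 = simple.tryLock();      // same thread, second attempt
        simple.unlock();

        ReentrantLock reentrant = new ReentrantLock();
        boolean r1 = reentrant.tryLock();
        boolean r2 = reentrant.tryLock();   // same thread may re-enter
        reentrant.unlock();
        reentrant.unlock();
        return new boolean[] {s1, s2, r1, r2};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo())); // prints [true, false, true, true]
    }
}
```

This is why a successful tryLock on a Worker reliably means the worker is idle: not even the worker's own thread can acquire the lock a second time while a task is running.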
8. The runWorker method
execute() -> addWorker() -> t.start() -> run() (the run method of Worker)
public void run() {
runWorker(this);
}
Runs the tasks and performs the locking that controls when the worker thread may be interrupted
final void runWorker(Worker w) {
// Get the current thread
Thread wt = Thread.currentThread();
// Take the Worker's first task
Runnable task = w.firstTask;
// Clear the Worker's first task
w.firstTask = null;
// unlock: set the Worker's state to 0 and ExclusiveOwnerThread to null (state goes from -1 to 0, so the worker may now be interrupted)
w.unlock(); // allow interrupts
// Tracks whether the loop below exited because of an exception
boolean completedAbruptly = true;
try {
// First source of tasks: the task passed in through execute or submit is run directly
// Second source of tasks: tasks taken from the work queue
while (task != null || (task = getTask()) != null) {
// Lock: in the SHUTDOWN state a worker that is running a task must not be interrupted
// The Worker lock is not reentrant: interrupting idle workers requires acquiring the worker's lock, and failing to acquire it means the worker is currently running a task
w.lock();
// If the pool is in the STOP state, the current thread must be interrupted
// First check: is the pool state at least STOP?
// Second check: read and clear the interrupt flag; if it was false, nothing more is needed; if it was true, re-check whether a concurrent operation moved the pool to STOP
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted()&&runStateAtLeast(ctl.get(), STOP)))
// wt.interrupt() is only called if the thread's interrupt flag is currently false
&&!wt.isInterrupted())
// Set the interrupt flag to true
wt.interrupt();
try {
// Hook that runs before the task: a pre-execution extension point, not a dynamic proxy (an empty method that subclasses may override)
beforeExecute(wt, task);
Throwable thrown = null;
try {
// Perform tasks
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// Hook that runs after the task: a post-execution extension point, not a dynamic proxy (an empty method that subclasses may override)
afterExecute(task, thrown);
}
} finally {
// Set task to null
task = null;
// Increment this worker's count of completed tasks
w.completedTasks++;
// Set state back to 0
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
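beforeExecute and afterExecute are protected methods with empty bodies, so they are used by subclassing. The sketch below records the order in which the hooks and the task run; TracingPool and its events list are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class HookDemo {
    // Hypothetical subclass that records when the hooks fire
    static class TracingPool extends ThreadPoolExecutor {
        final List<String> events = Collections.synchronizedList(new ArrayList<>());

        TracingPool() {
            super(1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            events.add("before");
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            events.add(t == null ? "after" : "after(failed)");
        }
    }

    static List<String> run() {
        TracingPool pool = new TracingPool();
        pool.execute(() -> pool.events.add("task"));
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(pool.events);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [before, task, after]
    }
}
```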
9. The getTask method
Fetches a task from the work queue (workQueue)
private Runnable getTask() {
// Flag: whether the last poll timed out (a timed-out worker may be culled)
boolean timedOut = false; // Did the last poll() time out?
// Infinite loop
for (;;) {
// obtain ctl value
int c = ctl.get();
// Get the status of the thread pool
int rs = runStateOf(c);
// ============================= Determine thread pool state =======================
// Check if queue empty only if necessary.
// Entering this if means the current worker must exit
// The pool state is SHUTDOWN or higher
// If the pool state is STOP or higher, the current worker must exit
// If the pool state is SHUTDOWN and the work queue is empty, the current worker must exit
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// Decrement the worker count
decrementWorkerCount();
// Return null so that processWorkerExit removes the current worker
return null;
}
// ============================= Determine the number of worker threads =======================
// Get the number of worker threads
int wc = workerCountOf(c);
// allowCoreThreadTimeOut: whether core threads may time out (false by default)
// Otherwise, only workers beyond corePoolSize are subject to the timeout
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// Either the worker count exceeds maximumPoolSize,
// or this worker is subject to the timeout and its last poll timed out:
// try to retire the current thread
if ((wc > maximumPoolSize || (timed && timedOut))
// ...but only if the worker count is greater than 1 or the work queue is empty,
// so that a non-empty queue is never left without a worker
&& (wc > 1 || workQueue.isEmpty())) {
// Decrement the worker count with CAS; among competing threads only one CAS succeeds
if (compareAndDecrementWorkerCount(c))
// Return null so that processWorkerExit removes the current worker
return null;
continue;
}
// ============================= Get the task from the work queue =======================
try {
Runnable r = timed ?
// Wait for a task for at most keepAliveTime (roughly the path taken by non-core threads)
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// Block until a task arrives (roughly the path taken by core threads)
workQueue.take();
if (r != null)
// A task was obtained, so return it
return r;
// The poll timed out (this worker has been idle for keepAliveTime)
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
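The effect of the timed poll can be observed through getPoolSize. In this sketch (hypothetical names KeepAliveDemo and poolSizes, assumed keep-alive of 200 ms) allowCoreThreadTimeOut(true) makes even the single core thread use the timed branch, so the pool shrinks to 0 threads once the worker has been idle for the keep-alive time.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {
    // Returns {pool size right after submitting, pool size after the idle timeout}
    static int[] poolSizes() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 200L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(true);   // core threads now use poll(keepAliveTime)
        pool.execute(() -> { });
        int afterSubmit = pool.getPoolSize();

        // Wait (up to 5 s) for the idle worker to time out in getTask and exit
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        int afterIdle = pool.getPoolSize();
        pool.shutdown();
        return new int[] {afterSubmit, afterIdle};
    }

    public static void main(String[] args) {
        int[] sizes = poolSizes();
        System.out.println(sizes[0] + " -> " + sizes[1]);
    }
}
```

Without allowCoreThreadTimeOut(true), the same worker would block in workQueue.take() and the pool size would stay at 1.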
10. The processWorkerExit method
// Removes the current worker thread
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// completedAbruptly is true when the worker exits because of an exception (typically thrown by the task or a hook) rather than because getTask returned null
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
// getTask did not decrement the worker count in that case, so decrement it here
decrementWorkerCount();
// Lock
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Record the total number of tasks processed by the current thread pool
completedTaskCount += w.completedTasks;
// Remove worker threads
workers.remove(w);
} finally {
mainLock.unlock();
}
// Try to move the pool towards termination (TIDYING, then TERMINATED)
tryTerminate();
// Re-read ctl
int c = ctl.get();
// The pool state is below STOP, i.e. RUNNING or SHUTDOWN
if (runStateLessThan(c, STOP)) {
// The worker exited normally
if (!completedAbruptly) {
// Minimum number of workers to keep: 0 if core threads may time out, otherwise corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// If the work queue is not empty, at least one worker is needed
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// Enough workers remain, so no replacement is needed
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// The worker exited abnormally, or too few workers remain
// (for example the work queue is not empty and no worker is left): add a replacement worker
addWorker(null, false);
}
}
}
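The replacement logic can be seen when a task submitted with execute throws: the worker thread dies, and a different thread runs the next task. The sketch below uses hypothetical names (WorkerReplacementDemo, threadNames) and a thread factory that installs a no-op uncaught exception handler purely to keep the console quiet.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class WorkerReplacementDemo {
    // Returns the names of the threads that ran the failing task and the follow-up task
    static List<String> threadNames() {
        ThreadFactory quiet = r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, ex) -> { }); // suppress the stack trace
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), quiet);
        List<String> names = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        // This task throws, so runWorker exits with completedAbruptly == true
        pool.execute(() -> {
            names.add(Thread.currentThread().getName());
            throw new IllegalStateException("boom");
        });
        // This task runs on a replacement worker
        pool.execute(() -> {
            names.add(Thread.currentThread().getName());
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return names;
    }

    public static void main(String[] args) {
        List<String> names = threadNames();
        System.out.println(names + " different=" + !names.get(0).equals(names.get(1)));
    }
}
```

With submit instead of execute the exception would be captured in the Future, the worker would survive, and both tasks would run on the same thread.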
Common thread pool questions:
1. What are the 7 parameters of a thread pool?
2. What is the execution flow of a thread pool?
3. What are the rejection policies of a thread pool?
4. What is ctl, the core attribute of the thread pool?
5. What states does a thread pool have, and how does it move between them?
6. What is the difference between execute and submit?
7. How are worker threads represented in the thread pool?
8. Where are worker threads stored?