当前位置:网站首页>ThreadPoolExecutor线程池实现原理与源码解析
ThreadPoolExecutor线程池实现原理与源码解析
2022-06-23 07:40:00 【李嘉图呀李嘉图】
目录
3.3.3 shutdown() 和 shutdowNow() 的区别
3.5.2 shutdownNow()与任务执行过程综合分析
1. 线程池实现原理
下图所示为线程池的实现原理:调用方不断地向线程池中提交任务;线程池中有一组线程,不断地 从队列中取任务,这是一个典型的生产者—消费者模型。

要实现这样一个线程池,有几个问题需要考虑:
- 队列设置多长?如果是无界的,调用方不断地往队列中放任务,可能导致内存耗尽。如果是有界的,当队列满了之后,调用方如何处理?
- 线程池中的线程个数是固定的,还是动态变化的?
- 每次提交新任务,是放入队列?还是开新线程?
- 当没有任务的时候,线程是睡眠一小段时间?还是进入阻塞?如果进入阻塞,如何唤醒?
针对问题4,有3种做法:
- 不使用阻塞队列,只使用一般的线程安全的队列,也无阻塞/唤醒机制。当队列为空时,线 程池中的线程只能睡眠一会儿,然后醒来去看队列中有没有新任务到来,如此不断轮询。
- 不使用阻塞队列,但在队列外部、线程池内部实现了阻塞/唤醒机制。
- 使用阻塞队列。
显然做法3最完善,避免了线程池内部自己实现阻塞/唤醒机制的麻烦,也避免了做法1的睡 眠/轮询带来的资源消耗和延迟。因为如此ThreadPoolExector/ScheduledThreadPoolExecutor都是基于阻塞队列来实现的,而不是一般的队列。
2. 线程池类继承体系

在这里,有两个核心的类: ThreadPoolExector 和 ScheduledThreadPoolExecutor ,后者不仅可以执行某个任务,还可以周期性地执行任务。
向线程池中提交的每个任务,都必须实现 Runnable 接口,通过最上面的 Executor 接口中的
execute(Runnable command) 向线程池提交任务。
然后,在 ExecutorService 中,定义了线程池的关闭接口 shutdown() ,还定义了可以有返回值的任务,也就是 Callable。
3. 代码分析
3.1 核心数据结构
public class ThreadPoolExecutor extends AbstractExecutorService {
//线程池状态 和 有效线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 阻塞队列,用于存放任务
private final BlockingQueue<Runnable> workQueue;
// 对线程池内部各种变量进行互斥访问控制
private final ReentrantLock mainLock = new ReentrantLock();
// 线程集合
private final HashSet<Worker> workers = new HashSet<Worker>();
//....
}Worker 是 ThreadPoolExector 的内部类,每个 Worker 都存放一个线程
private final class Worker extends AbstractQueuedSynchronizer
implements Runnable{
//封装线程
final Thread thread;
//要运行的初始任务。可能是空。
Runnable firstTask;
//worker执行完毕的任务数
volatile long completedTasks;
//.....
}Worker继承于AQS,也就是说Worker本身就是一把锁。这把锁用于线程池的关闭、线程执行任务的过程。
3.2 核心配置参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}参数解释:
1. corePoolSize:核心线程数,在线程池中始终维护的线程个数。
2. maxPoolSize:在corePooSize已满、队列也满的情况下,扩充线程至此值。
3. keepAliveTime/TimeUnit:maxPoolSize 中的空闲线程,销毁所需要的时间,总线程数收缩
回corePoolSize。
4. blockingQueue:线程池所用的队列类型。
5. threadFactory:线程创建工厂,可以自定义,默认值 Executors.defaultThreadFactory() 。
6. RejectedExecutionHandler:corePoolSize已满,队列已满,maxPoolSize 已满,最后的拒
绝策略。
配置参数在任务提交过程中的运行流程:
步骤一:判断当前线程数是否大于或等于核心线程数 corePoolSize。如果小于,则新建线程 执行;如果大于,则进入步骤二。
步骤二:判断队列是否已满。如未满,则放入队列;如已满,则进入步骤三。
步骤三:判断当前线程数是否大于或等于maxPoolSize。如果小于,则新建线程执行;如果 大于,则进入步骤四。
步骤四:根据拒绝策略来拒绝任务。
总结:首先判断corePoolSize,其次判断blockingQueue是否已满,接着判断maxPoolSize, 最后使用拒绝策略。
3.3 线程池优雅关闭
线程池的关闭,较之线程的关闭更加复杂。
当关闭一个线程池的时候,有的线程还正在执行某个任务,有的调用者正在向线程池提交任务,并且队列中可能还有未执行的任务。
因此,关闭过程不可能是瞬时的,而是需要一个平滑的过渡,这就涉及线程池的完整生命周期管理。
3.3.1 线程池的生命周期
在JDK 7中,把线程数量(workerCount)和线程池状态(runState)这两个变量打包存储在一个字段里面,即ctl变量。如下图所示,最高的3位存储线程池状态,其余29位存储线程个数。而在JDK 6中,这两个变量是分开存储的。


由代码中可以看到,ctl变量被拆成两半,最高的3位表示线程池的状态,低的29位表示线程的个数。
线程池的状态有五种,分别是RUNNING、SHUTDOWN、STOP、TIDYING 和 TERMINATED。
状态之间的迁移过程,如图所示:

线程池有两个关闭方法,shutdown() 和 shutdownNow(),这两个方法会让线程池切换到不同的状态。在队列为空,线程池也为空之后,进入TIDYING状态;最后执行一个钩子方法terminated(),进入TERMINATED状态,线程池才真正关闭。
这里的状态迁移有一个非常关键的特征:从小到大迁移,-1,0,1,2,3,只会从小的状态值往大的状态值迁移,不会逆向迁移。例如,当线程池的状态在 TIDYING = 2 时,接下来只可能迁移到 TERMINATED = 3,不可能迁移回 STOP = 1 或者其他状态。
除 terminated()之外,线程池还提供了其他几个钩子方法,这些方法的实现都是空的。如果想实现自己的线程池,可以重写这几个方法:
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }3.3.2 正确关闭线程池的步骤
关闭线程池的过程为:在调用 shutdown() 或者 shutdownNow() 之后,线程池并不会立即关闭,接下来需要调用 awaitTermination() 来等待线程池关闭。关闭线程池的正确步骤如下:
executor.shutdown();
try {
boolean flag = true;
do {
flag = ! executor.awaitTermination(500, TimeUnit.MICROSECONDS);
}while (flag);
}catch (InterruptedException e){
//.....
}awaitTermination(...) 方法的内部实现很简单,如下所示。不断循环判断线程池是否到达了最终状态TERMINATED,如果是,就返回;如果不是,则通过 termination 条件变量阻塞一段时间,之后继续判断。
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}3.3.3 shutdown() 和 shutdowNow() 的区别
- shutdown() 不会清空任务队列,会等所有任务执行完成,shutdownNow() 清空任务队列。
- shutdown() 只会中断空闲的线程,shutdownNow() 会中断所有线程。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
//加锁保证线程安全
mainLock.lock();
try {
//检查是否有关闭线程的权限
checkShutdownAccess();
//将线程池状态改为SHUTDOWN
advanceRunState(SHUTDOWN);
//中断空闲线程
interruptIdleWorkers();
//具有空方法体的钩子方法
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
//加锁保证线程安全
mainLock.lock();
try {
//检查是否有关闭线程池的权限
checkShutdownAccess();
//将线程池状态设置为STOP
advanceRunState(STOP);
//中断所有线程
interruptWorkers();
//清空任务队列
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}接下来看下中断空闲线程和中断所有线程的区别:
shutdown() 方法中的 interruptIdleWorkers() 方法的实现:
//中断空闲线程
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
//中断空闲线程
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (ThreadPoolExecutor.Worker w : workers) {
Thread t = w.thread;
//如果tryLock成功,表示线程处于空闲状态;
//如果不成功,表示线程持有锁,正在执行某个任务
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}shutdownNow() 方法中的interruptWorkers() 方法的实现:
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (ThreadPoolExecutor.Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}关键区别点在 tryLock():
一个线程在执行一个任务之前,会先加锁,这意味着通过是否持有锁,可以判断出线程是否处于空闲状态。
tryLock() 如果调用成功,说明线程处于空闲状态,向其发送中断信号;否则不发送。
public boolean tryLock() { return tryAcquire(1); }
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}shutdownNow() 调用了 interruptWorkers() 方法:
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (ThreadPoolExecutor.Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}void interruptIfStarted() {
Thread t;
//运行状态中且没有被中断的全部中断
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}shutdown() 和 shutdownNow() 都调用了 tryTerminate() 方法,如下所示:
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
//当workQueue为空,workCount = 0 时
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 将状态切换到到TIDYING状态
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//调用钩子函数
terminated();
} finally {
//将状态由 TIDYING 改为 TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
//通知awaitTermination(.....)
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}tryTerminate()不会强行终止线程池,只是做了一下检测:当 workerCount 为0,workerQueue为 空时,先把状态切换到 TIDYING,然后调用钩子方法 terminated()。当钩子方法执行完成时,把状态从 TIDYING 改为 TERMINATED,接着调用 termination.sinaglAll(),通知前面阻塞在 awaitTermination 的所有调用者线程。
所以,TIDYING 和 TREMINATED 的区别是在二者之间执行了一个钩子方法terminated(),目前是一个空实现。
3.4 任务提交过程分析
提交任务流程如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 当前线程小于corePoolSize, 则启动新线程
if (workerCountOf(c) < corePoolSize) {
// 添加worker, 并将comman设置为Worker的第一个任务开始执行
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果当前的线程大于或等于corePoolSize, 则调用workerQueue.offer放入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果线程池正在停止,则将command任务从队列中移除,并拒绝command任务请求
if (! isRunning(recheck) && remove(command))
reject(command);
// 放入到队列中发现没有线程执行任务,开启新线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 队列已满,且线程数大于maxPoolSize, 执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
// 用于启动新线程,如果core参数为true, 则使用corePoolSize作为上限,否则使用maxPoolSize作为上限
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果线程池状态值起码是 SHUTDOWN 和 STOP,或则第一个任务不是null,或者工作队列为空
// 则添加worker失败,返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 工作线程数达到上限,要么是 corePoolSize 要么是 maximumPoolSize,启动线程失败
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 增加worker数量成功,返回到retry语句
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
// 如果线程池运行状态起码是SHUTDOWN,则重试retry标签语句,CAS
if (runStateOf(c) != rs)
continue retry;
}
}
// worker数量加1成功后,接着运行:
boolean workerStarted = false;
boolean workerAdded = false;
ThreadPoolExecutor.Worker w = null;
try {
// 新建worker对象
w = new ThreadPoolExecutor.Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 由于线程已经在运行中,无法启动,抛异常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将线程对应的worker加入worker集合
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果添加worker成功,则启动该worker对应的线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果启动新线程失败
if (! workerStarted)
// workCount - 1
addWorkerFailed(w);
}
return workerStarted;
}3.5 任务执行过程分析
在上面的任务提交过程中,可能会开启一个新的 Worker,并把任务本身作为 firstTask 赋给该 Worker。但对于一个 Worker 来说,不是只执行一个任务,而是源源不断地从队列中取任务执行,这是一个不断循环的过程。
下面来看 Woker 的 run() 方法的实现过程。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
// 当前Worker对象封装的线程
final Thread thread;
// 线程需要运行的第一个任务。可以是null,如果是null,则线程从队列获取任务
Runnable firstTask;
// 记录线程执行完成的任务数量,每个线程一个计数器
volatile long completedTasks;
/**
* 使用给定的第一个任务并利用线程工厂创建Worker实例
* @param firstTask 线程的第一个任务,如果没有,就设置为null,
* 此时线程会从队列 获取任务。
*/
Worker(Runnable firstTask) {
// 线程处于阻塞状态,调用runWorker的时候中断
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
// 调用ThreadPoolExecutor的runWorker方法执行线程的运行
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 中断Worker封装的线程
w.unlock();
boolean completedAbruptly = true;
try {
// 如果线程初始任务不是null,或者从队列获取的任务不是null,表示该线程应该执行任 务。
while (task != null || (task = getTask()) != null) {
// 获取线程锁
w.lock();
// 如果线程池停止了,确保线程被中断
// 如果线程池正在运行,确保线程不被中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
// 获取到任务后,再次检查线程池状态,如果发现线程池已经停止,则给自己发 中断信号
wt.interrupt();
try {
// 任务执行之前的钩子方法,实现为空
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
// 任务执行结束后的钩子方法,实现为空
afterExecute(task, thrown);
}
} finally {
// 任务执行完成,将task设置为null
task = null;
// 线程已完成的任务数加1
w.completedTasks++;
// 释放线程锁
w.unlock();
}
}
// 判断线程是否是正常退出
completedAbruptly = false;
} finally {
// Worker退出
processWorkerExit(w, completedAbruptly);
}
}
}3.5.1 shutdown()与任务执行过程综合分析
把任务的执行过程 和 上面的线程池的关闭过程结合起来进行分析,当调用 shutdown() 的时候,可能出现以下几种场景:
1. 当调用 shutdown() 的时候,所有线程都处于空闲状态。 这意味着任务队列一定是空的。此时,所有线程都会阻塞在 getTask() 方法的地方。然后,所有线程都会收到interruptIdleWorkers() 发来的中断信号,getTask() 返回null,所有Worker都会退出while循环,之后执processWorkerExit。
2. 当调用 shutdown() 的时候,所有线程都处于忙碌状态。 此时,队列可能是空的,也可能是非空的。interruptIdleWorkers() 内部的tryLock调用失败,什么都不会做,所有线程会继续执行自己当前的任务。之后所有线程会执行完队列中的任务,直到队列为空,getTask() 才会返回null。之后,就和场景1一样了,退出while循环。
3. 当调用shutdown()的时候,部分线程忙碌,部分线程空闲。 有部分线程空闲,说明队列一定是空的,这些线程肯定阻塞在 getTask() 方法的地方。空闲的这些线程会和场景1一样处理,不空闲的线程会和场景2一样处理。
getTask()方法的内部细节:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果线程池调用了shutdownNow(),返回null
// 如果线程池调用了shutdown(),并且任务队列为空,也返回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 工作线程数减一
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果队列为空,就会阻塞pool或者take,前者有超时时间,后者没有超时时间
// 一旦中断,此处抛异常,对应上文场景1。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}3.5.2 shutdownNow()与任务执行过程综合分析
和上面的 shutdown() 类似,只是多了一个环节,即清空任务队列。如果一个线程正在执行某个业务代码,即使向它发送中断信号,也没有用,只能等它把代码执行完成。因此,中断空闲线程和中断所有线程的区别并不是很大,除非线程当前刚好阻塞在某个地方。
当一个 Worker 最终退出的时候,会执行清理工作:
private void processWorkerExit(ThreadPoolExecutor.Worker w, boolean completedAbruptly) {
// 如果线程正常退出,不会执行if的语句,这里一般是非正常退出,需要将worker数量减一
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 将自己的worker从集合移除
workers.remove(w);
} finally {
mainLock.unlock();
}
// 每个线程在结束的时候都会调用该方法,看是否可以停止线程池
tryTerminate();
int c = ctl.get();
// 如果在线程退出前,发现线程池还没有关闭
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果线程池中没有其他线程了,并且任务队列非空
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 如果工作线程数大于min,表示队列中的任务可以由其他线程执行,退出当前线程
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 如果当前线程退出前发现线程池没有结束,任务队列不是空的,也没有其他线程来执行
// 就再启动一个线程来处理。
addWorker(null, false);
}
}3.6 线程池的四种拒绝策略
在 execute(Runnable command) 的最后,调用了 reject(command) 执行拒绝策略,代码如下所示:
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}handler 就是我们可以设置的拒绝策略管理器:
/**
* Handler called when saturated or shutdown in execute.
*/
private volatile RejectedExecutionHandler handler;
//默认拒绝策略
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}3.6.1 CallerRunsPolicy策略
策略:谁请求谁处理
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}3.6.2 AbortPolicy策略
策略:抛异常
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}3.6.3 DiscardPolicy策略
策略:直接抛弃不处理
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}3.6.4 DiscardOldestPolicy策略
策略:抛弃最老的任务
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}4. Executors工具类
concurrent包提供了Executors工具类,利用它可以创建各种不同类型的线程池。
4.1 线程池对比
单线程的线程池:
public static ExecutorService newSingleThreadExecutor() {
return new Executors.FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}固定数目线程的线程池:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}每接收一个请求,就创建一个线程来执行:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}单线程具有周期调度功能的线程池:
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new Executors.DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}多线程,有调度功能的线程池:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}不同类型的线程池,其实都是由前面的几个关键配置参数配置而成的。
4.2 最佳实践
在《阿里巴巴Java开发手册》中,明确禁止使用Executors创建线程池,并要求开发者直接使用 ThreadPoolExector 或 ScheduledThreadPoolExecutor 进行创建。这样做是为了强制开发者明确线程池的运行策略,规避因使用不当而造成资源耗尽的风险。
边栏推荐
- HCIP之路
- C restart application
- 华为云服务器弹性公网IP无法ping
- Tri rapide + Tri par bulle + Tri par insertion + Tri par sélection
- View the file once a second and send the result of the last line of the file to the syslog server
- C# 内存法复制图像bitmap
- openvino系列 18. 通过OpenVINO和OpenCV实现实时的物体识别(RTSP,USB视频读取以及视频文件读取)
- @What is the difference between controller and @restcontroller?
- C Scrollview scroll up or scroll down
- 顺序表课设
猜你喜欢

Rotary table visual screening machine and its image recognition system

The sandbox has reached a cooperation with football player to bring popular football cartoons and animation into the metauniverse

PHP file contains -ctf

建立一有序的顺序表,并实现下列操作: 1.把元素x插入表中并保持有序; 2.查找值为x的元素,若找到将其删除; 3.输出表中各元素的值。

GIF验证码分析
![[try to hack] IP address](/img/ab/ed91f3094ac913a0d79448a2d19015.png)
[try to hack] IP address

Eureka service registration and discovery

深度学习------不同方法实现vgg16

HCIP之路MPLS

Friends of the week
随机推荐
GIF验证码分析
[veusz] import 2D data in CSV
Learn to draw Er graph in an article
HCIP之路第八次实验
开源软件、自由软件、Copyleft、CC都是啥,傻傻分不清楚?
Microsoft Exchange – prevent network attacks
数学知识:快速幂—快速幂
Acwing game 56 [End]
快速删除代码里面的node_modules
一篇文章学会er图绘制
深度学习------不同方法实现vgg16
Playwirght深度入门
生产环境服务器环境搭建+项目发布流程
配置ASMX无法访问
Vs problems when connecting to SQL myconn OPen(); cannot execute
Acwing第 56 场周赛【完结】
Capturing packets to find repeated acks and a large number of TCP retransmissions in TCP sessions -- sack (selective acknowledgement) technology
深度学习------卷积(conv2D)底层
抓包发现tcp会话中老是出现重复的ack和大量的tcp重传——SACK(Selective Acknowledgment, 选择性确认)技术
转盘式视觉筛选机及其图像识别系统