当前位置:网站首页>Source code analysis of ThreadPoolExecutor

Source code analysis of ThreadPoolExecutor

2022-07-23 18:59:00 Notes sharing and perception of programming Xiaobai

Tips : When the article is finished , Directories can be generated automatically , How to generate it, please refer to the help document on the right


One 、ThreadPoolExecutor Application mode

Why use thread pools instead of threads

stay java in , Create a new thread if each request arrives , The cost is quite large . In practical use , The server spends considerable time and system resources on creating and destroying threads , It may even take more time and resources to process actual user requests . In addition to the overhead of creating and destroying threads , Active threads also need to consume system resources . If in a jvm Too many threads are created in , May cause the system to over consume memory or “ switching ” Resulting in insufficient system resources . To prevent a shortage of resources , Server applications need to take some measures to limit the number of requests processed at any given time , Minimize the number of threads created and destroyed , In particular, the creation and destruction of some resources consuming threads , Try to use existing objects for services , This is it. “ Pool resources ” Causes of Technology .

Thread pool is mainly used to solve the problem of thread life cycle cost and resource shortage . By reusing threads for multiple tasks , The cost of thread creation is allocated to multiple tasks , And since the thread already exists when the request arrives , So the delay caused by thread creation is eliminated . such , The request can be served immediately , Respond faster with Applications . in addition , By properly adjusting the number of threads in the thread, we can prevent the shortage of resources .

Why does Alibaba specification require the use of ThreadPoolExecutor, instead of Executors?

own new The operation of thread pool is a specification of Alibaba 、 If you don't control the core parameters manually , There is no way to optimize the performance of the thread pool , After all Executors Thread pool provided in , Many are doomed

It may not be suitable for the current business and our CPU Hardware , Our business is full of IO Dense , There are plenty of them CPU Concentrated ( What is? IO What is intensive CPU intensive ) Of

( The conclusion is Executors The parameters of the thread pool provided in the tool are fixed , The created thread pool is not necessarily suitable for our business or hardware resources )
ThreadPoolExecutor Creating a thread pool , It can be customized , Create more suitable for our own business and CPU Thread pool of hardware resources

Two 、ThreadPoolExecutor Core parameters

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
    
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

 Insert picture description here

  • corePoolSize Number of core threads
  • maximumPoolSize Maximum number of threads , Maximum number of threads - The number of core threads is equal to the number of emergency threads
  • keepAliveTime Idle time of emergency thread
  • unit :keepAliveTime Time unit of
  • workQueue Work queue
  • threadFactory Thread factory
  • handler : Refusal strategy

JDK The default four rejection policies
 Insert picture description here

  • AbortPolicy Let the caller throw RejectedExecutionException abnormal , This is the default strategy
  • CallerRunsPolicy Let the caller run the task
  • DiscardPolicy Give up this mission
  • DiscardOldestPolicy Abandon the earliest task in the queue , This task replaces

Can achieve RejectedExecutionHandler Interface , Custom reject policy

executor and submit The difference between

executor It can only be transmitted Runnable The task of
submit It can be transmitted Runnable and Callable, That is to say submit Can pass Callable Get future, Get the result of executing the task

3、 ... and 、ThreadPoolExecutor Execute the process

We go through execute Method to see the execution process of the thread pool  Insert picture description here
 Insert picture description here

Four 、ThreadPoolExecutor state

4.1 The core attribute of thread pool ctl

 Insert picture description here
 Insert picture description here

4.2 State transition of thread pool

 Insert picture description here
 Insert picture description here

 Insert picture description here

 Insert picture description here

5、 ... and 、execute Method

The first point is the core : adopt execute Method , View the overall thread pool execution process , And some judgments to avoid concurrency
The second point is the core : Why does the thread pool add a non core thread with an empty task ( Emergency thread ) The thread pool
( In order to process the number of working threads is 0, But there are tasks in the task queue )

 public void execute(Runnable command) {
    
        // Judge not empty 
        if (command == null)
            throw new NullPointerException();
           // obtain ctl Value 
        int c = ctl.get();
        // Whether the number of worker threads is less than the number of core threads 
        if (workerCountOf(c) < corePoolSize) {
    
            //  adopt addWorker Method to add a number of core threads to execute command Mission 
            if (addWorker(command, true))
                // Adding the number of core threads succeeded , return true, Go straight back to 
                return;
            //  If it were · Concurrent , Add the thread that failed the core thread , Need to get it again ctl value 
            c = ctl.get(); 
        }
        // Failed to create the number of core threads 
        // Judge whether the current thread pool state is RUNNING
        // If it is RUNNING, perform offer Method to add a task to a work queue 
        if (isRunning(c) && workQueue.offer(command)) {
    
            // Adding task to work queue succeeded 
            // Get... Again ctl value 
            int recheck = ctl.get();
            // Judge whether the current thread pool state is RUNNING state , If not , You need to remove tasks from the work queue 
            if (! isRunning(recheck) && remove(command))
                 // Execute reject strategy ( The thread pool state is incorrect , Execute reject strategy )
                reject(command); 
            // Determine whether the number of worker threads is 0
            else if (workerCountOf(recheck) == 0)
                // The number of worker threads is 0, But there are tasks in the work queue 
                // Add an empty task , Emergency thread , Processing tasks queued in the work queue 
                addWorker(null, false);
        }
        // Failed to add task to work queue , Add an emergency thread to execute the current task 
        else if (!addWorker(command, false))
            // Adding emergency thread failed , perform reject Refusal strategy 
            reject(command);
    }

1、 First, judge whether the task is empty , If you throw an exception directly NullPointerException
2、 Judge whether the number of parent core threads is used up , If there are core threads , Just put Try The task is left to the core thread , If you give it to the core thread successfully , Go straight back to

This 【 Try 】 This time is very important , Because in the case of concurrency , The number of core threads may be robbed in an instant

3、 If in the process of handing over to the core thread , The core thread is occupied by others , Then regain ctl Value , Through high 3 Three judges whether the thread pool is RUNNING state , If it is , Just Try Hand over the task to the queue
4、 If the thread pool is not RUNNING state , Or the task queue is full , Just try to give it to the emergency thread , If the attempt fails , Directly implement the decision strategy
5、 If thread pool RUNNING State and Try The task was handed over to the queue successfully , Will get... Again ctl Value , Again, judge whether the state of the thread pool is RUNNING, If not, try to remove the previously added task , Execute the rejection policy after success

6、 If not RUNNING, Or is it RUNNING But the attempt to remove the task failed , Just judge whether there are working threads in the thread pool , without , Create an emergency thread of empty tasks to empty the tasks in the task queue

6、 ... and 、 addWorker Method

 Add worker threads 
 private boolean addWorker(Runnable firstTask, boolean core) {
    
       // Judgment of thread pool state , And the number of working threads 
       // Outer layer for Identification of the loop 
        retry:
        for (;;) {
    
            // obtain ctl Value 
            int c = ctl.get();
           //  Get the status of the thread pool 
            int rs = runStateOf(c);
            // If the state of the thread pool is not RUNNING  and   Not satisfied at the same time 【 The state of the thread pool is SHUTDOWN And the task is empty and the queue is not empty 】 The condition of returns directly false ( Corresponding  addWorker(null, false))
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))
                return false;

            for (;;) {
    
               // Gets the current number of worker threads 
                int wc = workerCountOf(c);
                // Determine whether the current number of worker threads is greater than the maximum 
                if (wc >= CAPACITY ||
                    // If it's a core thread , Is it greater than corePoolSize , If it is an emergency thread , Is it greater than maximumPoolSize
                    wc >= (core ? corePoolSize : maximumPoolSize))
                   //  The current worker thread has reached the maximum 
                    return false;
                 // With CAS The way , Number of worker threads +1, If it works , Jump directly out of the outer layer for loop  
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // Recapture ctl value  
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs) // Based on newly acquired ctl Get the thread pool status , Judgment and previous rs Whether the status is consistent 
                    continue retry; //  It indicates that concurrent operations cause changes in the state of the thread pool , Need to re judge the status 
 
            }
        }


        // Add a worker thread and start the worker thread 
        // Whether the worker thread starts 
        boolean workerStarted = false;
        // Whether the worker thread is added successfully 
        boolean workerAdded = false;
        //Worker  It's the worker thread 
        Worker w = null;
        try {
    
            // new Worker Build worker threads , Throw the task to Worker in 
            w = new Worker(firstTask);
            // Get Worker Bound in Thread The thread of 
            final Thread t = w.thread;
            if (t != null) {
    // Definitely not null , Robustness judgment 
                // Lock ....( Concurrent , Others may perform shutDown)
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
    
                    // Based on the retrieved ctl, Get the status of the thread pool 
                    int rs = runStateOf(ctl.get());
                    // If the line path pool status is RUNNING, Just add a worker thread 
                    if (rs < SHUTDOWN ||
                         // If the status of the thread pool is SHUTDOWN And the incoming task is null( Corresponding  addWorker(null, false))
                        (rs == SHUTDOWN && firstTask == null)) {
    
                        // Add worker threads 
                        // Determine whether the current thread is in run state ( Robustness judgment )
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                            // What will be constructed Worker Object added to workers(HashSet) in 
                        workers.add(w);
                        // Get the number of worker threads 
                        int s = workers.size();
                        // If the current number of worker threads , Greater than the maximum number of worker threads in history , Just copy it back to largestPoolSize
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                            // Set the identity added by the worker thread to true
                        workerAdded = true;
                    }
                } finally {
    
                   //  Release the lock 
                    mainLock.unlock();
                }
                if (workerAdded) {
    
                    // Adding work task succeeded , Start thread 
                    t.start();
                    // Set the identity of the worker thread startup to true
                    workerStarted = true;
                }
            }
        } finally {
    
           // If the worker thread fails to start ( The state of the thread pool may have changed at startup )
            if (! workerStarted)
                 // remove workers Worker threads in , Number of worker threads -1, Try changing the thread pool state to TIDYING
                addWorkerFailed(w);
        }
        return workerStarted;
    }
// After starting the thread fails , Remedial actions done 
private void addWorkerFailed(Worker w) {
    
       // Lock 
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
    
           // Determine whether the previously created worker thread is successful 
            if (w != null)
                // If it works , take workers The current worker thread in is removed 
                workers.remove(w);
                // Number of project threads -1
            decrementWorkerCount();
            // Try changing the thread pool state to TIDYING
            tryTerminate();
        } finally {
    
            mainLock.unlock();
        }
    }

7、 ... and 、 Worker class ( Worker thread class ) Simple understanding of

 private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
    
      
        private static final long serialVersionUID = 6138294804551838833L;
        // Working thread Thread object , Built during initialization 
        final Thread thread;
        // Tasks to be performed 
        Runnable firstTask;

        volatile long completedTasks;
         
         // The worker thread just initialized is not allowed to be interrupted 
        Worker(Runnable firstTask) {
    
            setState(-1); // inhibit interrupts until runWorker
            // for the first time new The task will be assigned to firstTask 
            this.firstTask = firstTask;
            // to Worker structure Thread object 
            this.thread = getThreadFactory().newThread(this);
        }
       // call t.start(), perform run Method 
        public void run() {
    
            runWorker(this);
        }

        protected boolean isHeldExclusively() {
    
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
    
            if (compareAndSetState(0, 1)) {
    
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
    
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        {
     acquire(1); }
        public boolean tryLock()  {
     return tryAcquire(1); }
        public void unlock()      {
     release(1); }
        public boolean isLocked() {
     return isHeldExclusively(); }

        void interruptIfStarted() {
    
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
    
                try {
    
                    t.interrupt();
                } catch (SecurityException ignore) {
    
                }
            }
        }
    }

1、Worker Class inheritance AbstractQueuedSynchronizer And realize Runnable
2、 Why? Worker Realize it by yourself AQS lock ?
Reentrant is not allowed , If another thread executes shutdown Words , The worker thread will not be interrupted , But if it is stop Forced interruption

Interrupting a thread does not stop the thread immediately , It's going to be thread The interrupt ID of is set to true

The following source code can also be seen Worker Reentry lock is not supported

public void lock()        {
     acquire(1); }

 public final void acquire(int arg) {
    
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }


        protected boolean tryAcquire(int unused) {
    
            if (compareAndSetState(0, 1)) {
    
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

8、 ... and 、 runWorker Method

execute()—>addWorker()——>t.run()——>run() (Worker Medium run Method )

public void run() {
    
            runWorker(this);
        }

The process of performing tasks , And do the interrupt thread related lock operation

 final void runWorker(Worker w) {
    
        // Get the current thread 
        Thread wt = Thread.currentThread();
        // obtain Worker The task 
        Runnable task = w.firstTask;
        // hold Worker Set the task in to null
        w.firstTask = null;
        //unlock: hold Worker in state Set as 0, At the same time ExclusiveOwnerThread Set as null(state from -1 Turn into 0, Representatives can be interrupted )
        w.unlock(); // allow interrupts
        // When performing tasks , Check whether there is an abnormal mark in the sub function 
        boolean completedAbruptly = true;
        try {
    
            // The first way to get the task , Is to perform execute、submit when , Incoming tasks are processed directly 
            // The second way to get tasks , Get task execution from the work queue 
            while (task != null || (task = getTask()) != null) {
    
                // Lock , stay SHUTDOWN State, , The current thread is not allowed to be interrupted 
                // stay Worker Internally implemented lock , It is not a reentrant lock , Because when interrupted , You need to be right worker Conduct lock, If you can't get it, it means that the current worker thread is executing a task 
                w.lock();
                // If the thread pool state changes to STOP state , The current thread must be interrupted 
                // First state , Determine whether the current thread is STOP state 
                // The second judgment : Check the interrupt flag bit , Merge and return , If false, The explanation is not STOP, If it becomes true, You need to check again whether the thread pool status is caused by concurrent operations STOP
                if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted()&&runStateAtLeast(ctl.get(), STOP)))
                // Check whether the current thread interrupt flag is false, If false, Is executed wt.interrupt()
                  &&!wt.isInterrupted())
                  // Set the interrupt flag to true
                    wt.interrupt();
                try {
    
                   // Hook function for executing tasks , Lead to enhance , Not a dynamic proxy ( In fact, it is an empty method , Programmers can rewrite )
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
    
                       // Perform tasks 
                        task.run();
                    } catch (RuntimeException x) {
    
                        thrown = x; throw x;
                    } catch (Error x) {
    
                        thrown = x; throw x;
                    } catch (Throwable x) {
    
                        thrown = x; throw new Error(x);
                    } finally {
    
                     // Hook function for executing tasks , The rear enhancement , Not a dynamic proxy ( In fact, it is an empty method , Programmers can rewrite )
                        afterExecute(task, thrown);
                    }
                } finally {
    
                   // take task Set as null
                    task = null;
                    // Perform a successful mission +1
                    w.completedTasks++;
                    // take state The mark is set to 0
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
    
            processWorkerExit(w, completedAbruptly);
        }
    }

Nine 、getTask Method

How to work from a queue ,workQueue Get task from

From work i Get task in queue

 private Runnable getTask() {
    
       // identification ( Non core threads can kill )
        boolean timedOut = false; // Did the last poll() time out?
         // Dead cycle 
        for (;;) {
    
             // obtain ctl value 
            int c = ctl.get();
            // Get the status of the thread pool 
            int rs = runStateOf(c);
      // ============================= Determine thread pool state =======================
            // Check if queue empty only if necessary.
            // If entering if, You need to kill the current worker thread 
            // Thread pool state enters SHUTDOWN、STOP
            // If the thread pool status is greater than STOP, You need to remove the current worker thread 
            // If the thread pool state is SHUTDOWN, And the work queue is empty , You need to remove the current worker thread 
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
    
                 //  Remove the current worker thread 
                decrementWorkerCount();
                // return null, hand processWorkerExit Remove the current worker thread 
                return null;
            }
          // ============================= Determine the number of worker threads =======================
            // Get the number of worker threads 
            int wc = workerCountOf(c);
             //allowCoreThreadTimeOut : Allow core thread timeout ( It's usually false)
             // Whether the worker thread is larger than the core thread 
            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            // Whether the worker thread is larger than the maximum thread 
            // The number of worker threads is greater than the number of core threads , And the current thread has timed out 
            // Try to kill the current thread 
            if ((wc > maximumPoolSize || (timed && timedOut))
            // If the number of worker threads is greater than 1, Or the work queue is empty 
            // If the work queue is empty , I'll kill myself 
            // If the number of worker threads is greater than 1, I'll kill myself 
                && (wc > 1 || workQueue.isEmpty())) {
    
                // be based on CAS Operation removes the current thread , Only one thread will CAS success 
                if (compareAndDecrementWorkerCount(c))
                 // return null, hand processWorkerExit Remove the current worker thread 
                    return null;
                continue;
            }
    // ============================= Get the task from the work queue =======================
            try {
    
                Runnable r = timed ?
                // Block getting tasks from the work queue for a certain time ( It can be understood as non core taking this method )
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    // Keep blocking ,( It can be understood as taking this method as the core )
                    workQueue.take();
                if (r != null)
                // If you get the task , Go straight back 
                    return r;
                    // Getting tasks from the queue timed out ( The maximum lifetime of the current worker thread has been reached )
                timedOut = true;
             
            } catch (InterruptedException retry) {
    
                timedOut = false;
            }
        }
    }

Ten 、processWorkerExit Method

     // Remove the operation of the current worker thread 
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
    
    // If executed processWorkerExit Method operation does not sleep getTask The operation , It is directly caused by the exception ( Generally, hook )
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        // Because the execution method is illegal , Manually deduct the number of worker threads 
            decrementWorkerCount();
          // Lock 
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
    
        // Record the total number of tasks processed by the current thread pool 
            completedTaskCount += w.completedTasks;
            // Remove worker threads 
            workers.remove(w);
        } finally {
    
            mainLock.unlock();
        }
        // Try to link the line to the city ( Excessive state —— Destroy State )
        tryTerminate();
       // Recapture ctl
        int c = ctl.get();
        // Current thread pool state , When it comes to this , That is the SHUTDOWN、RUNNING
        if (runStateLessThan(c, STOP)) {
    
        // If the current worker thread is removed in normal state 
            if (!completedAbruptly) {
    
            // Minimum number of core threads , How much is allowed 
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                // If the task in the work queue is not empty , Set the minimum value of the worker thread 
                if (min == 0 && ! workQueue.isEmpty())
                
                    min = 1;
                    // There are also worker threads in the thread pool 
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // It indicates that the worker thread is not removed in the normal way , Add another worker thread 
            // Thread pool work queue is not empty , And there are no worker threads , Add another worker thread 
            addWorker(null, false);
        }
    }

Thread pool common questions :
1、 Thread pool 7 Parameters ?
2、 Execution flow of thread pool ?
3、 The decision strategy of thread pool ?
4、 The core attribute of thread pool ctl What is it? ?
5、 What is the state of the thread pool and how it changes ?
6、 Thread pool execute and submit The difference between
7、 How are worker threads expressed in the thread pool
8、 Where are worker threads stored

原网站

版权声明
本文为[Notes sharing and perception of programming Xiaobai]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/204/202207231635383197.html