AbstractQueuedSynchronizer (AQS) source code analysis: the condition queue flow
2022-06-21 08:33:00 【*Wucongcong*】
AQS source code exploration: the Condition queue (writing an introductory BlockingQueue)
1、Introduction to the Condition queue
- AQS has another very important inner class, ConditionObject. It implements the Condition interface and is mainly used to implement conditional waiting on a lock.
- ConditionObject also maintains a queue, which holds the threads waiting for a condition. When the condition holds, another thread calls signal, which moves a node from this queue to the AQS queue, where it waits for the thread holding the lock to release the lock and wake it up.
- A typical application of Condition is the implementation of BlockingQueue. When the queue is empty, a thread that wants to take an element blocks on the notEmpty condition. Once an element is added to the queue, notEmpty is signalled, and a node in its queue is moved to the AQS queue to wait to be woken up.
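The hand-off described in the list above can be sketched in a few lines. This is only a minimal illustration: the class name ConditionHandOffDemo, the flag field and the method names are made up for this example, while ReentrantLock and Condition are the real JDK classes.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class ConditionHandOffDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private boolean flag = false; // the "condition" the waiter is waiting for (hypothetical)

    // Waits until another thread has set the flag.
    void awaitFlag() throws InterruptedException {
        lock.lock();
        try {
            while (!flag) {      // recheck after every wake-up
                ready.await();   // releases the lock while waiting
            }
        } finally {
            lock.unlock();
        }
    }

    // Makes the condition true and signals one waiting thread.
    void setFlag() {
        lock.lock();
        try {
            flag = true;
            ready.signal();
        } finally {
            lock.unlock();
        }
    }

    // Returns true if the waiter thread finished after the flag was set.
    static boolean run() {
        ConditionHandOffDemo demo = new ConditionHandOffDemo();
        Thread waiter = new Thread(() -> {
            try {
                demo.awaitFlag();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        demo.setFlag();
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !waiter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("waiter finished: " + run());
    }
}
```

Whether the waiter actually parks depends on thread scheduling. If setFlag() runs first, the waiter sees the flag and never calls await(), which is exactly why the check sits in a while loop.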
2、Writing an introductory BlockingQueue
2.1、A custom BlockingQueue interface
/** * @author wcc * @date 2022/2/12 19:23 */
public interface BlockingQueue<T> {
/** * Interface for inserting data */
void put(T element);
/** * Interface to get data */
T take();
}
2.2、A custom MiniArrayBrokingQueue class
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** * @author wcc * @date 2022/2/12 19:24 */
public class MiniArrayBrokingQueue<T> implements BlockingQueue<T> {
    // Lock that controls concurrent access
    private final Lock lock = new ReentrantLock();
    /** * Before storing data, a producer thread checks whether queues is full. If it is, the producer calls notFull.await() * and is suspended in the notFull condition queue until a consumer thread consumes an element */
    private final Condition notFull = lock.newCondition();
    /** * Before taking data, a consumer thread checks whether queues contains data. If it does not, the consumer calls notEmpty.await() * and is suspended in the notEmpty condition queue until a producer thread produces an element */
    private final Condition notEmpty = lock.newCondition();
    // Array that stores the elements
    private final Object[] queues;
    // Length of the array
    private final int size;
    /** * count: number of elements currently available for consumption * putptr: index where the next producer stores its element, incremented after each put * takeptr: index of the next element to consume, incremented after each take */
    private int count, putptr, takeptr;

    public MiniArrayBrokingQueue(int size) {
        this.size = size;
        this.queues = new Object[size];
    }

    @Override
    public void put(T element) {
        lock.lock();
        try {
            // First check whether queues is full. Use while rather than if:
            // the condition must be rechecked every time await() returns
            while (count == this.size) {
                notFull.await();
            }
            // Reaching here means the queue is not full, so the element can be stored
            this.queues[putptr++] = element;
            // Update count after producing
            count++;
            if (putptr == this.size) {
                putptr = 0;
            }
            // An element was added: signal notEmpty to wake up one consumer thread
            notEmpty.signal();
        } catch (InterruptedException e) {
            // Interrupted while waiting: the element is not stored; restore the interrupt flag
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    @Override
    @SuppressWarnings("unchecked")
    public T take() {
        lock.lock();
        try {
            // First check whether queues is empty (again with while, not if)
            while (count == 0) {
                notEmpty.await();
            }
            // Reaching here means the queue is not empty, so an element can be taken
            T element = (T) this.queues[takeptr++];
            // Update count after consuming
            count--;
            if (takeptr == this.size) {
                takeptr = 0;
            }
            // An element was consumed: signal notFull to wake up one producer thread
            notFull.signal();
            return element;
        } catch (InterruptedException e) {
            // Interrupted while waiting: restore the interrupt flag and return no element
            Thread.currentThread().interrupt();
            return null;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        MiniArrayBrokingQueue<Integer> queue = new MiniArrayBrokingQueue<>(10);
        Thread producer = new Thread(() -> {
            int i = 0;
            while (true) {
                i++;
                if (i == 10) {
                    i = 0;
                }
                try {
                    System.out.println("Producing: " + i);
                    queue.put(i);
                    TimeUnit.MILLISECONDS.sleep(200);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        producer.start();
        Thread consumer = new Thread(() -> {
            while (true) {
                try {
                    Integer result = queue.take();
                    System.out.println("Consumed: " + result);
                    TimeUnit.MILLISECONDS.sleep(200);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        consumer.start();
    }
}
The operation results are as follows :

3、AQS Condition queue source code analysis
3.1、The ConditionObject inner class
First, a brief look at the two fields of the AQS inner class Node that relate to the Condition queue:
// The next node waiting on the condition (used with Condition). Note that the condition queue is a singly linked list
Node nextWaiter;
// Node status. Possible values: 0, SIGNAL(-1), CANCELLED(1), CONDITION(-2), PROPAGATE(-3)
// waitStatus == 0: default state
// waitStatus > 0: cancelled
// waitStatus == -1: when the current node is head and releases the lock, it must wake up its successor
// waitStatus == -2: the current node is in a Condition queue
volatile int waitStatus;
Flow analysis of the AQS blocking queue:

// In ReentrantLock: creates a condition queue
public Condition newCondition() {
    return sync.newCondition();
}
// In Sync, the static inner class of ReentrantLock
abstract static class Sync extends AbstractQueuedSynchronizer {
    final ConditionObject newCondition() {
        // ConditionObject is an inner class of AQS
        return new ConditionObject();
    }
}
public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    // Head node of the condition queue
    private transient Node firstWaiter;
    // Tail node of the condition queue
    private transient Node lastWaiter;
    ....
}
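Because every call to newCondition() creates a new ConditionObject with its own firstWaiter and lastWaiter, two conditions of the same lock have separate wait queues. The sketch below is one way to observe this from outside. The class name SeparateQueuesDemo and the method run are invented for this example; hasWaiters and getWaitQueueLength are existing ReentrantLock methods.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SeparateQueuesDemo {
    /** Returns {waiters on a after signalling b, waiters on a after signalling a}. */
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition a = lock.newCondition();
        Condition b = lock.newCondition();
        int[] seen = {-1, -1};
        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                a.await(); // this thread waits in a's queue only
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        try {
            boolean done = false;
            // Poll until the waiter is parked in a's condition queue
            while (!done && waiter.isAlive()) {
                lock.lock();
                try {
                    if (lock.hasWaiters(a)) {
                        b.signal();                           // b's queue is empty: nothing moves
                        seen[0] = lock.getWaitQueueLength(a);
                        a.signal();                           // moves the waiter out of a's queue
                        seen[1] = lock.getWaitQueueLength(a);
                        done = true;
                    }
                } finally {
                    lock.unlock();
                }
                if (!done) {
                    Thread.sleep(10);
                }
            }
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```

In the MiniArrayBrokingQueue above, this separation is what lets a producer wake only consumers (notEmpty) and a consumer wake only producers (notFull).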

3.2、The await() method
public final void await() throws InterruptedException {
    // If the current thread is already interrupted, throw InterruptedException immediately
    if (Thread.interrupted())
        throw new InterruptedException();
    // Wrap the thread that called await() in a node, append it to the condition queue, and return that node
    Node node = addConditionWaiter();
    // Fully release the lock held by the current thread, setting state to 0
    // Why release the lock? If the thread were suspended while still holding it, no other thread could acquire the lock to call signal()
    int savedState = fullyRelease(node);
    // 0: no interrupt was received while waiting in the Condition queue
    // -1 (THROW_IE): an interrupt was received while the node was in the Condition queue
    // 1 (REINTERRUPT): no interrupt was received in the Condition queue, but one arrived after the node was moved to the blocking queue
    int interruptMode = 0;
    // isOnSyncQueue returns true: the node of the current thread has been moved to the blocking queue
    // returns false: the node is still in the condition queue, so the thread must keep parking
    while (!isOnSyncQueue(node)) {
        // Suspend the thread that owns this node
        LockSupport.park(this);
        // When does the thread wake up?
        // 1. Normal path: another thread acquires the lock and calls signal(), which moves the head of the condition queue to the blocking queue; the thread is woken later, when it is its turn to acquire the lock
        // 2. After the move, the predecessor in the blocking queue turns out to be cancelled, so the current node is woken directly
        // 3. Another thread interrupts the current thread while it is suspended
        // checkInterruptWhileWaiting(node): even when the thread is interrupted while in the condition queue, its node is still moved to the blocking queue
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // Reaching here, the node has been moved to the blocking queue
    // acquireQueued(node, savedState) competes for the lock in the blocking queue
    // Condition 1: acquireQueued returns true: the thread was interrupted while waiting in the blocking queue
    // Condition 2: interruptMode != THROW_IE(-1): no interrupt occurred while the node was in the condition queue
    // If both hold, set interruptMode to REINTERRUPT: the interrupt happened in the blocking queue
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // When is node.nextWaiter != null?
    // When the node was interrupted in the condition queue: it is added to the blocking queue, but its nextWaiter is not reset to null
    if (node.nextWaiter != null) // clean up if cancelled
        // Remove the cancelled nodes from the condition queue
        unlinkCancelledWaiters();
    // interruptMode != 0: an interrupt occurred while waiting (1. in the condition queue, or 2. after leaving the condition queue)
    if (interruptMode != 0)
        // Handle the interrupt according to where it occurred
        reportInterruptAfterWait(interruptMode);
}
3.3、addConditionWaiter(): wraps the current thread in a node and appends it to the Condition queue
/** * Adds a new waiter to wait queue. * @return its new wait node * Every thread that calls await() holds the lock, so addConditionWaiter() never runs concurrently */
private Node addConditionWaiter() {
    // Save a reference to the tail of the condition queue in the local variable t
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    // Condition 1: t != null: the queue already contains nodes
    // Condition 2: t.waitStatus != Node.CONDITION: a node waiting in the Condition queue has status CONDITION(-2),
    // so any other status means the tail node was cancelled (for example by an interrupt)
    if (t != null && t.waitStatus != Node.CONDITION) {
        // Remove all cancelled nodes from the condition queue
        unlinkCancelledWaiters();
        // Reload t, because unlinkCancelledWaiters() may have changed lastWaiter
        t = lastWaiter;
    }
    // Create a node for the current thread with status CONDITION(-2)
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // t == null: the condition queue is empty, so the current node becomes the first element
    if (t == null)
        firstWaiter = node;
    else
        // The condition queue already has nodes: append to the tail
        t.nextWaiter = node;
    // Update lastWaiter to the new tail
    lastWaiter = node;
    // Return the node of the current thread
    return node;
}
3.4、fullyRelease(Node node): fully releases the lock held by the current thread
// Fully release the lock held by the current thread
final int fullyRelease(Node node) {
    // Whether the release failed. A failure means the current thread called await() without holding the lock (incorrect usage)
    // In that case the finally block marks the node just added to the condition queue as cancelled,
    // and a later thread will remove the cancelled node
    boolean failed = true;
    try {
        // Read the state value held by the current thread
        int savedState = getState();
        // In most cases release returns true
        if (release(savedState)) {
            // Clear the failure flag
            failed = false;
            // Return the state value that was released
            // Why return savedState?
            // After the node has been moved to the blocking queue and woken up again, and it is head.next
            // while the lock state is 0, the node can acquire the lock; state must then be restored to savedState (the hold count before await)
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
// In AQS: the method that actually releases the lock
// ReentrantLock.unlock() -> sync.release()
public final boolean release(int arg) {
    // tryRelease tries to release the lock
    // true: the current thread has fully released the lock
    // false: the current thread has not fully released the lock yet
    if (tryRelease(arg)) {
        // When is head created?
        // When the lock holder has not released the lock yet and another thread tries to acquire it and fails
        // while the blocking queue is still empty: that thread creates a head node on behalf of the lock holder
        // and then appends itself behind head (as head's successor)
        Node h = head;
        // Condition 1: h != null: the head node has been initialized, meaning there has been contention on the lock
        // Condition 2: h.waitStatus != 0: a node has been enqueued behind head
        if (h != null && h.waitStatus != 0)
            // Wake up the successor node
            unparkSuccessor(h);
        return true;
    }
    return false;
}
// In Sync, the static inner class of ReentrantLock: try to release the lock
// true: the current thread has fully released the lock | false: it has not fully released the lock yet
protected final boolean tryRelease(int releases) {
    // Subtract releases from the state value
    int c = getState() - releases;
    // The current thread does not hold the lock: throw an exception
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    // From here on, the current thread holds the lock
    // Whether the lock has been fully released; false by default
    boolean free = false;
    // c == 0: the current thread has fully released the lock
    if (c == 0) {
        free = true;
        // Clear the owner thread
        setExclusiveOwnerThread(null);
    }
    // Update state (a plain volatile write; no CAS is needed because only the lock owner reaches this point)
    setState(c);
    return free;
}
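The effect of fullyRelease and savedState can be observed from outside with a reentrant hold count. The following is a sketch under stated assumptions: the class FullReleaseDemo and its run method are hypothetical names, and only public ReentrantLock methods (getHoldCount, hasWaiters) are used. A thread locks twice, calls await(), and another thread is then able to take the lock, which would be impossible if only one hold had been released.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class FullReleaseDemo {
    /** Returns {hold count just before await(), hold count right after await() returns}. */
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        int[] counts = new int[2];
        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock(); // reentrant acquire: state is now 2
            try {
                counts[0] = lock.getHoldCount();
                cond.await();                    // releases both holds at once
                counts[1] = lock.getHoldCount(); // restored from the saved state
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
                lock.unlock();
            }
        });
        waiter.start();
        try {
            boolean done = false;
            while (!done && waiter.isAlive()) {
                // Acquiring here succeeds only once the waiter holds no part of the lock
                lock.lock();
                try {
                    if (lock.hasWaiters(cond)) {
                        cond.signal();
                        done = true;
                    }
                } finally {
                    lock.unlock();
                }
                if (!done) {
                    Thread.sleep(10);
                }
            }
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```

Both values should be 2: the count before await() comes from the two lock() calls, and the count afterwards is the saved state handed back to acquireQueued.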
3.5、isOnSyncQueue(Node node): checks whether a node from the Condition queue is already in the blocking queue
// Check whether a node from the Condition queue is already in the blocking queue
final boolean isOnSyncQueue(Node node) {
    // Condition 1: node.waitStatus == Node.CONDITION: the node is definitely still in the condition queue, because signal() sets the status to 0 when it moves a node to the blocking queue
    // Condition 2: node.prev == null. Precondition: node.waitStatus != Node.CONDITION, which leaves two cases:
    //   1. node.waitStatus == 0: the node has been signalled
    //   2. node.waitStatus == 1: the thread called await() without holding the lock, so the node ended up CANCELLED
    // Why check node.prev == null when node.waitStatus == 0?
    // Because signal() changes the status first and only then moves the node
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // What do we know when execution reaches here?
    // node.waitStatus != CONDITION and node.prev != null, which rules out node.waitStatus == 1
    // Why can the cancelled status be ruled out? Because signal() does not move cancelled nodes
    // prev is set by the enqueue logic of the blocking queue (enq)
    // Enqueue steps: 1. node.prev = tail  2. CAS the node in as the tail of the blocking queue; only on success is the node really enqueued  3. pred.next = node
    // It follows that prev != null alone does not prove that the node has been enqueued successfully
    // node.next != null: the node is in the blocking queue and already has a successor (it is not the tail)
    if (node.next != null) // If has successor, it must be on queue
        return true;
    /** * Reaching here: node.prev != null, node.waitStatus != CONDITION, CANCELLED is ruled out, so node.waitStatus == 0 * findNodeFromTail walks backward from the tail of the blocking queue looking for node; it returns true if found and false otherwise * The node may still be in the middle of being moved by signal() */
    return findNodeFromTail(node);
}
3.6、checkInterruptWhileWaiting(Node node): checks whether the current thread was interrupted and, if so, where
private int checkInterruptWhileWaiting(Node node) {
    // Thread.interrupted(): returns the interrupt flag of the current thread and resets it to false
    return Thread.interrupted() ?
        // transferAfterCancelledWait(node) is called only when the thread was interrupted
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}
// Determine whether the node was interrupted while still in the condition queue or after being moved to the blocking queue
final boolean transferAfterCancelledWait(Node node) {
    // CAS succeeds: the node is definitely still in the condition queue, because signal() changes the status to 0 when it moves a node
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        // A node woken by an interrupt is also added to the blocking queue
        enq(node);
        // true: the interrupt happened in the condition queue
        return true;
    }
    // Which cases reach this point?
    // 1. Another thread has called signal() and the node is already in the blocking queue
    // 2. Another thread is calling signal() and the move to the blocking queue has not finished yet
    while (!isOnSyncQueue(node))
        Thread.yield();
    // false: the node was no longer in the condition queue when the interrupt arrived
    return false;
}
3.7、unlinkCancelledWaiters(): removes cancelled nodes from the condition queue
// Remove cancelled nodes from the condition queue
private void unlinkCancelledWaiters() {
    // The current node; iteration starts at the first node of the list
    Node t = firstWaiter;
    // The last node seen so far that is in the normal (CONDITION) state
    Node trail = null;
    while (t != null) {
        // The node after the current one
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            // Detach the cancelled node
            t.nextWaiter = null;
            // trail == null: no normal node has been seen yet
            if (trail == null)
                // Move firstWaiter to the next node
                firstWaiter = next;
            else
                // Link the previous normal node to the node after the cancelled one, skipping the cancelled node
                trail.nextWaiter = next;
            // next == null: the current node was the tail, so lastWaiter becomes the last normal node
            if (next == null)
                lastWaiter = trail;
        }
        else // The current node is a normal node
            trail = t;
        t = next;
    }
}
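To make the pointer handling of addConditionWaiter() and unlinkCancelledWaiters() easier to follow, here is a single-threaded toy model of the condition queue. It is a simplified sketch, not the JDK code: ToyConditionQueue, its Node class and the name field are invented for illustration, and nodes carry a label instead of a Thread.

```java
import java.util.ArrayList;
import java.util.List;

class ToyConditionQueue {
    static final int CONDITION = -2;
    static final int CANCELLED = 1;

    static final class Node {
        final String name;          // label used instead of a Thread (hypothetical)
        int waitStatus = CONDITION;
        Node nextWaiter;
        Node(String name) { this.name = name; }
    }

    Node firstWaiter;
    Node lastWaiter;

    // Appends a node at the tail, like the linking part of addConditionWaiter().
    Node add(String name) {
        Node node = new Node(name);
        if (lastWaiter == null) {
            firstWaiter = node;
        } else {
            lastWaiter.nextWaiter = node;
        }
        lastWaiter = node;
        return node;
    }

    // Same traversal as the method shown above: drop every node whose status is not CONDITION.
    void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null; // last node known to be in CONDITION state
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != CONDITION) {
                t.nextWaiter = null;
                if (trail == null) {
                    firstWaiter = next;
                } else {
                    trail.nextWaiter = next;
                }
                if (next == null) {
                    lastWaiter = trail;
                }
            } else {
                trail = t;
            }
            t = next;
        }
    }

    // Names of the nodes currently linked, from head to tail.
    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = firstWaiter; n != null; n = n.nextWaiter) {
            out.add(n.name);
        }
        return out;
    }

    public static void main(String[] args) {
        ToyConditionQueue q = new ToyConditionQueue();
        q.add("a");
        q.add("b").waitStatus = CANCELLED;
        q.add("c");
        q.unlinkCancelledWaiters();
        System.out.println(q.names());
    }
}
```

The model needs no synchronization for the same reason the real methods do not: in AQS they are only called by a thread that holds the lock.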
3.8、reportInterruptAfterWait(int interruptMode): decides what to do based on where the interrupt occurred
private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    // THROW_IE: the interrupt occurred in the condition queue, so await() throws InterruptedException
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    // REINTERRUPT: the interrupt occurred outside the condition queue. Set the interrupt flag of the current thread back to true and leave the handling to the calling code; if the caller ignores it, nothing else happens
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}
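The two interrupt modes can be told apart from the caller's side: either await() throws, or it returns normally with the interrupt flag set. The sketch below tries to provoke each case. InterruptModeDemo, run and the signalBeforeInterrupt parameter are hypothetical names for this example, and the strings it returns simply reuse the constant names from the source above.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class InterruptModeDemo {
    /**
     * Returns "THROW_IE" if await() threw InterruptedException,
     * "REINTERRUPT" if it returned normally with the interrupt flag set,
     * and "NONE" otherwise.
     */
    static String run(boolean signalBeforeInterrupt) {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        String[] outcome = {"NONE"};
        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                cond.await();
                if (Thread.currentThread().isInterrupted()) {
                    outcome[0] = "REINTERRUPT";
                }
            } catch (InterruptedException e) {
                outcome[0] = "THROW_IE";
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        try {
            boolean done = false;
            while (!done && waiter.isAlive()) {
                lock.lock();
                try {
                    if (lock.hasWaiters(cond)) {
                        if (signalBeforeInterrupt) {
                            cond.signal(); // node leaves the condition queue first
                        }
                        waiter.interrupt(); // delivered while this thread still holds the lock
                        done = true;
                    }
                } finally {
                    lock.unlock();
                }
                if (!done) {
                    Thread.sleep(10);
                }
            }
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return outcome[0];
    }

    public static void main(String[] args) {
        System.out.println("interrupt only:        " + run(false));
        System.out.println("signal then interrupt: " + run(true));
    }
}
```

In the first call the interrupt reaches the node while it is still in the condition queue, so await() throws. In the second call signal() has already moved the node, so the interrupt is only recorded and re-asserted when await() returns.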
3.9、signal(): wakes up a thread in the condition queue
public final void signal() {
    // Check that the thread calling signal() holds the lock exclusively; if not, throw an exception
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // Get the first node of the condition queue
    Node first = firstWaiter;
    // If the first node is not null, run the logic that moves it to the blocking queue
    if (first != null)
        doSignal(first);
}
private void doSignal(Node first) {
    do {
        // firstWaiter = first.nextWaiter: the current firstWaiter is about to leave the condition queue,
        // so firstWaiter is moved to the next node
        // If the next node is null, the condition queue contained only this one node and is empty once it leaves,
        // so lastWaiter must be set to null as well
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // The first node leaves the condition queue: cut its link to the next node
        first.nextWaiter = null;
        // transferForSignal(first)
        // returns true: first was moved to the blocking queue; false: the move failed
        // Loop condition: if the move failed and (first = firstWaiter) != null, first becomes the new head of the condition queue and the move is retried
        // until one node has been moved successfully or the condition queue is empty
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
3.10、transferForSignal(Node node): moves the firstWaiter node of the condition queue to the blocking queue
// Move the firstWaiter node of the condition queue to the blocking queue
final boolean transferForSignal(Node node) {
    /* * If cannot change waitStatus, the node has been cancelled. */
    // CAS the node status to 0, because the node is about to be moved to the blocking queue
    // Success: the node was in the normal state in the condition queue
    // Failure: 1. the node is cancelled (the thread did not hold the lock when it called await(), so its node was set to CANCELLED)
    //          2. the thread of the node was interrupted while suspended (it then enters the blocking queue on its own, and its status is also changed to 0)
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    // enq(node): adds the node to the blocking queue and returns its predecessor p in that queue
    Node p = enq(node);
    // Status of the predecessor
    int ws = p.waitStatus;
    // Condition 1: ws > 0: the predecessor in the blocking queue is cancelled, so wake up the current node
    // Condition 2 (precondition ws <= 0): compareAndSetWaitStatus(p, ws, Node.SIGNAL)
    //   returns true: the predecessor's status was set to SIGNAL
    //   returns false: the predecessor's thread was enqueued through lockInterruptibly and responds to interrupts; when another thread interrupts it, the predecessor node is changed to CANCELLED
    //   and leaves the queue
    // Whenever the predecessor's status is neither 0 nor -1, wake up the current thread
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        // Wake up the thread of the current node; what happens afterwards is covered later
        LockSupport.unpark(node.thread);
    return true;
}
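The move performed by signal() is visible through ReentrantLock's monitoring methods. This sketch assumes nothing beyond the public JDK API (hasWaiters, getWaitQueueLength, hasQueuedThread); the class SignalTransferDemo and its run method are names made up for the example. While the signalling thread still holds the lock, the waiter disappears from the condition queue and shows up in the lock's own wait queue.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SignalTransferDemo {
    /**
     * Returns {condition waiters before signal(), condition waiters after signal(),
     * 1 if the waiter is queued on the lock after signal() else 0}.
     */
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        int[] seen = new int[3];
        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                cond.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        try {
            boolean done = false;
            while (!done && waiter.isAlive()) {
                lock.lock();
                try {
                    if (lock.hasWaiters(cond)) {
                        seen[0] = lock.getWaitQueueLength(cond);
                        cond.signal(); // move the node; the waiter cannot run yet, the lock is still held here
                        seen[1] = lock.getWaitQueueLength(cond);
                        seen[2] = lock.hasQueuedThread(waiter) ? 1 : 0;
                        done = true;
                    }
                } finally {
                    lock.unlock(); // only now can the waiter reacquire the lock and return from await()
                }
                if (!done) {
                    Thread.sleep(10);
                }
            }
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```

The design point this illustrates is that signal() does not hand the lock over. It only re-queues the waiter, which still has to win the lock through the normal acquireQueued path after the signaller unlocks.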