当前位置:网站首页>Abstractqueuedsynchronizer (AQS) source code detailed analysis - condition queue process analysis

Abstractqueuedsynchronizer (AQS) source code detailed analysis - condition queue process analysis

2022-06-21 08:33:00 *Wucongcong*

AQS Source code exploration Condition Condition queue ( Write an introductory BrokingQueue)

1、Condition Queue introduction

  • AQS There's another very important inner class in ConditionObject, It has achieved Condition Interface , The main user implements conditional locking .
  • ConditionObject A queue is also maintained in , This queue is mainly used for waiting conditions , When conditions hold , Other threads will signal The elements in this queue , Move it to AQS Of the queue , Wait for the thread holding the lock to release the lock and wake up .
  • Condition A typical application is in BrokingQueue In the implementation of , When the queue is empty , The thread to get the element is blocked in notEmpty Conditionally , Once an element is added to the queue , Will inform notEmpty Conditions , Move elements in its queue to AQS Waiting in the queue to be awakened .

2、 Write an introductory BrokingQueue

2.1、 Customize BrokingQueue Interface

/** * @author wcc * @date 2022/2/12 19:23 */
public interface BlockingQueue<T> {
    
  /** *  Interface for inserting data  */
  void put(T element);

  /** *  Interface to get data  */

  T take();
}

2.2、 Customize MiniArrayBrokingQueue class

/** * @author wcc * @date 2022/2/12 19:24 */
public class MiniArrayBrokingQueue implements BlockingQueue{
    

  //  Thread concurrency control 
  private Lock lock = new ReentrantLock();

  /** *  When the producer thread produces data , It will first check the current queues Is it full , If it's already full , The current producer thread needs to be called notFull.await() *  Enter into notFull  Is pending in the condition queue for , Wait for the consumer thread to consume a data  */
  private Condition notFull = lock.newCondition();

  /** *  When the consumer thread consumes data , It will first check the current queues Whether there is data in , If there is no data . The current consumer thread needs to be called notEmpty.await() *  Enter into notEmpty Condition queue suspended , Wake up while waiting for the producer thread to produce data  */
  private Condition notEmpty = lock.newCondition();

  //  Array of storage elements 
  private Object[] queues;

  //  The length of the array 
  private int size;

  /** * count: Indicates the amount of data that can be consumed in the current queue  * putptr: Record the next location where the producer will store the data , After each producer has produced one piece of data , Will  putptr++ * takeptr: Record the next location of consumer consumption data , After each consumer produces one data , Will  takeptr++ */
  private int count, putptr, takeptr;

  public MiniArrayBrokingQueue(int size) {
    
    this.size = size;
    this.queues = new Object[size];
  }

  @Override
  public void put(Object element) {
    
    lock.lock();
    try {
    
      //  The first thing ? We need to judge the current queues Is it full 
      if(count == this.size){
    
        notFull.await();
        //  So let's go over here , Indicates that the queue is not full , You can put data in the queue 
        this.queues[putptr ++] = element;
        //  Production data is updated after count Value 
        count ++;
        if(putptr == this.size){
    
          putptr = 0;
        }
        //  When an element is successfully placed in the queue , What needs to be done ?
        //  Need to give notEmpty A wake-up signal to wake up a consumer thread 
        notEmpty.signal();
      }
    }catch (Exception e){
    
      e.printStackTrace();
    }finally {
    
      lock.unlock();
    }
  }

  @Override
  public Object take() {
    
    Object element;
    lock.lock();
    try {
    
      //  The first thing ? We need to judge the current queues Is it empty 
      if (count == 0) {
    
        notEmpty.await();
        //  So let's go over here , Indicates that the queue is not full , You can put data in the queue 
        element = this.queues[takeptr++];
        //  Take out the data and update it count  Value 
        count--;
        if (takeptr == this.size) {
    
          takeptr = 0;
        }
        //  When an element is successfully consumed in the queue , What needs to be done ?
        //  Need to give notFull A wake-up signal to wake up a producer thread 
        notFull.signal();
        return element;
      }
      }catch(Exception e){
    
        e.printStackTrace();
      }finally{
    
        lock.unlock();
      }
    }

  public static void main(String[] args) {
    
    MiniArrayBrokingQueue<Integer> queue = new MiniArrayBrokingQueue(10);
    Thread producer = new Thread(new Runnable() {
    
      @Override
      public void run() {
    
        int i = 0;
        while (true){
    
          i ++;
          if(i == 10){
    
            i = 0;
          }
          try {
    
            System.out.println(" The production data :"+ i);
            queue.put(Integer.valueOf(i));
            TimeUnit.MILLISECONDS.sleep(200);
          }catch (Exception e){
    
            e.printStackTrace();
          }
        }
      }
    });
    producer.start();
    Thread consumer = new Thread(new Runnable() {
    
      @Override
      public void run() {
    
        while (true){
    
          try {
    
            Integer result = queue.take();
            System.out.println(" Consumer consumption data :"+ result);
            TimeUnit.MILLISECONDS.sleep(200);
          }catch (Exception e){
    
            e.printStackTrace();
          }
        }
      }
    });
    consumer.start();
  }

}

The operation results are as follows

H2g9Df.png

3、AQS Condition Conditional queue source code analysis

3.1、ConditionObject Inner class

Here's a brief introduction AQS Medium Node About in the inner class Condition Two properties of the conditional queue :

//  The next node waiting on the condition (Condition Use when locking ), Be careful , A conditional queue is a one-way linked list 
Node nextWaiter;
// node state : Optional value (0、SIGNAL(-1)、CANCELED(1)、CONDITION、PROPAGATE)
// waitStatus == 0  Default state 
// waitStatus > 0: Cancel the state ( stay ReentrantLock In mode )
// waitStatus == -1: At present node  If it is  head Node time , After releasing the lock, you need to wake up the subsequent nodes 
// waitStatus == -2: At present node  It's located in  Condition  Nodes on the condition queue 
volatile int waitStatus;

The picture above is about AQS Flow analysis of blocking queue

H5upl9.png

//  be located  ReentrantLock  Class : How to create a conditional queue 
public Condition newCondition() {
    
    return sync.newCondition();
}

//  be located  RenntrantLock  The static inner class of  Sync in 
abstract static class Sync extends AbstractQueuedSynchronizer {
    
	final ConditionObject newCondition() {
    
        //  and  ConditionObject  yes AQS  Inner class in 
        return new ConditionObject();
    }
}

public class ConditionObject implements Condition, java.io.Serializable {
    
    private static final long serialVersionUID = 1173984872572414699L;
    //  Point to the header node in the condition queue 
    private transient Node firstWaiter;
    //  Point to the tail node in the condition queue 
    private transient Node lastWaiter;
    ....
}

H5MjFf.png

3.2、await() Method resolution

public final void await() throws InterruptedException {
    
    //  Determine whether the current thread is in interrupt state , If yes, an interrupt exception will be thrown directly 
    if (Thread.interrupted())
        throw new InterruptedException();
    //  Will call await() The thread of the method is wrapped as node And add to the condition queue , And returns... Of the current thread node
    Node node = addConditionWaiter();
    //  Completely release the lock corresponding to the current thread , take state Set as 0
    //  Why release the lock ?  After hanging with a lock , Who can wake you up ?  therefore await Then release the lock 
    long savedState = fullyRelease(node);
    // 0: stay Condition  No interrupt signal was received during queue suspension 
    // -1: stay  Condition An interrupt signal was received while the queue was suspended 
    // 1: stay  Condition  Queue pending no interrupt signal received during queue pending , But after migrating to the blocking queue, I received an interrupt signal 
    int interruptMode = 0;

    // isOnSyncQueue return  true: Indicates the corresponding... Of the current thread  node  Has migrated to the blocking queue 
    //  return false: Indicates that the current thread is still in the condition queue , Need to continue park
    while (!isOnSyncQueue(node)) {
    
        //  The current... In the suspend condition queue node The corresponding thread 
        LockSupport.park(this);

        //  When will it wake up ?
        // 1. Regular path , External thread gets lock after , Called signal()  Method , Transfer the head node of the conditional queue to the blocking queue , After this node obtains the lock , Will wake up 
        // 2. After moving to the blocking queue , It is found that the status of the precursor node in the blocking queue is cancelled , The current node will wake up 
        // 3. The current node was awakened by an external thread using an interrupt while it was suspended ...

        // checkInterruptWhileWaiting(node): Even in  condition  Queue pending , The thread was interrupted , Corresponding  node It will also be migrated to the blocking queue 
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }

    //  So let's go over here , It means that at present  node  Has migrated to the blocking queue 
    // acquireQueued(node, savedState): The logic of the contention queue   If you return  true: Indicates that it was awakened by an interrupt while suspended in the blocking queue 
    //  Conditions for a : return true: Indicates that it was awakened by an external thread interrupt in the contention queue 
    //  Condition 2 :interruptMode != THROW_IE(-1) establish : Show the current node No interrupt has occurred in the condition queue 
    //  Set up  interruptMode  Set to re interrupt , Indicates that the interrupt is in the blocking queue 
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;

    //  Consider  node.nextWaiter != null  When was it established :
    //  It's actually node In the condition queue, if it is awakened by an external thread interrupt , Will be added to the blocking queue , But it is not set nextWaiter = null
    if (node.nextWaiter != null) // clean up if cancelled
        //  Clean up the current node in the cancellation status in the condition queue 
        unlinkCancelledWaiters();
    //  Conditions established : Indicates that an interrupt occurred during the suspend (1. Pending in condition queue ,2. Pending outside the condition queue )
    if (interruptMode != 0)
        //  Different processing is performed according to the location of the interrupt 
        reportInterruptAfterWait(interruptMode);
}

3.3、addConditionWaiter() Method : Wrap the current thread as node Node join to Condition In line

/** * Adds a new waiter to wait queue. * @return its new wait node *  call  await  The threads of the method are all threads in the locked state , in other words  addConditionWaiter() There is no concurrency  */
private Node addConditionWaiter() {
    
    //  Get the reference of the tail node of the current condition queue , Save to condition variable  t in 
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    //  Conditions for a :t != null  establish : Description there are already in the current queue  node  Element , You need to add 
    //  Condition 2 :t.waitStatus != Node.CONDITION  Be careful ,node  stay  Condition In the queue , Its state is  CONDITION(-2)
    //  establish : Show the current node There was an interruption 
    if (t != null && t.waitStatus != Node.CONDITION) {
    
        //  Clean up all nodes in the condition queue that are in the cancelled state 
        unlinkCancelledWaiters();
        //  Update local variables t  For the latest queue end node , Because above unlinkCancelledWaiters() May change  lastWaiters quote 
        t = lastWaiter;
    }

    //  Create for current thread  node  node , Set state to  CONDITION  state (-2)
    Node node = new Node(Thread.currentThread(), Node.CONDITION);

    //  Conditions established : Indicates that there are no elements in the condition queue , The current thread is the first element to enter the queue 
    if (t == null)
        firstWaiter = node;
    else
        //  This indicates that the current condition queue already has other  node 了 , Do additional operations 
        t.nextWaiter = node;
    //  to update  lastWaiters Queue end node for the latest condition 
    lastWaiter = node;
    //  Returns the value of the current thread node
    return node;
}

3.4、fullyRelease(Node node) Method : Completely release the lock occupied by the current thread

//  Completely release the lock occupied by the current thread 
final long fullyRelease(Node node) {
    
    //  Whether the lock is successfully released , When  failed  When you fail , This indicates that the current thread is calling without a lock  await() Method thread ( Wrong writing )
    //  Suppose it fails , stay  finally  Block of code , The corresponding state of the current thread just added to the condition queue will be changed to the cancel state 
    //  The subsequent thread will clean up the node in the cancelled state 
    boolean failed = true;
    try {
    
        //  Get the... Held by the current thread  state  value 
        long savedState = getState();
        //  In most cases :release  Method returns  true
        if (release(savedState)) {
    
            //  The failure flag is set to  false
            failed = false;
            //  Returns the... Released by the current thread  state  value 
            //  Why return to  saveState value ?
            //  Because you wake up again when you are moved to the blocking queue , And the current node In the blocking queue is head.next  and 
            //  At present  lock  Status is  state == 0 Under the circumstances , At present  node  You can get the lock , At this time, we need to state  Set to  savedState( The state of holding a lock )
            return savedState;
        } else {
    
            throw new IllegalMonitorStateException();
        }
    } finally {
    
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

// be located AQS The static inner class of Sync in : How to actually release the lock 
//ReentrantLock.unlock() -> sync.release()
public final boolean release(int arg) {
    
    //tryRelease Try to release the lock 
    //true: The current thread has completely released the lock 
    //false: The current thread has not fully released the lock 
    if (tryRelease(arg)) {
    
        //head Under what circumstances will it be created :
        // When the lock thread does not release the thread , And when another thread wants to acquire the lock during the lock holding period , Other threads found that they could not acquire the lock 
        // And the blocking queue is empty , At this point, subsequent threads will build a for the current lock thread head node ( Encapsulate the locking thread into head)
        // Subsequent threads are then appended to head After the node ( Become head My back drive )
        Node h = head;
        // Conditions 1:h!=null establish : Describe the... In the queue head The node has been initialized ,ReentrantLock During use , Too many threads compete 
        // Conditions 2:h.waitStatus!=0  establish , explain head It must have been inserted in the back head node 
        if (h != null && h.waitStatus != 0)
            // Wake up the rear drive node 
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// be located AQS The static inner class of Sync in : Try to release the lock 
//true: The current thread has completely released the lock  | false: The current thread has not fully released the lock 
protected final boolean tryRelease(int releases) {
    
    //state The value of the state variable is subtracted 
    int c = getState() - releases;
    // If the condition holds : Indicates that the current thread is not locked ->  Throw an exception directly 
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    // The current thread holds the lock 
    // Whether the current thread has completely released the lock , The default initial value is false
    boolean free = false;
    //c==0  When conditions hold : This indicates that the current thread has completely released the lock 
    if (c == 0) {
    
        //free=true  The current thread has completely released the lock 
        free = true;
        // Update the current locking thread to null
        setExclusiveOwnerThread(null);
    }
    // to update state ( be based on CAS)
    setState(c);
    // return free
    return free;
}

3.5、isOnSyncQueue(Node node) Method : Judge the present Condition queue Medium node Whether the node is in the blocking queue

//  Judge the present  Condition queue   Medium  node Whether the node is in the blocking queue 
final boolean isOnSyncQueue(Node node) {
    
    //  Conditions for a :node.waitStatus == Node.CONDITION  establish : Show the current  node  It must be in the condition queue , because signal Method to migrate the node to the blocking queue , Will node Is set to 0
    //  Condition 2 :node.prev == null  precondition : At present node.waitStatus != Node.CONDITION ==> 1.node.waitStatus = 0( Indicates that the current node has been  signal 了 )
    // 2.node.waitStatus = 1: The current thread is calling without a lock  await Methodical , It will eventually be node Change to cancel status  CANCELED
    // node.waitStatus == 0: Why judge  node.prev == null
    //  because  signal  The method is to modify the state first and then migrate 
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;

    //  So let's go over here , What kind of situation ?
    // node.waitStatus != CONDITION  And  node.prev != null ==>  You can get rid of  node.waitStatus == 1
    //  Why can cancel status be excluded ?  because signal Method will not cancel the status of node Nodes are migrated 
    //  Set up  prev The referenced logic is set by the logic of the migration blocking queue  (enq)
    //  The logic of joining the team :1. Set up  node.prev = tail 2.CAS  At present  node  For blocking queues   Of tail  Tail node , Only when you succeed can you really enter the blocking queue  3.pred.next = node
    //  It can be calculated that  prev != null, Nor can it explain the current node It has been successfully queued to the blocking queue 

    //  Conditions established : This indicates that the current node has been successfully queued to the blocking queue , And there are other nodes behind the current node node 了 ..( Non tail node )
    if (node.next != null) // If has successor, it must be on queue
        return true;
    /** *  So let's go over here , Indicates that the status of the current node is  node.prev != null And  node.waitStatus != CONDITION  And it can rule out  CANCELED  Cancel , And  node.waitStatus == 0 * findNodeFromTail  Start from the tail node of the blocking queue and traverse forward to find  node, If found, return true, Can't find return false *  At present node It is possible that signal In the process , Moving .. Has yet to be completed .. */
    return findNodeFromTail(node);
}

3.6、checkInterruptWhileWaiting(Node node) Method : Determine whether the current thread is interrupted and determine where it is interrupted

private int checkInterruptWhileWaiting(Node node) {
    
    // Thread.interrupted(): Returns the interrupt flag bit of the current thread , And reset the current flag bit to false
    return Thread.interrupted() ?
        // transferAfterCancelledWait(node): This method is called only when the thread is awakened by an interrupt 
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

//  Judge whether the current thread node is interrupted in the condition queue or migrated to the blocking queue 
final boolean transferAfterCancelledWait(Node node) {
    
    // compareAndSetWaitStatus(node, Node.CONDITION, 0): Conditions established : Show the current node It must be in the condition queue , because signal When migrating nodes, the node status will be changed to 0
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
    
        //  Interrupt wake-up node It will also be added to the blocking queue 
        enq(node);
        //  return true It means that it is interrupted in the condition queue 
        return true;
    }

    //  So far, there are several situations ?
    // 1. At present node Has been called by an external thread signal Method to migrate it to the blocking queue 
    // 2. At present node Being called by an external thread signal Method to migrate it to the blocking queue, but the migration has not been completed yet 
    while (!isOnSyncQueue(node))
        Thread.yield();
    // false: Indicates that the current node is not in the condition queue when it is awakened by an interrupt 
    return false;
}

3.7、unlinkCancelledWaiters() Method : Clean up the nodes in the condition queue with the status cancelled

//  Clean up the nodes in the condition queue with the status cancelled 
private void unlinkCancelledWaiters() {
    
    //  Represents the current node , Start backward iteration from the first node of the linked list 
    Node t = firstWaiter;
    //  The last normal state in the current linked list node node 
    Node trail = null;
    while (t != null) {
    
        //  Get the next node of the current node 
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
    
            //  to update nextWaiter by null
            t.nextWaiter = null;
            //  Conditions established : It indicates that the traversed node has not encountered a normal node 
            if (trail == null)
                //  to update firstWaiter The pointer is the next node 
                firstWaiter = next;
            else
                //  Let the previous normal node point to the next node of the cancelled node , The node with problems in the middle is skipped 
                trail.nextWaiter = next;
            //  Conditions established : The current node is the end of the queue node , to update lastWaiter  Point to the last normal node 
            if (next == null)
                lastWaiter = trail;
        }
        else //  If the condition is not satisfied, it will be implemented until else, This indicates that the current node is a normal node 
            trail = t;
        t = next;
    }
}

3.8、reportInterruptAfterWait(int interruptMode): Determine the subsequent operation according to the location of the interrupt

private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    
    //  Conditions established : Indicates that an interrupt has occurred in the condition queue , here await() Method throws an interrupt exception 
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    //  Conditions established : Describes interrupts that occur outside the condition queue , At this point, set the interrupt flag bit of the current thread to true, Interrupt processing is left to your business layer code , If you don't handle it , Then nothing will happen 
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

3.9、signal() Method : Wake up the thread in the condition queue

public final void signal() {
    
    //  Determines the current call signal Whether the thread of the method is an exclusive lock holding thread , If not , Throws an exception directly 
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //  Get the first node in the condition queue node
    Node first = firstWaiter;
    //  The first node in the condition queue is not null, Then migrate the first node to the logic in the blocking queue 
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    
    do {
    
        // firstWaiter = first.nextWaiter: Because the current  firstWaiter  The conditional queue is coming soon 
        //  So update  firstWaiter  For the next node of the current node 
        //  If the next node of the current node is  null, This indicates that the current condition queue has only one node , Then the current firstWaiter  After leaving the team , for null 了 
        //  So we need to update  lastWaiter  Also for the  null
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        //  At present  first Node out of condition queue , Disconnect from the next node 
        first.nextWaiter = null;
        // transferForSignal(first)
        // boolean:true  At present first Node migrated to blocking queue successfully ,false: Migration failed 
        // while loop :(first = firstWaiter) != null  At present first Migration failed , Will first Updated to first.next, Continue trying to migrate ...
        //  Until a node is successfully migrated , Or the condition queue is empty 
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

3.10、transferForSignal(Node node) Method : Migrate... In the condition queue firstWaiter Node to blocking queue

//  Migrate... In the condition queue firstWaiter Node to blocking queue 
final boolean transferForSignal(Node node) {
    
    /* * If cannot change waitStatus, the node has been cancelled. */
    // CAS  Modify the status of the current node to 0, Because the current node is about to migrate to the blocking queue 
    //  success : Indicates that the current node is in normal status in the condition queue 
    //  Failure :1. Cancel the state  ( Threads await The lock is not held at the time of , Corresponding to the final thread node Will be set to CANCLED  state )
    // 2:node The corresponding thread is suspended , Was awakened by another thread using an interrupt signal ( It will also actively enter the blocking queue , The status will also be changed to 0)
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // enq(node): Returns the predecessor node of the current node , And add the current node to the blocking queue  p  Is the front node of the current node in the blocking queue 
    Node p = enq(node);
    //  Get the state of the front node 
    int ws = p.waitStatus;
    //  Conditions for a :ws > 0  establish : This indicates that the status of the precursor node is cancelled in the blocking queue , Wake up the current node 
    //  Condition 2 : precondition (ws <= 0):compareAndSetWaitStatus(p, ws, Node.SIGNAL)
    //  return true: Indicates that the status of the precursor node is set to signal Status success ,
    //  return false: Current drive node The corresponding thread calls  lockInterrupt The ones on the team node When , Will respond to interrupts , When an external thread gives an interrupt signal to a precursor thread , Forerunner node Will be changed to CANCELED
    //  And execute out of line logic 
    //  As long as the status of the precursor node is not  0  perhaps  -1, Then wake up the current thread 
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        //  Wake up the current node The corresponding thread , After the logic , Talk back 
        LockSupport.unpark(node.thread);
    return true;
}
e p = enq(node);
    //  Get the state of the front node 
    int ws = p.waitStatus;
    //  Conditions for a :ws > 0  establish : This indicates that the status of the precursor node is cancelled in the blocking queue , Wake up the current node 
    //  Condition 2 : precondition (ws <= 0):compareAndSetWaitStatus(p, ws, Node.SIGNAL)
    //  return true: Indicates that the status of the precursor node is set to signal Status success ,
    //  return false: Current drive node The corresponding thread calls  lockInterrupt The ones on the team node When , Will respond to interrupts , When an external thread gives an interrupt signal to a precursor thread , Forerunner node Will be changed to CANCELED
    //  And execute out of line logic 
    //  As long as the status of the precursor node is not  0  perhaps  -1, Then wake up the current thread 
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        //  Wake up the current node The corresponding thread , After the logic , Talk back 
        LockSupport.unpark(node.thread);
    return true;
}
原网站

版权声明
本文为[*Wucongcong*]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/02/202202221454047943.html