JAVA并发终章-核心源码调试
admin
2024-05-13 17:45:05
0

前言

**全篇高能!非战斗人员请撤离!!!非战斗人员请撤离!!!非战斗人员请撤离!!!**本章将会调试juc源码,将整个过程串起来,作为并发终章,后面不会再更新并发章节(有调整除外!)

源码调试

ReentrantLock 显式锁

前文说过,除非对锁要做操作,不然不建议用显式锁,但是这并不代表我们可以不懂他的机制,更不允许不懂他的源码!
ReentrantLock实现了Lock接口。ReentrantLock中,包含了Sync对象;而且,Sync是AQS的子类,其中有之前说的公平锁和闯入锁(前面贴了部分源码),之前也讲过ReentrantLock是个独占锁。默然是非公平的。现在就从源码上来了解。

    public void lock() {sync.lock();}abstract void lock();

从这里可以看出,ReentrantLock默认是使用闯入锁的

  public ReentrantLock() {sync = new NonfairSync();}

闯入锁

 static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;/*** Performs lock.  Try immediate barge, backing up to normal* acquire on failure.*/final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}

首先,闯入锁做了一次cas动作,如果cas成功,那么

 protected final void setExclusiveOwnerThread(Thread thread) {exclusiveOwnerThread = thread;}

当前线程可以获得执行权,并且设置排它,如果没有获得就 acquire(1);

    public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}

tryAcquire()

   protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}
  final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}

非常简单的代码,大致意思是:如果state是0,那么当前线程获得锁,如果当前线程就是执行的线程,那么状态加1,1就是传递的参数。如果不是当前线程持有锁,那么返回false。

addWaiter

private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}

比较简单,意思是加入队列,并且将当前线程插入到等待队列中EXCLUSIVE是排他节点,表示独占。和tryAcquire连起来就是,如果获得锁失败了,那么就会加入到等待队列中去排队。

acquireQueued

    final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {// //获得该node的前置节点final Node p = node.predecessor();// 如果上一个是头节点,那么再抢夺一次,如果抢夺成功,那么就设置自己为哑结点(头)if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}

shouldParkAfterFailedAcquire

 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/** waitStatus must be 0 or PROPAGATE.  Indicate that we* need a signal, but don't park yet.  Caller will need to* retry to make sure it cannot acquire before parking.*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}

果前置节点为SIGNAL(可以理解为成功),意味着只需要等待其他前置节点的线程被释放,并且挂起当前线程,如果上一个线程被取消了(状态大于0,是1),那么循环这个双向队列,移除掉已经取消的线程,如果状态是其他,当前的线程设置状态SIGNAL(-1)。

parkAndCheckInterrupt

    private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}

使用LockSupport.park挂起当前线程编程WATING状态,并且检查这个线程的中断状态

selfInterrupt

    static void selfInterrupt() {Thread.currentThread().interrupt();}

设置中断标志位为true,当线程满足两个条件,阻塞状态和中断标志为ture,则会抛出InterruptedException异常。

所以,整个闯入锁的流程应该是,lock了之后,立马去争夺一次锁,争夺成功了,OK,争夺失败了之后,查看当前线程状态,如果线程状态是0,那么争夺,如果是自己,则state加1,否则乖乖去排队。如果上一个节点是哑结点,难么再次争夺。把自己设置为头节点,否则把自己挂起。

公平锁

      final void lock() {acquire(1);}

由此可见,公平锁和闯入锁的差距就是在于lock的时候闯入锁闯入了一次,而公平锁直接acquire(1),进去看看。

 protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}}

和闯入锁区别在于多了一次检测,看看这次检测干了什么,又在检测什么?

public final boolean hasQueuedPredecessors() {//读取头节点Node t = tail; //读取尾节点Node h = head;//s是首节点h的后继节点Node s;return h != t &&((s = h.next) == null || s.thread != Thread.currentThread());
}

h != t当头节点和尾节点相等时,才会返回false。
1.头节点和尾节点都为null,空队列不需要排队。
2.头节点和尾节点不为null但是相等,说明头节点和尾节点都指向一个元素,表示队列中只有一个节点,不需要排队。

s.thread != Thread.currentThread()
首节点的后继节点如果是自己,就不需要排队,不是就要排队。

如果说检测成功(是首节点或者是首节点的后继节点),尝试去拥有锁。后续和闯入锁一样,不再赘述了。

其实闯入锁相对于公平锁多了两次先机,第一次是请求的时候,闯一次,第二次是公平锁需要检测前面是不是首节点,如果是就不抢了,而闯入锁是直接闯,其他一模一样。

锁的释放

 public void unlock() {sync.release(1);}
 public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}

如果当前线程不是独占的,直接抛出异常,如果状态是0,释放,将独占锁放掉,这里有个有趣的现象,如果连上两次锁,再解一次,实际上是解不开的。
unparkSuccessor代码过于简单。

private void unparkSuccessor(Node node) { int ws = node.waitStatus;//获得head节点的状态 if (ws < 0) compareAndSetWaitStatus(node, ws, 0);// //设置head节点 状态为0 Node s = node.next;//得到head节点的下一个节点 if (s == null || s.waitStatus > 0) { //如果下一个节点为null或者status>0表示cancelled状态. //通过从尾部节点开始扫描,找到距离head最近的一个s = null; for (Node t = tail; t != null && t != node; t =	t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //next节点不为空,直接唤醒这个线程即可 LockSupport.unpark(s.thread); 
}

值得一提的是,从尾部遍历实际上是怕尾部分叉。因为此时并发环境下可能会有其他线程入队。而此时如果A在cas往后插,B也插,此时C来释放,那不是分叉了吗?故而从尾部插入

tryRelease

        protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}

如果当前线程不是独占的,直接抛出异常,如果状态sh

读写锁

我们刚刚对显式独占锁进行了源码的剖析,现在讲一下读写锁。然后再逐渐深入AQS。

ReentrantReadWriteLock

先看类

public class ReentrantReadWriteLockimplements ReadWriteLock, java.io.Serializable {......
}

首先他实现了ReadWriteLock接口,去看看

public interface ReadWriteLock {/*** Returns the lock used for reading.** @return the lock used for reading*/Lock readLock();/*** Returns the lock used for writing.** @return the lock used for writing*/Lock writeLock();
}

这个接口比较简单,就是一个读锁一个写锁。看样子是获得两个锁,先方法。

    public ReentrantReadWriteLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();readerLock = new ReadLock(this);writerLock = new WriteLock(this);}public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

很简单,就是new 了两个锁,另外使用的默认是闯入锁。
看看 ReadLock和 WriteLock

readLock

  protected ReadLock(ReentrantReadWriteLock lock) {sync = lock.sync;}
  public void lock() {sync.acquireShared(1);}

获取共享锁,参数是1,看看

 public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)doAcquireShared(arg);}

tryAcquireShared

protected final int tryAcquireShared(int unused) {/** Walkthrough:* 1. If write lock held by another thread, fail.* 2. Otherwise, this thread is eligible for*    lock wrt state, so ask if it should block*    because of queue policy. If not, try*    to grant by CASing state and updating count.*    Note that step does not check for reentrant*    acquires, which is postponed to full version*    to avoid having to check hold count in*    the more typical non-reentrant case.* 3. If step 2 fails either because thread*    apparently not eligible or CAS fails or count*    saturated, chain to version with full retry loop.*/Thread current = Thread.currentThread();int c = getState();if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)return -1;int r = sharedCount(c);if (!readerShouldBlock() &&r < MAX_COUNT &&compareAndSetState(c, c + SHARED_UNIT)) {if (r == 0) {firstReader = current;firstReaderHoldCount = 1;} else if (firstReader == current) {firstReaderHoldCount++;} else {HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))cachedHoldCounter = rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;}return 1;}return fullTryAcquireShared(current);}

AQS里边只有一个state来记录锁的状态,那怎么区分读锁和写锁呢,这里用到了二进制。将state转化为二进制,有32位,左边高16位记录读锁数量,包括重入的读锁数量,右边低16位记录写锁数量,包括重入的写锁数量。

  static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }/** Returns the number of exclusive holds represented in count  */static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
 if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)return -1;

意思是如果有独占锁,并且独占锁不是自己,那么去排队把。

sharedCount

 abstract boolean readerShouldBlock();

公平锁

final boolean readerShouldBlock() {return hasQueuedPredecessors();}

这个代码之前看过,意思是是否是首节点,或者是其后置节点。

闯入锁

final boolean readerShouldBlock() {/* As a heuristic to avoid indefinite writer starvation,* block if the thread that momentarily appears to be head* of queue, if one exists, is a waiting writer.  This is* only a probabilistic effect since a new reader will not* block if there is a waiting writer behind other enabled* readers that have not yet drained from the queue.*/return apparentlyFirstQueuedIsExclusive();}

非公平锁中判断是否阻塞读锁请求,就是看队列中是否有等待的写锁线程。

然后再看是否读锁数量已经到达上限,如果没有,夺锁。

  if (r == 0) {firstReader = current;firstReaderHoldCount = 1;} else if (firstReader == current) {firstReaderHoldCount++;} 

简单,意思就是看看是不是第一个,如果是把第一个读者设置为自己,如果当前没有读者,设置为自己。

else {HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))cachedHoldCounter = rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;}

其实就是叠加读者次数。

fullTryAcquireShared

  final int fullTryAcquireShared(Thread current) {/** This code is in part redundant with that in* tryAcquireShared but is simpler overall by not* complicating tryAcquireShared with interactions between* retries and lazily reading hold counts.*/HoldCounter rh = null;for (;;) {int c = getState();if (exclusiveCount(c) != 0) {if (getExclusiveOwnerThread() != current)return -1;// else we hold the exclusive lock; blocking here// would cause deadlock.} else if (readerShouldBlock()) {// Make sure we're not acquiring read lock reentrantlyif (firstReader == current) {// assert firstReaderHoldCount > 0;} else {if (rh == null) {rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current)) {rh = readHolds.get();if (rh.count == 0)readHolds.remove();}}if (rh.count == 0)return -1;}}if (sharedCount(c) == MAX_COUNT)throw new Error("Maximum lock count exceeded");if (compareAndSetState(c, c + SHARED_UNIT)) {if (sharedCount(c) == 0) {firstReader = current;firstReaderHoldCount = 1;} else if (firstReader == current) {firstReaderHoldCount++;} else {if (rh == null)rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;cachedHoldCounter = rh; // cache for release}return 1;}}}

这段代码看起来比较复杂,慢慢分析其实很简单。
首先,有个自旋操作。

if (exclusiveCount(c) != 0) {
...
}

如果当前独占锁的数目不是0(有写锁),

 if (getExclusiveOwnerThread() != current)return -1;

独占锁不是自己,那么失败

else if (readerShouldBlock()) {
...
}

当前读者应该被阻塞,里面的代码是

  // Make sure we're not acquiring read lock reentrantlyif (firstReader == current) {// assert firstReaderHoldCount > 0;} else {if (rh == null) {rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current)) {rh = readHolds.get();if (rh.count == 0)readHolds.remove();}}if (rh.count == 0)return -1;}

取到缓存中最后一次获取到读锁的计数器,计数为 0,说明当前线程没有获取到过读锁,为了垃圾回收考虑,干掉readHolds,如果rh不是空,但是count是0,那么说明当前线程不是第一个读者,也没有读者(说明当前的根本不是读锁),就去排队。继续分析后面代码。

if (sharedCount(c) == MAX_COUNT)throw new Error("Maximum lock count exceeded");

读者数达到最大,那么抛出异常。简单的。

 if (compareAndSetState(c, c + SHARED_UNIT)) {if (sharedCount(c) == 0) {firstReader = current;firstReaderHoldCount = 1;} else if (firstReader == current) {firstReaderHoldCount++;} else {if (rh == null)rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;cachedHoldCounter = rh; // cache for release}return 1;}}

这个和上面的已经重复。不在赘述。

tryAcquireShared方法分析完了,来总结一下,一个读请求,想要获取到共享锁,先看看有没有独占的,如果有,那么就阻塞,如果最大数目达到了,报异常,如果当前是第一个,设为第一个读者,当前不是,那么就累加。如果条件没有满足,执行fullTryAcquireShared,表示全力再试一次,里面的逻辑是判断是否有写锁占用,然后如果已经被阻塞了,那么确定一下是否可以再次进入,如果不能就阻塞,能就执行一样的逻辑。

if (tryAcquireShared(arg) < 0)doAcquireShared(arg);

如果不可以共享,执行
doAcquireShared

  private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}

这个代码很简单,意思是以共享模式把这个节点加入等候队列,如果是前驱节点是头结点,就再试一次,如果不是就挂起,简单的。其中有个方法

setHeadAndPropagate

private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node);/** Try to signal next queued node if:*   Propagation was indicated by caller,*     or was recorded (as h.waitStatus either before*     or after setHead) by a previous operation*     (note: this uses sign-check of waitStatus because*      PROPAGATE status may transition to SIGNAL.)* and*   The next node is waiting in shared mode,*     or we don't know, because it appears null** The conservatism in both of these checks may cause* unnecessary wake-ups, but only when there are multiple* racing acquires/releases, so most need signals now or soon* anyway.*/if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();}}

意思是把当前节点设置为头节点,如果当前节点为空或者状态不对,那么获取下一个节点,如果下一个节点空了,释放

    private void doReleaseShared() {/** Ensure that a release propagates, even if there are other* in-progress acquires/releases.  This proceeds in the usual* way of trying to unparkSuccessor of head if it needs* signal. But if it does not, status is set to PROPAGATE to* ensure that upon release, propagation continues.* Additionally, we must loop in case a new node is added* while we are doing this. Also, unlike other uses of* unparkSuccessor, we need to know if CAS to reset status* fails, if so rechecking.*/for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;            // loop to recheck casesunparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;                // loop on failed CAS}if (h == head)                   // loop if head changedbreak;}}

这意思是 反复获取头节点,如果头结点没有变化就退出循环,进去If代表这里不止一个节点,如果状态为SIGNAL,就唤醒后面的节点,如果状态为0,说明h的后继所代表的线程已经被唤醒或即将被唤醒。

那么把读锁的逻辑连起来,整个过程非常清晰,就是如果读锁可以占用,则占用,如果读锁不能占用,如果前驱节点就是头节点,尝试占用,占用成功并且唤醒后面的节点,如果不是则挂起

writelock
写锁的逻辑和独占锁一模一样,不需要赘述。

读锁释放

public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}

doReleaseShared不在分析,前面有。说一下括号里面的

protected final boolean tryReleaseShared(int unused) {Thread current = Thread.currentThread();if (firstReader == current) {// assert firstReaderHoldCount > 0;if (firstReaderHoldCount == 1)firstReader = null;elsefirstReaderHoldCount--;} else {HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();int count = rh.count;if (count <= 1) {readHolds.remove();if (count <= 0)throw unmatchedUnlockException();}--rh.count;}for (;;) {int c = getState();int nextc = c - SHARED_UNIT;if (compareAndSetState(c, nextc))// Releasing the read lock has no effect on readers,// but it may allow waiting writers to proceed if// both read and write locks are now free.return nextc == 0;}}

AQS(AbstractQueuedSynchronizer)

在同步组件的实现中,AQS是核心部分,同步组件的实现者通过使用AQS提供的模板方法实现同步组件语义,AQS则实现了对同步状态的管理,以及对阻塞线程进行排队,等待通知等等一些底层的实现处理。

AQS里有一个专门描述同步状态的变量

private volatile int state;

state是一个可见的状态值,一般0表示锁还没有被用,1表示占用,1+表示重入次数。

AQS中的东西其实上面都分析过了

// 独占锁获取
void acquire(int arg)
//与acquire方法相同,但在同步队列中进行等待的时候可以检测中断;
void acquireInterruptibly(int arg)
//在acquireInterruptibly基础上增加了超时等待功能,在超时时间内没有获得同步状态返回false;
boolean tryAcquireNanos(int arg, long nanosTimeout)
//释放同步状态,该方法会唤醒在同步队列中的下一个节点
boolean release(int arg)
//共享式获取同步状态,与独占式的区别在于同一时刻有多个线程获取同步状态;
void acquireShared(int arg)
//在acquireShared方法基础上增加了能响应中断的功能;
void acquireSharedInterruptibly(int arg)
//在acquireSharedInterruptibly基础上增加了超时等待的功能;
boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
//共享式释放同步状态
boolean releaseShared(int arg)

不在赘述。主要讲一下AQS中的队列和Condition队列

abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {private static final long serialVersionUID = 7373984972572414691L;protected AbstractQueuedSynchronizer() {}static final class Node {static final Node SHARED = new Node();static final Node EXCLUSIVE = null;static final int CANCELLED = 1;//节点从同步队列中取消static final int SIGNAL = -1;//后继节点的线程处于等待状态,如果当前节点释放同步状态会通知后继节点,使得后继节点的线程能够运行;static final int CONDITION = -2;//当前节点进入等待队列中static final int PROPAGATE = -3;//表示下一次共享式同步状态获取将会无条件传播下去volatile int waitStatus //节点状态volatile Node prev //当前节点/线程的前驱节点volatile Node next; //当前节点/线程的后继节点volatile Thread thread;//加入同步队列的线程引用Node nextWaiter;//等待队列中的下一个节点}//队列的头指针private transient volatile Node head;//队列的尾指针private transient volatile Node tail;private volatile int state;
}

其实就是个双向队列,里面每一个节点里有一个线程,和它的状态。就这么简单。

condition

condition是一个接口。

public interface Condition {/*** Causes the current thread to wait until it is signalled or* {@linkplain Thread#interrupt interrupted}.** 

The lock associated with this {@code Condition} is atomically* released and the current thread becomes disabled for thread scheduling* purposes and lies dormant until one of four things happens:*

    *
  • Some other thread invokes the {@link #signal} method for this* {@code Condition} and the current thread happens to be chosen as the* thread to be awakened; or*
  • Some other thread invokes the {@link #signalAll} method for this* {@code Condition}; or*
  • Some other thread {@linkplain Thread#interrupt interrupts} the* current thread, and interruption of thread suspension is supported; or*
  • A "spurious wakeup" occurs.*
**

In all cases, before this method can return the current thread must* re-acquire the lock associated with this condition. When the* thread returns it is guaranteed to hold this lock.**

If the current thread:*

    *
  • has its interrupted status set on entry to this method; or*
  • is {@linkplain Thread#interrupt interrupted} while waiting* and interruption of thread suspension is supported,*
* then {@link InterruptedException} is thrown and the current thread's* interrupted status is cleared. It is not specified, in the first* case, whether or not the test for interruption occurs before the lock* is released.**

Implementation Considerations**

The current thread is assumed to hold the lock associated with this* {@code Condition} when this method is called.* It is up to the implementation to determine if this is* the case and if not, how to respond. Typically, an exception will be* thrown (such as {@link IllegalMonitorStateException}) and the* implementation must document that fact.**

An implementation can favor responding to an interrupt over normal* method return in response to a signal. In that case the implementation* must ensure that the signal is redirected to another waiting thread, if* there is one.** @throws InterruptedException if the current thread is interrupted* (and interruption of thread suspension is supported)*/void await() throws InterruptedException;/*** Causes the current thread to wait until it is signalled.**

The lock associated with this condition is atomically* released and the current thread becomes disabled for thread scheduling* purposes and lies dormant until one of three things happens:*

    *
  • Some other thread invokes the {@link #signal} method for this* {@code Condition} and the current thread happens to be chosen as the* thread to be awakened; or*
  • Some other thread invokes the {@link #signalAll} method for this* {@code Condition}; or*
  • A "spurious wakeup" occurs.*
**

In all cases, before this method can return the current thread must* re-acquire the lock associated with this condition. When the* thread returns it is guaranteed to hold this lock.**

If the current thread's interrupted status is set when it enters* this method, or it is {@linkplain Thread#interrupt interrupted}* while waiting, it will continue to wait until signalled. When it finally* returns from this method its interrupted status will still* be set.**

Implementation Considerations**

The current thread is assumed to hold the lock associated with this* {@code Condition} when this method is called.* It is up to the implementation to determine if this is* the case and if not, how to respond. Typically, an exception will be* thrown (such as {@link IllegalMonitorStateException}) and the* implementation must document that fact.*/void awaitUninterruptibly();/*** Causes the current thread to wait until it is signalled or interrupted,* or the specified waiting time elapses.**

The lock associated with this condition is atomically* released and the current thread becomes disabled for thread scheduling* purposes and lies dormant until one of five things happens:*

    *
  • Some other thread invokes the {@link #signal} method for this* {@code Condition} and the current thread happens to be chosen as the* thread to be awakened; or*
  • Some other thread invokes the {@link #signalAll} method for this* {@code Condition}; or*
  • Some other thread {@linkplain Thread#interrupt interrupts} the* current thread, and interruption of thread suspension is supported; or*
  • The specified waiting time elapses; or*
  • A "spurious wakeup" occurs.*
**

In all cases, before this method can return the current thread must* re-acquire the lock associated with this condition. When the* thread returns it is guaranteed to hold this lock.**

If the current thread:*

    *
  • has its interrupted status set on entry to this method; or*
  • is {@linkplain Thread#interrupt interrupted} while waiting* and interruption of thread suspension is supported,*
* then {@link InterruptedException} is thrown and the current thread's* interrupted status is cleared. It is not specified, in the first* case, whether or not the test for interruption occurs before the lock* is released.**

The method returns an estimate of the number of nanoseconds* remaining to wait given the supplied {@code nanosTimeout}* value upon return, or a value less than or equal to zero if it* timed out. This value can be used to determine whether and how* long to re-wait in cases where the wait returns but an awaited* condition still does not hold. Typical uses of this method take* the following form:**

 {@code* boolean aMethod(long timeout, TimeUnit unit) {*   long nanos = unit.toNanos(timeout);*   lock.lock();*   try {*     while (!conditionBeingWaitedFor()) {*       if (nanos <= 0L)*         return false;*       nanos = theCondition.awaitNanos(nanos);*     }*     // ...*   } finally {*     lock.unlock();*   }* }}
**

Design note: This method requires a nanosecond argument so* as to avoid truncation errors in reporting remaining times.* Such precision loss would make it difficult for programmers to* ensure that total waiting times are not systematically shorter* than specified when re-waits occur.**

Implementation Considerations**

The current thread is assumed to hold the lock associated with this* {@code Condition} when this method is called.* It is up to the implementation to determine if this is* the case and if not, how to respond. Typically, an exception will be* thrown (such as {@link IllegalMonitorStateException}) and the* implementation must document that fact.**

An implementation can favor responding to an interrupt over normal* method return in response to a signal, or over indicating the elapse* of the specified waiting time. In either case the implementation* must ensure that the signal is redirected to another waiting thread, if* there is one.** @param nanosTimeout the maximum time to wait, in nanoseconds* @return an estimate of the {@code nanosTimeout} value minus* the time spent waiting upon return from this method.* A positive value may be used as the argument to a* subsequent call to this method to finish waiting out* the desired time. A value less than or equal to zero* indicates that no time remains.* @throws InterruptedException if the current thread is interrupted* (and interruption of thread suspension is supported)*/long awaitNanos(long nanosTimeout) throws InterruptedException;/*** Causes the current thread to wait until it is signalled or interrupted,* or the specified waiting time elapses. This method is behaviorally* equivalent to:*

 {@code awaitNanos(unit.toNanos(time)) > 0}
** @param time the maximum time to wait* @param unit the time unit of the {@code time} argument* @return {@code false} if the waiting time detectably elapsed* before return from the method, else {@code true}* @throws InterruptedException if the current thread is interrupted* (and interruption of thread suspension is supported)*/boolean await(long time, TimeUnit unit) throws InterruptedException;/*** Causes the current thread to wait until it is signalled or interrupted,* or the specified deadline elapses.**

The lock associated with this condition is atomically* released and the current thread becomes disabled for thread scheduling* purposes and lies dormant until one of five things happens:*

    *
  • Some other thread invokes the {@link #signal} method for this* {@code Condition} and the current thread happens to be chosen as the* thread to be awakened; or*
  • Some other thread invokes the {@link #signalAll} method for this* {@code Condition}; or*
  • Some other thread {@linkplain Thread#interrupt interrupts} the* current thread, and interruption of thread suspension is supported; or*
  • The specified deadline elapses; or*
  • A "spurious wakeup" occurs.*
**

In all cases, before this method can return the current thread must* re-acquire the lock associated with this condition. When the* thread returns it is guaranteed to hold this lock.***

If the current thread:*

    *
  • has its interrupted status set on entry to this method; or*
  • is {@linkplain Thread#interrupt interrupted} while waiting* and interruption of thread suspension is supported,*
* then {@link InterruptedException} is thrown and the current thread's* interrupted status is cleared. It is not specified, in the first* case, whether or not the test for interruption occurs before the lock* is released.***

The return value indicates whether the deadline has elapsed,* which can be used as follows:*

 {@code* boolean aMethod(Date deadline) {*   boolean stillWaiting = true;*   lock.lock();*   try {*     while (!conditionBeingWaitedFor()) {*       if (!stillWaiting)*         return false;*       stillWaiting = theCondition.awaitUntil(deadline);*     }*     // ...*   } finally {*     lock.unlock();*   }* }}
**

Implementation Considerations**

The current thread is assumed to hold the lock associated with this* {@code Condition} when this method is called.* It is up to the implementation to determine if this is* the case and if not, how to respond. Typically, an exception will be* thrown (such as {@link IllegalMonitorStateException}) and the* implementation must document that fact.**

An implementation can favor responding to an interrupt over normal* method return in response to a signal, or over indicating the passing* of the specified deadline. In either case the implementation* must ensure that the signal is redirected to another waiting thread, if* there is one.** @param deadline the absolute time to wait until* @return {@code false} if the deadline has elapsed upon return, else* {@code true}* @throws InterruptedException if the current thread is interrupted* (and interruption of thread suspension is supported)*/boolean awaitUntil(Date deadline) throws InterruptedException;/*** Wakes up one waiting thread.**

If any threads are waiting on this condition then one* is selected for waking up. That thread must then re-acquire the* lock before returning from {@code await}.**

Implementation Considerations**

An implementation may (and typically does) require that the* current thread hold the lock associated with this {@code* Condition} when this method is called. Implementations must* document this precondition and any actions taken if the lock is* not held. Typically, an exception such as {@link* IllegalMonitorStateException} will be thrown.*/void signal();/*** Wakes up all waiting threads.**

If any threads are waiting on this condition then they are* all woken up. Each thread must re-acquire the lock before it can* return from {@code await}.**

Implementation Considerations**

An implementation may (and typically does) require that the* current thread hold the lock associated with this {@code* Condition} when this method is called. Implementations must* document this precondition and any actions taken if the lock is* not held. Typically, an exception such as {@link* IllegalMonitorStateException} will be thrown.*/void signalAll(); }

他有一个实现类,叫做ConditionObject

conditionObject是通过基于单链表的条件队列来管理等待线程的。线程在调用await方法进行等待时,会释放同步状态。同时线程将会被封装到一个等待节点中,并将节点置入条件队列尾部进行等待。当有线程在获取独占锁的情况下调用signal或singalAll方法时,队列中的等待线程将会被唤醒,重新竞争锁。另外,需要说明的是,一个锁对象可同时创建多个 ConditionObject 对象,这意味着多个竞争同一独占锁的线程可在不同的条件队列中进行等待。在唤醒时,可唤醒指定条件队列中的线程。

方法:

await

   public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();Node node = addConditionWaiter();int savedState = fullyRelease(node);int interruptMode = 0;while (!isOnSyncQueue(node)) {LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}

一点点来分析

if (Thread.interrupted())throw new InterruptedException();

如果线程中断,抛出异常

 Node node = addConditionWaiter();

向尾部添加

 private Node addConditionWaiter() {Node t = lastWaiter;// If lastWaiter is cancelled, clean out.if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();t = lastWaiter;}Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)firstWaiter = node;elset.nextWaiter = node;lastWaiter = node;return node;}
    final int fullyRelease(Node node) {boolean failed = true;try {int savedState = getState();if (release(savedState)) {failed = false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {if (failed)node.waitStatus = Node.CANCELLED;}}

完全释放

    final int fullyRelease(Node node) {boolean failed = true;try {int savedState = getState();if (release(savedState)) {failed = false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {if (failed)node.waitStatus = Node.CANCELLED;}}

众所周知,wait方法和sleep方法的不同是wait的时候会释放锁,这里其实也是一样的,首先获取同步锁的状态值,然后调用 release 释放指定数量的同步状态。

  public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}

这里的代码之前都看过不在赘述,意思就是释放锁,释放完了就完事,返回释放的状态值。

isOnSyncQueue

 final boolean isOnSyncQueue(Node node) {if (node.waitStatus == Node.CONDITION || node.prev == null)return false;if (node.next != null) // If has successor, it must be on queuereturn true;/** node.prev can be non-null, but not yet on queue because* the CAS to place it on queue can fail. So we have to* traverse from tail to make sure it actually made it.  It* will always be near the tail in calls to this method, and* unless the CAS failed (which is unlikely), it will be* there, so we hardly ever traverse much.*/return findNodeFromTail(node);}

如果节点的状态为CONDITION 或者prev没有,那么节点就不在队列上,就这样去判断节点是不是在队列里。

private boolean findNodeFromTail(Node node) {Node t = tail;for (;;) {if (t == node)return true;if (t == null)return false;t = t.prev;}
}
 while (!isOnSyncQueue(node)) {LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}

如果节点不在队列,则阻塞当前线程

   if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);

acquireQueued之前分析过了。
连起来整个过程就是:当前线程已经中断则抛出中断异常,将当前线程加入到条件队列中去,释放当前线程所占用的锁,并保存当前锁的状态,然后等待唤醒,如果说跳出了循环那么一定是被唤醒了。

signal

public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignal(first);}

isHeldExclusively

protected final boolean isHeldExclusively() {// While we must in general read state before owner,// we don't need to do so to check if current thread is ownerreturn getExclusiveOwnerThread() == Thread.currentThread();}

这个代码的意思是判断当前独占锁的线程是不是当前线程,如果不是那就抛出异常。获取头节点,对他进行唤醒操作。

 private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;} while (!transferForSignal(first) &&(first = firstWaiter) != null);}

transferForSignal

 final boolean transferForSignal(Node node) {/** If cannot change waitStatus, the node has been cancelled.*/if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;/** Splice onto queue and try to set waitStatus of predecessor to* indicate that thread is (probably) waiting. If cancelled or* attempt to set waitStatus fails, wake up to resync (in which* case the waitStatus can be transiently and harmlessly wrong).*/Node p = enq(node);int ws = p.waitStatus;if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;}

通过transferForSignal方法将first节点从条件队列转移到同步队列当中,然后将first节点从条件队列当中踢出去。

串起来就是唤醒就是把condition队列的元素踢到同步队列去争夺锁

doSignalAll就是唤醒所有的条件队列里的线程。

结束

至此。JAVA并发章节全部结束,后续不会再更新。
整个源码部分不算太难,基本跑一遍看一遍就OK。

相关内容

热门资讯

【看表情包学Linux】进程地...   🤣 爆笑教程 👉 《看表情包学Linux》👈 猛...
育碧GDC2018程序化大世界... 1.传统手动绘制森林的问题 采用手动绘制的方法的话,每次迭代地形都要手动再绘制森林。这...
编译原理陈火旺版第三章课后题答... 下面答案仅供参考! 1.编写一个对于 Pascal 源程序的预处理程序。该程序的作用是...
MacBookPro M2芯片... MacBookPro M2芯片下如何搭建React-Native环境目录软件下载环境配置 目录 写在...
Android studio ... 解决 Android studio 出现“The emulator process for AVD ...
pyflink学习笔记(六):... 在pyflink学习笔记(一)中简单介绍了table-sql的窗口函数,下面简单介绍下...
创建deployment 创建deployment服务编排-DeploymentDeployment工作负载均衡器介绍Depl...
gma 1.1.4 (2023... 新增   1、地图工具    a. 增加【GetWorldDEMDataSet】。提供了一套 GEO...
AI专业教您保姆级在暗影精灵8... 目录 一、Stable Diffusion介绍    二、Stable Diffusion环境搭建 ...
vue笔记 第一个Vue应用 Document{{content}}{{...
Unity自带类 --- Ti... 1.在Unity中,自己写的类(脚本)的名字不能与Unit...
托福口语21天——day5 发... 目录 一、连读纠音 二、语料输入+造句输出 三、真题 一、连读纠音 英语中的连读方式有好几种...
五、排序与分页 一、排序 1、语法 ORDER BY 字段 ASC | DESC ASC(ascen...
Linux系统中如何安装软件 文章目录一、rpm包安装方式步骤:二、deb包安装方式步骤:三、tar....
开荒手册4——Related ... 0 写在前面 最早读文献的时候,每每看到related work部分都会选择性的忽略&...
实验01:吃鸡蛋问题 1.实验目的: 通过实验理解算法的概念、算法的表示、算法的时间复杂度和空间复杂度分析&...
8个免费图片/照片压缩工具帮您... 继续查看一些最好的图像压缩工具,以提升用户体验和存储空间以及网站使用支持。 无数图像压...
Spring Cloud Al... 前言 本文小新为大家带来 Sentinel控制台规则配置 相关知识,具体内容包括流控...
多项目同时进行,如何做好进度管... 多项目同时进行,如何做好进度管理? 大多数时候,面对项目进...
ATTCK红队评估实战靶场(二... 前言 第二个靶机来喽,地址:vulunstack 环境配置 大喊一声我...