队列同步器
队列同步器AbstractQueuedSynchronizer(以下简称同步器),是用来构建锁或者其他同步组
件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获
取线程的排队工作。
同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状
态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的3
个方法(getState()、setState(int newState)和compareAndSetState(int expect,int update))来进行操
作,因为它们能够保证状态的改变是安全的。子类推荐被定义为自定义同步组件的静态内部
类,同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来
供自定义同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式地获
取同步状态,这样就可以方便实现不同类型的同步组件(ReentrantLock、
ReentrantReadWriteLock和CountDownLatch等)。
队列同步器的接口与示例
同步器的设计是基于模板方法模式的,也就是说,使用者需要继承同步器并重写指定的
方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些
模板方法将会调用使用者重写的方法。
以下是重写同步器方法实现的一个独占锁。
class Mutex implements Lock {
// 静态内部类,自定义同步器
private static class Sync extends AbstractQueuedSynchronizer {
// 是否处于占用状态
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 当状态为0的时候获取锁
public boolean tryAcquire(int acquires) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 释放锁,将状态设置为0
protected boolean tryRelease(int releases) {
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 返回一个Condition,每个condition都包含了一个condition队列
Condition newCondition() { return new ConditionObject(); }
}
// 仅需要将操作代理到Sync上即可
private final Sync sync = new Sync();
public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isHeldExclusively(); }
public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
队列同步器的实现分析
同步队列
同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取
同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其
加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再
次尝试获取同步状态。
节点是构成同步队列的基础。
同步器拥有首节点(head)
和尾节点(tail),没有成功获取同步状态的线程将会成为节点加入该队列的尾部,同步队列的
基本结构如图示。
同步队列在加入尾结点需要cas来保证线程安全,而设置头结点不需要。
独占式同步状态获取与释放
主要流程为:
通过调用同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,也就是
由于线程获取同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同
步队列中移出。
//同步器的acquire方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
代码主要完成了同步状态获取、节点构造、加入同步队列以及在同步队列中自旋等
待的相关工作,其主要逻辑是:首先调用自定义同步器实现的tryAcquire(int arg)方法,该方法
保证线程安全的获取同步状态,如果同步状态获取失败,则构造同步节点(独占式
Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)并通过addWaiter(Node node)
方法将该节点加入到同步队列的尾部,最后调用acquireQueued(Node node,int arg)方法,使得该
节点以“死循环”的方式获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的
唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。
//同步器的addWaiter和enq方法
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 快速尝试在尾部添加
Node pred = tail;if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
当前线程获取同步状态并执行了相应逻辑之后,就需要释放同步状态,使得后续节点能
够继续获取同步状态。通过调用同步器的release(int arg)方法可以释放同步状态,该方法在释
放了同步状态之后,会唤醒其后继节点(进而使后继节点重新尝试获取同步状态)。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
总结:在获取同步状态时,同步器维
护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列
(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步
器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点。
共享式同步状态获取与释放
共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状
态。
通过调用同步器的acquireShared(int arg)方法可以共享式地获取同步状态。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null;
if (interrupted)selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed) cancelAcquire(node);
}
}
在acquireShared(int arg)方法中,同步器调用tryAcquireShared(int arg)方法尝试获取同步状
态,tryAcquireShared(int arg)方法返回值为int类型,当返回值大于等于0时,表示能够获取到同
步状态。因此,在共享式获取的自旋过程中,成功获取到同步状态并退出自旋的条件就是
tryAcquireShared(int arg)方法返回值大于等于0。可以看到,在doAcquireShared(int arg)方法的自
旋过程中,如果当前节点的前驱为头节点时,尝试获取同步状态,如果返回值大于等于0,表示
该次获取同步状态成功并从自旋过程中退出。
与独占式一样,共享式获取也需要释放同步状态,通过调用releaseShared(int arg)方法可以
释放同步状态。方法在释放同步状态之后,将会唤醒后续处于等待状态的节点。对于能够支持多个线
程同时访问的并发组件(比如Semaphore),它和独占式主要区别在于tryReleaseShared(int arg)
方法必须确保同步状态(或者资源数)线程安全释放,一般是通过循环和CAS来保证的,因为
释放同步状态的操作会同时来自多个线程。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
独占式超时获取同步状态
通过调用同步器的doAcquireNanos(int arg,long nanosTimeout)方法可以超时获取同步状
态,即在指定的时间段内获取同步状态,如果获取到同步状态则返回true,否则,返回false。该
方法提供了传统Java同步操作(比如synchronized关键字)所不具备的特性。
超时获取同步状态过程可以被视作响应中断获取同步状态过程的“增强版”,
doAcquireNanos(int arg,long nanosTimeout)方法在支持响应中断的基础上,增加了超时获取的
特性。针对超时获取,主要需要计算出需要睡眠的时间间隔nanosTimeout,为了防止过早通知,
nanosTimeout计算公式为:nanosTimeout-=now-lastTime,其中now为当前唤醒时间,lastTime为上
次唤醒时间,如果nanosTimeout大于0则表示超时时间未到,需要继续睡眠nanosTimeout纳秒,
反之,表示已经超时。
private boolean doAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {
long lastTime = System.nanoTime();
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
if (nanosTimeout <= 0) return false;
if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
long now = System.nanoTime();
//计算时间,当前时间now减去睡眠之前的时间lastTime得到已经睡眠
//的时间delta,然后被原有超时时间nanosTimeout减去,得到了
//还应该睡眠的时间
nanosTimeout -= now - lastTime;
lastTime = now;
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed) cancelAcquire(node);
}
}
该方法在自旋过程中,当节点的前驱节点为头节点时尝试获取同步状态,如果获取成功
则从该方法返回,这个过程和独占式同步获取的过程类似,但是在同步状态获取失败的处理
上有所不同。如果当前线程获取同步状态失败,则判断是否超时(nanosTimeout小于等于0表示
已经超时),如果没有超时,重新计算超时间隔nanosTimeout,然后使当前线程等待
nanosTimeout纳秒(当已到设置的超时时间,该线程会从LockSupport.parkNanos(Object
blocker,long nanos)方法返回)。
如果nanosTimeout小于等于spinForTimeoutThreshold(1000纳秒)时,将不会使该线程进行
超时等待,而是进入快速的自旋过程。原因在于,非常短的超时等待无法做到十分精确,如果
这时再进行超时等待,相反会让nanosTimeout的超时从整体上表现得反而不精确。因此,在超
时非常短的场景下,同步器会进入无条件的快速自旋。
独占式超时获取同步态的流程如图5所示。
全文来自 Java并发编程的艺术