AbstractQueuedSynchronized

AbstractQueuedSynchronized 简称 AQS,这个类是整个并发包的基础工具类, ReentrantLock、CountDownLatch、Semaphore、FutureTask 等并发工具类底层都是通过它来实现的

AQS 定义了两种资源共享的方式:

  • Exclusive:独占式,只有一个线程能获取资源并执行,比如 ReentrantLock。
  • Share:共享式,多个线程获取资源,多个线程可以同时执行,比如 CountDownLatch,ReentrantReadWriteLock 的 ReadLock 等

AQS 结构

属性

主要的就是这三个 volatile 修饰的 Node 对象,还有一些对应的偏移量(用于 CAS 的)

/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
* 头节点,可以理解为当前持有锁的节点
* 在分析的过程中不要将它算作队列的一部分!它只是一个空节点
*/
private transient volatile Node head;

/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
* 尾节点
*/
private transient volatile Node tail;

/**
* The synchronization state.
* 同步状态,0 代表没有被占用,1 代表被一个线程占用,>1 代表被同一个线程多次占用(可重入)
*/
private volatile int state;

Node 节点

static final class Node {
/** Marker to indicate a node is waiting in shared mode */
//共享模式
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
//独占模式
static final Node EXCLUSIVE = null;

/** waitStatus value to indicate thread has cancelled */
//取消抢锁
static final int CANCELLED = 1;

/** waitStatus value to indicate successor's thread needs unparking */
//代表当前节点的后续节点需要被 unparking,也就是说后继节点状态是 parking
static final int SIGNAL = -1;

/** waitStatus value to indicate thread is waiting on condition */
//在 condition 上等待
static final int CONDITION = -2;

/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;

//上面的那些状态
volatile int waitStatus;

//前驱节点
volatile Node prev;
//后继节点
volatile Node next;

/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;

/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;

/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}

//返回前驱节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { // Used to establish initial head or SHARED marker
}

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

ReentrantLock 分析

我们知道 ReentrantLock 内部有两个锁,一个是公平锁 (FairSync)🔒,一个是非公平锁 (NonFairSync)🔒,这两个锁都是独占锁,两者实现的差异其实并不大,我们先从公平锁开始说起。

static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
acquire(1);
}

/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

Lock()

🔔 这里我们为了模拟真实的情况,我们假设有两个线程Thread0Thread1 过来执行了Lock() 方法,且Thread0Thread1 要先执行。

Lock()方法中调用了父类的acquire(1)

acquire()

public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

首先尝试 tryAcquire(1),这个 tryLock() 在 AQS 中没有具体实现是交给子类去实现的,所以这里就会调用 FairSync 的,tryAquire(1)

tryAquire()

protected final boolean tryAcquire(int acquires) {
//获取当前线程
final Thread current = Thread.currentThread();
//获取同步状态
int c = getState();
if (c == 0) { //0 代表还没有线程占用
//判断有没有前驱节点(除 head 节点外),没有则进行 CAS 设置同步状态获取锁
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
//设置当前线程为独占线程
setExclusiveOwnerThread(current);
return true;
}
}
//到这里就说明已经有线程占用了,所以下面是为了重入
else if (current == getExclusiveOwnerThread()) {
//这里不用担心并发的问题,因为是独占锁,同时只有一个线程会到达这里
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

hasQueuedPredecessors()

公平锁和非公平锁的 tryAcquire 方法区别就在这里,这个方法就是判断有没有前驱节点(不包含头节点 head,也就是)存在,有的话为了保证公平性就是需要等待,返回 true。

public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// 头不等于尾,并且队列的第一个节点所持有线程非当前线程返回 true
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

💡 到这里我们分析下Thread0Thread1 的执行情况

🔸 首先Thread0先执行了tryAcquire(1) 没有任何阻碍,执行成功直接 retrurn

🔸 Thread1 此时有多种情况:

  • 还没有获取 state ,Thread0执行完后获取 State==1 ,由于是独占锁直接 return false ,获取锁失败。

  • 已经getState()==0了,执行hasQueuedPredecessors 方法,注意,此时 head 和 tail 都还没有初始化,都还是 null(官方的注释中也提到 head 和 tail 是 lazily initialized )所以这里会直接 return false,然后继续执行 CAS,由于前面Thread0 已经将 state 设置为了 1 ,所以这里 CAS 肯定失败了,最终Thread1的 tryAcquire 失败返回 false。

🔸 既然tryAcquire() 失败了,那Thread1 就会转头继续执行后面的方法

public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

首先执行的就是AddWaiter()

addWaiter()

这个方法的作用就是将 Thread 和 mode 包装成 Node 然后添加到链表尾部然后返回这个 Node

private Node addWaiter(Node mode) {
//将当前线程和模式封装进 Node
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//如果尾节点不为空
if (pred != null) {
//将 node 连接在当前 tail 后面
node.prev = pred;
//cas 设置当前 tail 为 node
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 其实前面看似会有并发的问题其实并没有
// 上面抢锁失败的线程会直接进入 enq 方法自旋重新设置,直到成功
enq(node);
return node;
}

💡 根据前面分析 head 和 tail 都还没有初始化都还是 null,所以这里会直接进入 enq()方法

enq()

private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

🔸 可以看到这里是一个循环,因为 head 和 tail 都还是空的所以这里进入第一个循环,CAS 设置一个空的 Node() 为头节点 head,然后将 tail 也指向这个 head,到这里 head 和 tail 才算是初始化完成了 (lazily initialized )。

🔸 循环,进入 else,这里就将当前 node 连接到 tail 后面并且利用 CAS 自旋设置 tail 为当前 node 也就是包含Thread1 的 node,然后 return 当前节点的前驱节点(这里返回值并没有用到)。

public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

下一步就是执行acquireQueued()

为什么不直接在构造器里面就初始化头节点 head?而要采用懒加载的方式?

CLH queues need a dummy header node to get started. Butwe don’t create them on construction, because it would be wasted effort if there is never contention. Instead, the nodeis constructed and head and tail pointers are set up on first contention.

以上摘自** Doug Lea** 大师的注释解释,根据我们的上面的分析,其实我们也看到了,第一个线程Thread0 过来的时候并没有去初始化 head,后面的线程Thread1过来的时候 有了竞争才初始化了这个头节点 head,如果直接初始化这个 head,然后又没有锁竞争,这个 head 节点就被浪费了。 大师就是大师,太强了 😮

acquireQueued()

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取当前节点的前置节点
final Node p = node.predecessor();
//如果前置节点是 head 就尝试去获取锁
if (p == head && tryAcquire(arg)) {
//设置头节点为当前节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//发生异常取消抢锁
if (failed)
cancelAcquire(node);
}
}

💡 因为前置节点是 head,所以这里作为队列第一个可以去尝试获取锁,可以看到这里是个死循环,会一直尝试获取锁,其实类似与 CAS 的自旋,但是相比 CAS 自旋又有很大不同,它并不会一直自旋,详细可以继续往下看。

🔸 前面传递过来的 node 前继节点正好就是 head,所以执行 tryAcquire() 但是由于Thread0 还没有释放锁所以这里仍然失败。

🔸 进入第二个 if 执行 shouldParkAfterFailedAcquire(p,node)

shouldParkAfterFailedAcquire()

看名字就知道是干啥的了,获取失败是否 Park?

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//前置节点的状态
int ws = pred.waitStatus;

if (ws == Node.SIGNAL)
//前置节点状态为-1,代表当前节点的后续节点需要被挂起
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
// 前驱节点 waitStatus 大于 0,说明前驱节点取消了排队。
// 这里需要知道这点:进入阻塞队列排队的线程会被挂起,而唤醒的操作是由前驱节点完成的。
// 所以下面这块代码说的是将当前节点的 prev 指向 waitStatus<=0 的节点,
// 简单说,就是为了找个正常的前驱节点,因为你还得依赖它来唤醒呢,如果前驱节点取消了排队,就无法唤醒你了
// 同时这个操作也会将那些 ws>0 的节点移除掉
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//设置前驱节点状态为 -1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

注意这个里面回剔除不正常的节点,为下面的唤醒操作考虑

💡 分析Thread1,我们先看看当前队列的状态

mark

🔸 因为前面的操作并没有对 state 进行操作,所以这里会直接进入最后的 else,设置前驱节点的状态为 SIGNAL

然后 renturn false 回到 acquireQueued 的内循环

🔸 再次尝试获取锁,Thread0 仍然没有释放锁,失败,再次进入 shouldParkAfterFailedAcquire,这一次由于已经将前继节点的状态设置为SIGNAL 所以直接 return true,进入后面的 parkAndCheckInterrupt() 方法

此时状态变为

mark

parkAndCheckInterrupt()

private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

到这里我们的线程Thread1就会被阻塞住,也不会继续自旋获取锁了。

LockSupport.park() 实际上调用的是 Unsafe 提供的指令属于线程阻塞原语,可以理解为二元信号量(只有一个 permit),这个方法也会响应 Interrupt 参考

🔔 假设此时又有一个线程Thread2过来了,并且Thread0 依然没有释放锁

🔸 tryAcquire 失败

🔸addWaiter(), 将Thread2和模式包装成 Node 添加到队尾(这个时候就不会进入 enq 了,因为 tail 此时为Thread1已经不为空了)然后返回包含Thread2的节点,队列状态变为:

mark

🔸acquireQueued(),根据上面的状态图,这里前置节点并不是 head,直接进入 shouldParkAfterFailedAcquire()

🔸shouldParkAfterFailedAcquire(),明显前驱节点状态并不是SIGNAL 而是 0,所以直接利用 CAS 设置前驱节点为SIGNAL 状态变为:

mark

回到 acquireQueued() 继续自旋

🔸 前置节点不是 head,调用 shouldParkAfterFailedAcquire(NodeT1,mode),成功,调用 parkAndCheckInterrupt 阻塞,至此,Thread1Thread2 都在这里 park 住了。

unLock()

🔔 继续上面的模拟,此时Thread0释放锁 ,调用unlock() 方法,unlock() 会调用 AQS 中的release(1)方法

release()

public final boolean release(int arg) {
if (tryRelease(arg)) {
//获取头节点
Node h = head;
//头节点不为空,并且头节点的 waitStatus 不是 0
if (h != null && h.waitStatus != 0)
//unpark 后继节点
unparkSuccessor(h);
return true;
}
return false;
}

首先执行tryRelease(1) ,和tryAcquire() 一样,这个方法最后是交给子类去实现的

tryRelease()

protected final boolean tryRelease(int releases) {
//同步状态减 1
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
//解锁线程不是当前线程,解铃还须系铃人
throw new IllegalMonitorStateException();
//和上面一样这里不用担心并发的问题,因为是独占锁,同时只有一个线程会到达这里
boolean free = false;
//不为 0 则代表被重入了,需要多次 release 直到 0 才会释放锁
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

tryRelease() 成功继续执行后面的语句,看一下当前 AQS 队列的状态

mark

🔸 头节点 head ! =null 并且 head.waitState 不是 0,执行 unparkSuccessor().

unparkSuccessor()

private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
//如果当前节点的 ws 小于 0 就设置为 0,允许失败
//其实是独占锁的话这里肯定不会失败,因为只有一个线程
//release 执行完之后,这个节点就会被移除掉。然后被 GC
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
//获取后继节点
Node s = node.next;
//后继节点为空,或者已经撤销了,取消抢锁(1>0)
if (s == null || s.waitStatus > 0) {
s = null;
//从后往前遍历找到正数第一个 waitStatus<0 的
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//然后释放它
LockSupport.unpark(s.thread);
}

🔸 Thread1unpark() 继续执行。

private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

返回Thread1的 中断标志位,并复原为 false,结束parkAndCheckInterrupt()方法,再次回到acquireQueued() 的循环中执行第一个 if

for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted; //返回中断状态
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}

🔸 由于Thread0 已经释放锁,同步状态已经变为 0,Thread1 可以直接tryAcquire获取到锁,然后设置头节点为当前节点,将之前的 head 节点移除,返回中断状态,由于之前 park 期间没有被中断直接return false,acquire 成功!!! Thread1 获得锁!!!

为什么要从后往前遍历?

这里看了一些博客介绍,大概有两个说法,一个是在enq() 方法里面,先设置的node.prev = pred;再执行的 CAS 最后执行的t.next = node; CAS 成功后 next 也许还没有设置成功,从前往后遍历有可能找不到这个刚加入的节点;其次,在cancelAcquire(node); 的最后一步有一个node.next=node的操作,如果这个时候从前往后遍历会导致死循环。

从后往前遍历找到最前面第一个 waitStatus<0 的节点,这个操作如果返回的是个中间节点怎么办?

不要怕,我们继续执行,首先它是个中间节点而且是公平锁,它有前驱节点,unpark 后肯定获取不到锁(公平锁需要检测是否有前驱节点),然后执行shouldParkAfterFailedAcquire(),还记得这个方法里面的一个操作么??如果前驱节点状态>0,他就会清除这些不正常的节点,返回 false,不 park 自旋,下一次循环这个节点在获取锁就可以获取到了,妙哉!!!

注意** setHead **是这样的

private void setHead(Node node) {
head = node;
node.thread = null; //设置线程为 null
node.prev = null; //设置前驱为空
}

此时队列状态变为:

mark

再往后就是重复前面的过程啦。

非公平锁公平锁区别

上面是介绍的公平锁,所谓的公平就是先来后到 FIFO。

我们来看一下非公平锁的 lock 和 nonfairTryAcquire() 的实现,这两个锁的区别其实就是这两个方法。

lock()

final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

可以看到非公平锁lock()的时候,不管三七二十一先 CAS 试一下能不能获取到锁,获取到就直接返回

nonfairTryAcquire()

final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

对比公平锁的实现,会发现少了hasQueuedPredecessors() 这个方法,所以如果前面lock() 的时候没有 CAS 成功,到这里后如果之前持有锁的线程释放了锁,它又会再次尝试 CAS 获取锁,这里其实就体现了非公平锁的特点,**先等待锁的线程不一定能先获取到锁,中间允许有人"插队"**,如果这一次还是失败了,就会和公平锁一样老老实实去等待队列中排队

一般而言,非公平锁的性能会比公平锁好,而非公平锁可能会导致排在后面的线程饥饿

流程图

mark

参考

《Java 并发编程之美》

一行一行源码分析清楚 AbstractQueuedSynchronizer

AbstractQueuedSynchronizer 源码剖析(六)- 深刻解析与模拟线程竞争资源

[浅谈 Java 并发编程系列(八)—— LockSupport 原理剖析]

Java 同步器——AQS 学习