AQS源码与Lock
liduoan.efls Engineer

什么是AQS

Java并发编程核心在于java.concurrent.util包,而juc当中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这个行为的抽象就是基于AbstractQueuedSynchronizer简称AQS,AQS定义了一套多线程访问共享资源的同步器框架,是一个依赖状态(state)的同步器。

我们常用的各种同步组件或者锁都是基于这个框架实现的,一般都是通过定义内部类Sync继承AQS将同步器所有调用都映射到Sync对应的方法 。

AQS的特性

首先大家需要简单了解AQS中的一些特性。

三大核心原理

自旋,LocksSuport, CAS,queue队列

资源状态

AQS中定义了一个状态变量,用于描述当前资源的状态:

1
2
3
4
5
/**
* The synchronization state.
*/
//状态变量
private volatile int state;
  • 0:表示当前资源可用,比如锁已被某个线程占用
  • 1:表示当前资源不可用

资源共享方式

AQS定义两种资源共享方式:

  • Exclusive(独占):只有一个线程能执行,如ReentrantLock。在独享方式下,AQS的父类AbstractOwnableSynchronizer中的包含一个字段,用于指向独享资源的线程:
1
2
// 当前持有资源的线程
private transient Thread exclusiveOwnerThread;
  • Share(共享):多个线程可以同时执行,如SemaphoreCountDownLatch

两种队列

不同的自定义同步器的特性不同,因此争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。

AQS中主要包括以下两种队列:

同步等待队列

AQS当中的同步等待队列也称CLH队列,CLH队列是Craig、Landin、Hagersten三人发明的一种基于双向链表数据结构的队列,是FIFO先入先出线程等待队列。

同步器依赖内部的同步队列来完成同步状态的管理,当前线程获取资源失败时,同步器会将当前线程以及等待状态等信息构造成称为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步资源释放时,会把首节点中的线程唤醒,使其再次尝试获取同步资源。链表的结点是AQS中的内部类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// 链表的节点,AbstractQueuedSynchronizer的内部类
static final class Node {

// 共享模式
static final Node SHARED = new Node();

// 互斥(独占)模式
static final Node EXCLUSIVE = null;

// 节点的四个生命状态
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

/** 节点的生命状态(信号量)
* SIGNAL = -1 可被唤醒
* CANCELLED = 1 出现异常,中断引起的,需要废弃结束
* CONDITION = -2 条件等待,用于条件等待队列,不用于CLH
* PROPAGATE = -3 传播
* 0 初始状态(默认)
* 为了保证所有阻塞线程对象能够被唤醒
*/
volatile int waitStatus;

// 前驱节点
volatile Node prev;

// 后继节点
volatile Node next;

// 节点同步状态的线程
volatile Thread thread;

// 该字段用于条件等待队列,不用于CLH队列
Node nextWaiter;

// 判断是否是共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}

// 获取结点的前驱节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

// 构造方法
Node() {
}

Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}

一个CLH队列的结构大致如下:

image

其头节点和尾结点对应AQS的两个字段:

1
2
3
4
5
6
7
// CLH双向链表的head头节点
// 懒加载,由第一个进入等待队列的线程结点加载
// head恒为空结点,即其中的thread为null
private transient volatile Node head;

// CLH双向链表的tail尾结点,排在队列最后的线程结点
private transient volatile Node tail;

条件等待队列

与CLH队列不同,Condition是一个多线程间协调通信的工具类,使得某个或者某些线程一起等待某个条件(Condition),只有当该条件具备时,这些等待线程才会被唤醒,从而重新争夺锁。AQS中条件等待队列的结构如下图所示:

image

与CLH队列相比,条件等待队列是一个单向链表,并且它的所有节点都不为空(CLH中的头节点为空)。条件等待队列同样也是基于Node构建的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 链表的节点,AbstractQueuedSynchronizer的内部类
static final class Node {

// 条件等待队列中指向下一个节点的指针
Node nextWaiter;
}


// ConditionObject是AQS中对Condition的实现
public class ConditionObject implements Condition, java.io.Serializable {

// 条件等待队列中的头节点
private transient Node firstWaiter;

// 条件等待队列中的尾节点
private transient Node lastWaiter;
}

多线程安全性

AQS如何保证多线程并行下操作的安全性?AQS中使用了volatile关键字修饰变量,并且通过CAS原子操作对这些volatile变量进行修改,保证了多线程情况下操作的原子性和可见性。

AQS在类加载初始化阶段,会执行类中最后部分的静态代码块,完成对类中几个字段的偏移量的初始化,方便后续的CAS操作。

主要都是CAS操作,compare and swap是很好的原子获取值。

AQS源码分析

这是我第一次进行源码分析!

大局看

粗略的看加锁和解锁流程,如下图:

image

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
// 这里是继承了AQS
private final Sync sync;

//这里前置工作做好了
public ReentrantLock() {
//默认不公平锁
sync = new NonfairSync();
}
//进行构造函数判定是否公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

//使用的锁是调用sync
public void lock() {
sync.lock();
}
//使用另外一种加锁方式
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
//解锁方式
public void unlock() {
sync.release(1);
}

}

接下来我们看下Sync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
//这里还是虚拟类
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
//对应的lock
abstract void lock();

//加锁逻辑
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();

if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

//释放锁的逻辑
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}

而对应的有公平锁和非公平锁的继承

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
//其实最后还是用的这个Sync
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
//实现对应的方法
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
//进行加锁
acquire(1);
}
//这里很重要!!
/**
* 如果获取到锁,或者重入锁,那么返回true
* 否则返回false
*/
protected final boolean tryAcquire(int acquires) {
//当前线程是否
//在acquire的里面会有这个调用
//看是否是第一次获取还是没有获取
final Thread current = Thread.currentThread();
int c = getState();
//判读是否锁空闲
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

首先我们说说,ReentrantLocksynchronized它们两的区别:

  1. Synchronized是JVM层次的锁实现,ReentrantLock是JDK层次的锁实现
  2. Synchronized的锁状态是无法在代码中直接判断的,但是ReentrantLock可以通过lock.isLocked()判断
  3. Synchronized是非公平锁,ReentrantLock是可以是公平也可以是非公平的
  4. Synchronized是不可以被强行中断的,而ReentrantLock通过lock.lockInterruptibly()方法加锁的话,是可以被强行中断的
  5. 在发生异常时Synchronized会自动释放锁(由javac编译时自动实现),而ReentrantLock需要开发者在finally块中释放锁
  6. ReentrantLock获取锁的形式有多种:如立即返回是否成功的tryLock(),以及等待指定时长的获取,更加灵活

加锁

我们先进行加锁的思路走,可以发现无论是公平锁还是非公平锁都是调用acquire(1);进行加锁的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public final void acquire(int arg) {
/**
* tryAcquire 是调用公平锁和非公平锁中的代码
* 这部分代码可以拆解 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
* 这是添加进同步等待队列中
* addWaiter(Node.EXCLUSIVE)
* 这里是把本线程进入阻塞
* acquireQueued(Node, arg)
* 这里很重要的是if逻辑是否会进去
* 如果进去的话需要会进行自我中断,这是因为acquireQueue返回true是中断产生!
*/
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//如果进入这里说明一开始没有获取到锁,进入等待队列中,获取到了锁
//但是获取锁是依靠中断的
//那么由于acquireQueued中我们使用的Thread.interrupted();
//导致中断信号被清空,所以需要再添加上中断信号
selfInterrupt();
}

我们一步步来,先看tryAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
  //其实最后还是用的这个Sync
static final class NonfairSync extends Sync {
/**
* 看的出来由于是非公平锁
* 那么调用Sync的nonfairTryAcquire(acquires)
*/
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

static final class FairSync extends Sync {
//调用这里的方法
protected final boolean tryAcquire(int acquires) {
//获取当前线程
final Thread current = Thread.currentThread();
//获取当前锁状态
int c = getState();
//如果锁是空闲的
if (c == 0) {
//把锁状态由0->acquires
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果锁不空闲 判断此刻拥有锁的线程是否是本线程
//重入锁的原理
else if (current == getExclusiveOwnerThread()) {
//锁状态+1
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
//这里不使用CAS是因为
//仅仅只会有本线程进入锁,不必担心并发问题
setState(nextc);
return true;
}
//如果锁不是空闲,又不是重入
return false;
}
}

很清晰的获取锁的流程【单指公平锁。

接下来我们根据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
//接下来我们看向addWaiter
//我们开看
/**
* 这里使用的是同步等待队列
*/
private Node addWaiter(Node mode) {
//new一个节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
/**
* 这里通过尾节点是否为空判断是否需要初始化操作
* 初始化的话直接进入enq函数
*/
if (pred != null) {
//如果不需要初始化节点就会进入此处
//其实和enq里的类似
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
//这里会直接返回的!!
return node;
}
}
enq(node);
return node;
}
//这里是初始化才会进入这里
private Node enq(final Node node) {
for (;;) {
//这里自旋
Node t = tail;
//尾指针为空,那么初始化一个空节点
//使得首尾指针都指向这个空节点
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//不为空说明我们已经初始化好了
node.prev = t;
//设置尾节点为node
if (compareAndSetTail(t, node)) {
//使得前面节点和node链接
t.next = node;
return t;
}
}
}
}

//前面是把该线程放入同步等待队列
//现在是把该线程进行阻塞
/**
* 注意有个特点
* 如果你是同步等待队列中的第一个线程
* 那么不是直接阻塞,是看看还有机会获得锁吗?
* 如果没有机会才阻塞
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//自旋
final Node p = node.predecessor();
//判断是否是同步等待队列的第一个线程
//如果是的话 会 tryAcquire(arg)
if (p == head && tryAcquire(arg)) {
//因为进入此处
//代表已经获得锁了
//那么需要把该节点从同步等待队列中丢弃
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//由于不是第一个线程,故而直接阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//如果是被中断唤醒的,那么设为true
interrupted = true;
}
} finally {
//最终会判断是否获取到了锁?
//如果没获取到,说明这个线程走到这里是由于别的问题产生的!
if (failed)
cancelAcquire(node);
}
}

我们已经分析完了从获取锁到如何进入等待队列,到阻塞线程。

其中阻塞线程我们可以在深究一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
  //这里会对同步等待队列中的waitStatus
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

int ws = pred.waitStatus;
//判断这里是否为-1
if (ws == Node.SIGNAL)
//如果这里是-1,那么就直接返回
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果这里为0,那么我们将其转为-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

private final boolean parkAndCheckInterrupt() {
//这里是把线程阻塞
LockSupport.park(this);
//被唤醒是有多种可能
//比如API唤醒,或者被中断
//如果是被中断唤醒的话,中断信号应该为true
//这里是返回是否被中断
return Thread.interrupted();
}

我们看到了最终是阻塞在parkAndCheckInterrupt

在这个自旋中

waitestate = 0 - > -1 head节点为什么改到-1

因为持有锁的线程T0在释放锁的时候,得判断head节点的waitestate是否!=0,如果!=0成立,会再把waitstate = -1->0

解锁

接下来我们看解锁过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
//一.解锁
public void unlock() {
sync.release(1);
}
// 二.释放锁的方法
public final boolean release(int arg) {
// 调用tryRelease判断是否锁空闲
if (tryRelease(arg)) {
// 如果锁空闲,说明可以让其他线程获取锁
Node h = head;
// 如果头节点不为空,并且状态不为0,说明后面节点可以唤醒
if (h != null && h.waitStatus != 0)
// 唤醒头节点后第一个满足条件的节点
unparkSuccessor(h);
//这里说明某个线程被唤醒了
return true;
}
return false;
}
//解锁
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//这里是真正的解锁!!
LockSupport.unpark(s.thread);
}

unparkSuccessor(h);

看得出来这里是把头节点带入函数中,也就是说判断head节点的waitestate是否!=0,如果!=0成立,会再把waitstate = -1->0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;

protected final boolean tryRelease(int releases) {
// 状态变换
int c = getState() - releases;
// 当前线程不是锁持有线程,那么就抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//如果锁状态为空闲
if (c == 0) {
free = true;
//设置锁持有线程为null
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}

lockInterruptibly()

lockInterruptibly()也是Lock的一种加锁方式

在普通的LockSupport.park()会判断是否有中断标记,如果有,那么不用阻塞。

利用LockSupport.park()响应中断不会抛出异常的特性,之前的lock方法中,线程在被中断唤醒并且成功抢到锁后,只会修改线程的中断标记为true。

而使用lockInterruptibly方法加锁,一旦线程被中断唤醒,线程状态就会被标记为CANCELLED,从队伍中被剔除并且抛出异常。

话不多说,直接看源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
   //标准操作
//注意到这里有抛出中断异常
public void lockInterruptibly() throws InterruptedException {
//照旧调用sync
sync.acquireInterruptibly(1);
}
//开始操作
public final void acquireInterruptibly(int arg)
throws InterruptedException {
//如果被中断了,那么直接抛异常
if (Thread.interrupted())
throw new InterruptedException();
//否则就尝试获取锁
//如果获取不成功,那么进入代码
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
//核心
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
//把线程加入同步等待队列
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
//开始自旋!!
for (;;) {
//看头节点
final Node p = node.predecessor();
//如果可以就尝试获取锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
//这里就是常态了,和lock的那个类似
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//差别在这里,一旦出现中断
//那么直接抛出异常,而不是接着获取锁
throw new InterruptedException();
}
} finally {
//由于中断,所以会把failed是true,会进入if代码中
if (failed)
// 将当前线程节点从队列中剔除
//此时没有获取锁
cancelAcquire(node);
}
}

// 将当前线程节点从队列中剔除的方法
private void cancelAcquire(Node node) {
if (node == null)
return;
// 当前节点中的thread置为null
node.thread = null;

// 获取当前节点的前驱节点
Node pred = node.prev;
// 找到队伍中连续状态为CANCELLED的第一个节点的前驱结点pred
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext为队伍中连续状态为CANCELLED的第一个节点
Node predNext = pred.next;
// 当前节点的状态设置为CANCELLED
node.waitStatus = Node.CANCELLED;
// 以下操作把pedfNext到node的所有结点从队中剔除,因为这些结点的状态均为CANCELLED
if (node == tail && compareAndSetTail(node, pred)) {
// 1.如果node是队尾,将pred(predNext的前驱节点)设为队尾
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 2.如果pred不是队头
if (pred != head &&
// 并且pred的状态不为(-1)
((ws = pred.waitStatus) == Node.SIGNAL ||
// 或者pred的状态<=0(0)并且CAS将其替换为-1成功
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
// 并且pred中的thread不为空
pred.thread != null) {
// 获取node的后继节点next
Node next = node.next;
// 如果next不为空且状态<=0
if (next != null && next.waitStatus <= 0)
// 将pred的后继节点设置为next
compareAndSetNext(pred, predNext, next);
} else {
// 3.如果node是队头,调用unparkSuccessor方法唤醒node后第一个满足条件的节点
unparkSuccessor(node);
}

node.next = node;
}
}


// 唤醒当前节点满足条件的后继节点
private void unparkSuccessor(Node node) {

int ws = node.waitStatus;
if (ws < 0)
// 将node节点的状态置为0
compareAndSetWaitStatus(node, ws, 0);

// 获取node的后继节点s
Node s = node.next;
// 如果s为空或者s的状态为CANCELLED
if (s == null || s.waitStatus > 0) {
s = null;
// 找到node后面第一个状态小于0(-1)的节点t
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
// 将t赋值给s
s = t;
}
if (s != null)
// 将s唤醒
LockSupport.unpark(s.thread);
}

简单来说,lock lockInterruptibly 区别在于:

lock 优先考虑获取锁,待获取锁成功后,才响应中断;

lock的中断处理是在acquire方法中加上了中断信号标志而已。

lockInterruptibly 优先考虑响应中断,一旦中断就不再继续获取锁

他的中断处理是我不要锁了,直接把这个线程中断,然后删除种种信息。

偏重点不同。