什么是AQS Java并发编程核心在于java.concurrent.util
包,而juc当中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这个行为的抽象就是基于AbstractQueuedSynchronizer
简称AQS,AQS定义了一套多线程访问共享资源的同步器框架,是一个依赖状态(state)的同步器。
我们常用的各种同步组件或者锁都是基于这个框架实现的,一般都是通过定义内部类Sync
继承AQS将同步器所有调用都映射到Sync对应的方法 。
AQS的特性 首先大家需要简单了解AQS中的一些特性。
三大核心原理
自旋,LocksSuport, CAS,queue队列
资源状态 AQS中定义了一个状态变量,用于描述当前资源的状态:
1 2 3 4 5 private volatile int state;
0:表示当前资源可用,比如锁已被某个线程占用
1:表示当前资源不可用
资源共享方式 AQS定义两种资源共享方式:
Exclusive(独占):只有一个线程能执行,如ReentrantLock
。在独享方式下,AQS的父类AbstractOwnableSynchronizer
中的包含一个字段,用于指向独享资源的线程:
1 2 private transient Thread exclusiveOwnerThread;
Share(共享):多个线程可以同时执行,如Semaphore
和CountDownLatch
。
两种队列 不同的自定义同步器的特性不同,因此争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。
AQS中主要包括以下两种队列:
同步等待队列 AQS当中的同步等待队列也称CLH队列,CLH队列是Craig、Landin、Hagersten三人发明的一种基于双向链表 数据结构的队列,是FIFO先入先出 线程等待队列。
同步器依赖内部的同步队列来完成同步状态的管理,当前线程获取资源失败时,同步器会将当前线程以及等待状态等信息构造成称为一个节点(Node
)并将其加入同步队列,同时会阻塞当前线程,当同步资源释放时,会把首节点中的线程唤醒,使其再次尝试获取同步资源。链表的结点是AQS中的内部类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 static final class Node { static final Node SHARED = new Node (); static final Node EXCLUSIVE = null ; static final int CANCELLED = 1 ; static final int SIGNAL = -1 ; static final int CONDITION = -2 ; static final int PROPAGATE = -3 ; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; final boolean isShared () { return nextWaiter == SHARED; } final Node predecessor () throws NullPointerException { Node p = prev; if (p == null ) throw new NullPointerException (); else return p; } Node() { } Node(Thread thread, Node mode) { this .nextWaiter = mode; this .thread = thread; } Node(Thread thread, int waitStatus) { this .waitStatus = waitStatus; this .thread = thread; } }
一个CLH队列的结构大致如下:
其头节点和尾结点对应AQS的两个字段:
1 2 3 4 5 6 7 private transient volatile Node head;private transient volatile Node tail;
条件等待队列 与CLH队列不同,Condition是一个多线程间协调通信的工具类,使得某个或者某些线程一起等待某个条件(Condition),只有当该条件具备时,这些等待线程才会被唤醒,从而重新争夺锁。AQS中条件等待队列的结构如下图所示:
与CLH队列相比,条件等待队列是一个单向链表,并且它的所有节点都不为空(CLH中的头节点为空)。条件等待队列同样也是基于Node
构建的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 static final class Node { Node nextWaiter; } public class ConditionObject implements Condition , java.io.Serializable { private transient Node firstWaiter; private transient Node lastWaiter; }
多线程安全性 AQS如何保证多线程并行下操作的安全性?AQS中使用了volatile
关键字修饰变量,并且通过CAS原子操作对这些volatile
变量进行修改,保证了多线程情况下操作的原子性和可见性。
AQS在类加载初始化阶段,会执行类中最后部分的静态代码块,完成对类中几个字段的偏移量 的初始化,方便后续的CAS操作。
主要都是CAS操作,compare and swap是很好的原子获取值。
AQS源码分析 这是我第一次进行源码分析!
大局看 粗略的看加锁和解锁流程,如下图:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class ReentrantLock implements Lock , java.io.Serializable { private static final long serialVersionUID = 7373984872572414699L ; private final Sync sync; public ReentrantLock () { sync = new NonfairSync (); } public ReentrantLock (boolean fair) { sync = fair ? new FairSync () : new NonfairSync (); } public void lock () { sync.lock(); } public void lockInterruptibly () throws InterruptedException { sync.acquireInterruptibly(1 ); } public void unlock () { sync.release(1 ); } }
接下来我们看下Sync
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L ; abstract void lock () ; final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException (); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; } }
而对应的有公平锁和非公平锁的继承
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L ; final void lock () { if (compareAndSetState(0 , 1 )) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1 ); } protected final boolean tryAcquire (int acquires) { return nonfairTryAcquire(acquires); } } static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L ; final void lock () { acquire(1 ); } protected final boolean tryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } }
首先我们说说,ReentrantLock
和synchronized
它们两的区别:
Synchronized是JVM层次的锁实现,ReentrantLock是JDK层次的锁实现
Synchronized的锁状态是无法在代码中直接判断的,但是ReentrantLock可以通过lock.isLocked()
判断
Synchronized是非公平锁,ReentrantLock是可以是公平也可以是非公平的
Synchronized是不可以被强行中断的,而ReentrantLock通过lock.lockInterruptibly()
方法加锁的话,是可以被强行中断的
在发生异常时Synchronized会自动释放锁(由javac编译时自动实现),而ReentrantLock需要开发者在finally 块中释放锁
ReentrantLock获取锁的形式有多种:如立即返回是否成功的tryLock(),以及等待指定时长的获取,更加灵活
加锁 我们先进行加锁的思路走,可以发现无论是公平锁还是非公平锁都是调用acquire(1);
进行加锁的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
我们一步步来,先看tryAcquire
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 static final class NonfairSync extends Sync { protected final boolean tryAcquire (int acquires) { return nonfairTryAcquire(acquires); } } static final class FairSync extends Sync { protected final boolean tryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } }
很清晰的获取锁的流程【单指公平锁。
接下来我们根据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 private Node addWaiter (Node mode) { Node node = new Node (Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node ())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
我们已经分析完了从获取锁到如何进入等待队列,到阻塞线程。
其中阻塞线程我们可以在深究一下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; } private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); }
我们看到了最终是阻塞在parkAndCheckInterrupt
在这个自旋中
waitestate = 0 - > -1
head节点为什么改到-1
因为持有锁的线程T0在释放锁的时候,得判断head节点的waitestate
是否!=0,
如果!=0
成立,会再把waitstate = -1->0
解锁 接下来我们看解锁过程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public void unlock () { sync.release(1 ); } public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }
unparkSuccessor(h);
看得出来这里是把头节点带入函数中,也就是说判断head节点的waitestate
是否!=0
,如果!=0
成立,会再把waitstate = -1->0
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L ; protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException (); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; } }
lockInterruptibly() lockInterruptibly()
也是Lock
的一种加锁方式
在普通的LockSupport.park()会判断是否有中断标记,如果有,那么不用阻塞。
利用LockSupport.park()
响应中断不会抛出异常的特性,之前的lock方法中,线程在被中断唤醒并且成功抢到锁后,只会修改线程的中断标记为true。
而使用lockInterruptibly方法加锁,一旦线程被中断唤醒,线程状态就会被标记为CANCELLED
,从队伍中被剔除并且抛出异常。
话不多说,直接看源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 public void lockInterruptibly () throws InterruptedException { sync.acquireInterruptibly(1 ); } public final void acquireInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } private void doAcquireInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return ; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException (); } } finally { if (failed) cancelAcquire(node); } } private void cancelAcquire (Node node) { if (node == null ) return ; node.thread = null ; Node pred = node.prev; while (pred.waitStatus > 0 ) node.prev = pred = pred.prev; Node predNext = pred.next; node.waitStatus = Node.CANCELLED; if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null ); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null ) { Node next = node.next; if (next != null && next.waitStatus <= 0 ) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; } } private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }
简单来说,lock
与 lockInterruptibly
区别在于:
lock
优先考虑获取锁,待获取锁成功后,才响应中断;
lock
的中断处理是在acquire
方法中加上了中断信号标志而已。
而lockInterruptibly
优先考虑响应中断,一旦中断就不再继续获取锁
他的中断处理是我不要锁了,直接把这个线程中断,然后删除种种信息。
偏重点不同。