失眠网,内容丰富有趣,生活中的好帮手!
失眠网 > Java并发编程之AbstractQueuedSynchronizer(AQS)源码解析

Java并发编程之AbstractQueuedSynchronizer(AQS)源码解析

时间:2019-05-18 09:04:35

相关推荐

Java并发编程之AbstractQueuedSynchronizer(AQS)源码解析

自己一个人随便看看源码学习的心得,分享一下啦,不过我觉得还是建议去买本Java并发编程的书来看会比较好点,毕竟个人的理解有限嘛。

独占锁和共享锁

首先先引入这两个锁的概念:

独占锁即同一时刻只有一个线程才能获取到锁,Lock的实现类中ReentrantLock和WriteLock就是独占锁,所以独占锁也叫排它锁;

共享锁是同一时刻多线程能获取到的锁叫共享锁,即ReadLock;在读写锁共有的情况下,会出现共存的情况即读-读共存,读-写、写-写不能共存,他们会产生互斥。

数据结构

打开源码首先就得看下它的所有方法和变量,就会发现,AQS其实用的设计模式是模板模式,先讲一下啥是模板模式吧,直接上代码吧,用代码看会比较直观一点:

public abstract class UseTemplate{//这个定义的三个抽象接口方法public abstract void function1();public abstract void function2();public abstract void function3();//...还有其他属性或方法//这个方法就是模板方法,在继承的子类中实现好抽象方法,然后在调用父类的模板方法public final void runTemplate(){//在父类中写好功能执行的方法即可,也可设置权限,禁止重新覆盖function1();function2();function3();}}

AQS是采用的是一个双向链表队列,在该类中包含了一个Node类,双向链表就是右这个实现的

先了解一下这个静态内部类,以下为源代码(每个属性在下面都有解释):

static final class Node{/** Marker to indicate a node is waiting in shared mode *///用于共享锁的节点属性static final Node SHARED = new Node();/** Marker to indicate a node is waiting in exclusive mode *///用于独占锁使用属性static final Node EXCLUSIVE = null;/** waitStatus value to indicate thread has cancelled */static final int CANCELLED = 1;/** waitStatus value to indicate successor's thread needs unparking */static final int SIGNAL = -1;/** waitStatus value to indicate thread is waiting on condition */static final int CONDITION = -2;/*** waitStatus value to indicate the next acquireShared should* unconditionally propagate*/static final int PROPAGATE = -3;/*** Status field, taking on only the values:* SIGNAL:The successor of this node is (or will soon be)*blocked (via park), so the current node must*unpark its successor when it releases or*cancels. To avoid races, acquire methods must*first indicate they need a signal,*then retry the atomic acquire, and then,*on failure, block.* CANCELLED: This node is cancelled due to timeout or interrupt.*Nodes never leave this state. In particular,*a thread with cancelled node never again blocks.* CONDITION: This node is currently on a condition queue.*It will not be used as a sync queue node*until transferred, at which time the status*will be set to 0. (Use of this value here has*nothing to do with the other uses of the*field, but simplifies mechanics.)* PROPAGATE: A releaseShared should be propagated to other*nodes. This is set (for head node only) in*doReleaseShared to ensure propagation*continues, even if other operations have*since intervened.* 0:None of the above** The values are arranged numerically to simplify use.* Non-negative values mean that a node doesn't need to* signal. So, most code doesn't need to check for particular* values, just for sign.** The field is initialized to 0 for normal sync nodes, and* CONDITION for condition nodes. It is modified using CAS* (or when possible, unconditional volatile writes).*///当前节点的状态,状态值即上面几个状态值,有相对应的英文解释,就不翻译了volatile int waitStatus;/*** Link to predecessor node that current node/thread relies on* for checking waitStatus. Assigned during enqueuing, and nulled* out (for sake of GC) only upon dequeuing. Also, upon* cancellation of a predecessor, we short-circuit while* finding a non-cancelled one, which will always exist* because the head node is never cancelled: A node becomes* head only as a result of successful acquire. A* cancelled thread never succeeds in acquiring, and a thread only* cancels itself, not any other node.*/volatile Node prev;//前一个节点指向/*** Link to the successor node that the current node/thread* unparks upon release. Assigned during enqueuing, adjusted* when bypassing cancelled predecessors, and nulled out (for* sake of GC) when dequeued. The enq operation does not* assign next field of a predecessor until after attachment,* so seeing a null next field does not necessarily mean that* node is at end of queue. However, if a next field appears* to be null, we can scan prev's from the tail to* double-check. The next field of cancelled nodes is set to* point to the node itself instead of null, to make life* easier for isOnSyncQueue.*/volatile Node next;//下一个节点指向/*** The thread that enqueued this node. Initialized on* construction and nulled out after use.*/volatile Thread thread;//当前节点的线程/*** Link to next node waiting on condition, or the special* value SHARED. Because condition queues are accessed only* when holding in exclusive mode, we just need a simple* linked queue to hold nodes while they are waiting on* conditions. They are then transferred to the queue to* re-acquire. And because conditions can only be exclusive,* we save a field by using special value to indicate shared* mode.*/Node nextWaiter;//使用在Condition上面,即下一个等待节点/*** Returns true if node is waiting in shared mode.*/final boolean isShared() {return nextWaiter == SHARED;}/*** Returns previous node, or throws NullPointerException if null.* Use when predecessor cannot be null. The null check could* be elided, but is present to help the VM.** @return the predecessor of this node*///获取前一个节点final Node predecessor() throws NullPointerException {Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;}Node() { // Used to establish initial head or SHARED marker}Node(Thread thread, Node mode) {// Used by addWaiterthis.nextWaiter = mode;this.thread = thread;}Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus = waitStatus;this.thread = thread;}}

了解完这个之后,在AQS类中有三个变量:

/*** Head of the wait queue, lazily initialized. Except for* initialization, it is modified only via method setHead. Note:* If head exists, its waitStatus is guaranteed not to be* CANCELLED.*/private transient volatile Node head;//当前头部节点/*** Tail of the wait queue, lazily initialized. Modified only via* method enq to add new wait node.*/private transient volatile Node tail;//当前尾节点/*** The synchronization state.*/private volatile int state;//拿锁的状态

AQS中还有一个内部类ConditionObject,这个类继承了Condition接口,这个类也有一个队列,不过不是双向队列,单向队列,也是通过Node实现的:

/** First node of condition queue. */private transient Node firstWaiter;/** Last node of condition queue. */private transient Node lastWaiter;

AQS方法

独占式获取:​ accquire()、​ acquireInterruptibly()、​ tryAcquireNanos()

共享式获取:​ acquireShared()、​ acquireSharedInterruptibly()、​ tryAcquireSharedNanos()

独占式释放锁:​ release()

共享式释放锁:​ releaseShared()

需要子类覆盖的流程方法

​独占式获取: tryAcquire()

​独占式释放: tryRelease()

​共享式获取: tryAcquireShared()

共享式释放: tryReleaseShared()

这个同步器是否处于独占模式 isHeldExclusively()

同步状态state:1)、​ getState():获取当前的同步状态;2)、​ setState(int):设置当前同步状态;3)、​ compareAndSetState(int,int) 使用CAS设置状态,保证状态设置的原子性.

AQS使用方法及流程

需要继承AQS这个类并实现他的方法,我们先自己写一个独占锁的实现方式:

/*** state 当为0的时候表示还没有拿到锁 为1的时候为拿到锁了* @author 26225**/static final class Sync extends AbstractQueuedSynchronizer{/*** */private static final long serialVersionUID = 1L;@Overrideprotected boolean tryAcquire(int arg) {if(getState() == 1)return false;else {/*** 需使用compareAndSetState() CAS原子操作 而不是使用setState(1),因为使用了volatile修饰了state 虽然保证了可见性,但是修改还是得保证原子操作*/if(compareAndSetState(getState(), arg)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}}@Overrideprotected boolean tryRelease(int arg) {if(getState() == 0 )throw new UnsupportedOperationException();setExclusiveOwnerThread(null);setState(arg);return true;}@Overrideprotected boolean isHeldExclusively() {return getState() == 1;}Condition newCondition() {return new ConditionObject();}}private final Sync sync = new Sync();public void lock() {sync.acquire(1);}public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}public boolean tryLock() {return sync.tryAcquire(1);}public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {// TODO Auto-generated method stubreturn sync.tryAcquireNanos(1, unit.toNanos(time));}public void unlock() {sync.release(0);}public Condition newCondition() {return sync.newCondition();}

先简单的讲解一下该段代码:

在调用Lock的unlock()方法时,需调用AQS的acquire()方法,我们先看一下这个方法调用:

/*** Acquires in exclusive mode, ignoring interrupts. Implemented* by invoking at least once {@link #tryAcquire},* returning on success. Otherwise the thread is queued, possibly* repeatedly blocking and unblocking, invoking {@link* #tryAcquire} until success. This method can be used* to implement method {@link Lock#lock}.** @param arg the acquire argument. This value is conveyed to* {@link #tryAcquire} but is otherwise uninterpreted and* can represent anything you like.*/public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}

其实就是先调用我们自己实现的方法,判断拿锁的状态,并且设置这个状态;如果拿到了锁的话,就不用管它,没有拿到的话,就得加入到等待队列中,我们看一下加入等待队列的方法:

/*** Creates and enqueues node for current thread and given mode.** @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared* @return the new node*/private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}/*** Inserts node into queue, initializing if necessary. See picture above.* @param node the node to insert* @return node's predecessor*/private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}

判断头部节点是否存在,如果不存在的话就调用enq()方法,判断当前尾节点存不存在,存在的话,则将当前节点通过CAS操作设置成尾节点,如果不存在则将当前线程包装成一个Node对象设置成头节点,并且将头部和尾部设置成同一个

final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}/*** Convenience method to park and then check if interrupted** @return {@code true} if interrupted*/private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}

这个方法是加入到队列,就是设置当前Node对象的前一个节点指向以及后一个节点指向,并且调用parkAndCheckInterrupt()通过LockSupport将当前线程进入到等待状态。

以上就是调用lock()方法基本流程,现在看一下代用unlock()的基本流程,执行的是AQS的release()

/*** Releases in exclusive mode. Implemented by unblocking one or* more threads if {@link #tryRelease} returns true.* This method can be used to implement method {@link Lock#unlock}.** @param arg the release argument. This value is conveyed to* {@link #tryRelease} but is otherwise uninterpreted and* can represent anything you like.* @return the value returned from {@link #tryRelease}*/public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}/*** Wakes up node's successor, if one exists.** @param node the node*/private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}

在释放锁的过程中,首先还原之前的拿锁状态,还原之后将队列头部节点的下一个节点通过LockSupport.unpark()进行唤醒。

AbstractOwnableSynchronizer

在JDK1.6之后AQS继承了AbstractOwnableSynchronizer这个类,其实这个类主要是用来记录当前访问的线程:

/*** The current owner of exclusive mode synchronization.*/private transient Thread exclusiveOwnerThread;/*** Sets the thread that currently owns exclusive access.* A {@code null} argument indicates that no thread owns access.* This method does not otherwise impose any synchronization or* {@code volatile} field accesses.* @param thread the owner thread*/protected final void setExclusiveOwnerThread(Thread thread) {exclusiveOwnerThread = thread;}/*** Returns the thread last set by {@code setExclusiveOwnerThread},* or {@code null} if never set. This method does not otherwise* impose any synchronization or {@code volatile} field accesses.* @return the owner thread*/protected final Thread getExclusiveOwnerThread() {return exclusiveOwnerThread;}

AQS的基本流程差不多分析完了,讲的有问题的话,还请大佬指正!!!

如果觉得《Java并发编程之AbstractQueuedSynchronizer(AQS)源码解析》对你有帮助,请点赞、收藏,并留下你的观点哦!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。