失眠网,内容丰富有趣,生活中的好帮手!
失眠网 > java并发编程(三)--java中的锁(Lock接口和队列同步器AQS)

java并发编程(三)--java中的锁(Lock接口和队列同步器AQS)

时间:2020-08-17 08:29:53

相关推荐

java并发编程(三)--java中的锁(Lock接口和队列同步器AQS)

一.Synchronized关键字

在Java中,每一个对象都拥有一个锁标记(monitor),也称为监视器,多线程同时访问某个对象时,线程只有获取了该对象的锁才能访问。

在Java中,可以使用synchronized关键字来标记一个方法或者代码块,当某个线程调用该对象的synchronized方法或者访问synchronized代码块时,这个线程便获得了该对象的锁,其他线程暂时无法访问这个方法,只有等待这个方法执行完毕或者代码块执行完毕,这个线程才会释放该对象的锁,其他线程才能执行这个方法或者代码块。

遵循如下规则:

一、当两个并发线程访问同一个对象object中的这个synchronized(this)同步代码块时,一个时间内只能有一个线程得到执行。另一个线程必须等待当前线程执行完这个代码块以后才能执行该代码块;

二、当一个线程访问object的一个synchronized(this)同步代码块时,另一个线程仍然可以访问该object中的非synchronized(this)同步代码块;

三、当一个线程访问object的一个synchronized(this)同步代码块时,其他线程对object中所有其它synchronized(this)同步代码块的访问将被阻塞;

四、以上规则对其它对象锁同样适用。

二.Synchronized的缺陷

如果一个代码块被synchronized修饰了,当一个线程获取了对应的锁,并执行该代码块时,其他线程便只能一直等待,等待获取锁的线程释放锁,而这里获取锁的线程释放锁只会有三种情况:

1)获取锁的线程执行完了该代码块,然后线程释放对锁的占有;

2)线程执行发生异常,此时JVM会让线程自动释放锁;

3)等待唤醒机制里面的wait()方法,在等待的时候立即释放锁,方便其他的线程使用锁。

试想一种情况,如果这个获取锁的线程由于IO或者其他原因(比如调用sleep方法)被阻塞了,但是又没有释放锁,其他线程便一直等待,效率大大降低;

还有一种情况,在多线程读写文件时,读操作和写操作会发生冲突现象,写操作和写操作会发生冲突现象,但是读操作和读操作不会发生冲突现象,这时采用synchronized关键字来实现同步的话,就会导致一个问题:如果多个线程都只是进行读操作,所以当一个线程在进行读操作时,其他线程只能等待无法进行读操作。

三.Lock接口

Lock接口(以及相关实现类)是在JavaSE 5之后并发包中新增加的,它提供了与synchronized关键字类似的同步功能,只是在使用时需要显式地获取和释放锁。

和synchronized关键字相比,虽然它缺少了(通过synchronized块或者方法所提供的)隐式获取释放锁的便捷性,但是却避开了synchronized的缺陷,拥有了锁获取与释放的可操作性、可中断的获取锁以及超时获取锁等多种synchronized关键字所不具备的同步特性。

Lock接口API

publicinterface Lock {void lock();//获取锁,调用该方法当前线程会获取锁,当锁获得后,从该方法返回void lockInterruptibly() throwsInterruptedException;//可中断获取锁,和lock()方法的不同之处在于该方法可响应中断boolean tryLock();//非阻塞获取锁,调用立即返回,获取成功返回true,获取失败返回falseboolean tryLock(long time, TimeUnitunit) throws InterruptedException;//超时的获取锁当前线程在一下3中情况会返回:当前线程在超时时间内获得了锁;当前线程在超时时间内被中断;超时时间结束,返回falsevoid unlock();//释放锁Condition newCondition();//获取等待通知组件,该组件和当前的锁绑定,当前线程只有获得了锁,才能调用该组件的wait()方法,调用后,当前线程释放锁}

四.实现锁的关键--队列同步器(AbstractQueuedSynchronizer)

4.1 AQS介绍

队列同步器AbstractQueuedSynchronizer,是用来构建锁或者其他同步组件的基础框架,核心思想是基于volatile int state这样的volatile变量,配合Unsafe工具对其原子性的操作来实现对当前锁状态进行修改。

同步器AQS内部的实现是依赖同步队列(一个FIFO的双向队列,其实就是数据结构双向链表)来完成同步状态的管理。当前线程获取同步状态失败时,同步器AQS会将当前线程和等待状态等信息构造成为一个节点(node)加入到同步队列,同时会阻塞当前线程;当同步状态释放的时候,会把首节点中的线程唤醒,使首节点的线程再次尝试获取同步状态。AQS是独占锁和共享锁的实现的父类。

4.2 AQS使用

同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,对同步状态的修改或者访问主要通过同步器提供的3个方法:

getState()获取当前的同步状态;setState(intnewState) 设置当前同步状态;compareAndSetState(intexpect,int update) 使用CAS设置当前状态,该方法能够保证状态设置的原子性。

继承同步器的子类推荐被定义为自定义同步组件的静态内部类,同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态,这样就可以方便实现不同类型的同步组件(ReentrantLock、ReentrantReadWriteLock和CountDownLatch等)。

4.3 锁聚合AQS的设计思想

同步器是实现锁(也可以是任意同步组件)的关键,在锁的实现中,没有直接继承AQS,而是定义了一个静态内部类去继承AQS,锁的实现聚合同步器,利用同步器实现锁的语义。

可以这样理解二者之间的关系:锁是面向使用者的,它定义了使用者与锁交互的接口(比如可以允许两个线程并行访问),隐藏了实现细节;同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。锁和同步器很好地隔离了使用者和实现者所需关注的领域。

4.4 队列同步器的接口

public abstract class AbstractQueuedSynchronizer extendsAbstractOwnableSynchronizer implements java.io.Serializable { //内部类--节点static final class Node {…}//等待队列的头节点private transient volatile Node head;//等待队列的尾节点private transient volatile Node tail;//同步状态private volatile int state;protected final int getState() { return state;}protected final void setState(int newState) { state = newState;}//同步器几个可重写的方法//独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}//独占式释放同步状态,等待获取同步状态的线程有机会获取同步状态protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();}//共享式获取同步状态,返回大于0的值,表示获取成功,反之,获取失败protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();}//共享式释放同步状态protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();}//当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所占用protected boolean isHeldExclusively() {throw new UnsupportedOperationException();}//同步器提供的模板方法//独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,进入同步队列等待,该方法会调用重写的tryAcquire(int arg)方法void acquire(int arg)//与acquire(int arg)相同,但该方法响应中断void acquireInterruptibly(int arg)//在acquireInterruptibly(int arg)基础上增加超时限制boolean tryAcquireNanos(int arg, long nanosTimeout)//共享式获取同步状态,与独占式的区别在于同一时刻可以有多个线程获取同步状态void acquireShared(int arg)//响应中断void acquireSharedInterruptibly(int arg)//增加超时限制boolean tryAcquireSharedNanos(int arg, long nanosTimeout)//独占式释放同步状态,释放后,将同步队列中第一个节点包含的线程唤醒boolean release(int arg)//共享式释放boolean releaseShared(int arg)//获取等待在同步队列上的线程集合Collection<Thread> getQueuedThread()…}

同步器提供的模板方法基本上分为3类:独占式获取与释放同步状态、共享式获取与释放同步状态和查询同步队列中的等待线程情况。自定义同步组件将使用同步器提供的模板方法来实现自己的同步语义。

4.5 《java并发编程的艺术》中自定义同步器的简单实例

package com.secondbook.thread.lock;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.AbstractQueuedSynchronizer;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;/*** Created by w1992wishes on /6/1.*/public class Mutex implements Lock {// 静态内部类,自定义同步器private static class Sync extends AbstractQueuedSynchronizer{// 是否处于占用状态protected boolean isHeldExclusively(){return getState() == 1;}// 当状态为0的时候获取锁public boolean tryAcquire(int acquires){if (compareAndSetState(0, 1)){setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}//释放锁,将当前状态设置为0protected boolean tryRelease(int releases){if (getState() == 0){throw new IllegalMonitorStateException();}setExclusiveOwnerThread(null);setState(0);return true;}// 返回一个Condition,每个condition都包含了一个condition队列Condition newCondition(){return new ConditionObject();}}// 仅需要将操作代理到Sync上即可private final Sync sync = new Sync();public void lock() {sync.acquire(1);}public boolean tryLock() {return sync.tryAcquire(1);}public void unlock() {sync.release(1);}public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(timeout));}public Condition newCondition() {return sync.newCondition();}public boolean isLocked(){return sync.isHeldExclusively();}public boolean hasQueuedThreads() {return sync.hasQueuedThreads();}}

独占锁Mutex是一个自定义同步组件,它在同一时刻只允许一个线程占有锁。Mutex中定义了一个静态内部类,该内部类继承了同步器并实现了独占式获取和释放同步状态。在tryAcquire(int acquires)方法中,如果经过CAS设置成功(同步状态设置为1),则代表获取了同步状态,而在tryRelease(int releases)方法中只是将同步状态重置为0。用户使用Mutex时并不会直接和内部同步器的实现打交道,而是调用Mutex提供的方法,在Mutex的实现中,以获取锁的lock()方法为例,只需要在方法实现中调用同步器的模板方法acquire(int args)即可,当前线程调用该方法获取同步状态失败后会被加入到同步队列中等待,这样就大大降低了实现一个可靠自定义同步组件的门槛。

五.队列同步器的实现分析

同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。

5.1 Node

同步队列中的节点(Node)是队列同步器的一个内部类,用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继节点。

static final class Node {//该等待同步的节点处于共享模式static final Node SHARED = new Node();//该等待同步的节点处于独占模式static final Node EXCLUSIVE = null;//等待状态,这个和state是不一样的:有1,0,-1,-2,-3五个值volatile int waitStatus;static final int CANCELLED = 1;static final int SIGNAL = -1;static final int CONDITION = -2;static final int PROPAGATE = -3;volatile Node prev;//前驱节点volatile Node next;//后继节点volatile Thread thread;//等待锁的线程//和节点是否共享有关Node nextWaiter;//Returns true if node is waiting in shared modefinal boolean isShared() {return nextWaiter == SHARED;}}

waitStatus五个值的含义:

CANCELLED(1):该节点的线程可能由于超时或被中断而处于被取消(作废)状态,一旦处于这个状态,节点状态将一直处于CANCELLED(作废),因此应该从队列中移除。

SIGNAL(-1):当前节点为SIGNAL时,后继节点会被挂起,因此在当前节点释放锁或被取消之后必须被唤醒(unparking)其后继结点。

CONDITION(-2):该节点的线程处于等待条件状态,在等待队列中,不会被当作是同步队列上的节点,直到被唤醒(signal),设置其值为0,从等待队列转移到同步队列中,加入到对同步状态的获取中。

PROPAGATE(-3):表示下一次共享式同步状态获取将会无条件传播下去。

0:新加入的节点。

5.2 AQS同步器的结构

节点是构成同步队列的基础,同步器拥有首节点(head)和尾节点(tail)。同步队列的基本结构如下:

同步队列设置尾节点(未获取到锁的线程加入同步队列): 同步器AQS中包含两个节点类型的引用:一个指向头结点的引用(head),一个指向尾节点的引用(tail),当一个线程成功的获取到锁(同步状态),其他线程无法获取到锁,而是被构造成节点(包含当前线程,等待状态)加入到同步队列中等待获取到锁的线程释放锁。这个加入队列的过程,必须要保证线程安全。否则如果多个线程的环境下,可能造成添加到队列等待的节点顺序错误,或者数量不对。因此同步器提供了CAS原子的设置尾节点的方法(保证一个未获取到同步状态的线程加入到同步队列后,下一个未获取的线程才能够加入)。 如下图,设置尾节点:

同步队列设置首节点(原头节点释放锁,唤醒后继节点):同步队列遵循FIFO,头节点是获取锁(同步状态)成功的节点,头节点在释放同步状态的时候,会唤醒后继节点,而后继节点将会在获取锁(同步状态)成功时候将自己设置为头节点。设置头节点是由获取锁(同步状态)成功的线程来完成的,由于只有一个线程能够获取同步状态,则设置头节点的方法不需要CAS保证,只需要将头节点设置成为原首节点的后继节点,并断开原头结点的next引用。如下图,设置首节点:

5.3 独占式同步状态获取

通过调用同步器的acquire(intarg)方法可以获取同步状态,该方法对中断不敏感,也就是

由于线程获取同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同

步队列中移出。

5.3.1同步器的acquire方法源码

public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}

上述代码主要完成了同步状态获取、节点构造、加入同步队列以及在同步队列中自旋等待的相关工作,其主要逻辑是:

(1)当前线程实现通过tryAcquire()方法尝试获取锁,获取成功的话直接返回,如果尝试失败的话,进入等待队列排队等待,可以保证线程安全(CAS)的获取同步状态。

(2)如果尝试获取锁失败的话,构造同步节点(独占式的Node.EXCLUSIVE),通过addWaiter(Node node,int args)方法,将节点加入到同步队列的队列尾部。

(3)最后调用acquireQueued(finalNode node, int args)方法,使该节点以死循环的方式获取同步状态,如果获取不到,则阻塞节点中的线程。acquireQueued方法当前线程在死循环中获取同步状态,而只有前驱节点是头节点的时候才能尝试获取锁(同步状态)( p == head && tryAcquire(arg))。

5.3.2addWaiter方法的源码

private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}

上述代码通过使用compareAndSetTail(Node expect,Node update)方法来确保节点能够被线程安全添加。

5.3.3enq方法的源码

private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}

在enq(final Node node)方法中,同步器通过“死循环”来保证节点的正确添加,在“死循环”中只有通过CAS将节点设置成为尾节点之后,当前线程才能从该方法返回,否则,当前线程不断地尝试设置。可以看出,enq(final Node node)方法将并发添加节点的请求通过CAS变得“串行化”了。

5.3.4 acquireQueued方法源码

节点进入同步队列之后,就进入了一个自旋的过程,每个节点(或者说每个线程)都在自省地观察,当条件满足,获取到了同步状态,就可以从这个自旋过程中退出,否则依旧留在这个自旋过程中(并会阻塞节点的线程)。

final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}

在acquireQueued(final Node node,int arg)方法中,当前线程在“死循环”中尝试获取同步状态,而只有前驱节点是头节点才能够尝试获取同步状态,原因是:

1.头结点是成功获取同步状态的节点,而头结点的线程释放锁以后,将唤醒后继节点,后继节点线程被唤醒后要检查自己的前驱节点是否为头结点。

2.维护同步队列的FIFO原则,节点进入同步队列以后,就进入了一个自旋的过程,每个节点(后者说每个线程)都在自省的观察。

节点自旋获取同步状态的行为如图:

独占式同步状态获取流程,也就是acquire(int arg)方法调用流程,如图

5.4 独占式锁的释放

/* 1. unlock():unlock()是解锁函数,它是通过AQS的release()函数来实现的。* 在这里,“1”的含义和“获取锁的函数acquire(1)的含义”一样,它是设置“释放锁的状态”的参数。* 由于“公平锁”是可重入的,所以对于同一个线程,每释放锁一次,锁的状态-1。unlock()在ReentrantLock.java中实现的,源码如下:*/public void unlock() {sync.release(1);}

5.4.1 release方法源码

public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;

}

5.4.2 tryRelease方法源码

protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}

5.4.3unparkSuccessor方法源码

// 4. 唤醒头结点的后继节点private void unparkSuccessor(Node node) {//获取头结点(线程)的状态int ws = node.waitStatus;//如果状态<0,设置当前线程对应的锁的状态为0if (ws < 0)compareAndSetWaitStatus(node, ws, 0);Node s = node.next;//解释:Thread to unpark is held in successor, which is normally just the next node. //But if cancelled or apparently(显然) null, traverse backwards(向后遍历) from tail to find the actual(实际的) non-cancelled successor(前继节点).if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}//唤醒后继节点对应的线程if (s != null)LockSupport.unpark(s.thread);}}

5.5 共享式同步状态获取与释放

public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)doAcquireShared(arg);}在acquireShared(int arg)方法中,同步器调用tryAcquireShared(int arg)方法尝试获取同步状态,tryAcquireShared(int arg)方法返回值为int类型,当返回值大于等于0时,表示能够获取到同步状态。private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}

在共享式获取的自旋过程中,成功获取到同步状态并退出自旋的条件就是tryAcquireShared(int arg)方法返回值大于等于0。可以看到,在doAcquireShared(intarg)方法的自旋过程中,如果当前节点的前驱为头节点时,尝试获取同步状态,如果返回值大于等于0,表示该次获取同步状态成功并从自旋过程中退出。

public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}

在释放同步状态之后,将会唤醒后续处于等待状态的节点。对于能够支持多个线程同时访问的并发组件(比如Semaphore),它和独占式主要区别在于tryReleaseShared(int arg)方法必须确保同步状态(或者资源数)线程安全释放,一般是通过循环和CAS来保证的,因为释放同步状态的操作会同时来自多个线程。

如果觉得《java并发编程(三)--java中的锁(Lock接口和队列同步器AQS)》对你有帮助,请点赞、收藏,并留下你的观点哦!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。