失眠网,内容丰富有趣,生活中的好帮手!
失眠网 > Java多线程学习四十二:有哪些解决死锁问题的策略和哲学家就餐问题

Java多线程学习四十二:有哪些解决死锁问题的策略和哲学家就餐问题

时间:2020-12-22 08:44:33

相关推荐

Java多线程学习四十二:有哪些解决死锁问题的策略和哲学家就餐问题

线上发生死锁应该怎么办

如果线上环境发生了死锁,那么其实不良后果就已经造成了,修复死锁的最好时机在于“防患于未然”,而不是事后补救。就好比发生火灾时,一旦着了大火,想要不造成损失去扑灭几乎已经不可能了。死锁也是一样的,如果线上发生死锁问题,为了尽快减小损失,最好的办法是保存 JVM 信息、日志等“案发现场”的数据,然后立刻重启服务,来尝试修复死锁。为什么说重启服务能解决这个问题呢?因为发生死锁往往要有很多前提条件的,并且当并发度足够高的时候才有可能会发生死锁,所以重启后再次立刻发生死锁的几率并不是很大,当我们重启服务器之后,就可以暂时保证线上服务的可用,然后利用刚才保存过的案发现场的信息,排查死锁、修改代码,最终重新发布。

常见修复策略

避免策略

检测与恢复策略

鸵鸟策略

避免策略最主要的思路就是,优化代码逻辑,从根本上消除发生死锁的可能性。通常而言,发生死锁的一个主要原因是顺序相反的去获取不同的锁。因此我们就演示如何通过调整锁的获取顺序来避免死锁。

转账时避免死锁

我们先来看一下转账时发生死锁的情况。这个例子是一个示意性的,是为了学习死锁所而写的例子,所以和真实的银行系统的设计有很大不同,不过没关系,因为我们主要看的是如何避免死锁,而不是转账的业务逻辑。

(1)发生了死锁

我们的转账系统为了保证线程安全,在转账前需要首先获取到两把锁(两个锁对象),分别是被转出的账户和被转入的账户。如果不做这一层限制,那么在某一个线程修改余额的期间,可能会有其他线程同时修改该变量,可能导致线程安全问题。所以在没有获取到这两把锁之前,是不能对余额进行操作的;只有获取到这两把锁之后,才能进行接下来真正的转账操作。当然,如果要转出的余额大于账户的余额,也不能转账,因为不允许余额变成负数。

而这期间就隐藏着发生死锁的可能,我们来看下代码:

复制代码

publicclassTransferMoneyimplementsRunnable{

intflag;

staticAccounta=newAccount(500);

staticAccountb=newAccount(500);

staticclassAccount{

publicAccount(intbalance){

this.balance=balance;

}

intbalance;

}

@Override

publicvoidrun(){

if(flag==1){

transferMoney(a,b,200);

}

if(flag==0){

transferMoney(b,a,200);

}

}

publicstaticvoidtransferMoney(Accountfrom,Accountto,intamount){

//先获取两把锁,然后开始转账

synchronized(to){

synchronized(from){

if(from.balance-amount<0){

System.out.println("余额不足,转账失败。");

return;

}

from.balance-=amount;

to.balance+=amount;

System.out.println("成功转账"+amount+"元");

}

}

}

publicstaticvoidmain(String[]args)throwsInterruptedException{

TransferMoneyr1=newTransferMoney();

TransferMoneyr2=newTransferMoney();

r1.flag=1;

r2.flag=0;

Threadt1=newThread(r1);

Threadt2=newThread(r2);

t1.start();

t2.start();

t1.join();

t2.join();

System.out.println("a的余额"+a.balance);

System.out.println("b的余额"+b.balance);

}

}

在代码中,首先定义了 int 类型的 flag,它是一个标记位,用于控制不同线程执行不同逻辑。然后建了两个 Account 对象 a 和 b,代表账户,它们最初都有 500 元的余额。

我们接下来看run方法,该方法里面会根据 flag 值,来决定传入 transferMoney 方法的参数的顺序,如果 flag 为 1,那么就代表从 a 账户转给 b 账户 200元;相反,如果 flag 为 0,那么它就从 b 账户转给 a 账户 200 元。

我们再来看一下 transferMoney 转账方法,这个方法会先尝试获取两把锁,也就是 synchronized (to) 和 synchronized (from)。当都获取成功之后,它首先会判断余额是不是足以转出本次的转账金额,如果不足的话,则直接用return 来退出;如果余额足够,就对转出账户进行减余额,对被转入的账户加余额,最后打印出“成功转账 XX 元”的消息。

在主函数中我们新建了两个 TransferMoney 对象,并且把它们的 flag 分别设置为 1 和 0,然后分别传入两个线程中,并把它们都启动起来,最后,打印出各自的余额。

执行结果如下:

复制代码

成功转账200元

成功转账200元

a的余额500

b的余额500

代码是可以正常执行的,打印结果也是符合逻辑的。此时并没有发生死锁,因为每个锁的持有时间很短,同时释放也很快,所以在低并发的情况下,不容易发生死锁的现象。那我们对代码做一些小调整,让它发生死锁。

如果我们在两个 synchronized 之间加上一个 Thread.sleep(500),来模拟银行网络迟延等情况,那么 transferMoney 方法就变为:

复制代码

publicstaticvoidtransferMoney(Accountfrom,Accountto,intamount){

//先获取两把锁,然后开始转账

synchronized(to){

try{

Thread.sleep(500);

}catch(InterruptedExceptione){

e.printStackTrace();

}

synchronized(from){

if(from.balance-amount<0){

System.out.println("余额不足,转账失败。");

return;

}

from.balance-=amount;

to.balance+=amount;

System.out.println("成功转账"+amount+"元");

}

}

}

可以看到 transferMoney 的变化就在于,在两个 synchronized 之间,也就是获取到第一把锁后、获取到第二把锁前,我们加了睡眠 500 毫秒的语句。此时再运行程序,会有很大的概率发生死锁,从而导致控制台中不打印任何语句,而且程序也不会停止。

我们分析一下它为什么会发生死锁,最主要原因就是,两个不同的线程获取两个锁的顺序是相反的(第一个线程获取的这两个账户和第二个线程获取的这两个账户顺序恰好相反,第一个线程的“转出账户”正是第二个线程的“转入账户”),所以我们就可以从这个“相反顺序”的角度出发,来解决死锁问题。

(2)实际上不在乎获取锁的顺序

经过思考,我们可以发现,其实转账时,并不在乎两把锁的相对获取顺序。转账的时候,我们无论先获取到转出账户锁对象,还是先获取到转入账户锁对象,只要最终能拿到两把锁,就能进行安全的操作。所以我们来调整一下获取锁的顺序,使得先获取的账户和该账户是“转入”或“转出”无关,而是使用 HashCode 的值来决定顺序,从而保证线程安全。

修复之后的 transferMoney 方法如下:

复制代码

publicstaticvoidtransferMoney(Accountfrom,Accountto,intamount){

intfromHash=System.identityHashCode(from);

inttoHash=System.identityHashCode(to);

if(fromHash<toHash){

synchronized(from){

synchronized(to){

if(from.balance-amount<0){

System.out.println("余额不足,转账失败。");

return;

}

from.balance-=amount;

to.balance+=amount;

System.out.println("成功转账"+amount+"元");

}

}

}elseif(fromHash>toHash){

synchronized(to){

synchronized(from){

if(from.balance-amount<0){

System.out.println("余额不足,转账失败。");

return;

}

from.balance-=amount;

to.balance+=amount;

System.out.println("成功转账"+amount+"元");

}

}

}

}

可以看到,我们会分别计算出这两个 Account 的 HashCode,然后根据 HashCode 的大小来决定获取锁的顺序。这样一来,不论是哪个线程先执行,不论是转出还是被转入,它获取锁的顺序都会严格根据 HashCode 的值来决定,那么大家获取锁的顺序就一样了,就不会出现获取锁顺序相反的情况,也就避免了死锁。

(3)有主键就更安全、方便

下面我们看一下用主键决定锁获取顺序的方式,它会更加的安全方便。刚才我们使用了 HashCode 作为排序的标准,因为 HashCode 比较通用,每个对象都有,不过这依然有极小的概率会发生 HashCode 相同的情况。在实际生产中,需要排序的往往是一个实体类,而一个实体类一般都会有一个主键 ID,主键 ID 具有唯一、不重复的特点,所以如果我们这个类包含主键属性的话就方便多了,我们也没必要去计算 HashCode,直接使用它的主键 ID 来进行排序,由主键 ID 大小来决定获取锁的顺序,就可以确保避免死锁。

以上我们介绍了死锁的避免策略。

检测与恢复策略

下面我们再来看第二个策略,那就是检测与恢复策略。

什么是死锁检测算法

它和之前避免死锁的策略不一样,避免死锁是通过逻辑让死锁不发生,而这里的检测与恢复策略,是先允许系统发生死锁,然后再解除。例如系统可以在每次调用锁的时候,都记录下来调用信息,形成一个“锁的调用链路图”,然后隔一段时间就用死锁检测算法来检测一下,搜索这个图中是否存在环路,一旦发生死锁,就可以用死锁恢复机制,比如剥夺某一个资源,来解开死锁,进行恢复。所以它的思路和之前的死锁避免策略是有很大不同的。

在检测到死锁发生后,如何解开死锁呢?

方法1——线程终止

第一种解开死锁的方法是线程(或进程,下同)终止,在这里,系统会逐个去终止已经陷入死锁的线程,线程被终止,同时释放资源,这样死锁就会被解开。

当然这个终止是需要讲究顺序的,一般有以下几个考量指标。

(1)优先级

一般来说,终止时会考虑到线程或者进程的优先级,先终止优先级低的线程。例如,前台线程会涉及界面显示,这对用户而言是很重要的,所以前台线程的优先级往往高于后台线程。

(2)已占用资源、还需要的资源

同时也会考虑到某个线程占有的资源有多少,还需要的资源有多少?如果某线程已经占有了一大堆资源,只需要最后一点点资源就可以顺利完成任务,那么系统可能就不会优先选择终止这样的线程,会选择终止别的线程来优先促成该线程的完成。

(3)已经运行时间

另外还可以考虑的一个因素就是已经运行的时间,比如当前这个线程已经运行了很多个小时,甚至很多天了,很快就能完成任务了,那么终止这个线程可能不是一个明智的选择,我们可以让那些刚刚开始运行的线程终止,并在之后把它们重新启动起来,这样成本更低。

这里会有各种各样的算法和策略,我们根据实际业务去进行调整就可以了。

方法2——资源抢占

第二个解开死锁的方法就是资源抢占。其实,我们不需要把整个的线程终止,而是只需要把它已经获得的资源进行剥夺,比如让线程回退几步、 释放资源,这样一来就不用终止掉整个线程了,这样造成的后果会比刚才终止整个线程的后果更小一些,成本更低。

当然这种方式也有一个缺点,那就是如果算法不好的话,我们抢占的那个线程可能一直是同一个线程,就会造成线程饥饿。也就是说,这个线程一直被剥夺它已经得到的资源,那么它就长期得不到运行。

以上就是死锁的检测与恢复策略。

鸵鸟策略

下面我们再来看一下鸵鸟策略,鸵鸟策略以鸵鸟命名,因为鸵鸟有一个特点,就是遇到危险的时候,它会把头埋到沙子里,这样一来它就看不到危险了。

鸵鸟策略的意思就是,如果我们的系统发生死锁的概率不高,并且一旦发生其后果不是特别严重的话,我们就可以选择先忽略它。直到死锁发生的时候,我们再人工修复,比如重启服务,这并不是不可以的。如果我们的系统用的人比较少,比如是内部的系统,那么在并发量极低的情况下,它可能几年都不会发生死锁。对此我们考虑到投入产出比,自然也没有必要去对死锁问题进行特殊的处理,这是需要根据我们的业务场景进行合理选择的。

=======================哲学家就餐问题===========

问题描述

哲学家就餐问题也被称为刀叉问题,或者吃面问题。我们先来描述一下这个问题所要说明的事情,这个问题如下图所示:

有 5 个哲学家,他们面前都有一双筷子,即左手有一根筷子,右手有一根筷子。当然,这个问题有多个版本的描述,可以说是筷子,也可以说是一刀一叉,因为吃牛排的时候,需要刀和叉,缺一不可,也有说是用两把叉子来吃意大利面。这里具体是刀叉还是筷子并不重要,重要的是必须要同时持有左右两边的两个才行,也就是说,哲学家左手要拿到一根筷子,右手也要拿到一根筷子,在这种情况下哲学家才能吃饭。为了方便理解,我们选取和我国传统最贴近的筷子来说明这个问题。

为什么选择哲学家呢?因为哲学家的特点是喜欢思考,所以我们可以把哲学家一天的行为抽象为思考,然后吃饭,并且他们吃饭的时候要用一双筷子,而不能只用一根筷子。

1. 主流程

我们来看一下哲学家就餐的主流程。哲学家如果想吃饭,他会先尝试拿起左手的筷子,然后再尝试拿起右手的筷子,如果某一根筷子被别人使用了,他就得等待他人用完,用完之后他人自然会把筷子放回原位,接着他把筷子拿起来就可以吃了(不考虑卫生问题)。这就是哲学家就餐的最主要流程。

2. 流程的伪代码

我们来看一下这个流程的伪代码,如下所示:

复制代码

while(true){

//思考人生、宇宙、万物...

think();

//思考后感到饿了,需要拿筷子开始吃饭

pick_up_left_chopstick();

pick_up_right_chopstick();

eat();

put_down_right_chopstick();

put_down_left_chopstick();

//吃完饭后,继续思考人生、宇宙、万物...

}

while(true) 代表整个是一个无限循环。在每个循环中,哲学家首先会开始思考,思考一段时间之后(这个时间长度可以是随机的),他感到饿了,就准备开始吃饭。在吃饭之前必须先拿到左手的筷子,再拿到右手的筷子,然后才开始吃饭;吃完之后,先放回右手的筷子,再放回左手的筷子;由于这是个 while 循环,所以他就会继续思考人生,开启下一个循环。这就是整个过程。

有死锁和资源耗尽的风险

这里存在什么风险呢?就是发生死锁的风险。如下面的动画所示:

根据我们的逻辑规定,在拿起左手边的筷子之后,下一步是去拿右手的筷子。大部分情况下,右边的哲学家正在思考,所以当前哲学家的右手边的筷子是空闲的,或者如果右边的哲学家正在吃饭,那么当前的哲学家就等右边的哲学家吃完饭并释放筷子,于是当前哲学家就能拿到了他右手边的筷子了。

但是,如果每个哲学家都同时拿起左手的筷子,那么就形成了环形依赖,在这种特殊的情况下,每个人都拿着左手的筷子,都缺少右手的筷子,那么就没有人可以开始吃饭了,自然也就没有人会放下手中的筷子。这就陷入了死锁,形成了一个相互等待的情况。代码如下所示:

复制代码

publicclassDiningPhilosophers{

publicstaticclassPhilosopherimplementsRunnable{

privateObjectleftChopstick;

privateObjectrightChopstick;

publicPhilosopher(ObjectleftChopstick,ObjectrightChopstick){

this.leftChopstick=leftChopstick;

this.rightChopstick=rightChopstick;

}

@Override

publicvoidrun(){

try{

while(true){

doAction("思考人生、宇宙、万物、灵魂...");

synchronized(leftChopstick){

doAction("拿起左边的筷子");

synchronized(rightChopstick){

doAction("拿起右边的筷子");

doAction("吃饭");

doAction("放下右边的筷子");

}

doAction("放下左边的筷子");

}

}

}catch(InterruptedExceptione){

e.printStackTrace();

}

}

privatevoiddoAction(Stringaction)throwsInterruptedException{

System.out.println(Thread.currentThread().getName()+""+action);

Thread.sleep((long)(Math.random()*10));

}

}

publicstaticvoidmain(String[]args){

Philosopher[]philosophers=newPhilosopher[5];

Object[]chopsticks=newObject[philosophers.length];

for(inti=0;i<chopsticks.length;i++){

chopsticks[i]=newObject();

}

for(inti=0;i<philosophers.length;i++){

ObjectleftChopstick=chopsticks[i];

ObjectrightChopstick=chopsticks[(i+1)%chopsticks.length];

philosophers[i]=newPhilosopher(rightChopstick,leftChopstick);

newThread(philosophers[i],"哲学家"+(i+1)+"号").start();

}

}

}

在这个代码中,有一个内部类叫作 Philosophers,是哲学家的意思。在创建这个哲学家实例,也就是调用构造方法的时候,需要传入两个参数,分别是左手的筷子和右手的筷子。Philosophers 类实现了 Runnable 接口,在它的 run 方法中是无限循环,每个循环中,会多次调用 doAction 方法。在这里的 doAction 方法的定义在下方,这个方法实际上就是把当前输入的字符串给打印出来,并且去进行一段随机时间的休眠。

这里的随机休眠是为了模拟真实的场景,因为每个哲学家的思考、吃饭和拿筷子的时间会各不相同。同样,在线上的实际场景中,这个时间也肯定是不相同的,所以我们用随机数来模拟。

我们继续看 while 中的代码,哲学家会首先思考人生,然后获取左边筷子这把锁,并打印出“拿起左边的筷子”;接着他去获取右边筷子这把锁,并会打印出“拿起右边的筷子”、“吃饭”,并且“放下右边的筷子”,接下来,他会退出右边筷子的这个同步代码块,释放锁;最后打印出“放下左边的筷子”,随即退出左边筷子的这个同步代码块,释放锁。这样就完成了这个过程,当然他会继续进行 while 循环。

最后我们来看一下 main 方法,main 方法中新建了 5 个哲学家,并按照哲学家的数量去新建对应数量的筷子,并且把它们都初始化出来。筷子只用于充当锁对象,所以就把它定义为一个普通的 Object 类型。

接下来,我们需要初始化哲学家。初始化哲学家需要两个入参,分别是左手筷子和右手筷子,在这里会选取之前定义好的 chopsticks 数组中的对象来给 leftChopstick 和 rightChopstick 进行合理的赋值。当然有一种特殊情况,那就是考虑到最后一个哲学家右手的筷子,由于它已经转完了桌子的一圈,所以他实际上拿的还是第一根筷子,在这里会进行一个取余操作。

创建完哲学家之后,就会把它作为 Runnable 对象,传入 Thread,创建一个线程并启动。在 for 循环执行完毕之后,5 个哲学家都启动了起来,于是他们就开始思考并且吃饭。其中一种可能的执行结果如下所示:

复制代码

哲学家1号思考人生、宇宙、万物...

哲学家3号思考人生、宇宙、万物...

哲学家2号思考人生、宇宙、万物...

哲学家4号思考人生、宇宙、万物...

哲学家5号思考人生、宇宙、万物...

哲学家4号拿起左边的筷子

哲学家5号拿起左边的筷子

哲学家1号拿起左边的筷子

哲学家3号拿起左边的筷子

哲学家2号拿起左边的筷子

哲学家 1、3、2、4、5 几乎同时开始思考,然后,假设他们思考的时间比较相近,于是他们都在几乎同一时刻想开始吃饭,都纷纷拿起左手的筷子,这时就陷入了死锁状态,没有人可以拿到右手的筷子,也就没有人可以吃饭,于是陷入了无穷等待,这就是经典的哲学家就餐问题。

多种解决方案

对于这个问题我们该如何解决呢?有多种解决方案,这里我们讲讲其中的几种。前面我们讲过,要想解决死锁问题,只要破坏死锁四个必要条件的任何一个都可以。

1. 服务员检查

第一个解决方案就是引入服务员检查机制。比如我们引入一个服务员,当每次哲学家要吃饭时,他需要先询问服务员:我现在能否去拿筷子吃饭?此时,服务员先判断他拿筷子有没有发生死锁的可能,假如有的话,服务员会说:现在不允许你吃饭。这是一种解决方案。

2. 领导调节

我们根据上一讲的死锁检测和恢复策略,可以引入一个领导,这个领导进行定期巡视。如果他发现已经发生死锁了,就会剥夺某一个哲学家的筷子,让他放下。这样一来,由于这个人的牺牲,其他的哲学家就都可以吃饭了。这也是一种解决方案。

3. 改变一个哲学家拿筷子的顺序

我们还可以利用死锁避免策略,那就是从逻辑上去避免死锁的发生,比如改变其中一个哲学家拿筷子的顺序。我们可以让 4 个哲学家都先拿左边的筷子再拿右边的筷子,但是有一名哲学家与他们相反,他是先拿右边的再拿左边的,这样一来就不会出现循环等待同一边筷子的情况,也就不会发生死锁了。

死锁解决

我们把“改变一个哲学家拿筷子的顺序”这件事情用代码来写一下,修改后的 main 方法如下:

复制代码

publicstaticvoidmain(String[]args){

Philosopher[]philosophers=newPhilosopher[5];

Object[]chopsticks=newObject[philosophers.length];

for(inti=0;i<chopsticks.length;i++){

chopsticks[i]=newObject();

}

for(inti=0;i<philosophers.length;i++){

ObjectleftChopstick=chopsticks[i];

ObjectrightChopstick=chopsticks[(i+1)%chopsticks.length];

if(i==philosophers.length-1){

philosophers[i]=newPhilosopher(rightChopstick,leftChopstick);

}else{

philosophers[i]=newPhilosopher(leftChopstick,rightChopstick);

}

newThread(philosophers[i],"哲学家"+(i+1)+"号").start();

}

}

在这里最主要的变化是,我们实例化哲学家对象的时候,传入的参数原本都是先传入左边的筷子再传入右边的,但是当我们发现他是最后一个哲学家的时候,也就是if (i == philosophers.length - 1) ,在这种情况下,我们给它传入的筷子顺序恰好相反,这样一来,他拿筷子的顺序也就相反了,他会先拿起右边的筷子,再拿起左边的筷子。那么这个程序运行的结果,是所有哲学家都可以正常地去进行思考和就餐了,并且不会发生死锁。

引用:/course/courseInfo.htm?courseId=16#/detail/pc?id=309

如果觉得《Java多线程学习四十二:有哪些解决死锁问题的策略和哲学家就餐问题》对你有帮助,请点赞、收藏,并留下你的观点哦!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。