失眠网,内容丰富有趣,生活中的好帮手!
失眠网 > Thinking in Java----------并发篇章

Thinking in Java----------并发篇章

时间:2021-01-08 16:50:12

相关推荐

Thinking in Java----------并发篇章

去年已经看过这本书了,今年回看这本书发现这本书内容跟没看过一样,便重新进行阅览并且对其中的代码进行编写以及课后练习的习题做一个记录。(自我认识:对Java的并发了解并不透彻,并且还是一知半解的状态,大体情况只停留在理论阶段)

关键字:Thread(sleep,join,yield,Object.wait),Runnable,Callable,JUC,volatile,线程池(Executors),优先级(几乎没什么用不同操作系统有所不同),共享资源,原子类,临界区,线程中断。

个人理解:先编写完一个业务流程的顺序执行过程后转线程,因为一个线程就是在进程中的一个单一的顺序控制流。

线程驱动任务,那么定义任务实现Runnable接口后重写Run方法(ListOFF发射之前的倒计时方法)

public class LiftOff implements Runnable{protected int countDown = 10;//defaultprivate static int taskCount = 0;private final int id = taskCount++;public LiftOff() {}public LiftOff(int countDown) {this.countDown = countDown;}public String status(){return "#" + id + "(" + (countDown > 0 ? countDown : "LiftOff") + "), ";}@Overridepublic void run() {while (countDown-- > 0){System.out.print(status());Thread.yield();}}}

ListOff方法中有一个技巧taskCount是静态变量伴随着Class类,并且id是final修饰经过初始化后不可再进行更改,用来标识不同的任务。

public class MainThread {public static void main(String[] args) {for (int i = 0; i < 2; i++) {Thread t = new Thread(new LiftOff());t.start();}System.out.println("Waiting For ListOff");}}

会出现不同线程交叉运行的结果,这是由于线程只要获得CPU执行权就可以运行,并且方法没有进行同步操作。

线程池Executors

有返回值(Future)使用exec.submit()持有该任务的上下文,无返回值直接执行exec.execute()

在讲线程池之前要对线程池的操作原理以及线程池的几个参数进行讲解一下

①一个任务提交到了线程池,首先判断是否少于核心线程数如果少则创建一个线程执行任务否则进入到②判断阻塞队列是否已满,如果没满则进入阻塞队列否则进入③

③如果当前线程数没大于最大线程数那么就创建一个线程,如果大于的话进行删除策略

注:每次线程池在开启以后要手动进行关闭,否则会出现内存泄漏问题

1.shutdown():线程池中线程状态为shutdown,并且中断所有没有执行任务的线程

2.shutdownNow():线程池的线程状态为stop,停止正在执行或者暂停的任务线程

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler)

参数的类型可以查看Java全栈知识点

练习:实现一个Runnable,在run()内部打印一个消息,然后调用yield()。重复操作三次,然后从run()中返回。在构造器中放置一条启动消息,并且防止一条在任务终止时的关闭消息。使用线程池创建大量任务并驱动它们

public class task implements Runnable{private static int taskCount = 0;private final int id = taskCount++;public task() {System.out.println(id + "task is construct");}@Overridepublic void run() {for (int i = 0; i < 3; i++) {System.out.println(id + "task is running");Thread.yield();}System.out.println(id + " task compelte");}}public class CacheThreadPool {public static void main(String[] args) {ExecutorService threadpool = Executors.newSingleThreadExecutor();for (int i = 0; i < 5; i++) {threadpool.execute(new task());}threadpool.shutdown();}}

解决共享资源的竞争

场景:你坐在桌边手拿叉子,正要去叉盘子中的最后一片食物,当你的叉子就要够着它时,这片食物突然消失了,因为你的线程被挂起来,而另一个餐者进入并吃掉了它。这正是你在编写并发程序时需要处理的问题,对于并发工作,你需要同步方式来防止两个任务访问相同的资源

public class data implements Runnable{private int x = 10;private int y = 0;public synchronized void plusXminusY(){++x;Thread.yield();--y;System.out.println(x +" " +y);}@Overridepublic String toString() {return "data{" +"x=" + x +", y=" + y +'}';}@Overridepublic void run() {plusXminusY();}}public class Main {public static void main(String[] args) {data d = new data();for (int i = 0; i < 6; i++) {new Thread(d).start();}}}

使用synchronized方法进行并发同步的注意点

① 使用并发时域设置为private,否则synchronized无法防止其他任务直接访问域

② 多个并发任务要共享同一把锁,(针对每一个类都有一个类锁所以synchronized static方法可以在类的方位防止对static数据的并发访问)

public class AtomicityTest implements Runnable{private int i = 0;public int getValue(){return i;}private synchronized void evenIncrement(){i++;i++;}@Overridepublic void run() {while (true){evenIncrement();}}public static void main(String[] args) {ExecutorService exec = Executors.newCachedThreadPool();AtomicityTest at = new AtomicityTest();exec.execute(at);while (true){int val = at.getValue();if(val % 2 != 0){System.out.println(val);System.exit(0);}}}}

上述程序几点问题

①即使evenIncrement上了同步,return i当中也是原子性操作但是只要涉及到对同步资源域操作就上必须synchronized,导致程序在加到1时候,main主程序获取到i=1以后那么就退出程序了。

② 并且i也不是volatile存在内存可见性问题

注:对volatile修饰的变量是直接针对内存进行修改,而没有被缓存

原子类(Atomic)

使用Atomic原子类对上述例子进行改写,底层原子类通过CAS乐观锁机制对值进行修改,那么就无需显示上锁,并且设置一个Timer,让程序5s后结束。

/*** @author Lyyyys* @version 1.0* @date /10/3 14:34*/public class AtomicIntegerTest implements Runnable{private AtomicInteger i = new AtomicInteger(0);public int getValue(){return i.get();}private void evenIncrement(){i.getAndAdd(2);}@Overridepublic void run() {while (true){evenIncrement();}}public static void main(String[] args) {new Timer().schedule(new TimerTask() {@Overridepublic void run() {System.out.println("Aborting");System.exit(0);}},5000);ExecutorService exec = Executors.newCachedThreadPool();AtomicIntegerTest at = new AtomicIntegerTest();exec.execute(at);while (true){int val = at.getValue();if(val % 2 != 0){System.out.println(val);System.exit(0);}}}}

临界区

只是希望防止多个线程同时访问方法内部的部分代码而不是防止访问整个方法,使用同步代码块进行抽离出来的代码块称为临界区

public class PairManager1 extends PairManager{@Overridepublic synchronized void increment() {p.incrementX();p.incrementY();store(getPair());}}//临界区使用同步代码块,多个任务对访问对象的时间性能提升public class PairManager2 extends PairManager{@Overridepublic void increment() {Pair temp;synchronized (this){p.incrementX();p.incrementY();temp = getPair();}store(temp);}}//显示lockpublic class PairManager3 extends PairManager{private ReentrantLock lock = new ReentrantLock();@Overridepublic void increment() {lock.lock();try {p.incrementX();p.incrementY();store(getPair());} finally {lock.unlock();}}}

线程中断

只发生在任务将要进入阻塞状态又或者是线程已经进入阻塞状态,除了不可中断的I/O或被阻塞的synchronized。

方法:

①ReentrantLock搭配lock.lockinterruptly使用

/*** @author Lyyyys* @version 1.0* @date /10/5 14:02*/public class Blocked implements Runnable{BlockedMutex blocked = new BlockedMutex();@Overridepublic void run() {System.out.println("Waiting for f() in BlockedMutex");blocked.f();System.out.println("Broken out of blocked call");}}public class BlockedMutex {private Lock lock = new ReentrantLock();public BlockedMutex(){lock.lock();}public void f(){try {lock.lockInterruptibly();System.out.println("lock acquired in f()");} catch (InterruptedException e) {System.out.println("Interrupted from lock acquisition in f()");}}}public class Interrupting {public static void main(String[] args) throws InterruptedException {Thread t = new Thread(new Blocked());t.start();Thread.sleep(1);System.out.println("Issuing t.interrupt()");t.interrupt();}}

② interrupt搭配interrupted使用,不仅可以检测中断状态,还可以清除中断状态

/*** @author Lyyyys* @version 1.0* @date /10/5 13:33*/public class Nontask {public static void test(){if(Thread.interrupted()){System.out.println("Thread interrupted");}}}

③ interrupt搭配异常来进行使用

④ 线程池当中使用shutdown或者submit中断线程

/*** @author Lyyyys* @version 1.0* @date /10/5 13:33*/public class Nontask {public static void test(){try {Thread.sleep(300);} catch (InterruptedException e) {System.out.println("Sleep Interrupted");}finally {System.out.println("Good Bye");}}}public class Worker implements Runnable{@Overridepublic void run() {Nontask.test();}}public class Ex18 {public static void main(String[] args) {//method1Thread t = new Thread(new Worker());t.start();t.interrupt();//method2ExecutorService exec = Executors.newSingleThreadExecutor();exec.execute(new Worker());exec.shutdownNow();//method3ExecutorService exec1 = Executors.newSingleThreadExecutor();Future<?> f = exec1.submit(new Worker());try {Thread.sleep(100);} catch (InterruptedException e) {System.out.println("Sleep interrupted in main");}f.cancel(true);exec1.shutdown();}}

线程通信

wait(),notify(),notifyAll(),基于基类Object当中的方法,并且只能在同步方法或者同步代码块当中。notifyAll()唤醒的线程是只有等待这个锁的任务才会被唤醒。

场景:一个餐厅里面有一个厨师以及一个服务员,服务员必须等待厨师准备好膳食才可以通知服务员进行上菜。(生产者消费者模型wait与notify)

/*** @author Lyyyys* @version 1.0* @date /10/5 16:54*/public class Meal {private final int orderNum;public Meal(int orderNum) {this.orderNum = orderNum;}@Overridepublic String toString() {return "Meal " + orderNum;}}public class Chef implements Runnable{private Restaurant restaurant;private int count = 0;public Chef(Restaurant restaurant) {this.restaurant = restaurant;}@Overridepublic void run() {try {while (!Thread.interrupted()){synchronized (this){while (restaurant.meal != null)wait();}if(++count == 10){System.out.println("Out of food,closing");restaurant.exec.shutdownNow();//return;}System.out.print("Order up! ");synchronized (restaurant.waitPerson){restaurant.meal = new Meal(count);restaurant.waitPerson.notifyAll();}Thread.sleep(100);}} catch (InterruptedException e) {System.out.println("Chef interrupted");}}}public class WaitPerson implements Runnable{private Restaurant restaurant;public WaitPerson(Restaurant restaurant) {this.restaurant = restaurant;}@Overridepublic void run() {try {while (!Thread.interrupted()){synchronized (this){while (restaurant.meal == null)wait();}System.out.print("WaitPerson got" + restaurant.meal);System.out.println();synchronized (restaurant.chef){restaurant.meal = null;restaurant.chef.notifyAll();}}} catch (InterruptedException e) {System.out.println("WaitPerson interrupted");}}}public class Restaurant{Meal meal;ExecutorService exec = Executors.newCachedThreadPool();Chef chef = new Chef(this);WaitPerson waitPerson = new WaitPerson(this);public Restaurant() {exec.execute(chef);exec.execute(waitPerson);}public static void main(String[] args) {new Restaurant();}}

在该场景下有几点细节处:

① 当使用wait()进行等待的时候防止措施信号的可能性

while(ConditionIsNotMet) { wait() }

② shutdownNow()向所有线程池启动的任务发送interrupt(),但是在Chef中并没有在获得interrupt立刻关闭,而是试图在进入sleep阻塞状态的时候才抛出InterruptedException异常。如果移除的话那么,将返回run顶部Thread.interrupted()循环判断退出。如果在下面使用return 也是直接将程序返回不再继续往下执行。

(生产者消费者与队列)

使用同步队列可以实现生产者和消费者的关系,如果消费者试图从队列中获取对象,若此时队列为空,那么队列可以挂起消费者任务,当有更多元素时可以用则回复消费者任务。

场景:一台机器具有三个任务,一个制作吐司,一个给吐司抹黄油,另一个在抹过黄油的吐司上进行涂果酱。

/*** @author Lyyyys* @version 1.0* @date /10/5 19:23*/public class Toast {public enum Status{DRY,BUTTERED,JAMMED}private Status status = Status.DRY;private final int id;public Toast(int id) {this.id = id;}public void butter(){status = Status.BUTTERED;}public void jam(){status = Status.JAMMED;}public Status getStatus() {return status;}public int getId() {return id;}@Overridepublic String toString() {return "Toast" + id + ": " + status;}}public class Toaster implements Runnable{private ToastQueue toastQueue;private int count = 0;private Random rand = new Random(47);public Toaster(ToastQueue toastQueue) {this.toastQueue = toastQueue;}@Overridepublic void run() {try {while (!Thread.interrupted()){Thread.sleep(100 + rand.nextInt(500));Toast t = new Toast(count++);System.out.println(t);toastQueue.put(t);}} catch (InterruptedException e) {System.out.println("Toaster interrupted");}System.out.println("Toaster Off");}}public class Butterer implements Runnable{private ToastQueue dryQueue,butteredQueue;public Butterer(ToastQueue dryQueue, ToastQueue butteredQueue) {this.dryQueue = dryQueue;this.butteredQueue = butteredQueue;}@Overridepublic void run() {try {while (!Thread.interrupted()){Toast t = dryQueue.take();t.butter();System.out.println(t);butteredQueue.put(t);}} catch (InterruptedException e) {System.out.println("Butterer interrupted");}System.out.println("Butterer off");}}public class Jammer implements Runnable{private ToastQueue butteredQueue,finishedQueue;public Jammer(ToastQueue butteredQueue, ToastQueue finishedQueue) {this.butteredQueue = butteredQueue;this.finishedQueue = finishedQueue;}@Overridepublic void run() {try {while (!Thread.interrupted()){Toast t = butteredQueue.take();t.jam();System.out.println(t);finishedQueue.put(t);}} catch (InterruptedException e) {System.out.println("Jammer interrupted");}System.out.println("Jammer off");}}public class Eater implements Runnable{private ToastQueue finishedQueue;private int counter = 0;public Eater(ToastQueue finishedQueue) {this.finishedQueue = finishedQueue;}@Overridepublic void run() {try {while (!Thread.interrupted()){Toast t = finishedQueue.take();if(t.getId() != counter++ || t.getStatus() != Toast.Status.JAMMED){System.out.println(">>>>>>>Error: " + t);System.exit(1);}elseSystem.out.println("Chomp! " + t);}} catch (InterruptedException e) {System.out.println("Eater interrupted");}System.out.println("Eater off");}}public class ToastMatic {public static void main(String[] args) throws InterruptedException {ToastQueue dryQueue = new ToastQueue(),butteredQueue = new ToastQueue(),finishedQueue = new ToastQueue();ExecutorService exec = Executors.newCachedThreadPool();exec.execute(new Toaster(dryQueue));exec.execute(new Butterer(dryQueue,butteredQueue));exec.execute(new Jammer(butteredQueue,finishedQueue));exec.execute(new Eater(finishedQueue));Thread.sleep(500);exec.shutdownNow();}}

注:Toast当中使用enum值(枚举类,里面存放常量值),没有显示使用同步,并且将wait和notify之间的耦合消除了,每个类直接与BlockingQueue直接通信并且底层自动地挂起以及恢复。

死锁(DeadLock)

四个必要条件

1.互斥:一个资源只能被一个线程所占有

2.占有且等待:一个资源本身占有资源并且等待别的资源的释放

3.不可剥夺:已占有资源的线程不可被其他线程所剥夺

4.循环等待:一个线程A占有C所需资源等待D,一个线程B占有D所需资源等待C形成一个循环等待这么一个条件

要解决死锁地话那么对四个条件进行破坏

①互斥:不可破坏,本来就是需要互斥上锁解决共享资源的访问

②占有且等待:一次性请求完所有资源

③不可剥夺:本身占据资源若在申请别的资源不通过的时候,释放自身的资源

④循环等待:按一定顺序申请资源,反序释放所有资源

public class LockTest {public static String obj1 = "obj1";public static String obj2 = "obj2";public static void main(String[] args) {LockA la = new LockA();new Thread(la).start();LockB lb = new LockB();new Thread(lb).start();}}class LockA implements Runnable{public void run() {try {System.out.println(new Date().toString() + " LockA 开始执行");while(true){synchronized (LockTest.obj1) {System.out.println(new Date().toString() + " LockA 锁住 obj1");Thread.sleep(3000); // 此处等待是给B能锁住机会synchronized (LockTest.obj2) {System.out.println(new Date().toString() + " LockA 锁住 obj2");Thread.sleep(60 * 1000); // 为测试,占用了就不放}}}} catch (Exception e) {e.printStackTrace();}}}class LockB implements Runnable{public void run() {try {System.out.println(new Date().toString() + " LockB 开始执行");while(true){synchronized (LockTest.obj2) {System.out.println(new Date().toString() + " LockB 锁住 obj2");Thread.sleep(3000); // 此处等待是给A能锁住机会synchronized (LockTest.obj1) {System.out.println(new Date().toString() + " LockB 锁住 obj1");Thread.sleep(60 * 1000); // 为测试,占用了就不放}}}} catch (Exception e) {e.printStackTrace();}}}

为了解决上述代码的死锁问题,破坏不可剥夺条件,只要获取资源不成功那么就释放自身所获取到的资源信息。

public class UnLockTest {public static String obj1 = "obj1";public static final Semaphore a1 = new Semaphore(1);public static String obj2 = "obj2";public static final Semaphore a2 = new Semaphore(1);public static void main(String[] args) {LockAa la = new LockAa();new Thread(la).start();LockBb lb = new LockBb();new Thread(lb).start();}}class LockAa implements Runnable {public void run() {try {System.out.println(new Date().toString() + " LockA 开始执行");while (true) {if (UnLockTest.a1.tryAcquire(1, TimeUnit.SECONDS)) {System.out.println(new Date().toString() + " LockA 锁住 obj1");if (UnLockTest.a2.tryAcquire(1, TimeUnit.SECONDS)) {System.out.println(new Date().toString() + " LockA 锁住 obj2");Thread.sleep(60 * 1000); // do something}else{System.out.println(new Date().toString() + "LockA 锁 obj2 失败");}}else{System.out.println(new Date().toString() + "LockA 锁 obj1 失败");}UnLockTest.a1.release(); // 释放UnLockTest.a2.release();Thread.sleep(1000); // 马上进行尝试,现实情况下do something是不确定的}} catch (Exception e) {e.printStackTrace();}}}class LockBb implements Runnable {public void run() {try {System.out.println(new Date().toString() + " LockB 开始执行");while (true) {if (UnLockTest.a2.tryAcquire(1, TimeUnit.SECONDS)) {System.out.println(new Date().toString() + " LockB 锁住 obj2");if (UnLockTest.a1.tryAcquire(1, TimeUnit.SECONDS)) {System.out.println(new Date().toString() + " LockB 锁住 obj1");Thread.sleep(60 * 1000); // do something}else{System.out.println(new Date().toString() + "LockB 锁 obj1 失败");}}else{System.out.println(new Date().toString() + "LockB 锁 obj2 失败");}UnLockTest.a1.release(); // 释放UnLockTest.a2.release();Thread.sleep(10 * 1000); // 这里只是为了演示,所以tryAcquire只用1秒,而且B要给A让出能执行的时间,否则两个永远是死锁}} catch (Exception e) {e.printStackTrace();}}}

--------------------------------------------------------------------JUC-------------------------------------------------------

CountdownLatch

用于同步一个或多个任务,可以为对象设置一个初始值,在需要进行同步好以后的任务调用await(),计数值到达0时便进行调用,只能触发一次,计数值不能被重置而CyclicBarrier可以重置

/*** @author Lyyyys* @version 1.0* @date /10/6 16:28*/public class WaitingTask implements Runnable{private static int counter = 0;private final int id = counter++;private final CountDownLatch latch;public WaitingTask(CountDownLatch latch) {this.latch = latch;}@Overridepublic void run() {try {latch.await();System.out.println("Latch barrier passed for " + this);} catch (InterruptedException e) {System.out.println(this + " interrupted");}}@Overridepublic String toString() {return String.format("WaitingTask %1$-3d " ,id);}}public class TaskPortion implements Runnable{private static int counter = 0;private final int id = counter++;private static Random random = new Random(47);private final CountDownLatch latch;public TaskPortion(CountDownLatch latch) {this.latch = latch;}@Overridepublic void run() {try {doWork();latch.countDown();} catch (InterruptedException e) {}}private void doWork() throws InterruptedException {Thread.sleep(random.nextInt(2000));System.out.println(this + "completed");}@Overridepublic String toString() {return String.format("%1$-3d",id);}}public class CountDownLatchDemo {static final int SIZE = 100;public static void main(String[] args) {ExecutorService exec = Executors.newCachedThreadPool();CountDownLatch latch = new CountDownLatch(SIZE);for (int i = 0; i < 10; i++) {exec.execute(new WaitingTask(latch));}for (int i = 0; i < SIZE; i++) {exec.execute(new TaskPortion(latch));}System.out.println("Launched all tasks");exec.shutdown();}}

CyclicBarrier

也是一组任务并发地执行,然后在下一个步骤之前等待,直至所有任务都完成,但是CyclicBarrier可以多次重用,并且计数是累加。到达一定次数以后CyclicBarrier构造器当中有一个执行任务,并且可以指定在所有线程都进入屏障后的执行动作,该执行动作由最后一个进行屏障的线程执行。

场景:有多匹马进行赛跑,并且每次随机步数,每次马跑完一轮过后要进行统计跑了多少步数,最后赛出先到达终点地马。

/*** @author Lyyyys* @version 1.0* @date /10/6 17:39*/public class Horse implements Runnable{private static int counter = 0;private final int id = counter++;private int strides = 0;private Random rand = new Random();private static CyclicBarrier cyclicBarrier;public Horse(CyclicBarrier b){cyclicBarrier = b;}public synchronized int getStrides(){return strides;}@Overridepublic void run() {try {while (!Thread.interrupted()){synchronized (this){strides += rand.nextInt(3);}cyclicBarrier.await();}} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}@Overridepublic String toString() {return "Horse" + id +" ";}public String tracks(){StringBuffer s = new StringBuffer();for (int i = 0; i < getStrides(); i++) {s.append("*");}s.append(id);return s.toString();}}public class HorseRace {static final int FINISH_LINE = 75;private List<Horse> horses = new ArrayList<>();private ExecutorService exec = Executors.newCachedThreadPool();private CyclicBarrier barrier;public HorseRace(int nHorses,final int pause){barrier = new CyclicBarrier(nHorses, new Runnable() {@Overridepublic void run() {StringBuffer s = new StringBuffer();for (int i = 0; i < FINISH_LINE; i++) {s.append("=");}System.out.println(s);for (Horse horse: horses) {System.out.println(horse.tracks());}for (Horse horse: horses) {if(horse.getStrides() >= FINISH_LINE){System.out.print(horse + "won!");exec.shutdownNow();return;}}}});for (int i = 0; i < nHorses; i++) {Horse horse = new Horse(barrier);horses.add(horse);exec.execute(horse);}}public static void main(String[] args) {int nHorses = 7;int pause = 200;if(args.length > 0){int n = new Integer(args[0]);nHorses = n > 0 ? n :nHorses;}if(args.length > 1){int p = new Integer(args[1]);pause = p > -1 ? p : pause;}new HorseRace(nHorses,pause);}}

注意点:

①Random要随机不能使用带有种子的构造器,因为Random本身就是伪随机数,使用带种子的构造器产生的步数都一致

②因为CyclicBarrier可以重用,并且在barrier构造器当中可以指定所有线程进入到屏障后,并且由最后一个进入屏障的线程进行操作。计数进行递增当到达7以后执行barrier方法进行重置后,线程while条件并未中断并且继续执行等待线程中断的信号,由屏障逻辑中给出,当有一匹马到达终点后马赢了,exec.shutdownNow(),向所有启动的任务发送interrupt(),但仍会执行接下来的任务,所以需要直接return程序,不再执行接下来的操作。

Semaphore(允许N个任务同时访问这个资源)

个人理解:颁发令牌的一个机制,当令牌足够的时候才可以进行acquire(令牌数量)操作,并且令牌初始化后,当线程进行调用release()操作会增加当前令牌数,并不是说固定化令牌数量

Semaphore的案例可以看这里

如果觉得《Thinking in Java----------并发篇章》对你有帮助,请点赞、收藏,并留下你的观点哦!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。