失眠网,内容丰富有趣,生活中的好帮手!
失眠网 > java 利用Future异步获取多线程任务结果

java 利用Future异步获取多线程任务结果

时间:2023-07-13 07:59:25

相关推荐

java 利用Future异步获取多线程任务结果

独角兽企业重金招聘Python工程师标准>>>

简述

Future接口是Java标准API的一部分,在java.util.concurrent包中。Future接口是Java线程Future模式的实现,可以来进行异步计算。

有了Future就可以进行三段式的编程了,1.启动多线程任务2.处理其他事3.收集多线程任务结果。从而实现了非阻塞的任务调用。在途中遇到一个问题,那就是虽然能异步获取结果,但是Future的结果需要通过isdone来判断是否有结果,或者使用get()函数来阻塞式获取执行结果。这样就不能实时跟踪其他线程的结果状态了,所以直接使用get还是要慎用,最好配合isdone来使用。

这里有一种更好的方式来实现对任意一个线程运行完成后的结果都能及时获取的办法:使用CompletionService,它内部添加了阻塞队列,从而获取future中的值,然后根据返回值做对应的处理。一般future使用和CompletionService使用的两个测试案例如下:

import java.util.ArrayList;import java.util.List;import java.util.Random;import java.util.concurrent.Callable;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;/*** 多线程执行,异步获取结果* * @author i-clarechen**/public class AsyncThread {public static void main(String[] args) {AsyncThread t = new AsyncThread();List<Future<String>> futureList = new ArrayList<Future<String>>();t.generate(3, futureList);t.doOtherThings();t.getResult(futureList);}/*** 生成指定数量的线程,都放入future数组* * @param threadNum* @param fList*/public void generate(int threadNum, List<Future<String>> fList) {ExecutorService service = Executors.newFixedThreadPool(threadNum);for (int i = 0; i < threadNum; i++) {//添加到线程池中,有返回值Future<String> f = service.submit(getJob(i));fList.add(f);}//关闭执行服务对象service.shutdown();}/*** other things*/public void doOtherThings() {try {for (int i = 0; i < 3; i++) {System.out.println("do thing no:" + i);Thread.sleep(1000 * (new Random().nextInt(10)));}} catch (InterruptedException e) {e.printStackTrace();}}/*** 从future中获取线程结果,打印结果* * @param fList*/public void getResult(List<Future<String>> fList) {ExecutorService service = Executors.newSingleThreadExecutor();service.execute(getCollectJob(fList));service.shutdown();}/*** 生成指定序号的线程对象* * @param i* @return*/public Callable<String> getJob(final int i) {final int time = new Random().nextInt(10);return new Callable<String>() {@Overridepublic String call() throws Exception {Thread.sleep(1000 * time);return "thread-" + i;}};}/*** 生成结果收集线程对象* * @param fList* @return*/public Runnable getCollectJob(final List<Future<String>> fList) {return new Runnable() {public void run() {for (Future<String> future : fList) {try {while (true) {if (future.isDone() && !future.isCancelled()) {System.out.println("Future:" + future+ ",Result:" + future.get());break;} else {Thread.sleep(1000);}}} catch (Exception e) {e.printStackTrace();}}}};}}

运行结果打印和future放入列表时的顺序一致,为0,1,2:

do thing no:0do thing no:1do thing no:2Future:java.util.concurrent.FutureTask@68e1ca74,Result:thread-0Future:java.util.concurrent.FutureTask@3fb2bb77,Result:thread-1Future:java.util.concurrent.FutureTask@6f31a24c,Result:thread-2

下面是先执行完的线程先处理的方案:

import java.util.Random;import java.util.concurrent.BlockingQueue;import java.util.concurrent.Callable;import java.pletionService;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorCompletionService;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.LinkedBlockingDeque;public class testCallable {public static void main(String[] args) {try {completionServiceCount();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}/*** 使用completionService收集callable结果* @throws ExecutionException * @throws InterruptedException */public static void completionServiceCount() throws InterruptedException, ExecutionException {ExecutorService executorService = Executors.newCachedThreadPool();CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(executorService);int threadNum = 5;for (int i = 0; i < threadNum; i++) {completionService.submit(getTask(i));}int sum = 0;int temp = 0;for(int i=0;i<threadNum;i++){temp = completionService.take().get();sum += temp;System.out.print(temp + "\t");}System.out.println("CompletionService all is : " + sum);executorService.shutdown();}public static Callable<Integer> getTask(final int no) {final Random rand = new Random();Callable<Integer> task = new Callable<Integer>() {@Overridepublic Integer call() throws Exception {int time = rand.nextInt(100)*100;System.out.println("thead:"+no+" time is:"+time);Thread.sleep(time);return no;}};return task;}}

运行结果为最先结束的线程结果先被处理:

thead:0 time is:4200thead:1 time is:6900thead:2 time is:2900thead:3 time is:9000thead:4 time is:71000 1 4 3 CompletionService all is : 10

介绍

submit和execute都是 ExecutorService 的方法,都是添加线程到线程池中。

区别 submit 有返回值 返回future , execute没有

submit 返回值 future 用处 可以执行cancle方法,取消执行 可以通过get()方法,判断是否执行成功 ==null表示执行成功

如果觉得《java 利用Future异步获取多线程任务结果》对你有帮助,请点赞、收藏,并留下你的观点哦!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。