小言_互联网的博客

Future和Callable学习

459人阅读  评论(0)

我们知道使用多线程时,最初的Thread到线程池,此时对于线程的使用,提供了其使用的复用率。而实现多线程的三种方式:继承Thread;实现Runnable接口,重写run方法;实现Callable接口,同时重写call方法,同时通过Future获取执行的返回值。也就是说callable执行任务,而Future拿到执行的结果。Future具有阻塞性在于其get()方法具有阻塞性,而isDone()是不具有阻塞性的。

通常使用线程池+Runnable的时候,会发现Runnable不能返回值,也就执行的结果情况,同时对于出现异常,我们获取异常信息,进行相应的处理。如果需要返回结果,同时需要进一步加工的时候,就可以考虑使用Future+Callable了。同时接口Future的默认实现是FutureTask,因此对于其实现get()方法,会有一个问题,就是如果前面的任务一旦执行的时间耗时较长的时候,就会出现一直阻塞的状态,此时就会出现排队等待的状态,大大影响其性能。适用场景:当一个线程需要等待另一个线程把某个任务执行完成后它才能继续执行,此时可以使用FutureTask。因为FutureTask基于AQS实现,因此其具有阻塞性。

Future的使用

/**
 *
 * @description: Future使用
 * <p>
 * 实现callable接口,同时重写call方法,其优点:与Runnable不同的是,其可以返回结果,
 * 同时可以声明异常,返回一个执行检查的异常信息,而Runnable返回的是void,
 * 因此在程序上方便排查问题,同时了解执行的结果情况,如果返回的结果想是void的,则可以在实现时选择
 * Callable<void>
 * </p>
 * @author: lyz
 * @date: 2020/05/24 11:23
 **/
@Slf4j
public class FutureTest {
     static class MyCallable implements Callable<String>{
        public String call() throws Exception {
            //业务逻辑执行部分
            log.info("do something in callable");
            Thread.sleep(1000);
            return "Done";
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        //ExecutorService executorService = Executors.newCachedThreadPool();
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(2,3,5L,TimeUnit.SECONDS,new LinkedBlockingDeque<>());
        //执行需要提交的任务,其get()方法拿到需要的结果
        Future<String> future = executorService.submit(new MyCallable());
        log.info("do something in main");
        Thread.sleep(1000);
        //拿到执行后返回的结果
        String result = future.get();
        log.info("result:{}",result);
        executorService.shutdown();
    }

}

执行结果:

 INFO [main] - do something in main
 INFO [pool-1-thread-1] - do something in callable
 INFO [main] - result:Done

Future+Runnable,不带返回值:

/**
 *
 * @description: Future使用
 * <p>
 * 实现callable接口,同时重写call方法,方法submit不仅可以传入Callable对象,
 * 也可以传入Runnable对象,说明submit()方法支持有返回值和无返回值的功能
 * get具有阻塞性,而isDone不阻塞
 * Callable<void>
 * </p>
 * @author: lyz
 * @date: 2020/05/24 11:28
 **/
@Slf4j
public class FutureTest2 {
    public static void main(String[] args) {
        try{
/*        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                    System.out.println("查看get结果打印信息");
                }
            };
        */
           //采用lambda表达式
            Runnable runnable = ()->System.out.println("查看get结果打印信息");
            ExecutorService executor = Executors.newCachedThreadPool();
            Future future = executor.submit(runnable);
            //此时get返回的值为null,说明支持void的方式
            System.out.println("future get result:"+future.get()+" and isDone:"+future.isDone());
        }catch(Exception e){
                log.error("do the thing error:{}"+e.getMessage());
        }
    }
}

运行结果:

查看get结果打印信息
future get result:null and isDone:true

Future+Runnable,带返回结果

/**
 *
 * @description: Future使用
 * <p>
 * 实现callable接口,同时重写call方法,方法submit不仅可以传入Callable对象,
 * 而且还可以Runnable,同时可以从api中可以看到submit(Runnable,result)携带返回信息
 * </p>
 * @author: lyz
 * @date: 2020/05/24 11:38
 **/
@Slf4j
public class FutureTest3 {

    //实现Runnable接口,重写run方法
    static class MyRunnable implements Runnable{
        private User user;
        public MyRunnable(User user){
            this.user = user;
        }

        @Override
        public void run() {
          user.setUsername("在路上");
          user.setPassword("123456");
        }
    }

    public static void main(String[] args) {
       try{
           User user =new User("123","345");
           MyRunnable myRunnable = new MyRunnable(user);
           ThreadPoolExecutor executor = new ThreadPoolExecutor(10,10,10, TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>());
           Future<User> future = executor.submit(myRunnable,user);
           System.out.println("start Time="+System.currentTimeMillis());
           user = future.get();
           System.out.println("get Value="+user.getUsername()+"==="+user.getPassword());
           System.out.println("end Time="+System.currentTimeMillis());
       }catch (Exception e){
           log.info("执行异常:{}",e.getMessage());
       }
    }


}

运行结果:

start Time=1590322809154
get Value=在路上===123456
end Time=1590322809157

执行多个任务:

/**
 * @description: Future使用
 * <p>
 * 实现callable接口,同时重写call方法,方法submit不仅可以传入Callable对象,
 * 而且还可以Runnable,同时可以从api中可以看到submit(Runnable,result)
 * get具有阻塞性,而isDone不阻塞。cancel与isCancelled使用
 * 其优点是从线程中返回数据以便进行后期的处理,其缺点是具有阻塞性
 * Callable<void>
 * </p>
 * @author: lyz
 * @date: 2020/05/24 11:48
 **/
@Slf4j
public class FutureTest4 {

    public static void main(String[] args) {
        try {
            MyCallable callable1 = new MyCallable("123", 5000);
            MyCallable callable2 = new MyCallable("456", 4000);
            MyCallable callable3 = new MyCallable("236", 3000);
            MyCallable callable4 = new MyCallable("678", 2000);
            MyCallable callable5 = new MyCallable("789", 1000);

            List<Callable> callableList = new ArrayList<>();
            callableList.add(callable1);
            callableList.add(callable2);
            callableList.add(callable3);
            callableList.add(callable4);
            callableList.add(callable5);

            List<Future> futureList = new ArrayList<>();
            ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
            for (int i = 0; i < 5; i++) {
                futureList.add(executor.submit(callableList.get(i)));
            }
            System.out.println("run first time=" + System.currentTimeMillis());
            for (int i = 0; i < 5; i++) {
                System.out.println(futureList.get(i).get()+" "+System.currentTimeMillis());
            }
        } catch (Exception e) {
           log.error("执行任务出错:{}",e.getMessage());
        }
    }
}

运行结果:

run first time=1590322937399
456
678
123
236
789
return123 1590322942515
return456 1590322942515
return236 1590322942515
return678 1590322942515
return789 1590322942515

从运行结果可以看到返回的结果线程是有序的,也即其会等待线程运行完成才会返回结果,而执行的线程我们可以不是有序的,是乱序的。因此可以看到其如果执行的程序中有一段任务出现执行过程较长时,就会被阻塞,进行排队。

使用FutureTask:

/**
 *
 * @description: Future使用
 * <p>
 * FutureTask:是Future的实现类,而且在使用线程池时,默认的情况下也是使用
 * futureTask类作为接口Future的实现类,但需要注意的是,Future接口调用get()方法
 * 取得处理的结果时是阻塞性的,也就是如果调用get()方法时,任务尚未完成,则
 * 调用get()方法时一直阻塞到此任务完成时为止。如果是这样的相关,则前面先执行的任务
 * 一旦耗时很多,则后面的任务调用get()方法就呈现阻塞状态,也就是排队等待,大大影响运行效率。
 * 也即主线程并不能保证首先获的是最先完成任务的返回值,这是future的缺点,影响效率
 * </p>
 * @author: lyz
 * @date: 2020/05/24 12:00
 **/
public class FutureTask {
    /**
     * FutureTask由线程池执行
     */
    private static void exeForPool(){
        // 创建 FutureTask,采用三个线程执行主线程
        java.util.concurrent.FutureTask<Integer> futureTask = new java.util.concurrent.FutureTask<>(()-> 1+2);
        // 创建线程池
        ThreadPoolExecutor executor = ThreadPoolBuilder.fixedPool().build();

        try{
            // 提交 FutureTask
            executor.submit(futureTask);
            // 获取计算结果
            Integer result = futureTask.get();
            System.out.println(result);
        }catch(Exception e){
            e.printStackTrace();
        }finally {
            //进行优雅关闭
            ThreadPoolUtil.gracefulShutdown(executor,1);
        }
    }

    /**
     * FutureTask由线程处理
     */
    private static void exeForThread(){
        // 创建 FutureTask
        java.util.concurrent.FutureTask<Integer> futureTask = new java.util.concurrent.FutureTask<>(()-> 1+2);
        // 创建并启动线程
        Thread T1 = new Thread(futureTask);
        T1.start();
        // 获取计算结果
        try{
            Integer result = futureTask.get();
            System.out.println(result);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    /**
     * 利用FutureTask实现烧水泡茶
     */
    private static void fireWater(){

        // 创建任务 T2 的 FutureTask
        java.util.concurrent.FutureTask<String> ft2 = new java.util.concurrent.FutureTask<>(new T2Task());
        // 创建任务 T1 的 FutureTask
        java.util.concurrent.FutureTask<String> ft1 = new java.util.concurrent.FutureTask<>(new T1Task(ft2));
        // 线程 T1 执行任务 ft1
        Thread t1 = new Thread(ft1);
        t1.start();
        // 线程 T2 执行任务 ft2
        Thread t2 = new Thread(ft2);
        t2.start();
        // 等待线程 T1 执行结果
        try{
            System.out.println(ft1.get());

        }catch (Exception e){
            e.printStackTrace();
        }
    }
    /**
     * 洗水壶、烧开水、泡茶,实现Callable接口,重写call方法
     */
    static class T1Task implements Callable<String> {
        java.util.concurrent.FutureTask<String> ft2;
        T1Task(java.util.concurrent.FutureTask<String> ft2){
            this.ft2 = ft2;
        }
        @Override
        public String call() throws Exception {
            System.out.println("T1: 洗水壶...");
            TimeUnit.SECONDS.sleep(1);

            System.out.println("T1: 烧开水...");
            TimeUnit.SECONDS.sleep(15);
            // 获取 T2 线程的茶叶
            String tf = ft2.get();
            System.out.println("T1: 拿到茶叶:"+tf);

            System.out.println("T1: 泡茶...");
            return " 上茶:" + tf;
        }
    }
    /**
     * 洗茶壶、洗茶杯、拿茶叶,实现Callable接口,重写call方法
     */
    static class T2Task implements Callable<String> {
        @Override
        public String call() throws Exception {
            System.out.println("T2: 洗茶壶...");
            TimeUnit.SECONDS.sleep(1);

            System.out.println("T2: 洗茶杯...");
            TimeUnit.SECONDS.sleep(2);

            System.out.println("T2: 拿茶叶...");
            TimeUnit.SECONDS.sleep(1);
            return " 白茶 ";
        }
    }
    public static void main(String[] args) {
        exeForPool();
        exeForThread();
        fireWater();
    }
}

运行结果:

3
3
T2: 洗茶壶...
T1: 洗水壶...
T2: 洗茶杯...
T1: 烧开水...
T2: 拿茶叶...
T1: 拿到茶叶: 白茶 
T1: 泡茶...
 上茶: 白茶 

前面说到其阻塞性,影响了其运行的效率,而在jdk1.5之后,引入了CompletionService,CompletionService可以一边执行新的任务,一边处理返回的结果,将结果进行返回,采用submit+take或者采用poll()方法,而采用poll方法不具有阻塞性,因此性能上有提高。


转载:https://blog.csdn.net/trytostudy/article/details/106321395
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场