小言_互联网的博客

正确使用@Async完成异步执行

384人阅读  评论(0)

在很多业务开发中,经常为了保证性能,保证主业务流程正常, 副流程需要异步。例如目前我负责的支付主流程中,只要支付主业务正常,这笔交易就算是正常,例如:日志记录,邮件发送等不关键业务异步处理,只要保证最终处理成功就可。

 

代码部分:

首先看下面一段代码

@RestController
@RequestMapping("/async")
@Slf4j
public class AsyncTestController {

    @Autowired
    private AsyncTestService asyncTestService;

    @GetMapping("sync")
    public void test(){
        log.info("==========1");
        asyncTestService.test();
        log.info("==========4");
    }

}
@Service
@Slf4j
public class AsyncTestService {

    public void test() {
        log.info("==========2");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("==========3");
    }

}

如上代码:发送请求:http://localhost:8080/async/sync

查看打印日志如下:

2019-09-26 19:57:31.056  INFO 4866 --- [nio-8080-exec-6] com.dsx.test.async.AsyncTestController   : ==========1
2019-09-26 19:57:31.056  INFO 4866 --- [nio-8080-exec-6] c.d.test.async.service.AsyncTestService  : ==========2
2019-09-26 19:57:32.061  INFO 4866 --- [nio-8080-exec-6] c.d.test.async.service.AsyncTestService  : ==========3
2019-09-26 19:57:32.061  INFO 4866 --- [nio-8080-exec-6] com.dsx.test.async.AsyncTestController   : ==========4

从上面日志很清楚可以看到,虽然service线程阻塞一秒,但是执行顺序还是1,2,3,4.因为上面代码都是在主线程中同步完成。

下面我们稍微修改一下代码,即可变成异步执行service代码;

  1. 在springboot 的启动类上添加 @EnableAsync 注解
  2. 在我们需要异步的service上方法上使用 @Async 注解

 代码变成如下:

@SpringBootApplication
@EnableAsync
public class TestApplication {

    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }

}


@Service
@Slf4j
public class AsyncTestService {

    @Async
    public void test() {
        log.info("==========2");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("==========3");
    }

}
  

再次执行上面的请求,输出结果信息为:

2019-09-26 20:04:33.468  INFO 4989 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 4 ms
2019-09-26 20:04:33.482  INFO 4989 --- [nio-8080-exec-1] com.dsx.test.async.AsyncTestController   : ==========1
2019-09-26 20:04:33.484  INFO 4989 --- [nio-8080-exec-1] com.dsx.test.async.AsyncTestController   : ==========4
2019-09-26 20:04:33.487  INFO 4989 --- [         task-1] c.d.test.async.service.AsyncTestService  : ==========2
2019-09-26 20:04:34.488  INFO 4989 --- [         task-1] c.d.test.async.service.AsyncTestService  : ==========3

由输出日志明显可以看出,输出顺序已经发生了变化,且2,3的输出线程为:task-1,;

然后 @Async 异步执行新启动了线程,那实现机制是每次新起线程还是用的线程池呢?如果是线程池,那默认配置又是多少呢?

改动代码,循环1000次异步调用代码如下:

@RestController
@RequestMapping("/async")
@Slf4j
public class AsyncTestController {

    @Autowired
    private AsyncTestService asyncTestService;


    @GetMapping("test2")
    public void test2(){
        log.info("==========1");
        for (int i = 0; i < 1000; i++) {
            asyncTestService.testAsync1();
        }
        log.info("==========4");
    }

}


public class AsyncTestService {


    @Async
    public void testAsync1() {
        log.info("=========={}",Thread.currentThread().getName());
    }

}

请求地址:http://localhost:8080/async/test2 ,查看输出线程名称,发现线程最大值为task-9 ,最小是task-1

猜测:默认线程池大小是9,最大队列数为:Integer.MAX_VALUE 。(待源码验证)

能否自定义线程池?

这里不验证了猜想了,下篇会讲源码,源码分析阶段会得知可以自定义线程池,这里直接给出自定义线程池的demo

在启动类能扫描的包中添加如下配置类

package com.dsx.test.async.service;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * @author : tianwenqing
 * @version : 1.0
 * @date : 2019-09-26 20:31
 * @description :
 **/

@Configuration
public class AsyncConfig {

    /**
     * the core pool size of thread pool
     */
    @Value("${system.async.core-pool-size:2}")
    private int corePoolSize;
    /**
     * the max pool size of thread pool
     */
    @Value("${system.async.max-pool-size:10}")
    private int maxPoolSize;
    /**
     * the  thread pool name
     */
    @Value("${system.async.thread-name:api}")
    private String threadName;
   /**
     * the  queue size of thread pool
     */
    @Value("${system.async.queue-size:3}")
    private int queueSize;

    /**
     * 注入 Spring 线程池
     *
     * @return
     */
    @Bean
    public TaskExecutor taskExecutor() {

        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setThreadNamePrefix(threadName);
        taskExecutor.setCorePoolSize(corePoolSize);
        taskExecutor.setMaxPoolSize(maxPoolSize);
        taskExecutor.setQueueCapacity(queueSize);
        return taskExecutor;
    }
}

修改

controller ,service 代码如下: 

@RestController
@RequestMapping("/async")
@Slf4j
public class AsyncTestController {

    @GetMapping("test1")
    public void test1(){
        log.info("==========1");
        asyncTestService.testAsync();
        log.info("==========4");
    }

    @GetMapping("test2")
    public void test2(){
        log.info("==========1");
        int y = 0 ;
        for (int i = 0; i < 1000; i++) {
            try {
                asyncTestService.testAsync1();
            } catch (Exception e) {
                log.error("丢弃任务数"+(++y));
            }
        }
        log.info("==========结束");

    }

}




@Service
@Slf4j
public class AsyncTestService {


    @Autowired(required = false)
    @Qualifier(value = "taskExecutor")
    private TaskExecutor taskExecutor;

    @Async
    public void testAsync() {
        log.info("==========2");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("==========3");
    }


    @Async
    public void testAsync1() {
        ThreadPoolTaskExecutor tpe = ((ThreadPoolTaskExecutor) taskExecutor);
        log.info("----------线程池中的目前存在的线程数量:{}", tpe.getThreadPoolExecutor().getPoolSize());
        log.info("----------线程池中队列大小:{}",tpe.getThreadPoolExecutor().getQueue().size());
        log.info("----------线程池中正在执行任务的线程数量:{}",tpe.getThreadPoolExecutor().getActiveCount());
        log.info("----------线程池中需要执行的任务数量:{}",tpe.getThreadPoolExecutor().getTaskCount());
        log.info("----------线程池中完成的任务数量:{}",tpe.getThreadPoolExecutor().getCompletedTaskCount());
        log.info("=========={}", Thread.currentThread().getName());
    }

}



访问请求:http://localhost:8080/async/test2

输出部分内容如下:

2019-09-27 18:36:44.162  INFO 3567 --- [nio-8080-exec-4] com.dsx.test.async.AsyncTestController   : ==========1
2019-09-27 18:36:44.163  INFO 3567 --- [           api5] c.d.test.async.service.AsyncTestService  : ==========api5
2019-09-27 18:36:44.163  INFO 3567 --- [           api9] c.d.test.async.service.AsyncTestService  : ==========api9
2019-09-27 18:36:44.163 ERROR 3567 --- [nio-8080-exec-4] com.dsx.test.async.AsyncTestController   : 丢弃任务数1
2019-09-27 18:36:44.163  INFO 3567 --- [           api7] c.d.test.async.service.AsyncTestService  : ==========api7
2019-09-27 18:36:44.163  INFO 3567 --- [           api7] c.d.test.async.service.AsyncTestService  : ==========api7
2019-09-27 18:36:44.163  INFO 3567 --- [           api7] c.d.test.async.service.AsyncTestService  : ==========api7
2019-09-27 18:36:44.163 ERROR 3567 --- [nio-8080-exec-4] com.dsx.test.async.AsyncTestController   : 丢弃任务数2

从输出内容:明显看出,异步线程已经变成自定义的线程名称,并由输出信息顺便验证了线程池部分知识。配置文件里的参数在设置的很小,就是为了验证线程池的内容,生产环境可根据自己的情况修改为合适值,首先初始化coreSize的线程数,当coreSize的线程都被占用,则放入queue, 当queue满了之后继续生成线程至最大线程数(maxSize). 如果最大线程数满了,则执行处理策略,ThreadPoolTaskExecutor 的默认策略是抛出 TaskRejectedException 异常,所以demo中在controller 捕获异常,可以记录日志,待有可用线程时再次执行。

顺便验证一下线程池的监控参数:

controller 添加如下代码:

package com.dsx.test.async;

import com.dsx.test.async.service.AsyncTestService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import sun.rmi.log.ReliableLog;

/**
 * @author : tianwenqing
 * @version : 1.0
 * @date : 2019-09-26 19:42
 * @description : @Async 异步测试
 **/
@RestController
@RequestMapping("/async")
@Slf4j
public class AsyncTestController {

    @Autowired
    private AsyncTestService asyncTestService;

    @Autowired(required = false)
    @Qualifier(value = "taskExecutor")
    private TaskExecutor taskExecutor;


    @GetMapping("test2")
    public void test2(){
        log.info("==========1");
        int y = 0 ;
        for (int i = 0; i < 1000; i++) {
            try {
                asyncTestService.testAsync1();
            } catch (Exception e) {
                e.printStackTrace();
                log.error("丢弃任务数"+(++y));
            }
        }
        log.info("==========结束");

    }

    @GetMapping("test3")
    public void test3(){
        ThreadPoolTaskExecutor tpe = ((ThreadPoolTaskExecutor) taskExecutor);
        log.info("----------线程池中的线程数量:{}",tpe.getThreadPoolExecutor().getPoolSize());
        log.info("----------线程池中曾经创建过的最大线程数:{}",tpe.getThreadPoolExecutor().getLargestPoolSize());
        log.info("----------线程池中活跃的线程数:{}",tpe.getThreadPoolExecutor().getActiveCount());
        log.info("----------线程池中需要执行的任务数量:{}",tpe.getThreadPoolExecutor().getTaskCount());
        log.info("----------线程池中完成的任务数量:{}",tpe.getThreadPoolExecutor().getCompletedTaskCount());
    }

}

 当请求完地址:http://localhost:8080/async/test2 然后请求:http://localhost:8080/async/test3

输出结果如下:

2019-09-27 18:50:50.066  INFO 3776 --- [nio-8080-exec-3] com.dsx.test.async.AsyncTestController   : ----------线程池中的线程数量:10
2019-09-27 18:50:50.066  INFO 3776 --- [nio-8080-exec-3] com.dsx.test.async.AsyncTestController   : ----------线程池中曾经创建过的最大线程数:10
2019-09-27 18:50:50.066  INFO 3776 --- [nio-8080-exec-3] com.dsx.test.async.AsyncTestController   : ----------线程池中活跃的线程数:0
2019-09-27 18:50:50.066  INFO 3776 --- [nio-8080-exec-3] com.dsx.test.async.AsyncTestController   : ----------线程池中需要执行的任务数量:768
2019-09-27 18:50:50.066  INFO 3776 --- [nio-8080-exec-3] com.dsx.test.async.AsyncTestController   : ----------线程池中完成的任务数量:768
从输出内容大概可以看出监控池的主要指标:
getActiveCount() 目前还在执行任务的线程数,如果没有任务则会变为0
getLargestPoolSize() 曾经创建过的最大线程数:这个线程数只会增多不会变少
getPoolSize() 目前线程池中存在的线程数量

 

 

 

 

结论:

  1. 在springboot 工程中只需要 @EnableAsync,@Async 两个注解即可完成简单异步执行。
  2. 猜测:默认线程池大小是9,最大队列数为:Integer.MAX_VALUE 。(待源码验证)
  3. 支持自定义线程池,只需要返回一个 TaskExecutor 实力交给Spring 管理。

附:文中相关代码地址:https://gitee.com/disanxian/test/tree/master/src/main/java/com/dsx/test

 

 


转载:https://blog.csdn.net/homework_tian/article/details/101468939
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场