在很多业务开发中,经常为了保证性能,保证主业务流程正常, 副流程需要异步。例如目前我负责的支付主流程中,只要支付主业务正常,这笔交易就算是正常,例如:日志记录,邮件发送等不关键业务异步处理,只要保证最终处理成功就可。
代码部分:
首先看下面一段代码
@RestController
@RequestMapping("/async")
@Slf4j
public class AsyncTestController {
@Autowired
private AsyncTestService asyncTestService;
@GetMapping("sync")
public void test(){
log.info("==========1");
asyncTestService.test();
log.info("==========4");
}
}
@Service
@Slf4j
public class AsyncTestService {
public void test() {
log.info("==========2");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("==========3");
}
}
如上代码:发送请求:http://localhost:8080/async/sync
查看打印日志如下:
2019-09-26 19:57:31.056 INFO 4866 --- [nio-8080-exec-6] com.dsx.test.async.AsyncTestController : ==========1
2019-09-26 19:57:31.056 INFO 4866 --- [nio-8080-exec-6] c.d.test.async.service.AsyncTestService : ==========2
2019-09-26 19:57:32.061 INFO 4866 --- [nio-8080-exec-6] c.d.test.async.service.AsyncTestService : ==========3
2019-09-26 19:57:32.061 INFO 4866 --- [nio-8080-exec-6] com.dsx.test.async.AsyncTestController : ==========4
从上面日志很清楚可以看到,虽然service线程阻塞一秒,但是执行顺序还是1,2,3,4.因为上面代码都是在主线程中同步完成。
下面我们稍微修改一下代码,即可变成异步执行service代码;
- 在springboot 的启动类上添加 @EnableAsync 注解
- 在我们需要异步的service上方法上使用 @Async 注解
代码变成如下:
@SpringBootApplication
@EnableAsync
public class TestApplication {
public static void main(String[] args) {
SpringApplication.run(TestApplication.class, args);
}
}
@Service
@Slf4j
public class AsyncTestService {
@Async
public void test() {
log.info("==========2");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("==========3");
}
}
再次执行上面的请求,输出结果信息为:
2019-09-26 20:04:33.468 INFO 4989 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 4 ms
2019-09-26 20:04:33.482 INFO 4989 --- [nio-8080-exec-1] com.dsx.test.async.AsyncTestController : ==========1
2019-09-26 20:04:33.484 INFO 4989 --- [nio-8080-exec-1] com.dsx.test.async.AsyncTestController : ==========4
2019-09-26 20:04:33.487 INFO 4989 --- [ task-1] c.d.test.async.service.AsyncTestService : ==========2
2019-09-26 20:04:34.488 INFO 4989 --- [ task-1] c.d.test.async.service.AsyncTestService : ==========3
由输出日志明显可以看出,输出顺序已经发生了变化,且2,3的输出线程为:task-1,;
然后 @Async 异步执行新启动了线程,那实现机制是每次新起线程还是用的线程池呢?如果是线程池,那默认配置又是多少呢?
改动代码,循环1000次异步调用代码如下:
@RestController
@RequestMapping("/async")
@Slf4j
public class AsyncTestController {
@Autowired
private AsyncTestService asyncTestService;
@GetMapping("test2")
public void test2(){
log.info("==========1");
for (int i = 0; i < 1000; i++) {
asyncTestService.testAsync1();
}
log.info("==========4");
}
}
public class AsyncTestService {
@Async
public void testAsync1() {
log.info("=========={}",Thread.currentThread().getName());
}
}
请求地址:http://localhost:8080/async/test2 ,查看输出线程名称,发现线程最大值为task-9 ,最小是task-1
猜测:默认线程池大小是9,最大队列数为:Integer.MAX_VALUE 。(待源码验证)
能否自定义线程池?
这里不验证了猜想了,下篇会讲源码,源码分析阶段会得知可以自定义线程池,这里直接给出自定义线程池的demo
在启动类能扫描的包中添加如下配置类
package com.dsx.test.async.service;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
* @author : tianwenqing
* @version : 1.0
* @date : 2019-09-26 20:31
* @description :
**/
@Configuration
public class AsyncConfig {
/**
* the core pool size of thread pool
*/
@Value("${system.async.core-pool-size:2}")
private int corePoolSize;
/**
* the max pool size of thread pool
*/
@Value("${system.async.max-pool-size:10}")
private int maxPoolSize;
/**
* the thread pool name
*/
@Value("${system.async.thread-name:api}")
private String threadName;
/**
* the queue size of thread pool
*/
@Value("${system.async.queue-size:3}")
private int queueSize;
/**
* 注入 Spring 线程池
*
* @return
*/
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setThreadNamePrefix(threadName);
taskExecutor.setCorePoolSize(corePoolSize);
taskExecutor.setMaxPoolSize(maxPoolSize);
taskExecutor.setQueueCapacity(queueSize);
return taskExecutor;
}
}
修改
controller ,service 代码如下:
@RestController
@RequestMapping("/async")
@Slf4j
public class AsyncTestController {
@GetMapping("test1")
public void test1(){
log.info("==========1");
asyncTestService.testAsync();
log.info("==========4");
}
@GetMapping("test2")
public void test2(){
log.info("==========1");
int y = 0 ;
for (int i = 0; i < 1000; i++) {
try {
asyncTestService.testAsync1();
} catch (Exception e) {
log.error("丢弃任务数"+(++y));
}
}
log.info("==========结束");
}
}
@Service
@Slf4j
public class AsyncTestService {
@Autowired(required = false)
@Qualifier(value = "taskExecutor")
private TaskExecutor taskExecutor;
@Async
public void testAsync() {
log.info("==========2");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("==========3");
}
@Async
public void testAsync1() {
ThreadPoolTaskExecutor tpe = ((ThreadPoolTaskExecutor) taskExecutor);
log.info("----------线程池中的目前存在的线程数量:{}", tpe.getThreadPoolExecutor().getPoolSize());
log.info("----------线程池中队列大小:{}",tpe.getThreadPoolExecutor().getQueue().size());
log.info("----------线程池中正在执行任务的线程数量:{}",tpe.getThreadPoolExecutor().getActiveCount());
log.info("----------线程池中需要执行的任务数量:{}",tpe.getThreadPoolExecutor().getTaskCount());
log.info("----------线程池中完成的任务数量:{}",tpe.getThreadPoolExecutor().getCompletedTaskCount());
log.info("=========={}", Thread.currentThread().getName());
}
}
访问请求:http://localhost:8080/async/test2
输出部分内容如下:
2019-09-27 18:36:44.162 INFO 3567 --- [nio-8080-exec-4] com.dsx.test.async.AsyncTestController : ==========1
2019-09-27 18:36:44.163 INFO 3567 --- [ api5] c.d.test.async.service.AsyncTestService : ==========api5
2019-09-27 18:36:44.163 INFO 3567 --- [ api9] c.d.test.async.service.AsyncTestService : ==========api9
2019-09-27 18:36:44.163 ERROR 3567 --- [nio-8080-exec-4] com.dsx.test.async.AsyncTestController : 丢弃任务数1
2019-09-27 18:36:44.163 INFO 3567 --- [ api7] c.d.test.async.service.AsyncTestService : ==========api7
2019-09-27 18:36:44.163 INFO 3567 --- [ api7] c.d.test.async.service.AsyncTestService : ==========api7
2019-09-27 18:36:44.163 INFO 3567 --- [ api7] c.d.test.async.service.AsyncTestService : ==========api7
2019-09-27 18:36:44.163 ERROR 3567 --- [nio-8080-exec-4] com.dsx.test.async.AsyncTestController : 丢弃任务数2
从输出内容:明显看出,异步线程已经变成自定义的线程名称,并由输出信息顺便验证了线程池部分知识。配置文件里的参数在设置的很小,就是为了验证线程池的内容,生产环境可根据自己的情况修改为合适值,首先初始化coreSize的线程数,当coreSize的线程都被占用,则放入queue, 当queue满了之后继续生成线程至最大线程数(maxSize). 如果最大线程数满了,则执行处理策略,ThreadPoolTaskExecutor 的默认策略是抛出 TaskRejectedException 异常,所以demo中在controller 捕获异常,可以记录日志,待有可用线程时再次执行。
顺便验证一下线程池的监控参数:
controller 添加如下代码:
package com.dsx.test.async;
import com.dsx.test.async.service.AsyncTestService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import sun.rmi.log.ReliableLog;
/**
* @author : tianwenqing
* @version : 1.0
* @date : 2019-09-26 19:42
* @description : @Async 异步测试
**/
@RestController
@RequestMapping("/async")
@Slf4j
public class AsyncTestController {
@Autowired
private AsyncTestService asyncTestService;
@Autowired(required = false)
@Qualifier(value = "taskExecutor")
private TaskExecutor taskExecutor;
@GetMapping("test2")
public void test2(){
log.info("==========1");
int y = 0 ;
for (int i = 0; i < 1000; i++) {
try {
asyncTestService.testAsync1();
} catch (Exception e) {
e.printStackTrace();
log.error("丢弃任务数"+(++y));
}
}
log.info("==========结束");
}
@GetMapping("test3")
public void test3(){
ThreadPoolTaskExecutor tpe = ((ThreadPoolTaskExecutor) taskExecutor);
log.info("----------线程池中的线程数量:{}",tpe.getThreadPoolExecutor().getPoolSize());
log.info("----------线程池中曾经创建过的最大线程数:{}",tpe.getThreadPoolExecutor().getLargestPoolSize());
log.info("----------线程池中活跃的线程数:{}",tpe.getThreadPoolExecutor().getActiveCount());
log.info("----------线程池中需要执行的任务数量:{}",tpe.getThreadPoolExecutor().getTaskCount());
log.info("----------线程池中完成的任务数量:{}",tpe.getThreadPoolExecutor().getCompletedTaskCount());
}
}
当请求完地址:http://localhost:8080/async/test2 然后请求:http://localhost:8080/async/test3
输出结果如下:
2019-09-27 18:50:50.066 INFO 3776 --- [nio-8080-exec-3] com.dsx.test.async.AsyncTestController : ----------线程池中的线程数量:10
2019-09-27 18:50:50.066 INFO 3776 --- [nio-8080-exec-3] com.dsx.test.async.AsyncTestController : ----------线程池中曾经创建过的最大线程数:10
2019-09-27 18:50:50.066 INFO 3776 --- [nio-8080-exec-3] com.dsx.test.async.AsyncTestController : ----------线程池中活跃的线程数:0
2019-09-27 18:50:50.066 INFO 3776 --- [nio-8080-exec-3] com.dsx.test.async.AsyncTestController : ----------线程池中需要执行的任务数量:768
2019-09-27 18:50:50.066 INFO 3776 --- [nio-8080-exec-3] com.dsx.test.async.AsyncTestController : ----------线程池中完成的任务数量:768
从输出内容大概可以看出监控池的主要指标:
getActiveCount() | 目前还在执行任务的线程数,如果没有任务则会变为0 |
getLargestPoolSize() | 曾经创建过的最大线程数:这个线程数只会增多不会变少 |
getPoolSize() | 目前线程池中存在的线程数量 |
结论:
- 在springboot 工程中只需要 @EnableAsync,@Async 两个注解即可完成简单异步执行。
- 猜测:默认线程池大小是9,最大队列数为:Integer.MAX_VALUE 。(待源码验证)
- 支持自定义线程池,只需要返回一个 TaskExecutor 实力交给Spring 管理。
附:文中相关代码地址:https://gitee.com/disanxian/test/tree/master/src/main/java/com/dsx/test
转载:https://blog.csdn.net/homework_tian/article/details/101468939