飞道的博客

Spring @Async 注解的使用以及原理(二)

278人阅读  评论(0)

       在上一篇中《Spring @Async 注解的使用以及原理(一)》简单介绍了@Async的使用,本篇简单分析一下原理,源码版本:spring-context-5.0.5.RELEASE.

@EnableAsync注解:


  
  1. /**
  2. * Enables Spring's asynchronous method execution capability, similar to functionality
  3. * found in Spring's {@code <task:*>} XML namespace.
  4. *
  5. * <p>To be used together with @{@link Configuration Configuration} classes as follows,
  6. * enabling annotation-driven async processing for an entire Spring application context:
  7. *
  8. * <pre class="code">
  9. * &#064;Configuration
  10. * &#064;EnableAsync
  11. * public class AppConfig {
  12. *
  13. * }</pre>
  14. *
  15. * {@code MyAsyncBean} is a user-defined type with one or more methods annotated with
  16. * either Spring's {@code @Async} annotation, the EJB 3.1 {@code @javax.ejb.Asynchronous}
  17. * annotation, or any custom annotation specified via the {@link #annotation} attribute.
  18. * The aspect is added transparently for any registered bean, for instance via this
  19. * configuration:
  20. *
  21. * <pre class="code">
  22. * &#064;Configuration
  23. * public class AnotherAppConfig {
  24. *
  25. * &#064;Bean
  26. * public MyAsyncBean asyncBean() {
  27. * return new MyAsyncBean();
  28. * }
  29. * }</pre>
  30. *
  31. * (以上部分展示了与 @Configuration 注解搭配使用的场景)
  32. *
  33. * <p>By default, Spring will be searching for an associated thread pool definition:
  34. * either a unique {@link org.springframework.core.task.TaskExecutor} bean in the context,
  35. * or an {@link java.util.concurrent.Executor} bean named "taskExecutor" otherwise. If
  36. * neither of the two is resolvable, a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}
  37. * will be used to process async method invocations. Besides, annotated methods having a
  38. * {@code void} return type cannot transmit any exception back to the caller. By default,
  39. * such uncaught exceptions are only logged.
  40. *
  41. * (默认情况下,Spring寻找一个唯一的TaskExecutor类型的bean 或者 bean名称是“taskExecutor”的Executor类型的bean。
  42. * 如果二者都不存在,则使用SimpleAsyncTaskExecutor进行异步方法的执行.)
  43. * 返回类型为void 无法将任何异常传送回调用方。 默认情况下,仅记录此类未捕获的异常。
  44. *
  45. * <p>To customize all this, implement {@link AsyncConfigurer} and provide:
  46. * <ul>
  47. * <li>your own {@link java.util.concurrent.Executor Executor} through the
  48. * {@link AsyncConfigurer#getAsyncExecutor getAsyncExecutor()} method, and</li>
  49. * <li>your own {@link org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler
  50. * AsyncUncaughtExceptionHandler} through the {@link AsyncConfigurer#getAsyncUncaughtExceptionHandler
  51. * getAsyncUncaughtExceptionHandler()}
  52. * method.</li>
  53. * </ul>
  54. *
  55. * <pre class="code">
  56. * &#064;Configuration
  57. * &#064;EnableAsync
  58. * public class AppConfig implements AsyncConfigurer {
  59. *
  60. * &#064;Override
  61. * public Executor getAsyncExecutor() {
  62. * ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  63. * executor.setCorePoolSize(7);
  64. * executor.setMaxPoolSize(42);
  65. * executor.setQueueCapacity(11);
  66. * executor.setThreadNamePrefix("MyExecutor-");
  67. * executor.initialize();
  68. * return executor;
  69. * }
  70. *
  71. * &#064;Override
  72. * public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
  73. * return MyAsyncUncaughtExceptionHandler();
  74. * }
  75. * }</pre>
  76. *
  77. * <p>If only one item needs to be customized, {@code null} can be returned to
  78. * keep the default settings. Consider also extending from {@link AsyncConfigurerSupport}
  79. * when possible.
  80. *
  81. * <p>Note: In the above example the {@code ThreadPoolTaskExecutor} is not a fully managed
  82. * Spring bean. Add the {@code @Bean} annotation to the {@code getAsyncExecutor()} method
  83. * if you want a fully managed bean. In such circumstances it is no longer necessary to
  84. * manually call the {@code executor.initialize()} method as this will be invoked
  85. * automatically when the bean is initialized.
  86. *
  87. * <p>For reference, the example above can be compared to the following Spring XML
  88. * configuration:
  89. *
  90. * <pre class="code">
  91. * {@code
  92. * <beans>
  93. *
  94. * <task:annotation-driven executor="myExecutor" exception-handler="exceptionHandler"/>
  95. *
  96. * <task:executor id="myExecutor" pool-size="7-42" queue-capacity="11"/>
  97. *
  98. * <bean id="asyncBean" class="com.foo.MyAsyncBean"/>
  99. *
  100. * <bean id="exceptionHandler" class="com.foo.MyAsyncUncaughtExceptionHandler"/>
  101. *
  102. * </beans>
  103. * }</pre>
  104. *
  105. * The above XML-based and JavaConfig-based examples are equivalent except for the
  106. * setting of the <em>thread name prefix</em> of the {@code Executor}; this is because
  107. * the {@code <task:executor>} element does not expose such an attribute. This
  108. * demonstrates how the JavaConfig-based approach allows for maximum configurability
  109. * through direct access to actual componentry.
  110. *
  111. * <p>The {@link #mode} attribute controls how advice is applied: If the mode is
  112. * {@link AdviceMode#PROXY} (the default), then the other attributes control the behavior
  113. * of the proxying. Please note that proxy mode allows for interception of calls through
  114. * the proxy only; local calls within the same class cannot get intercepted that way.
  115. *
  116. *({@link#mode}属性控制通知的应用方式:如果模式是{@link AdviceMode#PROXY}(默认值),则其他
  117. * 属性控制代理的行为。请注意,代理模式只允许通过代理拦截调用;同一类中的本身(自)调用不能这样被拦
  118. * 截。)
  119. * <p>Note that if the {@linkplain #mode} is set to {@link AdviceMode#ASPECTJ}, then the
  120. * value of the {@link #proxyTargetClass} attribute will be ignored. Note also that in
  121. * this case the {@code spring-aspects} module JAR must be present on the classpath, with
  122. * compile-time weaving or load-time weaving applying the aspect to the affected classes.
  123. * There is no proxy involved in such a scenario; local calls will be intercepted as well.
  124. *(注意,如果{@linkplain#mode}设置为{@link AdviceMode#ASPECTJ},则
  125. * {@link#proxyTargetClass}属性的值将被忽略。还要注意,在这种情况下{@code spring aspects}
  126. * 模块JAR必须出现在类路径上,编译时编织或加载时编织将方面应用于受影响的类。在这种情况下不涉及代
  127. * 理;本地调用也将被拦截。)
  128. *
  129. * @author Chris Beams
  130. * @author Juergen Hoeller
  131. * @author Stephane Nicoll
  132. * @author Sam Brannen
  133. * @since 3.1
  134. * @see Async
  135. * @see AsyncConfigurer
  136. * @see AsyncConfigurationSelector
  137. */
  138. @Target(ElementType.TYPE)
  139. @Retention(RetentionPolicy.RUNTIME)
  140. @Documented
  141. @Import(AsyncConfigurationSelector.class) //注意这个,重点
  142. public @interface EnableAsync {
  143. /**
  144. * Indicate the 'async' annotation type to be detected at either class
  145. * or method level.
  146. * <p>By default, both Spring's @{@link Async} annotation and the EJB 3.1
  147. * {@code @javax.ejb.Asynchronous} annotation will be detected.
  148. * <p>This attribute exists so that developers can provide their own
  149. * custom annotation type to indicate that a method (or all methods of
  150. * a given class) should be invoked asynchronously.
  151. */
  152. /**
  153. * 指示要在类或方法级别检测到的“异步”注解类型。
  154. * 默认情况下,Spring的@ {@Async}注解和EJB 3.1{@code @javax.ejb.Asynchronous}
  155. 注解将被检测到。
  156.   * 此属性存在,以便开发人员可以提供自己的自定义注解类型,以指示一个方法(或的所有方法给定的类)
  157. * 应该异步调用。
  158. */
  159. Class<? extends Annotation> annotation() default Annotation.class;
  160. /**
  161. * Indicate whether subclass-based (CGLIB) proxies are to be created as opposed
  162. * to standard Java interface-based proxies.
  163. * <p><strong>Applicable only if the {@link #mode} is set to {@link AdviceMode#PROXY}</strong>.
  164. * <p>The default is {@code false}.
  165. * <p>Note that setting this attribute to {@code true} will affect <em>all</em>
  166. * Spring-managed beans requiring proxying, not just those marked with {@code @Async}.
  167. * For example, other beans marked with Spring's {@code @Transactional} annotation
  168. * will be upgraded to subclass proxying at the same time. This approach has no
  169. * negative impact in practice unless one is explicitly expecting one type of proxy
  170. * vs. another &mdash; for example, in tests.
  171. */
  172. /**
  173. * 指示与基于标准Java接口的代理相反,是否要创建基于子类(CGLIB)的代理。
  174. * 仅在{@link #mode}设置为{@link AdviceMode#PROXY}时适用,默认false
  175. * 请注意,将此属性设置为{@code true}将影响所有需要代理的<em> all </ em> Spring管理的
  176. * bean,而不仅仅是标记为{@code @Async}的bean。 例如,其他标有Spring的{@code
  177. * @Transactional}批注的bean将同时升级为子类代理。 这种方法在实践中不会产生负面影响,除非在
  178. * 测试中明确期望一种代理相对于另一种代理。
  179. */
  180. boolean proxyTargetClass() default false;
  181. /**
  182. * Indicate how async advice should be applied.
  183. * <p><b>The default is {@link AdviceMode#PROXY}.</b>
  184. * Please note that proxy mode allows for interception of calls through the proxy
  185. * only. Local calls within the same class cannot get intercepted that way; an
  186. * {@link Async} annotation on such a method within a local call will be ignored
  187. * since Spring's interceptor does not even kick in for such a runtime scenario.
  188. * For a more advanced mode of interception, consider switching this to
  189. * {@link AdviceMode#ASPECTJ}.
  190. */
  191. /** 指示应如何应用异步通知。
  192. * <p><b>默认值是{@link AdviceMode#PROXY}</b>
  193. * 请注意,代理模式只允许通过代理拦截调用。同一个类中的方法自调用不能以这种方式被拦截;本地自调用中
  194. * 此类方法上的{@link Async}注解将被忽略,因为Spring的拦截器甚至不启动此类运行时场景。对于
  195. * 更高级的拦截模式,请考虑将其切换到{@link AdviceMode#ASPECTJ}。
  196. */
  197. AdviceMode mode() default AdviceMode.PROXY;
  198. /**
  199. * Indicate the order in which the {@link AsyncAnnotationBeanPostProcessor}
  200. * should be applied.
  201. * <p>The default is {@link Ordered#LOWEST_PRECEDENCE} in order to run
  202. * after all other post-processors, so that it can add an advisor to
  203. * existing proxies rather than double-proxy.
  204. */
  205. /**
  206. * 指示应用{@link AsyncAnnotationBeanPostProcessor}的顺序。
  207. * 默认值是{@link Ordered#LOWEST_PRECEDENCE},以便在所有其他后处理器之后运行,这样它就可
  208. * 以向现有代理添加一个advisor,而不是双重代理。
  209. */
  210. int order() default Ordered.LOWEST_PRECEDENCE;
  211. }

    @EnableAsync 注解中有一行重要的代码:@Import(AsyncConfigurationSelector.class),引入了相关的配置类:


  
  1. public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
  2. private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
  3. "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
  4. /**
  5. * {@inheritDoc}
  6. * @return {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration} for
  7. * {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()}, respectively
  8. */
  9. @Override
  10. @Nullable
  11. public String[] selectImports(AdviceMode adviceMode) {
  12. switch (adviceMode) {
  13. case PROXY:
  14. return new String[] { ProxyAsyncConfiguration.class.getName() };
  15. case ASPECTJ:
  16. return new String[] { ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME };
  17. default:
  18. return null;
  19. }
  20. }
  21. }

ProxyAsyncConfiguration类:

     以AdviceMode.PROXY为例,进入到ProxyAsyncConfiguration类中:


  
  1. public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
  2. @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
  3. @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  4. public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
  5. Assert.notNull( this.enableAsync, "@EnableAsync annotation metadata was not injected");
  6. //实例化注册一个AsyncAnnotationBeanPostProcessor类型的bean
  7. AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
  8. Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass( "annotation");
  9. //设置自定义的异步注解类型
  10. if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
  11. bpp.setAsyncAnnotationType(customAsyncAnnotation);
  12. }
  13. //设置执行器
  14. if ( this.executor != null) {
  15. bpp.setExecutor( this.executor);
  16. }
  17. //设置异常处理器
  18. if ( this.exceptionHandler != null) {
  19. bpp.setExceptionHandler( this.exceptionHandler);
  20. }
  21. //读取@EnableAsync注解的属性值,见父类的setImportMetadata方法
  22. bpp.setProxyTargetClass( this.enableAsync.getBoolean( "proxyTargetClass"));
  23. bpp.setOrder( this.enableAsync.<Integer>getNumber( "order"));
  24. return bpp;
  25. }
  26. }

AsyncAnnotationBeanPostProcessor类:

    看名称有点像bean后置处理器,继承实现接口比较复杂,可以用IDEA生成一下类图:

     AsyncAnnotationBeanPostProcessor类的代码比较多,需要清楚Spring bean的生命周期初始化过程,比如BeanFactoryAware、BeanPostProcessor等的执行顺序,另外还要对Spring AOP APIs(编程式创建 @AspectJ 代理)有所了解。

     在AbstractAdvisingBeanPostProcessor类中的postProcessAfterInitialization方法会会生成目标类的代理类


  
  1. public Object postProcessAfterInitialization(Object bean, String beanName) {
  2. if ( this.advisor == null || bean instanceof AopInfrastructureBean) {
  3. // Ignore AOP infrastructure such as scoped proxies.
  4. return bean;
  5. }
  6. // 添加advisor
  7. if (bean instanceof Advised) {
  8. Advised advised = (Advised) bean;
  9. if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
  10. // Add our local Advisor to the existing proxy's Advisor chain...
  11. if ( this.beforeExistingAdvisors) {
  12. advised.addAdvisor( 0, this.advisor);
  13. }
  14. else {
  15. advised.addAdvisor( this.advisor);
  16. }
  17. return bean;
  18. }
  19. }
  20. //bean为非代理类时进入构造目标类的代理工厂
  21. if (isEligible(bean, beanName)) {
  22. ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
  23. //添加代理的接口
  24. if (!proxyFactory.isProxyTargetClass()) {
  25. evaluateProxyInterfaces(bean.getClass(), proxyFactory);
  26. }
  27. //设置切面
  28. proxyFactory.addAdvisor( this.advisor);
  29. customizeProxyFactory(proxyFactory);
  30. //返回代理类
  31. return proxyFactory.getProxy(getProxyClassLoader());
  32. }
  33. // No proxy needed.
  34. return bean;
  35. }

 

     这里展示一下实例化的AsyncAnnotationBeanPostProcessor类对应的bean的结构帮助理解:

     可以看到这个 bean中持有一个AsyncAnnotationAdvisor类的对象advisor:buildAdvice()方法生成通知,buildPointcut生成切点。


  
  1. protected Advice buildAdvice(@Nullable Executor executor,
  2. AsyncUncaughtExceptionHandler exceptionHandler) {
  3. return new AnnotationAsyncExecutionInterceptor(executor, exceptionHandler);
  4. }
  5. /**
  6. * Calculate a pointcut for the given async annotation types, if any.
  7. * @param asyncAnnotationTypes the async annotation types to introspect
  8. * @return the applicable Pointcut object, or {@code null} if none
  9. */
  10. protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
  11. ComposablePointcut result = null;
  12. for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
  13. Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
  14. Pointcut mpc = new AnnotationMatchingPointcut( null, asyncAnnotationType, true);
  15. if (result == null) {
  16. result = new ComposablePointcut(cpc);
  17. }
  18. else {
  19. result.union(cpc);
  20. }
  21. result = result.union(mpc);
  22. }
  23. return (result != null ? result : Pointcut.TRUE);
  24. }

AsyncAnnotationAdvisor中的buildAdvice()方法,生成了AnnotationAsyncExecutionInterceptor对象,它的父类AsyncExecutionInterceptor重写了AsyncExecutionInterceptor接口的invoke方法,通过委托实现@Async异步方法的调用。在invoke()方法中获取执行器executor,创建Callable异步线程任务,提交到执行器executor(对应的线程池)中执行。


  
  1. public Object invoke(final MethodInvocation invocation) throws Throwable {
  2. Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
  3. Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
  4. final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
  5. AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
  6. if (executor == null) {
  7. throw new IllegalStateException(
  8. "No executor specified and no default executor set on AsyncExecutionInterceptor either");
  9. }
  10. Callable<Object> task = () -> {
  11. try {
  12. Object result = invocation.proceed();
  13. if (result instanceof Future) {
  14. return ((Future<?>) result).get();
  15. }
  16. }
  17. catch (ExecutionException ex) {
  18. handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
  19. }
  20. catch (Throwable ex) {
  21. handleError(ex, userDeclaredMethod, invocation.getArguments());
  22. }
  23. return null;
  24. };
  25. return doSubmit(task, executor, invocation.getMethod().getReturnType());
  26. }

总结:

     @Async和@EnableAsync注解实现方法异步调用底层是通过AOP线程池实现的。

备注:AsyncAnnotationBeanPostProcessor类的实例化过程涉及到低级的AOP APIs有些复杂,需要对Spring bean生命周期解以及Spring AOP APIs的使用有所掌握。最好打断点,调试,感到混乱时,可以先debugger出代码块(比如方法)的结果,再进到代码中分析。

Spring bean生命周期大致流程:https://blog.csdn.net/qq_22076345/article/details/105580031

Spring AOP APIs官方文档:https://docs.spring.io/spring-framework/docs/current/spring-framework-reference/core.html#aop-api


转载:https://blog.csdn.net/qq_22076345/article/details/82229447
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场