飞道的博客

Java 多线程上下文传递在复杂场景下的实践

437人阅读  评论(0)

一、引言

海外商城从印度做起,慢慢的会有一些其他国家的诉求,这个时候需要我们针对当前的商城做一个改造,可以支撑多个国家的商城,这里会涉及多个问题,多语言,多国家,多时区,本地化等等。在多国家的情况下如何把识别出来的国家信息传递下去,一层一层直到代码执行的最后一步。甚至还有一些多线程的场景需要处理。

二、背景技术

2.1 ThreadLocal

ThreadLocal是最容易想到了,入口识别到国家信息后,丢进ThreadLocal,这样后续代码、redis、DB等做国家区分的时候都能使用到。

这里先简单介绍一下ThreadLocal:


  
  1. /**
  2. * Sets the current thread's copy of this thread-local variable
  3. * to the specified value. Most subclasses will have no need to
  4. * override this method, relying solely on the {@link #initialValue}
  5. * method to set the values of thread-locals.
  6. *
  7. * @param value the value to be stored in the current thread's copy of
  8. * this thread-local.
  9. */
  10. public void set(T value) {
  11. Thread t = Thread.currentThread();
  12. ThreadLocalMap map = getMap(t);
  13. if (map != null)
  14. map.set( this, value);
  15. else
  16. createMap(t, value);
  17. }
  18. /**
  19. * Returns the value in the current thread's copy of this
  20. * thread-local variable. If the variable has no value for the
  21. * current thread, it is first initialized to the value returned
  22. * by an invocation of the {@link #initialValue} method.
  23. *
  24. * @return the current thread's value of this thread-local
  25. */
  26. public T get() {
  27. Thread t = Thread.currentThread();
  28. ThreadLocalMap map = getMap(t);
  29. if (map != null) {
  30. ThreadLocalMap.Entry e = map.getEntry( this);
  31. if (e != null) {
  32. @SuppressWarnings("unchecked")
  33. T result = (T)e.value;
  34. return result;
  35. }
  36. }
  37. return setInitialValue();
  38. }
  39. /**
  40. * Get the map associated with a ThreadLocal. Overridden in
  41. * InheritableThreadLocal.
  42. *
  43. * @param t the current thread
  44. * @return the map
  45. */
  46. ThreadLocalMap getMap(Thread t) {
  47. return t.threadLocals;
  48. }
  49. /**
  50. * Get the entry associated with key. This method
  51. * itself handles only the fast path: a direct hit of existing
  52. * key. It otherwise relays to getEntryAfterMiss. This is
  53. * designed to maximize performance for direct hits, in part
  54. * by making this method readily inlinable.
  55. *
  56. * @param key the thread local object
  57. * @return the entry associated with key, or null if no such
  58. */
  59. private Entry getEntry(ThreadLocal<?> key) {
  60. int i = key.threadLocalHashCode & (table.length - 1);
  61. Entry e = table[i];
  62. if (e != null && e.get() == key)
  63. return e;
  64. else
  65. return getEntryAfterMiss(key, i, e);
  66. }

 

  • 每一个Thread线程都有属于自己的threadLocals(ThreadLocalMap),里面有一个弱引用的Entry(ThreadLocal,Object)。
  • get方法首先通过Thread.currentThread得到当前线程,然后拿到线程的threadLocals(ThreadLocalMap),再从Entry中取得当前线程存储的value。

  • set值的时候更改当前线程的threadLocals(ThreadLocalMap)中Entry对应的value值。

实际使用中除了同步方法之外,还有起异步线程处理的场景,这个时候就需要把ThreadLocal的内容从父线程传递给子线程,这个怎么办呢?

不急,Java 还有InheritableThreadLocal来帮我们解决这个问题。

2.2 InheritableThreadLoca


   
  1. public class InheritableThreadLocal<T> extends ThreadLocal<T> {
  2. /**
  3. * Computes the child's initial value for this inheritable thread-local
  4. * variable as a function of the parent's value at the time the child
  5. * thread is created. This method is called from within the parent
  6. * thread before the child is started.
  7. * <p>
  8. * This method merely returns its input argument, and should be overridden
  9. * if a different behavior is desired.
  10. *
  11. * @param parentValue the parent thread's value
  12. * @return the child thread's initial value
  13. */
  14. protected T childValue(T parentValue) {
  15. return parentValue;
  16. }
  17. /**
  18. * Get the map associated with a ThreadLocal.
  19. *
  20. * @param t the current thread
  21. */
  22. ThreadLocalMap getMap(Thread t) {
  23. return t.inheritableThreadLocals;
  24. }
  25. /**
  26. * Create the map associated with a ThreadLocal.
  27. *
  28. * @param t the current thread
  29. * @param firstValue value for the initial entry of the table.
  30. */
  31. void createMap(Thread t, T firstValue) {
  32. t.inheritableThreadLocals = new ThreadLocalMap( this, firstValue);
  33. }
  34. }
  • java.lang.Thread#init(java.lang.ThreadGroup, java.lang.Runnable, java.lang.String, long, java.security.AccessControlContext, boolean)

   
  1. if (inheritThreadLocals && parent.inheritableThreadLocals != null)
  2. this.inheritableThreadLocals =
  3. ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
  • InheritableThreadLocal操作的是inheritableThreadLocals这个变量,而不是ThreadLocal操作的threadLocals变量。
  • 创建新线程的时候会检查父线程中parent.inheritableThreadLocals变量是否为null,如果不为null则复制一份parent.inheritableThreadLocals的数据到子线程的this.inheritableThreadLocals中去。
  • 因为复写了getMap(Thread)和CreateMap()方法直接操作inheritableThreadLocals,这样就实现了在子线程中获取父线程ThreadLocal值。

现在在使用多线程的时候,都是通过线程池来做的,这个时候用InheritableThreadLocal可以吗?会有什么问题吗?先看下下面的代码的执行情况:

  • test


   
  1. static InheritableThreadLocal<String> inheritableThreadLocal = new InheritableThreadLocal<>();
  2. public static void main(String[] args) throws InterruptedException {
  3. ExecutorService executorService = Executors.newFixedThreadPool( 1);
  4. inheritableThreadLocal.set( "i am a inherit parent");
  5. executorService.execute( new Runnable() {
  6. @Override
  7. public void run() {
  8. System.out.println(inheritableThreadLocal.get());
  9. }
  10. });
  11. TimeUnit.SECONDS.sleep( 1);
  12. inheritableThreadLocal.set( "i am a new inherit parent"); // 设置新的值
  13. executorService.execute( new Runnable() {
  14. @Override
  15. public void run() {
  16. System.out.println(inheritableThreadLocal.get());
  17. }
  18. });
  19. }
  20. i am a inherit parent
  21. i am a inherit parent
  22. public static void main(String[] args) throws InterruptedException {
  23. ExecutorService executorService = Executors.newFixedThreadPool( 1);
  24. inheritableThreadLocal.set( "i am a inherit parent");
  25. executorService.execute( new Runnable() {
  26. @Override
  27. public void run() {
  28. System.out.println(inheritableThreadLocal.get());
  29. inheritableThreadLocal.set( "i am a old inherit parent"); // 子线程中设置新的值
  30. }
  31. });
  32. TimeUnit.SECONDS.sleep( 1);
  33. inheritableThreadLocal.set( "i am a new inherit parent"); // 主线程设置新的值
  34. executorService.execute( new Runnable() {
  35. @Override
  36. public void run() {
  37. System.out.println(inheritableThreadLocal.get());
  38. }
  39. });
  40. }
  41. i am a inherit parent
  42. i am a old inherit parent

这里看第一个执行结果,发现主线程第二次设置的值,没有改掉,还是第一次设置的值“i am a inherit parent”,这是什么原因呢?

再看第二个例子的执行结果,发现在第一个任务中设置的“i am a old inherit parent"的值,在第二个任务中打印出来了。这又是什么原因呢?

回过头来看看上面的源码,在线程池的情况下,第一次创建线程的时候会从父线程中copy inheritableThreadLocals中的数据,所以第一个任务成功拿到了父线程设置的”i am a inherit parent“,第二个任务执行的时候复用了第一个任务的线程,并不会触发复制父线程中的inheritableThreadLocals操作,所以即使在主线程中设置了新的值,也会不生效。同时get()方法是直接操作inheritableThreadLocals这个变量的,所以就直接拿到了第一个任务设置的值。

那遇到线程池应该怎么办呢?

2.3 TransmittableThreadLocal

TransmittableThreadLocal(TTL)这个时候就派上用场了。这是阿里开源的一个组件,我们来看看它怎么解决线程池的问题,先来一段代码,在上面的基础上修改一下,使用TransmittableThreadLocal。


  
  1. static TransmittableThreadLocal<String> transmittableThreadLocal = new TransmittableThreadLocal<>(); // 使用TransmittableThreadLocal
  2. public static void main(String[] args) throws InterruptedException {
  3. ExecutorService executorService = Executors.newFixedThreadPool( 1);
  4. executorService = TtlExecutors.getTtlExecutorService(executorService); // 用TtlExecutors装饰线程池
  5. transmittableThreadLocal.set( "i am a transmittable parent");
  6. executorService.execute( new Runnable() {
  7. @Override
  8. public void run() {
  9. System.out.println(transmittableThreadLocal.get());
  10. transmittableThreadLocal.set( "i am a old transmittable parent"); // 子线程设置新的值
  11. }
  12. });
  13. System.out.println(transmittableThreadLocal.get());
  14. TimeUnit.SECONDS.sleep( 1);
  15. transmittableThreadLocal.set( "i am a new transmittable parent"); // 主线程设置新的值
  16. executorService.execute( new Runnable() {
  17. @Override
  18. public void run() {
  19. System.out.println(transmittableThreadLocal.get());
  20. }
  21. });
  22. }
  23. i am a transmittable parent
  24. i am a transmittable parent
  25. i am a new transmittable parent

执行代码后发现,使用TransmittableThreadLocalTtlExecutors.getTtlExecutorService(executorService)装饰线程池之后,在每次调用任务的时,都会将当前的主线程的TransmittableThreadLocal数据copy到子线程里面,执行完成后,再清除掉。同时子线程里面的修改回到主线程时其实并没有生效。这样可以保证每次任务执行的时候都是互不干涉的。这是怎么做到的呢?来看源码。

  • TtlExecutors和TransmittableThreadLocal源码


  
  1. private TtlRunnable(Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
  2. this.capturedRef = new AtomicReference<Object>(capture());
  3. this.runnable = runnable;
  4. this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
  5. }
  6. com.alibaba.ttl.TtlRunnable#run
  7. /**
  8. * wrap method {@link Runnable#run()}.
  9. */
  10. @Override
  11. public void run() {
  12. Object captured = capturedRef.get(); // 获取线程的ThreadLocalMap
  13. if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
  14. throw new IllegalStateException( "TTL value reference is released after run!");
  15. }
  16. Object backup = replay(captured); // 暂存当前子线程的ThreadLocalMap到backup
  17. try {
  18. runnable.run();
  19. } finally {
  20. restore(backup); // 恢复线程执行时被改版的Threadlocal对应的值
  21. }
  22. }
  23. com.alibaba.ttl.TransmittableThreadLocal.Transmitter#replay
  24. /**
  25. * Replay the captured {@link TransmittableThreadLocal} values from {@link #capture()},
  26. * and return the backup {@link TransmittableThreadLocal} values in current thread before replay.
  27. *
  28. * @param captured captured {@link TransmittableThreadLocal} values from other thread from {@link #capture()}
  29. * @return the backup {@link TransmittableThreadLocal} values before replay
  30. * @see #capture()
  31. * @since 2.3.0
  32. */
  33. public static Object replay(Object captured) {
  34. @SuppressWarnings("unchecked")
  35. Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured;
  36. Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>();
  37. for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
  38. iterator.hasNext(); ) {
  39. Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
  40. TransmittableThreadLocal<?> threadLocal = next.getKey();
  41. // backup
  42. backup.put(threadLocal, threadLocal.get());
  43. // clear the TTL value only in captured
  44. // avoid extra TTL value in captured, when run task.
  45. if (!capturedMap.containsKey(threadLocal)) {
  46. iterator.remove();
  47. threadLocal.superRemove();
  48. }
  49. }
  50. // set value to captured TTL
  51. for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : capturedMap.entrySet()) {
  52. @SuppressWarnings("unchecked")
  53. TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey();
  54. threadLocal.set(entry.getValue());
  55. }
  56. // call beforeExecute callback
  57. doExecuteCallback( true);
  58. return backup;
  59. }
  60. com.alibaba.ttl.TransmittableThreadLocal.Transmitter#restore
  61. /**
  62. * Restore the backup {@link TransmittableThreadLocal} values from {@link Transmitter#replay(Object)}.
  63. *
  64. * @param backup the backup {@link TransmittableThreadLocal} values from {@link Transmitter#replay(Object)}
  65. * @since 2.3.0
  66. */
  67. public static void restore(Object backup) {
  68. @SuppressWarnings("unchecked")
  69. Map<TransmittableThreadLocal<?>, Object> backupMap = (Map<TransmittableThreadLocal<?>, Object>) backup;
  70. // call afterExecute callback
  71. doExecuteCallback( false);
  72. for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
  73. iterator.hasNext(); ) {
  74. Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
  75. TransmittableThreadLocal<?> threadLocal = next.getKey();
  76. // clear the TTL value only in backup
  77. // avoid the extra value of backup after restore
  78. if (!backupMap.containsKey(threadLocal)) {
  79. iterator.remove();
  80. threadLocal.superRemove();
  81. }
  82. }
  83. // restore TTL value
  84. for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : backupMap.entrySet()) {
  85. @SuppressWarnings("unchecked")
  86. TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey();
  87. threadLocal.set(entry.getValue());
  88. }
  89. }

可以看下整个过程的完整时序图:

OK,既然问题都解决了,来看看实际使用吧,有两种使用,先看第一种,涉及HTTP请求、Dubbo请求和 job,采用的是数据级别的隔离。

三、 TTL 在海外商城的实际应用

3.1 不分库,分数据行 + SpringMVC

用户 HTTP 请求,首先我们要从url或者cookie中解析出国家编号,然后在TransmittableThreadLocal中存放国家信息,在 MyBatis 的拦截器中读取国家数据,进行sql改造,最终操作指定的国家数据,多线程场景下用TtlExecutors包装原有自定义线程池,保障在使用线程池的时候能够正确将国家信息传递下去。

  •  HTTP 请求


  
  1. @Bean
  2. public ThreadPoolTaskExecutor threadPoolTaskExecutor(){
  3. return TtlThreadPoolExecutors.getAsyncExecutor();
  4. }
  5. public class TtlThreadPoolExecutors {
  6. private static final String COMMON_BUSINESS = "COMMON_EXECUTOR";
  7. public static final int QUEUE_CAPACITY = 20000;
  8. public static ExecutorService getExecutorService() {
  9. return TtlExecutorServiceMananger.getExecutorService(COMMON_BUSINESS);
  10. }
  11. public static ExecutorService getExecutorService(String threadGroupName) {
  12. return TtlExecutorServiceMananger.getExecutorService(threadGroupName);
  13. }
  14. public static ThreadPoolTaskExecutor getAsyncExecutor() {
  15. // 用TtlExecutors装饰Executor,结合TransmittableThreadLocal解决异步线程threadlocal传递问题
  16. return getTtlThreadPoolTaskExecutor(initTaskExecutor());
  17. }
  18. private static ThreadPoolTaskExecutor initTaskExecutor () {
  19. return initTaskExecutor(TtlThreadPoolFactory.DEFAULT_CORE_SIZE, TtlThreadPoolFactory.DEFAULT_POOL_SIZE, QUEUE_CAPACITY);
  20. }
  21. private static ThreadPoolTaskExecutor initTaskExecutor (int coreSize, int poolSize, int executorQueueCapacity) {
  22. ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
  23. taskExecutor.setCorePoolSize(coreSize);
  24. taskExecutor.setMaxPoolSize(poolSize);
  25. taskExecutor.setQueueCapacity(executorQueueCapacity);
  26. taskExecutor.setKeepAliveSeconds( 120);
  27. taskExecutor.setAllowCoreThreadTimeOut( true);
  28. taskExecutor.setThreadNamePrefix( "TaskExecutor-ttl");
  29. taskExecutor.initialize();
  30. return taskExecutor;
  31. }
  32. private static ThreadPoolTaskExecutor getTtlThreadPoolTaskExecutor(ThreadPoolTaskExecutor executor) {
  33. if ( null == executor || executor instanceof ThreadPoolTaskExecutorWrapper) {
  34. return executor;
  35. }
  36. return new ThreadPoolTaskExecutorWrapper(executor);
  37. }
  38. }
  39. /**
  40. * @ClassName : LocaleContextHolder
  41. * @Description : 本地化信息上下文holder
  42. */
  43. public class LocalizationContextHolder {
  44. private static TransmittableThreadLocal<LocalizationContext> localizationContextHolder = new TransmittableThreadLocal<>();
  45. private static LocalizationInfo defaultLocalizationInfo = new LocalizationInfo();
  46. private LocalizationContextHolder(){}
  47. public static LocalizationContext getLocalizationContext() {
  48. return localizationContextHolder.get();
  49. }
  50. public static void resetLocalizationContext () {
  51. localizationContextHolder.remove();
  52. }
  53. public static void setLocalizationContext (LocalizationContext localizationContext) {
  54. if(localizationContext == null) {
  55. resetLocalizationContext();
  56. } else {
  57. localizationContextHolder.set(localizationContext);
  58. }
  59. }
  60. public static void setLocalizationInfo (LocalizationInfo localizationInfo) {
  61. LocalizationContext localizationContext = getLocalizationContext();
  62. String brand = (localizationContext instanceof BrandLocalizationContext ?
  63. ((BrandLocalizationContext) localizationContext).getBrand() : null);
  64. if(StringUtils.isNotEmpty(brand)) {
  65. localizationContext = new SimpleBrandLocalizationContext(localizationInfo, brand);
  66. } else if(localizationInfo != null) {
  67. localizationContext = new SimpleLocalizationContext(localizationInfo);
  68. } else {
  69. localizationContext = null;
  70. }
  71. setLocalizationContext(localizationContext);
  72. }
  73. public static void setDefaultLocalizationInfo(@Nullable LocalizationInfo localizationInfo) {
  74. LocalizationContextHolder.defaultLocalizationInfo = localizationInfo;
  75. }
  76. public static LocalizationInfo getLocalizationInfo () {
  77. LocalizationContext localizationContext = getLocalizationContext();
  78. if(localizationContext != null) {
  79. LocalizationInfo localizationInfo = localizationContext.getLocalizationInfo();
  80. if(localizationInfo != null) {
  81. return localizationInfo;
  82. }
  83. }
  84. return defaultLocalizationInfo;
  85. }
  86. public static String getCountry(){
  87. return getLocalizationInfo().getCountry();
  88. }
  89. public static String getTimezone(){
  90. return getLocalizationInfo().getTimezone();
  91. }
  92. public static String getBrand(){
  93. return getBrand(getLocalizationContext());
  94. }
  95. public static String getBrand(LocalizationContext localizationContext) {
  96. if(localizationContext == null) {
  97. return null;
  98. }
  99. if(localizationContext instanceof BrandLocalizationContext) {
  100. return ((BrandLocalizationContext) localizationContext).getBrand();
  101. }
  102. throw new LocaleException( "unsupported localizationContext type");
  103. }
  104. }
  105. @Override
  106. public LocaleContext resolveLocaleContext(final HttpServletRequest request) {
  107. parseLocaleCookieIfNecessary(request);
  108. LocaleContext localeContext = new TimeZoneAwareLocaleContext() {
  109. @Override
  110. public Locale getLocale() {
  111. return (Locale) request.getAttribute(LOCALE_REQUEST_ATTRIBUTE_NAME);
  112. }
  113. @Override
  114. public TimeZone getTimeZone() {
  115. return (TimeZone) request.getAttribute(TIME_ZONE_REQUEST_ATTRIBUTE_NAME);
  116. }
  117. };
  118. // 设置线程中的国家标志
  119. setLocalizationInfo(request, localeContext.getLocale());
  120. return localeContext;
  121. }
  122. private void setLocalizationInfo(HttpServletRequest request, Locale locale) {
  123. String country = locale!= null?locale.getCountry(): null;
  124. String language = locale!= null?(locale.getLanguage() + "_" + locale.getVariant()): null;
  125. LocaleRequestMessage localeRequestMessage = localeRequestParser.parse(request);
  126. final String countryStr = country;
  127. final String languageStr = language;
  128. final String brandStr = localeRequestMessage.getBrand();
  129. LocalizationContextHolder.setLocalizationContext( new BrandLocalizationContext() {
  130. @Override
  131. public String getBrand() {
  132. return brandStr;
  133. }
  134. @Override
  135. public LocalizationInfo getLocalizationInfo() {
  136. return LocalizationInfoAssembler.assemble(countryStr, languageStr);
  137. }
  138. });
  139. }

对于 Dubbo 接口和无法判断国家信息的 HTTP 接口,在入参部分增加国家信息参数,通过拦截器或者手动set国家信息到TransmittableThreadLocal。

对于定时任务 job,因为所有国家都需要执行,所以会把所有国家进行遍历执行,这也可以通过简单的注解来解决。

这个版本的改造,点检测试也基本通过了,自动化脚本验证也是没问题的,不过因为业务发展问题最终没上线。

3.2 分库 + SpringBoot

后续在建设新的国家商城的时候,分库分表方案调整为每个国家独立数据库,同时整体开发框架升级到SpringBoot,我们把这套方案做了升级,总体思路是一样的,只是在实现细节上略有不同。

SpringBoot 里面的异步一般通过@Async这个注解来实现,通过自定义线程池来包装,使用时在 HTTP 请求判断locale信息的写入国家信息,后续完成切DB的操作。

对于 Dubbo 接口和无法判断国家信息的 HTTP 接口,在入参部分增加国家信息参数,通过拦截器或者手动set国家信息到TransmittableThreadLocal。

对于定时任务job,因为所有国家都需要执行,所以会把所有国家进行遍历执行,这也可以通过简单的注解和AOP来解决。

四、总结

本文从业务拓展的角度阐述了在复杂业务场景下如何通过ThreadLocal,过渡到InheritableThreadLocal,再通过TransmittableThreadLocal解决实际业务问题。因为海外的业务在不断的探索中前进,技术也在不断的探索中演进,面对这种复杂多变的情况,我们的应对策略是先做国际化,再做本地化,more global才能more local,多国家的隔离只是国际化最基本的起点,未来还有很多业务和技术等着我们去挑战。

作者:vivo 官网商城开发团队


转载:https://blog.csdn.net/vivo_tech/article/details/113500269
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场