飞道的博客

ThreadLocal、多线程、CountDownLatch使用实例

525人阅读  评论(0)

  
  1. @Service
  2. @Slf4j
  3. public class WeatherServiceImpl implements IWeatherService {
  4. @Resource
  5. private WeatherServiceApi weatherServiceApi;
  6. @Resource
  7. private RedisTemplate redisTemplate;
  8. @Value( "${redis.weather}")
  9. private Integer redisWeather;
  10. private ThreadLocal<WeatherHourlyAndLifestyleNow> hourlyThreadLocal = new ThreadLocal();
  11. private ThreadLocal<WeatherDailyForecastAndLifestyleNow> dailyThreadLocal = new ThreadLocal<>();
  12. @Override
  13. public WeatherHourlyAndLifestyleNow queryCurrentDayWeather(String location,String lifestyle) {
  14. Long st = System.currentTimeMillis();
  15. String weatherKey = RedisEnum.CpspWeather.LOCATION_LIFESTYLE_SAVE_HOURLY_WEATHER.getKey(location,lifestyle);
  16. WeatherHourlyAndLifestyleNow data = (WeatherHourlyAndLifestyleNow)redisTemplate.opsForValue().get(weatherKey);
  17. if ( null != data){
  18. return data;
  19. }
  20. //data = new WeatherHourlyAndLifestyleNow();
  21. OnTheDayHourlyWeatherThreadLocal onTheDayHourlyWeatherThreadLocal = new OnTheDayHourlyWeatherThreadLocal(location,weatherServiceApi,redisTemplate,redisWeather, null, null, true);
  22. hourlyThreadLocal.set(onTheDayHourlyWeatherThreadLocal.queryCurrentDayWeather(location, lifestyle));
  23. data = hourlyThreadLocal.get();
  24. // CountDownLatch countDownLatch = new CountDownLatch(hourlyCountDownLatch);
  25. // FutureTask weatherHourlyTask = new FutureTask(new WeatherHourlyTask(countDownLatch,location,weatherServiceApi,redisTemplate,redisWeather));
  26. // FutureTask lifestyleTask = new FutureTask(new LifestyleTask(countDownLatch,location,weatherServiceApi,redisTemplate,redisWeather,lifestyle));
  27. // FutureTask weatherForecastTask = new FutureTask(new WeatherForecastTask(countDownLatch,location,weatherServiceApi,redisTemplate,redisWeather,null,null,true));
  28. // FutureTask airQualityTask = new FutureTask(new AirQualityTask(countDownLatch,location,weatherServiceApi,redisTemplate,redisWeather,null,null,true));
  29. // ExecutorService executorService = Executors.newFixedThreadPool(hourlyCountDownLatch);
  30. // executorService.submit(weatherHourlyTask);
  31. // executorService.submit(lifestyleTask);
  32. // executorService.submit(weatherForecastTask);
  33. // executorService.submit(airQualityTask);
  34. // Long ft = System.currentTimeMillis();
  35. // log.debug("==> ft run consume time :{}, countDownLatch :{}",ft-st,countDownLatch);
  36. // try {
  37. // countDownLatch.await();
  38. // data.setHourlyList((List<WeatherHourly>) weatherHourlyTask.get());
  39. // data.setLifestyleNows((List<WeatherLifestyleNow>) lifestyleTask.get());
  40. // WeatherDailyForecastVo dailyForecastVo = (WeatherDailyForecastVo)weatherForecastTask.get();
  41. // if (null != dailyForecastVo){
  42. // data.setDailyForecast(dailyForecastVo.getDailyForecastList().get(0));
  43. // data.setBaseInfoVo(dailyForecastVo.getBaseInfoVo());
  44. // }
  45. // List<AirQuality> airQualityList = (List<AirQuality>)airQualityTask.get();
  46. // if(!CollectionUtils.isEmpty(airQualityList)){
  47. // data.setAirQuality(airQualityList.get(0));
  48. // }
  49. // } catch (InterruptedException | ExecutionException e) {
  50. // log.error("error:",e);
  51. // throw ExceptionEnum.QUERY_WEATHER_ERROR.getException();
  52. // }finally {
  53. // executorService.shutdown();
  54. // }
  55. if ( null == data){
  56. redisTemplate.opsForValue().set(weatherKey,data,redisWeather,TimeUnit.SECONDS);
  57. }
  58. log.debug( "");
  59. Long ed = System.currentTimeMillis();
  60. log.debug( "===> weatherServiceImpl queryCurrentDayWeather consume time total : {}",ed - st);
  61. log.debug( "");
  62. return data;
  63. }
  64. }

 


  
  1. @Slf4j
  2. public class OnTheDayHourlyWeatherThreadLocal {
  3. private String location;
  4. private WeatherServiceApi weatherServiceApi;
  5. private RedisTemplate redisTemplate;
  6. private Integer redisWeather;
  7. private String startDate;
  8. private String endDate;
  9. private Boolean onTheDay;
  10. private int hourlyCountDownLatch = 4;
  11. private static ThreadLocal<FutureTask> threadLocal = new ThreadLocal();
  12. private CountDownLatch countDownLatch = new CountDownLatch( 4);
  13. public OnTheDayHourlyWeatherThreadLocal(String location, WeatherServiceApi weatherServiceApi,
  14. RedisTemplate redisTemplate, Integer redisWeather, String startDate, String endDate,
  15. Boolean onTheDay){
  16. this.location = location;
  17. this.weatherServiceApi = weatherServiceApi;
  18. this.redisTemplate = redisTemplate;
  19. this.redisWeather = redisWeather;
  20. this.startDate = startDate;
  21. this.endDate = endDate;
  22. this.onTheDay = onTheDay;
  23. }
  24. public WeatherHourlyAndLifestyleNow queryCurrentDayWeather(String location, String lifestyle) {
  25. Long st = System.currentTimeMillis();
  26. String weatherKey = RedisEnum.CpspWeather.LOCATION_LIFESTYLE_SAVE_HOURLY_WEATHER.getKey(location,lifestyle);
  27. WeatherHourlyAndLifestyleNow data = (WeatherHourlyAndLifestyleNow)redisTemplate.opsForValue().get(weatherKey);
  28. if ( null != data){
  29. return data;
  30. }
  31. data = new WeatherHourlyAndLifestyleNow();
  32. FutureTask weatherHourlyTask = new FutureTask( new WeatherHourlyTask(countDownLatch,location,weatherServiceApi,redisTemplate,redisWeather));
  33. FutureTask lifestyleTask = new FutureTask( new LifestyleTask(countDownLatch,location,weatherServiceApi,redisTemplate,redisWeather,lifestyle));
  34. FutureTask weatherForecastTask = new FutureTask( new WeatherForecastTask(countDownLatch,location,weatherServiceApi,redisTemplate,redisWeather, null, null, true));
  35. FutureTask airQualityTask = new FutureTask( new AirQualityTask(countDownLatch,location,weatherServiceApi,redisTemplate,redisWeather, null, null, true));
  36. ExecutorService executorService = Executors.newFixedThreadPool(hourlyCountDownLatch);
  37. executorService.submit(weatherHourlyTask);
  38. executorService.submit(lifestyleTask);
  39. executorService.submit(weatherForecastTask);
  40. executorService.submit(airQualityTask);
  41. try {
  42. countDownLatch.await( 3,TimeUnit.SECONDS);
  43. data.setHourlyList((List<WeatherHourly>) weatherHourlyTask.get());
  44. data.setLifestyleNows((List<WeatherLifestyleNow>) lifestyleTask.get());
  45. WeatherDailyForecastVo dailyForecastVo = (WeatherDailyForecastVo)weatherForecastTask.get();
  46. if ( null != dailyForecastVo){
  47. data.setDailyForecast(dailyForecastVo.getDailyForecastList().get( 0));
  48. data.setBaseInfoVo(dailyForecastVo.getBaseInfoVo());
  49. }
  50. List<AirQuality> airQualityList = (List<AirQuality>)airQualityTask.get();
  51. if(!CollectionUtils.isEmpty(airQualityList)){
  52. data.setAirQuality(airQualityList.get( 0));
  53. }
  54. } catch (InterruptedException | ExecutionException e) {
  55. log.error( "error:",e);
  56. throw ExceptionEnum.QUERY_WEATHER_ERROR.getException();
  57. } finally {
  58. executorService.shutdown();
  59. }
  60. Long ed = System.currentTimeMillis();
  61. log.debug( "===> WeatherHourlyThreadLocal queryCurrentDayWeather consume time total : {}",ed - st);
  62. log.debug( "");
  63. return data;
  64. }
  65. }

 

其中一个被 FutureTask 包装的 Callable 任务(AirQualityTask):


  
  1. @Slf4j
  2. public class AirQualityTask implements Callable<List<AirQuality>> {
  3. private CountDownLatch countDownLatch;
  4. private String location;
  5. private WeatherServiceApi weatherServiceApi;
  6. private RedisTemplate redisTemplate;
  7. private Integer redisWeather;
  8. private String startDate;
  9. private String endDate;
  10. private Boolean onTheDay;
  11. public AirQualityTask (CountDownLatch countDownLatch,String location,WeatherServiceApi weatherServiceApi,
  12. RedisTemplate redisTemplate,Integer redisWeather,String startDate,String endDate,Boolean onTheDay){
  13. this.countDownLatch = countDownLatch;
  14. this.location = location;
  15. this.weatherServiceApi = weatherServiceApi;
  16. this.redisTemplate = redisTemplate;
  17. this.redisWeather = redisWeather;
  18. this.startDate = startDate;
  19. this.endDate = endDate;
  20. this.onTheDay = onTheDay;
  21. }
  22. @Override
  23. public List<AirQuality> call() throws Exception {
  24. try{
  25. Long st = System.currentTimeMillis();
  26. List<AirQuality> forecastAirQuality = null;
  27. if (onTheDay){
  28. startDate = LocalDate.now().toString();
  29. forecastAirQuality = queryAirQualityForecast(location,startDate,startDate);
  30. } else{
  31. forecastAirQuality = queryAirQualityForecast(location,startDate,startDate);
  32. }
  33. Long ed = System.currentTimeMillis();
  34. log.debug( "");
  35. log.debug( "==> AirQualityTask consume time :{},countDownLatch:{}",ed-st,countDownLatch);
  36. return forecastAirQuality;
  37. } catch (Exception e){
  38. throw e;
  39. } finally {
  40. countDownLatch.countDown();
  41. }
  42. }
  43. /**
  44. * 获取未来几天空气质量预报 (第三方文档只写了 3-7天)
  45. *
  46. * @param location
  47. * @param startDate 开始日期
  48. * @param endDate 结束日期
  49. */
  50. public List<AirQuality> queryAirQualityForecast(String location,String startDate, String endDate) {
  51. LocalDate queryStart = DateUtil.stringTransLocalDate(startDate);
  52. LocalDate queryEnd = DateUtil.stringTransLocalDate(endDate);
  53. return queryForecastAir(location, queryStart, queryEnd);
  54. }
  55. private List<AirQuality> queryForecastAir(String location, LocalDate queryStart, LocalDate queryEnd) {
  56. Long period = DateUtil.localDatePeriod(queryStart, queryEnd);
  57. if (period > 9) {
  58. throw new BusinessException(ExceptionEnum.QUERY_OUT_RANGE);
  59. }
  60. boolean noCache = false;
  61. String redisKey = null;
  62. LocalDate startDate = null;
  63. List<AirQuality> list = new ArrayList<>();
  64. for ( int index = 0; index < period; index++) {
  65. startDate = queryStart;
  66. startDate = startDate.plusDays(index);
  67. redisKey = RedisEnum.CpspWeather.LOCATION_DATE_SAVE_FORECAST_AIR_QUALITY.getKey(location, startDate.toString());
  68. AirQuality airQuality = (AirQuality) redisTemplate.opsForValue().get(redisKey);
  69. if (airQuality == null) {
  70. list = new ArrayList<>();
  71. noCache = true;
  72. break;
  73. } else {
  74. list.add(airQuality);
  75. }
  76. }
  77. if (noCache) {
  78. List<AirQuality> aqList = getAirQualityForecast(location, ServiceConstant.AIR_QUALITY_FORECAST);
  79. if (CollectionUtils.isEmpty(aqList)) {
  80. log.info( "==> queryCacheDailyAir no data back!");
  81. throw ExceptionEnum.CPSP_AIR_PARSE_ERROR.getException();
  82. }
  83. for (AirQuality aq : aqList) {
  84. for ( int day = 0; day < period; day++) {
  85. startDate = queryStart;
  86. startDate = startDate.plusDays(day);
  87. if (startDate.toString().equals(aq.getQualityDate())) {
  88. list.add(aq);
  89. }
  90. }
  91. //setRedisForecastAir(aq,location);
  92. redisKey = RedisEnum.CpspWeather.LOCATION_DATE_SAVE_FORECAST_AIR_QUALITY.getKey(location, aq.getQualityDate());
  93. RedisOpsForValueAsync redisOpsForValueAsync = new RedisOpsForValueAsync(redisTemplate,redisWeather,redisKey,aq);
  94. WeatherExecutorService executorService = new WeatherExecutorService();
  95. executorService.submit(redisOpsForValueAsync);
  96. }
  97. }
  98. return list;
  99. }
  100. 子线程的子线程这个注解是不起作用的,只有主线程的才会被spring管理
  101. // @Async
  102. // public void setRedisForecastAir(AirQuality aq,String location){
  103. // String redisKey = RedisEnum.CpspWeather.LOCATION_DATE_SAVE_FORECAST_AIR_QUALITY.getKey(location, aq.getQualityDate());
  104. // redisTemplate.opsForValue().set(redisKey, aq, redisWeather, TimeUnit.SECONDS);
  105. // }
  106. /**
  107. * 查询未来几天空气质量 不包含当天
  108. *
  109. * @param location
  110. * @param queryType
  111. * @return
  112. */
  113. private List<AirQuality> getAirQualityForecast(String location, String queryType) {
  114. JSONObject response = weatherServiceApi.queryAirQualityForecast(location);
  115. //log.debug("==> cpsp getAirQualityForecast response : {}", response);
  116. if (Objects.isNull(response)) {
  117. log.error( "==> cpsp response is null!");
  118. throw new BusinessException(ExceptionEnum.CPSP_RESPONSE_TIMEOUT);
  119. }
  120. return CommonWeather.setAirQualityForecast(response,location);
  121. }
  122. }

 

 


转载:https://blog.csdn.net/datszhang/article/details/105599489
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场