小言_互联网的博客

SpringBoot集成Redisson实现延迟队列

400人阅读  评论(0)

使用场景

1、下单成功,30分钟未支付。支付超时,自动取消订单

2、订单签收,签收后7天未进行评价。订单超时未评价,系统默认好评

3、下单成功,商家5分钟未接单,订单取消

4、配送超时,推送短信提醒

......

对于延时比较长的场景、实时性不高的场景,我们可以采用任务调度的方式定时轮询处理。如:xxl-job

今天我们采用一种比较简单、轻量级的方式,使用 Redis 的延迟队列来进行处理。当然有更好的解决方案,可根据公司的技术选型和业务体系选择最优方案。如:使用消息中间件Kafka、RabbitMQ 的延迟队列

先不讨论其实现原理,直接实战上代码先实现基于 Redis 的延迟队列

1、引入 Redisson 依赖


  
  1. <dependency>
  2. <groupId>org.redisson </groupId>
  3. <artifactId>redisson-spring-boot-starter </artifactId>
  4. <version>3.10.5 </version>
  5. </dependency>

2、Nacos 配置 Redis 连接


  
  1. spring:
  2. redis:
  3. host: 127.0 .0 .1
  4. port: 6379
  5. password: 123456
  6. database: 12
  7. timeout: 3000

3、创建 RedissonConfig 配置


  
  1. /**
  2. * Created by LPB on 2020/04/20.
  3. */
  4. @Configuration
  5. public class RedissonConfig {
  6. @Value("${spring.redis.host}")
  7. private String host;
  8. @Value("${spring.redis.port}")
  9. private int port;
  10. @Value("${spring.redis.database}")
  11. private int database;
  12. @Value("${spring.redis.password}")
  13. private String password;
  14. @Bean
  15. public RedissonClient redissonClient() {
  16. Config config = new Config();
  17. config.useSingleServer()
  18. .setAddress( "redis://" + host + ":" + port)
  19. .setDatabase(database)
  20. .setPassword(password);
  21. return Redisson.create(config);
  22. }
  23. }

4、封装 Redis 延迟队列工具类


  
  1. /**
  2. * redis延迟队列工具
  3. * Created by LPB on 2021/04/20.
  4. */
  5. @Slf4j
  6. @Component
  7. public class RedisDelayQueueUtil {
  8. @Autowired
  9. private RedissonClient redissonClient;
  10. /**
  11. * 添加延迟队列
  12. * @param value 队列值
  13. * @param delay 延迟时间
  14. * @param timeUnit 时间单位
  15. * @param queueCode 队列键
  16. * @param <T>
  17. */
  18. public <T> void addDelayQueue(T value, long delay, TimeUnit timeUnit, String queueCode){
  19. try {
  20. RBlockingDeque< Object> blockingDeque = redissonClient.getBlockingDeque(queueCode);
  21. RDelayedQueue< Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
  22. delayedQueue.offer(value, delay, timeUnit);
  23. log.info( "(添加延时队列成功) 队列键:{},队列值:{},延迟时间:{}", queueCode, value, timeUnit.toSeconds(delay) + "秒");
  24. } catch ( Exception e) {
  25. log. error( "(添加延时队列失败) {}", e.getMessage());
  26. throw new RuntimeException( "(添加延时队列失败)");
  27. }
  28. }
  29. /**
  30. * 获取延迟队列
  31. * @param queueCode
  32. * @param <T>
  33. * @return
  34. * @throws InterruptedException
  35. */
  36. public <T> T getDelayQueue( String queueCode) throws InterruptedException {
  37. RBlockingDeque<Map> blockingDeque = redissonClient.getBlockingDeque(queueCode);
  38. T value = (T) blockingDeque.take();
  39. return value;
  40. }
  41. }

5、创建延迟队列业务枚举


  
  1. /**
  2. * 延迟队列业务枚举
  3. * Created by LPB on 2021/04/20.
  4. */
  5. @Getter
  6. @NoArgsConstructor
  7. @AllArgsConstructor
  8. public enum RedisDelayQueueEnum {
  9. ORDER_PAYMENT_TIMEOUT( "ORDER_PAYMENT_TIMEOUT", "订单支付超时,自动取消订单", "orderPaymentTimeout"),
  10. ORDER_TIMEOUT_NOT_EVALUATED( "ORDER_TIMEOUT_NOT_EVALUATED", "订单超时未评价,系统默认好评", "orderTimeoutNotEvaluated");
  11. /**
  12. * 延迟队列 Redis Key
  13. */
  14. private String code;
  15. /**
  16. * 中文描述
  17. */
  18. private String name;
  19. /**
  20. * 延迟队列具体业务实现的 Bean
  21. * 可通过 Spring 的上下文获取
  22. */
  23. private String beanId;
  24. }

6、定义延迟队列执行器


  
  1. /**
  2. * 延迟队列执行器
  3. * Created by LPB on 2021/04/20.
  4. */
  5. public interface RedisDelayQueueHandle< T> {
  6. void execute(T t);
  7. }

7、创建枚举中定义的Bean,并实现延迟队列执行器

  • OrderPaymentTimeout:订单支付超时延迟队列处理类

  
  1. /**
  2. * 订单支付超时处理类
  3. * Created by LPB on 2021/04/20.
  4. */
  5. @Component
  6. @Slf4j
  7. public class OrderPaymentTimeout implements RedisDelayQueueHandle<Map> {
  8. @Override
  9. public void execute(Map map) {
  10. log .info( "(收到订单支付超时延迟消息) {}", map);
  11. // TODO 订单支付超时,自动取消订单处理业务...
  12. }
  13. }
  • OrderTimeoutNotEvaluated:订单超时未评价延迟队列处理类

  
  1. /**
  2. * 订单超时未评价处理类
  3. * Created by LPB on 2021/04/20.
  4. */
  5. @Component
  6. @Slf4j
  7. public class OrderTimeoutNotEvaluated implements RedisDelayQueueHandle<Map> {
  8. @Override
  9. public void execute(Map map) {
  10. log .info( "(收到订单超时未评价延迟消息) {}", map);
  11. // TODO 订单超时未评价,系统默认好评处理业务...
  12. }
  13. }

8、创建延迟队列消费线程,项目启动完成后开启


  
  1. /**
  2. * 启动延迟队列
  3. * Created by LPB on 2021/04/20.
  4. */
  5. @Slf4j
  6. @Component
  7. public class RedisDelayQueueRunner implements CommandLineRunner {
  8. @Autowired
  9. private RedisDelayQueueUtil redisDelayQueueUtil;
  10. @Override
  11. public void run(String... args) {
  12. new Thread(() -> {
  13. while ( true){
  14. try {
  15. RedisDelayQueueEnum[] queueEnums = RedisDelayQueueEnum.values();
  16. for (RedisDelayQueueEnum queueEnum : queueEnums) {
  17. Object value = redisDelayQueueUtil.getDelayQueue(queueEnum.getCode());
  18. if (value != null) {
  19. RedisDelayQueueHandle redisDelayQueueHandle = SpringUtil.getBean(queueEnum.getBeanId());
  20. redisDelayQueueHandle.execute(value);
  21. }
  22. }
  23. } catch (InterruptedException e) {
  24. log.error( "(Redis延迟队列异常中断) {}", e.getMessage());
  25. }
  26. }
  27. }).start();
  28. log.info( "(Redis延迟队列启动成功)");
  29. }
  30. }

以上步骤,Redis 延迟队列核心代码已经完成,下面我们写一个测试接口,用 PostMan 模拟测试一下

 

9、创建一个测试接口,模拟添加延迟队列


  
  1. /**
  2. * 延迟队列测试
  3. * Created by LPB on 2020/04/20.
  4. */
  5. @RestController
  6. public class RedisDelayQueueController {
  7. @Autowired
  8. private RedisDelayQueueUtil redisDelayQueueUtil;
  9. @PostMapping( "/addQueue")
  10. public void addQueue() {
  11. Map< String, String> map1 = new HashMap<>();
  12. map1.put( "orderId", "100");
  13. map1.put( "remark", "订单支付超时,自动取消订单");
  14. Map< String, String> map2 = new HashMap<>();
  15. map2.put( "orderId", "200");
  16. map2.put( "remark", "订单超时未评价,系统默认好评");
  17. // 添加订单支付超时,自动取消订单延迟队列。为了测试效果,延迟10秒钟
  18. redisDelayQueueUtil.addDelayQueue(map1, 10, TimeUnit.SECONDS, RedisDelayQueueEnum.ORDER_PAYMENT_TIMEOUT.getCode());
  19. // 订单超时未评价,系统默认好评。为了测试效果,延迟20秒钟
  20. redisDelayQueueUtil.addDelayQueue(map2, 20, TimeUnit.SECONDS, RedisDelayQueueEnum.ORDER_TIMEOUT_NOT_EVALUATED.getCode());
  21. }
  22. }

10、启动 SpringBoot 项目,用 PostMan 调用接口添加延迟队列

  • 通过 Redis 客户端可看到两个延迟队列已添加成功

  • 查看 IDEA 控制台日志可看到延迟队列已消费成功

 


转载:https://blog.csdn.net/qq_40087415/article/details/115940092
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场