飞道的博客

冷饭新炒:理解Redisson中分布式锁的实现

338人阅读  评论(0)

前提

在很早很早之前,写过一篇文章介绍过Redis中的red lock的实现,但是在生产环境中,笔者所负责的项目使用的分布式锁组件一直是RedissonRedisson是具备多种内存数据网格特性的基于Java编写的Redis客户端框架(Redis Java Client with features of In-Memory Data Grid),基于Redis的基本数据类型扩展出很多种实现的高级数据结构,具体见其官方的简介图:

本文要分析的R(ed)Lock实现,只是其中一个很小的模块,其他高级特性可以按需选用。下面会从基本原理、源码分析和基于Jedis仿实现等内容进行展开。本文分析的Redisson源码是2020-01左右Redisson项目的main分支源码,对应版本是3.14.1

基本原理

red lock的基本原理其实就"光明正大地"展示在Redis官网的首页文档中(具体链接是https://redis.io/topics/distlock):

摘录一下简介进行翻译:在许多环境中不同进程必须以互斥方式使用共享资源进行操作时,分布式锁是一个非常有用的原语。此试图提供一种更规范的算法来实现Redis的分布式锁。我们提出了一种称为Redlock的算法,它实现了DLM(猜测是Distributed Lock Manager的缩写,分布式锁管理器),我们认为它比普通的单实例方法更安全。

算法的三个核心特征(三大最低保证):

  • Safety property(安全性):互斥。确保在任何给定时刻下,只有一个客户端可以持有锁

  • Liveness property A(活性A):无死锁。即使存在曾经锁定资源的客户端崩溃或者出现网络分区异常,确保锁总是能够成功获取

  • Liveness property B(活性B):容错性。只要大多数Redis节点处于正常运行状态,客户端就可以获取和释放锁

文档中还指出了目前算法对于故障转移的实现还存在明显的竞态条件问题(描述的应该是Redis主从架构下的问题):

  • 客户端A获取Redis主节点中的锁(假设锁定的资源为X

  • Redis主节点把KEY同步到Redis从节点之前,Redis主节点崩溃

  • Redis从节点因为故障晋升为主节点

  • 此时,客户端B获取资源X的锁成功,问题是资源X的锁在前面已经被客户端A获取过,这样就出现了并发问题

算法的实现很简单,单个Redis实例下加锁命令如下:

SET $resource_name $random_value NX PX $ttl

这里的NxPXSET命令的增强参数,自从Redis2.6.12版本起,SET命令已经提供了可选的复合操作符:

  • EX:设置超时时间,单位是秒

  • PX:设置超时时间,单位是毫秒

  • NXIF NOT EXIST的缩写,只有KEY不存在的前提下才会设置K-V,设置成功返回1,否则返回0

  • XXIF EXIST的缩写,只有在KEY存在的前提下才会设置K-V,设置成功返回1,否则返回0

单个Redis实例下解锁命令如下:


   
  1. # KEYS[ 1] = $resource_name
  2. # ARGV[ 1] = $random_value
  3. if redis.call( "get",KEYS[ 1]) == ARGV[ 1] then
  4.      return redis.call( "del",KEYS[ 1])
  5. else
  6.      return  0
  7. end

使用Redisson中的RLock

使用RLock要先实例化RedissonRedisson已经适配了Redis的哨兵、集群、普通主从和单机模式,因为笔者本地只安装了单机Redis,所以这里使用单机模式配置进行演示。实例化RedissonClient


   
  1. static RedissonClient REDISSON;
  2. @BeforeClass
  3. public static void beforeClass() throws Exception {
  4.     Config config =  new Config();
  5.      // 单机
  6.     config.useSingleServer()
  7.             .setTimeout( 10000)
  8.             .setAddress( "redis://127.0.0.1:6379");
  9.     REDISSON = Redisson.create(config);
  10. //        // 主从
  11. //        config.useMasterSlaveServers()
  12. //                .setMasterAddress("主节点连接地址")
  13. //                .setSlaveAddresses(Sets.newHashSet("从节点连接地址"));
  14. //        REDISSON = Redisson.create(config);
  15. //        // 哨兵
  16. //        config.useSentinelServers()
  17. //                .setMasterName("Master名称")
  18. //                .addSentinelAddress(new String[]{"哨兵连接地址"});
  19. //        REDISSON = Redisson.create(config);
  20. //        // 集群
  21. //        config.useClusterServers()
  22. //                .addNodeAddress(new String[]{"集群节点连接地址"});
  23. //        REDISSON = Redisson.create(config);
  24. }

加锁和解锁:


   
  1. @Test
  2. public void testLockAndUnLock() throws Exception {
  3.     String resourceName =  "resource:x";
  4.     RLock lock = REDISSON.getLock(resourceName);
  5.     Thread threadA =  new Thread(() -> {
  6.         try {
  7.             lock.lock();
  8.             process(resourceName);
  9.         } finally {
  10.             lock.unlock();
  11.             System.out. println(String.format( "线程%s释放资源%s的锁", Thread.currentThread().getName(), resourceName));
  12.         }
  13.     },  "threadA");
  14.     Thread threadB =  new Thread(() -> {
  15.         try {
  16.             lock.lock();
  17.             process(resourceName);
  18.         } finally {
  19.             lock.unlock();
  20.             System.out. println(String.format( "线程%s释放资源%s的锁", Thread.currentThread().getName(), resourceName));
  21.         }
  22.     },  "threadB");
  23.     threadA.start();
  24.     threadB.start();
  25.     Thread.sleep(Long.MAX_VALUE);
  26. }
  27. private void process(String resourceName) {
  28.     String threadName = Thread.currentThread().getName();
  29.     System.out. println(String.format( "线程%s获取到资源%s的锁", threadName, resourceName));
  30.     try {
  31.         Thread.sleep( 1000);
  32.     } catch (InterruptedException ignore) {
  33.     }
  34. }
  35. // 某次执行的输出结果
  36. 线程threadB获取到资源resource:x的锁
  37. 线程threadB释放资源resource:x的锁
  38. 线程threadA获取到资源resource:x的锁
  39. 线程threadA释放资源resource:x的锁

更多的时候,我们会选用带等待时间周期和锁最大持有时间的API


   
  1. @Test
  2. public void testTryLockAndUnLock() throws Exception {
  3.     String resourceName =  "resource:x";
  4.      int waitTime =  500;
  5.      int leaseTime =  1000;
  6.     Thread threadA =  new Thread(() -> {
  7.         process(resourceName, waitTime, leaseTime);
  8.     },  "threadA");
  9.     Thread threadB =  new Thread(() -> {
  10.         process(resourceName, waitTime, leaseTime);
  11.     },  "threadB");
  12.     threadA.start();
  13.     threadB.start();
  14.     Thread.sleep(Long.MAX_VALUE);
  15. }
  16. private void process(String resourceName,  int waitTime,  int leaseTime) {
  17.     RLock lock = REDISSON.getLock(resourceName);
  18.     try {
  19.         String threadName = Thread.currentThread().getName();
  20.         boolean tryLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
  21.          if (tryLock) {
  22.             try {
  23.                 System.out. println(String.format( "线程%s获取到资源%s的锁", threadName, resourceName));
  24.                 Thread.sleep( 800);
  25.             } finally {
  26.                 lock.unlock();
  27.                 System.out. println(String.format( "线程%s释放资源%s的锁", Thread.currentThread().getName(), resourceName));
  28.             }
  29.         }  else {
  30.             System.out. println(String.format( "线程%s获取资源%s的锁失败,等待时间:%d ms", threadName, resourceName, waitTime));
  31.         }
  32.     } catch (InterruptedException e) {
  33.         Thread.currentThread().interrupt();
  34.     }
  35. }
  36. // 某次执行的输出结果
  37. 线程threadA获取到资源resource:x的锁
  38. 线程threadB获取资源resource:x的锁失败,等待时间: 500 ms
  39. 线程threadA释放资源resource:x的锁

为了使用的时候更加简单,可以参考spring-tx中的编程式事务那样进行轻度封装:


   
  1. @RequiredArgsConstructor
  2. private static class RedissonLockProvider {
  3.     private final RedissonClient redissonClient;
  4.     public <T> T executeInLock(String resourceName, LockAction lockAction) {
  5.         RLock lock = redissonClient.getLock(resourceName);
  6.         try {
  7.             lock.lock();
  8.             lockAction.onAcquire(resourceName);
  9.              return lockAction.doInLock(resourceName);
  10.         } finally {
  11.             lock.unlock();
  12.             lockAction.onExit(resourceName);
  13.         }
  14.     }
  15.     public <T> T executeInLock(String resourceName,  int waitTime,  int leaseTime, LockAction lockAction) throws InterruptedException {
  16.         RLock lock = redissonClient.getLock(resourceName);
  17.         boolean tryLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
  18.          if (tryLock) {
  19.             try {
  20.                 lockAction.onAcquire(resourceName);
  21.                  return lockAction.doInLock(resourceName);
  22.             } finally {
  23.                 lock.unlock();
  24.                 lockAction.onExit(resourceName);
  25.             }
  26.         }
  27.          return null;
  28.     }
  29.     public void executeInLockWithoutResult(String resourceName,  int waitTime,  int leaseTime, LockActionWithoutResult lockAction) throws InterruptedException {
  30.         RLock lock = redissonClient.getLock(resourceName);
  31.         boolean tryLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
  32.          if (tryLock) {
  33.             try {
  34.                 lockAction.onAcquire(resourceName);
  35.                 lockAction.doInLock(resourceName);
  36.             } finally {
  37.                 lock.unlock();
  38.                 lockAction.onExit(resourceName);
  39.             }
  40.         }
  41.     }
  42.     public void executeInLockWithoutResult(String resourceName, LockActionWithoutResult lockAction) {
  43.         RLock lock = redissonClient.getLock(resourceName);
  44.         try {
  45.             lock.lock();
  46.             lockAction.onAcquire(resourceName);
  47.             lockAction.doInLock(resourceName);
  48.         } finally {
  49.             lock.unlock();
  50.             lockAction.onExit(resourceName);
  51.         }
  52.     }
  53. }
  54. @FunctionalInterface
  55. interface LockAction {
  56.      default void onAcquire(String resourceName) {
  57.     }
  58.     <T> T doInLock(String resourceName);
  59.      default void onExit(String resourceName) {
  60.     }
  61. }
  62. @FunctionalInterface
  63. interface LockActionWithoutResult {
  64.      default void onAcquire(String resourceName) {
  65.     }
  66.     void doInLock(String resourceName);
  67.      default void onExit(String resourceName) {
  68.     }
  69. }

使用RedissonLockProvider(仅供参考):


   
  1. @Test
  2. public void testRedissonLockProvider() throws Exception {
  3.     RedissonLockProvider provider =  new RedissonLockProvider(REDISSON);
  4.     String resourceName =  "resource:x";
  5.     Thread threadA =  new Thread(() -> {
  6.         provider.executeInLockWithoutResult(resourceName,  new LockActionWithoutResult() {
  7.             @Override
  8.             public void onAcquire(String resourceName) {
  9.                 System.out. println(String.format( "线程%s获取到资源%s的锁", Thread.currentThread().getName(), resourceName));
  10.             }
  11.             @Override
  12.             public void doInLock(String resourceName) {
  13.                 try {
  14.                     Thread.sleep( 800);
  15.                 } catch (InterruptedException ignore) {
  16.                 }
  17.             }
  18.             @Override
  19.             public void onExit(String resourceName) {
  20.                 System.out. println(String.format( "线程%s释放资源%s的锁", Thread.currentThread().getName(), resourceName));
  21.             }
  22.         });
  23.     },  "threadA");
  24.     Thread threadB =  new Thread(() -> {
  25.         provider.executeInLockWithoutResult(resourceName,  new LockActionWithoutResult() {
  26.             @Override
  27.             public void onAcquire(String resourceName) {
  28.                 System.out. println(String.format( "线程%s获取到资源%s的锁", Thread.currentThread().getName(), resourceName));
  29.             }
  30.             @Override
  31.             public void doInLock(String resourceName) {
  32.                 try {
  33.                     Thread.sleep( 800);
  34.                 } catch (InterruptedException ignore) {
  35.                 }
  36.             }
  37.             @Override
  38.             public void onExit(String resourceName) {
  39.                 System.out. println(String.format( "线程%s释放资源%s的锁", Thread.currentThread().getName(), resourceName));
  40.             }
  41.         });
  42.     },  "threadB");
  43.     threadA.start();
  44.     threadB.start();
  45.     Thread.sleep(Long.MAX_VALUE);
  46. }
  47. // 某次执行结果
  48. 线程threadA获取到资源resource:x的锁
  49. 线程threadA释放资源resource:x的锁
  50. 线程threadB获取到资源resource:x的锁
  51. 线程threadB释放资源resource:x的锁

Redisson中RLock的实现原理

RedissonRLock的实现是基本参照了Redisred lock算法进行实现,不过在原始的red lock算法下进行了改良,主要包括下面的特性:

  • 互斥

  • 无死锁

  • 可重入,类似于ReentrantLock,同一个线程可以重复获取同一个资源的锁(一般使用计数器实现),锁的重入特性一般情况下有利于提高资源的利用率

  • 「续期」,这个是一个比较前卫解决思路,也就是如果一个客户端对资源X永久锁定,那么并不是直接对KEY生存周期设置为-1,而是通过一个守护线程每隔固定周期延长KEY的过期时间,这样就能实现「在守护线程不被杀掉的前提下,避免客户端崩溃导致锁无法释放长期占用资源的问题」

  • 锁状态变更订阅,依赖于org.redisson.pubsub.LockPubSub,用于订阅和通知锁释放事件

  • 不是完全参考red lock算法的实现,数据类型选用了HASH,配合Lua脚本完成多个命令的原子性

续期或者说延长KEY的过期时间在Redisson使用watch dog实现,理解为用于续期的守护线程,底层依赖于Netty的时间轮HashedWheelTimer和任务io.netty.util.Timeout实现,「俗称看门狗」,下面会详细分析。

先看RLock的类图:

这里有一个疑惑点,RedissonRedLock(RedissonMultiLock的子类)的注释中提到RedLock locking algorithm implementation for multiple locks. It manages all locks as one. 但从直观上看,RedissonLock才是整个锁体系的核心,里面的实现思路也是遵从red lock算法的。

RedissonLock就是RLock的直接实现,也是分布式锁实现的核心类,从源码中看到Redisson#getLock()就是直接实例化RedissonLock


   
  1. public class Redisson implements RedissonClient {
  2.     
  3.      // ...... 省略其他代码
  4.     @Override
  5.     public RLock getLock(String name) {
  6.          return  new RedissonLock(connectionManager.getCommandExecutor(), name);
  7.     }
  8.      // ...... 省略其他代码
  9. }

因此只需要围绕RedissonLock的源码进行分析即可。RedissonLock的类继承图如下:

这里需要有几点认知:

  • RedissonLock实现了java.util.concurrent.locks.Lock接口中除了newCondition()方法外的所有方法,也就是可以基本无缝适配Lock接口,对于习惯Lock接口的API的使用者来说是一个福音

  • RedissonLock基本所有同步API都依赖于异步API的实现,也就是RLock的实现依赖于RLockAsync的实现,底层依赖的是Nettyio.netty.util.concurrent.Promise,具体见RedissonPromise,如果用过JUC中的Future的开发者应该比较熟悉Future#get(),这里的做法类似

  • 右边的几个父类的简单功能描述如下:

    • RObjectAsync:所有Redisson对象的基础接口,提供一些内存测量、对象拷贝、移动等的异步方法

    • RObjectRObjectAsync的同步版本

    • RExpirableAsync:提供对象TTL相关的异步方法

    • RExpirableRExpirableAsync的同步版本

    • RedissonObject:直接实现类RObject接口中的方法

    • RedissonExpirable:主要是实现了RExpirable接口中的方法

接着先看RedissonLock的构造函数和核心属性:


   
  1. // 存放entryName -> ExpirationEntry,用于获取当前entryName的线程重入计数器和续期任务
  2. private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP =  new ConcurrentHashMap<>();
  3. // 内部的锁持有的最大时间,来源于参数Config#lockWatchdogTimeout,用于控制续期的周期
  4. protected long internalLockLeaseTime;
  5. // ID,唯一标识,是一个UUID
  6. final String id;
  7. // 
  8. final String entryName;
  9. // 锁释放事件订阅发布相关
  10. protected final LockPubSub pubSub;
  11. // 命令异步执行器实例
  12. final CommandAsyncExecutor commandExecutor;
  13. /**
  14.  * CommandAsyncExecutor是命令的异步执行器,里面的方法是相对底层的面向通讯框架的方法,包括异步写、异步读和同步结果获取等
  15.  * name参数就是getLock()时候传入的参数,其实就是最终同步到Redis中的KEY
  16.  */
  17. public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
  18.     super(commandExecutor, name);
  19.     this.commandExecutor = commandExecutor;
  20.      // 这里的ID为外部初始化的UUID实例,调用toString()
  21.     this.id = commandExecutor.getConnectionManager().getId();
  22.     this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
  23.      // 这里的entryName = uuid值 + : + 外部传进来的name(KEY),如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:resource:x
  24.     this.entryName = id +  ":" + name;
  25.      // 初始化LockPubSub实例,用于订阅和发布锁释放的事件
  26.     this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
  27. }
  28. // RedissonLock内部类ExpirationEntry,存放着线程重入的计数器和续期的Timeout任务
  29. public static class ExpirationEntry {
  30.     
  31.      // 线程ID -> 线程重入的次数
  32.     private final Map<Long, Integer> threadIds =  new LinkedHashMap<>();
  33.     private volatile Timeout timeout;
  34.     
  35.     public ExpirationEntry() {
  36.         super();
  37.     }
  38.     
  39.      // 这个方法主要记录线程重入的计数
  40.     public void addThreadId(long threadId) {
  41.         Integer counter = threadIds.get(threadId);
  42.          if (counter == null) {
  43.             counter =  1;
  44.         }  else {
  45.             counter++;
  46.         }
  47.         threadIds.put(threadId, counter);
  48.     }
  49.     public boolean hasNoThreads() {
  50.          return threadIds.isEmpty();
  51.     }
  52.     public Long getFirstThreadId() {
  53.          if (threadIds.isEmpty()) {
  54.              return null;
  55.         }
  56.          return threadIds.keySet().iterator().next();
  57.     }
  58.     public void removeThreadId(long threadId) {
  59.         Integer counter = threadIds.get(threadId);
  60.          if (counter == null) {
  61.              return;
  62.         }
  63.         counter--;
  64.          if (counter ==  0) {
  65.             threadIds.remove(threadId);
  66.         }  else {
  67.             threadIds.put(threadId, counter);
  68.         }
  69.     }
  70.     
  71.     public void setTimeout(Timeout timeout) {
  72.         this.timeout = timeout;
  73.     }
  74.     public Timeout getTimeout() {
  75.          return timeout;
  76.     }
  77. }

这里需要关注一下Config中的lockWatchdogTimeout参数:

翻译一下大意:lockWatchdogTimeout参数只有在没有使用leaseTimeout参数定义的成功获取到锁的场景(简单来说就是不设置时限的加锁)下生效,如果看门狗在下一个lockWatchdogTimeout周期内不进行续期,那么锁就会过期释放(「从源码上看,每三分之一lockWatchdogTimeout就会执行一次续期任务,每次通过pexpireKEY的存活周期延长lockWatchdogTimeout),lockWatchdogTimeout的默认值为30000,也就是30秒。

这里先列举一下RedissonLock中获取名称的方法,以便后面分析这些名称作为K-V结构的KEY时候使用:

  • id:由配置实例化时候实例化的UUID实例生成,从源码上分析每个连接方式的Redisson实例有唯一的UUIDConnectionManager初始化的时候会调用UUID id = UUID.randomUUID(),笔者认为可以理解为Redisson实例在某个应用程序进程中的唯一标识,毕竟一般情况下,一个应用程序应该只会应用一种Redisson的连接方式

  • getEntryName():返回的是UUID + : + $KEY,例如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:resource:x

  • getName():返回的是$KEY,例如resource:x

  • getChannelName():返回的是redisson_lock__channel:{$KEY},例如redisson_lock__channel:{resource:x}

  • getLockName(long threadId):返回的是UUID + : + $threadId,例如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1

接着看加锁的方法,核心实现主要是:

  • private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedExceptionlock方法体系

  • public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedExceptiontryLock方法体系

先看只包含锁最大持有时间的lock()方法体系:


   
  1. /**
  2.  * 获取锁,不指定等待时间,只指定锁的最大持有时间
  3.  * 通过interruptibly参数配置支持中断
  4.  */
  5. private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
  6.     long threadId = Thread.currentThread().getId();
  7.      // 尝试获取锁,返回的ttl为空代表获取锁成功,返回的ttl代表已经存在的KEY的剩余存活时间
  8.     Long ttl = tryAcquire(leaseTime, unit, threadId);
  9.      // lock acquired
  10.      if (ttl == null) {
  11.          return;
  12.     }
  13.      // 订阅redisson_lock__channel:{$KEY},其实本质的目的是为了客户端通过Redis的订阅发布,感知到解锁的事件
  14.      // 这个方法会在LockPubSub中注册一个entryName -> RedissonLockEntry的哈希映射,RedissonLockEntry实例中存放着RPromise<RedissonLockEntry>结果,一个信号量形式的锁和订阅方法重入计数器
  15.      // 下面的死循环中的getEntry()或者RPromise<RedissonLockEntry>#getNow()就是从这个映射中获取的
  16.     RFuture<RedissonLockEntry> future = subscribe(threadId);
  17.      // 同步订阅执行,获取注册订阅Channel的响应,区分是否支持中断
  18.      if (interruptibly) {
  19.         commandExecutor.syncSubscriptionInterrupted(future);
  20.     }  else {
  21.         commandExecutor.syncSubscription(future);
  22.     }
  23.      // 走到下面的for循环说明返回的ttl不为空,也就是Redis已经存在对应的KEY,有其他客户端已经获取到锁,此客户端线程的调用需要阻塞等待获取锁
  24.     try {
  25.         while ( true) {
  26.              // 死循环中尝试获取锁,这个是后面会分析的方法
  27.             ttl = tryAcquire(leaseTime, unit, threadId);
  28.              // 返回的ttl为空,说明获取到锁,跳出死循环,这个死循环或者抛出中断异常,或者获取到锁成功break跳出,没有其他方式
  29.              if (ttl == null) {
  30.                  break;
  31.             }
  32.              // 这个ttl来源于等待存在的锁的KEY的存活时间,直接使用许可为0的信号量进行阻塞等待,下面的几个分支判断都是大同小异,只是有的支持超时时间,有的支持中断
  33.              // 有的是永久阻塞直到锁释放事件订阅LockPubSub的onMessage()方法回调激活getLatch().release()进行解锁才会往下走
  34.              // 这里可以学到一个特殊的技巧,Semaphore(0),信号量的许可设置为0,首个调用acquire()的线程会被阻塞,直到其他线程调用此信号量的release()方法才会解除阻塞,类似于一个CountDownLatch(1)的效果
  35.              if (ttl >=  0) {
  36.                 try {
  37.                     future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
  38.                 } catch (InterruptedException e) {
  39.                      if (interruptibly) {
  40.                         throw e;
  41.                     }
  42.                     future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
  43.                 }
  44.             }  else {
  45.                  if (interruptibly) {
  46.                     future.getNow().getLatch().acquire();
  47.                 }  else {
  48.                     future.getNow().getLatch().acquireUninterruptibly();
  49.                 }
  50.             }
  51.         }
  52.     } finally {
  53.          // 获取到锁或者抛出中断异常,退订redisson_lock__channel:{$KEY},不再关注解锁事件
  54.         unsubscribe(future, threadId);
  55.     }
  56. }
  57. // 这是一个异步转同步的方法,类似于FutureTask#get(),关键看调用的tryAcquireAsync()方法
  58. private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
  59.      return get(tryAcquireAsync(leaseTime, unit, threadId));
  60. }
  61. /**
  62.  * 通过传入锁持有的最大时间和线程ID异步获取锁
  63.  */
  64. private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
  65.      // 锁持有最大时间不为-1,也就是明确锁的持有时间,不是永久持有的场景
  66.      if (leaseTime !=  -1) {
  67.          return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
  68.     }
  69.      // 走到这里说明是leaseTime == -1,KEY不设置过期时间的分支,需要启动看门狗机制。尝试内部异步获取锁,注意这里的lockWatchdogTimeout是从配置中获取传进去,不是内部的internalLockLeaseTime属性,这里的默认值还是30000毫秒
  70.     RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
  71.     ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
  72.          // 执行异常场景直接返回
  73.          if (e != null) {
  74.              return;
  75.         }
  76.          // 成功获取到锁的场景,需要基于线程ID启用看门狗,通过时间轮指定定时任务进行续期
  77.          if (ttlRemaining == null) {
  78.              // 定时调度进行续期操作
  79.             scheduleExpirationRenewal(threadId);
  80.         }
  81.     });
  82.      return ttlRemainingFuture;
  83. }
  84. /**
  85.  * 转换锁持有最大时间,通过参数进行加锁的LUA脚本调用 
  86.  * getName()就是传入的KEY,如resource:x getLockName()就是锁的名称,形式是:UUID + : + threadId,如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1
  87.  * internalLockLeaseTime在leaseTime != -1的前提下使用的是原值,在leaseTime == -1的前提下,使用的是lockWatchdogTimeout
  88.  */
  89. <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
  90.      // 时间转换为毫秒,注意一点这里的internalLockLeaseTime是类内的属性,被重新赋值了
  91.     internalLockLeaseTime = unit.toMillis(leaseTime);
  92.      // 底层向Redis服务执行LUA脚本
  93.      return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
  94.                  "if (redis.call('exists', KEYS[1]) == 0) then " +
  95.                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
  96.                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  97.                      "return nil; " +
  98.                  "end; " +
  99.                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
  100.                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
  101.                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  102.                      "return nil; " +
  103.                  "end; " +
  104.                  "return redis.call('pttl', KEYS[1]);",
  105.                 Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
  106. }

「先留意一下属性internalLockLeaseTime,它在tryLockInnerAsync()方法内被重新赋值,在leaseTime == -1L的前提下,它被赋值为lockWatchdogTimeout,这个细节很重要,决定了后面续期方法(看门口)的调度频率。另外,leaseTime != -1L不会进行续期,也就是不会启动看门狗机制。」

接着需要仔细分析一下tryLockInnerAsync()中执行的LUA脚本,笔者把它提取出来通过注释进行描述:


   
  1. -- KEYS[ 1] == getName() --> $KEY --> resource:x
  2. -- ARGV[ 1] == internalLockLeaseTime -->  30000
  3. -- ARGV[ 2] == getLockName(threadId) -->  559cc9df-bad8 -4f6c -86a4-ffa51b7f1c36: 1
  4. -- 第一段代码是判断锁定的资源KEY不存在的时候进行相应值的设置,代表资源没有被锁定,首次获取锁成功
  5. if (redis.call( 'exists', KEYS[ 1]) ==  0) then
  6.     -- 这里是设置调用次数,可以理解为延长KEY过期时间的调用次数
  7.     redis.call( 'hset', KEYS[ 1], ARGV[ 2],  1);
  8.     -- 设置KEY的过期时间
  9.     redis.call( 'pexpire', KEYS[ 1], ARGV[ 1]);
  10.      return  nil;
  11. end;
  12. -- 第二段代码是判断HASH的field是否存在,如果存在说明是同一个线程重入的情况,这个时候需要延长KEY的TTL,并且HASH的field对应的value加 1,记录延长ttl的次数
  13. if (redis.call( 'hexists', KEYS[ 1], ARGV[ 2]) ==  1) then
  14.     -- 这里是增加调用次数,可以理解为增加延长KEY过期时间的调用次数
  15.     redis.call( 'hincrby', KEYS[ 1], ARGV[ 2],  1);
  16.     -- 延长KEY的过期时间
  17.     redis.call( 'pexpire', KEYS[ 1], ARGV[ 1]);
  18.      return  nil;
  19. end;
  20. -- 第三段代码是兜底的,走到这里说明当前线程获取锁失败,锁已经被其他(进程中的)线程占有,返回当前KEY被占用资源的ttl,用来确定需要休眠的最大时间
  21. return redis.call( 'pttl', KEYS[ 1]);

这里画一个图演示一下这个Lua脚本中三段代码出现的逻辑:

剩下一个scheduleExpirationRenewal(threadId)方法还没有分析,里面的逻辑就是看门狗的定期续期逻辑:


   
  1. // 基于线程ID定时调度和续期
  2. private void scheduleExpirationRenewal(long threadId) {
  3.      // 如果需要的话新建一个ExpirationEntry记录线程重入计数,同时把续期的任务Timeout对象保存在属性中
  4.     ExpirationEntry entry =  new ExpirationEntry();
  5.     ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
  6.      if (oldEntry != null) {
  7.          // 当前进行的当前线程重入加锁
  8.         oldEntry.addThreadId(threadId);
  9.     }  else {
  10.          // 当前进行的当前线程首次加锁
  11.         entry.addThreadId(threadId);
  12.          // 首次新建ExpirationEntry需要触发续期方法,记录续期的任务句柄
  13.         renewExpiration();
  14.     }
  15. }
  16. // 处理续期
  17. private void renewExpiration() {
  18.      // 根据entryName获取ExpirationEntry实例,如果为空,说明在cancelExpirationRenewal()方法已经被移除,一般是解锁的时候触发
  19.     ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
  20.      if (ee == null) {
  21.          return;
  22.     }
  23.      // 新建一个定时任务,这个就是看门狗的实现,io.netty.util.Timeout是Netty结合时间轮使用的定时任务实例
  24.     Timeout task = commandExecutor.getConnectionManager().newTimeout( new TimerTask() {
  25.         @Override
  26.         public void run(Timeout timeout) throws Exception {
  27.              // 这里是重复外面的那个逻辑,
  28.             ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
  29.              if (ent == null) {
  30.                  return;
  31.             }
  32.              // 获取ExpirationEntry中首个线程ID,如果为空说明调用过cancelExpirationRenewal()方法清空持有的线程重入计数,一般是锁已经释放的场景
  33.             Long threadId = ent.getFirstThreadId();
  34.              if (threadId == null) {
  35.                  return;
  36.             }
  37.              // 向Redis异步发送续期的命令
  38.             RFuture<Boolean> future = renewExpirationAsync(threadId);
  39.             future.onComplete((res, e) -> {
  40.                  // 抛出异常,续期失败,只打印日志和直接终止任务
  41.                  if (e != null) {
  42.                     log.error( "Can't update lock " + getName() +  " expiration", e);
  43.                      return;
  44.                 }
  45.                  // 返回true证明续期成功,则递归调用续期方法(重新调度自己),续期失败说明对应的锁已经不存在,直接返回,不再递归
  46.                  if (res) {
  47.                      // reschedule itself
  48.                     renewExpiration();
  49.                 }
  50.             });
  51.         }
  52.     }, 
  53.      // 这里的执行频率为leaseTime转换为ms单位下的三分之一,由于leaseTime初始值为-1的情况下才会进入续期逻辑,那么这里的执行频率为lockWatchdogTimeout的三分之一
  54.     internalLockLeaseTime /  3, TimeUnit.MILLISECONDS); 
  55.     
  56.      // ExpirationEntry实例持有调度任务实例
  57.     ee.setTimeout(task);
  58. }
  59. // 调用Redis,执行Lua脚本,进行异步续期
  60. protected RFuture<Boolean> renewExpirationAsync(long threadId) {
  61.      return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
  62.              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
  63.                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  64.                  "return 1; " +
  65.              "end; " +
  66.              "return 0;",
  67.         Collections.<Object>singletonList(getName()), 
  68.          //  这里根据前面的分析,internalLockLeaseTime在leaseTime的值为-1的前提下,对应值为lockWatchdogTimeout
  69.         internalLockLeaseTime, getLockName(threadId));  
  70. }

基于源码推断出续期的机制由入参leaseTime决定:

  • leaseTime == -1的前提下(一般是lock()lockInterruptibly()这类方法调用),续期任务的调度周期为lockWatchdogTimeout / 3,锁的最大持有时间(KEY的过期时间)被刷新为lockWatchdogTimeout

  • leaseTime != -1的前提下(一般是lock(long leaseTime, TimeUnit unit)lockInterruptibly(long leaseTime, TimeUnit unit)这类方法调用指定leaseTime不为-1),这种情况下会直接设置锁的过期时间为输入值转换为ms单位的时间量,不会启动续期机制

提取续期的Lua脚本如下:


   
  1. -- KEYS[ 1] == getName() --> $KEY --> resource:x
  2. -- ARGV[ 1] == internalLockLeaseTime -->  30000
  3. -- ARGV[ 2] == getLockName(threadId) -->  559cc9df-bad8 -4f6c -86a4-ffa51b7f1c36: 1
  4. if (redis.call( 'hexists', KEYS[ 1], ARGV[ 2]) ==  1) then
  5.     redis.call( 'pexpire', KEYS[ 1], ARGV[ 1]);
  6.      return  1;
  7. end;
  8. return  0;

到此为止,不带waitTime参数的加锁和续期逻辑基本分析完毕,而带waitTime参数的tryLock(long waitTime, long leaseTime, TimeUnit unit)实现其实和只存在leaseTime参数的lock(long leaseTime, TimeUnit unit, boolean interruptibly)实现底层调用的方法是一致的,最大的区别是会在尝试获取锁操作之后基于前后的System.currentTimeMillis()计算出时间差和waitTime做对比,决定需要阻塞等待还是直接超时获取锁失败返回,处理阻塞等待的逻辑是客户端本身的逻辑,这里就不做详细展开,因为源码实现也不是十分优雅(太多long currentTime = System.currentTimeMillis()的代码段了)。接着花点功夫分析一下解锁的实现,包括一般情况下的解锁unlock()和强制解锁forceUnlockAsync()


   
  1. //  一般情况下的解锁
  2. @Override
  3. public void unlock() {
  4.     try {
  5.         get(unlockAsync(Thread.currentThread().getId()));
  6.     } catch (RedisException e) {
  7.          // IllegalMonitorStateException一般是A线程加锁,B线程解锁,内部判断线程状态不一致抛出的
  8.          if (e.getCause() instanceof IllegalMonitorStateException) {
  9.             throw (IllegalMonitorStateException) e.getCause();
  10.         }  else {
  11.             throw e;
  12.         }
  13.     }
  14. }
  15. @Override
  16. public RFuture<Void> unlockAsync() {
  17.      // 获取当前调用解锁操作的线程ID
  18.     long threadId = Thread.currentThread().getId();
  19.      return unlockAsync(threadId);
  20. }
  21. @Override
  22. public RFuture<Void> unlockAsync(long threadId) {
  23.      // 构建一个结果RedissonPromise
  24.     RPromise<Void> result =  new RedissonPromise<Void>();
  25.      // 返回的RFuture如果持有的结果为true,说明解锁成功,返回NULL说明线程ID异常,加锁和解锁的客户端线程不是同一个线程
  26.     RFuture<Boolean> future = unlockInnerAsync(threadId);
  27.     future.onComplete((opStatus, e) -> {
  28.          // 这是内部的异常,说明解锁异常,需要取消看门狗的续期任务
  29.          if (e != null) {
  30.             cancelExpirationRenewal(threadId);
  31.             result.tryFailure(e);
  32.              return;
  33.         }
  34.          // 这种情况说明线程ID异常,加锁和解锁的客户端线程不是同一个线程,抛出IllegalMonitorStateException异常
  35.          if (opStatus == null) {
  36.             IllegalMonitorStateException cause =  new IllegalMonitorStateException( "attempt to unlock lock, not locked by current thread by node id: "
  37.                     + id +  " thread-id: " + threadId);
  38.             result.tryFailure(cause);
  39.              return;
  40.         }
  41.          // 走到这里说明正常解锁,取消看门狗的续期任务
  42.         cancelExpirationRenewal(threadId);
  43.         result.trySuccess(null);
  44.     });
  45.      return result;
  46. }
  47. // 真正的内部解锁的方法,执行解锁的Lua脚本
  48. protected RFuture<Boolean> unlockInnerAsync(long threadId) {
  49.      return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
  50.              "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
  51.                  "return nil;" +
  52.              "end; " +
  53.              "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
  54.              "if (counter > 0) then " +
  55.                  "redis.call('pexpire', KEYS[1], ARGV[2]); " +
  56.                  "return 0; " +
  57.              "else " +
  58.                  "redis.call('del', KEYS[1]); " +
  59.                  "redis.call('publish', KEYS[2], ARGV[1]); " +
  60.                  "return 1; "+
  61.              "end; " +
  62.              "return nil;",
  63.             Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
  64. }
  65. // 取消续期任务
  66. void cancelExpirationRenewal(Long threadId) {
  67.      // 这里说明ExpirationEntry已经被移除,一般是基于同一个线程ID多次调用解锁方法导致的(并发解锁)
  68.     ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
  69.      if (task == null) {
  70.          return;
  71.     }
  72.      // 传入的线程ID不为NULL,从ExpirationEntry中移除线程ID,如果持有的线程ID对应的线程重入计数不为0,会先递减到0,等于0的前提下才会进行删除
  73.      if (threadId != null) {
  74.         task.removeThreadId(threadId);
  75.     }
  76.      // 这里threadId == null的情况是为了满足强制解锁的场景,强制解锁需要直接删除锁所在的KEY,不需要理会传入的线程ID(传入的线程ID直接为NULL)
  77.      // 后者task.hasNoThreads()是为了说明当前的锁没有被任何线程持有,对于单线程也确定在移除线程ID之后重入计数器已经为0,从ExpirationEntry中移除,这个时候获取ExpirationEntry的任务实例进行取消即可
  78.      if (threadId == null || task.hasNoThreads()) {
  79.         Timeout timeout = task.getTimeout();
  80.          if (timeout != null) {
  81.             timeout.cancel();
  82.         }
  83.          // EntryName -> ExpirationEntry映射中移除当前锁的相关实例ExpirationEntry
  84.         EXPIRATION_RENEWAL_MAP.remove(getEntryName());
  85.     }
  86. }
  87. // 强制解锁
  88. @Override
  89. public boolean forceUnlock() {
  90.      return get(forceUnlockAsync());
  91. }
  92. @Override
  93. public RFuture<Boolean> forceUnlockAsync() {
  94.      // 线程ID传入为NULL,取消当前的EntryName对应的续期任务
  95.     cancelExpirationRenewal(null);
  96.      // 执行Lua脚本强制删除锁所在的KEY并且发布解锁消息
  97.      return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
  98.              "if (redis.call('del', KEYS[1]) == 1) then "
  99.             +  "redis.call('publish', KEYS[2], ARGV[1]); "
  100.             +  "return 1 "
  101.             +  "else "
  102.             +  "return 0 "
  103.             +  "end",
  104.             Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE);
  105. }

这里列出一般情况下解锁和强制解锁的Lua脚本,分析如下:


   
  1. -- unlockInnerAsync方法的lua脚本
  2. -- KEYS[ 1] == getName() --> $KEY --> resource:x
  3. -- KEYS[ 2] == getChannelName() --> 订阅锁的Channel --> redisson_lock__channel:{resource:x}
  4. -- ARGV[ 1] == LockPubSub.UNLOCK_MESSAGE --> 常量数值 0
  5. -- ARGV[ 2] == internalLockLeaseTime -->  30000或者具体的锁最大持有时间
  6. -- ARGV[ 3] == getLockName(threadId) -->  559cc9df-bad8 -4f6c -86a4-ffa51b7f1c36: 1
  7. -- 第一个IF分支判断如果锁所在的哈希的field不存在,说明当前线程ID未曾获取过对应的锁,返回NULL表示解锁失败
  8. if (redis.call( 'hexists', KEYS[ 1], ARGV[ 3]) ==  0) then
  9.      return  nil;
  10. end;
  11. -- 走到这里通过hincrby进行线程重入计数 -1,返回计数值
  12. local counter = redis.call( 'hincrby', KEYS[ 1], ARGV[ 3],  -1);
  13. -- 计数值大于 0,说明线程重入加锁,这个时候基于internalLockLeaseTime对锁所在KEY进行续期
  14. if (counter >  0) then
  15.     redis.call( 'pexpire', KEYS[ 1], ARGV[ 2]);
  16.      return  0;
  17. else
  18.     -- 计数值小于或等于 0,说明可以解锁,删除锁所在的KEY,并且向redisson_lock__channel:{$KEY}发布消息,内容是 0(常量数值)
  19.     redis.call( 'del', KEYS[ 1]);
  20.     redis.call( 'publish', KEYS[ 2], ARGV[ 1]);
  21.      return  1;
  22. end;
  23. -- 最后的 return  nil;在IDEA中提示是不会到达的语句,估计这里是开发者笔误写上去的,前面的 if- else都有返回语句,这里应该是不可达的
  24. return  nil;
  25. -------------------------------------------------- 不怎么华丽的分割线 -------------------------------------------------
  26. -- forceUnlockAsync方法的lua脚本
  27. -- KEYS[ 1] == getName() --> $KEY --> resource:x
  28. -- KEYS[ 2] == getChannelName() --> 订阅锁的Channel --> redisson_lock__channel:{resource:x}
  29. -- ARGV[ 1] == LockPubSub.UNLOCK_MESSAGE --> 常量数值 0
  30. -- 强制删除锁所在的KEY,如果删除成功向redisson_lock__channel:{$KEY}发布消息,内容是 0(常量数值)
  31. if (redis.call( 'del', KEYS[ 1]) ==  1) then
  32.     redis.call( 'publish', KEYS[ 2], ARGV[ 1]);
  33.      return  1
  34. else
  35.      return  0
  36. end

其他辅助方法都相对简单,这里弄个简单的"流水账"记录一番:

  • isLocked():基于getName()调用RedisEXISTS $KEY命令判断是否加锁

  • isHeldByThread(long threadId)isHeldByCurrentThread():基于getName()getLockName(threadId)调用RedisHEXISTS $KEY $LOCK_NAME命令判断HASH中对应的field-value是否存在,存在则说明锁被对应线程ID的线程持有

  • getHoldCount():基于getName()getLockName(threadId)调用RedisHGET $KEY $LOCK_NAME命令,用于获取线程对于某一个锁的持有量(注释叫holds,其实就是同一个线程对某一个锁的KEY的续期次数)

「订阅和发布」部分设计到大量Netty组件使用相关的源码,这里不详细展开,这部分的逻辑简单附加到后面这个流程图中。最后,通过一个比较详细的图分析一下Redisson的加锁和解锁流程。

  • 不带waitTime参数的加锁流程:

  • 带有waitTime参数的加锁流程(图右边的流程基本不变,主要是左边的流程每一步都要计算时间间隔):

  • 解锁流程:

假设不同进程的两个不同的线程XY去竞争资源RESOURCE的锁,那么可能的流程如下:

最后再概括一下Redisson中实现red lock算法使用的HASH数据类型:

  • KEY代表的就是资源或者锁,「创建、存在性判断,延长生存周期和删除操作总是针对KEY进行的」

  • FIELD代表的是锁名称lockName(),但是其实它由Redisson连接管理器实例的初始化UUID拼接客户端线程ID组成,严格来说应该是获取锁的客户端线程唯一标识

  • VALUE代表的是客户端线程对于锁的持有量,从源码上看应该是KEY被续期的次数

基于Jedis实现类似Redisson的分布式锁功能

前面的章节已经比较详细分析了Redisson中分布式锁的实现原理,这里使用Jedis和多线程技巧做一个类似的实现。为了简单起见,这里只实现一个无入参的lock()方法(类似于RedissonleaseTime == -1的场景)和unlock()方法。定义接口RedLock


   
  1. public  interface RedLock {
  2.     void lock(String resource) throws InterruptedException;
  3.     void unlock(String resource);
  4. }

为了简单起见,笔者把所有实现逻辑都写在实现类RedisRedLock中:


   
  1. @RequiredArgsConstructor
  2. public class RedisRedLock implements RedLock {
  3.     private final JedisPool jedisPool;
  4.     private final String uuid;
  5.     private static final String WATCH_DOG_TIMEOUT_STRING =  "30000";
  6.     private static final long WATCH_DOG_TASK_DURATION =  10000L;
  7.     private static final String CHANNEL_PREFIX =  "__red__lock:";
  8.     private static final String UNLOCK_STATUS_STRING =  "0";
  9.     private static final String LOCK_LUA =  "if (redis.call('exists', KEYS[1]) == 0) then\n" +
  10.              "    redis.call('hset', KEYS[1], ARGV[2], 1);\n" +
  11.              "    redis.call('pexpire', KEYS[1], ARGV[1]);\n" +
  12.              "    return nil;\n" +
  13.              "end;\n" +
  14.              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then\n" +
  15.              "    redis.call('hincrby', KEYS[1], ARGV[2], 1);\n" +
  16.              "    redis.call('pexpire', KEYS[1], ARGV[1]);\n" +
  17.              "    return nil;\n" +
  18.              "end;\n" +
  19.              "return redis.call('pttl', KEYS[1]);";
  20.     private static final String UNLOCK_LUA =  "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then\n" +
  21.              "    return nil;\n" +
  22.              "end;\n" +
  23.              "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);\n" +
  24.              "if (counter > 0) then\n" +
  25.              "    redis.call('pexpire', KEYS[1], ARGV[2]);\n" +
  26.              "    return 0;\n" +
  27.              "else\n" +
  28.              "    redis.call('del', KEYS[1]);\n" +
  29.              "    redis.call('publish', KEYS[2], ARGV[1]);\n" +
  30.              "    return 1;\n" +
  31.              "end;";
  32.     private static final String RENEW_LUA =  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
  33.              "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  34.              "return 1; " +
  35.              "end; " +
  36.              "return 0;";
  37.     private static final ExecutorService SUB_PUB_POOL = Executors.newCachedThreadPool();
  38.     private static final ScheduledExecutorService WATCH_DOG_POOL =  new ScheduledThreadPoolExecutor(
  39.             Runtime.getRuntime().availableProcessors() *  2
  40.     );
  41.     private static class ThreadEntry {
  42.         private final ConcurrentMap<Long, Integer> threadCounter = Maps.newConcurrentMap();
  43.         private volatile WatchDogTask watchDogTask;
  44.         public synchronized void addThreadId(long threadId) {
  45.             Integer counter = threadCounter.get(threadId);
  46.              if (counter == null) {
  47.                 counter =  1;
  48.             }  else {
  49.                 counter++;
  50.             }
  51.             threadCounter.put(threadId, counter);
  52.         }
  53.         public synchronized boolean hasNoThreads() {
  54.              return threadCounter.isEmpty();
  55.         }
  56.         public synchronized Long getFirstThreadId() {
  57.              if (threadCounter.isEmpty()) {
  58.                  return null;
  59.             }
  60.              return threadCounter.keySet().iterator().next();
  61.         }
  62.         public synchronized void removeThreadId(long threadId) {
  63.             Integer counter = threadCounter.get(threadId);
  64.              if (counter == null) {
  65.                  return;
  66.             }
  67.             counter--;
  68.              if (counter ==  0) {
  69.                 threadCounter.remove(threadId);
  70.             }  else {
  71.                 threadCounter.put(threadId, counter);
  72.             }
  73.         }
  74.         public void setWatchDogTask(WatchDogTask watchDogTask) {
  75.             this.watchDogTask = watchDogTask;
  76.         }
  77.         public WatchDogTask getWatchDogTask() {
  78.              return watchDogTask;
  79.         }
  80.     }
  81.     @Getter
  82.     private static class SubPubEntry {
  83.         private final String key;
  84.         private final Semaphore latch;
  85.         private final SubscribeListener subscribeListener;
  86.         public SubPubEntry(String key) {
  87.             this.key = key;
  88.             this.latch =  new Semaphore( 0);
  89.             this.subscribeListener =  new SubscribeListener(key, latch);
  90.         }
  91.     }
  92.     private static final ConcurrentMap<String, ThreadEntry> THREAD_ENTRY_MAP = Maps.newConcurrentMap();
  93.     @Override
  94.     public void lock(String resource) throws InterruptedException {
  95.         long threadId = Thread.currentThread().getId();
  96.         String lockName = uuid +  ":" + threadId;
  97.         String entryName = uuid +  ":" + resource;
  98.          // 获取锁
  99.         Long ttl = acquire(resource, lockName, threadId, entryName);
  100.          // 加锁成功直接返回
  101.          if (Objects.isNull(ttl)) {
  102.              return;
  103.         }
  104.          // 订阅
  105.         SubPubEntry subPubEntry = subscribeAsync(resource);
  106.         try {
  107.              for (; ; ) {
  108.                 ttl = acquire(resource, lockName, threadId, entryName);
  109.                  // 加锁成功直接返回
  110.                  if (Objects.isNull(ttl)) {
  111.                      return;
  112.                 }
  113.                  if (ttl >  0L) {
  114.                     subPubEntry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
  115.                 }
  116.             }
  117.         } finally {
  118.             unsubscribeSync(subPubEntry);
  119.         }
  120.     }
  121.     private Long acquire(String key, String lockName, long threadId, String entryName) {
  122.         Object result = execute0(jedis -> jedis.eval(LOCK_LUA, Lists.newArrayList(key),
  123.                 Lists.newArrayList(WATCH_DOG_TIMEOUT_STRING, lockName)));
  124.          if (Objects.nonNull(result)) {
  125.              return Long.parseLong(String.valueOf(result));
  126.         }
  127.          // 启动看门狗
  128.         ThreadEntry entry =  new ThreadEntry();
  129.         ThreadEntry oldEntry = THREAD_ENTRY_MAP.putIfAbsent(entryName, entry);
  130.          if (oldEntry != null) {
  131.             oldEntry.addThreadId(threadId);
  132.         }  else {
  133.             entry.addThreadId(threadId);
  134.             Runnable renewAction = () -> executeWithoutResult(jedis -> jedis.eval(RENEW_LUA, Lists.newArrayList(key),
  135.                     Lists.newArrayList(WATCH_DOG_TIMEOUT_STRING, lockName)));
  136.             WatchDogTask watchDogTask =  new WatchDogTask( new AtomicReference<>(renewAction));
  137.             entry.setWatchDogTask(watchDogTask);
  138.             WATCH_DOG_POOL.scheduleWithFixedDelay(watchDogTask,  0, WATCH_DOG_TASK_DURATION, TimeUnit.MILLISECONDS);
  139.         }
  140.          return null;
  141.     }
  142.     private SubPubEntry subscribeAsync(String key) {
  143.         SubPubEntry subPubEntry =  new SubPubEntry(key);
  144.         SUB_PUB_POOL.submit(() -> {
  145.             SubscribeListener subscribeListener = subPubEntry.getSubscribeListener();
  146.             executeWithoutResult(jedis -> jedis.subscribe(subscribeListener, subscribeListener.getChannelName()));
  147.              return null;
  148.         });
  149.          return subPubEntry;
  150.     }
  151.     private void unsubscribeSync(SubPubEntry subPubEntry) {
  152.         SubscribeListener subscribeListener = subPubEntry.getSubscribeListener();
  153.         subscribeListener.unsubscribe(subscribeListener.getChannelName());
  154.     }
  155.     @Override
  156.     public void unlock(String resource) {
  157.         long threadId = Thread.currentThread().getId();
  158.         String entryName = uuid +  ":" + resource;
  159.         String lockName = uuid +  ":" + threadId;
  160.         String channelName = CHANNEL_PREFIX + resource;
  161.         Object result = execute0(jedis -> jedis.eval(UNLOCK_LUA, Lists.newArrayList(resource, channelName),
  162.                 Lists.newArrayList(UNLOCK_STATUS_STRING, WATCH_DOG_TIMEOUT_STRING, lockName)));
  163.         ThreadEntry threadEntry = THREAD_ENTRY_MAP.get(entryName);
  164.          if (Objects.nonNull(threadEntry)) {
  165.             threadEntry.removeThreadId(threadId);
  166.              if (threadEntry.hasNoThreads() && Objects.nonNull(threadEntry.getWatchDogTask())) {
  167.                 threadEntry.getWatchDogTask().cancel();
  168.             }
  169.         }
  170.          if (Objects.isNull(result)) {
  171.             throw  new IllegalMonitorStateException();
  172.         }
  173.     }
  174.     private static class SubscribeListener extends JedisPubSub {
  175.         @Getter
  176.         private final String key;
  177.         @Getter
  178.         private final String channelName;
  179.         @Getter
  180.         private final Semaphore latch;
  181.         public SubscribeListener(String key, Semaphore latch) {
  182.             this.key = key;
  183.             this.channelName = CHANNEL_PREFIX + key;
  184.             this.latch = latch;
  185.         }
  186.         @Override
  187.         public void onMessage(String channel, String message) {
  188.              if (Objects.equals(channelName, channel) && Objects.equals(UNLOCK_STATUS_STRING, message)) {
  189.                 latch.release();
  190.             }
  191.         }
  192.     }
  193.     @RequiredArgsConstructor
  194.     private static class WatchDogTask implements Runnable {
  195.         private final AtomicBoolean running =  new AtomicBoolean( true);
  196.         private final AtomicReference<Runnable> actionReference;
  197.         @Override
  198.         public void run() {
  199.              if (running.get() && Objects.nonNull(actionReference.get())) {
  200.                 actionReference.get().run();
  201.             }  else {
  202.                 throw  new WatchDogTaskStopException( "watch dog cancel");
  203.             }
  204.         }
  205.         public void cancel() {
  206.             actionReference.set(null);
  207.             running.set( false);
  208.         }
  209.     }
  210.     private <T> T execute0(Function<Jedis, T> function) {
  211.         try (Jedis jedis = jedisPool.getResource()) {
  212.              return function.apply(jedis);
  213.         }
  214.     }
  215.      interface Action {
  216.         void apply(Jedis jedis);
  217.     }
  218.     private void executeWithoutResult(Action action) {
  219.         try (Jedis jedis = jedisPool.getResource()) {
  220.             action.apply(jedis);
  221.         }
  222.     }
  223.     private static class WatchDogTaskStopException extends RuntimeException {
  224.         @Override
  225.         public synchronized Throwable fillInStackTrace() {
  226.              return this;
  227.         }
  228.     }
  229.     public static void main(String[] args) throws Exception {
  230.         String resourceName =  "resource:x";
  231.         RedLock redLock =  new RedisRedLock( new JedisPool( new GenericObjectPoolConfig()), UUID.randomUUID().toString());
  232.         Thread threadA =  new Thread(() -> {
  233.             try {
  234.                 redLock.lock(resourceName);
  235.                 process(resourceName);
  236.             } catch (InterruptedException e) {
  237.                 e.printStackTrace();
  238.             } finally {
  239.                 redLock.unlock(resourceName);
  240.                 System.out. println(String.format( "线程%s释放资源%s的锁", Thread.currentThread().getName(), resourceName));
  241.             }
  242.         },  "threadA");
  243.         Thread threadB =  new Thread(() -> {
  244.             try {
  245.                 redLock.lock(resourceName);
  246.                 process(resourceName);
  247.             } catch (InterruptedException e) {
  248.                 e.printStackTrace();
  249.             } finally {
  250.                 redLock.unlock(resourceName);
  251.                 System.out. println(String.format( "线程%s释放资源%s的锁", Thread.currentThread().getName(), resourceName));
  252.             }
  253.         },  "threadB");
  254.         threadA.start();
  255.         threadB.start();
  256.         Thread.sleep(Long.MAX_VALUE);
  257.     }
  258.     private static void process(String resourceName) {
  259.         String threadName = Thread.currentThread().getName();
  260.         System.out. println(String.format( "线程%s获取到资源%s的锁", threadName, resourceName));
  261.         try {
  262.             Thread.sleep( 1000);
  263.         } catch (InterruptedException ignore) {
  264.         }
  265.     }
  266. }

上面的实现短时间内编写完,没有做详细的DEBUG,可能会有纰漏。某次执行结果如下:


   
  1. 线程threadB获取到资源resource:x的锁
  2. 线程threadB释放资源resource:x的锁
  3. 线程threadA获取到资源resource:x的锁
  4. 线程threadA释放资源resource:x的锁

小结

Redisson中的red lock实现,应用到下面的核心技术:

  • 合理应用Redis的基本数据类型HASH

  • Redis的订阅发布

  • Lua脚本的原子性

  • Netty中的Promise实现

  • Netty中的时间轮HashedWheelTimer和对应的定时任务(HashedWheel)Timeout

  • Semaphore进行带期限、永久或者可中断的阻塞以及唤醒,替代CountDownLatch中的无等待期限阻塞

上面的核心技术相对合理地应用,才能实现一个高效而且容错能力相对比较高的分布式锁方案,但是从目前来看,Redisson仍未解决red lock算法中的故障转移缺陷,笔者认为这个有可能是Redis实现分布式锁方案的一个底层缺陷,「此方案在Redis单实例中是相对完善」,一旦应用在Redis集群(普通主从、哨兵或者Cluster),有几率会出现前文提到的节点角色切换导致多个不同客户端获取到同一个资源对应的锁的问题。暂时无解。

参考资料:

  • Redisson开源版本源码

  • Redis官方文档

画图用的是ProcessOnhttps://www.processon.com/view/link/5ffc540de0b34d2060d2d715

(c-2-w e-a-20210110 2021年的第一篇文章,希望这一年不要这么鸽,这个系列的下一篇是《冷饭新炒:理解JDK中UUID的底层实现》)


转载:https://blog.csdn.net/zjcsuct/article/details/112504950
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场