飞道的博客

填坑之旅:手撕分布式zookeeper锁

363人阅读  评论(0)

作者:潘吉祥


欢迎读者跟着一同来填坑,不知道大家对于分布式锁有没有大概的了解,手撕之前,还是总体性地介绍一下。

分布式锁:用来解决分布式环境中共享数据的正确性。

首先我们来想一下分布式环境下遇到的一个问题:我们希望生成一个全局唯一id,该如何做呢?注意,我们强调的是分布式环境中(不是有雪花算法吗?呃,咱们是抱着学习的心态来的哈)。

传统的单机环境很容易:UUID加上时间戳就OK了。单是分布式环境是绝对不行的,我们来看图:

那么怎么解决这种问题呢,此时传统的锁是毫无用武之地的,那么就需要分布式锁的概念了,还是看图:

上图中所有机器同时指向zookeeper,他们要进行我们预期的id生成,步骤如下:

1.所有机器同时往zookeeper中创建同一个名字的临时节点,但zookeeper能够保证节点的唯一性,已经存在的节点,再去创建就会报错,因此只会有一个机器创建成功

2.创建节点失败的机器需要订阅这个节点变化的消息,并进入阻塞

3.创建节点成功的机器在生成id后,关闭和zookeeper的连接,zookeeper会自动删除它所创建的临时节点

4.当临时节点删除后会发送广播消息,此时订阅了节点变化消息的机器(即创建节点失败的机器)监听到消息,打断当前的阻塞,再次去尝试创建节点(回到第一步),在这批机器中又只会有一个创建成功,剩下的机器又重复上面的步骤。

因此整个流程下来最后能够保证一个时刻只能有一个机器执行生成id的逻辑,因此就能够生成全局的唯一id。

(这里说明一下关于zookeeper不能创同名节点的问题,zookeeper除了临时节点和持久节点之外,还有顺序节点的概念,即你在创建节点的时候可以选择临时的或持久的顺序节点,此时的name可以相同,zookeeper会自动添加递增的序号,本质上也是不同名字的,只是zookeeper帮你做了。。)

下面进入代码实战阶段:

我们定义一个zookeeper锁对象,里面定义lock和unlock方法,lock方法执行的就是创建锁的逻辑,unlock执行的就是关闭zookeeper连接的逻辑。


   
  1. public class ZkLock {
  2.     private ZkClient zkClient =  new ZkClient( "192.168.1.23:2181");
  3.      //锁节点
  4.     private String zkLock =  "/zk_lock";
  5.      //并发工具
  6.     private CountDownLatch countDownLatch = null;
  7.      //获取锁,即创建节点
  8.     public void lock(){
  9.         try {
  10.              //创建锁节点
  11.             zkClient.createEphemeral(zkLock);
  12.         }catch (Exception e){
  13.              //精确点的异常应该是ZkNodeExistsException
  14.              //所有创建失败的都会进入此逻辑
  15.              //创建监听
  16.             IZkDataListener listener =  new IZkDataListener() {
  17.                 @Override
  18.                 public void handleDataChange(String s, Object o) throws Exception { }
  19.                  //如果节点删除了就不再阻塞
  20.                 @Override
  21.                 public void handleDataDeleted(String s) throws Exception {
  22.                      if(countDownLatch != null){
  23.                         countDownLatch.countDown();
  24.                     }
  25.                 }
  26.             };
  27.              //订阅节点变化消息
  28.             zkClient.subscribeDataChanges(zkLock,listener);
  29.             countDownLatch =  new CountDownLatch( 1);
  30.             try {
  31.                  //在节点未被删除之前都处于阻塞状态
  32.                 countDownLatch.await();
  33.             } catch (InterruptedException interruptedException) {
  34.                 interruptedException.printStackTrace();
  35.             }
  36.              //当执行到此,说明监听已经使用,已经完成一轮操作,要清除监听
  37.             zkClient.unsubscribeDataChanges(zkLock,listener);
  38.              //上一次没抢到的节点此时已经释放,可以重新抢着创建,递归
  39.             lock();
  40.         }
  41.     }
  42.      //释放锁
  43.     public void unlock(){
  44.          if(zkClient != null){
  45.              //当执行了close,临时节点救护自动被删除
  46.             zkClient. close();
  47.             System.out. println( "释放锁");
  48.         }
  49.     }
  50. }

测试:


   
  1. public class Test{
  2.     public static void main(String[] args) {
  3.          //开启十个线程模拟十台不同的机器在生产id
  4.          for ( int i =  0; i <  10; i++) {
  5.              new Thread(() ->{
  6.                  //创建我们定义的分布式锁
  7.                 ZkLock zkLock =  new ZkLock();
  8.                  //加锁
  9.                 zkLock.lock();
  10.                 String id =  new SimpleDateFormat( "yyyy-MM-dd-HH-mm-ss").format( new Date());
  11.                 try {
  12.                     Thread.sleep( 1000);
  13.                 } catch (InterruptedException e) {
  14.                     e.printStackTrace();
  15.                 }
  16.                 System.out. println(id);
  17.                  //完成业务逻辑之后释放锁
  18.                 zkLock.unlock();
  19.             }).start();
  20.         }
  21.          //没有加zookeeper锁的
  22. //        for (int i = 0; i < 10; i++) {
  23. //            new Thread(() ->{
  24. //
  25. //                String id = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss").format(new Date());
  26. //                try {
  27. //                    Thread.sleep(1000);
  28. //                } catch (InterruptedException e) {
  29. //                    e.printStackTrace();
  30. //                }
  31. //                System.out.println(id);
  32. //
  33. //            }).start();
  34. //        }
  35.     }
  36. }

加锁的输出:


   
  1. 2020 -04 -07 -16 -28 -56
  2. 释放锁
  3. 2020 -04 -07 -16 -28 -57
  4. 释放锁
  5. ……
  6. 释放锁
  7. 2020 -04 -07 -16 -29 -04
  8. 释放锁
  9. 2020 -04 -07 -16 -29 -05
  10. 释放锁

未加锁:


   
  1. 2020 -04 -07 -17 -50 -02
  2. 2020 -04 -07 -17 -50 -02
  3. 2020 -04 -07 -17 -50 -02
  4. ……
  5. 2020 -04 -07 -17 -50 -02

这里用多个线程模拟了多个机器,注意我们没有使用任何Java锁,而达到了顺序执行的效果。

关于zookeeper分布式锁,我们重在理解分布式锁产生的背景和如何利用zookeeper达到分布式锁的四个步骤,怎么样,你也赶紧试试吧!

Ps:代码中用到了一个JUC工具CountDownLatch,使用它可以让一个线程等待其它指定的线程走完逻辑之后再执行自己的方法。

与它类似的还有CyclicBarrier工具,如需推文学习,留言即可哈,让你秒懂。

Git地址:https://github.com/rockit-ba/zookeeper-lock.git)

END

【推荐阅读

原创 | 全网最实在的docker入门教程一

原创 | 全网最实在的docker入门教程二

原创 | 全网最实在的docker入门教程三

原创 | 全网最实在的docker入门教程四

手撕zookeeper:作为注册中心负载均衡

老王,快给你的SpringBoot做个埋点监控吧!

没说过这句“口头禅”,不配做程序员!

贼好用的Java工具类库,GitHub星标10k+,你在用吗?


   
  1. 感谢阅读,请扫码关注
  2. 明天见

转载:https://blog.csdn.net/woniu211111/article/details/105480037
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场