飞道的博客

RedisSon分布式锁 源码解析,在 java 中使用 redis + lua 做秒杀

223人阅读  评论(0)

1. RedisSon 分布式锁

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.17.0</version>
</dependency>
spring:
  profiles:
    active: dev
  redis:
    cluster:
      nodes: 192.168.0.150:6379,192.168.0.151:6379,192.168.0.152:6379,192.168.0.153:6379,192.168.0.154:6379,192.168.0.155:6379
@Autowired
private RedisProperties redisProperties;

@Bean
public Redisson redisson(){
   
    Config config = new Config();
    List<String> nodes = redisProperties.getCluster().getNodes();
    List<String> nodeList = new ArrayList<>();
    for (String node : nodes) {
   
        nodeList.add("redis://"+node);
    }
    config.useClusterServers().setNodeAddresses(nodeList);
    return (Redisson) Redisson.create(config);
}

1.1 加锁

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
   
   //获取线程id
    long threadId = Thread.currentThread().getId();
    //获取锁,有就加锁,有自己的锁就重入,value值+1
    //这个方法在下边文章 1.1.1中会有解析
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
    // 如果返回null 表示加锁成功,锁续命成功
    if (ttl == null) {
   
        return;
    }
	//订阅线程id
    CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
    RedissonLockEntry entry;
    if (interruptibly) {
   
        entry = commandExecutor.getInterrupted(future);
    } else {
   
        entry = commandExecutor.get(future);
    }

    try {
   
       //不断循环来获取锁
        while (true) {
   
            ttl = tryAcquire(-1, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
   
                break;
            }
            if (ttl >= 0) {
   
                try {
   
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
   
                    if (interruptibly) {
   
                        throw e;
                    }
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
   
                if (interruptibly) {
   
                    entry.getLatch().acquire();
                } else {
   
                    entry.getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
   
        unsubscribe(entry, threadId);
    }
}

 

1.1.1 尝试获取锁

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
   
    RFuture<Long> ttlRemainingFuture;
    //锁过期时间,如果是-1 采用看门狗的 30*1000 的时间,在 1.1.1.1 中有截图
    if (leaseTime != -1) {
   
        //尝试获取锁方法,在1.1.1.2部分中解析
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
   
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
   
        // lock acquired
        if (ttlRemaining == null) {
   
            if (leaseTime != -1) {
   
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
   
                //增加调度任务,来给锁续命
                //在1.1.1.3 部分解析
                scheduleExpirationRenewal(threadId);
            }
        }
        return ttlRemaining;
    });
    return new CompletableFutureWrapper<>(f);
}

 

1.1.1.1 默认 看门狗时间 30*1000


1.1.1.2 尝试获取锁

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
   
     return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
             //进行判断是否存在这个key,如果不存在返回0
             "if (redis.call('exists', KEYS[1]) == 0) then " +
             //设置类型为hashmap类型,查询的key为第一个key(KEYS[1]),hashmap的key是线程id(ARGV[2]),值为1
             "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
             //设置过期时间
             "redis.call('pexpire', KEYS[1], ARGV[1]); " +
             "return nil; " +
             "end; " +
             //通过key还有线程id查询如果有 返回 1
             "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
             //在原基础上在加1,处理重入锁问题
             "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
             //设置过期时间
             "redis.call('pexpire', KEYS[1], ARGV[1]); " +
             "return nil; " +
             "end; " +
             //返回获取的信息,如果是  -1表示没有设置过期时间,-2表示没有这个key,大于0表示剩余的过期时间(单位毫秒)
             "return redis.call('pttl', KEYS[1]);",
             //第一个key                              //第一个参数                   //第二个参数     
             Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
 }

 

1.1.1.3 锁续命


protected RFuture<Boolean> renewExpirationAsync(long threadId) {
   
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            //判断这个key 和 线程id存不存在,存在返回 1
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
            //重新设置过期时间
             "redis.call('pexpire', KEYS[1], ARGV[1]); " +
             "return 1; " +
             "end; " +
             "return 0;",
            Collections.singletonList(getRawName()),
            internalLockLeaseTime, getLockName(threadId));
}

1.2 解锁


protected RFuture<Boolean> unlockInnerAsync(long threadId) {
   
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            //判断有没有这个key 和 线程id的hashmap ,如果是0 表示没有
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
            "return nil;" +
            "end; " +
            //hashmap的这个key和线程id的值 减1
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            //如果还大于0,说明重入锁,没有完全解锁
            "if (counter > 0) then " +
            //设置过期时间
            "redis.call('pexpire', KEYS[1], ARGV[2]); " +
            "return 0; " +
            "else " +
            //删除这个key
            "redis.call('del', KEYS[1]); " +
            //发送给其他争抢锁的线程,告诉他们 可以继续争抢锁了
            "redis.call('publish', KEYS[2], ARGV[1]); " +
            "return 1; " +
            "end; " +
            "return nil;",
            Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

 

2. java项目中 实现redis+lua

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.3.7.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.3.7.RELEASE</version>
</dependency>

2.1 请注意 在lua中存在两个重要的形参 KEYS 、ARGV

类型 描述
nil 无效值,类似于java的null,在条件表达式中表示false
boolean false或true
number 双精度浮点型
string 字符串,有单引号或双引号括起来
function 函数,类似于linux的函数一样
table 1.数组类型 2.json类型
local name = KEYS[1] -- 第一个key
local value = ARGV[1] -- 第一个参数
if tonumber(redis.call('EXISTS',name)) > 0 then -- 是否存在这个key
else
redis.call('SET',name,value);  -- 不存在 就在创建一个
end;
return redis.call('GET',name);  -- 返回这个key的value

用于java中表示为

 @Autowired
 private StringRedisTemplate stringRedisTemplate;

 @GetMapping("/saveRedis")
 public String saveRedis(){
   
     StringBuilder builder = new StringBuilder();
     builder.append(" local name = KEYS[1] ");
     builder.append(" local value = ARGV[1] ");
     builder.append(" if tonumber(redis.call('EXISTS',name)) > 0 then ");
     builder.append(" else   ");
     builder.append(" redis.call('SET',name,value);   ");
     builder.append(" end; ");
     builder.append(" return redis.call('GET',name); ");
     RedisScript<String> script = RedisScript.of(builder.toString(), String.class);
     String string = UUID.randomUUID().toString();
     String result = stringRedisTemplate.execute(script, Arrays.asList("b"), string);
     return result;
 }

 

执行接口,发现已经在redis生成了一个 key为 aaa的数据

2.2 模拟秒杀,扣减redis中的库存

不包含其他业务,仅用作讲述lua脚本如何编写

@GetMapping("/seckill")
public String seckill(String shopName,String userName){
   
    StringBuilder builder = new StringBuilder();
    builder.append(" local shopName = KEYS[1] ");//商品名称
    builder.append(" local shopUser = shopName .. 'User' ");//参与秒杀的用户 集合key
    builder.append(" local userName = ARGV[1] ");//参与秒杀的用户
    //判断如果没有商品秒杀的数量,就初始化一个数量为100的,这一步为了省事才这么做,生产环境不要这样写,切记、切记!
    builder.append(" if (not redis.call('GET',shopName)) then  ");
    builder.append("    redis.call('SET',shopName,100);  ");
    builder.append(" end;   ");
    //创建一个函数,用来秒杀商品数量减1,参数秒杀用户集合增加
    builder.append(" local function seckill(shopUser,userName)   ");
    builder.append("    redis.call('SADD',shopUser,userName);   ");
    builder.append("    redis.call('DECRBY',shopName,1);   ");
    builder.append(" end;   ");
    //判断如果 秒杀的用户集合没有就调用函数创建一下
    builder.append(" if tonumber(redis.call('EXISTS',shopUser)) == 0 then   ");
    builder.append("    if tonumber(redis.call('GET',shopName)) > 0 then ");
    builder.append("       seckill(shopUser,userName)   ");
    builder.append("       return 0 ");
    builder.append("    end;   ");
    builder.append(" end;   ");
    //当前用户已秒杀过,则返回1,提示不允许重复秒杀
    builder.append(" if tonumber(redis.call('SISMEMBER',shopUser,userName)) > 0 then ");
    builder.append("    return 1   ");
    builder.append(" end;   ");
    //如果剩余商品数量大于0,就扣减库存,存储秒杀用户
    builder.append(" if tonumber(redis.call('GET',shopName)) > 0 then ");
    builder.append("    seckill(shopUser,userName)   ");
    builder.append("    return 0 ");
    builder.append(" end; ");
    builder.append(" return redis.call('GET',shopName); ");
    RedisScript<Long> script = RedisScript.of(builder.toString(), Long.class);
    String string = UUID.randomUUID().toString();
    Long result = stringRedisTemplate.execute(script, Arrays.asList(shopName), userName);
    if(result == 1){
   
        return "请勿重复下单";
    }else {
   
        return "秒杀成功";
    }
}

 

2.3 测试脚本

2.3.1 调用秒杀接口
秒杀成功,商品数量减1,用户秒杀集合增加当前用户




2.3.2 重复秒杀 直接返回失败

秒杀失败后,库存不变


转载:https://blog.csdn.net/weixin_47752736/article/details/128374633
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场