Redis中Redisson红锁使用原理是什么
为什么使用Redis的红锁
主从结构分布式锁的问题
实现Redis分布式锁的最简单的方法就是在Redis中创建一个key,这个key有一个失效时间(TTL),以保证锁最终会被自动释放掉。当客户端释放资源(解锁)的时候,会删除掉这个key。
从表面上看似乎效果不错,但有一个严重的单点失败问题:如果Redis挂了怎么办?你可能会说,可以通过增加一个slave节点解决这个问题。但这通常是行不通的。Redis的主从同步通常是异步的,因此这么做不能实现资源的独享。
在这种场景(主从结构)中存在明显的竞态:
客户端A从master获取到锁
在master将锁同步到slave之前,master宕掉了。
slave节点被晋级为master节点
客户端B从新的master获取到锁
这个锁对应的资源之前已经被客户端A已经获取到了。安全失效!
程序有时候就是这么巧,比如说当一个节点挂掉时,多个客户端恰好同时获得了锁。只要你能容忍这种低概率的错误,那么采用这个基于复制的解决方案就毫无疑问了。否则的话,我们建议你实现下面描述的解决方案。
解决方案:使用红锁
简介
Redis中针对此种情况,引入了红锁的概念。在红锁系统中,获取或释放锁的成功标志是在超过一半的节点上操作成功。
原理
Assuming there are N Redis masters in the distributed environment of Redis.。这些节点完全互相独立,不存在主从复制或者其他集群协调机制。我们之前已经阐述了如何在Redis的单实例下安全地获取和释放锁。我们确保将在每(N)个实例上使用此方法获取和释放锁。在这个样例中,我们假设有5个Redis master节点,这是一个比较合理的设置,所以我们需要在5台机器上面或者5台虚拟机上面运行这些实例,这样保证他们不会同时都宕掉。
为了取到锁,客户端应该执行以下操作:
获取当前Unix时间,以毫秒为单位。
依次尝试从N个实例,使用相同的key和随机值获取锁。
向Redis设置锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。
如果您的锁自动失效时间为10秒,那么超时时间应该设置在5-50毫秒之间。防止客户端在Redis服务器已宕机的情况下仍在无休止地等待响应结果。在规定时间内未收到服务器端响应时,客户端应尽快尝试连接其他Redis实例。
客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)得到获取锁使用的时间。
成功获取锁的条件是必须从大多数 Redis 节点(3个节点)获取锁,并且使用时间不能超过锁的失效时间。
如果取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。
如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功)。
Redisson红锁实例
官网
官方github:8. 分布式锁和同步器 · redisson/redisson Wik
基于Redis的Redisson红锁RedissonRedLock对象实现了Redlock介绍的加锁算法。该对象也可以用来将多个RLock对象关联为一个红锁,每个RLock对象实例可以来自于不同的Redisson实例。
RLock lock1 = redissonInstance1.getLock("lock1"); RLock lock2 = redissonInstance2.getLock("lock2"); RLock lock3 = redissonInstance3.getLock("lock3"); RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3); // 同时加锁:lock1 lock2 lock3 // 红锁在大部分节点上加锁成功就算成功。 lock.lock(); ... lock.unlock();登录后复制
大家都知道,如果负责储存某些分布式锁的某些Redis节点宕机以后,而且这些锁正好处于锁住的状态时,这些锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。
Redisson也可以通过锁定并设置leaseTime参数的方法,来指定锁定的时间。超过这个时间后锁便自动解开了。
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3); // 给lock1,lock2,lock3加锁,如果没有手动解开的话,10秒钟后将会自动解开 lock.lock(10, TimeUnit.SECONDS); // 为加锁等待100秒时间,并在加锁成功10秒钟后自动解开 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); ... lock.unlock();登录后复制
Redisson红锁原理
RedissonRedLock extends RedissonMultiLock,所以实际上,redLock.tryLock实际调用:org.redisson.RedissonMultiLock.java#tryLock(),进而调用到其同类的tryLock(long waitTime, long leaseTime, TimeUnit unit) ,入参为:tryLock(-1, -1, null)
org.redisson.RedissonMultiLock.java#tryLock(long waitTime, long leaseTime, TimeUnit unit)源码如下:
final List<RLock> locks = new ArrayList<>(); /** * Creates instance with multiple {@link RLock} objects. * Each RLock object could be created by own Redisson instance. * * @param locks - array of locks */ public RedissonMultiLock(RLock... locks) { if (locks.length == 0) { throw new IllegalArgumentException("Lock objects are not defined"); } this.locks.addAll(Arrays.asList(locks)); } public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long newLeaseTime = -1; if (leaseTime != -1) { newLeaseTime = unit.toMillis(waitTime)*2; } long time = System.currentTimeMillis(); long remainTime = -1; if (waitTime != -1) { remainTime = unit.toMillis(waitTime); } long lockWaitTime = calcLockWaitTime(remainTime); /** * 1. 允许加锁失败节点个数限制(N-(N/2+1)) */ int failedLocksLimit = failedLocksLimit(); /** * 2. 遍历所有节点通过EVAL命令执行lua加锁 */ List<RLock> acquiredLocks = new ArrayList<>(locks.size()); for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) { RLock lock = iterator.next(); boolean lockAcquired; /** * 3.对节点尝试加锁 */ try { if (waitTime == -1 && leaseTime == -1) { lockAcquired = lock.tryLock(); } else { long awaitTime = Math.min(lockWaitTime, remainTime); lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS); } } catch (RedisResponseTimeoutException e) { // 如果抛出这类异常,为了防止加锁成功,但是响应失败,需要解锁所有节点 unlockInner(Arrays.asList(lock)); lockAcquired = false; } catch (Exception e) { // 抛出异常表示获取锁失败 lockAcquired = false; } if (lockAcquired) { /** *4. 如果获取到锁则添加到已获取锁集合中 */ acquiredLocks.add(lock); } else { /** * 5. 计算已经申请锁失败的节点是否已经到达 允许加锁失败节点个数限制 (N-(N/2+1)) * 如果已经到达, 就认定最终申请锁失败,则没有必要继续从后面的节点申请了 * 因为 Redlock 算法要求至少N/2+1 个节点都加锁成功,才算最终的锁申请成功 */ if (locks.size() - acquiredLocks.size() == failedLocksLimit()) { break; } if (failedLocksLimit == 0) { unlockInner(acquiredLocks); if (waitTime == -1 && leaseTime == -1) { return false; } failedLocksLimit = failedLocksLimit(); acquiredLocks.clear(); // reset iterator while (iterator.hasPrevious()) { iterator.previous(); } } else { failedLocksLimit--; } } /** * 6.计算 目前从各个节点获取锁已经消耗的总时间,如果已经等于最大等待时间,则认定最终申请锁失败,返回false */ if (remainTime != -1) { remainTime -= System.currentTimeMillis() - time; time = System.currentTimeMillis(); if (remainTime <= 0) { unlockInner(acquiredLocks); return false; } } } if (leaseTime != -1) { List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size()); for (RLock rLock : acquiredLocks) { RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS); futures.add(future); } for (RFuture<Boolean> rFuture : futures) { rFuture.syncUninterruptibly(); } } /** * 7.如果逻辑正常执行完则认为最终申请锁成功,返回true */ return true; }登录后复制【本文来源:韩国服务器 http://www.558idc.com/kt.html欢迎留下您的宝贵建议】