Redisson分布式锁入门使用及源码浅析

作为JAVA开发者,一提到锁,我们第一时间想到的应该是synchronized关键字、ReentrantLock和juc包下的各种并发工具类。随着业务的增长和微服务的流行,系统由单体应用拆分为独立的模块,每个模块服务都运行在独立的JVM中,此时JDK提供的锁就显得力不从心了,于是分布式锁这样一把利器就应运而生了。

分布式锁的特性

  • 互斥

    同一时刻只能有一个客户端持有

  • 可重入

    同一个客户端获取到锁后可以再次加锁

  • 避免死锁

    通过设置过期时间或其他方式保证即使客户端崩溃也不会影响锁的释放

  • 加锁解锁同一人

    每个客户端只能释放自己加的锁

  • 容错

    允许若干个锁节点发生故障仍能正常加锁和解锁

分布式锁实现方式

  • 数据库

    select xxx for update

  • zookeeper

    临时顺序节点

  • redis

    setnx、lua脚本

使用Redis作为分布式锁

伪代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 生成分布式锁的key,使用业务id代替
lockKey = "lock:" + bizId;

// 使用setnx命令进行加锁,setnx即set if not exist,不存在则设置
result = redis.setnx(lockKey, value);

if (result == 1) {
// 加锁成功,设置过期时间
redis.expire(key, 10);

// 执行业务操作
doSomething();

// 释放锁
redis.del(lockKey);
}

存在的问题

  1. 加锁和设置过期时间不是原子操作,如果加锁成功但设置过期时间失败,则锁会一直存在
  2. 锁的过期时间是固定的,如果设置过小,业务耗时较久,锁会提前释放,被其他线程获取,造成业务混乱和异常;如果设置过大,业务操作发生异常,造成锁长时间无法释放且无法获取
  3. 可能存在误释放其他线程的锁,线程1加锁并设置过期时间成功,但是由于业务耗时较久,锁已经过期,所以线程2能够获取到锁并执行业务操作,此时线程1执行完业务后释放了本该属于线程2的锁,然后线程3又获取到了锁……很明显,这种情况页会造成业务的混乱和异常

如何解决

  1. Redis的set命令提供了nx和ex、px参数,相当于将setnx + expire合并为一个操作

    1
    2
    3
    // ex 过期时间单位为秒
    // px 过期时间单位为毫秒
    set key value nx ex|px time
  2. 通过类似TimerTask之类的定时工具类来对锁进行自动续期(Redisson中的看门狗机制),每隔一段时间进行检查,如果锁存在,重新设置过期时间保证业务能够正常完成

  3. 线程加锁时将线程id作为value存入Redis,解锁时判断锁中的value是否和当前线程id一致,一致则删除锁

    伪代码如下:

    1
    2
    3
    4
    5
    6
    7
    threadId = Threads.currentThread().getId();

    ......

    if (threadId == redis.get(lockKey)) {
    redis.del(lockKey);
    }

    这里仍然存在几个问题:

    • 上述删除锁一共三个操作,获取锁的value、判断value和线程id是否相等、删除锁,如果线程1在执行完前两步操作后锁刚好过期,此时线程2成功获取到锁,然后执行业务操作,接着线程1执行删除操作,就会释放掉线程2的锁。因此需要上述操作为原子性操作,可以使用lua脚本将三个操作合在一起执行
    • 在不同的JVM中,线程id是会重复的,因此不能只使用线程id作为锁的value,可以使用UUID或分布式ID(如使用雪花算法生成的id)

到这里,我们设计的分布式锁才算勉强满足我们的需求。

Redis部署方式带来的问题

  1. 上述的分布式锁在Redis单节点环境下是能够正常使用的,但是单节点部署注定无法提供高可用性
  2. Redis除了单节点部署外,还支持主从(哨兵)、分片集群两种部署方式,而这两种方式都是需要进行主从同步的,如果某个master节点在客户端获取锁后下线,而slave节点又未及时同步锁数据,经过选举后slave节点成为新的master节点,但是客户端锁数据已经丢失,其他客户端可以正常获取锁,这样会造成程序和业务上的混乱
  3. 为了解决上述问题,Redis作者提出了一种算法来保证即使在主从(哨兵)、分片集群环境下也能保证分布式锁的正常使用

Redlock算法

简单概括来说,将上述的操作在每个master节点上分别执行,如果在容忍时间内,至少有一半以上的master节点能够加锁成功,则认为分布式锁获取成功

详情查看官网

Redisson分布式锁简单使用

Redisson是一个方便我们连接和使用Redis的客户端,它提供了很多常用功能,其中就包括分布式锁。Redisson分布式锁支持单实例、主从、哨兵、分片集群等部署模式,详见wiki

  1. 创建SpringBoot Web项目,引入redisson依赖

    1
    2
    3
    4
    5
    <dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.16.7</version>
    </dependency>
  2. 在application.properties配置文件中添加redis连接信息

    1
    2
    3
    spring.redis.host=xxx.xxx.xxx.xxx
    spring.redis.port=6379
    spring.redis.password=password
  3. 编写Redisson配置类,创建RedissonClient对象实例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    @Configuration
    public class RedissonConfig {

    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.port}")
    private String port;

    @Value("${spring.redis.password}")
    private String password;

    @Bean
    public RedissonClient redissonClient() {
    Config config = new Config();
    // 这里使用单实例模式
    config.useSingleServer().setAddress("redis://" + host + ":" + port).setPassword(password);

    return Redisson.create(config);
    }

    }
  4. 编写controller类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    @Slf4j
    @RestController
    @RequestMapping(value = "/lottery")
    public class LotteryController {

    @Resource
    private RedissonClient redissonClient;

    @PostMapping(value = "/draw")
    public String draw() {
    RLock lock = redissonClient.getLock("prize:1001");

    try {
    if (!lock.tryLock()) {
    log.info("===> {} didn't get lock", Thread.currentThread().getName());

    return "离大奖只差一步,别灰心,下次还有机会!";
    }

    log.info("===> {} got lock", Thread.currentThread().getName());
    } finally {
    if (lock.isHeldByCurrentThread()) {
    log.info("===> {} unlock", Thread.currentThread().getName());
    lock.unlock();
    }
    }

    return "你就是天选之子!";
    }

    }
  5. 使用Jmeter工具模拟并发请求

  6. 查看Redis中分布式锁信息

  7. 查看控制台日志

可以看到,在并发场景下,Redisson分布式锁可以保证同一时刻只有一个线程能够获取锁,可以满足我们的业务需求。

源码分析

加锁源码

  • tryLock():==带过期时间参数的tryLock()和lock()方法逻辑和以下类似,多了重试和订阅/取消订阅channel的操作==

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public boolean tryLock() {
    return get(tryLockAsync());
    }

    public RFuture<Boolean> tryLockAsync() {
    return tryLockAsync(Thread.currentThread().getId());
    }

    public RFuture<Boolean> tryLockAsync(long threadId) {
    return tryAcquireOnceAsync(-1, -1, null, threadId);
    }
  • tryAcquireOnceAsync()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Boolean> ttlRemainingFuture;

    // 如果自己设置了锁租借时间
    if (leaseTime != -1) {
    ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    } else {
    // 没有设置锁租借时间,默认为30s
    ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
    TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    }

    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
    if (e != null) {
    return;
    }

    // lock acquired
    if (ttlRemaining) {
    // 设置自定义锁租借时间
    if (leaseTime != -1) {
    internalLockLeaseTime = unit.toMillis(leaseTime);
    } else {
    // 未设置,开启锁续期定时任务(看门狗)
    scheduleExpirationRenewal(threadId);
    }
    }
    });
    return ttlRemainingFuture;
    }
  • tryLockInnerAsync():==加锁核心逻辑==

    解释一下lua脚本中的参数

    • KEYS[1] prize:1001 锁对应的key,即 Collections.singletonList(getRawName())
    • ARGV[1] 30000 锁过期时间,未设置则默认为30000
    • ARGV[2] d5f6-xxxxx:107 锁的field,即 getLockName(threadId),格式为 uuid:threadId
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    // 加锁逻辑:
    // 1.如果redis中不存在名为 prize:1001 的key,创建key为 prize:1001 的hash,并将field为 d5f646f3-c3ea-47d7-89fb-199d561f4084:107 的value值加一,最后设置过期时间,返回nil,表示加锁成功
    // 2.如果存在并且该key存在指定的field ,将field对应的value值加一并设置过期时间,返回nil,表示加锁成功
    // 3.如果key存在但不存在指定的field,返回锁的剩余过期时间,表示加锁失败

    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
    "if (redis.call('exists', KEYS[1]) == 0) then " +
    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
    "return nil; " +
    "end; " +
    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
    "return nil; " +
    "end; " +
    "return redis.call('pttl', KEYS[1]);",
    Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }

续约源码

  • scheduleExpirationRenewal()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    // 需要设置过期续约的锁的集合
    private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();

    protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    // 如果当前锁第一次设置过期续约,向EXPIRATION_RENEWAL_MAP中添加一个续约信息;如果不是第一次设置,返回已有的续约信息
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    // 如果不是第一次设置过期续约,将当前线程id添加到原续约信息的线程id集合中
    if (oldEntry != null) {
    oldEntry.addThreadId(threadId);
    } else {
    // 如果第一次设置,将当前线程id添加到刚创建的续约信息的线程id集合中
    entry.addThreadId(threadId);
    // 开启锁过期续约定时任务,即看门狗任务
    try {
    renewExpiration();
    } finally {
    // 如果检测到线程中断,取消续期
    if (Thread.currentThread().isInterrupted()) {
    cancelExpirationRenewal(threadId);
    }
    }
    }
    }
  • renewExpiration():==看门狗核心逻辑==

    Timeout是netty工具包下的一个关于定时任务的接口,它可以设置定时任务和定时信息

    TimerTask同上,它是定时任务的抽象接口

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    private void renewExpiration() {
    // 获取锁续约信息,不存在则返回
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
    return;
    }

    // 创建定时任务,周期为 internalLockLeaseTime/3 ,未设置锁租借时间,默认为 10s
    // Config.java: private long lockWatchdogTimeout = 30 * 1000;
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
    @Override
    public void run(Timeout timeout) throws Exception {
    // 获取锁续约信息(线程id+过期时间)
    ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ent == null) {
    return;
    }

    // 获取第一个添加的线程id
    Long threadId = ent.getFirstThreadId();
    if (threadId == null) {
    return;
    }

    // 执行锁续期lua脚本
    RFuture<Boolean> future = renewExpirationAsync(threadId);
    future.onComplete((res, e) -> {
    // 续期失败,删除续约信息
    if (e != null) {
    log.error("Can't update lock " + getRawName() + " expiration", e);
    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
    return;
    }
    // 续期成功后自己调用自己,即重复创建并执行看门狗任务
    if (res) {
    // reschedule itself
    renewExpiration();
    } else {
    // 续期失败后取消续期
    cancelExpirationRenewal(null);
    }
    });
    }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    ee.setTimeout(task);
    }
  • renewExpirationAsync():==锁续期核心逻辑==

    lua脚本参数参考这里

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    // 续期逻辑:
    // 1.判断锁中指定的field是否存在,如果存在,使用pexpire指令重新设置过期时间(单位毫秒),返回1,表示续期成功
    // 2.如果指定的field不存在,返回0,表示续期失败

    protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
    "return 1; " +
    "end; " +
    "return 0;",
    Collections.singletonList(getRawName()),
    internalLockLeaseTime, getLockName(threadId));
    }
  • cancelExpirationRenewal():取消续约

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    protected void cancelExpirationRenewal(Long threadId) {
    // 获取续约信息
    ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (task == null) {
    return;
    }

    // 若线程id存在,则从续约信息的线程id集合中移除
    if (threadId != null) {
    task.removeThreadId(threadId);
    }

    // 如果线程id不存在或者续约信息中无线程id,取消定时任务和其关联,从过期续约集合中移除该续约信息
    if (threadId == null || task.hasNoThreads()) {
    Timeout timeout = task.getTimeout();
    if (timeout != null) {
    timeout.cancel();
    }
    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
    }
    }

解锁源码

  • unlock()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public void unlock() {
    try {
    get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
    if (e.getCause() instanceof IllegalMonitorStateException) {
    throw (IllegalMonitorStateException) e.getCause();
    } else {
    throw e;
    }
    }
    }
  • unlockAsync()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> result = new RedissonPromise<>();
    // 实际解锁操作
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    future.onComplete((opStatus, e) -> {
    // 解锁完成后取消续约,这部分逻辑参考之前的解锁源码部分
    cancelExpirationRenewal(threadId);

    if (e != null) {
    result.tryFailure(e);
    return;
    }

    if (opStatus == null) {
    IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
    + id + " thread-id: " + threadId);
    result.tryFailure(cause);
    return;
    }

    result.trySuccess(null);
    });

    return result;
    }
  • unlockInnerAsync():==解锁核心逻辑==

    解释一下lua脚本中的参数

    • KEYS[1] prize:1001 锁对应的key,即 getRawName()
    • KEYS[2] redisson_lock__channel:{prize:1001} 锁的channel
    • ARGV[1] 0 锁的channel发布的信息
    • ARGV[2] 30000 锁过期时间
    • ARGV[3] d5f6-xxxxx:107 锁的field,即 getLockName(threadId),格式为 uuid:threadId
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    // 解锁逻辑:
    // 1.如果redis中不存在指定的key和field,返回nil,表示解锁成功(锁已经不存在了)
    // 2.如果存在,将field对应的value值减一,如果减一后的value值大于0,使用pexpire指令重新设置过期时间(单位毫秒),即重入次数减一,返回0
    // 3.如果减一后的value值小于或等于0,删除指定的key,发布解锁信息到指定的channel,订阅了该channel并设置了监听事件的客户端会执行回调和释放信号量,唤醒等待的线程重新获取锁(这部分内容下次我会详细写一篇文章进行分析),返回1

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
    "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
    "return nil;" +
    "end; " +
    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
    "if (counter > 0) then " +
    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
    "return 0; " +
    "else " +
    "redis.call('del', KEYS[1]); " +
    "redis.call('publish', KEYS[2], ARGV[1]); " +
    "return 1; " +
    "end; " +
    "return nil;",
    Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }

总结

​ 从零开始使用Redis作为分布式锁会遇到各种各样的问题,通过一步步的分析,我们慢慢摸索到了分布式锁的雏形,而Redlock算法就是为了解决这些问题而提出的。但是它仍然不是完美的,比如过渡依赖时钟、网络、设计过重等,具体可以网上搜索Redis作者Antirez和分布式系统专家Martin的论战。在实际使用时,应该根据我们的业务场景和需求来决定采用何种解决方案。