多线程对分布式值进行自增+Redission的分布式锁源码解读

10 分钟阅读原创177 次浏览

背景

最近入职了一家新公司,有一道题需要使用redis-lock解决分布式并发问题,所以这篇文章主要研究一下redis-lock是如何实现的。

首先,题目以及解析如下:

java
// 分布式的Lock,写一个多线程,创建一个Map给Map添加一个key-value,三个线程同时对这个key的value进行递增,保证线程安全。
   @Test
    public void testRedisson() throws InterruptedException {
        RLock lock = Redisson.getLock("lock1");
        RMapCache map = Redisson.getCachedMap("count2");
        map.put("count2", 0);
        for (int i = 0; i  {
                for (int j = 0; j

- tryAcquire尝试获取锁,如果获取到返回true

- 获取不到锁说明锁被占用了,订阅解锁消息通知

- 收到解锁消息通知,再次尝试获取锁,如果获取不到重复步骤三,直到超过waitTime获取锁失败

- 不论是否获取锁成功,取消解锁消息订阅。

```java
// 在waitTime时间范围内尝试获取锁,如果获取到锁,则设置锁过期时间leaseTime
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
  long time = unit.toMillis(waitTime);
  long current = System.currentTimeMillis();
  long threadId = Thread.currentThread().getId();
  // 第一步:尝试获取锁
  Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
  //  ttl为空说明获取到了锁
  if (ttl == null) {
    return true;
  } else {
    // 判断尝试获取锁是否超过waitTime
    time -= System.currentTimeMillis() - current;
    if (time  subscribeFuture = this.subscribe(threadId);
​
      try {
        // 订阅锁释放消息,等待时间超过waitTime,获取锁失败
        subscribeFuture.get(time, TimeUnit.MILLISECONDS);
      } catch (TimeoutException var21) {
        if (!subscribeFuture.completeExceptionally(new RedisTimeoutException("Unable to acquire subscription lock after " + time + "ms. Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {
          // 如果订阅解锁Future在执行中,等任务执行完后取消订阅锁释放
          subscribeFuture.whenComplete((res, ex) -> {
            if (ex == null) {
              // 取消订阅解锁通知
              this.unsubscribe(res, threadId);
            }
​
          });
        }
​
        this.acquireFailed(waitTime, unit, threadId);
        return false;
      } catch (ExecutionException var22) {
        this.acquireFailed(waitTime, unit, threadId);
        return false;
      }
​
      boolean var16;
      try {
        // 判断尝试获取锁以及订阅解锁消息的时间是否超过waitTime
        time -= System.currentTimeMillis() - current;
        if (time = 0L && ttl  0L);
​
        this.acquireFailed(waitTime, unit, threadId);
        var16 = false;
      } finally {
        // 第四步:取消解锁订阅this.unsubscribe((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture), threadId);
      }
​
      return var16;
    }
  }
}

抢锁代码

下面是加锁的代码实现,包括抢锁以及看门狗的实现

java
private  RFuture tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture ttlRemainingFuture;
    if (leaseTime != -1) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        // 这里需要注意的是leaseTime==-1,会触发redisson看门狗机制,此处的方法使用的是lua脚本
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }
​
        // 获取锁成功
        if (ttlRemaining == null) {
            if (leaseTime != -1) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                // 锁自动续时(看门狗机制)触发条件leaseTime == -1;此处底层也是lua脚本,下面会详述
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

抢锁的lua脚本

在下列代码中的lua脚本操作是原子的,其实lua脚本本身不是原子的,但是redis在执行lua脚本的操作是原子的。

java
// 下列代码中变量含义
// KEYS[1] = "锁key"
// ARGV[1] = "锁过期时间"
// ARGV[2] = "当前连接的UUID + : + 线程id“ 对应后面的getLockName方法返回
​
​
 RFuture tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
            // 如果key一开始就不存在,则直接创建一个key
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                    // 使用 `HINCRB。Y` 将哈希表 `KEYS[1]` 中的字段 `ARGV[2]`(表示当前线程的锁计数器)加 1
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    // 设置该锁的过期时间为 `ARGV[1]`(即 `leaseTime` 转换为毫秒)。
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +

                    // 这里是重入锁的实现,同一个线程多次获取锁只需要在value加1即可,value相当于一个加锁计数器
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    //  `HEXISTS` 用于检查当前线程(由 `ARGV[2]` 标识)是否已经持有锁。
                    // 如果线程已经持有锁(即锁为重入),通过 `HINCRBY` 将计数器加 1,并重置锁的过期时间。
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    // 有其他线程持有锁,加锁失败,返回锁过期时间
                    "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
​

看门狗的实现

续时间的方法解读

java
protected void scheduleExpirationRenewal(long threadId) {
    // 保存当前加锁key有那些线程自动续时,取消自动续时后会清除此对象内部数据
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        try {
            // 更新锁过期时间
            renewExpiration();
        } finally {
            if (Thread.currentThread().isInterrupted()) {
                cancelExpirationRenewal(threadId);
            }
        }
    }
}
​

此处是更新时间操作:首先判断锁是否占用,如果占用,则使用定时任务对当前锁进行续时。

java
private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }

    // 定时任务(可以搜io.netty.util.HashedWheelTimer)
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }

            // 更新锁过期时间(lua脚本)
            RFuture future = renewExpirationAsync(threadId);
            future.whenComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }

                // 更新锁过期时间成功
                if (res) {
                    // 递归调用 如果10秒后依然没有解锁,继续更新锁过期时间
                    renewExpiration();
                } else {
                    cancelExpirationRenewal(null);
                }
            });
        }
        // internalLockLeaseTime在不设置lockWatchdogTimeout情况下默认30s,这里会延迟10s触发此任务
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    ee.setTimeout(task);
}
​

看门狗对应的更新过期时间的lua脚本

java
protected RFuture renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // 当前线程已持有锁,更新锁过期时间
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return 0;",
            Collections.singletonList(getRawName()),
            internalLockLeaseTime, getLockName(threadId));
}
​

unLock源码解读

java
public void unlock() {
        try {
            // 解锁
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException) e.getCause();
            } else {
                throw e;
            }
        }
    }
​

unlockAsync()方法内部会调用lua解锁脚本

Java
protected RFuture unlockInnerAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                    "else " +
                    "redis.call('del', KEYS[1]); " +
                    // 推送解锁通知
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return nil;",
            Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

加解锁的订阅发布类

在redissonLock中会有一个LockPubSub的实体类,这个就是消息发布订阅对应的实体,

可以看到此实体类,在发现消息时,会比较是否是释放锁的消息,如果是则将队列中的任务进行释放,通过回调的runnable来通知等待的线程

java
public class LockPubSub extends PublishSubscribe {
    public static final Long UNLOCK_MESSAGE = 0L;
    public static final Long READ_UNLOCK_MESSAGE = 1L;

    public LockPubSub(PublishSubscribeService service) {
        super(service);
    }

    protected RedissonLockEntry createEntry(CompletableFuture newPromise) {
        return new RedissonLockEntry(newPromise);
    }

    protected void onMessage(RedissonLockEntry value, Long message) {
        Runnable runnableToExecute;
        if (message.equals(UNLOCK_MESSAGE)) {
            runnableToExecute = (Runnable)value.getListeners().poll();
            if (runnableToExecute != null) {
                runnableToExecute.run();
            }

            value.getLatch().release();
        } else if (message.equals(READ_UNLOCK_MESSAGE)) {
            while(true) {
                runnableToExecute = (Runnable)value.getListeners().poll();
                if (runnableToExecute == null) {
                    value.getLatch().release(value.getLatch().getQueueLength());
                    break;
                }

                runnableToExecute.run();
            }
        }

    }
}

总结

加锁操作:

  • tryAcquire尝试获取锁,会通过lua脚本判断是否有锁占用,如果已经有锁占用则判断是否是当前客户端,如果是当前客户端,则说明是可重入的锁,当前客户端可以继续获得这把锁,如果获取到返回true。如果没抢到当前锁,则使用看门狗定时任务,每隔10s对持有锁加上过期时间。

  • 获取不到锁说明锁被占用了,订阅解锁消息通知

  • 收到解锁消息通知,再次尝试获取锁,如果获取不到重复步骤三,直到超过waitTime获取锁失败

  • 不论是否获取锁成功,取消解锁消息订阅。

解锁操作:

  • 发布解锁消息通知。

  • 监听器判断为解锁操作时,执行等待队列中的任务,放开对应的锁。

加解锁消息通知

  • 使用的是定义的LockPubSub进行通讯。

  • 加锁时订阅此通道,

  • 解锁时在此通道发布消息。

评论

暂无公开评论。