切換語言為:簡體
解讀 Redisson 分散式鎖的原始碼

解讀 Redisson 分散式鎖的原始碼

  • 爱糖宝
  • 2024-07-03
  • 2114
  • 0
  • 0

之前秒殺專案中就用到了這個 Redisson 分散式鎖 👇,這篇就一起來看看原始碼吧!

解讀 Redisson 分散式鎖的原始碼

tryLock 加鎖 流程

解讀 Redisson 分散式鎖的原始碼

// RedissonLock.java
@Override
public boolean tryLock() {
    return get(tryLockAsync());
}

@Override
public RFuture<Boolean> tryLockAsync() {
    return tryLockAsync(Thread.currentThread().getId());
}

@Override
public RFuture<Boolean> tryLockAsync(long threadId) {
    return tryAcquireOnceAsync(-1, -1, null, threadId);
}

private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Boolean> acquiredFuture;
    // 續租時間:鎖的過期時間(沒有設定的話就用預設的 internalLockLeaseTime 看門狗時間)
    if (leaseTime > 0) {
        acquiredFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    } else {
        acquiredFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                                           TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    }

    CompletionStage<Boolean> f = acquiredFuture.thenApply(acquired -> {
        // lock acquired
        if (acquired) {
            if (leaseTime > 0) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                // 沒配置過期時間就執行這裏
                scheduleExpirationRenewal(threadId);
            }
        }
        return acquired;
    });
    return new CompletableFutureWrapper<>(f);
}

程式碼很長,主要看 tryLockInnerAsyncscheduleExpirationRenewal 方法。

前置知識

解讀 Redisson 分散式鎖的原始碼

解讀 Redisson 分散式鎖的原始碼

// EVAL 命令,用於在 Redis 伺服器端執行 Lua 指令碼。
RedisStrictCommand<Boolean> EVAL_NULL_BOOLEAN = new RedisStrictCommand<Boolean>("EVAL", new BooleanNullReplayConvertor());

// BooleanNullReplayConvertor 判斷是不是 NULL。
public class BooleanNullReplayConvertor implements Convertor<Boolean> {
    @Override
    public Boolean convert(Object obj) {        return obj == null;     }
}

tryLockInnerAsync

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    // getRawName 即 鎖的名稱
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                          // 鎖不存在,新增 hash 資料,可重入次數加一,毫秒級別過期時間,返回 null
                          "if (redis.call('exists', KEYS[1]) == 0) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                          "end; " +
                          // 鎖存在,可重入次數加一,毫秒級別過期時間,返回 null
                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                          "end; " +
                          // 鎖被別人佔有, 返回毫秒級別過期時間
                          "return redis.call('pttl', KEYS[1]);",
                          Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

ARGV[1] 過期時間

ARGV[2] 即 getLockName(threadId) ,這裏是 redisson 客戶端id + 這個執行緒 ID , 如下 👇

解讀 Redisson 分散式鎖的原始碼

scheduleExpirationRenewal (看門狗機制)

上面加鎖完,就來到這段程式碼。

沒有設定過期時間的話,預設給你設定 30 s 過期,並每隔 10s 自動續期,確保鎖不會在使用過程中過期。

同時,防止客戶端宕機,留下死鎖。

解讀 Redisson 分散式鎖的原始碼

// RedissonBaseLock.java

protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        try {
            // 看這裏 
            renewExpiration();
        } finally {
            if (Thread.currentThread().isInterrupted()) {
                cancelExpirationRenewal(threadId);
            }
        }
    }
}

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    // 延時任務,10s 續期一次。
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
		   
            // 續期操作
            CompletionStage<Boolean> future = renewExpirationAsync(threadId);
            future.whenComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }

                if (res) {
                    // reschedule itself
                    renewExpiration();
                } else {
                    cancelExpirationRenewal(null);
                }
            });
        }
        // 三分之一時間,30s /3= 10s
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    ee.setTimeout(task);
}

// 續期指令碼
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return 1; " +
                          "end; " +
                          "return 0;",
                          Collections.singletonList(getRawName()),
                          internalLockLeaseTime, getLockName(threadId));
}

get

上面的加鎖操作,最終返回的是 return new CompletableFutureWrapper<>(f);  這個非同步操作。

還記得上面的 BooleanNullReplayConvertor 嗎,當 eval 執行加鎖指令碼時,成功會返回 null,並在這裏轉成 True 。

@Override
public <V> V get(RFuture<V> future) {
    if (Thread.currentThread().getName().startsWith("redisson-netty")) {
        throw new IllegalStateException("Sync methods can't be invoked from async/rx/reactive listeners");
    }

    try {
        return future.toCompletableFuture().get();
    } catch (InterruptedException e) {
        future.cancel(true);
        Thread.currentThread().interrupt();
        throw new RedisException(e);
    } catch (ExecutionException e) {
        throw convertException(e);
    }
}

那麼,加鎖的部分到這裏就結束, 解鎖 的就簡單過一下 👇

unlock 解鎖

解讀 Redisson 分散式鎖的原始碼

// RedissonLock.java
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                          // 不存在,直接返回 null
                          "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                          "return nil;" +
                          "end; " +
                          // 減一
                          "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                          // 大於0,設定毫秒級過期時間,並返回0
                          "if (counter > 0) then " +
                          "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                          "return 0; " +
                          // 刪除鎖,並向指定channel釋出 0 這個訊息,並返回1
                          "else " +
                          "redis.call('del', KEYS[1]); " +
                          "redis.call('publish', KEYS[2], ARGV[1]); " +
                          "return 1; " +
                          "end; " +
                          // 返回 null
                          "return nil;",
                          Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

KEYS[1] 為鎖名,KEYS[2] channel 名 👇

解讀 Redisson 分散式鎖的原始碼

ARGV[1] 為0 👇, ARGV[2] 過期時間,ARGV[3] 為 redisson 客戶端id + 這個執行緒 ID

解讀 Redisson 分散式鎖的原始碼

解鎖後,取消續期任務。

解讀 Redisson 分散式鎖的原始碼

結尾

透過原始碼,我們瞭解到上文提到的 redisson 框架的幾個特點:自動續期可重入鎖lua指令碼

當然,它不止這些功能,小夥伴們可以在這裏查閱 👇

github.com/redisson/re…

解讀 Redisson 分散式鎖的原始碼

0則評論

您的電子郵件等資訊不會被公開,以下所有項目均必填

OK! You can skip this field.