切换语言为:繁体

解决 Node.js 中 Redis 并发问题:使用 RedisLock 实现分布式锁

  • 爱糖宝
  • 2024-07-26
  • 2115
  • 0
  • 0

在使用 Redis 作为缓存或存储系统时,并发访问可能会导致数据竞争和不一致性问题。为了确保数据的一致性和操作的原子性,我们需要一种机制来管理对 Redis 的并发访问。本文将介绍如何在 Node.js 中实现 Redis 分布式锁,并分享我的解决方案和实现代码。

背景

在我们UI自动化测试的项目中,后端使用Nodejs,并且使用 Redis 作为缓存层。然而,当多个进程同时访问 Redis 时,会遇到并发问题,导致数据不一致。当时(2021年8月)网上关于如何在 Node.js 中处理 Redis 并发问题的资料并不多,因此我花了两天时间研究并实现了一个基于 Redis 的分布式锁机制。

实现方案

我们使用 Redis 的 SET 命令和 Lua 脚本来实现分布式锁。具体实现包括以下几个部分:

  1. 初始化 RedisLock:设置锁的默认过期时间和超时时间。

  2. 上锁:尝试获取锁,如果失败则重试,直到超时。

  3. 释放锁:检查锁是否属于当前持有者,如果是则释放。

代码实现

class RedisLock {
  /**
   * 初始化 RedisLock
   * @param {*} client Redis 客户端实例
   * @param {*} options 配置选项
   */
  constructor(client, options = {}) {
    if (!client) {
      throw new Error('client 不存在');
    }

    if (client.status !== 'connecting') {
      throw new Error('client 未正常链接');
    }

    this.lockLeaseTime = options.lockLeaseTime || 2; // 默认锁过期时间 2 秒
    this.lockTimeout = options.lockTimeout || 5; // 默认锁超时时间 5 秒
    this.expiryMode = options.expiryMode || 'EX';
    this.setMode = options.setMode || 'NX';
    this.client = client;
  }

  /**
   * 上锁
   * @param {*} key 锁的键
   * @param {*} val 锁的值
   * @param {*} expire 锁的过期时间
   */
  async lock(key, val, expire) {
    const start = Date.now();
    const self = this;

    return (async function intranetLock() {
      try {
        const result = await self.client.set(key, val, self.expiryMode, expire || self.lockLeaseTime, self.setMode);

        // 上锁成功
        if (result === 'OK') {
          return true;
        }

        // 锁超时
        if (Math.floor((Date.now() - start) / 1000) > self.lockTimeout) {
          global.nts && global.nts({
            title: `上锁重试超时结束`,
            source: 'server/n-slave/redis.js',
            flag: Math.random(),
            detail: { key, val }
          });
          return false;
        }

        // 循环等待重试
        await sleep();
        return intranetLock();
      } catch (err) {
        global.nts && global.nts({
          title: `上锁重试超时结束进入catch函数`,
          source: 'server/n-slave/redis.js',
          flag: Math.random(),
          detail: { err }
        });

        // 重试机制
        if (Math.floor((Date.now() - start) / 1000) <= self.lockTimeout) {
          await sleep();
          return intranetLock();
        } else {
          throw new Error(err);
        }
      }
    })();
  }

  /**
   * 释放锁
   * @param {*} key 锁的键
   * @param {*} val 锁的值
   */
  async unLock(key, val) {
    const self = this;
    const script = `
      if redis.call('get',KEYS[1]) == ARGV[1] then
        return redis.call('del',KEYS[1])
      else
        return 0
      end
    `;

    try {
      const result = await self.client.eval(script, 1, key, val);

      if (result === 1) {
        return true;
      }

      return false;
    } catch (err) {
      global.nts && global.nts({
        title: `释放锁进入catch函数`,
        source: 'server/n-slave/redis.js',
        flag: Math.random(),
        detail: { err }
      });

      // 备用方案:再次尝试解锁
      try {
        const result = await self.client.eval(script, 1, key, val);
        return result === 1;
      } catch (retryErr) {
        throw new Error(retryErr);
      }
    }
  }
}

// 初始化 Redis 客户端
const ioRedis = new redis(config.redisConfig);
const redisLock = new RedisLock(ioRedis);

/**
 * 设置原生任务函数
 * @param {*} key 锁的键
 * @param {*} cb 回调函数
 * @param {*} i 回调函数参数
 */
async function setNativeTasksFn(key, cb, i) {
  try {
    const id = uuid.v1();
    await redisLock.lock(key, id, 200);
    await cb(i);
    const unLock = await redisLock.unLock(key, id);
  } catch (err) {
    global.nts && global.nts({
      title: `上锁失败`,
      source: 'server/n-slave/redis.js',
      flag: Math.random(),
      detail: { key, err }
    });
  }
}

// sleep 函数实现,用于等待重试
function sleep(ms = 100) {
  return new Promise(resolve => setTimeout(resolve, ms));
}

代码解析

  • 初始化 RedisLock

    • 构造函数中初始化 Redis 客户端,并设置锁的默认过期时间和超时时间。

    • 确保客户端正常连接,否则抛出错误。

  • 上锁

    • lock 方法尝试获取锁,使用 Redis 的 SET 命令,设置过期时间和互斥模式。

    • 如果锁获取失败,则循环等待重试,直到超时。

    • 使用自调用函数 intranetLock 实现锁重试机制。

    • catch 语句中增加重试机制,确保在锁超时之前多次尝试获取锁。

  • 释放锁

    • unLock 方法使用 Lua 脚本检查并删除锁,确保只有持有锁的客户端才能释放锁。

    • 使用 Redis 的 EVAL 命令执行 Lua 脚本,确保原子操作。

    • catch 语句中增加备用方案,再次尝试解锁,确保系统的稳定性和可靠性。

  • 设置任务函数

    • setNativeTasksFn 方法封装了锁的获取和释放逻辑,在执行任务前后进行锁操作,确保任务的原子性。

小知识点讲解

获取分布式锁 Redis SET 命令选项解析:expiryMode 和 setMode

await self.client.set(key, val, self.expiryMode, expire || self.lockLeaseTime, self.setMode);

  • expiryMode (过期模式)

    • EX: 单位:秒。适用于会话数据、一次性验证码、短期授权令牌等。

    • PX:单位:毫秒。适用于实时性要求高的应用场景。

  • setMode (设置模式)

    • NX:只有当键不存在时才会设置键。适用于分布式锁、唯一性约束。

    • XX:只有当键已经存在时才会设置键(更新该键的值)。适用于只更新已存在的记录。

  • 组合应用场景

    • EX + NX:实现带有超时机制的分布式锁,确保锁在一定时间后自动释放。

    • PX + NX:实现需要精确控制过期时间的分布式锁或临时缓存。

    • EX + XX:延长已存在键的过期时间,确保数据在需要时得到更新。

    • PX + XX:精确控制已存在键的过期时间,用于需要精确更新过期时间的场景。

注意事项

  • 并发访问高峰期,频繁的锁操作可能导致 Redis 性能下降。

    优化建议:优化锁的重试机制,减少不必要的重试操作,结合业务场景调整锁的超时时间和重试策略。

  • 锁的持有时间设定不当可能导致锁过期过快或持有时间过长。

    优化建议:根据具体业务场景合理设定锁的持有时间,并在必要时动态调整。

  • 当前错误处理较为简单,只是记录日志和抛出错误。

    优化建议:在错误处理过程中,增加重试机制或备用方案,确保系统的稳定性和可靠性。

结论

通过上述方案,我们在 Node.js 中实现了 Redis 分布式锁,有效解决了并发访问 Redis 时的数据一致性问题。尽管在实现过程中遇到了一些挑战,但通过合理的设计和优化,我们成功构建了一个稳定、高效的锁机制。


0条评论

您的电子邮件等信息不会被公开,以下所有项均必填

OK! You can skip this field.