在使用 Redis 作为缓存或存储系统时,并发访问可能会导致数据竞争和不一致性问题。为了确保数据的一致性和操作的原子性,我们需要一种机制来管理对 Redis 的并发访问。本文将介绍如何在 Node.js 中实现 Redis 分布式锁,并分享我的解决方案和实现代码。
背景
在我们UI自动化测试的项目中,后端使用Nodejs,并且使用 Redis 作为缓存层。然而,当多个进程同时访问 Redis 时,会遇到并发问题,导致数据不一致。当时(2021年8月)网上关于如何在 Node.js 中处理 Redis 并发问题的资料并不多,因此我花了两天时间研究并实现了一个基于 Redis 的分布式锁机制。
实现方案
我们使用 Redis 的 SET
命令和 Lua 脚本来实现分布式锁。具体实现包括以下几个部分:
初始化 RedisLock:设置锁的默认过期时间和超时时间。
上锁:尝试获取锁,如果失败则重试,直到超时。
释放锁:检查锁是否属于当前持有者,如果是则释放。
代码实现
class RedisLock { /** * 初始化 RedisLock * @param {*} client Redis 客户端实例 * @param {*} options 配置选项 */ constructor(client, options = {}) { if (!client) { throw new Error('client 不存在'); } if (client.status !== 'connecting') { throw new Error('client 未正常链接'); } this.lockLeaseTime = options.lockLeaseTime || 2; // 默认锁过期时间 2 秒 this.lockTimeout = options.lockTimeout || 5; // 默认锁超时时间 5 秒 this.expiryMode = options.expiryMode || 'EX'; this.setMode = options.setMode || 'NX'; this.client = client; } /** * 上锁 * @param {*} key 锁的键 * @param {*} val 锁的值 * @param {*} expire 锁的过期时间 */ async lock(key, val, expire) { const start = Date.now(); const self = this; return (async function intranetLock() { try { const result = await self.client.set(key, val, self.expiryMode, expire || self.lockLeaseTime, self.setMode); // 上锁成功 if (result === 'OK') { return true; } // 锁超时 if (Math.floor((Date.now() - start) / 1000) > self.lockTimeout) { global.nts && global.nts({ title: `上锁重试超时结束`, source: 'server/n-slave/redis.js', flag: Math.random(), detail: { key, val } }); return false; } // 循环等待重试 await sleep(); return intranetLock(); } catch (err) { global.nts && global.nts({ title: `上锁重试超时结束进入catch函数`, source: 'server/n-slave/redis.js', flag: Math.random(), detail: { err } }); // 重试机制 if (Math.floor((Date.now() - start) / 1000) <= self.lockTimeout) { await sleep(); return intranetLock(); } else { throw new Error(err); } } })(); } /** * 释放锁 * @param {*} key 锁的键 * @param {*} val 锁的值 */ async unLock(key, val) { const self = this; const script = ` if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end `; try { const result = await self.client.eval(script, 1, key, val); if (result === 1) { return true; } return false; } catch (err) { global.nts && global.nts({ title: `释放锁进入catch函数`, source: 'server/n-slave/redis.js', flag: Math.random(), detail: { err } }); // 备用方案:再次尝试解锁 try { const result = await self.client.eval(script, 1, key, val); return result === 1; } catch (retryErr) { throw new Error(retryErr); } } } } // 初始化 Redis 客户端 const ioRedis = new redis(config.redisConfig); const redisLock = new RedisLock(ioRedis); /** * 设置原生任务函数 * @param {*} key 锁的键 * @param {*} cb 回调函数 * @param {*} i 回调函数参数 */ async function setNativeTasksFn(key, cb, i) { try { const id = uuid.v1(); await redisLock.lock(key, id, 200); await cb(i); const unLock = await redisLock.unLock(key, id); } catch (err) { global.nts && global.nts({ title: `上锁失败`, source: 'server/n-slave/redis.js', flag: Math.random(), detail: { key, err } }); } } // sleep 函数实现,用于等待重试 function sleep(ms = 100) { return new Promise(resolve => setTimeout(resolve, ms)); }
代码解析
初始化 RedisLock:
构造函数中初始化 Redis 客户端,并设置锁的默认过期时间和超时时间。
确保客户端正常连接,否则抛出错误。
上锁:
lock
方法尝试获取锁,使用 Redis 的SET
命令,设置过期时间和互斥模式。如果锁获取失败,则循环等待重试,直到超时。
使用自调用函数
intranetLock
实现锁重试机制。在
catch
语句中增加重试机制,确保在锁超时之前多次尝试获取锁。释放锁:
unLock
方法使用 Lua 脚本检查并删除锁,确保只有持有锁的客户端才能释放锁。使用 Redis 的
EVAL
命令执行 Lua 脚本,确保原子操作。在
catch
语句中增加备用方案,再次尝试解锁,确保系统的稳定性和可靠性。设置任务函数:
setNativeTasksFn
方法封装了锁的获取和释放逻辑,在执行任务前后进行锁操作,确保任务的原子性。
小知识点讲解
获取分布式锁 Redis SET 命令选项解析:expiryMode 和 setMode
await self.client.set(key, val, self.expiryMode, expire || self.lockLeaseTime, self.setMode);
expiryMode (过期模式)
EX: 单位:秒。适用于会话数据、一次性验证码、短期授权令牌等。
PX:单位:毫秒。适用于实时性要求高的应用场景。
setMode (设置模式)
NX:只有当键不存在时才会设置键。适用于分布式锁、唯一性约束。
XX:只有当键已经存在时才会设置键(更新该键的值)。适用于只更新已存在的记录。
组合应用场景
EX + NX:实现带有超时机制的分布式锁,确保锁在一定时间后自动释放。
PX + NX:实现需要精确控制过期时间的分布式锁或临时缓存。
EX + XX:延长已存在键的过期时间,确保数据在需要时得到更新。
PX + XX:精确控制已存在键的过期时间,用于需要精确更新过期时间的场景。
注意事项
并发访问高峰期,频繁的锁操作可能导致 Redis 性能下降。
优化建议:优化锁的重试机制,减少不必要的重试操作,结合业务场景调整锁的超时时间和重试策略。
锁的持有时间设定不当可能导致锁过期过快或持有时间过长。
优化建议:根据具体业务场景合理设定锁的持有时间,并在必要时动态调整。
当前错误处理较为简单,只是记录日志和抛出错误。
优化建议:在错误处理过程中,增加重试机制或备用方案,确保系统的稳定性和可靠性。
结论
通过上述方案,我们在 Node.js 中实现了 Redis 分布式锁,有效解决了并发访问 Redis 时的数据一致性问题。尽管在实现过程中遇到了一些挑战,但通过合理的设计和优化,我们成功构建了一个稳定、高效的锁机制。