Redis 分布式锁
分布式锁用于多节点环境下的并发控制,保证同一时刻只有一个节点能执行特定操作。
锁的基本需求
分布式锁特性
Bash
- 互斥性:同一时刻只有一个持有者
- 高可用:锁服务稳定可靠
- 防死锁:锁必须能释放(超时自动释放)
- 防误删:只能删除自己持有的锁
锁的实现
基本实现(有问题)
Bash
# 加锁
SET lock:resource "1" NX EX 30
# 业务操作
# 解锁
DEL lock:resource
问题:客户端A的锁可能被客户端B删除(A超时后B获得锁,A执行完删除B的锁)。
正确实现:带标识的锁
Python
# 加锁:设置唯一标识
SET lock:resource "uuid:A" NX EX 30
# 业务操作
# 解锁:检查标识后删除(Lua保证原子)
EVAL "if redis.call('GET', KEYS[1]) == ARGV[1] then return redis.call('DEL', KEYS[1]) else return 0 end" 1 lock:resource uuid:A
Lua脚本保证检查和删除的原子性,防止误删其他客户端的锁。
锁续期(看门狗)
问题场景
Python
- 锁设置30秒过期
- 业务执行超过30秒
- 锁自动释放,其他客户端获得锁
- 导致并发问题
解决方案:自动续期
Python
# 看门狗模式
def acquire_lock_with_watchdog(key, uuid, lease_time=30):
# 加锁
if redis.set(key, uuid, nx=True, ex=lease_time):
# 启动续期线程
start_watchdog(key, uuid, lease_time)
return True
return False
def start_watchdog(key, uuid, lease_time):
# 每10秒检查并续期(lease_time的1/3)
while True:
sleep(lease_time / 3)
# 检查是否还持有锁
if redis.get(key) == uuid:
# 续期
redis.expire(key, lease_time)
else:
# 锁已被释放,停止续期
break
RedLock续期
Python
# 使用后台线程定时续期
# 业务完成后停止续期并释放锁
锁释放
正确释放流程
Python
def release_lock(key, uuid):
# Lua脚本保证原子性
script = "
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0
end
"
return redis.eval(script, 1, key, uuid)
释放时机
Python
- 业务执行完成后立即释放
- 异常时也要释放(try-finally)
- 看门狗停止后再释放
锁超时设置
超时时间选择
Python
原则:
- 大于业务执行时间
- 短于业务容忍等待时间
建议:
- 根据业务执行时间估算
- 预留安全余量
- 结合看门狗续期
获取锁失败处理
Python
def try_lock(key, uuid, timeout=5, lease_time=30):
end_time = time.time() + timeout
while time.time() < end_time:
if redis.set(key, uuid, nx=True, ex=lease_time):
return True
sleep(0.1)
return False # 超时未获取到锁
集群锁(RedLock)
单节点问题
Python
- 主节点设置锁成功
- 锁未同步到从节点
- 主节点故障,从节点升级
- 新主节点无锁,其他客户端获得锁
- 导致锁失效
RedLock算法
text
1. 获取当前时间戳
2. 按顺序向多个Redis节点请求加锁
3. 计算获取锁消耗的时间
4. 若在(N/2+1)个节点成功且耗时<锁有效期,则成功
5. 失败则向所有节点释放锁
RedLock实现
text
def redlock_acquire(resource, uuid, ttl=30, retry=3):
quorum = len(redis_nodes) // 2 + 1
for _ in range(retry):
acquired = 0
start_time = time.time()
for node in redis_nodes:
try:
if node.set(f"lock:{resource}", uuid, nx=True, ex=ttl):
acquired += 1
except:
pass
elapsed = time.time() - start_time
# 判断是否成功
if acquired >= quorum and elapsed < ttl:
return True
# 失败,释放所有锁
for node in redis_nodes:
release_lock_on_node(node, resource, uuid)
sleep(0.1)
return False
锁的应用场景
1. 库存扣减
text
def deduct_stock(product_id, quantity):
lock_key = f"lock:stock:{product_id}"
uuid = generate_uuid()
if acquire_lock(lock_key, uuid, lease_time=10):
try:
# 检查库存
stock = redis.get(f"stock:{product_id}")
if stock >= quantity:
# 扣减
redis.decrby(f"stock:{product_id}", quantity)
return True
return False
finally:
release_lock(lock_key, uuid)
return False
2. 定时任务调度
text
def scheduled_task():
lock_key = "lock:scheduled:task"
uuid = generate_uuid()
if acquire_lock(lock_key, uuid, lease_time=60):
try:
# 执行任务
do_task()
finally:
release_lock(lock_key, uuid)
3. 分布式事务
text
def transfer(from_user, to_user, amount):
# 获取两个用户的锁(按固定顺序避免死锁)
lock1 = f"lock:user:{min(from_user, to_user)}"
lock2 = f"lock:user:{max(from_user, to_user)}"
uuid1 = acquire_lock(lock1, uuid(), 30)
uuid2 = acquire_lock(lock2, uuid(), 30)
if uuid1 and uuid2:
try:
# 执行转账
do_transfer(from_user, to_user, amount)
finally:
release_lock(lock1, uuid1)
release_lock(lock2, uuid2)
锁的问题与优化
死锁预防
text
- 设置超时时间,自动释放
- 按固定顺序获取多个锁
- 使用超时机制获取锁
性能优化
text
# 短时间等待获取锁
def try_lock_with_timeout(key, uuid, timeout=3):
for _ in range(timeout * 10):
if redis.set(key, uuid, nx=True, ex=30):
return True
sleep(0.1)
return False
注意事项
锁的正确使用
text
- 加锁时设置唯一标识
- 释放时检查标识(Lua脚本)
- 设置合理的超时时间
- 异常情况必须释放锁
- 集群使用RedLock算法
不适用场景
text
- 极高并发场景(Redis单节点瓶颈)
- 强一致性要求(考虑数据库锁)
- 长时间持有锁(看门狗可能失效)
要点总结
- SET key value NX EX实现加锁,NX不存在才设置
- 锁必须设置唯一标识(uuid),防止误删其他客户端的锁
- Lua脚本保证检查标识和删除锁的原子性
- 锁续期(看门狗):定时检查并续期,防止业务超时释放
- RedLock算法:多数节点成功才算成功,解决集群锁问题
- 获取多个锁时按固定顺序,避免死锁
- 设置超时时间:大于业务执行时间,短于容忍等待时间
- 异常必须释放锁(try-finally)
- 极高并发和强一致性场景不适合Redis分布式锁
📝 发现内容有误?点击此处直接编辑