全部学科
NodeJS全栈
nodejs
Python全栈
python
小程序首页
📅 2026-05-12 10 分钟 ✍️ juanwangdev

Redis 分布式锁

分布式锁用于多节点环境下的并发控制,保证同一时刻只有一个节点能执行特定操作。

锁的基本需求

分布式锁特性

Bash
- 互斥性:同一时刻只有一个持有者
- 高可用:锁服务稳定可靠
- 防死锁:锁必须能释放(超时自动释放)
- 防误删:只能删除自己持有的锁

锁的实现

基本实现(有问题)

Bash
# 加锁
SET lock:resource "1" NX EX 30

# 业务操作

# 解锁
DEL lock:resource

问题:客户端A的锁可能被客户端B删除(A超时后B获得锁,A执行完删除B的锁)。

正确实现:带标识的锁

Python
# 加锁:设置唯一标识
SET lock:resource "uuid:A" NX EX 30

# 业务操作

# 解锁:检查标识后删除(Lua保证原子)
EVAL "if redis.call('GET', KEYS[1]) == ARGV[1] then return redis.call('DEL', KEYS[1]) else return 0 end" 1 lock:resource uuid:A

Lua脚本保证检查和删除的原子性,防止误删其他客户端的锁。

锁续期(看门狗)

问题场景

Python
- 锁设置30秒过期
- 业务执行超过30秒
- 锁自动释放,其他客户端获得锁
- 导致并发问题

解决方案:自动续期

Python
# 看门狗模式
def acquire_lock_with_watchdog(key, uuid, lease_time=30):
    # 加锁
    if redis.set(key, uuid, nx=True, ex=lease_time):
        # 启动续期线程
        start_watchdog(key, uuid, lease_time)
        return True
    return False

def start_watchdog(key, uuid, lease_time):
    # 每10秒检查并续期(lease_time的1/3)
    while True:
        sleep(lease_time / 3)

        # 检查是否还持有锁
        if redis.get(key) == uuid:
            # 续期
            redis.expire(key, lease_time)
        else:
            # 锁已被释放,停止续期
            break

RedLock续期

Python
# 使用后台线程定时续期
# 业务完成后停止续期并释放锁

锁释放

正确释放流程

Python
def release_lock(key, uuid):
    # Lua脚本保证原子性
    script = "
    if redis.call('GET', KEYS[1]) == ARGV[1] then
        return redis.call('DEL', KEYS[1])
    else
        return 0
    end
    "
    return redis.eval(script, 1, key, uuid)

释放时机

Python
- 业务执行完成后立即释放
- 异常时也要释放(try-finally)
- 看门狗停止后再释放

锁超时设置

超时时间选择

Python
原则:
- 大于业务执行时间
- 短于业务容忍等待时间

建议:
- 根据业务执行时间估算
- 预留安全余量
- 结合看门狗续期

获取锁失败处理

Python
def try_lock(key, uuid, timeout=5, lease_time=30):
    end_time = time.time() + timeout

    while time.time() < end_time:
        if redis.set(key, uuid, nx=True, ex=lease_time):
            return True
        sleep(0.1)

    return False  # 超时未获取到锁

集群锁(RedLock)

单节点问题

Python
- 主节点设置锁成功
- 锁未同步到从节点
- 主节点故障,从节点升级
- 新主节点无锁,其他客户端获得锁
- 导致锁失效

RedLock算法

text
1. 获取当前时间戳
2. 按顺序向多个Redis节点请求加锁
3. 计算获取锁消耗的时间
4. 若在(N/2+1)个节点成功且耗时<锁有效期,则成功
5. 失败则向所有节点释放锁

RedLock实现

text
def redlock_acquire(resource, uuid, ttl=30, retry=3):
    quorum = len(redis_nodes) // 2 + 1

    for _ in range(retry):
        acquired = 0
        start_time = time.time()

        for node in redis_nodes:
            try:
                if node.set(f"lock:{resource}", uuid, nx=True, ex=ttl):
                    acquired += 1
            except:
                pass

        elapsed = time.time() - start_time

        # 判断是否成功
        if acquired >= quorum and elapsed < ttl:
            return True

        # 失败,释放所有锁
        for node in redis_nodes:
            release_lock_on_node(node, resource, uuid)

        sleep(0.1)

    return False

锁的应用场景

1. 库存扣减

text
def deduct_stock(product_id, quantity):
    lock_key = f"lock:stock:{product_id}"
    uuid = generate_uuid()

    if acquire_lock(lock_key, uuid, lease_time=10):
        try:
            # 检查库存
            stock = redis.get(f"stock:{product_id}")
            if stock >= quantity:
                # 扣减
                redis.decrby(f"stock:{product_id}", quantity)
                return True
            return False
        finally:
            release_lock(lock_key, uuid)
    return False

2. 定时任务调度

text
def scheduled_task():
    lock_key = "lock:scheduled:task"
    uuid = generate_uuid()

    if acquire_lock(lock_key, uuid, lease_time=60):
        try:
            # 执行任务
            do_task()
        finally:
            release_lock(lock_key, uuid)

3. 分布式事务

text
def transfer(from_user, to_user, amount):
    # 获取两个用户的锁(按固定顺序避免死锁)
    lock1 = f"lock:user:{min(from_user, to_user)}"
    lock2 = f"lock:user:{max(from_user, to_user)}"

    uuid1 = acquire_lock(lock1, uuid(), 30)
    uuid2 = acquire_lock(lock2, uuid(), 30)

    if uuid1 and uuid2:
        try:
            # 执行转账
            do_transfer(from_user, to_user, amount)
        finally:
            release_lock(lock1, uuid1)
            release_lock(lock2, uuid2)

锁的问题与优化

死锁预防

text
- 设置超时时间,自动释放
- 按固定顺序获取多个锁
- 使用超时机制获取锁

性能优化

text
# 短时间等待获取锁
def try_lock_with_timeout(key, uuid, timeout=3):
    for _ in range(timeout * 10):
        if redis.set(key, uuid, nx=True, ex=30):
            return True
        sleep(0.1)
    return False

注意事项

锁的正确使用

text
- 加锁时设置唯一标识
- 释放时检查标识(Lua脚本)
- 设置合理的超时时间
- 异常情况必须释放锁
- 集群使用RedLock算法

不适用场景

text
- 极高并发场景(Redis单节点瓶颈)
- 强一致性要求(考虑数据库锁)
- 长时间持有锁(看门狗可能失效)

要点总结

  • SET key value NX EX实现加锁,NX不存在才设置
  • 锁必须设置唯一标识(uuid),防止误删其他客户端的锁
  • Lua脚本保证检查标识和删除锁的原子性
  • 锁续期(看门狗):定时检查并续期,防止业务超时释放
  • RedLock算法:多数节点成功才算成功,解决集群锁问题
  • 获取多个锁时按固定顺序,避免死锁
  • 设置超时时间:大于业务执行时间,短于容忍等待时间
  • 异常必须释放锁(try-finally)
  • 极高并发和强一致性场景不适合Redis分布式锁

📝 发现内容有误?点击此处直接编辑

← 上一篇 Redis 位图与布隆过滤器
下一篇 → Redis 延迟队列
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库