Redis集群客户端路由与Smart Client
Smart Client是Redis集群的高效客户端实现,通过本地路由缓存减少重定向开销,提升访问性能。
客户端路由原理
槽位计算
Bash
槽位 = CRC16(key) % 16384
示例:
CRC16("user:1000") % 16384 = 1234
CRC16("order:2000") % 16384 = 5678
哈希标签
Python
{tag}部分参与槽位计算
user:{1000}:profile → 只计算"1000"的槽位
order:{1000}:items → 与上面相同槽位
路由流程
Python
1. 客户端计算key的槽位
2. 查本地路由表找到目标节点
3. 发送请求到目标节点
4. 若槽位已迁移,收到重定向
5. 更新路由缓存
6. 重新发送请求
重定向机制
MOVED重定向
Python
场景:槽位已永久迁移到其他节点
响应:MOVED <slot> <ip>:<port>
处理:客户端更新路由缓存,后续请求直接发新节点
ASK重定向
Java
场景:槽位正在迁移过程中
响应:ASK <slot> <ip>:<port>
处理:临时重定向,不更新路由缓存
流程:
1. 收到ASK重定向
2. 发送ASKING命令
3. 发送实际请求
ASKING命令
Python
# ASKING标记临时访问权限
ASKING
GET mykey
MOVED vs ASK对比
| 特性 | MOVED | ASK |
|---|---|---|
| 场景 | 槽位永久迁移 | 迁移过程中 |
| 缓存 | 更新路由缓存 | 不更新缓存 |
| 持续性 | 永久 | 一次性 |
| ASKING | 不需要 | 需要 |
Smart Client实现
核心功能
Go
1. 路由缓存:本地维护槽位到节点的映射
2. 连接池:管理到各节点的连接
3. 重定向处理:自动处理MOVED/ASK
4. 故障转移:自动刷新路由,连接新节点
路由缓存结构
JavaScript
# 伪代码
slots_map = {
0: {"host": "192.168.1.101", "port": 7000, "node_id": "xxx"},
1: {"host": "192.168.1.101", "port": 7000, "node_id": "xxx"},
# ...
5461: {"host": "192.168.1.102", "port": 7000, "node_id": "yyy"},
# ...
}
连接池管理
Python
# 连接池示例
class RedisClusterPool:
def __init__(self, nodes):
self.pools = {}
for node in nodes:
self.pools[node['id']] = ConnectionPool(
host=node['host'],
port=node['port'],
max_connections=100
)
def get_connection(self, node_id):
return self.pools[node_id].get_connection()
请求处理流程
Python
def execute_command(key, command):
slot = crc16(key) % 16384
node = get_node_from_cache(slot)
try:
conn = get_connection(node)
return conn.execute(command)
except MovedError as e:
# MOVED重定向
update_routing_table(e.slot, e.node)
return execute_command(key, command)
except AskError as e:
# ASK重定向
conn = get_connection(e.node)
conn.execute("ASKING")
return conn.execute(command)
主流客户端实现
Jedis (Java)
Python
Set<HostAndPort> nodes = new HashSet<>();
nodes.add(new HostAndPort("192.168.1.101", 7000));
nodes.add(new HostAndPort("192.168.1.102", 7000));
JedisCluster cluster = new JedisCluster(nodes,
new JedisPoolConfig());
// 自动处理路由
cluster.set("user:1000", "Alice");
String value = cluster.get("user:1000");
redis-py (Python)
Bash
from rediscluster import RedisCluster
startup_nodes = [
{"host": "192.168.1.101", "port": 7000},
{"host": "192.168.1.102", "port": 7000}
]
rc = RedisCluster(startup_nodes=startup_nodes)
# 自动处理路由
rc.set("user:1000", "Alice")
value = rc.get("user:1000")
go-redis (Go)
Python
import "github.com/redis/go-redis/v9"
rdb := redis.NewClusterClient(&redis.ClusterOptions{
Addrs: []string{
"192.168.1.101:7000",
"192.168.1.102:7000",
},
})
// 自动处理路由
rdb.Set(ctx, "user:1000", "Alice", 0)
val := rdb.Get(ctx, "user:1000").Val()
ioredis (Node.js)
Java
const Redis = require('ioredis');
const cluster = new Redis.Cluster([
{ host: '192.168.1.101', port: 7000 },
{ host: '192.168.1.102', port: 7000 }
]);
// 自动处理路由
await cluster.set('user:1000', 'Alice');
const value = await cluster.get('user:1000');
路由缓存更新
初始化
Python
def initialize_routing():
# 连接任意节点获取路由表
conn = connect(startup_node)
slots = conn.execute("CLUSTER SLOTS")
for slot_range in slots:
start, end = slot_range[0], slot_range[1]
node = slot_range[2] # (ip, port, node_id)
for slot in range(start, end + 1):
update_slot_mapping(slot, node)
增量更新
Python
def handle_moved_error(error):
# 解析MOVED响应
slot = error.slot
node = parse_node(error.message)
# 更新单个槽位映射
update_slot_mapping(slot, node)
def handle_ask_error(error):
# ASK不更新缓存
# 临时重定向到目标节点
return get_connection(error.node)
全量刷新
Python
def refresh_routing_table():
try:
conn = connect_any_node()
slots = conn.execute("CLUSTER SLOTS")
rebuild_routing_table(slots)
except Exception:
# 尝试其他节点
pass
多键操作处理
限制条件
text
多键操作要求所有key在同一槽位:
- MGET/MSET
- SUNION/SINTER/SDIFF
- Lua脚本涉及多key
- 事务(MULTI/EXEC)
哈希标签解决
text
# 不使用哈希标签(可能失败)
MGET user:1000:name user:1000:email # 可能跨槽位
# 使用哈希标签(保证同槽位)
MGET user:{1000}:name user:{1000}:email # 同槽位
客户端实现
text
def mget_with_tag(keys):
# 提取tag并重构key
tagged_keys = [f"{{tag}}:{key}" for key in keys]
# 确保同槽位
slot = crc16("tag") % 16384
node = get_node(slot)
return node.execute("MGET", tagged_keys)
连接池优化
连接池配置
text
// Jedis连接池配置
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(100); // 最大连接数
poolConfig.setMaxIdle(50); // 最大空闲连接
poolConfig.setMinIdle(10); // 最小空闲连接
poolConfig.setMaxWaitMillis(3000); // 最大等待时间
poolConfig.setTestOnBorrow(true); // 借出时测试
连接池策略
text
1. 每个节点独立连接池
2. 根据访问频率动态调整
3. 空闲连接定期检测
4. 故障节点连接池重建
故障转移处理
客户端重试策略
text
def execute_with_retry(command, max_retries=3):
for i in range(max_retries):
try:
return execute_command(command)
except (ConnectionError, TimeoutError):
if i == max_retries - 1:
raise
refresh_routing_table()
sleep(0.1 * (i + 1)) # 指数退避
故障转移感知
text
1. 连接失败时刷新路由表
2. 获取新主节点信息
3. 重建连接池
4. 重试请求
性能优化
批量操作优化
text
# 使用Pipeline减少RTT
pipe = cluster.pipeline()
for key in keys:
pipe.get(key)
results = pipe.execute()
# 注意:Pipeline内key需同槽位或使用哈希标签
本地缓存
text
# 缓存热点数据
# 减少对集群的访问
local_cache = LRUCache(maxsize=10000)
def get_with_cache(key):
if key in local_cache:
return local_cache[key]
value = cluster.get(key)
local_cache[key] = value
return value
要点总结
- Smart Client本地维护槽位路由缓存,减少重定向开销
- MOVED是永久重定向,更新缓存;ASK是临时重定向,不更新缓存
- 使用哈希标签{tag}保证多键操作在同一槽位
- 路由缓存初始化后增量更新,故障时全量刷新
- 连接池按节点独立管理,故障转移时重建
- 多键操作、事务、Lua脚本需要key在同槽位
- 合理配置连接池参数,实现故障重试和指数退避
- 客户端库自动处理MOVED/ASK重定向,开发者无需手动处理
📝 发现内容有误?点击此处直接编辑