Lettuce RedisClusterClient: Usage and Source Code Analysis
Data Sharding in Redis Cluster
Redis Cluster does not use consistent hashing; instead it introduces the concept of hash slots. The cluster has 16384 hash slots, and each key is mapped to a slot by computing CRC16(key) mod 16384. Every node in the cluster is responsible for a subset of these slots. In other words, as long as a key stays the same, the slot it maps to stays the same as well.
This can be inspected with the cluster info command:
cluster info
cluster_state:ok
cluster_slots_assigned:16384
cluster_slots_ok:16384
cluster_slots_pfail:0
cluster_slots_fail:0
cluster_known_nodes:12
The cluster nodes command lists the current nodes and the slot range each of them manages. As shown below, this cluster has 12 master nodes, each managing roughly 1365 slots:
xx.xxx.xxx.xx:6959> cluster nodes
45abb8663c0cdb25ed17c29521bf6fda98e913ea xx.xxx.xxx.xx:6961 master - 0 1529229636724 11 connected 13653-15018
e40080f32a3fb89e34b7622038ce490682428fdf xx.xxx.xxx.xx:6960 master - 0 1529229633723 10 connected 12288-13652
a749bba5614680dea9f47e3c8fe595aa8be71a2c xx.xxx.xxx.xx:6954 master - 0 1529229639230 4 connected 4096-5460
1096e2a8737401b66c7d4ee0addcb10d7ff14088 xx.xxx.xxx.xx:6952 master - 0 1529229636224 2 connected 1365-2730
fbc76f3481271241c1a89fabeb5139905e1ec2a6 xx.xxx.xxx.xx:6962 master - 0 1529229638230 12 connected 15019-16383
85601fa67820a5af0de0cc21d102d72575709ec6 xx.xxx.xxx.xx:6959 myself,master - 0 0 9 connected 10923-12287
c00d86999c98f97d697f3a2b33ba26fbf50e46eb xx.xxx.xxx.xx:6955 master - 0 1529229634724 5 connected 5461-6826
0b09a5c4c9e9158520389dd2672bd711d55085c6 xx.xxx.xxx.xx:6953 master - 0 1529229637227 3 connected 2731-4095
9f26d208fa8772449d5c322eb63786a1cf9937e0 xx.xxx.xxx.xx:6958 master - 0 1529229635224 8 connected 9557-10922
274294a88758fcb674e1a0292db0e36a66a0bf48 xx.xxx.xxx.xx:6951 master - 0 1529229634223 1 connected 0-1364
369780bdf56d483a0f0a92cb2baab786844051f3 xx.xxx.xxx.xx:6957 master - 0 1529229640232 7 connected 8192-9556
71ed0215356c664cc56d4579684e86a83dba3a92 xx.xxx.xxx.xx:6956 master - 0 1529229635724 6 connected 6827-8191
Request Redirection
Since each node is responsible for only part of the slots, and slots can migrate from one node to another, a client may end up sending a request to the wrong node. A mechanism is therefore needed to detect and correct this, which is request redirection. There are two redirection scenarios:
- MOVED
Declares that ownership of the slot has been transferred; the receiving client should update its key-to-node mapping.
- ASK
Declares a temporary state: slot ownership has not been transferred yet, so the client does not update its mapping. The ASKING command sent beforehand tells the target node that the client understands this temporary state.
Querying the value of the key test through the cluster:
xx.xxx.xxx.xx:6959> get test
(error) MOVED 6918 xx.xxx.xx.xxx:6956
The reply indicates that the key lives on the instance at port 6956; querying that instance returns the cached value:
xx.xxx.xx.xxx:6956> get test
"cluster"
The example above shows that fetching the value took two round trips to the cluster. Since the key-to-slot algorithm is known, a client can compute the slot from the key directly and, combined with the slot ranges each node manages, determine which node owns the key, so the value can be fetched in a single request. This is exactly what Lettuce does.
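For illustration, the slot can be computed locally; a minimal sketch, assuming Lettuce's SlotHash utility class (io.lettuce.core.cluster.SlotHash) is available on the classpath:

import io.lettuce.core.cluster.SlotHash;

public class SlotDemo {

    public static void main(String[] args) {
        // Same calculation the server performs: CRC16(key) mod 16384
        int slot = SlotHash.getSlot("test");

        // For the key "test" this matches the 6918 reported by the MOVED reply above,
        // which falls into the 6827-8191 range owned by the node on port 6956
        System.out.println("slot for 'test': " + slot);
    }
}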
Using Lettuce
@Bean(name = "clusterRedisURI")
RedisURI clusterRedisURI() {
    return RedisURI.builder().withHost("xx.xx.xxx.xx").withPort(6954).build();
}

@Bean
ClusterClientOptions clusterClientOptions() {
    return ClusterClientOptions.builder().autoReconnect(true).maxRedirects(1).build();
}

@Bean
RedisClusterClient redisClusterClient(ClientResources clientResources, ClusterClientOptions clusterClientOptions, RedisURI clusterRedisURI) {
    RedisClusterClient redisClusterClient = RedisClusterClient.create(clientResources, clusterRedisURI);
    redisClusterClient.setOptions(clusterClientOptions);
    return redisClusterClient;
}

/**
 * Cluster-mode connection
 */
@Bean(destroyMethod = "close")
StatefulRedisClusterConnection<String, String> statefulRedisClusterConnection(RedisClusterClient redisClusterClient) {
    return redisClusterClient.connect();
}
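With these beans in place, commands issued through the connection are routed to the correct node automatically. A minimal standalone usage sketch (without Spring, using a placeholder seed URI):

import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;

public class LettuceClusterDemo {

    public static void main(String[] args) {
        // Any reachable node works as a seed; the rest of the topology is discovered
        RedisClusterClient client = RedisClusterClient.create("redis://xx.xx.xxx.xx:6954");

        try (StatefulRedisClusterConnection<String, String> connection = client.connect()) {
            RedisAdvancedClusterCommands<String, String> sync = connection.sync();
            sync.set("test", "cluster");
            // Routed directly to the node owning the key's slot, no extra round trip
            System.out.println(sync.get("test")); // "cluster"
        }

        client.shutdown();
    }
}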
Lettuce Source Code
The cluster topology is discovered proactively when the connection is created.
<K, V> StatefulRedisClusterConnectionImpl<K, V> connectClusterImpl(RedisCodec<K, V> codec) {

    // Initialize the partition metadata if it has not been loaded yet
    if (partitions == null) {
        initializePartitions();
    }

    // Activate topology refresh if it is configured
    activateTopologyRefreshIfNeeded();
    ...

protected void initializePartitions() {
    this.partitions = loadPartitions();
}
protected Partitions loadPartitions() {

    // Determine the seed URIs used for topology refresh
    Iterable<RedisURI> topologyRefreshSource = getTopologyRefreshSource();

    String message = "Cannot retrieve initial cluster partitions from initial URIs " + topologyRefreshSource;

    try {
        // Load the topology views from the seed nodes
        Map<RedisURI, Partitions> partitions = refresh.loadViews(topologyRefreshSource, useDynamicRefreshSources());
        ...

public Map<RedisURI, Partitions> loadViews(Iterable<RedisURI> seed, boolean discovery) {

    // Determine the command timeout, 60 seconds by default
    long commandTimeoutNs = getCommandTimeoutNs(seed);

    Connections connections = null;
    try {
        // Open connections to all seed nodes
        connections = getConnections(seed).get(commandTimeoutNs, TimeUnit.NANOSECONDS);

        Requests requestedTopology = connections.requestTopology();
        Requests requestedClients = connections.requestClients();

        // Build the per-node topology views
        NodeTopologyViews nodeSpecificViews = getNodeSpecificViews(requestedTopology, requestedClients, commandTimeoutNs);

        if (discovery) { // should additional nodes be discovered?

            // All nodes known to the cluster
            Set<RedisURI> allKnownUris = nodeSpecificViews.getClusterNodes();

            // Remove the seed nodes to get the nodes still to be discovered
            Set<RedisURI> discoveredNodes = difference(allKnownUris, toSet(seed));

            if (!discoveredNodes.isEmpty()) {

                // Open connections to the newly discovered nodes
                Connections discoveredConnections = getConnections(discoveredNodes).optionalGet(commandTimeoutNs,
                        TimeUnit.NANOSECONDS);

                // Merge the connections
                connections = connections.mergeWith(discoveredConnections);

                // Merge the topology and client requests
                requestedTopology = requestedTopology.mergeWith(discoveredConnections.requestTopology());
                requestedClients = requestedClients.mergeWith(discoveredConnections.requestClients());

                // Rebuild the per-node views including the discovered nodes
                nodeSpecificViews = getNodeSpecificViews(requestedTopology, requestedClients, commandTimeoutNs);

                // Return the partitions keyed by node URI
                return nodeSpecificViews.toMap();
            }
        }

        return nodeSpecificViews.toMap();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RedisCommandInterruptedException(e);
    } finally {
        if (connections != null) {
            connections.close();
        }
    }
}
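activateTopologyRefreshIfNeeded only schedules background refreshes when topology refresh is enabled on the client options. If that is wanted, the clusterClientOptions bean from the usage section could be extended roughly as follows; this is a sketch based on Lettuce's ClusterTopologyRefreshOptions builder, and method signatures may differ slightly between versions:

@Bean
ClusterClientOptions clusterClientOptions() {
    ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
            // Re-fetch the topology on a fixed schedule
            .enablePeriodicRefresh(Duration.ofSeconds(60))
            // Also refresh when redirects or reconnects suggest the topology changed
            .enableAllAdaptiveRefreshTriggers()
            .build();

    return ClusterClientOptions.builder()
            .topologyRefreshOptions(topologyRefreshOptions)
            .autoReconnect(true)
            .maxRedirects(1)
            .build();
}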
By the time the connection is created, all valid nodes in the cluster are therefore already known. As discussed in the previous article, cluster commands are handled by ClusterDistributionChannelWriter, some of whose fields are initialized when the writer is created:
class ClusterDistributionChannelWriter implements RedisChannelWriter {

    // Default writer
    private final RedisChannelWriter defaultWriter;

    // Cluster event listener
    private final ClusterEventListener clusterEventListener;

    private final int executionLimit;

    // Cluster connection provider
    private ClusterConnectionProvider clusterConnectionProvider;

    // Asynchronous cluster connection provider
    private AsyncClusterConnectionProvider asyncClusterConnectionProvider;

    // Whether the writer has been closed
    private boolean closed = false;

    // Partition (slot-to-node) metadata
    private volatile Partitions partitions;
Writing a command works as follows: the slot is computed from the key, the node owning that slot is looked up, and the command is sent directly to that node, which keeps the number of round trips to the cluster to a minimum.
public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {

    LettuceAssert.notNull(command, "Command must not be null");

    // Fail fast if the connection has already been closed
    if (closed) {
        throw new RedisException("Connection is closed");
    }

    // A ClusterCommand that is not done yet is a command being retried after a redirect
    if (command instanceof ClusterCommand && !command.isDone()) {

        // Cast to ClusterCommand
        ClusterCommand<K, V, T> clusterCommand = (ClusterCommand<K, V, T>) command;

        if (clusterCommand.isMoved() || clusterCommand.isAsk()) {

            HostAndPort target;
            boolean asking;

            // MOVED: the slot has migrated; we reach this point via the retry in ClusterCommand
            if (clusterCommand.isMoved()) {
                // Parse the target node from the MOVED error
                target = getMoveTarget(clusterCommand.getError());
                // Notify listeners about the MOVED redirection
                clusterEventListener.onMovedRedirection();
                asking = false;
            } else { // ASK redirection
                target = getAskTarget(clusterCommand.getError());
                asking = true;
                clusterEventListener.onAskRedirection();
            }

            command.getOutput().setError((String) null);

            // Connect to the redirection target
            CompletableFuture<StatefulRedisConnection<K, V>> connectFuture = asyncClusterConnectionProvider
                    .getConnectionAsync(ClusterConnectionProvider.Intent.WRITE, target.getHostText(), target.getPort());

            // If the connection is already available, send the command right away
            if (isSuccessfullyCompleted(connectFuture)) {
                writeCommand(command, asking, connectFuture.join(), null);
            } else {
                connectFuture.whenComplete((connection, throwable) -> writeCommand(command, asking, connection, throwable));
            }

            return command;
        }
    }

    // A plain RedisCommand (the first attempt) is wrapped into a ClusterCommand
    ClusterCommand<K, V, T> commandToSend = getCommandToSend(command);

    // Command arguments
    CommandArgs<K, V> args = command.getArgs();

    // Exclude CLIENT commands from key-based routing
    if (args != null && !CommandType.CLIENT.equals(commandToSend.getType())) {

        // First encoded key of the command
        ByteBuffer encodedKey = args.getFirstEncodedKey();

        if (encodedKey != null) {

            // Compute the slot for the key
            int hash = getSlot(encodedKey);

            // Determine the intent (read or write) from the command type
            ClusterConnectionProvider.Intent intent = getIntent(command.getType());

            // Obtain a connection for this intent and slot
            CompletableFuture<StatefulRedisConnection<K, V>> connectFuture = ((AsyncClusterConnectionProvider) clusterConnectionProvider)
                    .getConnectionAsync(intent, hash);

            // If the connection is already available, send the command right away
            if (isSuccessfullyCompleted(connectFuture)) {
                writeCommand(commandToSend, false, connectFuture.join(), null);
            } else { // otherwise send the command (or fail it) once the connection future completes
                connectFuture.whenComplete((connection, throwable) -> writeCommand(commandToSend, false, connection,
                        throwable));
            }

            return commandToSend;
        }
    }

    writeCommand(commandToSend, defaultWriter);
    return commandToSend;
}
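The intent matters because the connection provider may route READ-intent commands to replicas when a ReadFrom policy is set on the connection. The cluster in this article has only masters, so the following is purely illustrative; note that Lettuce 4.x/5.0 names the constant SLAVE_PREFERRED rather than REPLICA_PREFERRED:

// Prefer replicas for read commands such as GET; fall back to the master if none is available
connection.setReadFrom(ReadFrom.REPLICA_PREFERRED);

// WRITE intent: always sent to the master owning the slot
connection.sync().set("test", "cluster");

// READ intent: may be served by a replica of the node owning the slot
String value = connection.sync().get("test");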
But what if, because the cluster has been resharded, the computed slot no longer lives on that node? How does Lettuce handle this? Looking at the ClusterCommand source shows that the complete method deals with it: if the response is MOVED, the command is retried against the MOVED target. The number of redirects is configurable and defaults to 5; as the configuration above shows, our setup allows only one redirect.
@Override
public void complete() {

    // If the response is MOVED or ASK
    if (isMoved() || isAsk()) {

        // Only retry while the redirection count is below the configured maximum
        boolean retryCommand = maxRedirections > redirections;
        redirections++;

        if (retryCommand) {
            try {
                // Re-dispatch the command, following the redirect
                retry.write(this);
            } catch (Exception e) {
                completeExceptionally(e);
            }
            return;
        }
    }

    super.complete();
    completed = true;
}
For an ASK redirect, an ASKING command has to be sent to the target node before the redirected command itself:
private static <K, V> void writeCommand(RedisCommand<K, V, ?> command, boolean asking,
        StatefulRedisConnection<K, V> connection, Throwable throwable) {

    if (throwable != null) {
        command.completeExceptionally(throwable);
        return;
    }

    try {
        // For ASK redirects, send ASKING before the actual command
        if (asking) {
            connection.async().asking();
        }

        // Dispatch the command on the target connection
        writeCommand(command, ((RedisChannelHandler<K, V>) connection).getChannelWriter());
    } catch (Exception e) {
        command.completeExceptionally(e);
    }
}
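On the wire, an ASK redirect looks much like the MOVED reply shown at the beginning; the reply below is illustrative (slot and target reuse the earlier example rather than a real ASK scenario), and the client must send ASKING to the target before retrying the command:

xx.xxx.xxx.xx:6959> get test
(error) ASK 6918 xx.xxx.xx.xxx:6956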