提交 ab71d42c 编写于 作者: N Nikita Koksharov

Feature - resolve hostnames used in Redis Cluster topology #3771

上级 f707f10a
......@@ -16,7 +16,6 @@
package org.redisson.cluster;
import io.netty.resolver.AddressResolver;
import io.netty.util.NetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ScheduledFuture;
......@@ -32,8 +31,12 @@ import org.redisson.config.ClusterServersConfig;
import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode;
import org.redisson.connection.*;
import org.redisson.connection.CRC16;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.SingleEntry;
import org.redisson.misc.AsyncCountDownLatch;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedisURI;
import org.redisson.misc.RedissonPromise;
......@@ -94,7 +97,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
try {
RedisConnection connection = connectionFuture.syncUninterruptibly().getNow();
if (cfg.getNodeAddresses().size() == 1 && NetUtil.createByteArrayFromIpAddressString(addr.getHost()) == null) {
if (cfg.getNodeAddresses().size() == 1 && !addr.isIP()) {
configEndpointHostName = addr.getHost();
}
......@@ -113,7 +116,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
lastClusterNode = addr;
Collection<ClusterPartition> partitions = parsePartitions(nodes);
RFuture<Collection<ClusterPartition>> partitionsFuture = parsePartitions(nodes);
Collection<ClusterPartition> partitions = partitionsFuture.syncUninterruptibly().getNow();
List<RFuture<Void>> masterFutures = new ArrayList<>();
for (ClusterPartition partition : partitions) {
if (partition.isMasterFail()) {
......@@ -466,14 +470,16 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
log.debug("cluster nodes state got from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue);
}
Collection<ClusterPartition> newPartitions = parsePartitions(nodes);
RFuture<Void> masterFuture = checkMasterNodesChange(cfg, newPartitions);
checkSlaveNodesChange(newPartitions);
masterFuture.onComplete((res, ex) -> {
checkSlotsMigration(newPartitions);
checkSlotsChange(newPartitions);
getShutdownLatch().release();
scheduleClusterChangeCheck(cfg);
RFuture<Collection<ClusterPartition>> newPartitionsFuture = parsePartitions(nodes);
newPartitionsFuture.onComplete((newPartitions, ex) -> {
RFuture<Void> masterFuture = checkMasterNodesChange(cfg, newPartitions);
checkSlaveNodesChange(newPartitions);
masterFuture.onComplete((res, exc) -> {
checkSlotsMigration(newPartitions);
checkSlotsChange(newPartitions);
getShutdownLatch().release();
scheduleClusterChangeCheck(cfg);
});
});
});
}
......@@ -764,12 +770,15 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
return natMapper.map(address);
}
private Collection<ClusterPartition> parsePartitions(List<ClusterNodeInfo> nodes) {
private RFuture<Collection<ClusterPartition>> parsePartitions(List<ClusterNodeInfo> nodes) {
Map<String, ClusterPartition> partitions = new HashMap<>();
AsyncCountDownLatch latch = new AsyncCountDownLatch();
int counter = 0;
for (ClusterNodeInfo clusterNodeInfo : nodes) {
if (clusterNodeInfo.containsFlag(Flag.NOADDR)
|| clusterNodeInfo.containsFlag(Flag.HANDSHAKE)
|| clusterNodeInfo.getAddress() == null) {
|| clusterNodeInfo.getAddress() == null
|| (clusterNodeInfo.getSlotRanges().isEmpty() && clusterNodeInfo.containsFlag(Flag.MASTER))) {
// skip it
continue;
}
......@@ -786,32 +795,45 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
continue;
}
RedisURI address = applyNatMap(clusterNodeInfo.getAddress());
if (clusterNodeInfo.containsFlag(Flag.SLAVE)) {
ClusterPartition masterPartition = partitions.computeIfAbsent(masterId, k -> new ClusterPartition(masterId));
ClusterPartition slavePartition = partitions.computeIfAbsent(clusterNodeInfo.getNodeId(),
k -> new ClusterPartition(clusterNodeInfo.getNodeId()));
slavePartition.setType(Type.SLAVE);
slavePartition.setParent(masterPartition);
masterPartition.addSlaveAddress(address);
if (clusterNodeInfo.containsFlag(Flag.FAIL)) {
masterPartition.addFailedSlaveAddress(address);
RFuture<RedisURI> ipFuture = resolveIP(clusterNodeInfo.getAddress());
counter++;
ipFuture.onComplete((addr, e) -> {
if (e != null) {
latch.countDown();
return;
}
} else if (clusterNodeInfo.containsFlag(Flag.MASTER)) {
ClusterPartition masterPartition = partitions.computeIfAbsent(masterId, k -> new ClusterPartition(masterId));
masterPartition.addSlotRanges(clusterNodeInfo.getSlotRanges());
masterPartition.setMasterAddress(address);
masterPartition.setType(Type.MASTER);
if (clusterNodeInfo.containsFlag(Flag.FAIL)) {
masterPartition.setMasterFail(true);
RedisURI address = applyNatMap(addr);
if (clusterNodeInfo.containsFlag(Flag.SLAVE)) {
ClusterPartition masterPartition = partitions.computeIfAbsent(masterId, k -> new ClusterPartition(masterId));
ClusterPartition slavePartition = partitions.computeIfAbsent(clusterNodeInfo.getNodeId(),
k -> new ClusterPartition(clusterNodeInfo.getNodeId()));
slavePartition.setType(Type.SLAVE);
slavePartition.setParent(masterPartition);
masterPartition.addSlaveAddress(address);
if (clusterNodeInfo.containsFlag(Flag.FAIL)) {
masterPartition.addFailedSlaveAddress(address);
}
} else if (clusterNodeInfo.containsFlag(Flag.MASTER)) {
ClusterPartition masterPartition = partitions.computeIfAbsent(masterId, k -> new ClusterPartition(masterId));
masterPartition.addSlotRanges(clusterNodeInfo.getSlotRanges());
masterPartition.setMasterAddress(address);
masterPartition.setType(Type.MASTER);
if (clusterNodeInfo.containsFlag(Flag.FAIL)) {
masterPartition.setMasterFail(true);
}
}
}
latch.countDown();
});
}
addCascadeSlaves(partitions);
return partitions.values();
RPromise<Collection<ClusterPartition>> result = new RedissonPromise<>();
latch.latch(() -> {
addCascadeSlaves(partitions);
result.trySuccess(partitions.values());
}, counter);
return result;
}
private void addCascadeSlaves(Map<String, ClusterPartition> partitions) {
......
......@@ -26,16 +26,14 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.resolver.AddressResolver;
import io.netty.resolver.AddressResolverGroup;
import io.netty.resolver.DefaultAddressResolverGroup;
import io.netty.resolver.dns.DnsServerAddressStreamProviders;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import io.netty.util.*;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.*;
import io.netty.util.internal.PlatformDependent;
import org.redisson.ElementsSubscribeService;
import org.redisson.Version;
......@@ -691,4 +689,28 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
public RedisURI applyNatMap(RedisURI address) {
return address;
}
protected RFuture<RedisURI> resolveIP(RedisURI address) {
if (address.isIP()) {
return RedissonPromise.newSucceededFuture(address);
}
RPromise<RedisURI> result = new RedissonPromise<>();
AddressResolver<InetSocketAddress> resolver = resolverGroup.getResolver(getGroup().next());
InetSocketAddress addr = InetSocketAddress.createUnresolved(address.getHost(), address.getPort());
Future<InetSocketAddress> future = resolver.resolve(addr);
future.addListener((FutureListener<InetSocketAddress>) f -> {
if (!f.isSuccess()) {
log.error("Unable to resolve " + address, f.cause());
result.tryFailure(f.cause());
return;
}
InetSocketAddress s = f.getNow();
RedisURI uri = new RedisURI(address.getScheme() + "://" + s.getAddress().getHostAddress() + ":" + address.getPort());
result.trySuccess(uri);
});
return result;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册