提交 55502492 编写于 作者: N Nikita Koksharov

refactoring

上级 42313bab
......@@ -91,7 +91,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
List<String> failedMasters = new ArrayList<String>();
for (String address : cfg.getNodeAddresses()) {
RedisURI addr = new RedisURI(address);
RFuture<RedisConnection> connectionFuture = connectToNode(cfg, addr, null, addr.getHost());
RFuture<RedisConnection> connectionFuture = connectToNode(cfg, addr, addr.getHost());
try {
RedisConnection connection = connectionFuture.syncUninterruptibly().getNow();
......@@ -276,7 +276,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
RPromise<Void> result = new RedissonPromise<>();
RFuture<RedisConnection> connectionFuture = connectToNode(cfg, partition.getMasterAddress(), null, configEndpointHostName);
RFuture<RedisConnection> connectionFuture = connectToNode(cfg, partition.getMasterAddress(), configEndpointHostName);
connectionFuture.onComplete((connection, ex1) -> {
if (ex1 != null) {
log.error("Can't connect to master: {} with slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges());
......@@ -425,7 +425,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
return;
}
RedisURI uri = iterator.next();
RFuture<RedisConnection> connectionFuture = connectToNode(cfg, uri, null, configEndpointHostName);
RFuture<RedisConnection> connectionFuture = connectToNode(cfg, uri, configEndpointHostName);
connectionFuture.onComplete((connection, e) -> {
if (e != null) {
lastException.set(e);
......
......@@ -36,10 +36,7 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.command.CommandSyncService;
import org.redisson.config.BaseMasterSlaveServersConfig;
import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.TransportMode;
import org.redisson.config.*;
import org.redisson.misc.CountableListener;
import org.redisson.misc.InfinitySemaphoreLatch;
import org.redisson.misc.RPromise;
......@@ -149,7 +146,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected PublishSubscribeService subscribeService;
private final Map<Object, RedisConnection> nodeConnections = new ConcurrentHashMap<>();
private final Map<RedisURI, RedisConnection> nodeConnections = new ConcurrentHashMap<>();
public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config, UUID id) {
this(config, id);
......@@ -244,34 +241,19 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
}
protected final void disconnectNode(RedisClient client) {
RedisConnection conn = nodeConnections.remove(client);
if (conn != null) {
conn.closeAsync();
}
}
protected final RFuture<RedisConnection> connectToNode(BaseMasterSlaveServersConfig<?> cfg, RedisURI addr, RedisClient client, String sslHostname) {
final Object key;
if (client != null) {
key = client;
} else {
key = addr;
}
RedisConnection conn = nodeConnections.get(key);
protected final RFuture<RedisConnection> connectToNode(BaseConfig<?> cfg, RedisURI addr, String sslHostname) {
RedisConnection conn = nodeConnections.get(addr);
if (conn != null) {
if (!conn.isActive()) {
nodeConnections.remove(key);
nodeConnections.remove(addr);
conn.closeAsync();
} else {
return RedissonPromise.newSucceededFuture(conn);
}
}
if (addr != null) {
client = createClient(NodeType.MASTER, addr, cfg.getConnectTimeout(), cfg.getTimeout(), sslHostname);
}
final RPromise<RedisConnection> result = new RedissonPromise<RedisConnection>();
RedisClient client = createClient(NodeType.MASTER, addr, cfg.getConnectTimeout(), cfg.getTimeout(), sslHostname);
RPromise<RedisConnection> result = new RedissonPromise<>();
RFuture<RedisConnection> future = client.connectAsync();
future.onComplete((connection, e) -> {
if (e != null) {
......@@ -280,7 +262,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
if (connection.isActive()) {
nodeConnections.put(key, connection);
nodeConnections.put(addr, connection);
result.trySuccess(connection);
} else {
connection.closeAsync();
......
......@@ -71,7 +71,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
for (String address : cfg.getNodeAddresses()) {
RedisURI addr = new RedisURI(address);
RFuture<RedisConnection> connectionFuture = connectToNode(cfg, addr, null, addr.getHost());
RFuture<RedisConnection> connectionFuture = connectToNode(cfg, addr, addr.getHost());
connectionFuture.awaitUninterruptibly();
RedisConnection connection = connectionFuture.getNow();
if (connection == null) {
......@@ -131,7 +131,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
AtomicInteger count = new AtomicInteger(cfg.getNodeAddresses().size());
for (String address : cfg.getNodeAddresses()) {
RedisURI addr = new RedisURI(address);
RFuture<RedisConnection> connectionFuture = connectToNode(cfg, addr, null, addr.getHost());
RFuture<RedisConnection> connectionFuture = connectToNode(cfg, addr, addr.getHost());
connectionFuture.onComplete((connection, exc) -> {
if (exc != null) {
log.error(exc.getMessage(), exc);
......
......@@ -296,7 +296,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
Set<RedisURI> newUris = future.getNow().stream()
.map(addr -> toURI(addr.getAddress().getHostAddress(), "" + addr.getPort()))
.map(addr -> getIpAddr(addr))
.collect(Collectors.toSet());
for (RedisURI uri : newUris) {
......@@ -343,7 +343,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
RedisClient client = iterator.next();
RFuture<RedisConnection> connectionFuture = connectToNode(null, null, client, null);
RedisURI addr = getIpAddr(client.getAddr());
RFuture<RedisConnection> connectionFuture = connectToNode(cfg, addr, null);
connectionFuture.onComplete((connection, e) -> {
if (e != null) {
lastException.set(e);
......@@ -484,7 +485,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}).collect(Collectors.toSet());
InetSocketAddress addr = connection.getRedisClient().getAddr();
RedisURI currentAddr = toURI(addr.getAddress().getHostAddress(), "" + addr.getPort());
RedisURI currentAddr = getIpAddr(addr);
newUris.add(currentAddr);
updateSentinels(newUris);
......@@ -505,7 +506,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
for (RedisURI uri : currentUris) {
RedisClient sentinel = SentinelConnectionManager.this.sentinels.remove(uri);
if (sentinel != null) {
disconnectNode(sentinel);
disconnectNode(uri);
sentinel.shutdownAsync();
log.warn("sentinel: {} is down", uri);
}
......@@ -540,7 +541,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return;
}
RedisURI ipAddr = toURI(client.getAddr().getAddress().getHostAddress(), "" + client.getAddr().getPort());
RedisURI ipAddr = getIpAddr(client.getAddr());
if (isHostname) {
RedisClient sentinel = sentinels.get(ipAddr);
if (sentinel != null) {
......@@ -574,6 +575,10 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return result;
}
private RedisURI getIpAddr(InetSocketAddress addr) {
return toURI(addr.getAddress().getHostAddress(), "" + addr.getPort());
}
private RFuture<Void> addSlave(RedisURI uri) {
RPromise<Void> result = new RedissonPromise<Void>();
// to avoid addition twice
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册