提交 62dbdedf 编写于 作者: N Nikita

Fixed - failedSlaveCheckInterval setting is not applied under some conditions

上级 b669a99b
...@@ -155,7 +155,9 @@ public class LoadBalancerManager { ...@@ -155,7 +155,9 @@ public class LoadBalancerManager {
} }
public ClientConnectionsEntry freeze(ClientConnectionsEntry connectionEntry, FreezeReason freezeReason) { public ClientConnectionsEntry freeze(ClientConnectionsEntry connectionEntry, FreezeReason freezeReason) {
if (connectionEntry == null) { if (connectionEntry == null || (connectionEntry.isFailed()
&& connectionEntry.getFreezeReason() == FreezeReason.RECONNECT
&& freezeReason == FreezeReason.RECONNECT)) {
return null; return null;
} }
......
...@@ -197,8 +197,8 @@ abstract class ConnectionPool<T extends RedisConnection> { ...@@ -197,8 +197,8 @@ abstract class ConnectionPool<T extends RedisConnection> {
} }
public RFuture<T> get(RedisCommand<?> command, ClientConnectionsEntry entry) { public RFuture<T> get(RedisCommand<?> command, ClientConnectionsEntry entry) {
return acquireConnection(command, entry); return acquireConnection(command, entry);
} }
public static abstract class AcquireCallback<T> implements Runnable, FutureListener<T> { public static abstract class AcquireCallback<T> implements Runnable, FutureListener<T> {
...@@ -207,28 +207,29 @@ abstract class ConnectionPool<T extends RedisConnection> { ...@@ -207,28 +207,29 @@ abstract class ConnectionPool<T extends RedisConnection> {
protected final RFuture<T> acquireConnection(RedisCommand<?> command, final ClientConnectionsEntry entry) { protected final RFuture<T> acquireConnection(RedisCommand<?> command, final ClientConnectionsEntry entry) {
final RPromise<T> result = new RedissonPromise<T>(); final RPromise<T> result = new RedissonPromise<T>();
AcquireCallback<T> callback = new AcquireCallback<T>() { AcquireCallback<T> callback = new AcquireCallback<T>() {
@Override @Override
public void run() { public void run() {
result.removeListener(this); result.removeListener(this);
connectTo(entry, result); connectTo(entry, result);
} }
@Override
public void operationComplete(Future<T> future) throws Exception {
entry.removeConnection(this);
}
};
@Override result.addListener(callback);
public void operationComplete(Future<T> future) throws Exception { acquireConnection(entry, callback);
entry.removeConnection(this);
}
};
result.addListener(callback);
acquireConnection(entry, callback);
return result; return result;
} }
protected boolean tryAcquireConnection(ClientConnectionsEntry entry) { protected boolean tryAcquireConnection(ClientConnectionsEntry entry) {
if (entry.getNodeType() == NodeType.SLAVE) { if (entry.getNodeType() == NodeType.SLAVE && entry.isFailed()) {
return !entry.isFailed(); checkForReconnect(entry, null);
return false;
} }
return true; return true;
} }
...@@ -248,6 +249,11 @@ abstract class ConnectionPool<T extends RedisConnection> { ...@@ -248,6 +249,11 @@ abstract class ConnectionPool<T extends RedisConnection> {
} }
T conn = poll(entry); T conn = poll(entry);
if (conn != null) { if (conn != null) {
if (!conn.isActive()) {
promiseFailure(entry, promise, conn);
return;
}
connectedSuccessful(entry, promise, conn); connectedSuccessful(entry, promise, conn);
return; return;
} }
...@@ -321,10 +327,10 @@ abstract class ConnectionPool<T extends RedisConnection> { ...@@ -321,10 +327,10 @@ abstract class ConnectionPool<T extends RedisConnection> {
private void checkForReconnect(ClientConnectionsEntry entry, Throwable cause) { private void checkForReconnect(ClientConnectionsEntry entry, Throwable cause) {
if (masterSlaveEntry.slaveDown(entry, FreezeReason.RECONNECT)) { if (masterSlaveEntry.slaveDown(entry, FreezeReason.RECONNECT)) {
log.error("slave " + entry.getClient().getAddr() + " has been disconnected after " log.error("slave " + entry.getClient().getAddr() + " has been disconnected after "
+ config.getFailedSlaveCheckInterval() + " time interval since moment of first failed connection", cause); + config.getFailedSlaveCheckInterval() + " ms interval since moment of the first failed connection", cause);
scheduleCheck(entry); scheduleCheck(entry);
}
} }
}
private void scheduleCheck(final ClientConnectionsEntry entry) { private void scheduleCheck(final ClientConnectionsEntry entry) {
...@@ -380,9 +386,9 @@ abstract class ConnectionPool<T extends RedisConnection> { ...@@ -380,9 +386,9 @@ abstract class ConnectionPool<T extends RedisConnection> {
promise.addListener(new FutureListener<Void>() { promise.addListener(new FutureListener<Void>() {
@Override @Override
public void operationComplete(Future<Void> future) throws Exception { public void operationComplete(Future<Void> future) throws Exception {
masterSlaveEntry.slaveUp(entry, FreezeReason.RECONNECT); masterSlaveEntry.slaveUp(entry, FreezeReason.RECONNECT);
log.info("slave {} has been successfully reconnected", entry.getClient().getAddr()); log.info("slave {} has been successfully reconnected", entry.getClient().getAddr());
} }
}); });
initConnections(entry, promise, false); initConnections(entry, promise, false);
} else { } else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册