提交 0bb9115d 编写于 作者: N Nikita

Enchantments to Redis commands processing core.

上级 86229ed1
......@@ -539,7 +539,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if (details.getAttempt() == connectionManager.getConfig().getRetryAttempts()) {
if (details.getWriteFuture().cancel(false)) {
if (details.getException() == null) {
details.setException(new RedisTimeoutException("Unable to send command: " + command + " with params: " + LogHelper.toString(details.getParams() + " after " + connectionManager.getConfig().getRetryAttempts() + " retry attempts")));
details.setException(new RedisTimeoutException("Unable to send command: " + command + " with params: " + LogHelper.toString(details.getParams()) + " after " + connectionManager.getConfig().getRetryAttempts() + " retry attempts"));
}
details.getAttemptPromise().tryFailure(details.getException());
}
......@@ -661,7 +661,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
private <V, R> void checkWriteFuture(final AsyncDetails<V, R> details, final RedisConnection connection) {
ChannelFuture future = details.getWriteFuture();
if (details.getAttemptPromise().isDone()) {
if (future.isCancelled() || details.getAttemptPromise().isDone()) {
return;
}
......@@ -671,7 +671,6 @@ public class CommandAsyncService implements CommandAsyncExecutor {
"Can't write command: " + details.getCommand() + ", params: " + LogHelper.toString(details.getParams()) + " to channel: " + future.channel(), future.cause()));
if (details.getAttempt() == connectionManager.getConfig().getRetryAttempts()) {
details.getAttemptPromise().tryFailure(details.getException());
free(details);
}
return;
}
......
......@@ -418,11 +418,14 @@ public class CommandBatchService extends CommandAsyncService {
private void checkWriteFuture(Entry entry, final RPromise<Void> attemptPromise, AsyncDetails details,
final RedisConnection connection, ChannelFuture future, boolean noResult, long responseTimeout, int attempts) {
if (future.isCancelled() || details.getAttemptPromise().isDone()) {
return;
}
if (!future.isSuccess()) {
details.setException(new WriteRedisConnectionException("Can't write command batch to channel: " + future.channel(), future.cause()));
if (details.getAttempt() == attempts) {
details.getAttemptPromise().tryFailure(details.getException());
free(entry);
}
return;
}
......@@ -448,7 +451,7 @@ public class CommandBatchService extends CommandAsyncService {
private void checkConnectionFuture(final Entry entry, final NodeSource source,
final RPromise<Void> mainPromise, final RPromise<Void> attemptPromise, final AsyncDetails details,
RFuture<RedisConnection> connFuture, final boolean noResult, final long responseTimeout, final int attempts) {
if (attemptPromise.isDone() || mainPromise.isCancelled() || connFuture.isCancelled()) {
if (connFuture.isCancelled()) {
return;
}
......@@ -458,6 +461,11 @@ public class CommandBatchService extends CommandAsyncService {
return;
}
if (details.getAttemptPromise().isDone() || details.getMainPromise().isDone()) {
releaseConnection(source, connFuture, details.isReadOnlyMode(), details.getAttemptPromise(), details);
return;
}
final RedisConnection connection = connFuture.getNow();
List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(entry.getCommands().size() + 1);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册