提交 0041487e 编写于 作者: N Nikita Koksharov

refactoring

上级 8b8cbab9
......@@ -15,37 +15,21 @@
*/
package org.redisson.command;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.FutureListener;
import org.redisson.RedissonReference;
import org.redisson.RedissonShutdownException;
import org.redisson.ScanResult;
import org.redisson.api.RFuture;
import org.redisson.cache.ReferenceCacheMap;
import org.redisson.client.RedisAskException;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.client.RedisLoadingException;
import org.redisson.client.RedisMovedException;
import org.redisson.client.RedisResponseTimeoutException;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.RedisTryAgainException;
import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.*;
import org.redisson.client.codec.BaseCodec;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.ScoredEntry;
import org.redisson.client.protocol.*;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.connection.ConnectionManager;
......@@ -59,12 +43,10 @@ import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.FutureListener;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
/**
*
......@@ -85,10 +67,10 @@ public class RedisExecutor<V, R> {
final boolean ignoreRedirect;
final RedissonObjectBuilder objectBuilder;
final ConnectionManager connectionManager;
NodeSource source;
NodeSource source;
Codec codec;
volatile int attempt;
volatile int attempt;
volatile Timeout timeout;
volatile BiConsumer<R, Throwable> mainPromiseListener;
volatile ChannelFuture writeFuture;
......@@ -204,9 +186,9 @@ public class RedisExecutor<V, R> {
if (connectionFuture.cancel(false)) {
if (exception == null) {
exception = new RedisTimeoutException("Unable to get connection! Try to increase 'nettyThreads' and/or connection pool size settings"
exception = new RedisTimeoutException("Unable to acquire connection! Increase connection pool size and/or retryInterval"
+ "Node source: " + source
+ ", command: " + LogHelper.toString(command, params)
+ ", command: " + LogHelper.toString(command, params)
+ " after " + attempt + " retry attempts");
}
} else {
......@@ -215,10 +197,10 @@ public class RedisExecutor<V, R> {
if (attempt == attempts) {
if (writeFuture != null && writeFuture.cancel(false)) {
if (exception == null) {
exception = new RedisTimeoutException("Unable to send command! Try to increase 'nettyThreads' and/or connection pool size settings "
+ "Node source: " + source + ", connection: " + connectionFuture.getNow()
+ ", command: " + LogHelper.toString(command, params)
+ " after " + attempts + " retry attempts");
exception = new RedisTimeoutException("Command still hasn't been written into connection! Increase nettyThreads and/or retryInterval"
+ "Node source: " + source + ", connection: " + connectionFuture.getNow()
+ ", command: " + LogHelper.toString(command, params)
+ " after " + attempt + " retry attempts");
}
attemptPromise.tryFailure(exception);
}
......@@ -244,11 +226,7 @@ public class RedisExecutor<V, R> {
}
if (attempt == attempts) {
if (exception == null) {
exception = new RedisTimeoutException("Unable to send command! Try to increase 'nettyThreads' and/or connection pool size settings. Node source: " + source
+ ", command: " + LogHelper.toString(command, params)
+ " after " + attempts + " retry attempts");
}
// filled out in connectionFuture or writeFuture handler
attemptPromise.tryFailure(exception);
return;
}
......@@ -289,13 +267,11 @@ public class RedisExecutor<V, R> {
if (!future.isSuccess()) {
exception = new WriteRedisConnectionException(
"Unable to send command! Node source: " + source + ", connection: " + connection +
"Unable to write command into connection! Node source: " + source + ", connection: " + connection +
", command: " + LogHelper.toString(command, params)
+ " after " + attempt + " retry attempts", future.cause());
if (attempt == attempts) {
if (!attemptPromise.tryFailure(exception)) {
log.error(exception.getMessage());
}
attemptPromise.tryFailure(exception);
}
return;
}
......@@ -358,7 +334,7 @@ public class RedisExecutor<V, R> {
attemptPromise.tryFailure(
new RedisResponseTimeoutException("Redis server response timeout (" + timeoutAmount + " ms) occured"
+ " after " + attempts + " retry attempts. Command: "
+ " after " + attempt + " retry attempts. Command: "
+ LogHelper.toString(command, params) + ", channel: " + connection.getChannel()));
}
};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册