提交 653bc674 编写于 作者: N Nikita Koksharov

Fixed - MOVED redirection loop if "rediss://" scheme used in configuration #3162

上级 559c2866
......@@ -15,6 +15,8 @@
*/
package org.redisson.client;
import org.redisson.misc.RedisURI;
/**
*
* @author Nikita Koksharov
......@@ -24,7 +26,7 @@ public class RedisAskException extends RedisRedirectException {
private static final long serialVersionUID = -6969734163155547631L;
public RedisAskException(int slot, String url) {
public RedisAskException(int slot, RedisURI url) {
super(slot, url);
}
......
......@@ -15,6 +15,8 @@
*/
package org.redisson.client;
import org.redisson.misc.RedisURI;
/**
*
* @author Nikita Koksharov
......@@ -24,7 +26,7 @@ public class RedisMovedException extends RedisRedirectException {
private static final long serialVersionUID = -6969734163155547631L;
public RedisMovedException(int slot, String url) {
public RedisMovedException(int slot, RedisURI url) {
super(slot, url);
}
......
......@@ -29,9 +29,9 @@ public class RedisRedirectException extends RedisException {
private final int slot;
private final RedisURI url;
public RedisRedirectException(int slot, String url) {
public RedisRedirectException(int slot, RedisURI url) {
this.slot = slot;
this.url = new RedisURI("redis://" + url);
this.url = url;
}
public int getSlot() {
......
......@@ -44,6 +44,7 @@ import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.misc.LogHelper;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedisURI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -67,10 +68,10 @@ public class CommandDecoder extends ReplayingDecoder<State> {
private static final char LF = '\n';
private static final char ZERO = '0';
final ExecutorService executor;
final String scheme;
public CommandDecoder(ExecutorService executor) {
this.executor = executor;
public CommandDecoder(String scheme) {
this.scheme = scheme;
}
@Override
......@@ -314,12 +315,12 @@ public class CommandDecoder extends ReplayingDecoder<State> {
String[] errorParts = error.split(" ");
int slot = Integer.valueOf(errorParts[1]);
String addr = errorParts[2];
data.tryFailure(new RedisMovedException(slot, addr));
data.tryFailure(new RedisMovedException(slot, new RedisURI(scheme + "://" + addr)));
} else if (error.startsWith("ASK")) {
String[] errorParts = error.split(" ");
int slot = Integer.valueOf(errorParts[1]);
String addr = errorParts[2];
data.tryFailure(new RedisAskException(slot, addr));
data.tryFailure(new RedisAskException(slot, new RedisURI(scheme + "://" + addr)));
} else if (error.startsWith("TRYAGAIN")) {
data.tryFailure(new RedisTryAgainException(error
+ ". channel: " + channel + " data: " + data));
......
......@@ -18,6 +18,7 @@ package org.redisson.client.handler;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import org.redisson.client.ChannelName;
import org.redisson.client.RedisClientConfig;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.StringCodec;
......@@ -51,11 +52,11 @@ public class CommandPubSubDecoder extends CommandDecoder {
private final Map<ChannelName, PubSubEntry> entries = new HashMap<>();
private final Map<PubSubKey, CommandData<Object, Object>> commands = new ConcurrentHashMap<>();
private final boolean keepOrder;
public CommandPubSubDecoder(ExecutorService executor, boolean keepOrder) {
super(executor);
this.keepOrder = keepOrder;
private final RedisClientConfig config;
public CommandPubSubDecoder(RedisClientConfig config) {
super(config.getAddress().getScheme());
this.config = config;
}
public void addPubSubCommand(ChannelName channel, CommandData<Object, Object> data) {
......@@ -96,7 +97,7 @@ public class CommandPubSubDecoder extends CommandDecoder {
protected void decodeResult(CommandData<Object, Object> data, List<Object> parts, Channel channel,
Object result) throws IOException {
try {
if (executor.isShutdown()) {
if (config.getExecutor().isShutdown()) {
return;
}
} catch (IllegalStateException e) {
......@@ -123,14 +124,14 @@ public class CommandPubSubDecoder extends CommandDecoder {
channelName = ((PubSubPatternMessage) result).getPattern();
}
PubSubEntry entry = entries.remove(channelName);
if (keepOrder) {
if (config.isKeepAlive()) {
enqueueMessage(result, pubSubConnection, entry);
}
}
}
if (keepOrder) {
if (config.isKeepAlive()) {
if (result instanceof PubSubPatternMessage) {
channelName = ((PubSubPatternMessage) result).getPattern();
}
......@@ -139,7 +140,7 @@ public class CommandPubSubDecoder extends CommandDecoder {
enqueueMessage(result, pubSubConnection, entry);
}
} else {
executor.execute(new Runnable() {
config.getExecutor().execute(new Runnable() {
@Override
public void run() {
if (result instanceof PubSubStatusMessage) {
......@@ -168,7 +169,7 @@ public class CommandPubSubDecoder extends CommandDecoder {
return;
}
executor.execute(() -> {
config.getExecutor().execute(() -> {
try {
while (true) {
Message result = entry.getQueue().poll();
......
......@@ -95,9 +95,9 @@ public class RedisChannelInitializer extends ChannelInitializer<Channel> {
}
if (type == Type.PLAIN) {
ch.pipeline().addLast(new CommandDecoder(config.getExecutor()));
ch.pipeline().addLast(new CommandDecoder(config.getAddress().getScheme()));
} else {
ch.pipeline().addLast(new CommandPubSubDecoder(config.getExecutor(), config.isKeepPubSubOrder()));
ch.pipeline().addLast(new CommandPubSubDecoder(config));
}
ch.pipeline().addLast(new ErrorsLoggingHandler());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册