提交 9d7a90b7 编写于 作者: N Nikita Koksharov

Fixed - RBatch object in REDIS_WRITE_ATOMIC or REDIS_READ_ATOMIC mode can be...

Fixed - RBatch object in REDIS_WRITE_ATOMIC or REDIS_READ_ATOMIC mode can be corrupted by PING command. #3634
上级 12085f84
......@@ -50,6 +50,7 @@ public class RedisConnection implements RedisCommands {
private volatile RPromise<Void> fastReconnect;
private volatile boolean closed;
private volatile boolean queued;
volatile Channel channel;
private RPromise<?> connectionPromise;
......@@ -75,6 +76,14 @@ public class RedisConnection implements RedisCommands {
}
}
public boolean isQueued() {
return queued;
}
public void setQueued(boolean queued) {
this.queued = queued;
}
public void setConnectedListener(Runnable connectedListener) {
this.connectedListener = connectedListener;
}
......
......@@ -63,7 +63,7 @@ public class PingConnectionHandler extends ChannelInboundHandlerAdapter {
RedisConnection connection = RedisConnection.getFrom(ctx.channel());
CommandData<?, ?> commandData = connection.getCurrentCommand();
RFuture<String> future;
if (commandData == null || !commandData.isBlockingCommand()) {
if ((commandData == null || !commandData.isBlockingCommand()) && !connection.isQueued()) {
future = connection.async(StringCodec.INSTANCE, RedisCommands.PING);
} else {
future = null;
......
......@@ -510,6 +510,7 @@ public class RedisExecutor<V, R> {
}
RedisConnection connection = connectionFuture.getNow();
connection.setQueued(false);
connectionManager.getShutdownLatch().release();
if (readOnlyMode) {
connectionManager.releaseRead(source, connection);
......
......@@ -131,6 +131,8 @@ public class RedisQueuedBatchExecutor<V, R> extends BaseRedisBatchExecutor<V, R>
@Override
protected void sendCommand(RPromise<R> attemptPromise, RedisConnection connection) {
connection.setQueued(true);
MasterSlaveEntry msEntry = getEntry(source);
ConnectionEntry connectionEntry = connections.get(msEntry);
......
......@@ -242,8 +242,7 @@ public class RedissonBatchTest extends BaseTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void testConnectionLeakAfterError() throws InterruptedException {
Config config = createConfig();
config.useSingleServer()
......@@ -557,6 +556,22 @@ public class RedissonBatchTest extends BaseTest {
redisson.shutdown();
}
@ParameterizedTest
@MethodSource("data")
public void testBatchPing(BatchOptions batchOptions) throws InterruptedException {
Config config = createConfig();
config.useSingleServer().setPingConnectionInterval(100);
RedissonClient redisson = Redisson.create(config);
RBatch batch = redisson.createBatch(batchOptions);
batch.getBucket("test").trySetAsync("1232");
Thread.sleep(500);
BatchResult<?> r = batch.execute();
assertThat((List<Object>)r.getResponses()).containsExactly(true);
redisson.shutdown();
}
@ParameterizedTest
@MethodSource("data")
public void testShutdownTimeout(BatchOptions batchOptions) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册