diff --git a/redisson/src/main/java/org/redisson/command/CommandBatchService.java b/redisson/src/main/java/org/redisson/command/CommandBatchService.java index f09f92ab26d6686e81e2dfc129170b44ea60a77a..115adf0c4810099136ebb57c930306675965066b 100644 --- a/redisson/src/main/java/org/redisson/command/CommandBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandBatchService.java @@ -173,7 +173,6 @@ public class CommandBatchService extends CommandAsyncService { return executeAsync(BatchOptions.defaults()); } - @SuppressWarnings("MethodLength") public RFuture executeAsync(BatchOptions options) { if (executed.get()) { throw new IllegalStateException("Batch already executed!"); @@ -190,113 +189,7 @@ public class CommandBatchService extends CommandAsyncService { } if (isRedisBasedQueue()) { - int permits = 0; - for (Entry entry : commands.values()) { - permits += entry.getCommands().size(); - } - - RPromise resultPromise = new RedissonPromise(); - semaphore.acquire(new Runnable() { - @Override - public void run() { - for (Entry entry : commands.values()) { - for (BatchCommandData command : entry.getCommands()) { - if (command.getPromise().isDone() && !command.getPromise().isSuccess()) { - resultPromise.tryFailure(command.getPromise().cause()); - break; - } - } - } - - if (resultPromise.isDone()) { - return; - } - - RPromise>> mainPromise = new RedissonPromise>>(); - Map> result = new ConcurrentHashMap>(); - CountableListener>> listener = new CountableListener>>(mainPromise, result); - listener.setCounter(connections.size()); - for (Map.Entry entry : commands.entrySet()) { - RPromise> execPromise = new RedissonPromise>(); - async(entry.getValue().isReadOnlyMode(), new NodeSource(entry.getKey()), connectionManager.getCodec(), RedisCommands.EXEC, - new Object[] {}, execPromise, false); - execPromise.onComplete((r, ex) -> { - if (ex != null) { - mainPromise.tryFailure(ex); - return; - } - - BatchCommandData lastCommand = (BatchCommandData) entry.getValue().getCommands().peekLast(); - result.put(entry.getKey(), r); - if (RedisCommands.WAIT.getName().equals(lastCommand.getCommand().getName())) { - lastCommand.getPromise().onComplete((res, e) -> { - if (e != null) { - mainPromise.tryFailure(e); - return; - } - - execPromise.onComplete(listener); - }); - } else { - execPromise.onComplete(listener); - } - }); - } - - mainPromise.onComplete((res, ex) -> { - executed.set(true); - if (ex != null) { - resultPromise.tryFailure(ex); - return; - } - - try { - for (java.util.Map.Entry> entry : res.entrySet()) { - Entry commandEntry = commands.get(entry.getKey()); - Iterator resultIter = entry.getValue().iterator(); - for (BatchCommandData data : commandEntry.getCommands()) { - if (data.getCommand().getName().equals(RedisCommands.EXEC.getName())) { - break; - } - - RPromise promise = (RPromise) data.getPromise(); - if (resultIter.hasNext()) { - promise.trySuccess(resultIter.next()); - } else { - // fix for https://github.com/redisson/redisson/issues/2212 - promise.trySuccess(null); - } - } - } - - List entries = new ArrayList(); - for (Entry e : commands.values()) { - entries.addAll(e.getCommands()); - } - Collections.sort(entries); - List responses = new ArrayList(entries.size()); - int syncedSlaves = 0; - for (BatchCommandData commandEntry : entries) { - if (isWaitCommand(commandEntry)) { - syncedSlaves += (Integer) commandEntry.getPromise().getNow(); - } else if (!commandEntry.getCommand().getName().equals(RedisCommands.MULTI.getName()) - && !commandEntry.getCommand().getName().equals(RedisCommands.EXEC.getName())) { - Object entryResult = commandEntry.getPromise().getNow(); - entryResult = RedisExecutor.tryHandleReference(objectBuilder, entryResult); - responses.add(entryResult); - } - } - BatchResult r = new BatchResult(responses, syncedSlaves); - resultPromise.trySuccess((R) r); - } catch (Exception e) { - resultPromise.tryFailure(e); - } - - commands = null; - }); - } - }, permits); - return resultPromise; + return executeRedisBasedQueue(); } if (this.options.getExecutionMode() != ExecutionMode.IN_MEMORY) { @@ -378,7 +271,7 @@ public class CommandBatchService extends CommandAsyncService { AtomicInteger slots = new AtomicInteger(commands.size()); - for (java.util.Map.Entry, List> entry : nestedServices.entrySet()) { + for (Map.Entry, List> entry : nestedServices.entrySet()) { slots.incrementAndGet(); for (CommandBatchService service : entry.getValue()) { service.executeAsync(); @@ -389,20 +282,127 @@ public class CommandBatchService extends CommandAsyncService { }); } - for (java.util.Map.Entry e : commands.entrySet()) { - execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, this.options); + for (Map.Entry e : commands.entrySet()) { + RedisCommonBatchExecutor executor = new RedisCommonBatchExecutor(new NodeSource(e.getKey()), voidPromise, + connectionManager, this.options, e.getValue(), slots); + executor.execute(); } return resultPromise; } - protected boolean isRedisBasedQueue() { - return options != null && (this.options.getExecutionMode() == ExecutionMode.REDIS_READ_ATOMIC || this.options.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC); + private RFuture executeRedisBasedQueue() { + int permits = 0; + for (Entry entry : commands.values()) { + permits += entry.getCommands().size(); + } + + RPromise resultPromise = new RedissonPromise(); + semaphore.acquire(new Runnable() { + @Override + public void run() { + for (Entry entry : commands.values()) { + for (BatchCommandData command : entry.getCommands()) { + if (command.getPromise().isDone() && !command.getPromise().isSuccess()) { + resultPromise.tryFailure(command.getPromise().cause()); + break; + } + } + } + + if (resultPromise.isDone()) { + return; + } + + RPromise>> mainPromise = new RedissonPromise>>(); + Map> result = new ConcurrentHashMap>(); + CountableListener>> listener = new CountableListener>>(mainPromise, result); + listener.setCounter(connections.size()); + for (Map.Entry entry : commands.entrySet()) { + RPromise> execPromise = new RedissonPromise>(); + async(entry.getValue().isReadOnlyMode(), new NodeSource(entry.getKey()), connectionManager.getCodec(), RedisCommands.EXEC, + new Object[] {}, execPromise, false); + execPromise.onComplete((r, ex) -> { + if (ex != null) { + mainPromise.tryFailure(ex); + return; + } + + BatchCommandData lastCommand = (BatchCommandData) entry.getValue().getCommands().peekLast(); + result.put(entry.getKey(), r); + if (RedisCommands.WAIT.getName().equals(lastCommand.getCommand().getName())) { + lastCommand.getPromise().onComplete((res, e) -> { + if (e != null) { + mainPromise.tryFailure(e); + return; + } + + execPromise.onComplete(listener); + }); + } else { + execPromise.onComplete(listener); + } + }); + } + + mainPromise.onComplete((res, ex) -> { + executed.set(true); + if (ex != null) { + resultPromise.tryFailure(ex); + return; + } + + try { + for (java.util.Map.Entry> entry : res.entrySet()) { + Entry commandEntry = commands.get(entry.getKey()); + Iterator resultIter = entry.getValue().iterator(); + for (BatchCommandData data : commandEntry.getCommands()) { + if (data.getCommand().getName().equals(RedisCommands.EXEC.getName())) { + break; + } + + RPromise promise = (RPromise) data.getPromise(); + if (resultIter.hasNext()) { + promise.trySuccess(resultIter.next()); + } else { + // fix for https://github.com/redisson/redisson/issues/2212 + promise.trySuccess(null); + } + } + } + + List entries = new ArrayList(); + for (Entry e : commands.values()) { + entries.addAll(e.getCommands()); + } + Collections.sort(entries); + List responses = new ArrayList(entries.size()); + int syncedSlaves = 0; + for (BatchCommandData commandEntry : entries) { + if (isWaitCommand(commandEntry)) { + syncedSlaves += (Integer) commandEntry.getPromise().getNow(); + } else if (!commandEntry.getCommand().getName().equals(RedisCommands.MULTI.getName()) + && !commandEntry.getCommand().getName().equals(RedisCommands.EXEC.getName())) { + Object entryResult = commandEntry.getPromise().getNow(); + entryResult = RedisExecutor.tryHandleReference(objectBuilder, entryResult); + responses.add(entryResult); + } + } + BatchResult r = new BatchResult(responses, syncedSlaves); + resultPromise.trySuccess((R) r); + } catch (Exception e) { + resultPromise.tryFailure(e); + } + + commands = null; + }); + } + }, permits); + return resultPromise; } - private void execute(Entry entry, NodeSource source, RPromise mainPromise, AtomicInteger slots, - BatchOptions options) { - RedisCommonBatchExecutor executor = new RedisCommonBatchExecutor(source, mainPromise, connectionManager, options, entry, slots); - executor.execute(); + protected boolean isRedisBasedQueue() { + return options != null && (options.getExecutionMode() == ExecutionMode.REDIS_READ_ATOMIC + || options.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC); } protected boolean isWaitCommand(CommandData c) { diff --git a/redisson/src/main/java/org/redisson/command/RedisCommonBatchExecutor.java b/redisson/src/main/java/org/redisson/command/RedisCommonBatchExecutor.java index 3ff196c9c3386aa60251ab1ff6ebb959e93c693a..c21b59706a64b0cdd6cc704cf1307cc87f4c655e 100644 --- a/redisson/src/main/java/org/redisson/command/RedisCommonBatchExecutor.java +++ b/redisson/src/main/java/org/redisson/command/RedisCommonBatchExecutor.java @@ -17,12 +17,11 @@ package org.redisson.command; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.redisson.api.BatchOptions; -import org.redisson.api.RFuture; import org.redisson.api.BatchOptions.ExecutionMode; +import org.redisson.api.RFuture; import org.redisson.client.RedisConnection; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.CommandData; diff --git a/redisson/src/main/java/org/redisson/command/RedisExecutor.java b/redisson/src/main/java/org/redisson/command/RedisExecutor.java index 6482fc8da3dd390031dde165602c328a55669b6e..353bc0046ac330f4f0951aa85716e01aed64876d 100644 --- a/redisson/src/main/java/org/redisson/command/RedisExecutor.java +++ b/redisson/src/main/java/org/redisson/command/RedisExecutor.java @@ -73,7 +73,7 @@ import io.netty.util.concurrent.FutureListener; * @param type of value * @param type of returned value */ -@SuppressWarnings({"NestedIfDepth", "MethodLength"}) +@SuppressWarnings({"NestedIfDepth"}) public class RedisExecutor { static final Logger log = LoggerFactory.getLogger(RedisExecutor.class); @@ -152,6 +152,48 @@ public class RedisExecutor { }); } + scheduleRetryTimeout(connectionFuture, attemptPromise); + + connectionFuture.onComplete((connection, e) -> { + if (connectionFuture.isCancelled()) { + connectionManager.getShutdownLatch().release(); + return; + } + + if (!connectionFuture.isSuccess()) { + connectionManager.getShutdownLatch().release(); + exception = convertException(connectionFuture); + return; + } + + if (attemptPromise.isDone() || mainPromise.isDone()) { + releaseConnection(attemptPromise, connectionFuture); + return; + } + + sendCommand(attemptPromise, connection); + + writeFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + checkWriteFuture(writeFuture, attemptPromise, connection); + } + }); + + releaseConnection(attemptPromise, connectionFuture); + }); + + attemptPromise.onComplete((r, e) -> { + checkAttemptPromise(attemptPromise, connectionFuture); + }); + } + + private void scheduleRetryTimeout(RFuture connectionFuture, RPromise attemptPromise) { + if (retryInterval == 0 || attempts == 0) { + this.timeout = MasterSlaveConnectionManager.DUMMY_TIMEOUT; + return; + } + TimerTask retryTimerTask = new TimerTask() { @Override @@ -233,44 +275,7 @@ public class RedisExecutor { }; - if (retryInterval > 0 && attempts > 0) { - this.timeout = connectionManager.newTimeout(retryTimerTask, retryInterval, TimeUnit.MILLISECONDS); - } else { - this.timeout = MasterSlaveConnectionManager.DUMMY_TIMEOUT; - } - - connectionFuture.onComplete((connection, e) -> { - if (connectionFuture.isCancelled()) { - connectionManager.getShutdownLatch().release(); - return; - } - - if (!connectionFuture.isSuccess()) { - connectionManager.getShutdownLatch().release(); - exception = convertException(connectionFuture); - return; - } - - if (attemptPromise.isDone() || mainPromise.isDone()) { - releaseConnection(attemptPromise, connectionFuture); - return; - } - - sendCommand(attemptPromise, connection); - - writeFuture.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - checkWriteFuture(writeFuture, attemptPromise, connection); - } - }); - - releaseConnection(attemptPromise, connectionFuture); - }); - - attemptPromise.onComplete((r, e) -> { - checkAttemptPromise(attemptPromise, connectionFuture); - }); + timeout = connectionManager.newTimeout(retryTimerTask, retryInterval, TimeUnit.MILLISECONDS); } protected void free() {