提交 ecd3a2cb 编写于 作者: N Nikita Koksharov

refactoring

上级 a7cb1a48
......@@ -173,7 +173,6 @@ public class CommandBatchService extends CommandAsyncService {
return executeAsync(BatchOptions.defaults());
}
@SuppressWarnings("MethodLength")
public <R> RFuture<R> executeAsync(BatchOptions options) {
if (executed.get()) {
throw new IllegalStateException("Batch already executed!");
......@@ -190,113 +189,7 @@ public class CommandBatchService extends CommandAsyncService {
}
if (isRedisBasedQueue()) {
int permits = 0;
for (Entry entry : commands.values()) {
permits += entry.getCommands().size();
}
RPromise<R> resultPromise = new RedissonPromise<R>();
semaphore.acquire(new Runnable() {
@Override
public void run() {
for (Entry entry : commands.values()) {
for (BatchCommandData<?, ?> command : entry.getCommands()) {
if (command.getPromise().isDone() && !command.getPromise().isSuccess()) {
resultPromise.tryFailure(command.getPromise().cause());
break;
}
}
}
if (resultPromise.isDone()) {
return;
}
RPromise<Map<MasterSlaveEntry, List<Object>>> mainPromise = new RedissonPromise<Map<MasterSlaveEntry, List<Object>>>();
Map<MasterSlaveEntry, List<Object>> result = new ConcurrentHashMap<MasterSlaveEntry, List<Object>>();
CountableListener<Map<MasterSlaveEntry, List<Object>>> listener = new CountableListener<Map<MasterSlaveEntry, List<Object>>>(mainPromise, result);
listener.setCounter(connections.size());
for (Map.Entry<MasterSlaveEntry, Entry> entry : commands.entrySet()) {
RPromise<List<Object>> execPromise = new RedissonPromise<List<Object>>();
async(entry.getValue().isReadOnlyMode(), new NodeSource(entry.getKey()), connectionManager.getCodec(), RedisCommands.EXEC,
new Object[] {}, execPromise, false);
execPromise.onComplete((r, ex) -> {
if (ex != null) {
mainPromise.tryFailure(ex);
return;
}
BatchCommandData<?, Integer> lastCommand = (BatchCommandData<?, Integer>) entry.getValue().getCommands().peekLast();
result.put(entry.getKey(), r);
if (RedisCommands.WAIT.getName().equals(lastCommand.getCommand().getName())) {
lastCommand.getPromise().onComplete((res, e) -> {
if (e != null) {
mainPromise.tryFailure(e);
return;
}
execPromise.onComplete(listener);
});
} else {
execPromise.onComplete(listener);
}
});
}
mainPromise.onComplete((res, ex) -> {
executed.set(true);
if (ex != null) {
resultPromise.tryFailure(ex);
return;
}
try {
for (java.util.Map.Entry<MasterSlaveEntry, List<Object>> entry : res.entrySet()) {
Entry commandEntry = commands.get(entry.getKey());
Iterator<Object> resultIter = entry.getValue().iterator();
for (BatchCommandData<?, ?> data : commandEntry.getCommands()) {
if (data.getCommand().getName().equals(RedisCommands.EXEC.getName())) {
break;
}
RPromise<Object> promise = (RPromise<Object>) data.getPromise();
if (resultIter.hasNext()) {
promise.trySuccess(resultIter.next());
} else {
// fix for https://github.com/redisson/redisson/issues/2212
promise.trySuccess(null);
}
}
}
List<BatchCommandData> entries = new ArrayList<BatchCommandData>();
for (Entry e : commands.values()) {
entries.addAll(e.getCommands());
}
Collections.sort(entries);
List<Object> responses = new ArrayList<Object>(entries.size());
int syncedSlaves = 0;
for (BatchCommandData<?, ?> commandEntry : entries) {
if (isWaitCommand(commandEntry)) {
syncedSlaves += (Integer) commandEntry.getPromise().getNow();
} else if (!commandEntry.getCommand().getName().equals(RedisCommands.MULTI.getName())
&& !commandEntry.getCommand().getName().equals(RedisCommands.EXEC.getName())) {
Object entryResult = commandEntry.getPromise().getNow();
entryResult = RedisExecutor.tryHandleReference(objectBuilder, entryResult);
responses.add(entryResult);
}
}
BatchResult<Object> r = new BatchResult<Object>(responses, syncedSlaves);
resultPromise.trySuccess((R) r);
} catch (Exception e) {
resultPromise.tryFailure(e);
}
commands = null;
});
}
}, permits);
return resultPromise;
return executeRedisBasedQueue();
}
if (this.options.getExecutionMode() != ExecutionMode.IN_MEMORY) {
......@@ -378,7 +271,7 @@ public class CommandBatchService extends CommandAsyncService {
AtomicInteger slots = new AtomicInteger(commands.size());
for (java.util.Map.Entry<RFuture<?>, List<CommandBatchService>> entry : nestedServices.entrySet()) {
for (Map.Entry<RFuture<?>, List<CommandBatchService>> entry : nestedServices.entrySet()) {
slots.incrementAndGet();
for (CommandBatchService service : entry.getValue()) {
service.executeAsync();
......@@ -389,20 +282,127 @@ public class CommandBatchService extends CommandAsyncService {
});
}
for (java.util.Map.Entry<MasterSlaveEntry, Entry> e : commands.entrySet()) {
execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, this.options);
for (Map.Entry<MasterSlaveEntry, Entry> e : commands.entrySet()) {
RedisCommonBatchExecutor executor = new RedisCommonBatchExecutor(new NodeSource(e.getKey()), voidPromise,
connectionManager, this.options, e.getValue(), slots);
executor.execute();
}
return resultPromise;
}
protected boolean isRedisBasedQueue() {
return options != null && (this.options.getExecutionMode() == ExecutionMode.REDIS_READ_ATOMIC || this.options.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC);
private <R> RFuture<R> executeRedisBasedQueue() {
int permits = 0;
for (Entry entry : commands.values()) {
permits += entry.getCommands().size();
}
RPromise<R> resultPromise = new RedissonPromise<R>();
semaphore.acquire(new Runnable() {
@Override
public void run() {
for (Entry entry : commands.values()) {
for (BatchCommandData<?, ?> command : entry.getCommands()) {
if (command.getPromise().isDone() && !command.getPromise().isSuccess()) {
resultPromise.tryFailure(command.getPromise().cause());
break;
}
}
}
if (resultPromise.isDone()) {
return;
}
RPromise<Map<MasterSlaveEntry, List<Object>>> mainPromise = new RedissonPromise<Map<MasterSlaveEntry, List<Object>>>();
Map<MasterSlaveEntry, List<Object>> result = new ConcurrentHashMap<MasterSlaveEntry, List<Object>>();
CountableListener<Map<MasterSlaveEntry, List<Object>>> listener = new CountableListener<Map<MasterSlaveEntry, List<Object>>>(mainPromise, result);
listener.setCounter(connections.size());
for (Map.Entry<MasterSlaveEntry, Entry> entry : commands.entrySet()) {
RPromise<List<Object>> execPromise = new RedissonPromise<List<Object>>();
async(entry.getValue().isReadOnlyMode(), new NodeSource(entry.getKey()), connectionManager.getCodec(), RedisCommands.EXEC,
new Object[] {}, execPromise, false);
execPromise.onComplete((r, ex) -> {
if (ex != null) {
mainPromise.tryFailure(ex);
return;
}
BatchCommandData<?, Integer> lastCommand = (BatchCommandData<?, Integer>) entry.getValue().getCommands().peekLast();
result.put(entry.getKey(), r);
if (RedisCommands.WAIT.getName().equals(lastCommand.getCommand().getName())) {
lastCommand.getPromise().onComplete((res, e) -> {
if (e != null) {
mainPromise.tryFailure(e);
return;
}
execPromise.onComplete(listener);
});
} else {
execPromise.onComplete(listener);
}
});
}
mainPromise.onComplete((res, ex) -> {
executed.set(true);
if (ex != null) {
resultPromise.tryFailure(ex);
return;
}
try {
for (java.util.Map.Entry<MasterSlaveEntry, List<Object>> entry : res.entrySet()) {
Entry commandEntry = commands.get(entry.getKey());
Iterator<Object> resultIter = entry.getValue().iterator();
for (BatchCommandData<?, ?> data : commandEntry.getCommands()) {
if (data.getCommand().getName().equals(RedisCommands.EXEC.getName())) {
break;
}
RPromise<Object> promise = (RPromise<Object>) data.getPromise();
if (resultIter.hasNext()) {
promise.trySuccess(resultIter.next());
} else {
// fix for https://github.com/redisson/redisson/issues/2212
promise.trySuccess(null);
}
}
}
List<BatchCommandData> entries = new ArrayList<BatchCommandData>();
for (Entry e : commands.values()) {
entries.addAll(e.getCommands());
}
Collections.sort(entries);
List<Object> responses = new ArrayList<Object>(entries.size());
int syncedSlaves = 0;
for (BatchCommandData<?, ?> commandEntry : entries) {
if (isWaitCommand(commandEntry)) {
syncedSlaves += (Integer) commandEntry.getPromise().getNow();
} else if (!commandEntry.getCommand().getName().equals(RedisCommands.MULTI.getName())
&& !commandEntry.getCommand().getName().equals(RedisCommands.EXEC.getName())) {
Object entryResult = commandEntry.getPromise().getNow();
entryResult = RedisExecutor.tryHandleReference(objectBuilder, entryResult);
responses.add(entryResult);
}
}
BatchResult<Object> r = new BatchResult<Object>(responses, syncedSlaves);
resultPromise.trySuccess((R) r);
} catch (Exception e) {
resultPromise.tryFailure(e);
}
commands = null;
});
}
}, permits);
return resultPromise;
}
private void execute(Entry entry, NodeSource source, RPromise<Void> mainPromise, AtomicInteger slots,
BatchOptions options) {
RedisCommonBatchExecutor executor = new RedisCommonBatchExecutor(source, mainPromise, connectionManager, options, entry, slots);
executor.execute();
protected boolean isRedisBasedQueue() {
return options != null && (options.getExecutionMode() == ExecutionMode.REDIS_READ_ATOMIC
|| options.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC);
}
protected boolean isWaitCommand(CommandData<?, ?> c) {
......
......@@ -17,12 +17,11 @@ package org.redisson.command;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.api.BatchOptions;
import org.redisson.api.RFuture;
import org.redisson.api.BatchOptions.ExecutionMode;
import org.redisson.api.RFuture;
import org.redisson.client.RedisConnection;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.CommandData;
......
......@@ -73,7 +73,7 @@ import io.netty.util.concurrent.FutureListener;
* @param <V> type of value
* @param <R> type of returned value
*/
@SuppressWarnings({"NestedIfDepth", "MethodLength"})
@SuppressWarnings({"NestedIfDepth"})
public class RedisExecutor<V, R> {
static final Logger log = LoggerFactory.getLogger(RedisExecutor.class);
......@@ -152,6 +152,48 @@ public class RedisExecutor<V, R> {
});
}
scheduleRetryTimeout(connectionFuture, attemptPromise);
connectionFuture.onComplete((connection, e) -> {
if (connectionFuture.isCancelled()) {
connectionManager.getShutdownLatch().release();
return;
}
if (!connectionFuture.isSuccess()) {
connectionManager.getShutdownLatch().release();
exception = convertException(connectionFuture);
return;
}
if (attemptPromise.isDone() || mainPromise.isDone()) {
releaseConnection(attemptPromise, connectionFuture);
return;
}
sendCommand(attemptPromise, connection);
writeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
checkWriteFuture(writeFuture, attemptPromise, connection);
}
});
releaseConnection(attemptPromise, connectionFuture);
});
attemptPromise.onComplete((r, e) -> {
checkAttemptPromise(attemptPromise, connectionFuture);
});
}
private void scheduleRetryTimeout(RFuture<RedisConnection> connectionFuture, RPromise<R> attemptPromise) {
if (retryInterval == 0 || attempts == 0) {
this.timeout = MasterSlaveConnectionManager.DUMMY_TIMEOUT;
return;
}
TimerTask retryTimerTask = new TimerTask() {
@Override
......@@ -233,44 +275,7 @@ public class RedisExecutor<V, R> {
};
if (retryInterval > 0 && attempts > 0) {
this.timeout = connectionManager.newTimeout(retryTimerTask, retryInterval, TimeUnit.MILLISECONDS);
} else {
this.timeout = MasterSlaveConnectionManager.DUMMY_TIMEOUT;
}
connectionFuture.onComplete((connection, e) -> {
if (connectionFuture.isCancelled()) {
connectionManager.getShutdownLatch().release();
return;
}
if (!connectionFuture.isSuccess()) {
connectionManager.getShutdownLatch().release();
exception = convertException(connectionFuture);
return;
}
if (attemptPromise.isDone() || mainPromise.isDone()) {
releaseConnection(attemptPromise, connectionFuture);
return;
}
sendCommand(attemptPromise, connection);
writeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
checkWriteFuture(writeFuture, attemptPromise, connection);
}
});
releaseConnection(attemptPromise, connectionFuture);
});
attemptPromise.onComplete((r, e) -> {
checkAttemptPromise(attemptPromise, connectionFuture);
});
timeout = connectionManager.newTimeout(retryTimerTask, retryInterval, TimeUnit.MILLISECONDS);
}
protected void free() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册