提交 fac18462 编写于 作者: N Nikita Koksharov

Fixed - references in RxJava objects aren't supported #3480

上级 6616c723
......@@ -130,7 +130,7 @@ public class RedissonConnection extends AbstractRedisConnection {
public void openPipeline() {
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.IN_MEMORY);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
this.executorService = new CommandBatchService(executorService, options);
}
@Override
......@@ -1426,13 +1426,13 @@ public class RedissonConnection extends AbstractRedisConnection {
if (isPipelined()) {
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.IN_MEMORY_ATOMIC);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
this.executorService = new CommandBatchService(executorService, options);
return;
}
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.REDIS_WRITE_ATOMIC);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
this.executorService = new CommandBatchService(executorService, options);
}
@Override
......
......@@ -160,7 +160,7 @@ public class RedissonConnection extends AbstractRedisConnection {
public void openPipeline() {
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.IN_MEMORY);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
this.executorService = new CommandBatchService(executorService, options);
}
@Override
......@@ -1483,13 +1483,13 @@ public class RedissonConnection extends AbstractRedisConnection {
if (isPipelined()) {
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.IN_MEMORY_ATOMIC);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
this.executorService = new CommandBatchService(executorService, options);
return;
}
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.REDIS_WRITE_ATOMIC);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
this.executorService = new CommandBatchService(executorService, options);
}
@Override
......
......@@ -163,7 +163,7 @@ public class RedissonConnection extends AbstractRedisConnection {
public void openPipeline() {
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.IN_MEMORY);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
this.executorService = new CommandBatchService(executorService, options);
}
@Override
......@@ -1496,13 +1496,13 @@ public class RedissonConnection extends AbstractRedisConnection {
if (isPipelined()) {
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.IN_MEMORY_ATOMIC);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
this.executorService = new CommandBatchService(executorService, options);
return;
}
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.REDIS_WRITE_ATOMIC);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
this.executorService = new CommandBatchService(executorService, options);
}
@Override
......
......@@ -135,7 +135,7 @@ public class RedissonConnection extends AbstractRedisConnection {
public void openPipeline() {
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.IN_MEMORY);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
this.executorService = new CommandBatchService(executorService, options);
}
@Override
......@@ -1484,13 +1484,13 @@ public class RedissonConnection extends AbstractRedisConnection {
if (isPipelined()) {
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.IN_MEMORY_ATOMIC);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
this.executorService = new CommandBatchService(executorService, options);
return;
}
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.REDIS_WRITE_ATOMIC);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
this.executorService = new CommandBatchService(executorService, options);
}
@Override
......
......@@ -161,7 +161,7 @@ public class RedissonConnection extends AbstractRedisConnection {
public void openPipeline() {
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.IN_MEMORY);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
this.executorService = new CommandBatchService(executorService, options);
}
@Override
......@@ -1515,13 +1515,13 @@ public class RedissonConnection extends AbstractRedisConnection {
if (isPipelined()) {
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.IN_MEMORY_ATOMIC);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
this.executorService = new CommandBatchService(executorService, options);
return;
}
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.REDIS_WRITE_ATOMIC);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
this.executorService = new CommandBatchService(executorService, options);
}
@Override
......
......@@ -153,7 +153,7 @@ public class RedissonConnection extends AbstractRedisConnection {
public void openPipeline() {
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.IN_MEMORY);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
this.executorService = new CommandBatchService(executorService, options);
}
@Override
......@@ -1507,13 +1507,13 @@ public class RedissonConnection extends AbstractRedisConnection {
if (isPipelined()) {
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.IN_MEMORY_ATOMIC);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
this.executorService = new CommandBatchService(executorService, options);
return;
}
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.REDIS_WRITE_ATOMIC);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
this.executorService = new CommandBatchService(executorService, options);
}
@Override
......
......@@ -130,7 +130,7 @@ public class RedissonConnection extends AbstractRedisConnection {
public void openPipeline() {
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.IN_MEMORY);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
this.executorService = new CommandBatchService(executorService, options);
}
@Override
......@@ -1484,13 +1484,13 @@ public class RedissonConnection extends AbstractRedisConnection {
if (isPipelined()) {
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.IN_MEMORY_ATOMIC);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
this.executorService = new CommandBatchService(executorService, options);
return;
}
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.REDIS_WRITE_ATOMIC);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
this.executorService = new CommandBatchService(executorService, options);
}
@Override
......
......@@ -130,7 +130,7 @@ public class RedissonConnection extends AbstractRedisConnection {
public void openPipeline() {
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.IN_MEMORY);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
this.executorService = new CommandBatchService(executorService, options);
}
@Override
......@@ -1484,13 +1484,13 @@ public class RedissonConnection extends AbstractRedisConnection {
if (isPipelined()) {
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.IN_MEMORY_ATOMIC);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
this.executorService = new CommandBatchService(executorService, options);
return;
}
BatchOptions options = BatchOptions.defaults()
.executionMode(ExecutionMode.REDIS_WRITE_ATOMIC);
this.executorService = new CommandBatchService(redisson.getConnectionManager(), options);
this.executorService = new CommandBatchService(executorService, options);
}
@Override
......
......@@ -15,18 +15,18 @@
*/
package org.redisson;
import java.io.Serializable;
import java.util.Optional;
import org.redisson.api.*;
import org.redisson.api.annotation.REntity;
import org.redisson.client.codec.Codec;
import org.redisson.liveobject.misc.ClassUtils;
import org.redisson.misc.BiHashMap;
import java.io.Serializable;
/**
*
* @author Rui Gu (https://github.com/jackygurui)
* @author Nikita Koksharov
*/
public class RedissonReference implements Serializable {
......@@ -78,7 +78,6 @@ public class RedissonReference implements Serializable {
private String type;
private String keyName;
private String codec;
private ReferenceType referenceType;
public RedissonReference() {
}
......@@ -95,19 +94,16 @@ public class RedissonReference implements Serializable {
throw new IllegalArgumentException("Class reference has to be a type of either RObject/RLiveObject/RObjectReactive/RObjectRx");
}
if (RObjectReactive.class.isAssignableFrom(type)) {
this.referenceType = ReferenceType.REACTIVE;
this.type = REACTIVE_MAP.get(type.getName());
if (this.type == null) {
throw new IllegalArgumentException("There is no Reactive compatible type for " + type);
}
} else if (RObjectRx.class.isAssignableFrom(type)) {
this.referenceType = ReferenceType.RXJAVA;
this.type = RXJAVA_MAP.get(type.getName());
if (this.type == null) {
throw new IllegalArgumentException("There is no RxJava compatible type for " + type);
}
} else {
this.referenceType = ReferenceType.DEFAULT;
this.type = type.getName();
}
this.keyName = keyName;
......@@ -124,10 +120,6 @@ public class RedissonReference implements Serializable {
return Class.forName(type);
}
public ReferenceType getReferenceType() {
return Optional.ofNullable(referenceType).orElse(ReferenceType.DEFAULT);
}
public Class<?> getRxJavaType() throws ClassNotFoundException {
if (RXJAVA_MAP.containsValue(type)) {
return Class.forName(RXJAVA_MAP.reverseGet(type)); //live object is not supported in reactive client
......@@ -153,17 +145,6 @@ public class RedissonReference implements Serializable {
return type;
}
/**
* @param type the type to set
*/
public void setType(Class<?> type) {
if (!ClassUtils.isAnnotationPresent(type, REntity.class)
&& (!RObject.class.isAssignableFrom(type) || !RObjectReactive.class.isAssignableFrom(type))) {
throw new IllegalArgumentException("Class reference has to be a type of either RObject or RLiveObject or RObjectReactive");
}
this.type = type.getName();
}
/**
* @return the keyName
*/
......@@ -192,19 +173,4 @@ public class RedissonReference implements Serializable {
}
return null;
}
/**
* @return Codec name in string
*/
public String getCodecName() {
return codec;
}
/**
* @param codec the codec to set
*/
public void setCodecType(Class<? extends Codec> codec) {
this.codec = codec.getName();
}
}
......@@ -47,13 +47,13 @@ public class BaseRedisBatchExecutor<V, R> extends RedisExecutor<V, R> {
@SuppressWarnings("ParameterNumber")
public BaseRedisBatchExecutor(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommand<V> command,
Object[] params, RPromise<R> mainPromise, boolean ignoreRedirect,
ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder,
ConcurrentMap<MasterSlaveEntry, Entry> commands,
BatchOptions options, AtomicInteger index, AtomicBoolean executed) {
Object[] params, RPromise<R> mainPromise, boolean ignoreRedirect,
ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder,
ConcurrentMap<MasterSlaveEntry, Entry> commands,
BatchOptions options, AtomicInteger index, AtomicBoolean executed, RedissonObjectBuilder.ReferenceType referenceType) {
super(readOnlyMode, source, codec, command, params, mainPromise, ignoreRedirect, connectionManager,
objectBuilder);
objectBuilder, referenceType);
this.commands = commands;
this.options = options;
this.index = index;
......
......@@ -61,10 +61,12 @@ public class CommandAsyncService implements CommandAsyncExecutor {
final ConnectionManager connectionManager;
final RedissonObjectBuilder objectBuilder;
final RedissonObjectBuilder.ReferenceType referenceType;
public CommandAsyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder) {
public CommandAsyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) {
this.connectionManager = connectionManager;
this.objectBuilder = objectBuilder;
this.referenceType = referenceType;
}
@Override
......@@ -499,7 +501,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
args.addAll(Arrays.asList(params));
RedisExecutor<T, R> executor = new RedisExecutor<>(readOnlyMode, nodeSource, codec, cmd,
args.toArray(), promise, false, connectionManager, objectBuilder);
args.toArray(), promise, false, connectionManager, objectBuilder, referenceType);
executor.execute();
promise.onComplete((res, e) -> {
......@@ -572,7 +574,8 @@ public class CommandAsyncService implements CommandAsyncExecutor {
public <V, R> void async(boolean readOnlyMode, NodeSource source, Codec codec,
RedisCommand<V> command, Object[] params, RPromise<R> mainPromise,
boolean ignoreRedirect) {
RedisExecutor<V, R> executor = new RedisExecutor<>(readOnlyMode, source, codec, command, params, mainPromise, ignoreRedirect, connectionManager, objectBuilder);
RedisExecutor<V, R> executor = new RedisExecutor<>(readOnlyMode, source, codec, command, params, mainPromise,
ignoreRedirect, connectionManager, objectBuilder, referenceType);
executor.execute();
}
......
......@@ -115,19 +115,24 @@ public class CommandBatchService extends CommandAsyncService {
private final AtomicBoolean executed = new AtomicBoolean();
public CommandBatchService(CommandAsyncExecutor executor) {
this(executor.getConnectionManager(), BatchOptions.defaults(), executor.getObjectBuilder());
this(executor, RedissonObjectBuilder.ReferenceType.DEFAULT);
}
public CommandBatchService(CommandAsyncExecutor executor, RedissonObjectBuilder.ReferenceType referenceType) {
this(executor.getConnectionManager(), BatchOptions.defaults(), executor.getObjectBuilder(), referenceType);
}
public CommandBatchService(CommandAsyncExecutor executor, BatchOptions options) {
this(executor.getConnectionManager(), options, executor.getObjectBuilder());
this(executor.getConnectionManager(), options, executor.getObjectBuilder(), RedissonObjectBuilder.ReferenceType.DEFAULT);
}
public CommandBatchService(ConnectionManager connectionManager, BatchOptions options) {
this(connectionManager.getCommandExecutor(), options);
public CommandBatchService(CommandAsyncExecutor executor, BatchOptions options, RedissonObjectBuilder.ReferenceType referenceType) {
this(executor.getConnectionManager(), options, executor.getObjectBuilder(), referenceType);
}
public CommandBatchService(ConnectionManager connectionManager, BatchOptions options, RedissonObjectBuilder objectBuilder) {
super(connectionManager, objectBuilder);
private CommandBatchService(ConnectionManager connectionManager, BatchOptions options,
RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) {
super(connectionManager, objectBuilder, referenceType);
this.options = options;
}
......@@ -145,11 +150,11 @@ public class CommandBatchService extends CommandAsyncService {
if (isRedisBasedQueue()) {
boolean isReadOnly = options.getExecutionMode() == ExecutionMode.REDIS_READ_ATOMIC;
RedisExecutor<V, R> executor = new RedisQueuedBatchExecutor<>(isReadOnly, nodeSource, codec, command, params, mainPromise,
false, connectionManager, objectBuilder, commands, connections, options, index, executed, latch);
false, connectionManager, objectBuilder, commands, connections, options, index, executed, latch, referenceType);
executor.execute();
} else {
RedisExecutor<V, R> executor = new RedisBatchExecutor<>(readOnlyMode, nodeSource, codec, command, params, mainPromise,
false, connectionManager, objectBuilder, commands, options, index, executed);
false, connectionManager, objectBuilder, commands, options, index, executed, referenceType);
executor.execute();
}
......@@ -285,7 +290,7 @@ public class CommandBatchService extends CommandAsyncService {
Object entryResult = commandEntry.getPromise().getNow();
try {
entryResult = RedisExecutor.tryHandleReference(objectBuilder, entryResult);
entryResult = objectBuilder.tryHandleReference(entryResult, referenceType);
} catch (ReflectiveOperationException exc) {
log.error("Unable to handle reference from " + entryResult, exc);
}
......@@ -316,7 +321,7 @@ public class CommandBatchService extends CommandAsyncService {
for (Map.Entry<MasterSlaveEntry, Entry> e : commands.entrySet()) {
RedisCommonBatchExecutor executor = new RedisCommonBatchExecutor(new NodeSource(e.getKey()), voidPromise,
connectionManager, this.options, e.getValue(), slots);
connectionManager, this.options, e.getValue(), slots, referenceType);
executor.execute();
}
return promise;
......@@ -436,7 +441,7 @@ public class CommandBatchService extends CommandAsyncService {
} else if (!commandEntry.getCommand().getName().equals(RedisCommands.MULTI.getName())
&& !commandEntry.getCommand().getName().equals(RedisCommands.EXEC.getName())) {
Object entryResult = commandEntry.getPromise().getNow();
entryResult = RedisExecutor.tryHandleReference(objectBuilder, entryResult);
entryResult = objectBuilder.tryHandleReference(entryResult, referenceType);
responses.add(entryResult);
}
}
......
......@@ -35,7 +35,7 @@ public class CommandSyncService extends CommandAsyncService implements CommandEx
final Logger log = LoggerFactory.getLogger(getClass());
public CommandSyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder) {
super(connectionManager, objectBuilder);
super(connectionManager, objectBuilder, RedissonObjectBuilder.ReferenceType.DEFAULT);
}
@Override
......
......@@ -40,12 +40,12 @@ public class RedisBatchExecutor<V, R> extends BaseRedisBatchExecutor<V, R> {
@SuppressWarnings("ParameterNumber")
public RedisBatchExecutor(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommand<V> command,
Object[] params, RPromise<R> mainPromise, boolean ignoreRedirect, ConnectionManager connectionManager,
RedissonObjectBuilder objectBuilder, ConcurrentMap<MasterSlaveEntry, Entry> commands,
BatchOptions options, AtomicInteger index,
AtomicBoolean executed) {
Object[] params, RPromise<R> mainPromise, boolean ignoreRedirect, ConnectionManager connectionManager,
RedissonObjectBuilder objectBuilder, ConcurrentMap<MasterSlaveEntry, Entry> commands,
BatchOptions options, AtomicInteger index,
AtomicBoolean executed, RedissonObjectBuilder.ReferenceType referenceType) {
super(readOnlyMode, source, codec, command, params, mainPromise, ignoreRedirect, connectionManager, objectBuilder,
commands, options, index, executed);
commands, options, index, executed, referenceType);
}
@Override
......
......@@ -31,6 +31,7 @@ import org.redisson.command.CommandBatchService.Entry;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.NodeSource;
import org.redisson.connection.NodeSource.Redirect;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger;
......@@ -49,9 +50,10 @@ public class RedisCommonBatchExecutor extends RedisExecutor<Object, Void> {
private final AtomicInteger slots;
private final BatchOptions options;
public RedisCommonBatchExecutor(NodeSource source, RPromise<Void> mainPromise,
ConnectionManager connectionManager, BatchOptions options, Entry entry, AtomicInteger slots) {
super(entry.isReadOnlyMode(), source, null, null, null, mainPromise, false, connectionManager, null);
public RedisCommonBatchExecutor(NodeSource source, RPromise<Void> mainPromise,
ConnectionManager connectionManager, BatchOptions options, Entry entry, AtomicInteger slots, RedissonObjectBuilder.ReferenceType referenceType) {
super(entry.isReadOnlyMode(), source, null, null, null,
mainPromise, false, connectionManager, null, referenceType);
this.options = options;
this.entry = entry;
this.slots = slots;
......
......@@ -22,7 +22,6 @@ import io.netty.util.ReferenceCountUtil;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.FutureListener;
import org.redisson.RedissonReference;
import org.redisson.RedissonShutdownException;
import org.redisson.ScanResult;
import org.redisson.api.RFuture;
......@@ -30,9 +29,10 @@ import org.redisson.cache.LRUCacheMap;
import org.redisson.client.*;
import org.redisson.client.codec.BaseCodec;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.*;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.NodeSource;
import org.redisson.connection.NodeSource.Redirect;
......@@ -43,7 +43,9 @@ import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
......@@ -66,6 +68,7 @@ public class RedisExecutor<V, R> {
final boolean ignoreRedirect;
final RedissonObjectBuilder objectBuilder;
final ConnectionManager connectionManager;
final RedissonObjectBuilder.ReferenceType referenceType;
RFuture<RedisConnection> connectionFuture;
NodeSource source;
......@@ -81,8 +84,9 @@ public class RedisExecutor<V, R> {
long responseTimeout;
public RedisExecutor(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommand<V> command,
Object[] params, RPromise<R> mainPromise, boolean ignoreRedirect,
ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder) {
Object[] params, RPromise<R> mainPromise, boolean ignoreRedirect,
ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder,
RedissonObjectBuilder.ReferenceType referenceType) {
super();
this.readOnlyMode = readOnlyMode;
this.source = source;
......@@ -97,6 +101,7 @@ public class RedisExecutor<V, R> {
this.attempts = connectionManager.getConfig().getRetryAttempts();
this.retryInterval = connectionManager.getConfig().getRetryInterval();
this.responseTimeout = connectionManager.getConfig().getTimeout();
this.referenceType = referenceType;
}
public void execute() {
......@@ -485,127 +490,9 @@ public class RedisExecutor<V, R> {
}
private void handleReference(RPromise<R> promise, R res) throws ReflectiveOperationException {
promise.trySuccess((R) tryHandleReference(objectBuilder, res));
promise.trySuccess((R) objectBuilder.tryHandleReference(res, referenceType));
}
public static Object tryHandleReference(RedissonObjectBuilder objectBuilder, Object o) throws ReflectiveOperationException {
boolean hasConversion = false;
if (o instanceof List) {
List<Object> r = (List<Object>) o;
for (int i = 0; i < r.size(); i++) {
Object ref = tryHandleReference0(objectBuilder, r.get(i));
if (ref != r.get(i)) {
r.set(i, ref);
}
}
return o;
} else if (o instanceof Set) {
Set<Object> set = (Set<Object>) o;
Set<Object> r = (Set<Object>) o;
boolean useNewSet = o instanceof LinkedHashSet;
try {
set = (Set<Object>) o.getClass().getConstructor().newInstance();
} catch (Exception exception) {
set = new LinkedHashSet<Object>();
}
for (Object i : r) {
Object ref = tryHandleReference0(objectBuilder, i);
//Not testing for ref changes because r.add(ref) below needs to
//fail on the first iteration to be able to perform fall back
//if failure happens.
//
//Assuming the failure reason is systematic such as put method
//is not supported or implemented, and not an occasional issue
//like only one element fails.
if (useNewSet) {
set.add(ref);
} else {
try {
r.add(ref);
set.add(i);
} catch (Exception e) {
//r is not supporting add operation, like
//LinkedHashMap$LinkedEntrySet and others.
//fall back to use a new set.
useNewSet = true;
set.add(ref);
}
}
hasConversion |= ref != i;
}
if (!hasConversion) {
return o;
} else if (useNewSet) {
return set;
} else if (!set.isEmpty()) {
r.removeAll(set);
}
return o;
} else if (o instanceof Map) {
Map<Object, Object> r = (Map<Object, Object>) o;
for (Map.Entry<Object, Object> e : r.entrySet()) {
if (e.getKey() instanceof RedissonReference
|| e.getValue() instanceof RedissonReference) {
Object key = e.getKey();
Object value = e.getValue();
if (e.getKey() instanceof RedissonReference) {
key = fromReference(objectBuilder, e.getKey());
r.remove(e.getKey());
}
if (e.getValue() instanceof RedissonReference) {
value = fromReference(objectBuilder, e.getValue());
}
r.put(key, value);
}
}
return o;
} else if (o instanceof ListScanResult) {
tryHandleReference(objectBuilder, ((ListScanResult) o).getValues());
return o;
} else if (o instanceof MapScanResult) {
MapScanResult scanResult = (MapScanResult) o;
Map oldMap = ((MapScanResult) o).getMap();
Map map = (Map) tryHandleReference(objectBuilder, oldMap);
if (map != oldMap) {
MapScanResult<Object, Object> newScanResult
= new MapScanResult<Object, Object>(scanResult.getPos(), map);
newScanResult.setRedisClient(scanResult.getRedisClient());
return newScanResult;
} else {
return o;
}
} else {
return tryHandleReference0(objectBuilder, o);
}
}
private static Object tryHandleReference0(RedissonObjectBuilder objectBuilder, Object o) throws ReflectiveOperationException {
if (o instanceof RedissonReference) {
return fromReference(objectBuilder, o);
} else if (o instanceof ScoredEntry && ((ScoredEntry) o).getValue() instanceof RedissonReference) {
ScoredEntry<?> se = (ScoredEntry<?>) o;
return new ScoredEntry(se.getScore(), fromReference(objectBuilder, se.getValue()));
} else if (o instanceof Map.Entry) {
Map.Entry old = (Map.Entry) o;
Object key = tryHandleReference0(objectBuilder, old.getKey());
Object value = tryHandleReference0(objectBuilder, old.getValue());
if (value != old.getValue() || key != old.getKey()) {
return new AbstractMap.SimpleEntry(key, value);
}
}
return o;
}
private static Object fromReference(RedissonObjectBuilder objectBuilder, Object res) throws ReflectiveOperationException {
if (objectBuilder == null) {
return res;
}
return objectBuilder.fromReference((RedissonReference) res);
}
protected void sendCommand(RPromise<R> attemptPromise, RedisConnection connection) {
if (source.getRedirect() == Redirect.ASK) {
List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(2);
......
......@@ -53,12 +53,12 @@ public class RedisQueuedBatchExecutor<V, R> extends BaseRedisBatchExecutor<V, R>
@SuppressWarnings("ParameterNumber")
public RedisQueuedBatchExecutor(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommand<V> command,
Object[] params, RPromise<R> mainPromise, boolean ignoreRedirect, ConnectionManager connectionManager,
RedissonObjectBuilder objectBuilder, ConcurrentMap<MasterSlaveEntry, Entry> commands,
ConcurrentMap<MasterSlaveEntry, ConnectionEntry> connections, BatchOptions options, AtomicInteger index,
AtomicBoolean executed, AsyncCountDownLatch latch) {
Object[] params, RPromise<R> mainPromise, boolean ignoreRedirect, ConnectionManager connectionManager,
RedissonObjectBuilder objectBuilder, ConcurrentMap<MasterSlaveEntry, Entry> commands,
ConcurrentMap<MasterSlaveEntry, ConnectionEntry> connections, BatchOptions options, AtomicInteger index,
AtomicBoolean executed, AsyncCountDownLatch latch, RedissonObjectBuilder.ReferenceType referenceType) {
super(readOnlyMode, source, codec, command, params, mainPromise, ignoreRedirect, connectionManager, objectBuilder,
commands, options, index, executed);
commands, options, index, executed, referenceType);
this.connections = connections;
this.latch = latch;
......
......@@ -97,7 +97,7 @@ public class AccessorInterceptor {
return result;
}
if (result instanceof RedissonReference) {
return commandExecutor.getObjectBuilder().fromReference((RedissonReference) result);
return commandExecutor.getObjectBuilder().fromReference((RedissonReference) result, RedissonObjectBuilder.ReferenceType.DEFAULT);
}
return result;
}
......
......@@ -20,6 +20,9 @@ import org.redisson.api.*;
import org.redisson.api.annotation.REntity;
import org.redisson.api.annotation.RObjectField;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.ScoredEntry;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.codec.DefaultReferenceCodecProvider;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.config.Config;
......@@ -42,6 +45,8 @@ import java.util.concurrent.ConcurrentMap;
*/
public class RedissonObjectBuilder {
public enum ReferenceType {RXJAVA, REACTIVE, DEFAULT}
private static final Map<Class<?>, Class<? extends RObject>> SUPPORTED_CLASS_MAPPING = new LinkedHashMap<>();
private static final Map<Class<?>, CodecMethodRef> REFERENCES = new HashMap<>();
......@@ -200,10 +205,10 @@ public class RedissonObjectBuilder {
}
}
public Object fromReference(RedissonReference rr) throws ReflectiveOperationException {
if (rr.getReferenceType() == RedissonReference.ReferenceType.REACTIVE) {
public Object fromReference(RedissonReference rr, ReferenceType type) throws ReflectiveOperationException {
if (type == ReferenceType.REACTIVE) {
return fromReference(redissonReactive, rr);
} else if (rr.getReferenceType() == RedissonReference.ReferenceType.RXJAVA) {
} else if (type == ReferenceType.RXJAVA) {
return fromReference(redissonRx, rr);
}
return fromReference(redisson, rr);
......@@ -240,7 +245,7 @@ public class RedissonObjectBuilder {
return builder.invoke(redisson, rr.getKeyName(), codecProvider.getCodec(rr.getCodecType()));
}
}
throw new ClassNotFoundException("No RObject is found to match class type of " + rr.getTypeName() + " with codec type of " + rr.getCodecName());
throw new ClassNotFoundException("No RObject is found to match class type of " + rr.getTypeName() + " with codec type of " + rr.getCodec());
}
private boolean isDefaultCodec(RedissonReference rr) {
......@@ -311,7 +316,7 @@ public class RedissonObjectBuilder {
}
private <T extends RObject, K extends Codec> T createRObject(RedissonClient redisson, Class<T> expectedType, String name, K codec) throws ReflectiveOperationException {
List<Class<?>> interfaces = Arrays.asList(expectedType.getInterfaces());
Class<?>[] interfaces = expectedType.getInterfaces();
for (Class<?> iType : interfaces) {
if (REFERENCES.containsKey(iType)) {// user cache to speed up things a little.
Method builder = REFERENCES.get(iType).get(codec != null);
......@@ -322,15 +327,121 @@ public class RedissonObjectBuilder {
}
}
String type = null;
if (expectedType != null) {
type = expectedType.getName();
}
String codecName = null;
if (codec != null) {
codecName = codec.getClass().getName();
}
throw new ClassNotFoundException("No RObject is found to match class type of " + type + " with codec type of " + codecName);
throw new ClassNotFoundException("No RObject is found to match class type of " + expectedType.getName() + " with codec type of " + codecName);
}
public Object tryHandleReference(Object o, ReferenceType type) throws ReflectiveOperationException {
boolean hasConversion = false;
if (o instanceof List) {
List<Object> r = (List<Object>) o;
for (int i = 0; i < r.size(); i++) {
Object ref = tryHandleReference0(r.get(i), type);
if (ref != r.get(i)) {
r.set(i, ref);
}
}
return o;
} else if (o instanceof Set) {
Set<Object> set;
Set<Object> r = (Set<Object>) o;
boolean useNewSet = o instanceof LinkedHashSet;
try {
set = (Set<Object>) o.getClass().getConstructor().newInstance();
} catch (Exception exception) {
set = new LinkedHashSet<>();
}
for (Object i : r) {
Object ref = tryHandleReference0(i, type);
//Not testing for ref changes because r.add(ref) below needs to
//fail on the first iteration to be able to perform fall back
//if failure happens.
//
//Assuming the failure reason is systematic such as put method
//is not supported or implemented, and not an occasional issue
//like only one element fails.
if (useNewSet) {
set.add(ref);
} else {
try {
r.add(ref);
set.add(i);
} catch (Exception e) {
//r is not supporting add operation, like
//LinkedHashMap$LinkedEntrySet and others.
//fall back to use a new set.
useNewSet = true;
set.add(ref);
}
}
hasConversion |= ref != i;
}
if (!hasConversion) {
return o;
} else if (useNewSet) {
return set;
} else if (!set.isEmpty()) {
r.removeAll(set);
}
return o;
} else if (o instanceof Map) {
Map<Object, Object> r = (Map<Object, Object>) o;
for (Map.Entry<Object, Object> e : r.entrySet()) {
if (e.getKey() instanceof RedissonReference
|| e.getValue() instanceof RedissonReference) {
Object key = e.getKey();
Object value = e.getValue();
if (e.getKey() instanceof RedissonReference) {
key = fromReference((RedissonReference) e.getKey(), type);
r.remove(e.getKey());
}
if (e.getValue() instanceof RedissonReference) {
value = fromReference((RedissonReference) e.getValue(), type);
}
r.put(key, value);
}
}
return o;
} else if (o instanceof ListScanResult) {
tryHandleReference(((ListScanResult) o).getValues(), type);
return o;
} else if (o instanceof MapScanResult) {
MapScanResult scanResult = (MapScanResult) o;
Map oldMap = ((MapScanResult) o).getMap();
Map map = (Map) tryHandleReference(oldMap, type);
if (map != oldMap) {
MapScanResult<Object, Object> newScanResult
= new MapScanResult<Object, Object>(scanResult.getPos(), map);
newScanResult.setRedisClient(scanResult.getRedisClient());
return newScanResult;
} else {
return o;
}
} else {
return tryHandleReference0(o, type);
}
}
private Object tryHandleReference0(Object o, ReferenceType type) throws ReflectiveOperationException {
if (o instanceof RedissonReference) {
return fromReference((RedissonReference) o, type);
} else if (o instanceof ScoredEntry && ((ScoredEntry) o).getValue() instanceof RedissonReference) {
ScoredEntry<?> se = (ScoredEntry<?>) o;
return new ScoredEntry(se.getScore(), fromReference((RedissonReference) se.getValue(), type));
} else if (o instanceof Map.Entry) {
Map.Entry old = (Map.Entry) o;
Object key = tryHandleReference0(old.getKey(), type);
Object value = tryHandleReference0(old.getValue(), type);
if (value != old.getValue() || key != old.getKey()) {
return new AbstractMap.SimpleEntry(key, value);
}
}
return o;
}
}
......@@ -23,6 +23,7 @@ import org.redisson.client.protocol.RedisCommand;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.NodeSource;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.misc.RPromise;
import reactor.core.publisher.Mono;
......@@ -39,7 +40,7 @@ public class CommandReactiveBatchService extends CommandReactiveService {
public CommandReactiveBatchService(ConnectionManager connectionManager, BatchOptions options) {
super(connectionManager);
batchService = new CommandBatchService(connectionManager.getCommandExecutor(), options);
batchService = new CommandBatchService(connectionManager.getCommandExecutor(), options, RedissonObjectBuilder.ReferenceType.REACTIVE);
}
@Override
......
......@@ -21,6 +21,7 @@ import org.redisson.api.RFuture;
import org.redisson.command.CommandAsyncService;
import org.redisson.connection.ConnectionManager;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
......@@ -32,7 +33,7 @@ import reactor.core.publisher.Mono;
public class CommandReactiveService extends CommandAsyncService implements CommandReactiveExecutor {
public CommandReactiveService(ConnectionManager connectionManager) {
super(connectionManager, connectionManager.getCommandExecutor().getObjectBuilder());
super(connectionManager, connectionManager.getCommandExecutor().getObjectBuilder(), RedissonObjectBuilder.ReferenceType.REACTIVE);
}
@Override
......
......@@ -24,6 +24,7 @@ import org.redisson.client.protocol.RedisCommand;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.NodeSource;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.misc.RPromise;
import java.util.concurrent.Callable;
......@@ -39,7 +40,7 @@ public class CommandRxBatchService extends CommandRxService {
public CommandRxBatchService(ConnectionManager connectionManager, BatchOptions options) {
super(connectionManager);
batchService = new CommandBatchService(connectionManager.getCommandExecutor(), options);
batchService = new CommandBatchService(connectionManager.getCommandExecutor(), options, RedissonObjectBuilder.ReferenceType.RXJAVA);
}
@Override
......
......@@ -25,6 +25,7 @@ import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.functions.Action;
import io.reactivex.rxjava3.functions.LongConsumer;
import io.reactivex.rxjava3.processors.ReplayProcessor;
import org.redisson.liveobject.core.RedissonObjectBuilder;
/**
*
......@@ -34,7 +35,7 @@ import io.reactivex.rxjava3.processors.ReplayProcessor;
public class CommandRxService extends CommandAsyncService implements CommandRxExecutor {
public CommandRxService(ConnectionManager connectionManager) {
super(connectionManager, connectionManager.getCommandExecutor().getObjectBuilder());
super(connectionManager, connectionManager.getCommandExecutor().getObjectBuilder(), RedissonObjectBuilder.ReferenceType.RXJAVA);
}
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册