提交 0b941a1b 编写于 作者: N Nikita Koksharov

Feature - added batched persist method to RLiveObjectService object #2359

上级 0d4ec96d
......@@ -671,7 +671,7 @@ public class Redisson implements RedissonClient {
@Override
public RLiveObjectService getLiveObjectService() {
return new RedissonLiveObjectService(this, liveObjectClassCache, connectionManager.getCommandExecutor());
return new RedissonLiveObjectService(liveObjectClassCache, connectionManager);
}
@Override
......
......@@ -15,62 +15,6 @@
*/
package org.redisson;
import java.lang.annotation.Annotation;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.redisson.api.RCascadeType;
import org.redisson.api.RDeque;
import org.redisson.api.RExpirable;
import org.redisson.api.RExpirableAsync;
import org.redisson.api.RList;
import org.redisson.api.RLiveObject;
import org.redisson.api.RLiveObjectService;
import org.redisson.api.RMap;
import org.redisson.api.RMapAsync;
import org.redisson.api.RObject;
import org.redisson.api.RObjectAsync;
import org.redisson.api.RQueue;
import org.redisson.api.RSet;
import org.redisson.api.RSortedSet;
import org.redisson.api.RedissonClient;
import org.redisson.api.annotation.RCascade;
import org.redisson.api.annotation.REntity;
import org.redisson.api.annotation.RFieldAccessor;
import org.redisson.api.annotation.RId;
import org.redisson.api.annotation.RIndex;
import org.redisson.api.condition.Condition;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.liveobject.LiveObjectSearch;
import org.redisson.liveobject.LiveObjectTemplate;
import org.redisson.liveobject.core.AccessorInterceptor;
import org.redisson.liveobject.core.FieldAccessorInterceptor;
import org.redisson.liveobject.core.LiveObjectInterceptor;
import org.redisson.liveobject.core.RExpirableInterceptor;
import org.redisson.liveobject.core.RMapInterceptor;
import org.redisson.liveobject.misc.AdvBeanCopy;
import org.redisson.liveobject.misc.ClassUtils;
import org.redisson.liveobject.misc.Introspectior;
import org.redisson.liveobject.resolver.RIdResolver;
import jodd.bean.BeanCopy;
import jodd.bean.BeanUtil;
import net.bytebuddy.ByteBuddy;
......@@ -82,20 +26,40 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;
import net.bytebuddy.implementation.bind.annotation.FieldProxy;
import net.bytebuddy.matcher.ElementMatchers;
import org.redisson.api.*;
import org.redisson.api.annotation.*;
import org.redisson.api.condition.Condition;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.ConnectionManager;
import org.redisson.liveobject.LiveObjectSearch;
import org.redisson.liveobject.LiveObjectTemplate;
import org.redisson.liveobject.core.*;
import org.redisson.liveobject.misc.AdvBeanCopy;
import org.redisson.liveobject.misc.ClassUtils;
import org.redisson.liveobject.misc.Introspectior;
import org.redisson.liveobject.resolver.RIdResolver;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
public class RedissonLiveObjectService implements RLiveObjectService {
private static final ConcurrentMap<Class<? extends RIdResolver<?>>, RIdResolver<?>> PROVIDER_CACHE = new ConcurrentHashMap<>();
private final ConcurrentMap<Class<?>, Class<?>> classCache;
private final RedissonClient redisson;
private final CommandAsyncExecutor commandExecutor;
private final ConnectionManager connectionManager;
private final LiveObjectSearch seachEngine;
public RedissonLiveObjectService(RedissonClient redisson, ConcurrentMap<Class<?>, Class<?>> classCache, CommandAsyncExecutor commandExecutor) {
this.redisson = redisson;
public RedissonLiveObjectService(ConcurrentMap<Class<?>, Class<?>> classCache,
ConnectionManager connectionManager) {
this.classCache = classCache;
this.commandExecutor = commandExecutor;
this.seachEngine = new LiveObjectSearch(redisson, commandExecutor.getObjectBuilder());
this.connectionManager = connectionManager;
this.seachEngine = new LiveObjectSearch(connectionManager.getCommandExecutor());
}
//TODO: Add ttl renewal functionality
......@@ -117,12 +81,12 @@ public class RedissonLiveObjectService implements RLiveObjectService {
String idFieldName = getRIdFieldName(entityClass);
RId annotation = ClassUtils.getDeclaredField(entityClass, idFieldName)
.getAnnotation(RId.class);
RIdResolver<?> resolver = getResolver(entityClass, annotation.generator(), annotation);
Object id = resolver.resolve(entityClass, annotation, idFieldName, redisson);
RIdResolver<?> resolver = getResolver(entityClass, annotation.generator());
Object id = resolver.resolve(entityClass, annotation, idFieldName, connectionManager.getCommandExecutor());
return id;
}
private RIdResolver<?> getResolver(Class<?> cls, Class<? extends RIdResolver<?>> resolverClass, Annotation anno) {
private RIdResolver<?> getResolver(Class<?> cls, Class<? extends RIdResolver<?>> resolverClass) {
if (!PROVIDER_CACHE.containsKey(resolverClass)) {
try {
PROVIDER_CACHE.putIfAbsent(resolverClass, resolverClass.newInstance());
......@@ -134,9 +98,21 @@ public class RedissonLiveObjectService implements RLiveObjectService {
}
public <T> T createLiveObject(Class<T> entityClass, Object id) {
return instantiateLiveObject(getProxyClass(entityClass), id);
registerClass(entityClass);
Class<?> proxyClass = classCache.get(entityClass);
return (T) instantiateLiveObject(proxyClass, id);
}
private <T> T createLiveObject(Class<T> entityClass, Object id, CommandAsyncExecutor commandExecutor, Map<Class<?>, Class<?>> classCache) {
Class<?> proxyClass = classCache.get(entityClass);
if (proxyClass == null) {
validateClass(entityClass);
proxyClass = createProxy(entityClass, commandExecutor);
classCache.put(entityClass, proxyClass);
}
return (T) instantiateLiveObject(proxyClass, id);
}
@Override
public <T> T get(Class<T> entityClass, Object id) {
T proxied = createLiveObject(entityClass, id);
......@@ -164,6 +140,14 @@ public class RedissonLiveObjectService implements RLiveObjectService {
return createLiveObject(entityClass, id);
}
private <T> T attach(T detachedObject, CommandAsyncExecutor commandExecutor, Map<Class<?>, Class<?>> classCache) {
validateDetached(detachedObject);
Class<T> entityClass = (Class<T>) detachedObject.getClass();
String idFieldName = getRIdFieldName(detachedObject.getClass());
Object id = ClassUtils.getField(detachedObject, idFieldName);
return createLiveObject(entityClass, id, commandExecutor, classCache);
}
@Override
public <T> T merge(T detachedObject) {
Map<Object, Object> alreadyPersisted = new HashMap<Object, Object>();
......@@ -175,7 +159,70 @@ public class RedissonLiveObjectService implements RLiveObjectService {
Map<Object, Object> alreadyPersisted = new HashMap<Object, Object>();
return persist(detachedObject, alreadyPersisted, RCascadeType.PERSIST);
}
@Override
public <T> List<T> persist(T... detachedObjects) {
CommandBatchService commandExecutor = new CommandBatchService(connectionManager);
Map<Class<?>, Class<?>> classCache = new HashMap<>();
Map<T, Object> detached2Attached = new LinkedHashMap<>();
Map<String, Object> name2id = new HashMap<>();
for (T detachedObject : detachedObjects) {
String idFieldName = getRIdFieldName(detachedObject.getClass());
Object id = ClassUtils.getField(detachedObject, idFieldName);
if (id == null) {
try {
id = generateId(detachedObject.getClass());
} catch (NoSuchFieldException e) {
throw new IllegalArgumentException(e);
}
ClassUtils.setField(detachedObject, idFieldName, id);
}
T attachedObject = attach(detachedObject, commandExecutor, classCache);
RMap<String, Object> liveMap = getMap(attachedObject);
detached2Attached.put(detachedObject, attachedObject);
name2id.put(liveMap.getName(), id);
}
CommandBatchService checkExecutor = new CommandBatchService(connectionManager);
for (Entry<String, Object> entry : name2id.entrySet()) {
RMap map = new RedissonMap(checkExecutor, entry.getKey(), null, null, null);
map.containsKeyAsync("redisson_live_object");
}
BatchResult<?> checkResponse = checkExecutor.execute();
for (int i = 0; i < checkResponse.getResponses().size(); i++) {
Boolean value = (Boolean) checkResponse.getResponses().get(i);
if (value) {
List<Object> list = new ArrayList<>(name2id.values());
Object id = list.get(i);
throw new IllegalArgumentException("Object with id=" + id + " already exists.");
}
}
for (Entry<T, Object> entry : detached2Attached.entrySet()) {
T detachedObject = entry.getKey();
Object attachedObject = entry.getValue();
for (FieldDescription.InDefinedShape field : Introspectior.getAllFields(detachedObject.getClass())) {
Object object = ClassUtils.getField(detachedObject, field.getName());
if (object == null) {
continue;
}
String idFieldName = getRIdFieldName(detachedObject.getClass());
validateAnnotation(detachedObject, field.getName());
copy(detachedObject, attachedObject, Arrays.asList(idFieldName));
}
}
commandExecutor.execute();
return new ArrayList<>(detached2Attached.keySet());
}
private <T> T persist(T detachedObject, Map<Object, Object> alreadyPersisted, RCascadeType type) {
String idFieldName = getRIdFieldName(detachedObject.getClass());
Object id = ClassUtils.getField(detachedObject, idFieldName);
......@@ -187,31 +234,31 @@ public class RedissonLiveObjectService implements RLiveObjectService {
}
ClassUtils.setField(detachedObject, idFieldName, id);
}
T attachedObject = attach(detachedObject);
alreadyPersisted.put(detachedObject, attachedObject);
RMap<String, Object> liveMap = getMap(attachedObject);
List<String> excludedFields = new ArrayList<String>();
excludedFields.add(idFieldName);
boolean fastResult = liveMap.fastPut("redisson_live_object", "1");
if (type == RCascadeType.PERSIST && !fastResult) {
throw new IllegalArgumentException("This REntity already exists.");
}
for (FieldDescription.InDefinedShape field : Introspectior.getAllFields(detachedObject.getClass())) {
Object object = ClassUtils.getField(detachedObject, field.getName());
if (object == null) {
continue;
}
RObject rObject = commandExecutor.getObjectBuilder().createObject(id, detachedObject.getClass(), object.getClass(), field.getName());
RObject rObject = connectionManager.getCommandExecutor().getObjectBuilder().createObject(id, detachedObject.getClass(), object.getClass(), field.getName());
if (rObject != null) {
commandExecutor.getObjectBuilder().store(rObject, field.getName(), liveMap);
connectionManager.getCommandExecutor().getObjectBuilder().store(rObject, field.getName(), liveMap);
if (rObject instanceof SortedSet) {
((RSortedSet) rObject).trySetComparator(((SortedSet) object).comparator());
}
if (rObject instanceof Collection) {
for (Object obj : (Collection<Object>) object) {
if (obj != null && ClassUtils.isAnnotationPresent(obj.getClass(), REntity.class)) {
......@@ -248,7 +295,7 @@ public class RedissonLiveObjectService implements RLiveObjectService {
}
value = persisted;
}
rMap.put(key, value);
}
}
......@@ -260,13 +307,13 @@ public class RedissonLiveObjectService implements RLiveObjectService {
persisted = persist(object, alreadyPersisted, type);
}
}
excludedFields.add(field.getName());
BeanUtil.pojo.setSimpleProperty(attachedObject, field.getName(), persisted);
} else {
validateAnnotation(detachedObject, field.getName());
}
}
copy(detachedObject, attachedObject, excludedFields);
return attachedObject;
......@@ -522,7 +569,8 @@ public class RedissonLiveObjectService implements RLiveObjectService {
public void registerClass(Class<?> cls) {
if (!classCache.containsKey(cls)) {
validateClass(cls);
registerClassInternal(cls);
Class<?> proxyClass = createProxy(cls, connectionManager.getCommandExecutor());
classCache.putIfAbsent(cls, proxyClass);
}
}
......@@ -585,11 +633,6 @@ public class RedissonLiveObjectService implements RLiveObjectService {
throw new IllegalArgumentException("Can't find default constructor for " + cls);
}
private <T> Class<? extends T> getProxyClass(Class<T> entityClass) {
registerClass(entityClass);
return (Class<? extends T>) classCache.get(entityClass);
}
private <T> void validateClass(Class<T> entityClass) {
if (entityClass.isAnonymousClass() || entityClass.isLocalClass()) {
throw new IllegalArgumentException(entityClass.getName() + " is not publically accessable.");
......@@ -641,7 +684,7 @@ public class RedissonLiveObjectService implements RLiveObjectService {
}
}
private <T> void registerClassInternal(Class<T> entityClass) {
private <T> Class<? extends T> createProxy(Class<T> entityClass, CommandAsyncExecutor commandExecutor) {
DynamicType.Builder<T> builder = new ByteBuddy()
.subclass(entityClass);
for (FieldDescription.InDefinedShape field
......@@ -659,8 +702,8 @@ public class RedissonLiveObjectService implements RLiveObjectService {
.withBinders(FieldProxy.Binder
.install(LiveObjectInterceptor.Getter.class,
LiveObjectInterceptor.Setter.class))
.to(new LiveObjectInterceptor(redisson, entityClass,
getRIdFieldName(entityClass), commandExecutor.getObjectBuilder())))
.to(new LiveObjectInterceptor(commandExecutor, connectionManager,
entityClass, getRIdFieldName(entityClass))))
// .intercept(MethodDelegation.to(
// new LiveObjectInterceptor(redisson, codecProvider, entityClass,
// getRIdFieldName(entityClass)))
......@@ -684,7 +727,7 @@ public class RedissonLiveObjectService implements RLiveObjectService {
.or(ElementMatchers.isDeclaredBy(RMap.class)))
.intercept(MethodDelegation.to(RMapInterceptor.class))
.implement(RMap.class)
.method(ElementMatchers.not(ElementMatchers.isDeclaredBy(Object.class))
.and(ElementMatchers.not(ElementMatchers.isDeclaredBy(RLiveObject.class)))
.and(ElementMatchers.not(ElementMatchers.isDeclaredBy(RExpirable.class)))
......@@ -698,11 +741,11 @@ public class RedissonLiveObjectService implements RLiveObjectService {
.and(ElementMatchers.isPublic()
.or(ElementMatchers.isProtected()))
)
.intercept(MethodDelegation.to(new AccessorInterceptor(redisson, commandExecutor.getObjectBuilder())))
.intercept(MethodDelegation.to(new AccessorInterceptor(commandExecutor, connectionManager)))
.make().load(entityClass.getClassLoader(),
ClassLoadingStrategy.Default.WRAPPER)
.getLoaded();
classCache.putIfAbsent(entityClass, proxied);
return proxied;
}
}
......@@ -16,6 +16,7 @@
package org.redisson.api;
import java.util.Collection;
import java.util.List;
import org.redisson.api.condition.Condition;
import org.redisson.api.condition.Conditions;
......@@ -109,15 +110,22 @@ public interface RLiveObjectService {
* <b>NON NULL</b> field values to the redis server. Only when the it does
* not already exist.
*
* If this object is not in redis then a new hash key will be created to
* store it.
*
* @param <T> Entity type
* @param detachedObject - not proxied object
* @return proxied object
*/
<T> T persist(T detachedObject);
/**
* Returns proxied attached objects for the detached objects. Stores all the
* <b>NON NULL</b> field values.
*
* @param <T> Entity type
* @param detachedObjects - not proxied objects
* @return list of proxied objects
*/
<T> List<T> persist(T... detachedObjects);
/**
* Returns unproxied detached object for the attached object.
*
......
......@@ -15,32 +15,21 @@
*/
package org.redisson.liveobject;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.function.BiFunction;
import org.redisson.RedissonScoredSortedSet;
import org.redisson.RedissonSet;
import org.redisson.RedissonSetMultimap;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RSet;
import org.redisson.api.RSetMultimap;
import org.redisson.api.RedissonClient;
import org.redisson.api.condition.Condition;
import org.redisson.liveobject.condition.ANDCondition;
import org.redisson.liveobject.condition.EQCondition;
import org.redisson.liveobject.condition.GECondition;
import org.redisson.liveobject.condition.GTCondition;
import org.redisson.liveobject.condition.LECondition;
import org.redisson.liveobject.condition.LTCondition;
import org.redisson.liveobject.condition.ORCondition;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.liveobject.condition.*;
import org.redisson.liveobject.resolver.NamingScheme;
import java.util.*;
import java.util.Map.Entry;
import java.util.function.BiFunction;
/**
*
* @author Nikita Koksharov
......@@ -48,13 +37,10 @@ import org.redisson.liveobject.resolver.NamingScheme;
*/
public class LiveObjectSearch {
private final RedissonClient redisson;
private final RedissonObjectBuilder objectBuilder;
private final CommandAsyncExecutor commandExecutor;
public LiveObjectSearch(RedissonClient redisson, RedissonObjectBuilder objectBuilder) {
super();
this.redisson = redisson;
this.objectBuilder = objectBuilder;
public LiveObjectSearch(CommandAsyncExecutor commandExecutor) {
this.commandExecutor = commandExecutor;
}
private Set<Object> traverseAnd(ANDCondition condition, NamingScheme namingScheme, Class<?> entityClass) {
......@@ -74,10 +60,10 @@ public class LiveObjectSearch {
String indexName = namingScheme.getIndexName(entityClass, eqc.getName());
if (eqc.getValue() instanceof Number) {
RScoredSortedSet<Object> values = redisson.getScoredSortedSet(indexName, namingScheme.getCodec());
RScoredSortedSet<Object> values = new RedissonScoredSortedSet<>(namingScheme.getCodec(), commandExecutor, indexName, null);
eqNumericNames.put(values, (Number) eqc.getValue());
} else {
RSetMultimap<Object, Object> map = redisson.getSetMultimap(indexName, namingScheme.getCodec());
RSetMultimap<Object, Object> map = new RedissonSetMultimap<>(namingScheme.getCodec(), commandExecutor, indexName);
RSet<Object> values = map.get(eqc.getValue());
eqNames.add(values.getName());
}
......@@ -86,28 +72,28 @@ public class LiveObjectSearch {
LTCondition ltc = (LTCondition) cond;
String indexName = namingScheme.getIndexName(entityClass, ltc.getName());
RScoredSortedSet<Object> values = redisson.getScoredSortedSet(indexName, namingScheme.getCodec());
RScoredSortedSet<Object> values = new RedissonScoredSortedSet<>(namingScheme.getCodec(), commandExecutor, indexName, null);
ltNumericNames.put(values, ltc.getValue());
}
if (cond instanceof LECondition) {
LECondition lec = (LECondition) cond;
String indexName = namingScheme.getIndexName(entityClass, lec.getName());
RScoredSortedSet<Object> values = redisson.getScoredSortedSet(indexName, namingScheme.getCodec());
RScoredSortedSet<Object> values = new RedissonScoredSortedSet<>(namingScheme.getCodec(), commandExecutor, indexName, null);
leNumericNames.put(values, lec.getValue());
}
if (cond instanceof GECondition) {
GECondition gec = (GECondition) cond;
String indexName = namingScheme.getIndexName(entityClass, gec.getName());
RScoredSortedSet<Object> values = redisson.getScoredSortedSet(indexName, namingScheme.getCodec());
RScoredSortedSet<Object> values = new RedissonScoredSortedSet<>(namingScheme.getCodec(), commandExecutor, indexName, null);
geNumericNames.put(values, gec.getValue());
}
if (cond instanceof GTCondition) {
GTCondition gtc = (GTCondition) cond;
String indexName = namingScheme.getIndexName(entityClass, gtc.getName());
RScoredSortedSet<Object> values = redisson.getScoredSortedSet(indexName, namingScheme.getCodec());
RScoredSortedSet<Object> values = new RedissonScoredSortedSet<>(namingScheme.getCodec(), commandExecutor, indexName, null);
gtNumericNames.put(values, gtc.getValue());
}
......@@ -124,7 +110,7 @@ public class LiveObjectSearch {
}
if (!eqNames.isEmpty()) {
RSet<Object> set = redisson.getSet(eqNames.get(0));
RSet<Object> set = new RedissonSet<>(commandExecutor, eqNames.get(0), null);
Set<Object> intersect = set.readIntersection(eqNames.toArray(new String[eqNames.size()]));
if (!allIds.isEmpty()) {
allIds.retainAll(intersect);
......@@ -209,10 +195,10 @@ public class LiveObjectSearch {
String indexName = namingScheme.getIndexName(entityClass, eqc.getName());
if (eqc.getValue() instanceof Number) {
RScoredSortedSet<Object> values = redisson.getScoredSortedSet(indexName, namingScheme.getCodec());
RScoredSortedSet<Object> values = new RedissonScoredSortedSet<>(namingScheme.getCodec(), commandExecutor, indexName, null);
eqNumericNames.put(values, (Number) eqc.getValue());
} else {
RSetMultimap<Object, Object> map = redisson.getSetMultimap(indexName, namingScheme.getCodec());
RSetMultimap<Object, Object> map = new RedissonSetMultimap<>(namingScheme.getCodec(), commandExecutor, indexName);
RSet<Object> values = map.get(eqc.getValue());
eqNames.add(values.getName());
}
......@@ -221,28 +207,28 @@ public class LiveObjectSearch {
GTCondition gtc = (GTCondition) cond;
String indexName = namingScheme.getIndexName(entityClass, gtc.getName());
RScoredSortedSet<Object> values = redisson.getScoredSortedSet(indexName, namingScheme.getCodec());
RScoredSortedSet<Object> values = new RedissonScoredSortedSet<>(namingScheme.getCodec(), commandExecutor, indexName, null);
gtNumericNames.put(values, gtc.getValue());
}
if (cond instanceof GECondition) {
GECondition gec = (GECondition) cond;
String indexName = namingScheme.getIndexName(entityClass, gec.getName());
RScoredSortedSet<Object> values = redisson.getScoredSortedSet(indexName, namingScheme.getCodec());
RScoredSortedSet<Object> values = new RedissonScoredSortedSet<>(namingScheme.getCodec(), commandExecutor, indexName, null);
geNumericNames.put(values, gec.getValue());
}
if (cond instanceof LTCondition) {
LTCondition ltc = (LTCondition) cond;
String indexName = namingScheme.getIndexName(entityClass, ltc.getName());
RScoredSortedSet<Object> values = redisson.getScoredSortedSet(indexName, namingScheme.getCodec());
RScoredSortedSet<Object> values = new RedissonScoredSortedSet<>(namingScheme.getCodec(), commandExecutor, indexName, null);
ltNumericNames.put(values, ltc.getValue());
}
if (cond instanceof LECondition) {
LECondition lec = (LECondition) cond;
String indexName = namingScheme.getIndexName(entityClass, lec.getName());
RScoredSortedSet<Object> values = redisson.getScoredSortedSet(indexName, namingScheme.getCodec());
RScoredSortedSet<Object> values = new RedissonScoredSortedSet<>(namingScheme.getCodec(), commandExecutor, indexName, null);
leNumericNames.put(values, lec.getValue());
}
if (cond instanceof ANDCondition) {
......@@ -252,7 +238,7 @@ public class LiveObjectSearch {
}
if (!eqNames.isEmpty()) {
RSet<Object> set = redisson.getSet(eqNames.get(0));
RSet<Object> set = new RedissonSet<>(commandExecutor, eqNames.get(0), null);
allIds.addAll(set.readUnion(eqNames.toArray(new String[eqNames.size()])));
}
......@@ -285,43 +271,43 @@ public class LiveObjectSearch {
}
public Set<Object> find(Class<?> entityClass, Condition condition) {
NamingScheme namingScheme = objectBuilder.getNamingScheme(entityClass);
NamingScheme namingScheme = commandExecutor.getObjectBuilder().getNamingScheme(entityClass);
if (condition instanceof EQCondition) {
EQCondition c = (EQCondition) condition;
String indexName = namingScheme.getIndexName(entityClass, c.getName());
if (c.getValue() instanceof Number) {
RScoredSortedSet<Object> set = redisson.getScoredSortedSet(indexName, namingScheme.getCodec());
RScoredSortedSet<Object> set = new RedissonScoredSortedSet<>(namingScheme.getCodec(), commandExecutor, indexName, null);
double v = ((Number) c.getValue()).doubleValue();
Collection<Object> gtIds = set.valueRange(v, true, v, true);
return new HashSet<>(gtIds);
} else {
RSetMultimap<Object, Object> map = redisson.getSetMultimap(indexName, namingScheme.getCodec());
RSetMultimap<Object, Object> map = new RedissonSetMultimap<>(namingScheme.getCodec(), commandExecutor, indexName);
return map.getAll(c.getValue());
}
} else if (condition instanceof GTCondition) {
GTCondition c = (GTCondition) condition;
String indexName = namingScheme.getIndexName(entityClass, c.getName());
RScoredSortedSet<Object> set = redisson.getScoredSortedSet(indexName, namingScheme.getCodec());
RScoredSortedSet<Object> set = new RedissonScoredSortedSet<>(namingScheme.getCodec(), commandExecutor, indexName, null);
Collection<Object> gtIds = set.valueRange(c.getValue().doubleValue(), false, Double.POSITIVE_INFINITY, false);
return new HashSet<>(gtIds);
} else if (condition instanceof GECondition) {
GECondition c = (GECondition) condition;
String indexName = namingScheme.getIndexName(entityClass, c.getName());
RScoredSortedSet<Object> set = redisson.getScoredSortedSet(indexName, namingScheme.getCodec());
RScoredSortedSet<Object> set = new RedissonScoredSortedSet<>(namingScheme.getCodec(), commandExecutor, indexName, null);
Collection<Object> gtIds = set.valueRange(c.getValue().doubleValue(), true, Double.POSITIVE_INFINITY, false);
return new HashSet<>(gtIds);
} else if (condition instanceof LTCondition) {
LTCondition c = (LTCondition) condition;
String indexName = namingScheme.getIndexName(entityClass, c.getName());
RScoredSortedSet<Object> set = redisson.getScoredSortedSet(indexName, namingScheme.getCodec());
RScoredSortedSet<Object> set = new RedissonScoredSortedSet<>(namingScheme.getCodec(), commandExecutor, indexName, null);
Collection<Object> gtIds = set.valueRange(Double.NEGATIVE_INFINITY, false, c.getValue().doubleValue(), false);
return new HashSet<>(gtIds);
} else if (condition instanceof LECondition) {
LECondition c = (LECondition) condition;
String indexName = namingScheme.getIndexName(entityClass, c.getName());
RScoredSortedSet<Object> set = redisson.getScoredSortedSet(indexName, namingScheme.getCodec());
RScoredSortedSet<Object> set = new RedissonScoredSortedSet<>(namingScheme.getCodec(), commandExecutor, indexName, null);
Collection<Object> gtIds = set.valueRange(Double.NEGATIVE_INFINITY, false, c.getValue().doubleValue(), true);
return new HashSet<>(gtIds);
} else if (condition instanceof ORCondition) {
......
......@@ -22,16 +22,16 @@ import java.util.Map;
import java.util.concurrent.Callable;
import org.redisson.RedissonReference;
import org.redisson.api.RLiveObject;
import org.redisson.api.RMap;
import org.redisson.api.RObject;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RSetMultimap;
import org.redisson.api.RedissonClient;
import org.redisson.RedissonScoredSortedSet;
import org.redisson.RedissonSetMultimap;
import org.redisson.api.*;
import org.redisson.api.annotation.REntity;
import org.redisson.api.annotation.REntity.TransformationMode;
import org.redisson.api.annotation.RId;
import org.redisson.api.annotation.RIndex;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.ConnectionManager;
import org.redisson.liveobject.misc.ClassUtils;
import org.redisson.liveobject.misc.Introspectior;
import org.redisson.liveobject.resolver.NamingScheme;
......@@ -53,12 +53,12 @@ import net.bytebuddy.implementation.bind.annotation.This;
*/
public class AccessorInterceptor {
private final RedissonClient redisson;
private final RedissonObjectBuilder objectBuilder;
private final CommandAsyncExecutor commandExecutor;
private final ConnectionManager connectionManager;
public AccessorInterceptor(RedissonClient redisson, RedissonObjectBuilder objectBuilder) {
this.redisson = redisson;
this.objectBuilder = objectBuilder;
public AccessorInterceptor(CommandAsyncExecutor commandExecutor, ConnectionManager connectionManager) {
this.commandExecutor = commandExecutor;
this.connectionManager = connectionManager;
}
@RuntimeType
......@@ -81,9 +81,9 @@ public class AccessorInterceptor {
if (isGetter(method, fieldName)) {
Object result = liveMap.get(fieldName);
if (result == null) {
RObject ar = objectBuilder.createObject(((RLiveObject) me).getLiveObjectId(), me.getClass().getSuperclass(), fieldType, fieldName);
RObject ar = connectionManager.getCommandExecutor().getObjectBuilder().createObject(((RLiveObject) me).getLiveObjectId(), me.getClass().getSuperclass(), fieldType, fieldName);
if (ar != null) {
objectBuilder.store(ar, fieldName, liveMap);
connectionManager.getCommandExecutor().getObjectBuilder().store(ar, fieldName, liveMap);
return ar;
}
}
......@@ -95,7 +95,7 @@ public class AccessorInterceptor {
return result;
}
if (result instanceof RedissonReference) {
return objectBuilder.fromReference((RedissonReference) result);
return connectionManager.getCommandExecutor().getObjectBuilder().fromReference((RedissonReference) result);
}
return result;
}
......@@ -111,10 +111,18 @@ public class AccessorInterceptor {
storeIndex(field, me, liveObject.getLiveObjectId());
Class<? extends Object> rEntity = liveObject.getClass().getSuperclass();
NamingScheme ns = objectBuilder.getNamingScheme(rEntity);
liveMap.fastPut(fieldName, new RedissonReference(rEntity,
ns.getName(rEntity, fieldType, getREntityIdFieldName(liveObject),
liveObject.getLiveObjectId())));
NamingScheme ns = connectionManager.getCommandExecutor().getObjectBuilder().getNamingScheme(rEntity);
if (commandExecutor instanceof CommandBatchService) {
liveMap.fastPutAsync(fieldName, new RedissonReference(rEntity,
ns.getName(rEntity, fieldType, getREntityIdFieldName(liveObject),
liveObject.getLiveObjectId())));
} else {
liveMap.fastPut(fieldName, new RedissonReference(rEntity,
ns.getName(rEntity, fieldType, getREntityIdFieldName(liveObject),
liveObject.getLiveObjectId())));
}
return me;
}
......@@ -123,7 +131,7 @@ public class AccessorInterceptor {
&& TransformationMode.ANNOTATION_BASED
.equals(ClassUtils.getAnnotation(me.getClass().getSuperclass(),
REntity.class).fieldTransformation())) {
RObject rObject = objectBuilder.createObject(((RLiveObject) me).getLiveObjectId(), me.getClass().getSuperclass(), arg.getClass(), fieldName);
RObject rObject = connectionManager.getCommandExecutor().getObjectBuilder().createObject(((RLiveObject) me).getLiveObjectId(), me.getClass().getSuperclass(), arg.getClass(), fieldName);
if (arg != null) {
if (rObject instanceof Collection) {
Collection<?> c = (Collection<?>) rObject;
......@@ -141,32 +149,45 @@ public class AccessorInterceptor {
}
if (arg instanceof RObject) {
objectBuilder.store((RObject) arg, fieldName, liveMap);
connectionManager.getCommandExecutor().getObjectBuilder().store((RObject) arg, fieldName, liveMap);
return me;
}
if (arg == null) {
Object oldArg = liveMap.remove(fieldName);
if (field.getAnnotation(RIndex.class) != null) {
NamingScheme namingScheme = objectBuilder.getNamingScheme(me.getClass().getSuperclass());
NamingScheme namingScheme = connectionManager.getCommandExecutor().getObjectBuilder().getNamingScheme(me.getClass().getSuperclass());
String indexName = namingScheme.getIndexName(me.getClass().getSuperclass(), fieldName);
CommandBatchService ce;
if (commandExecutor instanceof CommandBatchService) {
ce = (CommandBatchService) commandExecutor;
} else {
ce = new CommandBatchService(connectionManager);
}
if (oldArg instanceof Number) {
RScoredSortedSet<Object> set = redisson.getScoredSortedSet(indexName, namingScheme.getCodec());
set.remove(((RLiveObject) me).getLiveObjectId());
RScoredSortedSetAsync<Object> set = new RedissonScoredSortedSet<>(namingScheme.getCodec(), ce, indexName, null);
set.removeAsync(((RLiveObject) me).getLiveObjectId());
} else {
RSetMultimap<Object, Object> map = redisson.getSetMultimap(indexName, namingScheme.getCodec());
RMultimapAsync<Object, Object> map = new RedissonSetMultimap<>(namingScheme.getCodec(), ce, indexName);
if (oldArg instanceof RLiveObject) {
map.remove(((RLiveObject) oldArg).getLiveObjectId(), ((RLiveObject) me).getLiveObjectId());
map.removeAsync(((RLiveObject) oldArg).getLiveObjectId(), ((RLiveObject) me).getLiveObjectId());
} else {
map.remove(oldArg, ((RLiveObject) me).getLiveObjectId());
map.removeAsync(oldArg, ((RLiveObject) me).getLiveObjectId());
}
}
ce.execute();
}
} else {
storeIndex(field, me, arg);
liveMap.fastPut(fieldName, arg);
if (commandExecutor instanceof CommandBatchService) {
liveMap.fastPutAsync(fieldName, arg);
} else {
liveMap.fastPut(fieldName, arg);
}
}
return me;
}
......@@ -175,15 +196,24 @@ public class AccessorInterceptor {
protected void storeIndex(Field field, Object me, Object arg) {
if (field.getAnnotation(RIndex.class) != null) {
NamingScheme namingScheme = objectBuilder.getNamingScheme(me.getClass().getSuperclass());
NamingScheme namingScheme = connectionManager.getCommandExecutor().getObjectBuilder().getNamingScheme(me.getClass().getSuperclass());
String indexName = namingScheme.getIndexName(me.getClass().getSuperclass(), field.getName());
CommandBatchService ce;
if (commandExecutor instanceof CommandBatchService) {
ce = (CommandBatchService) commandExecutor;
} else {
ce = new CommandBatchService(connectionManager);
}
if (arg instanceof Number) {
RScoredSortedSet<Object> set = redisson.getScoredSortedSet(indexName, namingScheme.getCodec());
set.add(((Number) arg).doubleValue(), ((RLiveObject) me).getLiveObjectId());
RScoredSortedSetAsync<Object> set = new RedissonScoredSortedSet<>(namingScheme.getCodec(), ce, indexName, null);
set.addAsync(((Number) arg).doubleValue(), ((RLiveObject) me).getLiveObjectId());
} else {
RSetMultimap<Object, Object> map = redisson.getSetMultimap(indexName, namingScheme.getCodec());
map.put(arg, ((RLiveObject) me).getLiveObjectId());
RMultimapAsync<Object, Object> map = new RedissonSetMultimap<>(namingScheme.getCodec(), ce, indexName);
map.putAsync(arg, ((RLiveObject) me).getLiveObjectId());
}
ce.execute();
}
}
......
......@@ -15,29 +15,24 @@
*/
package org.redisson.liveobject.core;
import java.lang.reflect.Method;
import org.redisson.api.RBatch;
import org.redisson.api.RFuture;
import org.redisson.api.RLiveObject;
import org.redisson.api.RMap;
import org.redisson.api.RMultimapAsync;
import org.redisson.api.RScoredSortedSetAsync;
import org.redisson.api.RedissonClient;
import net.bytebuddy.description.field.FieldDescription.InDefinedShape;
import net.bytebuddy.description.field.FieldList;
import net.bytebuddy.implementation.bind.annotation.*;
import org.redisson.RedissonKeys;
import org.redisson.RedissonMap;
import org.redisson.RedissonScoredSortedSet;
import org.redisson.RedissonSetMultimap;
import org.redisson.api.*;
import org.redisson.api.annotation.RIndex;
import org.redisson.client.RedisException;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.ConnectionManager;
import org.redisson.liveobject.misc.ClassUtils;
import org.redisson.liveobject.misc.Introspectior;
import org.redisson.liveobject.resolver.NamingScheme;
import net.bytebuddy.description.field.FieldDescription.InDefinedShape;
import net.bytebuddy.description.field.FieldList;
import net.bytebuddy.implementation.bind.annotation.AllArguments;
import net.bytebuddy.implementation.bind.annotation.FieldProxy;
import net.bytebuddy.implementation.bind.annotation.FieldValue;
import net.bytebuddy.implementation.bind.annotation.Origin;
import net.bytebuddy.implementation.bind.annotation.RuntimeType;
import net.bytebuddy.implementation.bind.annotation.This;
import java.lang.reflect.Method;
/**
*
......@@ -56,21 +51,21 @@ public class LiveObjectInterceptor {
void setValue(Object value);
}
private final RedissonClient redisson;
private final CommandAsyncExecutor commandExecutor;
private final ConnectionManager connectionManager;
private final Class<?> originalClass;
private final String idFieldName;
private final Class<?> idFieldType;
private final NamingScheme namingScheme;
private final RedissonObjectBuilder objectBuilder;
public LiveObjectInterceptor(RedissonClient redisson, Class<?> entityClass, String idFieldName, RedissonObjectBuilder objectBuilder) {
this.redisson = redisson;
public LiveObjectInterceptor(CommandAsyncExecutor commandExecutor, ConnectionManager connectionManager, Class<?> entityClass, String idFieldName) {
this.commandExecutor = commandExecutor;
this.connectionManager = connectionManager;
this.originalClass = entityClass;
this.idFieldName = idFieldName;
this.objectBuilder = objectBuilder;
namingScheme = objectBuilder.getNamingScheme(entityClass);
namingScheme = connectionManager.getCommandExecutor().getObjectBuilder().getNamingScheme(entityClass);
try {
this.idFieldType = ClassUtils.getDeclaredField(originalClass, idFieldName).getType();
} catch (Exception e) {
......@@ -109,7 +104,9 @@ public class LiveObjectInterceptor {
//key may already renamed by others.
}
}
RMap<Object, Object> liveMap = redisson.getMap(idKey, namingScheme.getCodec());
RMap<Object, Object> liveMap = new RedissonMap<Object, Object>(namingScheme.getCodec(), commandExecutor,
idKey, null, null, null);
mapSetter.setValue(liveMap);
return null;
......@@ -124,24 +121,29 @@ public class LiveObjectInterceptor {
if ("delete".equals(method.getName())) {
FieldList<InDefinedShape> fields = Introspectior.getFieldsWithAnnotation(me.getClass().getSuperclass(), RIndex.class);
RBatch batch = redisson.createBatch();
CommandBatchService ce;
if (commandExecutor instanceof CommandBatchService) {
ce = (CommandBatchService) commandExecutor;
} else {
ce = new CommandBatchService(connectionManager);
}
for (InDefinedShape field : fields) {
String fieldName = field.getName();
Object value = map.get(fieldName);
NamingScheme namingScheme = objectBuilder.getNamingScheme(me.getClass().getSuperclass());
NamingScheme namingScheme = connectionManager.getCommandExecutor().getObjectBuilder().getNamingScheme(me.getClass().getSuperclass());
String indexName = namingScheme.getIndexName(me.getClass().getSuperclass(), fieldName);
if (value instanceof Number) {
RScoredSortedSetAsync<Object> set = batch.getScoredSortedSet(indexName, namingScheme.getCodec());
RScoredSortedSetAsync<Object> set = new RedissonScoredSortedSet<>(namingScheme.getCodec(), ce, indexName, null);
set.removeAsync(((RLiveObject) me).getLiveObjectId());
} else {
RMultimapAsync<Object, Object> idsMultimap = batch.getSetMultimap(indexName, namingScheme.getCodec());
RMultimapAsync<Object, Object> idsMultimap = new RedissonSetMultimap<>(namingScheme.getCodec(), ce, indexName);
idsMultimap.removeAsync(value, ((RLiveObject) me).getLiveObjectId());
}
}
RFuture<Long> deleteFuture = batch.getKeys().deleteAsync(map.getName());
batch.execute();
RFuture<Long> deleteFuture = new RedissonKeys(ce).deleteAsync(map.getName());
ce.execute();
return deleteFuture.getNow() > 0;
}
......
......@@ -15,8 +15,10 @@
*/
package org.redisson.liveobject.resolver;
import org.redisson.api.RedissonClient;
import org.redisson.RedissonAtomicLong;
import org.redisson.api.annotation.RId;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
/**
*
......@@ -27,10 +29,13 @@ public class LongGenerator implements RIdResolver<Long> {
public static final LongGenerator INSTANCE = new LongGenerator();
@Override
public Long resolve(Class<?> value, RId id, String idFieldName, RedissonClient redisson) {
return redisson.getAtomicLong(this.getClass().getCanonicalName()
+ "{" + value.getCanonicalName() + "}:" + idFieldName)
.incrementAndGet();
public Long resolve(Class<?> value, RId id, String idFieldName, CommandAsyncExecutor commandAsyncExecutor) {
if (commandAsyncExecutor instanceof CommandBatchService) {
throw new IllegalStateException("this generator couldn't be used in batch");
}
return new RedissonAtomicLong(commandAsyncExecutor, this.getClass().getCanonicalName()
+ "{" + value.getCanonicalName() + "}:" + idFieldName).incrementAndGet();
}
}
......@@ -15,8 +15,8 @@
*/
package org.redisson.liveobject.resolver;
import org.redisson.api.RedissonClient;
import org.redisson.api.annotation.RId;
import org.redisson.command.CommandAsyncExecutor;
/**
*
......@@ -32,9 +32,9 @@ public interface RIdResolver<V> {
* @param cls the class of the LiveObject.
* @param annotation the RId annotation used in the class.
* @param idFieldName field id
* @param redisson instance
* @param commandAsyncExecutor instance
* @return resolved RId field value.
*/
V resolve(Class<?> cls, RId annotation, String idFieldName, RedissonClient redisson);
V resolve(Class<?> cls, RId annotation, String idFieldName, CommandAsyncExecutor commandAsyncExecutor);
}
......@@ -15,8 +15,8 @@
*/
package org.redisson.liveobject.resolver;
import org.redisson.api.RedissonClient;
import org.redisson.api.annotation.RId;
import org.redisson.command.CommandAsyncExecutor;
/**
*
......@@ -28,7 +28,7 @@ public class RequiredIdResolver implements RIdResolver<Object> {
public static final RequiredIdResolver INSTANCE = new RequiredIdResolver();
@Override
public Object resolve(Class<?> cls, RId annotation, String idFieldName, RedissonClient redisson) {
public Object resolve(Class<?> cls, RId annotation, String idFieldName, CommandAsyncExecutor commandAsyncExecutor) {
throw new IllegalArgumentException("id value is not defined for instance of " + cls);
}
......
......@@ -15,10 +15,10 @@
*/
package org.redisson.liveobject.resolver;
import java.util.UUID;
import org.redisson.api.RedissonClient;
import org.redisson.api.annotation.RId;
import org.redisson.command.CommandAsyncExecutor;
import java.util.UUID;
/**
*
......@@ -29,7 +29,7 @@ public class UUIDGenerator implements RIdResolver<String>{
public static final UUIDGenerator INSTANCE = new UUIDGenerator();
@Override
public String resolve(Class<?> value, RId id, String idFieldName, RedissonClient redisson) {
public String resolve(Class<?> value, RId id, String idFieldName, CommandAsyncExecutor commandAsyncExecutor) {
return UUID.randomUUID().toString();
}
......
......@@ -28,19 +28,7 @@ import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RCascadeType;
import org.redisson.api.RDeque;
import org.redisson.api.RList;
import org.redisson.api.RLiveObject;
import org.redisson.api.RLiveObjectService;
import org.redisson.api.RMap;
import org.redisson.api.RObject;
import org.redisson.api.RQueue;
import org.redisson.api.RSet;
import org.redisson.api.RSortedSet;
import org.redisson.api.RedissonClient;
import org.redisson.api.*;
import org.redisson.api.annotation.RCascade;
import org.redisson.api.annotation.REntity;
import org.redisson.api.annotation.RFieldAccessor;
......@@ -2040,6 +2028,24 @@ public class RedissonLiveObjectServiceTest extends BaseTest {
}
}
@Test
public void testBatchedPersist() {
RLiveObjectService s = redisson.getLiveObjectService();
List<TestREntity> objects = new ArrayList<>();
int objectsAmount = 1000000;
for (int i = 0; i < objectsAmount; i++) {
TestREntity e = new TestREntity();
e.setName("" + i);
e.setValue("value" + i);
objects.add(e);
}
List<Object> attachedObjects = s.persist(objects.toArray());
assertThat(attachedObjects).hasSize(objectsAmount);
assertThat(redisson.getKeys().count()).isEqualTo(objectsAmount);
}
@Test
public void testIsAccessor() {
HasIsAccessor o = new HasIsAccessor();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册