diff --git a/src/main/java/org/redisson/RedissonExecutorService.java b/src/main/java/org/redisson/RedissonExecutorService.java index 15015cbe8937b4b7e27365b796398fe794deff08..29657106ee7a3f11b5a1fcd82f32c41939785915 100644 --- a/src/main/java/org/redisson/RedissonExecutorService.java +++ b/src/main/java/org/redisson/RedissonExecutorService.java @@ -18,6 +18,8 @@ package org.redisson; import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.Serializable; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -39,9 +41,9 @@ import org.redisson.api.RAtomicLong; import org.redisson.api.RBucket; import org.redisson.api.RExecutorService; import org.redisson.api.RKeys; -import org.redisson.api.RRemoteService; import org.redisson.api.RTopic; import org.redisson.api.RemoteInvocationOptions; +import org.redisson.api.annotation.RInject; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; import org.redisson.client.protocol.RedisCommands; @@ -57,6 +59,11 @@ import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; import io.netty.util.internal.PlatformDependent; +/** + * + * @author Nikita Koksharov + * + */ public class RedissonExecutorService implements RExecutorService { public static final int SHUTDOWN_STATE = 1; @@ -97,24 +104,52 @@ public class RedissonExecutorService implements RExecutorService { topic = redisson.getTopic(objectName + ":topic"); keys = redisson.getKeys(); - RRemoteService remoteService = new ExecutorRemoteService(codec, redisson, name, commandExecutor); + ExecutorRemoteService remoteService = new ExecutorRemoteService(codec, redisson, name, commandExecutor); + remoteService.setTasksCounterName(tasksCounter.getName()); + remoteService.setStatusName(status.getName()); + asyncService = remoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck()); asyncServiceWithoutResult = remoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult()); } @Override public void registerExecutors(int executors) { - RemoteExecutorService service = new RemoteExecutorServiceImpl(commandExecutor, redisson, codec, name); + String objectName = name + ":{"+ RemoteExecutorService.class.getName() + "}"; + + RemoteExecutorServiceImpl service = new RemoteExecutorServiceImpl(commandExecutor, redisson, codec, objectName); + service.setStatusName(status.getName()); + service.setTasksCounterName(tasksCounter.getName()); + service.setTopicName(topic.getChannelNames().get(0)); + redisson.getRemoteSerivce(name, codec).register(RemoteExecutorService.class, service, executors); } @Override public void execute(Runnable task) { + check(task); byte[] classBody = getClassBody(task); + byte[] state = encode(task); + RemotePromise promise = (RemotePromise)asyncServiceWithoutResult.executeVoid(task.getClass().getName(), classBody, state); + execute(promise); + } + + private byte[] encode(Object task) { + // erase RedissonClient field to avoid its serialization + Field[] fields = task.getClass().getDeclaredFields(); + for (Field field : fields) { + if (RedissonClient.class.isAssignableFrom(field.getType()) + && field.isAnnotationPresent(RInject.class)) { + field.setAccessible(true); + try { + field.set(task, null); + } catch (IllegalAccessException e) { + throw new IllegalStateException(e); + } + } + } + try { - byte[] state = codec.getValueEncoder().encode(task); - RemotePromise promise = (RemotePromise)asyncServiceWithoutResult.executeVoid(task.getClass().getName(), classBody, state); - check(promise); + return codec.getValueEncoder().encode(task); } catch (IOException e) { throw new IllegalArgumentException(e); } @@ -210,18 +245,24 @@ public class RedissonExecutorService implements RExecutorService { @Override public Future submit(Callable task) { + check(task); byte[] classBody = getClassBody(task); - try { - byte[] state = codec.getValueEncoder().encode(task); - RemotePromise promise = (RemotePromise)asyncService.execute(task.getClass().getName(), classBody, state); - check(promise); - return promise; - } catch (IOException e) { - throw new IllegalArgumentException(e); + byte[] state = encode(task); + RemotePromise promise = (RemotePromise)asyncService.execute(task.getClass().getName(), classBody, state); + execute(promise); + return promise; + } + + private void check(Object task) { + if (task.getClass().isAnonymousClass()) { + throw new IllegalArgumentException("Task can't be created using anonymous class"); + } + if (!Serializable.class.isAssignableFrom(task.getClass())) { + throw new IllegalArgumentException("Task class should implement Serializable interface"); } } - private void check(RemotePromise promise) { + private void execute(RemotePromise promise) { io.netty.util.concurrent.Future addFuture = promise.getAddFuture(); addFuture.syncUninterruptibly(); Boolean res = addFuture.getNow(); @@ -249,15 +290,12 @@ public class RedissonExecutorService implements RExecutorService { @Override public Future submit(Runnable task) { + check(task); byte[] classBody = getClassBody(task); - try { - byte[] state = codec.getValueEncoder().encode(task); - RemotePromise promise = (RemotePromise) asyncService.executeVoid(task.getClass().getName(), classBody, state); - check(promise); - return promise; - } catch (IOException e) { - throw new IllegalArgumentException(e); - } + byte[] state = encode(task); + RemotePromise promise = (RemotePromise) asyncService.executeVoid(task.getClass().getName(), classBody, state); + execute(promise); + return promise; } private T doInvokeAny(Collection> tasks, diff --git a/src/main/java/org/redisson/RedissonRemoteService.java b/src/main/java/org/redisson/RedissonRemoteService.java index e372ecfd27d0c55172a0e63b9414328a2d7d8379..f9b57fad5c4235c1b2893cffd47604f921e36183 100644 --- a/src/main/java/org/redisson/RedissonRemoteService.java +++ b/src/main/java/org/redisson/RedissonRemoteService.java @@ -68,7 +68,7 @@ public class RedissonRemoteService implements RRemoteService { private final Map beans = PlatformDependent.newConcurrentHashMap(); - private final Codec codec; + protected final Codec codec; protected final Redisson redisson; protected final String name; protected final CommandExecutor commandExecutor; diff --git a/src/main/java/org/redisson/api/RExecutorService.java b/src/main/java/org/redisson/api/RExecutorService.java index a5b67066db6e6e12f0a92c5259da4f126a73ace3..d1ede86ba3ee551e71796deb04c9677e26120a45 100644 --- a/src/main/java/org/redisson/api/RExecutorService.java +++ b/src/main/java/org/redisson/api/RExecutorService.java @@ -17,6 +17,11 @@ package org.redisson.api; import java.util.concurrent.ExecutorService; +/** + * + * @author Nikita Koksharov + * + */ public interface RExecutorService extends ExecutorService { String getName(); diff --git a/src/main/java/org/redisson/api/annotation/RInject.java b/src/main/java/org/redisson/api/annotation/RInject.java new file mode 100644 index 0000000000000000000000000000000000000000..9a90423ec983adeee88fdd5f9dc2722e2b3a39ca --- /dev/null +++ b/src/main/java/org/redisson/api/annotation/RInject.java @@ -0,0 +1,32 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api.annotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * + * @author Nikita Koksharov + * + */ +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.FIELD}) +public @interface RInject { + +} diff --git a/src/main/java/org/redisson/executor/ClassLoaderDelegator.java b/src/main/java/org/redisson/executor/ClassLoaderDelegator.java index ba501b5d043e2ca9ab6771ccfa0ec04e14b186a5..105e55c41d9f0bac554362a3f30f5b587ed58631 100644 --- a/src/main/java/org/redisson/executor/ClassLoaderDelegator.java +++ b/src/main/java/org/redisson/executor/ClassLoaderDelegator.java @@ -20,6 +20,11 @@ import java.io.InputStream; import java.net.URL; import java.util.Enumeration; +/** + * + * @author Nikita Koksharov + * + */ public class ClassLoaderDelegator extends ClassLoader { private final ThreadLocal threadLocalClassLoader = new ThreadLocal(); diff --git a/src/main/java/org/redisson/executor/ExecutorRemoteService.java b/src/main/java/org/redisson/executor/ExecutorRemoteService.java index 1afe6e32a9095bfc7dcad43bb2d26c7dea2025d8..3872b5402103e07280227f479bf72e79e6e0f4fb 100644 --- a/src/main/java/org/redisson/executor/ExecutorRemoteService.java +++ b/src/main/java/org/redisson/executor/ExecutorRemoteService.java @@ -19,7 +19,6 @@ import java.util.Arrays; import org.redisson.Redisson; import org.redisson.RedissonRemoteService; -import org.redisson.api.RAtomicLong; import org.redisson.api.RBlockingQueue; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; @@ -31,17 +30,26 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; +/** + * + * @author Nikita Koksharov + * + */ public class ExecutorRemoteService extends RedissonRemoteService { - private final RAtomicLong tasksCounter; - private final RAtomicLong status; + private String tasksCounterName; + private String statusName; public ExecutorRemoteService(Codec codec, Redisson redisson, String name, CommandExecutor commandExecutor) { super(codec, redisson, name, commandExecutor); - - String objectName = name + ":{"+ RemoteExecutorService.class.getName() + "}"; - tasksCounter = redisson.getAtomicLong(objectName + ":counter"); - status = redisson.getAtomicLong(objectName + ":status"); + } + + public void setStatusName(String statusName) { + this.statusName = statusName; + } + + public void setTasksCounterName(String tasksCounterName) { + this.tasksCounterName = tasksCounterName; } @Override @@ -55,7 +63,7 @@ public class ExecutorRemoteService extends RedissonRemoteService { + "return 1;" + "end;" + "return 0;", - Arrays.asList(tasksCounter.getName(), status.getName(), requestQueue.getName()), + Arrays.asList(tasksCounterName, statusName, requestQueue.getName()), encode(request)); result.setAddFuture(future); diff --git a/src/main/java/org/redisson/executor/RedissonClassLoader.java b/src/main/java/org/redisson/executor/RedissonClassLoader.java index 5fceb2b867babb750d7ffca7476a286dfc92bf91..1685acaa098f688dee759bdb778be0bd053a0dca 100644 --- a/src/main/java/org/redisson/executor/RedissonClassLoader.java +++ b/src/main/java/org/redisson/executor/RedissonClassLoader.java @@ -15,6 +15,11 @@ */ package org.redisson.executor; +/** + * + * @author Nikita Koksharov + * + */ public class RedissonClassLoader extends ClassLoader { public RedissonClassLoader(ClassLoader parent) { diff --git a/src/main/java/org/redisson/executor/RemoteExecutorService.java b/src/main/java/org/redisson/executor/RemoteExecutorService.java index ffced181f4cc9e0ea392213e25275ea79ea8cfbb..50fdc0006b88d21d15ed81ccd009777e55a920c3 100644 --- a/src/main/java/org/redisson/executor/RemoteExecutorService.java +++ b/src/main/java/org/redisson/executor/RemoteExecutorService.java @@ -15,6 +15,11 @@ */ package org.redisson.executor; +/** + * + * @author Nikita Koksharov + * + */ public interface RemoteExecutorService { Object execute(String className, byte[] classBody, byte[] state); diff --git a/src/main/java/org/redisson/executor/RemoteExecutorServiceAsync.java b/src/main/java/org/redisson/executor/RemoteExecutorServiceAsync.java index 039ec895366c5a0d239c13b95326018948b6c14f..7d769858036e2394a75a59b0d6016176845adb1c 100644 --- a/src/main/java/org/redisson/executor/RemoteExecutorServiceAsync.java +++ b/src/main/java/org/redisson/executor/RemoteExecutorServiceAsync.java @@ -19,6 +19,11 @@ import org.redisson.remote.RRemoteAsync; import io.netty.util.concurrent.Future; +/** + * + * @author Nikita Koksharov + * + */ @RRemoteAsync(RemoteExecutorService.class) public interface RemoteExecutorServiceAsync { diff --git a/src/main/java/org/redisson/executor/RemoteExecutorServiceImpl.java b/src/main/java/org/redisson/executor/RemoteExecutorServiceImpl.java index b6202909051ac9ca2fd064c8cbdc82e4eb2d3929..574355a32ec77d63ae2949e6bf7743f28020044c 100644 --- a/src/main/java/org/redisson/executor/RemoteExecutorServiceImpl.java +++ b/src/main/java/org/redisson/executor/RemoteExecutorServiceImpl.java @@ -15,14 +15,14 @@ */ package org.redisson.executor; +import java.io.IOException; +import java.lang.reflect.Field; import java.util.Arrays; import java.util.concurrent.Callable; import org.redisson.RedissonClient; import org.redisson.RedissonExecutorService; -import org.redisson.api.RAtomicLong; -import org.redisson.api.RBucket; -import org.redisson.api.RTopic; +import org.redisson.api.annotation.RInject; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; import org.redisson.client.protocol.RedisCommands; @@ -31,6 +31,11 @@ import org.redisson.command.CommandExecutor; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +/** + * + * @author Nikita Koksharov + * + */ public class RemoteExecutorServiceImpl implements RemoteExecutorService { private final ClassLoaderDelegator classLoader = new ClassLoaderDelegator(); @@ -39,17 +44,15 @@ public class RemoteExecutorServiceImpl implements RemoteExecutorService { private final String name; private final CommandExecutor commandExecutor; - private final RAtomicLong tasksCounter; - private final RBucket status; - private final RTopic topic; + private final RedissonClient redisson; + private String tasksCounterName; + private String statusName; + private String topicName; public RemoteExecutorServiceImpl(CommandExecutor commandExecutor, RedissonClient redisson, Codec codec, String name) { this.commandExecutor = commandExecutor; - - this.name = name + ":{"+ RemoteExecutorService.class.getName() + "}"; - tasksCounter = redisson.getAtomicLong(this.name + ":counter"); - status = redisson.getBucket(this.name + ":status"); - topic = redisson.getTopic(this.name + ":topic"); + this.name = name; + this.redisson = redisson; try { this.codec = codec.getClass().getConstructor(ClassLoader.class).newInstance(classLoader); @@ -57,6 +60,18 @@ public class RemoteExecutorServiceImpl implements RemoteExecutorService { throw new IllegalStateException(e); } } + + public void setTasksCounterName(String tasksCounterName) { + this.tasksCounterName = tasksCounterName; + } + + public void setStatusName(String statusName) { + this.statusName = statusName; + } + + public void setTopicName(String topicName) { + this.topicName = topicName; + } @Override public Object execute(String className, byte[] classBody, byte[] state) { @@ -68,7 +83,7 @@ public class RemoteExecutorServiceImpl implements RemoteExecutorService { cl.loadClass(className, classBody); classLoader.setCurrentClassLoader(cl); - Callable callable = (Callable) codec.getValueDecoder().decode(buf, null); + Callable callable = decode(buf); return callable.call(); } catch (Exception e) { throw new IllegalArgumentException(e); @@ -78,6 +93,23 @@ public class RemoteExecutorServiceImpl implements RemoteExecutorService { } } + private T decode(ByteBuf buf) throws IOException { + T task = (T) codec.getValueDecoder().decode(buf, null); + Field[] fields = task.getClass().getDeclaredFields(); + for (Field field : fields) { + if (RedissonClient.class.isAssignableFrom(field.getType()) + && field.isAnnotationPresent(RInject.class)) { + field.setAccessible(true); + try { + field.set(task, redisson); + } catch (IllegalAccessException e) { + throw new IllegalStateException(e); + } + } + } + return task; + } + @Override public void executeVoid(String className, byte[] classBody, byte[] state) { ByteBuf buf = null; @@ -88,7 +120,7 @@ public class RemoteExecutorServiceImpl implements RemoteExecutorService { cl.loadClass(className, classBody); classLoader.setCurrentClassLoader(cl); - Runnable runnable = (Runnable) codec.getValueDecoder().decode(buf, null); + Runnable runnable = decode(buf); runnable.run(); } catch (Exception e) { throw new IllegalArgumentException(e); @@ -106,7 +138,7 @@ public class RemoteExecutorServiceImpl implements RemoteExecutorService { + "redis.call('set', KEYS[2], ARGV[2]);" + "redis.call('publish', KEYS[3], ARGV[2]);" + "end;", - Arrays.asList(tasksCounter.getName(), status.getName(), topic.getChannelNames().get(0)), + Arrays.asList(tasksCounterName, statusName, topicName), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE); } diff --git a/src/main/java/org/redisson/executor/RemotePromise.java b/src/main/java/org/redisson/executor/RemotePromise.java index 1bbd90ea4ebb7ff01202e9772e65218f576283a8..fb0fc6e83c4e0f30ca2dfd5018f74fd1bb815889 100644 --- a/src/main/java/org/redisson/executor/RemotePromise.java +++ b/src/main/java/org/redisson/executor/RemotePromise.java @@ -20,6 +20,11 @@ import org.redisson.misc.PromiseDelegator; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; +/** + * + * @author Nikita Koksharov + * + */ public class RemotePromise extends PromiseDelegator { private Future addFuture; diff --git a/src/test/java/org/redisson/executor/CallableRedissonTask.java b/src/test/java/org/redisson/executor/CallableRedissonTask.java new file mode 100644 index 0000000000000000000000000000000000000000..12f0dd80089166bd80b7b82ae34ced0d6608893c --- /dev/null +++ b/src/test/java/org/redisson/executor/CallableRedissonTask.java @@ -0,0 +1,30 @@ +package org.redisson.executor; + +import java.io.Serializable; +import java.util.concurrent.Callable; + +import org.redisson.RedissonClient; +import org.redisson.api.annotation.RInject; + +public class CallableRedissonTask implements Callable, Serializable { + + private static final long serialVersionUID = 8875732248655428049L; + + private Long incrementValue; + + @RInject + private RedissonClient redissonClient; + + public CallableRedissonTask() { + } + + public CallableRedissonTask(Long incrementValue) { + this.incrementValue = incrementValue; + } + + @Override + public Long call() throws Exception { + return redissonClient.getAtomicLong("counter").addAndGet(incrementValue); + } + +} diff --git a/src/test/java/org/redisson/executor/RedissonCallableTask.java b/src/test/java/org/redisson/executor/CallableTask.java similarity index 80% rename from src/test/java/org/redisson/executor/RedissonCallableTask.java rename to src/test/java/org/redisson/executor/CallableTask.java index 069c7fa41d368f399582a4c90cf953b756c5cca6..e230d4edd807bc29e88bc819cd8b5ffa9f7cc52e 100644 --- a/src/test/java/org/redisson/executor/RedissonCallableTask.java +++ b/src/test/java/org/redisson/executor/CallableTask.java @@ -3,7 +3,7 @@ package org.redisson.executor; import java.io.Serializable; import java.util.concurrent.Callable; -public class RedissonCallableTask implements Callable, Serializable { +public class CallableTask implements Callable, Serializable { public static final String RESULT = "callable"; diff --git a/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java b/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java index b88b684e26de7ec1a14af9f275b953a57a63b472..4d44c2f7596983224393730d15fec3ab066f299b 100644 --- a/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java +++ b/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java @@ -3,6 +3,7 @@ package org.redisson.executor; import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; @@ -41,35 +42,35 @@ public class RedissonExecutorServiceTest extends BaseTest { } @Test - public void test2() throws InterruptedException, ExecutionException, TimeoutException { + public void testMultipleTasks() throws InterruptedException, ExecutionException, TimeoutException { RExecutorService e = redisson.getExecutorService(); - e.execute(new RedissonRunnableTask()); - Future f = e.submit(new RedissonRunnableTask2()); + e.execute(new RunnableTask()); + Future f = e.submit(new RunnableTask2()); f.get(); - Future fs = e.submit(new RedissonCallableTask()); - assertThat(fs.get()).isEqualTo(RedissonCallableTask.RESULT); + Future fs = e.submit(new CallableTask()); + assertThat(fs.get()).isEqualTo(CallableTask.RESULT); - Future f2 = e.submit(new RedissonRunnableTask(), 12); + Future f2 = e.submit(new RunnableTask(), 12); assertThat(f2.get()).isEqualTo(12); - String invokeResult = e.invokeAny(Arrays.asList(new RedissonCallableTask(), new RedissonCallableTask(), new RedissonCallableTask())); - assertThat(invokeResult).isEqualTo(RedissonCallableTask.RESULT); + String invokeResult = e.invokeAny(Arrays.asList(new CallableTask(), new CallableTask(), new CallableTask())); + assertThat(invokeResult).isEqualTo(CallableTask.RESULT); - String a = e.invokeAny(Arrays.asList(new RedissonCallableTask(), new RedissonCallableTask(), new RedissonCallableTask()), 1, TimeUnit.SECONDS); - assertThat(a).isEqualTo(RedissonCallableTask.RESULT); + String a = e.invokeAny(Arrays.asList(new CallableTask(), new CallableTask(), new CallableTask()), 1, TimeUnit.SECONDS); + assertThat(a).isEqualTo(CallableTask.RESULT); - List invokeAllParams = Arrays.asList(new RedissonCallableTask(), new RedissonCallableTask(), new RedissonCallableTask()); + List invokeAllParams = Arrays.asList(new CallableTask(), new CallableTask(), new CallableTask()); List> allResult = e.invokeAll(invokeAllParams); assertThat(allResult).hasSize(invokeAllParams.size()); for (Future future : allResult) { - assertThat(future.get()).isEqualTo(RedissonCallableTask.RESULT); + assertThat(future.get()).isEqualTo(CallableTask.RESULT); } - List invokeAllParams1 = Arrays.asList(new RedissonCallableTask(), new RedissonCallableTask(), new RedissonCallableTask()); + List invokeAllParams1 = Arrays.asList(new CallableTask(), new CallableTask(), new CallableTask()); List> allResult1 = e.invokeAll(invokeAllParams1, 1, TimeUnit.SECONDS); assertThat(allResult1).hasSize(invokeAllParams.size()); for (Future future : allResult1) { - assertThat(future.get()).isEqualTo(RedissonCallableTask.RESULT); + assertThat(future.get()).isEqualTo(CallableTask.RESULT); } } @@ -77,49 +78,49 @@ public class RedissonExecutorServiceTest extends BaseTest { @Test(expected = RejectedExecutionException.class) public void testRejectExecute() throws InterruptedException, ExecutionException { RExecutorService e = redisson.getExecutorService(); - e.execute(new RedissonRunnableTask()); - Future f1 = e.submit(new RedissonRunnableTask2()); - Future f2 = e.submit(new RedissonCallableTask()); + e.execute(new RunnableTask()); + Future f1 = e.submit(new RunnableTask2()); + Future f2 = e.submit(new CallableTask()); e.shutdown(); f1.get(); - assertThat(f2.get()).isEqualTo(RedissonCallableTask.RESULT); + assertThat(f2.get()).isEqualTo(CallableTask.RESULT); assertThat(e.isShutdown()).isTrue(); - e.execute(new RedissonRunnableTask()); + e.execute(new RunnableTask()); } @Test(expected = RejectedExecutionException.class) public void testRejectSubmitRunnable() throws InterruptedException, ExecutionException { RExecutorService e = redisson.getExecutorService(); - e.execute(new RedissonRunnableTask()); - Future f1 = e.submit(new RedissonRunnableTask2()); - Future f2 = e.submit(new RedissonCallableTask()); + e.execute(new RunnableTask()); + Future f1 = e.submit(new RunnableTask2()); + Future f2 = e.submit(new CallableTask()); e.shutdown(); f1.get(); - assertThat(f2.get()).isEqualTo(RedissonCallableTask.RESULT); + assertThat(f2.get()).isEqualTo(CallableTask.RESULT); assertThat(e.isShutdown()).isTrue(); - e.submit(new RedissonRunnableTask2()); + e.submit(new RunnableTask2()); } @Test(expected = RejectedExecutionException.class) public void testRejectSubmitCallable() throws InterruptedException, ExecutionException { RExecutorService e = redisson.getExecutorService(); - e.execute(new RedissonRunnableTask()); - Future f1 = e.submit(new RedissonRunnableTask2()); - Future f2 = e.submit(new RedissonCallableTask()); + e.execute(new RunnableTask()); + Future f1 = e.submit(new RunnableTask2()); + Future f2 = e.submit(new CallableTask()); e.shutdown(); f1.get(); - assertThat(f2.get()).isEqualTo(RedissonCallableTask.RESULT); + assertThat(f2.get()).isEqualTo(CallableTask.RESULT); assertThat(e.isShutdown()).isTrue(); - e.submit(new RedissonCallableTask()); + e.submit(new CallableTask()); } @Test(expected = RejectedExecutionException.class) @@ -128,7 +129,7 @@ public class RedissonExecutorServiceTest extends BaseTest { e.shutdown(); assertThat(e.isShutdown()).isTrue(); - e.submit(new RedissonRunnableTask2()); + e.submit(new RunnableTask2()); } @@ -137,7 +138,7 @@ public class RedissonExecutorServiceTest extends BaseTest { RExecutorService e = redisson.getExecutorService(); assertThat(e.isShutdown()).isFalse(); assertThat(e.isTerminated()).isFalse(); - e.execute(new RedissonRunnableTask()); + e.execute(new RunnableTask()); e.shutdown(); assertThat(e.isShutdown()).isTrue(); assertThat(e.awaitTermination(5, TimeUnit.SECONDS)).isTrue(); @@ -159,7 +160,7 @@ public class RedissonExecutorServiceTest extends BaseTest { public void testResetShutdownState() throws InterruptedException, ExecutionException { for (int i = 0; i < 100; i++) { RExecutorService e = redisson.getExecutorService(); - e.execute(new RedissonRunnableTask()); + e.execute(new RunnableTask()); assertThat(e.isShutdown()).isFalse(); e.shutdown(); assertThat(e.isShutdown()).isTrue(); @@ -168,9 +169,51 @@ public class RedissonExecutorServiceTest extends BaseTest { assertThat(e.delete()).isTrue(); assertThat(e.isShutdown()).isFalse(); assertThat(e.isTerminated()).isFalse(); - Future future = e.submit(new RedissonRunnableTask()); + Future future = e.submit(new RunnableTask()); future.get(); } } + @Test + public void testRedissonInjected() throws InterruptedException, ExecutionException { + Future s1 = redisson.getExecutorService().submit(new CallableRedissonTask(1L)); + Future s2 = redisson.getExecutorService().submit(new CallableRedissonTask(2L)); + Future s3 = redisson.getExecutorService().submit(new CallableRedissonTask(30L)); + Future s4 = (Future) redisson.getExecutorService().submit(new RunnableRedissonTask()); + + List results = Arrays.asList(s1.get(), s2.get(), s3.get()); + assertThat(results).containsOnlyOnce(33L); + + s4.get(); + assertThat(redisson.getAtomicLong("runnableCounter").get()).isEqualTo(100L); + } + + @Test(expected = IllegalArgumentException.class) + public void testAnonymousRunnable() { + redisson.getExecutorService().submit(new Runnable() { + @Override + public void run() { + } + }); + } + + @Test(expected = IllegalArgumentException.class) + public void testAnonymousCallable() { + redisson.getExecutorService().submit(new Callable() { + @Override + public Object call() throws Exception { + return null; + } + }); + } + + @Test(expected = IllegalArgumentException.class) + public void testAnonymousRunnableExecute() { + redisson.getExecutorService().execute(new Runnable() { + @Override + public void run() { + } + }); + } + } diff --git a/src/test/java/org/redisson/executor/RunnableRedissonTask.java b/src/test/java/org/redisson/executor/RunnableRedissonTask.java new file mode 100644 index 0000000000000000000000000000000000000000..6c052236c6bf05e5a506424753f462ab7f48566f --- /dev/null +++ b/src/test/java/org/redisson/executor/RunnableRedissonTask.java @@ -0,0 +1,20 @@ +package org.redisson.executor; + +import java.io.Serializable; + +import org.redisson.RedissonClient; +import org.redisson.api.annotation.RInject; + +public class RunnableRedissonTask implements Runnable, Serializable { + + private static final long serialVersionUID = 4165626916136893351L; + + @RInject + private RedissonClient redissonClient; + + @Override + public void run() { + redissonClient.getAtomicLong("runnableCounter").addAndGet(100); + } + +} diff --git a/src/test/java/org/redisson/executor/RedissonRunnableTask.java b/src/test/java/org/redisson/executor/RunnableTask.java similarity index 78% rename from src/test/java/org/redisson/executor/RedissonRunnableTask.java rename to src/test/java/org/redisson/executor/RunnableTask.java index e2af075e30c75283cc9a52c71df3cb829f4542de..773da2372dd2acf18480f52450b49cebe5583aa0 100644 --- a/src/test/java/org/redisson/executor/RedissonRunnableTask.java +++ b/src/test/java/org/redisson/executor/RunnableTask.java @@ -2,7 +2,7 @@ package org.redisson.executor; import java.io.Serializable; -public class RedissonRunnableTask implements Runnable, Serializable { +public class RunnableTask implements Runnable, Serializable { private static final long serialVersionUID = 2105094575950438867L; diff --git a/src/test/java/org/redisson/executor/RedissonRunnableTask2.java b/src/test/java/org/redisson/executor/RunnableTask2.java similarity index 78% rename from src/test/java/org/redisson/executor/RedissonRunnableTask2.java rename to src/test/java/org/redisson/executor/RunnableTask2.java index c66481330633b98d0b5c36183286a22a842dc4aa..16a692161517f21a2fcb9bf33bcbd8ef02e5b015 100644 --- a/src/test/java/org/redisson/executor/RedissonRunnableTask2.java +++ b/src/test/java/org/redisson/executor/RunnableTask2.java @@ -2,7 +2,7 @@ package org.redisson.executor; import java.io.Serializable; -public class RedissonRunnableTask2 implements Runnable, Serializable { +public class RunnableTask2 implements Runnable, Serializable { private static final long serialVersionUID = 2105094575950438867L;