提交 2bcddb1d 编写于 作者: N Nikita Koksharov

Feature - allow to use Spring's @Autowired, @Value and JSR-330 @Inject...

Feature - allow to use Spring's @Autowired, @Value and JSR-330 @Inject annotations in ExecutorService tasks. #1657
上级 f58c0836
......@@ -57,6 +57,7 @@ import org.redisson.api.RScheduledFuture;
import org.redisson.api.RSemaphore;
import org.redisson.api.RTopic;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.api.WorkerOptions;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
......@@ -237,11 +238,15 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public void registerWorkers(int workers) {
registerWorkers(workers, commandExecutor.getConnectionManager().getExecutor());
registerWorkers(WorkerOptions.defaults().workers(workers));
}
@Override
public void registerWorkers(int workers, ExecutorService executor) {
public void registerWorkers(WorkerOptions options) {
if (options.getWorkers() == 0) {
throw new IllegalArgumentException("workers amount can't be zero");
}
QueueTransferTask task = new QueueTransferTask(connectionManager) {
@Override
protected RTopic getTopic() {
......@@ -306,17 +311,28 @@ public class RedissonExecutorService implements RScheduledExecutorService {
service.setSchedulerChannelName(schedulerChannelName);
service.setSchedulerQueueName(schedulerQueueName);
service.setTasksRetryIntervalName(tasksRetryIntervalName);
service.setBeanFactory(options.getBeanFactory());
ExecutorService es = commandExecutor.getConnectionManager().getExecutor();
if (options.getExecutorService() != null) {
es = options.getExecutorService();
}
remoteService.register(RemoteExecutorService.class, service, workers, executor);
remoteService.register(RemoteExecutorService.class, service, options.getWorkers(), es);
workersGroupListenerId = workersTopic.addListener(String.class, new MessageListener<String>() {
@Override
public void onMessage(CharSequence channel, String id) {
redisson.getAtomicLong(workersCounterName + ":" + id).getAndAdd(workers);
redisson.getAtomicLong(workersCounterName + ":" + id).getAndAdd(options.getWorkers());
redisson.getSemaphore(workersSemaphoreName + ":" + id).release();
}
});
}
@Override
public void registerWorkers(int workers, ExecutorService executor) {
registerWorkers(WorkerOptions.defaults().workers(workers).executorService(executor));
}
@Override
public void execute(Runnable task) {
check(task);
......
......@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import org.redisson.api.RExecutorService;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.api.WorkerOptions;
import org.redisson.client.RedisConnection;
import org.redisson.config.RedissonNodeConfig;
import org.redisson.connection.ConnectionManager;
......@@ -139,14 +140,24 @@ public final class RedissonNode {
if (mapReduceWorkers == 0) {
mapReduceWorkers = Runtime.getRuntime().availableProcessors();
}
redisson.getExecutorService(RExecutorService.MAPREDUCE_NAME).registerWorkers(mapReduceWorkers);
WorkerOptions options = WorkerOptions.defaults()
.workers(mapReduceWorkers)
.beanFactory(config.getBeanFactory());
redisson.getExecutorService(RExecutorService.MAPREDUCE_NAME).registerWorkers(options);
log.info("{} map reduce worker(s) registered", mapReduceWorkers);
}
for (Entry<String, Integer> entry : config.getExecutorServiceWorkers().entrySet()) {
String name = entry.getKey();
int workers = entry.getValue();
redisson.getExecutorService(name).registerWorkers(workers);
WorkerOptions options = WorkerOptions.defaults()
.workers(workers)
.beanFactory(config.getBeanFactory());
redisson.getExecutorService(name).registerWorkers(options);
log.info("{} worker(s) registered for ExecutorService with '{}' name", workers, name);
}
......
......@@ -102,21 +102,27 @@ public interface RExecutorService extends ExecutorService, RExecutorServiceAsync
*/
boolean delete();
/**
* Register workers
/*
* Use {@link #registerWorkers(WorkerOptions)} setting instead
*
* @param workers - workers amount
*/
@Deprecated
void registerWorkers(int workers);
/**
* Register workers with custom executor
/*
* Use {@link #registerWorkers(WorkerOptions)} setting instead
*
* @param workers - workers amount
* @param executor - executor instance
*/
@Deprecated
void registerWorkers(int workers, ExecutorService executor);
/**
* Register workers
*
* @param options - worker options
*/
void registerWorkers(WorkerOptions options);
/**
* Returns active worker groups
*
......
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.concurrent.ExecutorService;
import org.redisson.config.Config;
import org.springframework.beans.factory.BeanFactory;
/**
* Configuration for RExecutorService workers.
*
* @author Nikita Koksharov
*
*/
public final class WorkerOptions {
private int workers;
private ExecutorService executorService;
private BeanFactory beanFactory;
private WorkerOptions() {
}
public static WorkerOptions defaults() {
return new WorkerOptions();
}
public int getWorkers() {
return workers;
}
/**
* Defines workers amount used to execute tasks.
*
* @param workers - workers amount
* @return self instance
*/
public WorkerOptions workers(int workers) {
this.workers = workers;
return this;
}
public BeanFactory getBeanFactory() {
return beanFactory;
}
/**
* Defines Spring BeanFactory instance to execute tasks with Spring's '@Autowired',
* '@Value' or JSR-330's '@Inject' annotation.
*
* @param beanFactory - Spring BeanFactory instance
* @return self instance
*/
public WorkerOptions beanFactory(BeanFactory beanFactory) {
this.beanFactory = beanFactory;
return this;
}
public ExecutorService getExecutorService() {
return executorService;
}
/**
* Defines custom ExecutorService to execute tasks.
* {@link Config#setExecutor(ExecutorService)} is used by default.
*
* @param executorService - custom ExecutorService
* @return self instance
*/
public WorkerOptions executorService(ExecutorService executorService) {
this.executorService = executorService;
return this;
}
}
......@@ -21,6 +21,7 @@ import java.util.HashMap;
import java.util.Map;
import org.redisson.api.RedissonNodeInitializer;
import org.springframework.beans.factory.BeanFactory;
/**
* Redisson Node configuration
......@@ -32,6 +33,7 @@ public class RedissonNodeConfig extends Config {
private int mapReduceWorkers = 0;
private RedissonNodeInitializer redissonNodeInitializer;
private BeanFactory beanFactory;
private Map<String, Integer> executorServiceWorkers = new HashMap<String, Integer>();
public RedissonNodeConfig() {
......@@ -47,6 +49,7 @@ public class RedissonNodeConfig extends Config {
this.executorServiceWorkers = new HashMap<String, Integer>(oldConf.executorServiceWorkers);
this.redissonNodeInitializer = oldConf.redissonNodeInitializer;
this.mapReduceWorkers = oldConf.mapReduceWorkers;
this.beanFactory = oldConf.beanFactory;
}
/**
......@@ -98,6 +101,19 @@ public class RedissonNodeConfig extends Config {
return redissonNodeInitializer;
}
public BeanFactory getBeanFactory() {
return beanFactory;
}
/**
* Defines Spring Bean Factory instance to execute tasks with Spring's '@Autowired',
* '@Value' or JSR-330's '@Inject' annotation.
*
* @param beanFactory - Spring BeanFactory instance
*/
public void setBeanFactory(BeanFactory beanFactory) {
this.beanFactory = beanFactory;
}
/**
* Read config object stored in JSON format from <code>File</code>
......
......@@ -50,6 +50,8 @@ import org.redisson.misc.HashValue;
import org.redisson.misc.Injector;
import org.redisson.remote.RequestId;
import org.redisson.remote.ResponseEntry;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
......@@ -79,6 +81,7 @@ public class TasksRunnerService implements RemoteExecutorService {
private String schedulerQueueName;
private String schedulerChannelName;
private String tasksRetryIntervalName;
private BeanFactory beanFactory;
private ConcurrentMap<String, ResponseEntry> responses;
public TasksRunnerService(CommandExecutor commandExecutor, RedissonClient redisson, Codec codec, String name, ConcurrentMap<String, ResponseEntry> responses) {
......@@ -90,6 +93,10 @@ public class TasksRunnerService implements RemoteExecutorService {
this.codec = codec;
}
public void setBeanFactory(BeanFactory beanFactory) {
this.beanFactory = beanFactory;
}
public void setTasksRetryIntervalName(String tasksRetryInterval) {
this.tasksRetryIntervalName = tasksRetryInterval;
}
......@@ -292,6 +299,13 @@ public class TasksRunnerService implements RemoteExecutorService {
}
Injector.inject(task, redisson);
if (beanFactory != null) {
AutowiredAnnotationBeanPostProcessor bpp = new AutowiredAnnotationBeanPostProcessor();
bpp.setBeanFactory(beanFactory);
bpp.processInjection(task);
}
return task;
} catch (Exception e) {
throw new IllegalStateException("Unable to initialize codec with ClassLoader parameter", e);
......
package org.redisson.executor;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.concurrent.Callable;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.redisson.BaseTest;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.RedissonNode;
import org.redisson.api.RExecutorFuture;
import org.redisson.api.RedissonClient;
import org.redisson.api.annotation.RInject;
import org.redisson.config.Config;
import org.redisson.config.RedissonNodeConfig;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Service;
public class RedissonExecutorServiceSpringTest extends BaseTest {
public static class SampleRunnable implements Runnable, Serializable {
@Autowired
private SampleBean bean;
@RInject
private RedissonClient redisson;
public SampleRunnable() {
}
@Override
public void run() {
String res = bean.myMethod("runnable");
redisson.getBucket("result").set(res);
}
}
public static class SampleCallable implements Callable<String>, Serializable {
@Autowired
private SampleBean bean;
public SampleCallable() {
}
@Override
public String call() throws Exception {
return bean.myMethod("callable");
}
}
@Service
public static class SampleBean {
public String myMethod(String key) {
return "hello " + key;
}
}
private static final String EXECUTOR_NAME = "spring_test";
@Configuration
@ComponentScan
public static class Application {
@Bean(destroyMethod = "shutdown")
RedissonNode redissonNode(BeanFactory beanFactory) {
Config config = BaseTest.createConfig();
RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
nodeConfig.setExecutorServiceWorkers(Collections.singletonMap(EXECUTOR_NAME, 1));
nodeConfig.setBeanFactory(beanFactory);
RedissonNode node = RedissonNode.create(nodeConfig);
node.start();
return node;
}
}
private static AnnotationConfigApplicationContext context;
@BeforeClass
public static void beforeTest() throws FailedToStartRedisException, IOException, InterruptedException {
context = new AnnotationConfigApplicationContext(Application.class);
}
@AfterClass
public static void afterTest() {
context.close();
}
@Test
public void testRunnable() throws InterruptedException {
redisson.getExecutorService(EXECUTOR_NAME).execute(new SampleRunnable());
Thread.sleep(500);
assertThat(redisson.getBucket("result").get()).isEqualTo("hello runnable");
}
@Test
public void testCallable() throws InterruptedException {
RExecutorFuture<String> future = redisson.getExecutorService(EXECUTOR_NAME).submit(new SampleCallable());
Thread.sleep(500);
assertThat(future.sync().getNow()).isEqualTo("hello callable");
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册