提交 f6ce91fe 编写于 作者: G gaohongtao

fix #219 Improve thread performance.

上级 f301fdc9
......@@ -58,32 +58,13 @@ public enum ShardingPropertiesConstant {
METRICS_LOGGER_NAME("metrics.logger.name", "com.dangdang.ddframe.rdb.sharding.metrics", String.class),
/**
* 最小空闲工作线程数量.
*
* <p>
* 默认值: 0
* </p>
*/
EXECUTOR_MIN_IDLE_SIZE("executor.min.idle.size", "0", int.class),
/**
* 最大工作线程数量.
* 工作线程数量.
*
* <p>
* 默认值: 100
* </p>
*/
EXECUTOR_MAX_SIZE("executor.max.size", "100", int.class),
/**
* 工作线程空闲时超时时间.
*
* <p>
* 单位: 毫秒.
* 默认值: 60000毫秒.
* </p>
*/
EXECUTOR_MAX_IDLE_TIMEOUT_MILLISECONDS("executor.max.idle.timeout.millisecond", "60000", long.class);
EXECUTOR_SIZE("executor.size", String.valueOf(Runtime.getRuntime().availableProcessors()), int.class);
private final String key;
......
......@@ -20,20 +20,21 @@ package com.dangdang.ddframe.rdb.sharding.executor;
import com.dangdang.ddframe.rdb.sharding.config.ShardingProperties;
import com.dangdang.ddframe.rdb.sharding.config.ShardingPropertiesConstant;
import com.dangdang.ddframe.rdb.sharding.exception.ShardingJdbcException;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
......@@ -48,15 +49,16 @@ public final class ExecutorEngine {
private final ListeningExecutorService executorService;
public ExecutorEngine(final ShardingProperties shardingProperties) {
int executorMinIdleSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_MIN_IDLE_SIZE);
int executorMaxSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_MAX_SIZE);
long executorMaxIdleTimeoutMilliseconds = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_MAX_IDLE_TIMEOUT_MILLISECONDS);
executorService = MoreExecutors.listeningDecorator(MoreExecutors.getExitingExecutorService(
new ThreadPoolExecutor(executorMinIdleSize, executorMaxSize, executorMaxIdleTimeoutMilliseconds, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>())));
int executorSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE);
executorService = MoreExecutors.listeningDecorator(new ThreadPoolExecutor(executorSize, executorSize, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder()
.setDaemon(true).setNameFormat("ShardingJDBC-%d").build()));
MoreExecutors.addDelayedShutdownHook(executorService, 60, TimeUnit.SECONDS);
}
/**
* 多线程执行任务.
* 一组任务中,将第一个任务放在当前线程中执行,其余的任务放到线程池中运行.
*
*
* @param inputs 输入参数
* @param executeUnit 执行单元
......@@ -65,9 +67,26 @@ public final class ExecutorEngine {
* @return 执行结果
*/
public <I, O> List<O> execute(final Collection<I> inputs, final ExecuteUnit<I, O> executeUnit) {
ListenableFuture<List<O>> futures = submitFutures(inputs, executeUnit);
addCallback(futures);
return getFutureResults(futures);
Iterator<I> iterator = inputs.iterator();
if (!iterator.hasNext()) {
return Collections.emptyList();
}
I firstInput = iterator.next();
ListenableFuture<List<O>> restListFuture = asyncRun(Lists.newArrayList(iterator), executeUnit);
O firstOutput;
List<O> restOutputs;
try {
firstOutput = executeUnit.execute(firstInput);
restOutputs = restListFuture.get();
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
ExecutorExceptionHandler.handleException(ex);
return null;
}
List<O> result = Lists.newLinkedList(restOutputs);
result.add(0, firstOutput);
return result;
}
/**
......@@ -85,22 +104,8 @@ public final class ExecutorEngine {
return mergeUnit.merge(execute(inputs, executeUnit));
}
/**
* 安全关闭执行器,并释放线程.
*/
public void shutdown() {
executorService.shutdownNow();
try {
executorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (final InterruptedException ignored) {
}
if (!executorService.isTerminated()) {
throw new ShardingJdbcException("ExecutorEngine can not been terminated");
}
}
private <I, O> ListenableFuture<List<O>> submitFutures(final Collection<I> inputs, final ExecuteUnit<I, O> executeUnit) {
Set<ListenableFuture<O>> result = new HashSet<>(inputs.size());
private <I, O> ListenableFuture<List<O>> asyncRun(final Collection<I> inputs, final ExecuteUnit<I, O> executeUnit) {
List<ListenableFuture<O>> result = new ArrayList<>(inputs.size());
for (final I each : inputs) {
result.add(executorService.submit(new Callable<O>() {
......@@ -113,26 +118,17 @@ public final class ExecutorEngine {
return Futures.allAsList(result);
}
private <T> void addCallback(final ListenableFuture<T> allFutures) {
Futures.addCallback(allFutures, new FutureCallback<T>() {
@Override
public void onSuccess(final T result) {
log.trace("Concurrent execute result success {}", result);
}
@Override
public void onFailure(final Throwable thrown) {
log.error("Concurrent execute result error {}", thrown);
}
});
}
private <O> O getFutureResults(final ListenableFuture<O> futures) {
/**
* 安全关闭执行器,并释放线程.
*/
public void shutdown() {
executorService.shutdownNow();
try {
return futures.get();
} catch (final InterruptedException | ExecutionException ex) {
ExecutorExceptionHandler.handleException(ex);
return null;
executorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (final InterruptedException ignored) {
}
if (!executorService.isTerminated()) {
throw new ShardingJdbcException("ExecutorEngine can not been terminated");
}
}
}
......@@ -37,7 +37,7 @@ public final class ShardingPropertiesTest {
prop.put(ShardingPropertiesConstant.METRICS_ENABLE.getKey(), "true");
prop.put(ShardingPropertiesConstant.METRICS_MILLISECONDS_PERIOD.getKey(), "1000");
prop.put(ShardingPropertiesConstant.METRICS_LOGGER_NAME.getKey(), "example");
prop.put(ShardingPropertiesConstant.EXECUTOR_MAX_SIZE.getKey(), "10");
prop.put(ShardingPropertiesConstant.EXECUTOR_SIZE.getKey(), "10");
shardingProperties = new ShardingProperties(prop);
}
......@@ -50,8 +50,8 @@ public final class ShardingPropertiesTest {
assertThat(actualMetricsEnabled, is(Boolean.valueOf(ShardingPropertiesConstant.METRICS_ENABLE.getDefaultValue())));
assertThat(actualMetricsMillisecondsPeriod, is(Long.valueOf(ShardingPropertiesConstant.METRICS_MILLISECONDS_PERIOD.getDefaultValue())));
assertThat(actualMetricsPackageName, is(ShardingPropertiesConstant.METRICS_LOGGER_NAME.getDefaultValue()));
int executorMaxSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_MAX_SIZE);
assertThat(executorMaxSize, is(Integer.valueOf(ShardingPropertiesConstant.EXECUTOR_MAX_SIZE.getDefaultValue())));
int executorMaxSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE);
assertThat(executorMaxSize, is(Integer.valueOf(ShardingPropertiesConstant.EXECUTOR_SIZE.getDefaultValue())));
}
@Test
......@@ -62,7 +62,7 @@ public final class ShardingPropertiesTest {
@Test
public void assertGetValueForInteger() {
int actualExecutorMaxSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_MAX_SIZE);
int actualExecutorMaxSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE);
assertThat(actualExecutorMaxSize, is(10));
}
......@@ -83,7 +83,7 @@ public final class ShardingPropertiesTest {
Properties prop = new Properties();
prop.put(ShardingPropertiesConstant.METRICS_ENABLE.getKey(), "error");
prop.put(ShardingPropertiesConstant.METRICS_MILLISECONDS_PERIOD.getKey(), "error");
prop.put(ShardingPropertiesConstant.EXECUTOR_MAX_SIZE.getKey(), "error");
prop.put(ShardingPropertiesConstant.EXECUTOR_SIZE.getKey(), "error");
prop.put("other", "other");
new ShardingProperties(prop);
}
......
......@@ -11,6 +11,8 @@ next = "/03-community/directory-structure"
### 功能提升
1. [ISSUE #219](https://github.com/dangdangdotcom/sharding-jdbc/issues/219) 线程性能优化
### 缺陷修正
1. [ISSUE #212](https://github.com/dangdangdotcom/sharding-jdbc/issues/212) 对去缺少数据源规则给出更有意义的提示
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册