From f6ce91fe58b276bf0adf8c387337fe11f1de611f Mon Sep 17 00:00:00 2001 From: gaohongtao Date: Fri, 20 Jan 2017 16:23:50 +0800 Subject: [PATCH] fix #219 Improve thread performance. --- .../config/ShardingPropertiesConstant.java | 23 +---- .../rdb/sharding/executor/ExecutorEngine.java | 92 +++++++++---------- .../config/ShardingPropertiesTest.java | 10 +- .../content/03-community/release-notes.md | 2 + 4 files changed, 53 insertions(+), 74 deletions(-) diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/config/ShardingPropertiesConstant.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/config/ShardingPropertiesConstant.java index 577be9d35d..d84199871e 100644 --- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/config/ShardingPropertiesConstant.java +++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/config/ShardingPropertiesConstant.java @@ -58,32 +58,13 @@ public enum ShardingPropertiesConstant { METRICS_LOGGER_NAME("metrics.logger.name", "com.dangdang.ddframe.rdb.sharding.metrics", String.class), /** - * 最小空闲工作线程数量. - * - *

- * 默认值: 0 - *

- */ - EXECUTOR_MIN_IDLE_SIZE("executor.min.idle.size", "0", int.class), - - /** - * 最大工作线程数量. + * 工作线程数量. * *

* 默认值: 100 *

*/ - EXECUTOR_MAX_SIZE("executor.max.size", "100", int.class), - - /** - * 工作线程空闲时超时时间. - * - *

- * 单位: 毫秒. - * 默认值: 60000毫秒. - *

- */ - EXECUTOR_MAX_IDLE_TIMEOUT_MILLISECONDS("executor.max.idle.timeout.millisecond", "60000", long.class); + EXECUTOR_SIZE("executor.size", String.valueOf(Runtime.getRuntime().availableProcessors()), int.class); private final String key; diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/executor/ExecutorEngine.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/executor/ExecutorEngine.java index 1f0029cf97..987311afab 100644 --- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/executor/ExecutorEngine.java +++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/executor/ExecutorEngine.java @@ -20,20 +20,21 @@ package com.dangdang.ddframe.rdb.sharding.executor; import com.dangdang.ddframe.rdb.sharding.config.ShardingProperties; import com.dangdang.ddframe.rdb.sharding.config.ShardingPropertiesConstant; import com.dangdang.ddframe.rdb.sharding.exception.ShardingJdbcException; -import com.google.common.util.concurrent.FutureCallback; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; +import java.util.ArrayList; import java.util.Collection; -import java.util.HashSet; +import java.util.Collections; +import java.util.Iterator; import java.util.List; -import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -48,15 +49,16 @@ public final class ExecutorEngine { private final ListeningExecutorService executorService; public ExecutorEngine(final ShardingProperties shardingProperties) { - int executorMinIdleSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_MIN_IDLE_SIZE); - int executorMaxSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_MAX_SIZE); - long executorMaxIdleTimeoutMilliseconds = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_MAX_IDLE_TIMEOUT_MILLISECONDS); - executorService = MoreExecutors.listeningDecorator(MoreExecutors.getExitingExecutorService( - new ThreadPoolExecutor(executorMinIdleSize, executorMaxSize, executorMaxIdleTimeoutMilliseconds, TimeUnit.MILLISECONDS, new SynchronousQueue()))); + int executorSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE); + executorService = MoreExecutors.listeningDecorator(new ThreadPoolExecutor(executorSize, executorSize, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), new ThreadFactoryBuilder() + .setDaemon(true).setNameFormat("ShardingJDBC-%d").build())); + MoreExecutors.addDelayedShutdownHook(executorService, 60, TimeUnit.SECONDS); } /** * 多线程执行任务. + * 一组任务中,将第一个任务放在当前线程中执行,其余的任务放到线程池中运行. + * * * @param inputs 输入参数 * @param executeUnit 执行单元 @@ -65,9 +67,26 @@ public final class ExecutorEngine { * @return 执行结果 */ public List execute(final Collection inputs, final ExecuteUnit executeUnit) { - ListenableFuture> futures = submitFutures(inputs, executeUnit); - addCallback(futures); - return getFutureResults(futures); + Iterator iterator = inputs.iterator(); + if (!iterator.hasNext()) { + return Collections.emptyList(); + } + I firstInput = iterator.next(); + ListenableFuture> restListFuture = asyncRun(Lists.newArrayList(iterator), executeUnit); + O firstOutput; + List restOutputs; + try { + firstOutput = executeUnit.execute(firstInput); + restOutputs = restListFuture.get(); + //CHECKSTYLE:OFF + } catch (final Exception ex) { + //CHECKSTYLE:ON + ExecutorExceptionHandler.handleException(ex); + return null; + } + List result = Lists.newLinkedList(restOutputs); + result.add(0, firstOutput); + return result; } /** @@ -85,22 +104,8 @@ public final class ExecutorEngine { return mergeUnit.merge(execute(inputs, executeUnit)); } - /** - * 安全关闭执行器,并释放线程. - */ - public void shutdown() { - executorService.shutdownNow(); - try { - executorService.awaitTermination(5, TimeUnit.SECONDS); - } catch (final InterruptedException ignored) { - } - if (!executorService.isTerminated()) { - throw new ShardingJdbcException("ExecutorEngine can not been terminated"); - } - } - - private ListenableFuture> submitFutures(final Collection inputs, final ExecuteUnit executeUnit) { - Set> result = new HashSet<>(inputs.size()); + private ListenableFuture> asyncRun(final Collection inputs, final ExecuteUnit executeUnit) { + List> result = new ArrayList<>(inputs.size()); for (final I each : inputs) { result.add(executorService.submit(new Callable() { @@ -113,26 +118,17 @@ public final class ExecutorEngine { return Futures.allAsList(result); } - private void addCallback(final ListenableFuture allFutures) { - Futures.addCallback(allFutures, new FutureCallback() { - @Override - public void onSuccess(final T result) { - log.trace("Concurrent execute result success {}", result); - } - - @Override - public void onFailure(final Throwable thrown) { - log.error("Concurrent execute result error {}", thrown); - } - }); - } - - private O getFutureResults(final ListenableFuture futures) { + /** + * 安全关闭执行器,并释放线程. + */ + public void shutdown() { + executorService.shutdownNow(); try { - return futures.get(); - } catch (final InterruptedException | ExecutionException ex) { - ExecutorExceptionHandler.handleException(ex); - return null; + executorService.awaitTermination(5, TimeUnit.SECONDS); + } catch (final InterruptedException ignored) { + } + if (!executorService.isTerminated()) { + throw new ShardingJdbcException("ExecutorEngine can not been terminated"); } } } diff --git a/sharding-jdbc-core/src/test/java/com/dangdang/ddframe/rdb/sharding/config/ShardingPropertiesTest.java b/sharding-jdbc-core/src/test/java/com/dangdang/ddframe/rdb/sharding/config/ShardingPropertiesTest.java index 88445710f3..2a546e1467 100644 --- a/sharding-jdbc-core/src/test/java/com/dangdang/ddframe/rdb/sharding/config/ShardingPropertiesTest.java +++ b/sharding-jdbc-core/src/test/java/com/dangdang/ddframe/rdb/sharding/config/ShardingPropertiesTest.java @@ -37,7 +37,7 @@ public final class ShardingPropertiesTest { prop.put(ShardingPropertiesConstant.METRICS_ENABLE.getKey(), "true"); prop.put(ShardingPropertiesConstant.METRICS_MILLISECONDS_PERIOD.getKey(), "1000"); prop.put(ShardingPropertiesConstant.METRICS_LOGGER_NAME.getKey(), "example"); - prop.put(ShardingPropertiesConstant.EXECUTOR_MAX_SIZE.getKey(), "10"); + prop.put(ShardingPropertiesConstant.EXECUTOR_SIZE.getKey(), "10"); shardingProperties = new ShardingProperties(prop); } @@ -50,8 +50,8 @@ public final class ShardingPropertiesTest { assertThat(actualMetricsEnabled, is(Boolean.valueOf(ShardingPropertiesConstant.METRICS_ENABLE.getDefaultValue()))); assertThat(actualMetricsMillisecondsPeriod, is(Long.valueOf(ShardingPropertiesConstant.METRICS_MILLISECONDS_PERIOD.getDefaultValue()))); assertThat(actualMetricsPackageName, is(ShardingPropertiesConstant.METRICS_LOGGER_NAME.getDefaultValue())); - int executorMaxSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_MAX_SIZE); - assertThat(executorMaxSize, is(Integer.valueOf(ShardingPropertiesConstant.EXECUTOR_MAX_SIZE.getDefaultValue()))); + int executorMaxSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE); + assertThat(executorMaxSize, is(Integer.valueOf(ShardingPropertiesConstant.EXECUTOR_SIZE.getDefaultValue()))); } @Test @@ -62,7 +62,7 @@ public final class ShardingPropertiesTest { @Test public void assertGetValueForInteger() { - int actualExecutorMaxSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_MAX_SIZE); + int actualExecutorMaxSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE); assertThat(actualExecutorMaxSize, is(10)); } @@ -83,7 +83,7 @@ public final class ShardingPropertiesTest { Properties prop = new Properties(); prop.put(ShardingPropertiesConstant.METRICS_ENABLE.getKey(), "error"); prop.put(ShardingPropertiesConstant.METRICS_MILLISECONDS_PERIOD.getKey(), "error"); - prop.put(ShardingPropertiesConstant.EXECUTOR_MAX_SIZE.getKey(), "error"); + prop.put(ShardingPropertiesConstant.EXECUTOR_SIZE.getKey(), "error"); prop.put("other", "other"); new ShardingProperties(prop); } diff --git a/sharding-jdbc-doc/content/03-community/release-notes.md b/sharding-jdbc-doc/content/03-community/release-notes.md index 7d9f8d90d5..6dc94143b0 100644 --- a/sharding-jdbc-doc/content/03-community/release-notes.md +++ b/sharding-jdbc-doc/content/03-community/release-notes.md @@ -11,6 +11,8 @@ next = "/03-community/directory-structure" ### 功能提升 +1. [ISSUE #219](https://github.com/dangdangdotcom/sharding-jdbc/issues/219) 线程性能优化 + ### 缺陷修正 1. [ISSUE #212](https://github.com/dangdangdotcom/sharding-jdbc/issues/212) 对去缺少数据源规则给出更有意义的提示 -- GitLab