提交 e2f40943 编写于 作者: T terrymanu

simplify AbstractExecutorWrapper 4rd version

上级 8aef60d4
......@@ -22,17 +22,17 @@ import com.dangdang.ddframe.rdb.sharding.constant.SQLType;
import com.dangdang.ddframe.rdb.sharding.executor.event.DMLExecutionEvent;
import com.dangdang.ddframe.rdb.sharding.executor.event.DQLExecutionEvent;
import com.dangdang.ddframe.rdb.sharding.executor.event.ExecutionEvent;
import com.dangdang.ddframe.rdb.sharding.executor.wrapper.StatementExecutorWrapper;
import com.dangdang.ddframe.rdb.sharding.metrics.MetricsContext;
import com.dangdang.ddframe.rdb.sharding.routing.SQLExecutionUnit;
import lombok.RequiredArgsConstructor;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
/**
* 多线程执行静态语句对象请求的执行器.
......@@ -48,7 +48,7 @@ public final class StatementExecutor {
private final SQLType sqlType;
private final Collection<StatementExecutorWrapper> statementExecutorWrappers;
private final Map<SQLExecutionUnit, Statement> statements;
private final EventPostman eventPostman = new EventPostman();
......@@ -63,15 +63,16 @@ public final class StatementExecutor {
final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
List<ResultSet> result;
try {
if (1 == statementExecutorWrappers.size()) {
return Collections.singletonList(executeQueryInternal(statementExecutorWrappers.iterator().next(), isExceptionThrown, dataMap));
if (1 == statements.size()) {
Entry<SQLExecutionUnit, Statement> entry = statements.entrySet().iterator().next();
return Collections.singletonList(executeQueryInternal(entry.getKey(), entry.getValue(), isExceptionThrown, dataMap));
}
result = executorEngine.execute(statementExecutorWrappers, new ExecuteUnit<StatementExecutorWrapper, ResultSet>() {
result = executorEngine.execute(statements.entrySet(), new ExecuteUnit<Entry<SQLExecutionUnit, Statement>, ResultSet>() {
@Override
public ResultSet execute(final StatementExecutorWrapper input) throws Exception {
synchronized (input.getStatement().getConnection()) {
return executeQueryInternal(input, isExceptionThrown, dataMap);
public ResultSet execute(final Entry<SQLExecutionUnit, Statement> input) throws Exception {
synchronized (input.getValue().getConnection()) {
return executeQueryInternal(input.getKey(), input.getValue(), isExceptionThrown, dataMap);
}
}
});
......@@ -81,14 +82,14 @@ public final class StatementExecutor {
return result;
}
private ResultSet executeQueryInternal(final StatementExecutorWrapper statementExecutorWrapper, final boolean isExceptionThrown, final Map<String, Object> dataMap) {
private ResultSet executeQueryInternal(final SQLExecutionUnit sqlExecutionUnit, final Statement statement, final boolean isExceptionThrown, final Map<String, Object> dataMap) {
ResultSet result;
ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
ExecutorDataMap.setDataMap(dataMap);
ExecutionEvent event = getExecutionEvent(statementExecutorWrapper);
ExecutionEvent event = getExecutionEvent(sqlExecutionUnit);
eventPostman.post(event);
try {
result = statementExecutorWrapper.getStatement().executeQuery(statementExecutorWrapper.getSqlExecutionUnit().getSql());
result = statement.executeQuery(sqlExecutionUnit.getSql());
} catch (final SQLException ex) {
eventPostman.postForExecuteFailure(event, ex);
ExecutorExceptionHandler.handleException(ex);
......@@ -148,15 +149,16 @@ public final class StatementExecutor {
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
try {
if (1 == statementExecutorWrappers.size()) {
return executeUpdateInternal(updater, statementExecutorWrappers.iterator().next(), isExceptionThrown, dataMap);
if (1 == statements.size()) {
Entry<SQLExecutionUnit, Statement> entry = statements.entrySet().iterator().next();
return executeUpdateInternal(updater, entry.getKey(), entry.getValue(), isExceptionThrown, dataMap);
}
return executorEngine.execute(statementExecutorWrappers, new ExecuteUnit<StatementExecutorWrapper, Integer>() {
return executorEngine.execute(statements.entrySet(), new ExecuteUnit<Entry<SQLExecutionUnit, Statement>, Integer>() {
@Override
public Integer execute(final StatementExecutorWrapper input) throws Exception {
synchronized (input.getStatement().getConnection()) {
return executeUpdateInternal(updater, input, isExceptionThrown, dataMap);
public Integer execute(final Entry<SQLExecutionUnit, Statement> input) throws Exception {
synchronized (input.getValue().getConnection()) {
return executeUpdateInternal(updater, input.getKey(), input.getValue(), isExceptionThrown, dataMap);
}
}
}, new MergeUnit<Integer, Integer>() {
......@@ -178,14 +180,14 @@ public final class StatementExecutor {
}
}
private int executeUpdateInternal(final Updater updater, final StatementExecutorWrapper statementExecutorWrapper, final boolean isExceptionThrown, final Map<String, Object> dataMap) {
private int executeUpdateInternal(final Updater updater, final SQLExecutionUnit sqlExecutionUnit, final Statement statement, final boolean isExceptionThrown, final Map<String, Object> dataMap) {
int result;
ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
ExecutorDataMap.setDataMap(dataMap);
ExecutionEvent event = getExecutionEvent(statementExecutorWrapper);
ExecutionEvent event = getExecutionEvent(sqlExecutionUnit);
eventPostman.post(event);
try {
result = updater.executeUpdate(statementExecutorWrapper.getStatement(), statementExecutorWrapper.getSqlExecutionUnit().getSql());
result = updater.executeUpdate(statement, sqlExecutionUnit.getSql());
} catch (final SQLException ex) {
eventPostman.postForExecuteFailure(event, ex);
ExecutorExceptionHandler.handleException(ex);
......@@ -245,15 +247,16 @@ public final class StatementExecutor {
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
try {
if (1 == statementExecutorWrappers.size()) {
return executeInternal(executor, statementExecutorWrappers.iterator().next(), isExceptionThrown, dataMap);
if (1 == statements.size()) {
Entry<SQLExecutionUnit, Statement> entry = statements.entrySet().iterator().next();
return executeInternal(executor, entry.getKey(), entry.getValue(), isExceptionThrown, dataMap);
}
List<Boolean> result = executorEngine.execute(statementExecutorWrappers, new ExecuteUnit<StatementExecutorWrapper, Boolean>() {
List<Boolean> result = executorEngine.execute(statements.entrySet(), new ExecuteUnit<Entry<SQLExecutionUnit, Statement>, Boolean>() {
@Override
public Boolean execute(final StatementExecutorWrapper input) throws Exception {
synchronized (input.getStatement().getConnection()) {
return executeInternal(executor, input, isExceptionThrown, dataMap);
public Boolean execute(final Entry<SQLExecutionUnit, Statement> input) throws Exception {
synchronized (input.getValue().getConnection()) {
return executeInternal(executor, input.getKey(), input.getValue(), isExceptionThrown, dataMap);
}
}
});
......@@ -263,14 +266,14 @@ public final class StatementExecutor {
}
}
private boolean executeInternal(final Executor executor, final StatementExecutorWrapper statementExecutorWrapper, final boolean isExceptionThrown, final Map<String, Object> dataMap) {
private boolean executeInternal(final Executor executor, final SQLExecutionUnit sqlExecutionUnit, final Statement statement, final boolean isExceptionThrown, final Map<String, Object> dataMap) {
boolean result;
ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
ExecutorDataMap.setDataMap(dataMap);
ExecutionEvent event = getExecutionEvent(statementExecutorWrapper);
ExecutionEvent event = getExecutionEvent(sqlExecutionUnit);
eventPostman.post(event);
try {
result = executor.execute(statementExecutorWrapper.getStatement(), statementExecutorWrapper.getSqlExecutionUnit().getSql());
result = executor.execute(statement, sqlExecutionUnit.getSql());
} catch (final SQLException ex) {
eventPostman.postForExecuteFailure(event, ex);
ExecutorExceptionHandler.handleException(ex);
......@@ -280,12 +283,12 @@ public final class StatementExecutor {
return result;
}
private ExecutionEvent getExecutionEvent(final StatementExecutorWrapper statementExecutorWrapper) {
private ExecutionEvent getExecutionEvent(final SQLExecutionUnit sqlExecutionUnit) {
ExecutionEvent event;
if (SQLType.SELECT == sqlType) {
event = new DQLExecutionEvent(statementExecutorWrapper.getSqlExecutionUnit().getDataSource(), statementExecutorWrapper.getSqlExecutionUnit().getSql());
event = new DQLExecutionEvent(sqlExecutionUnit.getDataSource(), sqlExecutionUnit.getSql());
} else {
event = new DMLExecutionEvent(statementExecutorWrapper.getSqlExecutionUnit().getDataSource(), statementExecutorWrapper.getSqlExecutionUnit().getSql());
event = new DMLExecutionEvent(sqlExecutionUnit.getDataSource(), sqlExecutionUnit.getSql());
}
return event;
}
......
......@@ -18,7 +18,6 @@
package com.dangdang.ddframe.rdb.sharding.jdbc.core.statement;
import com.dangdang.ddframe.rdb.sharding.executor.StatementExecutor;
import com.dangdang.ddframe.rdb.sharding.executor.wrapper.StatementExecutorWrapper;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractStatementAdapter;
import com.dangdang.ddframe.rdb.sharding.jdbc.core.connection.ShardingConnection;
import com.dangdang.ddframe.rdb.sharding.jdbc.core.resultset.GeneratedKeysResultSet;
......@@ -39,8 +38,10 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
* 支持分片的静态语句对象.
......@@ -198,15 +199,15 @@ public class ShardingStatement extends AbstractStatementAdapter {
private StatementExecutor generateExecutor(final String sql) throws SQLException {
clearPrevious();
routeResult = new StatementRoutingEngine(shardingConnection.getShardingContext()).route(sql);
Collection<StatementExecutorWrapper> statementExecutorWrappers = new LinkedList<>();
Map<SQLExecutionUnit, Statement> statements = new HashMap<>(routeResult.getExecutionUnits().size(), 1);
for (SQLExecutionUnit each : routeResult.getExecutionUnits()) {
Statement statement = shardingConnection.getConnection(
each.getDataSource(), routeResult.getSqlStatement().getType()).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
replayMethodsInvocation(statement);
statementExecutorWrappers.add(new StatementExecutorWrapper(statement, each));
statements.put(each, statement);
routedStatements.add(statement);
}
return new StatementExecutor(shardingConnection.getShardingContext().getExecutorEngine(), routeResult.getSqlStatement().getType(), statementExecutorWrappers);
return new StatementExecutor(shardingConnection.getShardingContext().getExecutorEngine(), routeResult.getSqlStatement().getType(), statements);
}
private void clearPrevious() throws SQLException {
......
......@@ -195,6 +195,7 @@ public final class ShardingPreparedStatement extends AbstractPreparedStatementAd
private PreparedStatementExecutorWrapper wrap(final PreparedStatement preparedStatement, final SQLExecutionUnit sqlExecutionUnit) {
Optional<PreparedStatementExecutorWrapper> wrapperOptional = Iterators.tryFind(cachedPreparedStatementWrappers.iterator(), new Predicate<PreparedStatementExecutorWrapper>() {
@Override
public boolean apply(final PreparedStatementExecutorWrapper input) {
return Objects.equals(input.getPreparedStatement(), preparedStatement);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册