提交 bc0abfd4 编写于 作者: G Gao Hongtao 提交者: gaohongtao

fixed #203 Merge batch events

上级 78b31ef7
......@@ -23,7 +23,6 @@ import com.dangdang.ddframe.rdb.sharding.executor.event.DQLExecutionEvent;
import com.dangdang.ddframe.rdb.sharding.executor.event.DQLExecutionEventBus;
import com.dangdang.ddframe.rdb.sharding.executor.event.EventExecutionType;
import com.dangdang.ddframe.rdb.sharding.executor.wrapper.AbstractExecutorWrapper;
import com.dangdang.ddframe.rdb.sharding.executor.wrapper.BatchPreparedStatementExecutorWrapper;
import com.google.common.base.Optional;
import lombok.RequiredArgsConstructor;
......@@ -43,9 +42,7 @@ class EventPostman {
void postExecutionEvents() {
for (AbstractExecutorWrapper each : statementExecutorWrappers) {
if (each instanceof BatchPreparedStatementExecutorWrapper) {
postBatchExecutionEvent((BatchPreparedStatementExecutorWrapper) each);
} else if (each.getDMLExecutionEvent().isPresent()) {
if (each.getDMLExecutionEvent().isPresent()) {
DMLExecutionEventBus.post(each.getDMLExecutionEvent().get());
} else if (each.getDQLExecutionEvent().isPresent()) {
DQLExecutionEventBus.post(each.getDQLExecutionEvent().get());
......@@ -53,12 +50,6 @@ class EventPostman {
}
}
private void postBatchExecutionEvent(final BatchPreparedStatementExecutorWrapper batchPreparedStatementExecutorWrapper) {
for (DMLExecutionEvent each : batchPreparedStatementExecutorWrapper.getDmlExecutionEvents()) {
DMLExecutionEventBus.post(each);
}
}
void postExecutionEventsAfterExecution(final AbstractExecutorWrapper statementExecutorWrapper) {
postExecutionEventsAfterExecution(statementExecutorWrapper, EventExecutionType.EXECUTE_SUCCESS, Optional.<SQLException>absent());
}
......@@ -76,17 +67,4 @@ class EventPostman {
DQLExecutionEventBus.post(event);
}
}
void postBatchExecutionEventsAfterExecution(final BatchPreparedStatementExecutorWrapper batchPreparedStatementExecutorWrapper) {
postBatchExecutionEventsAfterExecution(batchPreparedStatementExecutorWrapper, EventExecutionType.EXECUTE_SUCCESS, Optional.<SQLException>absent());
}
void postBatchExecutionEventsAfterExecution(
final BatchPreparedStatementExecutorWrapper batchPreparedStatementExecutorWrapper, final EventExecutionType eventExecutionType, final Optional<SQLException> exp) {
for (DMLExecutionEvent each : batchPreparedStatementExecutorWrapper.getDmlExecutionEvents()) {
each.setEventExecutionType(eventExecutionType);
each.setExp(exp);
DMLExecutionEventBus.post(each);
}
}
}
......@@ -19,7 +19,6 @@ package com.dangdang.ddframe.rdb.sharding.executor;
import com.codahale.metrics.Timer.Context;
import com.dangdang.ddframe.rdb.sharding.executor.event.EventExecutionType;
import com.dangdang.ddframe.rdb.sharding.executor.wrapper.BatchPreparedStatementExecutorWrapper;
import com.dangdang.ddframe.rdb.sharding.executor.wrapper.PreparedStatementExecutorWrapper;
import com.dangdang.ddframe.rdb.sharding.metrics.MetricsContext;
import com.google.common.base.Optional;
......@@ -213,14 +212,14 @@ public final class PreparedStatementExecutor {
final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
try {
if (1 == preparedStatementExecutorWrappers.size()) {
return executeBatchInternal((BatchPreparedStatementExecutorWrapper) preparedStatementExecutorWrappers.iterator().next(), isExceptionThrown, dataMap);
return executeBatchInternal(preparedStatementExecutorWrappers.iterator().next(), isExceptionThrown, dataMap);
}
return executorEngine.execute(preparedStatementExecutorWrappers, new ExecuteUnit<PreparedStatementExecutorWrapper, int[]>() {
@Override
public int[] execute(final PreparedStatementExecutorWrapper input) throws Exception {
synchronized (input.getPreparedStatement().getConnection()) {
return executeBatchInternal((BatchPreparedStatementExecutorWrapper) input, isExceptionThrown, dataMap);
return executeBatchInternal(input, isExceptionThrown, dataMap);
}
}
}, new MergeUnit<int[], int[]>() {
......@@ -248,18 +247,18 @@ public final class PreparedStatementExecutor {
}
}
private int[] executeBatchInternal(final BatchPreparedStatementExecutorWrapper batchPreparedStatementExecutorWrapper, final boolean isExceptionThrown, final Map<String, Object> dataMap) {
private int[] executeBatchInternal(final PreparedStatementExecutorWrapper batchPreparedStatementExecutorWrapper, final boolean isExceptionThrown, final Map<String, Object> dataMap) {
int[] result;
ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
ExecutorDataMap.setDataMap(dataMap);
try {
result = batchPreparedStatementExecutorWrapper.getPreparedStatement().executeBatch();
} catch (final SQLException ex) {
eventPostman.postBatchExecutionEventsAfterExecution(batchPreparedStatementExecutorWrapper, EventExecutionType.EXECUTE_FAILURE, Optional.of(ex));
eventPostman.postExecutionEventsAfterExecution(batchPreparedStatementExecutorWrapper, EventExecutionType.EXECUTE_FAILURE, Optional.of(ex));
ExecutorExceptionHandler.handleException(ex);
return null;
}
eventPostman.postBatchExecutionEventsAfterExecution(batchPreparedStatementExecutorWrapper);
eventPostman.postExecutionEventsAfterExecution(batchPreparedStatementExecutorWrapper);
return result;
}
}
......@@ -18,10 +18,12 @@
package com.dangdang.ddframe.rdb.sharding.executor.event;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import lombok.Getter;
import lombok.Setter;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
......@@ -40,7 +42,7 @@ public class ExecutionEvent {
private final String sql;
private final List<Object> parameters;
private final List<List<Object>> parameters = new ArrayList<>();
@Setter
private EventExecutionType eventExecutionType = EventExecutionType.BEFORE_EXECUTE;
......@@ -57,6 +59,42 @@ public class ExecutionEvent {
id = UUID.randomUUID().toString();
this.dataSource = dataSource;
this.sql = sql;
this.parameters = parameters;
this.parameters.add(parameters);
}
/**
* 获取参数.
* 调用该方法前需要调用{@linkplain #isBatch()},
* 如果返回值为{@code false}那么可以调用该方法获取参数.
*
* @return 参数列表
*/
public List<Object> getParameters() {
return parameters.get(0);
}
/**
* 判断事件是否为批量操作事件.
* 如果返回值为{@code false}那么可以调用{@link #getParameters()}获取参数,
* 如果返回值为{@code true}那么可以调用{@link #getBatchParameters()}获取参数.
*
* @return {@code true}是批量操作事件,{@code false}不是批量操作事件
*/
public boolean isBatch() {
return parameters.size() > 1;
}
/**
* 获取批量参数.
* 不论{@linkplain #isBatch()}返回值是什么,该方法都可以获得所有的参数.
*
* @return 参数列表
*/
public List<List<Object>> getBatchParameters() {
return parameters;
}
public void addBatchParameters(final List<Object> parameters) {
this.parameters.add(Lists.newArrayList(parameters));
}
}
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package com.dangdang.ddframe.rdb.sharding.executor.wrapper;
import com.dangdang.ddframe.rdb.sharding.executor.event.DMLExecutionEvent;
import com.dangdang.ddframe.rdb.sharding.router.SQLExecutionUnit;
import lombok.Getter;
import java.sql.PreparedStatement;
import java.util.LinkedList;
import java.util.List;
/**
* 批量操作执行上下文.
* 批量操作只支持DML语句,故只包含DML操作的事件.
*
* @author gaohongtao
*/
@Getter
public class BatchPreparedStatementExecutorWrapper extends PreparedStatementExecutorWrapper {
private final List<DMLExecutionEvent> dmlExecutionEvents = new LinkedList<>();
public BatchPreparedStatementExecutorWrapper(final PreparedStatement preparedStatement, final List<Object> parameters, final SQLExecutionUnit sqlExecutionUnit) {
super(preparedStatement, parameters, sqlExecutionUnit);
}
}
......@@ -21,6 +21,7 @@ import com.dangdang.ddframe.rdb.sharding.executor.event.DMLExecutionEvent;
import com.dangdang.ddframe.rdb.sharding.executor.event.DQLExecutionEvent;
import com.dangdang.ddframe.rdb.sharding.router.SQLExecutionUnit;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import lombok.Getter;
......@@ -66,4 +67,14 @@ public class PreparedStatementExecutorWrapper extends AbstractExecutorWrapper {
public Optional<DQLExecutionEvent> getDQLExecutionEvent() {
return dqlExecutionEvent;
}
/**
* 增加批量参数.
*
* @param parameters 参数列表
*/
public void addBatchParameters(final List<Object> parameters) {
Preconditions.checkArgument(isDML() && dmlExecutionEvent.isPresent());
dmlExecutionEvent.get().addBatchParameters(Lists.newArrayList(parameters));
}
}
......@@ -18,7 +18,6 @@
package com.dangdang.ddframe.rdb.sharding.jdbc;
import com.dangdang.ddframe.rdb.sharding.executor.PreparedStatementExecutor;
import com.dangdang.ddframe.rdb.sharding.executor.wrapper.BatchPreparedStatementExecutorWrapper;
import com.dangdang.ddframe.rdb.sharding.executor.wrapper.PreparedStatementExecutorWrapper;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractPreparedStatementAdapter;
import com.dangdang.ddframe.rdb.sharding.merger.ResultSetFactory;
......@@ -26,15 +25,17 @@ import com.dangdang.ddframe.rdb.sharding.parser.result.merger.MergeContext;
import com.dangdang.ddframe.rdb.sharding.router.PreparedSQLRouter;
import com.dangdang.ddframe.rdb.sharding.router.SQLExecutionUnit;
import com.dangdang.ddframe.rdb.sharding.router.SQLRouteResult;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterators;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
* 支持分片的预编译语句对象.
......@@ -46,7 +47,7 @@ public final class ShardingPreparedStatement extends AbstractPreparedStatementAd
private final PreparedSQLRouter preparedSQLRouter;
private final Map<PreparedStatement, PreparedStatementExecutorWrapper> cachedRoutePreparedStatementMap = new HashMap<>();
private final List<PreparedStatementExecutorWrapper> cachedPreparedStatementWrappers = new ArrayList<>();
private Integer autoGeneratedKeys;
......@@ -116,8 +117,13 @@ public final class ShardingPreparedStatement extends AbstractPreparedStatementAd
}
protected void clearRouteContext() throws SQLException {
super.clearRouteContext();
clearParameters();
resetBatch();
cachedPreparedStatementWrappers.clear();
}
@Override
public void clearBatch() throws SQLException {
clearRouteContext();
}
@Override
......@@ -125,40 +131,28 @@ public final class ShardingPreparedStatement extends AbstractPreparedStatementAd
try {
for (PreparedStatementExecutorWrapper each : routeSQL()) {
each.getPreparedStatement().addBatch();
BatchPreparedStatementExecutorWrapper wrapper;
if (cachedRoutePreparedStatementMap.containsKey(each.getPreparedStatement())) {
wrapper = (BatchPreparedStatementExecutorWrapper) cachedRoutePreparedStatementMap.get(each.getPreparedStatement());
} else {
wrapper = new BatchPreparedStatementExecutorWrapper(each.getPreparedStatement(), getParameters(), each.getSqlExecutionUnit());
cachedRoutePreparedStatementMap.put(each.getPreparedStatement(), wrapper);
}
if (each.getDMLExecutionEvent().isPresent()) {
wrapper.getDmlExecutionEvents().add(each.getDMLExecutionEvent().get());
}
}
getGeneratedKeyContext().addRow();
} finally {
clearRouteContext();
resetBatch();
}
}
@Override
public void clearBatch() throws SQLException {
cachedRoutePreparedStatementMap.clear();
clearRouteContext();
private void resetBatch() throws SQLException {
super.clearRouteContext();
clearParameters();
}
@Override
public int[] executeBatch() throws SQLException {
try {
return new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), cachedRoutePreparedStatementMap.values()).executeBatch();
return new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), cachedPreparedStatementWrappers).executeBatch();
} finally {
clearBatch();
clearRouteContext();
}
}
private List<PreparedStatementExecutorWrapper> routeSQL() throws SQLException {
List<Object> parameters = getParameters();
List<PreparedStatementExecutorWrapper> result = new ArrayList<>();
SQLRouteResult sqlRouteResult = preparedSQLRouter.route(getParameters());
MergeContext mergeContext = sqlRouteResult.getMergeContext();
......@@ -168,8 +162,24 @@ public final class ShardingPreparedStatement extends AbstractPreparedStatementAd
PreparedStatement preparedStatement = (PreparedStatement) getStatement(getShardingConnection().getConnection(each.getDataSource(), sqlRouteResult.getSqlStatementType()), each.getSql());
replayMethodsInvocation(preparedStatement);
getParameters().replayMethodsInvocation(preparedStatement);
result.add(new PreparedStatementExecutorWrapper(preparedStatement, parameters, each));
result.add(wrap(preparedStatement, each));
}
return result;
}
private PreparedStatementExecutorWrapper wrap(final PreparedStatement preparedStatement, final SQLExecutionUnit sqlExecutionUnit) {
Optional<PreparedStatementExecutorWrapper> wrapperOptional = Iterators.tryFind(cachedPreparedStatementWrappers.iterator(), new Predicate<PreparedStatementExecutorWrapper>() {
@Override
public boolean apply(final PreparedStatementExecutorWrapper input) {
return Objects.equals(input.getPreparedStatement(), preparedStatement);
}
});
if (wrapperOptional.isPresent()) {
wrapperOptional.get().addBatchParameters(getParameters());
return wrapperOptional.get();
}
PreparedStatementExecutorWrapper result = new PreparedStatementExecutorWrapper(preparedStatement, getParameters(), sqlExecutionUnit);
cachedPreparedStatementWrappers.add(result);
return result;
}
......
......@@ -32,8 +32,10 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.ArrayList;
import java.util.List;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
......@@ -228,9 +230,10 @@ public final class ShardingPreparedStatementTest extends AbstractShardingDataBas
@Test
public void assertAddBatch() throws SQLException {
final AtomicInteger beforeEventSum = new AtomicInteger();
final AtomicInteger successEventSum = new AtomicInteger();
DMLExecutionEventBus.register(new DMLExecutionEventListener() {
private List<DMLExecutionEvent> beforeEvents = new ArrayList<>();
@Override
public String getName() {
return "test";
......@@ -239,10 +242,12 @@ public final class ShardingPreparedStatementTest extends AbstractShardingDataBas
@Subscribe
@AllowConcurrentEvents
public void subscribe(final DMLExecutionEvent event) {
assertTrue(event.isBatch());
assertThat(event.getBatchParameters().size(), is(2));
if (event.getEventExecutionType().equals(EventExecutionType.BEFORE_EXECUTE)) {
beforeEventSum.incrementAndGet();
beforeEvents.add(event);
} else if (event.getEventExecutionType().equals(EventExecutionType.EXECUTE_SUCCESS)) {
successEventSum.incrementAndGet();
assertThat(beforeEvents, hasItem(event));
}
}
});
......@@ -270,10 +275,9 @@ public final class ShardingPreparedStatementTest extends AbstractShardingDataBas
for (int each : result) {
assertThat(each, is(1));
}
} finally {
DMLExecutionEventBus.clearListener();
}
assertThat(beforeEventSum.get(), is(4));
assertThat(successEventSum.get(), is(4));
DMLExecutionEventBus.clearListener();
}
@Test
......
......@@ -13,6 +13,7 @@ weight = 1
1. [ISSUE #194](https://github.com/dangdangdotcom/sharding-jdbc/issues/194) jdbc接口中资源释放错误
1. [ISSUE #199](https://github.com/dangdangdotcom/sharding-jdbc/issues/199) 分表且复用PreparedStatement对象造成数据路由错误
1. [ISSUE #201](https://github.com/dangdangdotcom/sharding-jdbc/issues/201) 批量操作执行前事件发送缺失
1. [ISSUE #203](https://github.com/dangdangdotcom/sharding-jdbc/issues/201) 合并batch操作发送的事件
## 1.4.0
......
......@@ -89,6 +89,7 @@ public final class RdbTransactionLogStorage implements TransactionLogStorage {
try (ResultSet rs = preparedStatement.executeQuery()) {
while (rs.next()) {
Gson gson = new Gson();
//TODO 对于批量执行的参数需要解析成两层列表
List<Object> parameters = gson.fromJson(rs.getString(5), new TypeToken<List<Object>>() { }.getType());
result.add(new TransactionLog(rs.getString(1), "", SoftTransactionType.valueOf(rs.getString(2)), rs.getString(3), rs.getString(4), parameters, rs.getLong(6), rs.getInt(7)));
}
......
......@@ -56,6 +56,7 @@ public final class BestEffortsDeliveryListener implements DMLExecutionEventListe
BEDSoftTransaction bedSoftTransaction = (BEDSoftTransaction) SoftTransactionManager.getCurrentTransaction().get();
switch (event.getEventExecutionType()) {
case BEFORE_EXECUTE:
//TODO 对于批量执行的SQL需要解析成两层列表
transactionLogStorage.add(new TransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(),
event.getDataSource(), event.getSql(), event.getParameters(), System.currentTimeMillis(), 0));
return;
......@@ -79,6 +80,7 @@ public final class BestEffortsDeliveryListener implements DMLExecutionEventListe
isNewConnection = true;
}
preparedStatement = conn.prepareStatement(event.getSql());
//TODO 对于批量事件需要解析成两层列表
for (int parameterIndex = 0; parameterIndex < event.getParameters().size(); parameterIndex++) {
preparedStatement.setObject(parameterIndex + 1, event.getParameters().get(parameterIndex));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册