提交 6a2b5fc4 编写于 作者: G gaohongtao

Add routing database only feature

上级 86365334
......@@ -50,6 +50,9 @@ public final class HintManager implements AutoCloseable {
@Getter
private boolean masterRouteOnly;
@Getter
private boolean databaseShardingOnly;
/**
* 获取线索分片管理器实例.
*
......@@ -61,6 +64,18 @@ public final class HintManager implements AutoCloseable {
return result;
}
/**
* 设置分库分片值.
*
* <p>分片操作符为等号.该方法适用于只分库的场景</p>
*
* @param value 分片值
*/
public void setDatabaseShardingValue(final Comparable<?> value) {
databaseShardingOnly = true;
addDatabaseShardingValue(HintManagerHolder.DB_TABLE_NAME, HintManagerHolder.DB_COLUMN_NAME, value);
}
/**
* 添加分库分片值.
*
......
......@@ -67,9 +67,8 @@ public final class ShardingRule {
final DataSourceRule dataSourceRule, final Collection<TableRule> tableRules, final Collection<BindingTableRule> bindingTableRules,
final DatabaseShardingStrategy databaseShardingStrategy, final TableShardingStrategy tableShardingStrategy) {
Preconditions.checkNotNull(dataSourceRule);
Preconditions.checkNotNull(tableRules);
this.dataSourceRule = dataSourceRule;
this.tableRules = tableRules;
this.tableRules = null == tableRules ? Collections.<TableRule>emptyList() : tableRules;
this.bindingTableRules = null == bindingTableRules ? Collections.<BindingTableRule>emptyList() : bindingTableRules;
this.databaseShardingStrategy = null == databaseShardingStrategy ? new DatabaseShardingStrategy(
Collections.<String>emptyList(), new NoneDatabaseShardingAlgorithm()) : databaseShardingStrategy;
......
......@@ -17,10 +17,10 @@
package com.dangdang.ddframe.rdb.sharding.api.strategy.database;
import java.util.Collection;
import com.dangdang.ddframe.rdb.sharding.router.strategy.ShardingStrategy;
import java.util.Collection;
/**
* 分库策略.
*
......@@ -28,6 +28,10 @@ import com.dangdang.ddframe.rdb.sharding.router.strategy.ShardingStrategy;
*/
public final class DatabaseShardingStrategy extends ShardingStrategy {
public DatabaseShardingStrategy(final NoneKeyDatabaseShardingAlgorithm<?> databaseShardingAlgorithm) {
super("", databaseShardingAlgorithm);
}
public DatabaseShardingStrategy(final String shardingColumn, final SingleKeyDatabaseShardingAlgorithm<?> databaseShardingAlgorithm) {
super(shardingColumn, databaseShardingAlgorithm);
}
......
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package com.dangdang.ddframe.rdb.sharding.api.strategy.database;
import com.dangdang.ddframe.rdb.sharding.router.strategy.NoneKeyShardingAlgorithm;
/**
* 没有分片键的数据库分片算法.
*
* @author gaohongtao
*/
public interface NoneKeyDatabaseShardingAlgorithm<T extends Comparable<?>> extends NoneKeyShardingAlgorithm<T>, DatabaseShardingAlgorithm {
}
......@@ -32,6 +32,10 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class HintManagerHolder {
public static final String DB_TABLE_NAME = "DB_TABLE_NAME";
public static final String DB_COLUMN_NAME = "DB_COLUMN_NAME";
private static final ThreadLocal<HintManager> HINT_MANAGER_HOLDER = new ThreadLocal<>();
/**
......@@ -81,6 +85,15 @@ public final class HintManagerHolder {
return null != HINT_MANAGER_HOLDER.get() && HINT_MANAGER_HOLDER.get().isMasterRouteOnly();
}
/**
* 判断是否当前只分库.
*
* @return 是否当前只分库.
*/
public static boolean isDatabaseShardingOnly() {
return null != HINT_MANAGER_HOLDER.get() && HINT_MANAGER_HOLDER.get().isDatabaseShardingOnly();
}
/**
* 清理线索分片管理器的本地线程持有者.
*/
......
......@@ -43,8 +43,6 @@ public class PreparedSQLRouter {
private SQLParsedResult sqlParsedResult;
private Optional<TableRule> tableRuleOptional;
/**
* 使用参数进行SQL路由.
* 当第一次路由时进行SQL解析,之后的路由复用第一次的解析结果.
......@@ -55,7 +53,6 @@ public class PreparedSQLRouter {
public SQLRouteResult route(final List<Object> parameters) {
if (null == sqlParsedResult) {
sqlParsedResult = engine.parseSQL(logicSql, parameters);
tableRuleOptional = shardingRule.tryFindTableRule(sqlParsedResult.getRouteContext().getTables().iterator().next().getName());
} else {
generateId(parameters);
for (ConditionContext each : sqlParsedResult.getConditionContexts()) {
......@@ -66,6 +63,7 @@ public class PreparedSQLRouter {
}
private void generateId(final List<Object> parameters) {
Optional<TableRule> tableRuleOptional = shardingRule.tryFindTableRule(sqlParsedResult.getRouteContext().getTables().iterator().next().getName());
if (!tableRuleOptional.isPresent()) {
return;
}
......
......@@ -21,22 +21,27 @@ import com.codahale.metrics.Timer.Context;
import com.dangdang.ddframe.rdb.sharding.api.rule.ShardingRule;
import com.dangdang.ddframe.rdb.sharding.constants.DatabaseType;
import com.dangdang.ddframe.rdb.sharding.exception.SQLParserException;
import com.dangdang.ddframe.rdb.sharding.hint.HintManagerHolder;
import com.dangdang.ddframe.rdb.sharding.metrics.MetricsContext;
import com.dangdang.ddframe.rdb.sharding.parser.SQLParserFactory;
import com.dangdang.ddframe.rdb.sharding.parser.result.SQLParsedResult;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.Limit;
import com.dangdang.ddframe.rdb.sharding.parser.result.router.ConditionContext;
import com.dangdang.ddframe.rdb.sharding.parser.result.router.RouteContext;
import com.dangdang.ddframe.rdb.sharding.parser.result.router.SQLBuilder;
import com.dangdang.ddframe.rdb.sharding.parser.result.router.Table;
import com.dangdang.ddframe.rdb.sharding.router.binding.BindingTablesRouter;
import com.dangdang.ddframe.rdb.sharding.router.database.DatabaseRouter;
import com.dangdang.ddframe.rdb.sharding.router.mixed.MixedTablesRouter;
import com.dangdang.ddframe.rdb.sharding.router.single.SingleTableRouter;
import com.dangdang.ddframe.rdb.sharding.util.SQLUtil;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.Sets;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Set;
......@@ -81,12 +86,30 @@ public final class SQLRouteEngine {
}
SQLParsedResult parseSQL(final String logicSql, final List<Object> parameters) {
if (HintManagerHolder.isDatabaseShardingOnly()) {
return buildHintParsedResult(logicSql);
}
Context context = MetricsContext.start("Parse SQL");
SQLParsedResult result = SQLParserFactory.create(databaseType, logicSql, parameters, shardingRule).parse();
MetricsContext.stop(context);
return result;
}
private SQLParsedResult buildHintParsedResult(final String logicSql) {
SQLParsedResult result = new SQLParsedResult();
RouteContext routeContext = result.getRouteContext();
routeContext.setSqlStatementType(SQLUtil.getTypeByStart(logicSql));
log.trace("Get {} SQL Statement", routeContext.getSqlStatementType());
SQLBuilder sqlBuilder = new SQLBuilder();
try {
sqlBuilder.append(logicSql);
} catch (final IOException ignored) {
}
routeContext.setSqlBuilder(sqlBuilder);
result.getConditionContexts().add(new ConditionContext());
return result;
}
SQLRouteResult routeSQL(final SQLParsedResult parsedResult, final List<Object> parameters) {
Context context = MetricsContext.start("Route SQL");
SQLRouteResult result = new SQLRouteResult(parsedResult.getRouteContext().getSqlStatementType(), parsedResult.getMergeContext(), parsedResult.getGeneratedKeyContext());
......@@ -105,8 +128,11 @@ public final class SQLRouteEngine {
}
private RoutingResult routeSQL(final ConditionContext conditionContext, final SQLParsedResult parsedResult) {
if (HintManagerHolder.isDatabaseShardingOnly()) {
return new DatabaseRouter(shardingRule.getDataSourceRule(), shardingRule.getDatabaseShardingStrategy(), parsedResult.getRouteContext().getSqlStatementType()).route();
}
Set<String> logicTables = Sets.newLinkedHashSet(Collections2.transform(parsedResult.getRouteContext().getTables(), new Function<Table, String>() {
@Override
public String apply(final Table input) {
return input.getName();
......
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package com.dangdang.ddframe.rdb.sharding.router.database;
import com.dangdang.ddframe.rdb.sharding.api.ShardingValue;
import com.dangdang.ddframe.rdb.sharding.api.rule.DataSourceRule;
import com.dangdang.ddframe.rdb.sharding.api.strategy.database.DatabaseShardingStrategy;
import com.dangdang.ddframe.rdb.sharding.hint.HintManagerHolder;
import com.dangdang.ddframe.rdb.sharding.hint.ShardingKey;
import com.dangdang.ddframe.rdb.sharding.parser.result.router.SQLStatementType;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.Collection;
import java.util.Collections;
/**
* 库路由.
*
* @author gaohongtao
*/
@RequiredArgsConstructor
@Slf4j
public class DatabaseRouter {
private final DataSourceRule dataSourceRule;
private final DatabaseShardingStrategy databaseShardingStrategy;
private final SQLStatementType sqlStatementType;
/**
* 根据Hint路由到库.
*
* @return 库路由结果
*/
public DatabaseRoutingResult route() {
Optional<ShardingValue<?>> shardingValueOptional = HintManagerHolder.getDatabaseShardingValue(new ShardingKey(HintManagerHolder.DB_TABLE_NAME, HintManagerHolder.DB_COLUMN_NAME));
Preconditions.checkState(shardingValueOptional.isPresent());
log.debug("Before database sharding only db:{} sharding values: {}", dataSourceRule.getDataSourceNames(), shardingValueOptional.get());
Collection<String> routedResult = databaseShardingStrategy
.doStaticSharding(sqlStatementType, dataSourceRule.getDataSourceNames(), Collections.<ShardingValue<?>>singleton(shardingValueOptional.get()));
Preconditions.checkState(!routedResult.isEmpty(), "no database route info");
log.debug("After database sharding only result: {}", routedResult);
return new DatabaseRoutingResult(routedResult);
}
}
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package com.dangdang.ddframe.rdb.sharding.router.database;
import com.dangdang.ddframe.rdb.sharding.parser.result.router.SQLBuilder;
import com.dangdang.ddframe.rdb.sharding.router.RoutingResult;
import com.dangdang.ddframe.rdb.sharding.router.SQLExecutionUnit;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import lombok.RequiredArgsConstructor;
import java.util.Collection;
/**
* 库路由结果.
*
* @author gaohongtao
*/
@RequiredArgsConstructor
public class DatabaseRoutingResult implements RoutingResult {
private final Collection<String> routedDatabaseNames;
@Override
public Collection<SQLExecutionUnit> getSQLExecutionUnits(final SQLBuilder sqlBuilder) {
return Collections2.transform(routedDatabaseNames, new Function<String, SQLExecutionUnit>() {
@Override
public SQLExecutionUnit apply(final String input) {
return new SQLExecutionUnit(input, sqlBuilder);
}
});
}
}
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package com.dangdang.ddframe.rdb.sharding.router.strategy;
import com.dangdang.ddframe.rdb.sharding.api.ShardingValue;
import java.util.Collection;
/**
* 无分片键算法接口.
*
* @param <T> 无分片键传入的数值类型
* @author gaohongtao
*/
public interface NoneKeyShardingAlgorithm<T extends Comparable<?>> extends ShardingAlgorithm {
/**
* 没有分片键的情况下,调用该方法.
*
* <p>shardingValue来源于非SQL形式的传入</p>
*
* @param availableTargetNames 所有的可用目标名称集合, 一般是数据源或表名称
* @param shardingValue 分片值
* @return 分片后指向的目标名称, 一般是数据源或表名称
*/
String doSharding(Collection<String> availableTargetNames, ShardingValue<T> shardingValue);
}
......@@ -73,6 +73,9 @@ public class ShardingStrategy {
@SuppressWarnings({ "unchecked", "rawtypes" })
private Collection<String> doSharding(final Collection<ShardingValue<?>> shardingValues, final Collection<String> availableTargetNames) {
if (shardingAlgorithm instanceof NoneKeyShardingAlgorithm) {
return Collections.singletonList(((NoneKeyShardingAlgorithm) shardingAlgorithm).doSharding(availableTargetNames, shardingValues.iterator().next()));
}
if (shardingAlgorithm instanceof SingleKeyShardingAlgorithm) {
SingleKeyShardingAlgorithm<?> singleKeyShardingAlgorithm = (SingleKeyShardingAlgorithm<?>) shardingAlgorithm;
ShardingValue shardingValue = shardingValues.iterator().next();
......
......@@ -17,6 +17,9 @@
package com.dangdang.ddframe.rdb.sharding.util;
import com.alibaba.druid.sql.parser.Lexer;
import com.dangdang.ddframe.rdb.sharding.exception.SQLParserException;
import com.dangdang.ddframe.rdb.sharding.parser.result.router.SQLStatementType;
import com.google.common.base.CharMatcher;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
......@@ -69,4 +72,26 @@ public class SQLUtil {
throw current;
}
}
/**
* 根据SQL第一个单词判断SQL类型.
*
* @param sql SQL语句
* @return SQL类型
*/
public static SQLStatementType getTypeByStart(final String sql) {
//TODO: Use new Lexer Util.
Lexer lexer = new Lexer(sql);
lexer.nextToken();
while (true) {
switch (lexer.token()) {
case SELECT: return SQLStatementType.SELECT;
case INSERT: return SQLStatementType.INSERT;
case UPDATE: return SQLStatementType.UPDATE;
case DELETE: return SQLStatementType.DELETE;
case EOF: throw new SQLParserException("Unsupported SQL statement: [%s]", sql);
default: lexer.nextToken();
}
}
}
}
......@@ -36,7 +36,9 @@ import com.dangdang.ddframe.rdb.integrate.dbtbl.statically.pstatement.StaticShar
import com.dangdang.ddframe.rdb.integrate.dbtbl.statically.statement.StaticShardingBothForStatementWithAggregateTest;
import com.dangdang.ddframe.rdb.integrate.dbtbl.statically.statement.StaticShardingBothForStatementWithDMLTest;
import com.dangdang.ddframe.rdb.integrate.dbtbl.statically.statement.StaticShardingBothForStatementWithSelectTest;
import com.dangdang.ddframe.rdb.integrate.hint.ShardingDataBasesOnlyForHintWithDMLTest;
import com.dangdang.ddframe.rdb.integrate.hint.RoutingDatabaseOnlyWithHintForDMLTest;
import com.dangdang.ddframe.rdb.integrate.hint.RoutingDatabaseOnlyWithHintForSelectTest;
import com.dangdang.ddframe.rdb.integrate.hint.ShardingDataBasesOnlyWithHintForDMLTest;
import com.dangdang.ddframe.rdb.integrate.hint.ShardingDataBasesOnlyWithHintForSelectTest;
import com.dangdang.ddframe.rdb.integrate.masterslave.pstatement.ShardingMasterSlaveForPStatementWithDMLTest;
import com.dangdang.ddframe.rdb.integrate.masterslave.pstatement.ShardingMasterSlaveForPStatementWithSelectTest;
......@@ -82,13 +84,15 @@ import org.junit.runners.Suite.SuiteClasses;
ShardingTablesOnlyForStatementWithSelectTest.class,
ShardingTablesOnlyForStatementWithAggregateTest.class,
ShardingTablesOnlyForStatementWithDMLTest.class,
ShardingDataBasesOnlyForHintWithDMLTest.class,
ShardingDataBasesOnlyWithHintForDMLTest.class,
ShardingDataBasesOnlyWithHintForSelectTest.class,
ShardingForNullableWithAggregateTest.class,
ShardingMasterSlaveForPStatementWithDMLTest.class,
ShardingMasterSlaveForPStatementWithSelectTest.class,
ShardingMasterSlaveForStatementWithDMLTest.class,
ShardingMasterSlaveForStatementWithSelectTest.class
ShardingMasterSlaveForStatementWithSelectTest.class,
RoutingDatabaseOnlyWithHintForDMLTest.class,
RoutingDatabaseOnlyWithHintForSelectTest.class
})
public class AllIntegrateTests {
}
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package com.dangdang.ddframe.rdb.integrate.fixture;
import com.dangdang.ddframe.rdb.sharding.api.ShardingValue;
import com.dangdang.ddframe.rdb.sharding.api.strategy.database.NoneKeyDatabaseShardingAlgorithm;
import java.util.Collection;
public class NoneKeyModuloDatabaseShardingAlgorithm implements NoneKeyDatabaseShardingAlgorithm<Integer> {
@Override
public String doSharding(final Collection<String> availableTargetNames, final ShardingValue<Integer> shardingValue) {
for (String each : availableTargetNames) {
if (each.endsWith(shardingValue.getValue() % 10 + "")) {
return each;
}
}
throw new UnsupportedOperationException();
}
}
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package com.dangdang.ddframe.rdb.integrate.hint;
import com.dangdang.ddframe.rdb.integrate.fixture.NoneKeyModuloDatabaseShardingAlgorithm;
import com.dangdang.ddframe.rdb.sharding.api.rule.DataSourceRule;
import com.dangdang.ddframe.rdb.sharding.api.rule.ShardingRule;
import com.dangdang.ddframe.rdb.sharding.api.strategy.database.DatabaseShardingStrategy;
import com.dangdang.ddframe.rdb.sharding.jdbc.ShardingDataSource;
abstract class AbstractRoutingDatabaseOnlyTest extends AbstractShardingDataBasesOnlyHintDBUnitTest {
protected ShardingDataSource initDataSource() {
DataSourceRule dataSourceRule = new DataSourceRule(createDataSourceMap("dataSource_%s"));
ShardingRule shardingRule = ShardingRule.builder().dataSourceRule(dataSourceRule)
.databaseShardingStrategy(new DatabaseShardingStrategy(new NoneKeyModuloDatabaseShardingAlgorithm()))
.build();
return new ShardingDataSource(shardingRule);
}
class DynamicDatabaseShardingValueHelper extends DynamicShardingValueHelper {
DynamicDatabaseShardingValueHelper(final int userId) {
super(userId, 0);
getHintManager().setDatabaseShardingValue(userId);
}
}
}
......@@ -30,6 +30,8 @@ import com.dangdang.ddframe.rdb.sharding.api.strategy.table.TableShardingStrateg
import com.dangdang.ddframe.rdb.sharding.jdbc.ShardingDataSource;
import com.dangdang.ddframe.rdb.sharding.parser.result.router.Condition;
import com.google.common.collect.Lists;
import lombok.AccessLevel;
import lombok.Getter;
import org.dbunit.DatabaseUnitException;
import org.junit.AfterClass;
......@@ -80,6 +82,10 @@ public abstract class AbstractShardingDataBasesOnlyHintDBUnitTest extends Abstra
return shardingDataSource;
}
isShutdown = false;
return shardingDataSource = initDataSource();
}
ShardingDataSource initDataSource() {
DataSourceRule dataSourceRule = new DataSourceRule(createDataSourceMap("dataSource_%s"));
TableRule orderTableRule = TableRule.builder("t_order").dataSourceRule(dataSourceRule).build();
TableRule orderItemTableRule = TableRule.builder("t_order_item").dataSourceRule(dataSourceRule).build();
......@@ -87,8 +93,7 @@ public abstract class AbstractShardingDataBasesOnlyHintDBUnitTest extends Abstra
.bindingTableRules(Collections.singletonList(new BindingTableRule(Arrays.asList(orderTableRule, orderItemTableRule))))
.databaseShardingStrategy(new DatabaseShardingStrategy(Collections.singletonList("user_id"), new MultipleKeysModuloDatabaseShardingAlgorithm()))
.tableShardingStrategy(new TableShardingStrategy(Collections.singletonList("order_id"), new NoneTableShardingAlgorithm())).build();
shardingDataSource = new ShardingDataSource(shardingRule);
return shardingDataSource;
return new ShardingDataSource(shardingRule);
}
@AfterClass
......@@ -104,15 +109,9 @@ public abstract class AbstractShardingDataBasesOnlyHintDBUnitTest extends Abstra
}
}
protected void assertDataSet(final String expectedDataSetFile, final DynamicShardingValueHelper helper, final Connection connection, final String actualTableName, final String sql)
throws SQLException, DatabaseUnitException {
try (DynamicShardingValueHelper anotherHelper = helper) {
assertDataSet(expectedDataSetFile, connection, actualTableName, sql);
}
}
class DynamicShardingValueHelper implements AutoCloseable {
@Getter(AccessLevel.PROTECTED)
private final HintManager hintManager;
DynamicShardingValueHelper(final int userId, final int orderId) {
......
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package com.dangdang.ddframe.rdb.integrate.hint;
import com.dangdang.ddframe.rdb.sharding.jdbc.ShardingDataSource;
import com.dangdang.ddframe.rdb.sharding.parser.result.router.SQLStatementType;
import org.dbunit.DatabaseUnitException;
import org.junit.Before;
import org.junit.Test;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
public class RoutingDatabaseOnlyWithHintForDMLTest extends AbstractRoutingDatabaseOnlyTest {
private ShardingDataSource shardingDataSource;
@Before
public void init() throws SQLException {
shardingDataSource = getShardingDataSource();
}
@Test
public void assertInsertWithAllPlaceholders() throws SQLException, DatabaseUnitException {
String sql = "INSERT INTO `t_order` VALUES (?, ?, ?)";
for (int i = 1; i <= 10; i++) {
try (DynamicShardingValueHelper helper = new DynamicDatabaseShardingValueHelper(i);
Connection connection = shardingDataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setInt(1, i);
preparedStatement.setInt(2, i);
preparedStatement.setString(3, "insert");
preparedStatement.executeUpdate();
}
}
assertDataSet("insert", "insert");
}
@Test
public void assertInsertWithoutPlaceholder() throws SQLException, DatabaseUnitException {
String sql = "INSERT INTO `t_order` VALUES (%s, %s, 'insert')";
for (int i = 1; i <= 10; i++) {
try (DynamicShardingValueHelper helper = new DynamicDatabaseShardingValueHelper(i);
Connection connection = shardingDataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(String.format(sql, i, i))) {
preparedStatement.executeUpdate();
}
}
assertDataSet("insert", "insert");
}
@Test
public void assertInsertWithPlaceholdersForShardingKeys() throws SQLException, DatabaseUnitException {
String sql = "INSERT INTO `t_order` VALUES (%s, %s, ?)";
for (int i = 1; i <= 10; i++) {
try (DynamicShardingValueHelper helper = new DynamicDatabaseShardingValueHelper(i);
Connection connection = shardingDataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(String.format(sql, i, i))) {
preparedStatement.setString(1, "insert");
preparedStatement.executeUpdate();
}
}
assertDataSet("insert", "insert");
}
@Test
public void assertInsertWithPlaceholdersForNotShardingKeys() throws SQLException, DatabaseUnitException {
String sql = "INSERT INTO `t_order` VALUES (%s, %s, ?)";
for (int i = 1; i <= 10; i++) {
try (DynamicShardingValueHelper helper = new DynamicDatabaseShardingValueHelper(i);
Connection connection = shardingDataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(String.format(sql, i, i))) {
preparedStatement.setString(1, "insert");
preparedStatement.executeUpdate();
}
}
assertDataSet("insert", "insert");
}
@Test
public void assertUpdateWithoutAlias() throws SQLException, DatabaseUnitException {
String sql = "UPDATE `t_order` SET `status` = ? WHERE `order_id` = ? AND `user_id` = ?";
for (int i = 10; i < 30; i++) {
for (int j = 0; j < 2; j++) {
try (DynamicShardingValueHelper helper = new DynamicDatabaseShardingValueHelper(i);
Connection connection = shardingDataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setString(1, "updated");
preparedStatement.setInt(2, i * 100 + j);
preparedStatement.setInt(3, i);
assertThat(preparedStatement.executeUpdate(), is(1));
}
}
}
assertDataSet("update", "updated");
}
@Test
public void assertUpdateWithAlias() throws SQLException, DatabaseUnitException {
String sql = "UPDATE `t_order` AS o SET o.`status` = ? WHERE o.`order_id` = ? AND o.`user_id` = ?";
for (int i = 10; i < 30; i++) {
for (int j = 0; j < 2; j++) {
try (DynamicShardingValueHelper helper = new DynamicDatabaseShardingValueHelper(i);
Connection connection = shardingDataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setString(1, "updated");
preparedStatement.setInt(2, i * 100 + j);
preparedStatement.setInt(3, i);
assertThat(preparedStatement.executeUpdate(), is(1));
}
}
}
assertDataSet("update", "updated");
}
@Test
public void assertDeleteWithoutAlias() throws SQLException, DatabaseUnitException {
String sql = "DELETE `t_order` WHERE `order_id` = ? AND `user_id` = ? AND `status` = ?";
for (int i = 10; i < 30; i++) {
for (int j = 0; j < 2; j++) {
try (DynamicShardingValueHelper helper = new DynamicDatabaseShardingValueHelper(i);
Connection connection = shardingDataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setInt(1, i * 100 + j);
preparedStatement.setInt(2, i);
preparedStatement.setString(3, "init");
assertThat(preparedStatement.executeUpdate(), is(1));
}
}
}
assertDataSet("delete", "init");
}
private void assertDataSet(final String expectedDataSetPattern, final String status) throws SQLException, DatabaseUnitException {
for (int i = 0; i < 10; i++) {
assertDataSet(String.format("integrate/dataset/db/expect/%s/db_%s.xml", expectedDataSetPattern, i),
shardingDataSource.getConnection().getConnection(String.format("dataSource_db_%s", i), SQLStatementType.SELECT), "t_order", "SELECT * FROM `t_order` WHERE `status`=?", status);
}
}
}
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package com.dangdang.ddframe.rdb.integrate.hint;
import com.dangdang.ddframe.rdb.sharding.jdbc.ShardingDataSource;
import org.dbunit.DatabaseUnitException;
import org.junit.Before;
import org.junit.Test;
import java.sql.SQLException;
public class RoutingDatabaseOnlyWithHintForSelectTest extends AbstractRoutingDatabaseOnlyTest {
private ShardingDataSource shardingDataSource;
@Before
public void init() throws SQLException {
shardingDataSource = getShardingDataSource();
}
@Test
public void assertSelectEqualsWithSingleTable() throws SQLException, DatabaseUnitException {
String sql = "SELECT * FROM `t_order` WHERE `user_id` = ? AND `order_id` = ?";
assertDataSet("integrate/dataset/db/expect/select/SelectEqualsWithSingleTable_0.xml", new AbstractRoutingDatabaseOnlyTest
.DynamicDatabaseShardingValueHelper(10), shardingDataSource.getConnection(), "t_order", sql, 10, 1000);
assertDataSet("integrate/dataset/db/expect/select/SelectEqualsWithSingleTable_1.xml", new AbstractRoutingDatabaseOnlyTest
.DynamicDatabaseShardingValueHelper(12), shardingDataSource.getConnection(), "t_order", sql, 12, 1201);
assertDataSet("integrate/dataset/Empty.xml", new AbstractRoutingDatabaseOnlyTest.DynamicDatabaseShardingValueHelper(12), shardingDataSource.getConnection(), "t_order", sql, 12, 1000);
}
}
......@@ -30,7 +30,7 @@ import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
public final class ShardingDataBasesOnlyForHintWithDMLTest extends AbstractShardingDataBasesOnlyHintDBUnitTest {
public final class ShardingDataBasesOnlyWithHintForDMLTest extends AbstractShardingDataBasesOnlyHintDBUnitTest {
private ShardingDataSource shardingDataSource;
......
......@@ -101,4 +101,14 @@ public final class HintManagerTest {
assertThat(hintManager.getTableShardingValue(shardingKey).getValueRange().upperEndpoint(), is((Comparable) 10));
}
}
@Test
public void assertAddDatabaseShardingOnly() {
try (HintManager hintManager = HintManager.getInstance()) {
hintManager.setDatabaseShardingValue("1");
assertTrue(hintManager.isDatabaseShardingOnly());
assertTrue(hintManager.isShardingHint());
assertThat((String) hintManager.getDatabaseShardingValue(new ShardingKey(HintManagerHolder.DB_TABLE_NAME, HintManagerHolder.DB_COLUMN_NAME)).getValue(), is("1"));
}
}
}
......@@ -33,7 +33,8 @@ import org.junit.runners.Suite;
SingleRoutingResultTest.class,
BindingRoutingResultTest.class,
CartesianResultTest.class,
SingleRouterUtilTest.class
SingleRouterUtilTest.class,
DatabaseTest.class
})
public class AllRouterTests {
}
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package com.dangdang.ddframe.rdb.sharding.router;
import com.dangdang.ddframe.rdb.sharding.api.HintManager;
import com.dangdang.ddframe.rdb.sharding.api.rule.DataSourceRule;
import com.dangdang.ddframe.rdb.sharding.api.rule.ShardingRule;
import com.dangdang.ddframe.rdb.sharding.api.strategy.database.DatabaseShardingStrategy;
import com.dangdang.ddframe.rdb.sharding.constants.DatabaseType;
import com.dangdang.ddframe.rdb.sharding.exception.SQLParserException;
import com.dangdang.ddframe.rdb.sharding.router.fixture.OrderDatabaseShardingAlgorithm;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import org.junit.Before;
import org.junit.Test;
import javax.sql.DataSource;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
public class DatabaseTest {
private ShardingRule shardingRule;
@Before
public void setRouteRuleContext() {
Map<String, DataSource> dataSourceMap = new HashMap<>();
dataSourceMap.put("ds_0", null);
dataSourceMap.put("ds_1", null);
DataSourceRule dataSourceRule = new DataSourceRule(dataSourceMap);
shardingRule = ShardingRule.builder().dataSourceRule(dataSourceRule)
.databaseShardingStrategy(new DatabaseShardingStrategy(new OrderDatabaseShardingAlgorithm())).build();
}
@Test
public void assertSQL() {
try (HintManager hintManager = HintManager.getInstance()) {
hintManager.setDatabaseShardingValue(1);
assertTarget("select * from tesT", "ds_1");
assertTarget("insert into test values (1,2)", "ds_1");
assertTarget("update test set a = 1", "ds_1");
assertTarget("delete from test where id = 2", "ds_1");
hintManager.setDatabaseShardingValue(2);
assertTarget("select * from tesT", "ds_0");
}
}
private void assertTarget(final String originSql, final String targetDataSource) throws SQLParserException {
SQLRouteResult actual = new SQLRouteEngine(shardingRule, DatabaseType.MySQL).route(originSql, Collections.emptyList());
assertThat(actual.getExecutionUnits().size(), is(1));
Set<String> actualDataSources = new HashSet<>(Collections2.transform(actual.getExecutionUnits(), new Function<SQLExecutionUnit, String>() {
@Override
public String apply(final SQLExecutionUnit input) {
return input.getDataSource();
}
}));
assertThat(actualDataSources.size(), is(1));
assertThat(actualDataSources, hasItems(targetDataSource));
Collection<String> actualSQLs = Collections2.transform(actual.getExecutionUnits(), new Function<SQLExecutionUnit, String>() {
@Override
public String apply(final SQLExecutionUnit input) {
return input.getSql();
}
});
assertThat(originSql, is(actualSQLs.iterator().next()));
}
}
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package com.dangdang.ddframe.rdb.sharding.router.fixture;
import com.dangdang.ddframe.rdb.sharding.api.ShardingValue;
import com.dangdang.ddframe.rdb.sharding.api.strategy.database.NoneKeyDatabaseShardingAlgorithm;
import java.util.Collection;
public class OrderDatabaseShardingAlgorithm implements NoneKeyDatabaseShardingAlgorithm<Integer> {
@Override
public String doSharding(final Collection<String> availableTargetNames, final ShardingValue<Integer> shardingValue) {
for (String each : availableTargetNames) {
if (each.endsWith(String.valueOf(shardingValue.getValue() % 2))) {
return each;
}
}
return null;
}
}
......@@ -17,6 +17,8 @@
package com.dangdang.ddframe.rdb.sharding.util;
import com.dangdang.ddframe.rdb.sharding.exception.SQLParserException;
import com.dangdang.ddframe.rdb.sharding.parser.result.router.SQLStatementType;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
......@@ -31,4 +33,17 @@ public class SQLUtilTest {
assertThat(SQLUtil.getExactlyValue("\"xxx\""), is("xxx"));
assertThat(SQLUtil.getExactlyValue("'xxx'"), is("xxx"));
}
@Test
public void assertSQLType() {
assertThat(SQLUtil.getTypeByStart(" /*COMMENT*/ \t \n \r \fsElecT\t\n * from table "), is(SQLStatementType.SELECT));
assertThat(SQLUtil.getTypeByStart(" - - COMMENT \t \n \r \fInsert\t\n into table "), is(SQLStatementType.INSERT));
assertThat(SQLUtil.getTypeByStart(" /*+ HINT SELECT * FROM TT*/ \t \n \r \fuPdAte\t\n table "), is(SQLStatementType.UPDATE));
assertThat(SQLUtil.getTypeByStart(" /*+ HINT SELECT * FROM TT*/ \t \n \r \fdelete\t\n table "), is(SQLStatementType.DELETE));
}
@Test(expected = SQLParserException.class)
public void assertNoSQL() {
SQLUtil.getTypeByStart("int i = 0");
}
}
......@@ -15,6 +15,8 @@
<logger name="org.dbunit" level="info" additivity="false">
<appender-ref ref="console"/>
</logger>
<logger name="com.dangdang.ddframe.rdb.sharding.executor.ExecutorExceptionHandler" level="off">
</logger>
<!-- 日志级别 -->
<root>
<level value="error" />
......
hugo-theme-learn @ 31bb6502
Subproject commit 31bb6502e4dd1334d2980a886f008ff423d40be2
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册