提交 1a202874 编写于 作者: G gaohongtao

fix #118 firstly execute DQL statement then execute DML statement, DML...

fix #118 firstly execute DQL statement then execute DML statement, DML statement is executing in slave db
上级 4b71cae1
......@@ -54,6 +54,11 @@ public final class MasterSlaveDataSource extends AbstractDataSourceAdapter {
private final SlaveLoadBalanceStrategy slaveLoadBalanceStrategy = new RoundRobinSlaveLoadBalanceStrategy();
static boolean isDML(final SQLStatementType sqlStatementType) {
return SQLStatementType.SELECT != sqlStatementType || DML_FLAG.get() || HintManagerHolder.isMasterRouteOnly();
}
/**
* 获取主或从节点的数据源名称.
*
......@@ -61,7 +66,7 @@ public final class MasterSlaveDataSource extends AbstractDataSourceAdapter {
* @return 主或从节点的数据源
*/
public DataSource getDataSource(final SQLStatementType sqlStatementType) {
if (SQLStatementType.SELECT != sqlStatementType || DML_FLAG.get() || HintManagerHolder.isMasterRouteOnly()) {
if (isDML(sqlStatementType)) {
DML_FLAG.set(true);
return masterDataSource;
}
......
......@@ -23,6 +23,7 @@ import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractConnectionAdapter;
import com.dangdang.ddframe.rdb.sharding.metrics.MetricsContext;
import com.dangdang.ddframe.rdb.sharding.parser.result.router.SQLStatementType;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
......@@ -70,20 +71,64 @@ public final class ShardingConnection extends AbstractConnectionAdapter {
}
private Connection getConnectionInternal(final String dataSourceName, final SQLStatementType sqlStatementType) throws SQLException {
if (connectionMap.containsKey(dataSourceName)) {
return connectionMap.get(dataSourceName);
Optional<Connection> connectionOptional = fetchCachedConnectionBySqlStatementType(dataSourceName, sqlStatementType);
if (connectionOptional.isPresent()) {
return connectionOptional.get();
}
Context metricsContext = MetricsContext.start(Joiner.on("-").join("ShardingConnection-getConnection", dataSourceName));
DataSource dataSource = shardingContext.getShardingRule().getDataSourceRule().getDataSource(dataSourceName);
String realDataSourceName = dataSourceName;
if (dataSource instanceof MasterSlaveDataSource) {
dataSource = ((MasterSlaveDataSource) dataSource).getDataSource(sqlStatementType);
realDataSourceName = getRealDataSourceName(dataSourceName, sqlStatementType);
}
Connection result = dataSource.getConnection();
MetricsContext.stop(metricsContext);
connectionMap.put(dataSourceName, result);
connectionMap.put(realDataSourceName, result);
return result;
}
private String getRealDataSourceName(final String dataSourceName, final SQLStatementType sqlStatementType) {
String slaveDataSourceName = getSlaveDataSourceName(dataSourceName);
if (!MasterSlaveDataSource.isDML(sqlStatementType)) {
return slaveDataSourceName;
}
Connection slaveConnection = connectionMap.remove(slaveDataSourceName);
if (null != slaveConnection) {
try {
slaveConnection.close();
} catch (final SQLException ignored) {
}
}
return getMasterDataSourceName(dataSourceName);
}
private Optional<Connection> fetchCachedConnectionBySqlStatementType(final String dataSourceName, final SQLStatementType sqlStatementType) {
if (connectionMap.containsKey(dataSourceName)) {
return Optional.of(connectionMap.get(dataSourceName));
}
String masterDataSourceName = getMasterDataSourceName(dataSourceName);
if (connectionMap.containsKey(masterDataSourceName)) {
return Optional.of(connectionMap.get(masterDataSourceName));
}
if (MasterSlaveDataSource.isDML(sqlStatementType)) {
return Optional.absent();
}
String slaveDataSourceName = getSlaveDataSourceName(dataSourceName);
if (connectionMap.containsKey(slaveDataSourceName)) {
return Optional.of(connectionMap.get(slaveDataSourceName));
}
return Optional.absent();
}
private String getMasterDataSourceName(final String dataSourceName) {
return Joiner.on("-").join(dataSourceName, "SHARDING-JDBC", "MASTER");
}
private String getSlaveDataSourceName(final String dataSourceName) {
return Joiner.on("-").join(dataSourceName, "SHARDING-JDBC", "SLAVE");
}
@Override
public PreparedStatement prepareStatement(final String sql) throws SQLException {
return new ShardingPreparedStatement(this, sql);
......
......@@ -20,18 +20,29 @@ package com.dangdang.ddframe.rdb.sharding.fixture;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractDataSourceAdapter;
import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import org.mockito.Mockito;
import java.sql.Connection;
import java.sql.SQLException;
import static org.mockito.Mockito.doThrow;
@RequiredArgsConstructor
@EqualsAndHashCode(callSuper = false)
public final class TestDataSource extends AbstractDataSourceAdapter {
private final String name;
@Setter
private boolean throwExceptionWhenClosing;
@Override
public Connection getConnection() throws SQLException {
return null;
Connection result = Mockito.mock(Connection.class);
if (throwExceptionWhenClosing) {
doThrow(SQLException.class).when(result).close();
}
return result;
}
}
......@@ -56,6 +56,7 @@ import org.junit.runners.Suite;
MasterSlaveDataSourceTest.class,
ParameterListTest.class,
AbstractPreparedStatementAdapterTest.class,
ShardingConnectionTest.class,
})
public class AllJDBCTests {
}
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package com.dangdang.ddframe.rdb.sharding.jdbc;
import com.dangdang.ddframe.rdb.sharding.api.rule.DataSourceRule;
import com.dangdang.ddframe.rdb.sharding.api.rule.ShardingRule;
import com.dangdang.ddframe.rdb.sharding.api.rule.TableRule;
import com.dangdang.ddframe.rdb.sharding.fixture.TestDataSource;
import com.dangdang.ddframe.rdb.sharding.parser.result.router.SQLStatementType;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
public class ShardingConnectionTest {
private static final DataSource MASTER_DATA_SOURCE = new TestDataSource("test_ds_master");
private static final DataSource SLAVE_DATA_SOURCE = new TestDataSource("test_ds_slave");
private static final MasterSlaveDataSource MASTER_SLAVE_DATA_SOURCE = new MasterSlaveDataSource("test_ds", MASTER_DATA_SOURCE, Collections.singletonList(SLAVE_DATA_SOURCE));
private static final String DS_NAME = "default";
private ShardingConnection connection;
@BeforeClass
public static void init() {
((TestDataSource) SLAVE_DATA_SOURCE).setThrowExceptionWhenClosing(true);
}
@Before
public void setUp() {
Map<String, DataSource> dataSourceMap = new HashMap<>(1);
dataSourceMap.put(DS_NAME, MASTER_SLAVE_DATA_SOURCE);
DataSourceRule dataSourceRule = new DataSourceRule(dataSourceMap);
ShardingRule rule = new ShardingRule.ShardingRuleBuilder().dataSourceRule(dataSourceRule).tableRules(Collections.singleton(new TableRule.TableRuleBuilder("test").dataSourceRule(dataSourceRule).build())).build();
ShardingContext sc = new ShardingContext(rule, null, null);
connection = new ShardingConnection(sc);
}
@After
public void clear() {
try {
connection.close();
} catch (SQLException ignored) {
}
}
@Test
public void getConnectionSelectThenUpdate() throws Exception {
assertNotSame(connection.getConnection(DS_NAME, SQLStatementType.SELECT), connection.getConnection(DS_NAME, SQLStatementType.UPDATE));
}
@Test
public void getConnectionUpdateThenSelect() throws Exception {
assertSame(connection.getConnection(DS_NAME, SQLStatementType.UPDATE), connection.getConnection(DS_NAME, SQLStatementType.SELECT));
}
@Test
public void getConnectionBothSelect() throws Exception {
assertSame(connection.getConnection(DS_NAME, SQLStatementType.SELECT), connection.getConnection(DS_NAME, SQLStatementType.SELECT));
}
@Test
public void getConnectionBothUpdate() throws Exception {
assertSame(connection.getConnection(DS_NAME, SQLStatementType.UPDATE), connection.getConnection(DS_NAME, SQLStatementType.UPDATE));
}
@Test
public void getConnectionMixed() throws Exception {
Connection slaveConnection = connection.getConnection(DS_NAME, SQLStatementType.SELECT);
Connection masterConnection = connection.getConnection(DS_NAME, SQLStatementType.UPDATE);
assertNotSame(slaveConnection, masterConnection);
assertNotSame(slaveConnection, connection.getConnection(DS_NAME, SQLStatementType.SELECT));
assertNotSame(slaveConnection, connection.getConnection(DS_NAME, SQLStatementType.UPDATE));
assertSame(masterConnection, connection.getConnection(DS_NAME, SQLStatementType.SELECT));
assertSame(masterConnection, connection.getConnection(DS_NAME, SQLStatementType.UPDATE));
}
}
\ No newline at end of file
......@@ -18,6 +18,7 @@ weight = 1
### 缺陷修正
1. [ISSUE #149](https://github.com/dangdangdotcom/sharding-jdbc/issues/149) INSERT IGNORE INTO时如果数据重了忽略时返回的成-1了,应该返回0
1. [ISSUE #118](https://github.com/dangdangdotcom/sharding-jdbc/issues/118) 同一个线程内先执行DQL后执行DML,DML操作在从库上执行
## 1.3.2
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册