提交 7614092d 编写于 作者: T tuohai666

Merge remote-tracking branch 'upstrem/dev' into dev

......@@ -20,6 +20,7 @@ package io.shardingsphere.core.constant.properties;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import io.shardingsphere.core.util.StringUtil;
import lombok.Getter;
import java.util.ArrayList;
import java.util.Collection;
......@@ -31,9 +32,11 @@ import java.util.Set;
*
* @author gaohongtao
* @author zhangliang
* @author panjuan
*/
public final class ShardingProperties {
@Getter
private final Properties props;
public ShardingProperties(final Properties props) {
......
......@@ -29,4 +29,4 @@ import org.junit.runners.Suite.SuiteClasses;
OrchestrationSpringBootShardingTest.class
})
public final class AllTests {
}
}
\ No newline at end of file
......@@ -19,6 +19,7 @@ package io.shardingsphere.jdbc.spring.boot.type;
import io.shardingsphere.core.api.ConfigMapContext;
import io.shardingsphere.core.jdbc.core.datasource.MasterSlaveDataSource;
import io.shardingsphere.jdbc.orchestration.internal.OrchestrationMasterSlaveDataSource;
import io.shardingsphere.jdbc.spring.boot.util.EmbedTestingServer;
import org.apache.commons.dbcp2.BasicDataSource;
import org.junit.BeforeClass;
......@@ -31,6 +32,7 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
......@@ -53,9 +55,12 @@ public class OrchestrationSpringBootMasterSlaveTest {
}
@Test
public void assertWithMasterSlaveDataSource() {
assertTrue(dataSource instanceof MasterSlaveDataSource);
for (DataSource each : ((MasterSlaveDataSource) dataSource).getAllDataSources().values()) {
public void assertWithMasterSlaveDataSource() throws ReflectiveOperationException {
assertTrue(dataSource instanceof OrchestrationMasterSlaveDataSource);
Field field = OrchestrationMasterSlaveDataSource.class.getDeclaredField("dataSource");
field.setAccessible(true);
MasterSlaveDataSource masterSlaveDataSource = (MasterSlaveDataSource) field.get(dataSource);
for (DataSource each : masterSlaveDataSource.getAllDataSources().values()) {
assertThat(((BasicDataSource) each).getMaxTotal(), is(16));
assertThat(((BasicDataSource) each).getUsername(), is("root"));
}
......
......@@ -22,6 +22,7 @@ import io.shardingsphere.core.constant.properties.ShardingProperties;
import io.shardingsphere.core.constant.properties.ShardingPropertiesConstant;
import io.shardingsphere.core.jdbc.core.ShardingContext;
import io.shardingsphere.core.jdbc.core.datasource.ShardingDataSource;
import io.shardingsphere.jdbc.orchestration.internal.OrchestrationShardingDataSource;
import io.shardingsphere.jdbc.spring.boot.util.EmbedTestingServer;
import org.apache.commons.dbcp2.BasicDataSource;
import org.junit.BeforeClass;
......@@ -58,11 +59,14 @@ public class OrchestrationSpringBootShardingTest {
@Test
public void assertWithShardingDataSource() throws NoSuchFieldException, IllegalAccessException {
assertTrue(dataSource instanceof ShardingDataSource);
assertTrue(dataSource instanceof OrchestrationShardingDataSource);
Field dataSourceField = OrchestrationShardingDataSource.class.getDeclaredField("dataSource");
dataSourceField.setAccessible(true);
ShardingDataSource shardingDataSource = (ShardingDataSource) dataSourceField.get(dataSource);
Field field = ShardingDataSource.class.getDeclaredField("shardingContext");
field.setAccessible(true);
ShardingContext shardingContext = (ShardingContext) field.get(dataSource);
for (DataSource each : shardingContext.getDataSourceMap().values()) {
ShardingContext shardingContext = (ShardingContext) field.get(shardingDataSource);
for (DataSource each : shardingDataSource.getDataSourceMap().values()) {
assertThat(((BasicDataSource) each).getMaxTotal(), is(16));
}
assertTrue(shardingContext.isShowSQL());
......@@ -72,7 +76,7 @@ public class OrchestrationSpringBootShardingTest {
Field propertiesField = ShardingDataSource.class.getDeclaredField("shardingProperties");
propertiesField.setAccessible(true);
ShardingProperties shardingProperties = (ShardingProperties) propertiesField.get(dataSource);
ShardingProperties shardingProperties = (ShardingProperties) propertiesField.get(shardingDataSource);
assertTrue((Boolean) shardingProperties.getValue(ShardingPropertiesConstant.SQL_SHOW));
assertThat((Integer) shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE), is(100));
}
......
......@@ -35,6 +35,6 @@ public class SpringShardingDataSource extends ShardingDataSource {
public SpringShardingDataSource(final Map<String, DataSource> dataSourceMap, final ShardingRuleConfiguration shardingRuleConfig,
final Map<String, Object> configMap, final Properties props) throws SQLException {
super(getRawDataSourceMap(dataSourceMap), new ShardingRule(getShardingRuleConfiguration(dataSourceMap, shardingRuleConfig), dataSourceMap.keySet()), configMap, props);
super(dataSourceMap, new ShardingRule(shardingRuleConfig, dataSourceMap.keySet()), configMap, props);
}
}
......@@ -79,8 +79,9 @@ public class OrchestrationMasterSlaveNamespaceTest extends AbstractJUnit4SpringC
}
private MasterSlaveRule getMasterSlaveRule(final String masterSlaveDataSourceName) {
MasterSlaveDataSource masterSlaveDataSource = this.applicationContext.getBean(masterSlaveDataSourceName, MasterSlaveDataSource.class);
return (MasterSlaveRule) FieldValueUtil.getFieldValue(masterSlaveDataSource, "masterSlaveRule", true);
OrchestrationMasterSlaveDataSource masterSlaveDataSource = this.applicationContext.getBean(masterSlaveDataSourceName, OrchestrationMasterSlaveDataSource.class);
MasterSlaveDataSource dataSource = (MasterSlaveDataSource) FieldValueUtil.getFieldValue(masterSlaveDataSource, "dataSource", false);
return dataSource.getMasterSlaveRule();
}
@Test
......@@ -89,7 +90,7 @@ public class OrchestrationMasterSlaveNamespaceTest extends AbstractJUnit4SpringC
Map<String, Object> configMap = new HashMap<>();
configMap.put("key1", "value1");
assertThat(ConfigMapContext.getInstance().getMasterSlaveConfig(), is(configMap));
assertThat(masterSlaveDataSource, instanceOf(MasterSlaveDataSource.class));
assertThat(masterSlaveDataSource, instanceOf(OrchestrationMasterSlaveDataSource.class));
}
@Test
......@@ -99,7 +100,8 @@ public class OrchestrationMasterSlaveNamespaceTest extends AbstractJUnit4SpringC
}
private ShardingProperties getShardingProperties(final String masterSlaveDataSourceName) {
MasterSlaveDataSource masterSlaveDataSource = this.applicationContext.getBean(masterSlaveDataSourceName, MasterSlaveDataSource.class);
return (ShardingProperties) FieldValueUtil.getFieldValue(masterSlaveDataSource, "shardingProperties", true);
OrchestrationMasterSlaveDataSource masterSlaveDataSource = this.applicationContext.getBean(masterSlaveDataSourceName, OrchestrationMasterSlaveDataSource.class);
MasterSlaveDataSource dataSource = (MasterSlaveDataSource) FieldValueUtil.getFieldValue(masterSlaveDataSource, "dataSource", false);
return dataSource.getShardingProperties();
}
}
......@@ -19,6 +19,7 @@ package io.shardingsphere.jdbc.orchestration.spring;
import io.shardingsphere.core.jdbc.core.datasource.ShardingDataSource;
import io.shardingsphere.core.rule.ShardingRule;
import io.shardingsphere.jdbc.orchestration.internal.OrchestrationShardingDataSource;
import io.shardingsphere.jdbc.orchestration.spring.util.EmbedTestingServer;
import io.shardingsphere.jdbc.orchestration.spring.util.FieldValueUtil;
import org.junit.BeforeClass;
......@@ -58,14 +59,15 @@ public class OrchestrationShardingMasterSlaveNamespaceTest extends AbstractJUnit
@SuppressWarnings("unchecked")
private Map<String, DataSource> getDataSourceMap() {
ShardingDataSource shardingDataSource = applicationContext.getBean("defaultShardingDataSource", ShardingDataSource.class);
Object shardingContext = FieldValueUtil.getFieldValue(shardingDataSource, "shardingContext", true);
return (Map) FieldValueUtil.getFieldValue(shardingContext, "dataSourceMap");
OrchestrationShardingDataSource shardingDataSource = applicationContext.getBean("defaultShardingDataSource", OrchestrationShardingDataSource.class);
ShardingDataSource dataSource = (ShardingDataSource) FieldValueUtil.getFieldValue(shardingDataSource, "dataSource");
return dataSource.getDataSourceMap();
}
private ShardingRule getShardingRule() {
ShardingDataSource shardingDataSource = applicationContext.getBean("defaultShardingDataSource", ShardingDataSource.class);
Object shardingContext = FieldValueUtil.getFieldValue(shardingDataSource, "shardingContext", true);
OrchestrationShardingDataSource shardingDataSource = applicationContext.getBean("defaultShardingDataSource", OrchestrationShardingDataSource.class);
ShardingDataSource dataSource = (ShardingDataSource) FieldValueUtil.getFieldValue(shardingDataSource, "dataSource");
Object shardingContext = FieldValueUtil.getFieldValue(dataSource, "shardingContext");
return (ShardingRule) FieldValueUtil.getFieldValue(shardingContext, "shardingRule");
}
}
......@@ -30,6 +30,7 @@ import io.shardingsphere.core.rule.BindingTableRule;
import io.shardingsphere.core.rule.DataNode;
import io.shardingsphere.core.rule.ShardingRule;
import io.shardingsphere.core.rule.TableRule;
import io.shardingsphere.jdbc.orchestration.internal.OrchestrationShardingDataSource;
import io.shardingsphere.jdbc.orchestration.spring.algorithm.DefaultComplexKeysShardingAlgorithm;
import io.shardingsphere.jdbc.orchestration.spring.algorithm.DefaultHintShardingAlgorithm;
import io.shardingsphere.jdbc.orchestration.spring.algorithm.PreciseModuloDatabaseShardingAlgorithm;
......@@ -183,13 +184,14 @@ public class OrchestrationShardingNamespaceTest extends AbstractJUnit4SpringCont
@Test
public void assertPropsDataSource() {
ShardingDataSource shardingDataSource = this.applicationContext.getBean("propsDataSource", ShardingDataSource.class);
OrchestrationShardingDataSource shardingDataSource = this.applicationContext.getBean("propsDataSource", OrchestrationShardingDataSource.class);
ShardingDataSource dataSource = (ShardingDataSource) FieldValueUtil.getFieldValue(shardingDataSource, "dataSource");
Map<String, Object> configMap = new HashMap<>();
configMap.put("key1", "value1");
assertThat(ConfigMapContext.getInstance().getShardingConfig(), is(configMap));
Object shardingContext = FieldValueUtil.getFieldValue(shardingDataSource, "shardingContext", true);
Object shardingContext = FieldValueUtil.getFieldValue(dataSource, "shardingContext");
assertTrue((boolean) FieldValueUtil.getFieldValue(shardingContext, "showSQL"));
ShardingProperties shardingProperties = (ShardingProperties) FieldValueUtil.getFieldValue(shardingDataSource, "shardingProperties", true);
ShardingProperties shardingProperties = (ShardingProperties) FieldValueUtil.getFieldValue(dataSource, "shardingProperties");
boolean showSql = shardingProperties.getValue(ShardingPropertiesConstant.SQL_SHOW);
assertTrue(showSql);
int executorSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE);
......@@ -199,13 +201,14 @@ public class OrchestrationShardingNamespaceTest extends AbstractJUnit4SpringCont
@Test
public void assertShardingDataSourceType() {
assertTrue(this.applicationContext.getBean("simpleShardingDataSource") instanceof ShardingDataSource);
assertTrue(this.applicationContext.getBean("simpleShardingDataSource") instanceof OrchestrationShardingDataSource);
}
@Test
public void assertDefaultActualDataNodes() {
ShardingDataSource multiTableRulesDataSource = this.applicationContext.getBean("multiTableRulesDataSource", ShardingDataSource.class);
Object shardingContext = FieldValueUtil.getFieldValue(multiTableRulesDataSource, "shardingContext", true);
OrchestrationShardingDataSource multiTableRulesDataSource = this.applicationContext.getBean("multiTableRulesDataSource", OrchestrationShardingDataSource.class);
ShardingDataSource dataSource = (ShardingDataSource) FieldValueUtil.getFieldValue(multiTableRulesDataSource, "dataSource");
Object shardingContext = FieldValueUtil.getFieldValue(dataSource, "shardingContext");
ShardingRule shardingRule = (ShardingRule) FieldValueUtil.getFieldValue(shardingContext, "shardingRule");
assertThat(shardingRule.getTableRules().size(), is(2));
Iterator<TableRule> tableRules = shardingRule.getTableRules().iterator();
......@@ -221,14 +224,15 @@ public class OrchestrationShardingNamespaceTest extends AbstractJUnit4SpringCont
@SuppressWarnings("unchecked")
private Map<String, DataSource> getDataSourceMap(final String shardingDataSourceName) {
ShardingDataSource shardingDataSource = this.applicationContext.getBean(shardingDataSourceName, ShardingDataSource.class);
Object shardingContext = FieldValueUtil.getFieldValue(shardingDataSource, "shardingContext", true);
return (Map) FieldValueUtil.getFieldValue(shardingContext, "dataSourceMap");
OrchestrationShardingDataSource shardingDataSource = this.applicationContext.getBean(shardingDataSourceName, OrchestrationShardingDataSource.class);
ShardingDataSource dataSource = (ShardingDataSource) FieldValueUtil.getFieldValue(shardingDataSource, "dataSource");
return dataSource.getDataSourceMap();
}
private ShardingRule getShardingRule(final String shardingDataSourceName) {
ShardingDataSource shardingDataSource = this.applicationContext.getBean(shardingDataSourceName, ShardingDataSource.class);
Object shardingContext = FieldValueUtil.getFieldValue(shardingDataSource, "shardingContext", true);
OrchestrationShardingDataSource shardingDataSource = this.applicationContext.getBean(shardingDataSourceName, OrchestrationShardingDataSource.class);
ShardingDataSource dataSource = (ShardingDataSource) FieldValueUtil.getFieldValue(shardingDataSource, "dataSource");
Object shardingContext = FieldValueUtil.getFieldValue(dataSource, "shardingContext");
return (ShardingRule) FieldValueUtil.getFieldValue(shardingContext, "shardingRule");
}
}
......@@ -19,6 +19,7 @@ package io.shardingsphere.jdbc.orchestration.api;
import com.google.common.base.Preconditions;
import io.shardingsphere.core.api.config.MasterSlaveRuleConfiguration;
import io.shardingsphere.core.jdbc.core.datasource.MasterSlaveDataSource;
import io.shardingsphere.jdbc.orchestration.api.config.OrchestrationConfiguration;
import io.shardingsphere.jdbc.orchestration.internal.OrchestrationFacade;
import io.shardingsphere.jdbc.orchestration.internal.OrchestrationMasterSlaveDataSource;
......@@ -57,9 +58,8 @@ public final class OrchestrationMasterSlaveDataSourceFactory {
if (null == masterSlaveRuleConfig || null == masterSlaveRuleConfig.getMasterDataSourceName()) {
return createDataSource(orchestrationConfig);
}
OrchestrationMasterSlaveDataSource result = new OrchestrationMasterSlaveDataSource(dataSourceMap, masterSlaveRuleConfig, configMap, props, new OrchestrationFacade(orchestrationConfig));
result.init();
return result;
MasterSlaveDataSource masterSlaveDataSource = new MasterSlaveDataSource(dataSourceMap, masterSlaveRuleConfig, configMap, props);
return new OrchestrationMasterSlaveDataSource(masterSlaveDataSource, new OrchestrationFacade(orchestrationConfig));
}
/**
......@@ -74,9 +74,7 @@ public final class OrchestrationMasterSlaveDataSourceFactory {
ConfigurationService configService = orchestrationFacade.getConfigService();
MasterSlaveRuleConfiguration masterSlaveRuleConfig = configService.loadMasterSlaveRuleConfiguration();
Preconditions.checkNotNull(masterSlaveRuleConfig, "Missing the master-slave rule configuration on register center");
OrchestrationMasterSlaveDataSource result = new OrchestrationMasterSlaveDataSource(
configService.loadDataSourceMap(), masterSlaveRuleConfig, configService.loadMasterSlaveConfigMap(), configService.loadMasterSlaveProperties(), orchestrationFacade);
result.init();
return result;
MasterSlaveDataSource masterSlaveDataSource = new MasterSlaveDataSource( configService.loadDataSourceMap(), masterSlaveRuleConfig, configService.loadMasterSlaveConfigMap(), configService.loadMasterSlaveProperties());
return new OrchestrationMasterSlaveDataSource(masterSlaveDataSource, orchestrationFacade);
}
}
......@@ -19,6 +19,8 @@ package io.shardingsphere.jdbc.orchestration.api;
import com.google.common.base.Preconditions;
import io.shardingsphere.core.api.config.ShardingRuleConfiguration;
import io.shardingsphere.core.jdbc.core.datasource.ShardingDataSource;
import io.shardingsphere.core.rule.ShardingRule;
import io.shardingsphere.jdbc.orchestration.api.config.OrchestrationConfiguration;
import io.shardingsphere.jdbc.orchestration.internal.OrchestrationFacade;
import io.shardingsphere.jdbc.orchestration.internal.OrchestrationShardingDataSource;
......@@ -57,9 +59,8 @@ public final class OrchestrationShardingDataSourceFactory {
if (null == shardingRuleConfig || shardingRuleConfig.getTableRuleConfigs().isEmpty()) {
return createDataSource(orchestrationConfig);
}
OrchestrationShardingDataSource result = new OrchestrationShardingDataSource(dataSourceMap, shardingRuleConfig, configMap, props, new OrchestrationFacade(orchestrationConfig));
result.init();
return result;
ShardingDataSource shardingDataSource = new ShardingDataSource(dataSourceMap, new ShardingRule(shardingRuleConfig, dataSourceMap.keySet()), configMap, props);
return new OrchestrationShardingDataSource(shardingDataSource, new OrchestrationFacade(orchestrationConfig));
}
/**
......@@ -74,9 +75,7 @@ public final class OrchestrationShardingDataSourceFactory {
ConfigurationService configService = orchestrationFacade.getConfigService();
ShardingRuleConfiguration shardingRuleConfig = configService.loadShardingRuleConfiguration();
Preconditions.checkNotNull(shardingRuleConfig, "Missing the sharding rule configuration on register center");
OrchestrationShardingDataSource result = new OrchestrationShardingDataSource(
configService.loadDataSourceMap(), shardingRuleConfig, configService.loadShardingConfigMap(), configService.loadShardingProperties(), orchestrationFacade);
result.init();
return result;
ShardingDataSource shardingDataSource = new ShardingDataSource(configService.loadDataSourceMap(), new ShardingRule(shardingRuleConfig, configService.loadDataSourceMap().keySet()), configService.loadShardingConfigMap(), configService.loadShardingProperties());
return new OrchestrationShardingDataSource(shardingDataSource, orchestrationFacade);
}
}
......@@ -17,14 +17,26 @@
package io.shardingsphere.jdbc.orchestration.internal;
import com.google.common.eventbus.Subscribe;
import io.shardingsphere.core.api.ConfigMapContext;
import io.shardingsphere.core.api.config.MasterSlaveRuleConfiguration;
import io.shardingsphere.core.event.ShardingEventBusInstance;
import io.shardingsphere.core.jdbc.adapter.AbstractDataSourceAdapter;
import io.shardingsphere.core.jdbc.core.datasource.MasterSlaveDataSource;
import io.shardingsphere.core.rule.MasterSlaveRule;
import io.shardingsphere.jdbc.orchestration.internal.event.config.MasterSlaveConfigurationEventBusEvent;
import io.shardingsphere.jdbc.orchestration.internal.event.state.CircuitStateEventBusEvent;
import io.shardingsphere.jdbc.orchestration.internal.event.state.DisabledStateEventBusEvent;
import io.shardingsphere.jdbc.orchestration.internal.jdbc.datasource.CircuitBreakerDataSource;
import lombok.extern.slf4j.Slf4j;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
/**
* Orchestration master-slave datasource.
......@@ -33,37 +45,96 @@ import java.util.Properties;
* @author panjuan
*/
@Slf4j
public final class OrchestrationMasterSlaveDataSource extends MasterSlaveDataSource implements AutoCloseable {
public final class OrchestrationMasterSlaveDataSource extends AbstractDataSourceAdapter implements AutoCloseable {
private MasterSlaveDataSource dataSource;
private final OrchestrationFacade orchestrationFacade;
private final Map<String, DataSource> dataSourceMap;
private final MasterSlaveRuleConfiguration masterSlaveRuleConfig;
private boolean isCircuitBreak;
private final Map<String, Object> configMap;
public OrchestrationMasterSlaveDataSource(final MasterSlaveDataSource masterSlaveDataSource, final OrchestrationFacade orchestrationFacade) throws SQLException {
super(getAllDataSources(masterSlaveDataSource.getDataSourceMap(), masterSlaveDataSource.getMasterSlaveRule().getMasterDataSourceName(), masterSlaveDataSource.getMasterSlaveRule().getSlaveDataSourceNames()));
this.dataSource = masterSlaveDataSource;
this.orchestrationFacade = orchestrationFacade;
initOrchestrationFacade(masterSlaveDataSource);
this.dataSourceMap = masterSlaveDataSource.getDataSourceMap();
ShardingEventBusInstance.getInstance().register(this);
}
private final Properties props;
private void initOrchestrationFacade(final MasterSlaveDataSource masterSlaveDataSource) {
MasterSlaveRule masterSlaveRule = masterSlaveDataSource.getMasterSlaveRule();
MasterSlaveRuleConfiguration masterSlaveRuleConfiguration = new MasterSlaveRuleConfiguration(masterSlaveRule.getName(), masterSlaveRule.getMasterDataSourceName(), masterSlaveRule.getSlaveDataSourceNames(), masterSlaveRule.getLoadBalanceAlgorithm());
this.orchestrationFacade.init(masterSlaveDataSource.getDataSourceMap(), masterSlaveRuleConfiguration, ConfigMapContext.getInstance().getMasterSlaveConfig(), masterSlaveDataSource.getShardingProperties().getProps());
}
public OrchestrationMasterSlaveDataSource(final Map<String, DataSource> dataSourceMap, final MasterSlaveRuleConfiguration masterSlaveRuleConfig,
final Map<String, Object> configMap, final Properties props, final OrchestrationFacade orchestrationFacade) throws SQLException {
super(dataSourceMap, masterSlaveRuleConfig, configMap, props);
this.orchestrationFacade = orchestrationFacade;
this.dataSourceMap = dataSourceMap;
this.masterSlaveRuleConfig = masterSlaveRuleConfig;
this.configMap = configMap;
this.props = props;
private static Collection<DataSource> getAllDataSources(final Map<String, DataSource> dataSourceMap, final String masterDataSourceName, final Collection<String> slaveDataSourceNames) {
Collection<DataSource> result = new LinkedList<>();
result.add(dataSourceMap.get(masterDataSourceName));
for (String each : slaveDataSourceNames) {
result.add(dataSourceMap.get(each));
}
return result;
}
/**
* Initialize for master-slave orchestration.
*/
public void init() {
orchestrationFacade.init(dataSourceMap, masterSlaveRuleConfig, configMap, props);
@Override
public Connection getConnection() {
if (isCircuitBreak) {
return new CircuitBreakerDataSource().getConnection();
}
return dataSource.getConnection();
}
@Override
public void close() {
dataSource.close();
orchestrationFacade.close();
}
/**
* Renew master-slave data source.
*
* @param masterSlaveEvent master slave configuration event bus event
* @throws SQLException sql exception
*/
@Subscribe
public void renew(final MasterSlaveConfigurationEventBusEvent masterSlaveEvent) throws SQLException {
MasterSlaveRuleConfiguration masterSlaveRuleConfig = masterSlaveEvent.getMasterSlaveRuleConfig();
super.renew(getAllDataSources(masterSlaveEvent.getDataSourceMap(), masterSlaveRuleConfig.getMasterDataSourceName(), masterSlaveRuleConfig.getSlaveDataSourceNames()));
dataSource.close();
dataSource = new MasterSlaveDataSource(masterSlaveEvent.getDataSourceMap(), masterSlaveEvent.getMasterSlaveRuleConfig(), ConfigMapContext.getInstance().getMasterSlaveConfig(), masterSlaveEvent.getProps());
}
/**
* Renew disable dataSource names.
*
* @param disabledStateEventBusEvent jdbc disabled event bus event
* @throws SQLException sql exception
*/
@Subscribe
public void renew(final DisabledStateEventBusEvent disabledStateEventBusEvent) throws SQLException {
Map<String, DataSource> newDataSourceMap = getAvailableDataSourceMap(disabledStateEventBusEvent.getDisabledDataSourceNames());
dataSource = new MasterSlaveDataSource(newDataSourceMap, dataSource.getMasterSlaveRule(), new LinkedHashMap<String, Object>(), dataSource.getShardingProperties());
}
private Map<String, DataSource> getAvailableDataSourceMap(final Collection<String> disabledDataSourceNames) {
Map<String, DataSource> result = new LinkedHashMap<>(dataSourceMap);
for (String each : disabledDataSourceNames) {
result.remove(each);
}
return result;
}
/**
/**
* Renew circuit breaker dataSource names.
*
* @param circuitStateEventBusEvent jdbc circuit event bus event
*/
@Subscribe
public void renew(final CircuitStateEventBusEvent circuitStateEventBusEvent) {
isCircuitBreak = circuitStateEventBusEvent.isCircuitBreak();
}
}
......@@ -17,15 +17,23 @@
package io.shardingsphere.jdbc.orchestration.internal;
import io.shardingsphere.core.api.config.ShardingRuleConfiguration;
import com.google.common.eventbus.Subscribe;
import io.shardingsphere.core.api.ConfigMapContext;
import io.shardingsphere.core.event.ShardingEventBusInstance;
import io.shardingsphere.core.jdbc.adapter.AbstractDataSourceAdapter;
import io.shardingsphere.core.jdbc.core.datasource.ShardingDataSource;
import io.shardingsphere.core.rule.ShardingRule;
import io.shardingsphere.jdbc.orchestration.internal.event.config.ShardingConfigurationEventBusEvent;
import io.shardingsphere.jdbc.orchestration.internal.event.state.CircuitStateEventBusEvent;
import io.shardingsphere.jdbc.orchestration.internal.event.state.DisabledStateEventBusEvent;
import io.shardingsphere.jdbc.orchestration.internal.jdbc.datasource.CircuitBreakerDataSource;
import lombok.extern.slf4j.Slf4j;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
/**
* Orchestration sharding datasource.
......@@ -33,38 +41,77 @@ import java.util.Properties;
* @author caohao
*/
@Slf4j
public final class OrchestrationShardingDataSource extends ShardingDataSource {
public final class OrchestrationShardingDataSource extends AbstractDataSourceAdapter implements AutoCloseable {
private final OrchestrationFacade orchestrationFacade;
private final Map<String, DataSource> dataSourceMap;
private ShardingDataSource dataSource;
private final ShardingRuleConfiguration shardingRuleConfig;
private final OrchestrationFacade orchestrationFacade;
private final Map<String, Object> configMap;
private Map<String, DataSource> dataSourceMap;
private final Properties props;
private boolean isCircuitBreak;
public OrchestrationShardingDataSource(final Map<String, DataSource> dataSourceMap, final ShardingRuleConfiguration shardingRuleConfig,
final Map<String, Object> configMap, final Properties props, final OrchestrationFacade orchestrationFacade) throws SQLException {
super(getRawDataSourceMap(dataSourceMap), new ShardingRule(getShardingRuleConfiguration(dataSourceMap, shardingRuleConfig), getRawDataSourceMap(dataSourceMap).keySet()), configMap, props);
public OrchestrationShardingDataSource(final ShardingDataSource shardingDataSource, final OrchestrationFacade orchestrationFacade) throws SQLException {
super(shardingDataSource.getDataSourceMap().values());
this.dataSource = shardingDataSource;
this.orchestrationFacade = orchestrationFacade;
this.dataSourceMap = getRawDataSourceMap(dataSourceMap);
this.shardingRuleConfig = getShardingRuleConfiguration(dataSourceMap, shardingRuleConfig);
this.configMap = configMap;
this.props = props;
this.orchestrationFacade.init(shardingDataSource.getDataSourceMap(), shardingDataSource.getShardingContext().getShardingRule().getShardingRuleConfig(), ConfigMapContext.getInstance().getShardingConfig(), shardingDataSource.getShardingProperties().getProps());
this.dataSourceMap = shardingDataSource.getDataSourceMap();
ShardingEventBusInstance.getInstance().register(this);
}
/**
* Initialize for sharding orchestration.
*/
public void init() {
orchestrationFacade.init(dataSourceMap, shardingRuleConfig, configMap, props);
@Override
public Connection getConnection() {
if (isCircuitBreak) {
return new CircuitBreakerDataSource().getConnection();
}
return dataSource.getConnection();
}
@Override
public void close() {
super.close();
dataSource.close();
orchestrationFacade.close();
}
/**
* Renew sharding data source.
*
* @param shardingEvent sharding configuration event bus event.
* @throws SQLException sql exception
*/
@Subscribe
public void renew(final ShardingConfigurationEventBusEvent shardingEvent) throws SQLException {
super.renew(shardingEvent.getDataSourceMap().values());
dataSource = new ShardingDataSource(shardingEvent.getDataSourceMap(), shardingEvent.getShardingRule(), new LinkedHashMap<String, Object>(), shardingEvent.getProps());
}
/**
* Renew disable dataSource names.
*
* @param disabledStateEventBusEvent jdbc disabled event bus event
*/
@Subscribe
public void renew(final DisabledStateEventBusEvent disabledStateEventBusEvent) {
Map<String, DataSource> newDataSourceMap = getAvailableDataSourceMap(disabledStateEventBusEvent.getDisabledDataSourceNames());
dataSource = new ShardingDataSource(newDataSourceMap, dataSource.getShardingContext(), dataSource.getShardingProperties(), dataSource.getDatabaseType());
}
private Map<String, DataSource> getAvailableDataSourceMap(final Collection<String> disabledDataSourceNames) {
Map<String, DataSource> result = new LinkedHashMap<>(dataSourceMap);
for (String each : disabledDataSourceNames) {
result.remove(each);
}
return result;
}
/**
* Renew circuit breaker dataSource names.
*
* @param circuitStateEventBusEvent jdbc circuit event bus event
*/
@Subscribe
public void renew(final CircuitStateEventBusEvent circuitStateEventBusEvent) {
isCircuitBreak = circuitStateEventBusEvent.isCircuitBreak();
}
}
......@@ -18,9 +18,9 @@
package io.shardingsphere.jdbc.orchestration.internal.config;
import io.shardingsphere.core.event.ShardingEventBusInstance;
import io.shardingsphere.core.event.orche.config.MasterSlaveConfigurationEventBusEvent;
import io.shardingsphere.core.event.orche.config.ShardingConfigurationEventBusEvent;
import io.shardingsphere.core.event.orche.config.ProxyConfigurationEventBusEvent;
import io.shardingsphere.jdbc.orchestration.internal.event.config.MasterSlaveConfigurationEventBusEvent;
import io.shardingsphere.jdbc.orchestration.internal.event.config.ShardingConfigurationEventBusEvent;
import io.shardingsphere.jdbc.orchestration.internal.event.config.ProxyConfigurationEventBusEvent;
import io.shardingsphere.core.rule.ShardingRule;
import io.shardingsphere.jdbc.orchestration.internal.listener.ListenerManager;
import io.shardingsphere.jdbc.orchestration.internal.state.datasource.DataSourceService;
......@@ -56,12 +56,12 @@ public final class ConfigurationListenerManager implements ListenerManager {
@Override
public void watchSharding() {
shardingStart(ConfigurationNode.DATA_SOURCE_NODE_PATH);
shardingStart(ConfigurationNode.SHARDING_RULE_NODE_PATH);
shardingStart(ConfigurationNode.SHARDING_PROPS_NODE_PATH);
watchSharding(ConfigurationNode.DATA_SOURCE_NODE_PATH);
watchSharding(ConfigurationNode.SHARDING_RULE_NODE_PATH);
watchSharding(ConfigurationNode.SHARDING_PROPS_NODE_PATH);
}
private void shardingStart(final String node) {
private void watchSharding(final String node) {
String cachePath = configNode.getFullPath(node);
regCenter.watch(cachePath, new EventListener() {
......@@ -79,11 +79,12 @@ public final class ConfigurationListenerManager implements ListenerManager {
@Override
public void watchMasterSlave() {
masterSlaveStart(ConfigurationNode.DATA_SOURCE_NODE_PATH);
masterSlaveStart(ConfigurationNode.MASTER_SLAVE_RULE_NODE_PATH);
watchMasterSlave(ConfigurationNode.DATA_SOURCE_NODE_PATH);
watchMasterSlave(ConfigurationNode.MASTER_SLAVE_RULE_NODE_PATH);
watchMasterSlave(ConfigurationNode.MASTER_SLAVE_PROPS_NODE_PATH);
}
private void masterSlaveStart(final String node) {
private void watchMasterSlave(final String node) {
String cachePath = configNode.getFullPath(node);
regCenter.watch(cachePath, new EventListener() {
......@@ -91,7 +92,7 @@ public final class ConfigurationListenerManager implements ListenerManager {
public void onChange(final DataChangedEvent event) {
if (DataChangedEvent.Type.UPDATED == event.getEventType()) {
MasterSlaveConfigurationEventBusEvent masterSlaveEvent = new MasterSlaveConfigurationEventBusEvent(dataSourceService.getAvailableDataSources(),
dataSourceService.getAvailableMasterSlaveRuleConfiguration());
dataSourceService.getAvailableMasterSlaveRuleConfiguration(), configService.loadMasterSlaveProperties());
ShardingEventBusInstance.getInstance().post(masterSlaveEvent);
}
}
......@@ -100,11 +101,11 @@ public final class ConfigurationListenerManager implements ListenerManager {
@Override
public void watchProxy() {
proxyStart(ConfigurationNode.DATA_SOURCE_NODE_PATH);
proxyStart(ConfigurationNode.PROXY_RULE_NODE_PATH);
watchProxy(ConfigurationNode.DATA_SOURCE_NODE_PATH);
watchProxy(ConfigurationNode.PROXY_RULE_NODE_PATH);
}
private void proxyStart(final String node) {
private void watchProxy(final String node) {
String cachePath = configNode.getFullPath(node);
regCenter.watch(cachePath, new EventListener() {
......
......@@ -15,7 +15,7 @@
* </p>
*/
package io.shardingsphere.core.event.orche.config;
package io.shardingsphere.jdbc.orchestration.internal.event.config;
import io.shardingsphere.core.api.config.MasterSlaveRuleConfiguration;
import lombok.Getter;
......@@ -23,6 +23,7 @@ import lombok.RequiredArgsConstructor;
import javax.sql.DataSource;
import java.util.Map;
import java.util.Properties;
/**
* Master slave config event bus event.
......@@ -36,4 +37,6 @@ public final class MasterSlaveConfigurationEventBusEvent {
private final Map<String, DataSource> dataSourceMap;
private final MasterSlaveRuleConfiguration masterSlaveRuleConfig;
private final Properties props;
}
......@@ -15,7 +15,7 @@
* </p>
*/
package io.shardingsphere.core.event.orche.config;
package io.shardingsphere.jdbc.orchestration.internal.event.config;
import io.shardingsphere.core.api.config.ProxyBasicRule;
import io.shardingsphere.core.rule.DataSourceParameter;
......
......@@ -15,7 +15,7 @@
* </p>
*/
package io.shardingsphere.core.event.orche.config;
package io.shardingsphere.jdbc.orchestration.internal.event.config;
import io.shardingsphere.core.rule.ShardingRule;
import lombok.Getter;
......
......@@ -15,7 +15,7 @@
* </p>
*/
package io.shardingsphere.core.event.orche.state;
package io.shardingsphere.jdbc.orchestration.internal.event.state;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
......
......@@ -15,7 +15,7 @@
* </p>
*/
package io.shardingsphere.core.event.orche.state;
package io.shardingsphere.jdbc.orchestration.internal.event.state;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
......
......@@ -15,12 +15,12 @@
* </p>
*/
package io.shardingsphere.core.orche.connection;
package io.shardingsphere.jdbc.orchestration.internal.jdbc.connection;
import io.shardingsphere.core.jdbc.unsupported.AbstractUnsupportedOperationConnection;
import io.shardingsphere.core.orche.metadata.CircuitBreakerDatabaseMetaData;
import io.shardingsphere.core.orche.statement.CircuitBreakerPreparedStatement;
import io.shardingsphere.core.orche.statement.CircuitBreakerStatement;
import io.shardingsphere.jdbc.orchestration.internal.jdbc.metadata.CircuitBreakerDatabaseMetaData;
import io.shardingsphere.jdbc.orchestration.internal.jdbc.statement.CircuitBreakerPreparedStatement;
import io.shardingsphere.jdbc.orchestration.internal.jdbc.statement.CircuitBreakerStatement;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
......
......@@ -15,10 +15,10 @@
* </p>
*/
package io.shardingsphere.core.orche.datasource;
package io.shardingsphere.jdbc.orchestration.internal.jdbc.datasource;
import io.shardingsphere.core.jdbc.unsupported.AbstractUnsupportedOperationDataSource;
import io.shardingsphere.core.orche.connection.CircuitBreakerConnection;
import io.shardingsphere.jdbc.orchestration.internal.jdbc.connection.CircuitBreakerConnection;
import java.io.PrintWriter;
import java.sql.Connection;
......
......@@ -15,7 +15,7 @@
* </p>
*/
package io.shardingsphere.core.orche.metadata;
package io.shardingsphere.jdbc.orchestration.internal.jdbc.metadata;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
......
......@@ -15,7 +15,7 @@
* </p>
*/
package io.shardingsphere.core.orche.resultset;
package io.shardingsphere.jdbc.orchestration.internal.jdbc.resultset;
import io.shardingsphere.core.jdbc.unsupported.AbstractUnsupportedOperationResultSet;
......
......@@ -15,7 +15,7 @@
* </p>
*/
package io.shardingsphere.core.orche.resultset;
package io.shardingsphere.jdbc.orchestration.internal.jdbc.resultset;
import java.sql.ResultSetMetaData;
......
......@@ -15,11 +15,11 @@
* </p>
*/
package io.shardingsphere.core.orche.statement;
package io.shardingsphere.jdbc.orchestration.internal.jdbc.statement;
import io.shardingsphere.core.jdbc.unsupported.AbstractUnsupportedOperationPreparedStatement;
import io.shardingsphere.core.orche.connection.CircuitBreakerConnection;
import io.shardingsphere.core.orche.resultset.CircuitBreakerResultSet;
import io.shardingsphere.jdbc.orchestration.internal.jdbc.connection.CircuitBreakerConnection;
import io.shardingsphere.jdbc.orchestration.internal.jdbc.resultset.CircuitBreakerResultSet;
import lombok.Getter;
import java.io.InputStream;
......
......@@ -15,10 +15,10 @@
* </p>
*/
package io.shardingsphere.core.orche.statement;
package io.shardingsphere.jdbc.orchestration.internal.jdbc.statement;
import io.shardingsphere.core.jdbc.unsupported.AbstractUnsupportedOperationStatement;
import io.shardingsphere.core.orche.connection.CircuitBreakerConnection;
import io.shardingsphere.jdbc.orchestration.internal.jdbc.connection.CircuitBreakerConnection;
import lombok.Getter;
import java.sql.Connection;
......
......@@ -18,7 +18,7 @@
package io.shardingsphere.jdbc.orchestration.internal.state.datasource;
import io.shardingsphere.core.event.ShardingEventBusInstance;
import io.shardingsphere.core.event.orche.state.DisabledStateEventBusEvent;
import io.shardingsphere.jdbc.orchestration.internal.event.state.DisabledStateEventBusEvent;
import io.shardingsphere.jdbc.orchestration.internal.listener.ListenerManager;
import io.shardingsphere.jdbc.orchestration.internal.state.StateNode;
import io.shardingsphere.jdbc.orchestration.reg.api.RegistryCenter;
......
......@@ -18,8 +18,7 @@
package io.shardingsphere.jdbc.orchestration.internal.state.instance;
import io.shardingsphere.core.event.ShardingEventBusInstance;
import io.shardingsphere.core.event.orche.state.CircuitStateEventBusEvent;
import io.shardingsphere.jdbc.orchestration.internal.config.ConfigurationService;
import io.shardingsphere.jdbc.orchestration.internal.event.state.CircuitStateEventBusEvent;
import io.shardingsphere.jdbc.orchestration.internal.listener.ListenerManager;
import io.shardingsphere.jdbc.orchestration.internal.state.StateNode;
import io.shardingsphere.jdbc.orchestration.internal.state.StateNodeStatus;
......
......@@ -18,10 +18,14 @@
package io.shardingsphere.jdbc.orchestration.internal;
import io.shardingsphere.jdbc.orchestration.internal.config.ConfigurationNodeTest;
import io.shardingsphere.jdbc.orchestration.internal.jdbc.AllOrcheTests;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@RunWith(Suite.class)
@Suite.SuiteClasses(ConfigurationNodeTest.class)
@Suite.SuiteClasses({
ConfigurationNodeTest.class,
AllOrcheTests.class
})
public final class AllInternalTests {
}
......@@ -15,11 +15,11 @@
* </p>
*/
package io.shardingsphere.core.orche;
package io.shardingsphere.jdbc.orchestration.internal.jdbc;
import io.shardingsphere.core.orche.connection.CircuitBreakerConnectionTest;
import io.shardingsphere.core.orche.datasource.CircuitBreakerDataSourceTest;
import io.shardingsphere.core.orche.metadata.CircuitBreakerDatabaseMetaDataTest;
import io.shardingsphere.jdbc.orchestration.internal.jdbc.connection.CircuitBreakerConnectionTest;
import io.shardingsphere.jdbc.orchestration.internal.jdbc.datasource.CircuitBreakerDataSourceTest;
import io.shardingsphere.jdbc.orchestration.internal.jdbc.metadata.CircuitBreakerDatabaseMetaDataTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
......
......@@ -15,11 +15,11 @@
* </p>
*/
package io.shardingsphere.core.orche.connection;
package io.shardingsphere.jdbc.orchestration.internal.jdbc.connection;
import io.shardingsphere.core.orche.metadata.CircuitBreakerDatabaseMetaData;
import io.shardingsphere.core.orche.statement.CircuitBreakerPreparedStatement;
import io.shardingsphere.core.orche.statement.CircuitBreakerStatement;
import io.shardingsphere.jdbc.orchestration.internal.jdbc.metadata.CircuitBreakerDatabaseMetaData;
import io.shardingsphere.jdbc.orchestration.internal.jdbc.statement.CircuitBreakerPreparedStatement;
import io.shardingsphere.jdbc.orchestration.internal.jdbc.statement.CircuitBreakerStatement;
import org.junit.Test;
import java.sql.Connection;
......
......@@ -15,9 +15,9 @@
* </p>
*/
package io.shardingsphere.core.orche.datasource;
package io.shardingsphere.jdbc.orchestration.internal.jdbc.datasource;
import io.shardingsphere.core.orche.connection.CircuitBreakerConnection;
import io.shardingsphere.jdbc.orchestration.internal.jdbc.connection.CircuitBreakerConnection;
import org.junit.Test;
import static org.junit.Assert.assertNull;
......
......@@ -55,7 +55,7 @@ public class SpringBootShardingTest {
Field field = ShardingDataSource.class.getDeclaredField("shardingContext");
field.setAccessible(true);
ShardingContext shardingContext = (ShardingContext) field.get(dataSource);
for (DataSource each : shardingContext.getDataSourceMap().values()) {
for (DataSource each : ((ShardingDataSource)dataSource).getDataSourceMap().values()) {
assertThat(((BasicDataSource) each).getMaxTotal(), is(100));
}
assertTrue(shardingContext.isShowSQL());
......
......@@ -36,6 +36,6 @@ public class SpringShardingDataSource extends ShardingDataSource {
public SpringShardingDataSource(final Map<String, DataSource> dataSourceMap,
final ShardingRuleConfiguration shardingRuleConfig, final Map<String, Object> configMap, final Properties props) throws SQLException {
super(getRawDataSourceMap(dataSourceMap), new ShardingRule(getShardingRuleConfiguration(dataSourceMap, shardingRuleConfig), dataSourceMap.keySet()), configMap, props);
super(dataSourceMap, new ShardingRule(shardingRuleConfig, dataSourceMap.keySet()), configMap, props);
}
}
......@@ -90,8 +90,7 @@ public class MasterSlaveNamespaceTest extends AbstractJUnit4SpringContextTests {
@SuppressWarnings("unchecked")
private Map<String, DataSource> getDataSourceMap(final String shardingDataSourceName) {
ShardingDataSource shardingDataSource = this.applicationContext.getBean(shardingDataSourceName, ShardingDataSource.class);
Object shardingContext = FieldValueUtil.getFieldValue(shardingDataSource, "shardingContext", true);
return (Map) FieldValueUtil.getFieldValue(shardingContext, "dataSourceMap");
return shardingDataSource.getDataSourceMap();
}
private ShardingRule getShardingRule(final String shardingDataSourceName) {
......
......@@ -216,8 +216,7 @@ public class ShardingNamespaceTest extends AbstractJUnit4SpringContextTests {
@SuppressWarnings("unchecked")
private Map<String, DataSource> getDataSourceMap(final String shardingDataSourceName) {
ShardingDataSource shardingDataSource = this.applicationContext.getBean(shardingDataSourceName, ShardingDataSource.class);
Object shardingContext = FieldValueUtil.getFieldValue(shardingDataSource, "shardingContext", true);
return (Map) FieldValueUtil.getFieldValue(shardingContext, "dataSourceMap");
return shardingDataSource.getDataSourceMap();
}
private ShardingRule getShardingRule(final String shardingDataSourceName) {
......
......@@ -19,7 +19,6 @@ package io.shardingsphere.core.jdbc.adapter;
import com.google.common.base.Preconditions;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.event.ShardingEventBusInstance;
import io.shardingsphere.core.exception.ShardingException;
import io.shardingsphere.core.jdbc.unsupported.AbstractUnsupportedOperationDataSource;
import io.shardingsphere.core.listener.JDBCListenerRegister;
......@@ -51,7 +50,10 @@ public abstract class AbstractDataSourceAdapter extends AbstractUnsupportedOpera
public AbstractDataSourceAdapter(final Collection<DataSource> dataSources) throws SQLException {
databaseType = getDatabaseType(dataSources);
ShardingEventBusInstance.getInstance().register(this);
}
public AbstractDataSourceAdapter(final DatabaseType databaseType) {
this.databaseType = databaseType;
}
protected final DatabaseType getDatabaseType(final Collection<DataSource> dataSources) throws SQLException {
......
......@@ -17,27 +17,19 @@
package io.shardingsphere.core.jdbc.core;
import com.google.common.eventbus.Subscribe;
import io.shardingsphere.core.constant.ConnectionMode;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.event.ShardingEventBusInstance;
import io.shardingsphere.core.event.orche.state.CircuitStateEventBusEvent;
import io.shardingsphere.core.event.orche.state.DisabledStateEventBusEvent;
import io.shardingsphere.core.exception.ShardingException;
import io.shardingsphere.core.executor.ShardingExecuteEngine;
import io.shardingsphere.core.jdbc.metadata.JDBCTableMetaDataConnectionManager;
import io.shardingsphere.core.metadata.ShardingMetaData;
import io.shardingsphere.core.orche.datasource.CircuitBreakerDataSource;
import io.shardingsphere.core.rule.ShardingRule;
import lombok.AccessLevel;
import lombok.Getter;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
......@@ -50,8 +42,6 @@ import java.util.Map.Entry;
@Getter
public final class ShardingContext implements AutoCloseable {
private Map<String, DataSource> dataSourceMap;
private ShardingRule shardingRule;
private DatabaseType databaseType;
......@@ -64,63 +54,19 @@ public final class ShardingContext implements AutoCloseable {
private boolean showSQL;
@Getter(AccessLevel.NONE)
private Collection<String> disabledDataSourceNames = new LinkedList<>();
@Getter(AccessLevel.NONE)
private boolean isCircuitBreak;
public ShardingContext(final Map<String, DataSource> dataSourceMap,
final ShardingRule shardingRule, final DatabaseType databaseType, final ShardingExecuteEngine executeEngine, final ConnectionMode connectionMode, final boolean showSQL) {
init(dataSourceMap, shardingRule, databaseType, executeEngine, connectionMode, showSQL);
ShardingEventBusInstance.getInstance().register(this);
}
private void init(final Map<String, DataSource> dataSourceMap,
final ShardingRule shardingRule, final DatabaseType databaseType, final ShardingExecuteEngine executeEngine, final ConnectionMode connectionMode, final boolean showSQL) {
this.dataSourceMap = dataSourceMap;
this.shardingRule = shardingRule;
this.executeEngine = executeEngine;
this.databaseType = databaseType;
this.connectionMode = connectionMode;
this.showSQL = showSQL;
metaData = new ShardingMetaData(getDataSourceURLs(getDataSourceMap()), shardingRule, databaseType, executeEngine, new JDBCTableMetaDataConnectionManager(getDataSourceMap()));
}
/**
* Renew sharding context.
*
* @param dataSourceMap data source map
* @param shardingRule sharding rule
* @param databaseType data type
* @param executeEngine sharding executor engine
* @param connectionMode connection mode
* @param showSQL show sql
*/
public void renew(final Map<String, DataSource> dataSourceMap,
final ShardingRule shardingRule, final DatabaseType databaseType, final ShardingExecuteEngine executeEngine, final ConnectionMode connectionMode, final boolean showSQL) {
close();
init(dataSourceMap, shardingRule, databaseType, executeEngine, connectionMode, showSQL);
}
/**
* Renew disable dataSource names.
*
* @param disabledStateEventBusEvent jdbc disabled event bus event
*/
@Subscribe
public void renewDisabledDataSourceNames(final DisabledStateEventBusEvent disabledStateEventBusEvent) {
disabledDataSourceNames = disabledStateEventBusEvent.getDisabledDataSourceNames();
}
/**
* Renew circuit breaker dataSource names.
*
* @param circuitStateEventBusEvent jdbc disabled event bus event
*/
@Subscribe
public void renewCircuitBreakerDataSourceNames(final CircuitStateEventBusEvent circuitStateEventBusEvent) {
isCircuitBreak = circuitStateEventBusEvent.isCircuitBreak();
metaData = new ShardingMetaData(getDataSourceURLs(dataSourceMap), shardingRule, databaseType, executeEngine, new JDBCTableMetaDataConnectionManager(dataSourceMap));
}
private static Map<String, String> getDataSourceURLs(final Map<String, DataSource> dataSourceMap) {
......@@ -139,49 +85,8 @@ public final class ShardingContext implements AutoCloseable {
}
}
/**
* Get available data source map.
*
* @return available data source map
*/
public Map<String, DataSource> getDataSourceMap() {
if (isCircuitBreak) {
return getCircuitBreakerDataSourceMap();
}
if (!disabledDataSourceNames.isEmpty()) {
return getAvailableDataSourceMap();
}
return dataSourceMap;
}
private Map<String, DataSource> getAvailableDataSourceMap() {
Map<String, DataSource> result = new LinkedHashMap<>(dataSourceMap);
for (String each : disabledDataSourceNames) {
result.remove(each);
}
return result;
}
private Map<String, DataSource> getCircuitBreakerDataSourceMap() {
Map<String, DataSource> result = new LinkedHashMap<>();
for (String each : dataSourceMap.keySet()) {
result.put(each, new CircuitBreakerDataSource());
}
return result;
}
@Override
public void close() {
closeOriginalDataSources();
executeEngine.close();
}
private void closeOriginalDataSources() {
for (DataSource each : dataSourceMap.values()) {
try {
each.getClass().getDeclaredMethod("close").invoke(each);
} catch (final ReflectiveOperationException ignored) {
}
}
}
}
......@@ -18,7 +18,7 @@
package io.shardingsphere.core.jdbc.core.connection;
import io.shardingsphere.core.jdbc.adapter.AbstractConnectionAdapter;
import io.shardingsphere.core.jdbc.core.ShardingContext;
import io.shardingsphere.core.jdbc.core.datasource.ShardingDataSource;
import io.shardingsphere.core.jdbc.core.statement.ShardingPreparedStatement;
import io.shardingsphere.core.jdbc.core.statement.ShardingStatement;
import io.shardingsphere.core.rule.MasterSlaveRule;
......@@ -45,7 +45,7 @@ import java.util.Map;
public final class ShardingConnection extends AbstractConnectionAdapter {
@Getter
private final ShardingContext shardingContext;
private final ShardingDataSource shardingDataSource;
/**
* Release connection.
......@@ -62,14 +62,14 @@ public final class ShardingConnection extends AbstractConnectionAdapter {
@Override
protected Map<String, DataSource> getDataSourceMap() {
return shardingContext.getDataSourceMap();
return shardingDataSource.getDataSourceMap();
}
@Override
public DatabaseMetaData getMetaData() throws SQLException {
Collection<MasterSlaveRule> masterSlaveRules = shardingContext.getShardingRule().getMasterSlaveRules();
Collection<MasterSlaveRule> masterSlaveRules = shardingDataSource.getShardingContext().getShardingRule().getMasterSlaveRules();
if (masterSlaveRules.isEmpty()) {
return getConnection(shardingContext.getDataSourceMap().keySet().iterator().next()).getMetaData();
return getConnection(shardingDataSource.getDataSourceMap().keySet().iterator().next()).getMetaData();
}
for (MasterSlaveRule each : masterSlaveRules) {
if (getDataSourceMap().containsKey(each.getMasterDataSourceName())) {
......
......@@ -17,17 +17,12 @@
package io.shardingsphere.core.jdbc.core.datasource;
import com.google.common.eventbus.Subscribe;
import io.shardingsphere.core.api.ConfigMapContext;
import io.shardingsphere.core.api.config.MasterSlaveRuleConfiguration;
import io.shardingsphere.core.constant.properties.ShardingProperties;
import io.shardingsphere.core.constant.properties.ShardingPropertiesConstant;
import io.shardingsphere.core.event.orche.config.MasterSlaveConfigurationEventBusEvent;
import io.shardingsphere.core.event.orche.state.CircuitStateEventBusEvent;
import io.shardingsphere.core.event.orche.state.DisabledStateEventBusEvent;
import io.shardingsphere.core.jdbc.adapter.AbstractDataSourceAdapter;
import io.shardingsphere.core.jdbc.core.connection.MasterSlaveConnection;
import io.shardingsphere.core.orche.datasource.CircuitBreakerDataSource;
import io.shardingsphere.core.rule.MasterSlaveRule;
import lombok.Getter;
......@@ -37,7 +32,6 @@ import java.lang.reflect.Method;
import java.sql.SQLException;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
......@@ -51,15 +45,11 @@ import java.util.Properties;
@Getter
public class MasterSlaveDataSource extends AbstractDataSourceAdapter implements AutoCloseable {
private Map<String, DataSource> dataSourceMap;
private final Map<String, DataSource> dataSourceMap;
private MasterSlaveRule masterSlaveRule;
private final MasterSlaveRule masterSlaveRule;
private ShardingProperties shardingProperties;
private Collection<String> disabledDataSourceNames = new LinkedList<>();
private boolean isCircuitBreak;
private final ShardingProperties shardingProperties;
public MasterSlaveDataSource(final Map<String, DataSource> dataSourceMap, final MasterSlaveRuleConfiguration masterSlaveRuleConfig,
final Map<String, Object> configMap, final Properties props) throws SQLException {
......@@ -67,13 +57,20 @@ public class MasterSlaveDataSource extends AbstractDataSourceAdapter implements
if (!configMap.isEmpty()) {
ConfigMapContext.getInstance().getMasterSlaveConfig().putAll(configMap);
}
this.dataSourceMap = dataSourceMap;
this.masterSlaveRule = new MasterSlaveRule(masterSlaveRuleConfig);
shardingProperties = new ShardingProperties(null == props ? new Properties() : props);
init(dataSourceMap, masterSlaveRuleConfig);
}
private void init(final Map<String, DataSource> dataSourceMap, final MasterSlaveRuleConfiguration masterSlaveRuleConfig) {
public MasterSlaveDataSource(final Map<String, DataSource> dataSourceMap, final MasterSlaveRule masterSlaveRule,
final Map<String, Object> configMap, final ShardingProperties props) throws SQLException {
super(getAllDataSources(dataSourceMap, masterSlaveRule.getMasterDataSourceName(), masterSlaveRule.getSlaveDataSourceNames()));
if (!configMap.isEmpty()) {
ConfigMapContext.getInstance().getMasterSlaveConfig().putAll(configMap);
}
this.dataSourceMap = dataSourceMap;
this.masterSlaveRule = new MasterSlaveRule(masterSlaveRuleConfig);
this.masterSlaveRule = masterSlaveRule;
this.shardingProperties = props;
}
private static Collection<DataSource> getAllDataSources(final Map<String, DataSource> dataSourceMap, final String masterDataSourceName, final Collection<String> slaveDataSourceNames) {
......@@ -99,19 +96,6 @@ public class MasterSlaveDataSource extends AbstractDataSourceAdapter implements
return result;
}
/**
* Renew master-slave data source.
*
* @param masterSlaveEvent master slave configuration event bus event
*/
@Subscribe
public void renew(final MasterSlaveConfigurationEventBusEvent masterSlaveEvent) {
MasterSlaveRuleConfiguration masterSlaveRuleConfig = masterSlaveEvent.getMasterSlaveRuleConfig();
super.renew(getAllDataSources(dataSourceMap, masterSlaveRuleConfig.getMasterDataSourceName(), masterSlaveRuleConfig.getSlaveDataSourceNames()));
closeOriginalDataSources();
init(dataSourceMap, masterSlaveRuleConfig);
}
private void closeOriginalDataSources() {
for (DataSource each : getDataSourceMap().values()) {
try {
......@@ -140,57 +124,5 @@ public class MasterSlaveDataSource extends AbstractDataSourceAdapter implements
public boolean showSQL() {
return shardingProperties.getValue(ShardingPropertiesConstant.SQL_SHOW);
}
/**
* Get available data source map.
*
* @return available data source map
*/
public Map<String, DataSource> getDataSourceMap() {
if (isCircuitBreak) {
return getCircuitBreakerDataSourceMap();
}
if (!getDisabledDataSourceNames().isEmpty()) {
return getAvailableDataSourceMap();
}
return dataSourceMap;
}
private Map<String, DataSource> getAvailableDataSourceMap() {
Map<String, DataSource> result = new LinkedHashMap<>(dataSourceMap);
for (String each : getDisabledDataSourceNames()) {
result.remove(each);
}
return result;
}
private Map<String, DataSource> getCircuitBreakerDataSourceMap() {
Map<String, DataSource> result = new LinkedHashMap<>();
for (String each : dataSourceMap.keySet()) {
result.put(each, new CircuitBreakerDataSource());
}
return result;
}
/**
* Renew disable dataSource names.
*
* @param disabledStateEventBusEvent jdbc disabled event bus event
*/
@Subscribe
public void renewDisabledDataSourceNames(final DisabledStateEventBusEvent disabledStateEventBusEvent) {
disabledDataSourceNames = disabledStateEventBusEvent.getDisabledDataSourceNames();
}
/**
* Renew circuit breaker dataSource names.
*
* @param circuitStateEventBusEvent jdbc circuit event bus event
*/
@Subscribe
public void renewCircuitBreakerDataSourceNames(final CircuitStateEventBusEvent circuitStateEventBusEvent) {
isCircuitBreak = circuitStateEventBusEvent.isCircuitBreak();
}
}
......@@ -17,14 +17,13 @@
package io.shardingsphere.core.jdbc.core.datasource;
import com.google.common.eventbus.Subscribe;
import io.shardingsphere.core.api.ConfigMapContext;
import io.shardingsphere.core.api.config.MasterSlaveRuleConfiguration;
import io.shardingsphere.core.api.config.ShardingRuleConfiguration;
import io.shardingsphere.core.constant.ConnectionMode;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.constant.properties.ShardingProperties;
import io.shardingsphere.core.constant.properties.ShardingPropertiesConstant;
import io.shardingsphere.core.event.orche.config.ShardingConfigurationEventBusEvent;
import io.shardingsphere.core.executor.ShardingExecuteEngine;
import io.shardingsphere.core.jdbc.adapter.AbstractDataSourceAdapter;
import io.shardingsphere.core.jdbc.core.ShardingContext;
......@@ -35,11 +34,8 @@ import lombok.Getter;
import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
......@@ -50,12 +46,14 @@ import java.util.concurrent.ConcurrentHashMap;
* @author zhaojun
* @author panjuan
*/
@Getter
public class ShardingDataSource extends AbstractDataSourceAdapter implements AutoCloseable {
@Getter
private ShardingProperties shardingProperties;
private ShardingContext shardingContext;
private final Map<String, DataSource> dataSourceMap;
private final ShardingContext shardingContext;
private final ShardingProperties shardingProperties;
public ShardingDataSource(final Map<String, DataSource> dataSourceMap, final ShardingRule shardingRule) throws SQLException {
this(dataSourceMap, shardingRule, new ConcurrentHashMap<String, Object>(), new Properties());
......@@ -66,8 +64,16 @@ public class ShardingDataSource extends AbstractDataSourceAdapter implements Aut
if (!configMap.isEmpty()) {
ConfigMapContext.getInstance().getShardingConfig().putAll(configMap);
}
shardingProperties = new ShardingProperties(null == props ? new Properties() : props);
shardingContext = getShardingContext(dataSourceMap, shardingRule);
this.dataSourceMap = getRawDataSourceMap(dataSourceMap);
this.shardingProperties = new ShardingProperties(null == props ? new Properties() : props);
this.shardingContext = getShardingContext(getRawDataSourceMap(dataSourceMap), getRevisedShardingRule(dataSourceMap, shardingRule));
}
public ShardingDataSource(final Map<String, DataSource> dataSourceMap, final ShardingContext shardingContext, final ShardingProperties shardingProperties, final DatabaseType databaseType) {
super(databaseType);
this.dataSourceMap = getRawDataSourceMap(dataSourceMap);
this.shardingContext = shardingContext;
this.shardingProperties = shardingProperties;
}
private ShardingContext getShardingContext(final Map<String, DataSource> dataSourceMap, final ShardingRule shardingRule) {
......@@ -78,38 +84,12 @@ public class ShardingDataSource extends AbstractDataSourceAdapter implements Aut
return new ShardingContext(dataSourceMap, shardingRule, getDatabaseType(), executeEngine, connectionMode, showSQL);
}
/**
* Renew sharding data source.
*
* @param shardingEvent sharding configuration event bus event.
*/
@Subscribe
public void renew(final ShardingConfigurationEventBusEvent shardingEvent) {
super.renew(shardingEvent.getDataSourceMap().values());
shardingProperties = new ShardingProperties(null == shardingEvent.getProps() ? new Properties() : shardingEvent.getProps());
int newExecutorSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE);
boolean newShowSQL = shardingProperties.getValue(ShardingPropertiesConstant.SQL_SHOW);
ShardingExecuteEngine newExecuteEngine = new ShardingExecuteEngine(newExecutorSize);
ConnectionMode newConnectionMode = ConnectionMode.valueOf(shardingProperties.<String>getValue(ShardingPropertiesConstant.CONNECTION_MODE));
shardingContext.renew(shardingEvent.getDataSourceMap(), shardingEvent.getShardingRule(), getDatabaseType(), newExecuteEngine, newConnectionMode, newShowSQL);
}
@Override
public final ShardingConnection getConnection() {
return new ShardingConnection(shardingContext);
}
@Override
public void close() {
shardingContext.close();
}
protected static Map<String, DataSource> getRawDataSourceMap(final Map<String, DataSource> dataSourceMap) {
private Map<String, DataSource> getRawDataSourceMap(final Map<String, DataSource> dataSourceMap) {
Map<String, DataSource> result = new LinkedHashMap<>();
if (null == dataSourceMap) {
return result;
}
for (Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
for (Map.Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
String dataSourceName = entry.getKey();
DataSource dataSource = entry.getValue();
if (dataSource instanceof MasterSlaveDataSource) {
......@@ -121,20 +101,39 @@ public class ShardingDataSource extends AbstractDataSourceAdapter implements Aut
return result;
}
protected static ShardingRuleConfiguration getShardingRuleConfiguration(final Map<String, DataSource> dataSourceMap, final ShardingRuleConfiguration shardingRuleConfig) {
Collection<MasterSlaveRuleConfiguration> masterSlaveRuleConfigs = new LinkedList<>();
if (null == dataSourceMap || !shardingRuleConfig.getMasterSlaveRuleConfigs().isEmpty()) {
return shardingRuleConfig;
private ShardingRule getRevisedShardingRule(final Map<String, DataSource> dataSourceMap, final ShardingRule shardingRule) {
if (null == dataSourceMap || !shardingRule.getMasterSlaveRules().isEmpty()) {
return shardingRule;
}
ShardingRuleConfiguration shardingRuleConfiguration = shardingRule.getShardingRuleConfig();
for (DataSource each : dataSourceMap.values()) {
if (!(each instanceof MasterSlaveDataSource)) {
continue;
}
MasterSlaveRule masterSlaveRule = ((MasterSlaveDataSource) each).getMasterSlaveRule();
masterSlaveRuleConfigs.add(new MasterSlaveRuleConfiguration(
shardingRuleConfiguration.getMasterSlaveRuleConfigs().add(new MasterSlaveRuleConfiguration(
masterSlaveRule.getName(), masterSlaveRule.getMasterDataSourceName(), masterSlaveRule.getSlaveDataSourceNames(), masterSlaveRule.getLoadBalanceAlgorithm()));
}
shardingRuleConfig.setMasterSlaveRuleConfigs(masterSlaveRuleConfigs);
return shardingRuleConfig;
return new ShardingRule(shardingRuleConfiguration, dataSourceMap.keySet());
}
@Override
public final ShardingConnection getConnection() {
return new ShardingConnection(this);
}
@Override
public void close() {
closeOriginalDataSources();
shardingContext.close();
}
private void closeOriginalDataSources() {
for (DataSource each : dataSourceMap.values()) {
try {
each.getClass().getDeclaredMethod("close").invoke(each);
} catch (final ReflectiveOperationException ignored) {
}
}
}
}
......@@ -126,7 +126,7 @@ public final class ShardingPreparedStatement extends AbstractShardingPreparedSta
this.resultSetConcurrency = resultSetConcurrency;
this.resultSetHoldability = resultSetHoldability;
this.sql = sql;
ShardingContext shardingContext = connection.getShardingContext();
ShardingContext shardingContext = connection.getShardingDataSource().getShardingContext();
routingEngine = new PreparedStatementRoutingEngine(sql, shardingContext.getShardingRule(),
shardingContext.getMetaData().getTable(), shardingContext.getDatabaseType(), shardingContext.isShowSQL(), shardingContext.getMetaData().getDataSource());
}
......@@ -139,7 +139,7 @@ public final class ShardingPreparedStatement extends AbstractShardingPreparedSta
sqlRoute();
List<ResultSet> resultSets = getPreparedStatementExecutor().executeQuery();
MergeEngine mergeEngine = MergeEngineFactory.newInstance(
connection.getShardingContext().getShardingRule(), getQueryResults(resultSets), routeResult.getSqlStatement(), connection.getShardingContext().getMetaData().getTable());
connection.getShardingDataSource().getShardingContext().getShardingRule(), getQueryResults(resultSets), routeResult.getSqlStatement(), connection.getShardingDataSource().getShardingContext().getMetaData().getTable());
result = new ShardingResultSet(resultSets, merge(mergeEngine), this);
} finally {
clearBatch();
......@@ -151,7 +151,7 @@ public final class ShardingPreparedStatement extends AbstractShardingPreparedSta
private List<QueryResult> getQueryResults(final List<ResultSet> resultSets) throws SQLException {
List<QueryResult> result = new ArrayList<>(resultSets.size());
for (ResultSet each : resultSets) {
if (ConnectionMode.MEMORY_STRICTLY == connection.getShardingContext().getConnectionMode()) {
if (ConnectionMode.MEMORY_STRICTLY == connection.getShardingDataSource().getShardingContext().getConnectionMode()) {
result.add(new StreamQueryResult(each));
} else {
result.add(new MemoryQueryResult(each));
......@@ -188,9 +188,9 @@ public final class ShardingPreparedStatement extends AbstractShardingPreparedSta
private void refreshTableMetaData() throws SQLException {
if (null != routeResult && null != connection && SQLType.DDL == routeResult.getSqlStatement().getType() && !routeResult.getSqlStatement().getTables().isEmpty()) {
String logicTableName = routeResult.getSqlStatement().getTables().getSingleTableName();
TableMetaDataLoader tableMetaDataLoader = new TableMetaDataLoader(connection.getShardingContext().getMetaData().getDataSource(),
connection.getShardingContext().getExecuteEngine(), new JDBCTableMetaDataConnectionManager(connection.getShardingContext().getDataSourceMap()));
connection.getShardingContext().getMetaData().getTable().put(logicTableName, tableMetaDataLoader.load(logicTableName, connection.getShardingContext().getShardingRule()));
TableMetaDataLoader tableMetaDataLoader = new TableMetaDataLoader(connection.getShardingDataSource().getShardingContext().getMetaData().getDataSource(),
connection.getShardingDataSource().getShardingContext().getExecuteEngine(), new JDBCTableMetaDataConnectionManager(connection.getShardingDataSource().getDataSourceMap()));
connection.getShardingDataSource().getShardingContext().getMetaData().getTable().put(logicTableName, tableMetaDataLoader.load(logicTableName, connection.getShardingDataSource().getShardingContext().getShardingRule()));
}
}
......@@ -244,8 +244,8 @@ public final class ShardingPreparedStatement extends AbstractShardingPreparedSta
}
private PreparedStatementExecutor getPreparedStatementExecutor() throws SQLException {
ConnectionMode connectionMode = connection.getShardingContext().getConnectionMode();
SQLExecuteTemplate sqlExecuteTemplate = new SQLExecuteTemplate(connection.getShardingContext().getExecuteEngine(), connectionMode);
ConnectionMode connectionMode = connection.getShardingDataSource().getShardingContext().getConnectionMode();
SQLExecuteTemplate sqlExecuteTemplate = new SQLExecuteTemplate(connection.getShardingDataSource().getShardingContext().getExecuteEngine(), connectionMode);
Collection<PreparedStatementUnit> executeUnits = ConnectionMode.MEMORY_STRICTLY == connectionMode ? getExecuteUnitsForMemoryStrictly() : getExecuteUnitsForConnectionStrictly();
return new PreparedStatementExecutor(sqlExecuteTemplate, routeResult.getSqlStatement().getType(), executeUnits);
}
......@@ -302,8 +302,8 @@ public final class ShardingPreparedStatement extends AbstractShardingPreparedSta
@Override
public int[] executeBatch() throws SQLException {
try {
return new BatchPreparedStatementExecutor(new SQLExecuteTemplate(connection.getShardingContext().getExecuteEngine(), connection.getShardingContext().getConnectionMode()),
connection.getShardingContext().getDatabaseType(), routeResult.getSqlStatement().getType(), batchStatementUnits, batchCount).executeBatch();
return new BatchPreparedStatementExecutor(new SQLExecuteTemplate(connection.getShardingDataSource().getShardingContext().getExecuteEngine(), connection.getShardingDataSource().getShardingContext().getConnectionMode()),
connection.getShardingDataSource().getShardingContext().getDatabaseType(), routeResult.getSqlStatement().getType(), batchStatementUnits, batchCount).executeBatch();
} finally {
clearBatch();
}
......@@ -346,7 +346,7 @@ public final class ShardingPreparedStatement extends AbstractShardingPreparedSta
}
if (routeResult.getSqlStatement() instanceof SelectStatement || routeResult.getSqlStatement() instanceof DALStatement) {
MergeEngine mergeEngine = MergeEngineFactory.newInstance(
connection.getShardingContext().getShardingRule(), queryResults, routeResult.getSqlStatement(), connection.getShardingContext().getMetaData().getTable());
connection.getShardingDataSource().getShardingContext().getShardingRule(), queryResults, routeResult.getSqlStatement(), connection.getShardingDataSource().getShardingContext().getMetaData().getTable());
currentResultSet = new ShardingResultSet(resultSets, merge(mergeEngine), this);
}
return currentResultSet;
......
......@@ -116,7 +116,7 @@ public final class ShardingStatement extends AbstractStatementAdapter {
sqlRoute(sql);
List<ResultSet> resultSets = getStatementExecutor().executeQuery();
MergeEngine mergeEngine = MergeEngineFactory.newInstance(
connection.getShardingContext().getShardingRule(), getQueryResults(resultSets), routeResult.getSqlStatement(), connection.getShardingContext().getMetaData().getTable());
connection.getShardingDataSource().getShardingContext().getShardingRule(), getQueryResults(resultSets), routeResult.getSqlStatement(), connection.getShardingDataSource().getShardingContext().getMetaData().getTable());
result = new ShardingResultSet(resultSets, merge(mergeEngine), this);
} finally {
currentResultSet = null;
......@@ -128,7 +128,7 @@ public final class ShardingStatement extends AbstractStatementAdapter {
private List<QueryResult> getQueryResults(final List<ResultSet> resultSets) throws SQLException {
List<QueryResult> result = new ArrayList<>(resultSets.size());
for (ResultSet each : resultSets) {
if (ConnectionMode.MEMORY_STRICTLY == connection.getShardingContext().getConnectionMode()) {
if (ConnectionMode.MEMORY_STRICTLY == connection.getShardingDataSource().getShardingContext().getConnectionMode()) {
result.add(new StreamQueryResult(each));
} else {
result.add(new MemoryQueryResult(each));
......@@ -238,8 +238,8 @@ public final class ShardingStatement extends AbstractStatementAdapter {
}
private StatementExecutor getStatementExecutor() throws SQLException {
ConnectionMode connectionMode = connection.getShardingContext().getConnectionMode();
SQLExecuteTemplate sqlExecuteTemplate = new SQLExecuteTemplate(connection.getShardingContext().getExecuteEngine(), connectionMode);
ConnectionMode connectionMode = connection.getShardingDataSource().getShardingContext().getConnectionMode();
SQLExecuteTemplate sqlExecuteTemplate = new SQLExecuteTemplate(connection.getShardingDataSource().getShardingContext().getExecuteEngine(), connectionMode);
Collection<StatementUnit> executeUnits = ConnectionMode.MEMORY_STRICTLY == connectionMode ? getExecuteUnitsForMemoryStrictly() : getExecuteUnitsForConnectionStrictly();
return new StatementExecutor(sqlExecuteTemplate, routeResult.getSqlStatement().getType(), executeUnits);
}
......@@ -279,7 +279,7 @@ public final class ShardingStatement extends AbstractStatementAdapter {
}
private void sqlRoute(final String sql) {
ShardingContext shardingContext = connection.getShardingContext();
ShardingContext shardingContext = connection.getShardingDataSource().getShardingContext();
RoutingEvent event = new RoutingEvent(sql);
ShardingEventBusInstance.getInstance().post(event);
try {
......@@ -300,9 +300,9 @@ public final class ShardingStatement extends AbstractStatementAdapter {
private void refreshTableMetaData() throws SQLException {
if (null != routeResult && null != connection && SQLType.DDL == routeResult.getSqlStatement().getType() && !routeResult.getSqlStatement().getTables().isEmpty()) {
String logicTableName = routeResult.getSqlStatement().getTables().getSingleTableName();
TableMetaDataLoader tableMetaDataLoader = new TableMetaDataLoader(connection.getShardingContext().getMetaData().getDataSource(),
connection.getShardingContext().getExecuteEngine(), new JDBCTableMetaDataConnectionManager(connection.getShardingContext().getDataSourceMap()));
connection.getShardingContext().getMetaData().getTable().put(logicTableName, tableMetaDataLoader.load(logicTableName, connection.getShardingContext().getShardingRule()));
TableMetaDataLoader tableMetaDataLoader = new TableMetaDataLoader(connection.getShardingDataSource().getShardingContext().getMetaData().getDataSource(),
connection.getShardingDataSource().getShardingContext().getExecuteEngine(), new JDBCTableMetaDataConnectionManager(connection.getShardingDataSource().getDataSourceMap()));
connection.getShardingDataSource().getShardingContext().getMetaData().getTable().put(logicTableName, tableMetaDataLoader.load(logicTableName, connection.getShardingDataSource().getShardingContext().getShardingRule()));
}
}
......@@ -343,7 +343,7 @@ public final class ShardingStatement extends AbstractStatementAdapter {
}
if (routeResult.getSqlStatement() instanceof SelectStatement || routeResult.getSqlStatement() instanceof DALStatement) {
MergeEngine mergeEngine = MergeEngineFactory.newInstance(
connection.getShardingContext().getShardingRule(), queryResults, routeResult.getSqlStatement(), connection.getShardingContext().getMetaData().getTable());
connection.getShardingDataSource().getShardingContext().getShardingRule(), queryResults, routeResult.getSqlStatement(), connection.getShardingDataSource().getShardingContext().getMetaData().getTable());
currentResultSet = new ShardingResultSet(resultSets, merge(mergeEngine), this);
}
return currentResultSet;
......
......@@ -20,7 +20,6 @@ package io.shardingsphere.core;
import io.shardingsphere.core.api.AllApiTests;
import io.shardingsphere.core.executor.AllExecutorTests;
import io.shardingsphere.core.jdbc.AllJDBCTests;
import io.shardingsphere.core.orche.AllOrcheTests;
import io.shardingsphere.core.util.AllUtilTests;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
......@@ -31,8 +30,7 @@ import org.junit.runners.Suite.SuiteClasses;
AllApiTests.class,
AllExecutorTests.class,
AllJDBCTests.class,
AllUtilTests.class,
AllOrcheTests.class
AllUtilTests.class
})
public final class AllUnitTests {
}
......@@ -20,7 +20,6 @@ package io.shardingsphere.core.common.base;
import com.google.common.collect.Sets;
import io.shardingsphere.core.common.env.DatabaseEnvironment;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.jdbc.core.ShardingContext;
import io.shardingsphere.core.jdbc.core.connection.ShardingConnection;
import io.shardingsphere.core.jdbc.core.datasource.ShardingDataSource;
import org.apache.commons.dbcp2.BasicDataSource;
......@@ -138,8 +137,7 @@ public abstract class AbstractSQLTest {
throws NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException {
Field field = shardingDataSource.getClass().getDeclaredField("shardingContext");
field.setAccessible(true);
ShardingContext shardingContext = (ShardingContext) field.get(shardingDataSource);
return shardingContext.getDataSourceMap();
return shardingDataSource.getDataSourceMap();
}
protected final void importDataSet() {
......
......@@ -23,6 +23,7 @@ import io.shardingsphere.core.api.config.TableRuleConfiguration;
import io.shardingsphere.core.fixture.TestDataSource;
import io.shardingsphere.core.jdbc.core.ShardingContext;
import io.shardingsphere.core.jdbc.core.datasource.MasterSlaveDataSource;
import io.shardingsphere.core.jdbc.core.datasource.ShardingDataSource;
import io.shardingsphere.core.rule.ShardingRule;
import org.junit.After;
import org.junit.Before;
......@@ -75,8 +76,9 @@ public final class ShardingConnectionTest {
dataSourceMap.put(DS_NAME, masterSlaveDataSource);
ShardingRule shardingRule = new ShardingRule(shardingRuleConfig, dataSourceMap.keySet());
shardingContext = Mockito.mock(ShardingContext.class);
when(shardingContext.getDataSourceMap()).thenReturn(dataSourceMap);
connection = new ShardingConnection(shardingContext);
ShardingDataSource shardingDataSource = Mockito.mock(ShardingDataSource.class);
when(shardingDataSource.getDataSourceMap()).thenReturn(dataSourceMap);
connection = new ShardingConnection(shardingDataSource);
}
@After
......
......@@ -24,8 +24,6 @@ import io.shardingsphere.core.api.config.ShardingRuleConfiguration;
import io.shardingsphere.core.api.config.TableRuleConfiguration;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.constant.properties.ShardingProperties;
import io.shardingsphere.core.constant.properties.ShardingPropertiesConstant;
import io.shardingsphere.core.event.orche.config.ShardingConfigurationEventBusEvent;
import io.shardingsphere.core.rule.ShardingRule;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
......@@ -45,7 +43,6 @@ import java.util.Map;
import java.util.Properties;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock;
......@@ -149,56 +146,6 @@ public final class ShardingDataSourceTest {
assertThat(createShardingDataSource(dataSourceMap).getConnection().getConnection("ds"), is(dataSource.getConnection()));
}
@Test
public void assertRenewWithoutChangeExecutorPoolEngine() throws SQLException, NoSuchFieldException, IllegalAccessException {
DataSource originalDataSource = mockDataSource("H2");
Map<String, DataSource> originalDataSourceMap = new HashMap<>(1, 1);
originalDataSourceMap.put("ds", originalDataSource);
ShardingDataSource shardingDataSource = createShardingDataSource(originalDataSourceMap);
ShardingProperties originShardingProperties = getShardingProperties(shardingDataSource);
DataSource newDataSource = mockDataSource("H2");
Map<String, DataSource> newDataSourceMap = new HashMap<>(1, 1);
newDataSourceMap.put("ds", newDataSource);
ShardingConfigurationEventBusEvent shardingEvent = new ShardingConfigurationEventBusEvent(newDataSourceMap, new ShardingRule(createShardingRuleConfig(newDataSourceMap),
newDataSourceMap.keySet()), new Properties());
shardingDataSource.renew(shardingEvent);
assertThat(originShardingProperties, not(getShardingProperties(shardingDataSource)));
}
@Test
public void assertRenewWithChangeExecuteEnginePoolSize() throws SQLException, NoSuchFieldException, IllegalAccessException {
DataSource originalDataSource = mockDataSource("H2");
Map<String, DataSource> originalDataSourceMap = new HashMap<>(1, 1);
originalDataSourceMap.put("ds", originalDataSource);
ShardingDataSource shardingDataSource = createShardingDataSource(originalDataSourceMap);
final ShardingProperties originShardingProperties = getShardingProperties(shardingDataSource);
DataSource newDataSource = mockDataSource("H2");
Map<String, DataSource> newDataSourceMap = new HashMap<>(1, 1);
newDataSourceMap.put("ds", newDataSource);
Properties props = new Properties();
props.setProperty(ShardingPropertiesConstant.EXECUTOR_SIZE.getKey(), "100");
ShardingConfigurationEventBusEvent shardingEvent = new ShardingConfigurationEventBusEvent(newDataSourceMap, new ShardingRule(createShardingRuleConfig(newDataSourceMap),
newDataSourceMap.keySet()), props);
shardingDataSource.renew(shardingEvent);
assertThat(originShardingProperties, not(getShardingProperties(shardingDataSource)));
}
// TODO to be discuss
// @Test(expected = IllegalStateException.class)
@Test
public void assertRenewWithDatabaseTypeChanged() throws SQLException {
DataSource originalDataSource = mockDataSource("H2");
Map<String, DataSource> originalDataSourceMap = new HashMap<>(1, 1);
originalDataSourceMap.put("ds", originalDataSource);
ShardingDataSource shardingDataSource = createShardingDataSource(originalDataSourceMap);
DataSource newDataSource = mockDataSource("MySQL");
Map<String, DataSource> newDataSourceMap = new HashMap<>(1, 1);
newDataSourceMap.put("ds", newDataSource);
ShardingConfigurationEventBusEvent shardingEvent = new ShardingConfigurationEventBusEvent(newDataSourceMap, new ShardingRule(createShardingRuleConfig(newDataSourceMap),
newDataSourceMap.keySet()), new Properties());
shardingDataSource.renew(shardingEvent);
}
private ShardingDataSource createShardingDataSource(final Map<String, DataSource> dataSourceMap) throws SQLException {
return new ShardingDataSource(dataSourceMap, new ShardingRule(createShardingRuleConfig(dataSourceMap), dataSourceMap.keySet()));
}
......
......@@ -208,7 +208,7 @@ public abstract class BaseIntegrateTest {
@After
public void tearDown() {
if (dataSource instanceof ShardingDataSource) {
((ShardingDataSource) dataSource).getConnection().getShardingContext().getExecuteEngine().close();
((ShardingDataSource) dataSource).getShardingContext().getExecuteEngine().close();
}
ParsingResultCache.getInstance().clear();
}
......
......@@ -30,6 +30,7 @@ import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.event.ShardingEventBusInstance;
import io.shardingsphere.core.jdbc.core.ShardingContext;
import io.shardingsphere.core.jdbc.core.connection.ShardingConnection;
import io.shardingsphere.core.jdbc.core.datasource.ShardingDataSource;
import io.shardingsphere.core.jdbc.core.statement.ShardingPreparedStatement;
import io.shardingsphere.core.jdbc.core.statement.ShardingStatement;
import io.shardingsphere.core.merger.MergeEngine;
......@@ -72,6 +73,8 @@ public final class MergeEventListenerTest {
private MergeEngine mergeEngine;
private ShardingDataSource shardingDataSource;
@BeforeClass
public static void init() {
ShardingTracer.init(TRACER);
......@@ -104,6 +107,8 @@ public final class MergeEventListenerTest {
when(shardingContext.getDatabaseType()).thenReturn(DatabaseType.MySQL);
when(shardingContext.isShowSQL()).thenReturn(true);
mergeEngine = new DALMergeEngine(null, null, new ShowDatabasesStatement(), null);
shardingDataSource = Mockito.mock(ShardingDataSource.class);
when(shardingDataSource.getShardingContext()).thenReturn(shardingContext);
}
private DataSource mockDataSource() throws SQLException {
......@@ -118,7 +123,7 @@ public final class MergeEventListenerTest {
@Test
public void assertPreparedStatementRouting() throws InvocationTargetException, IllegalAccessException, NoSuchMethodException {
ShardingPreparedStatement statement = new ShardingPreparedStatement(new ShardingConnection(shardingContext), "show databases");
ShardingPreparedStatement statement = new ShardingPreparedStatement(new ShardingConnection(shardingDataSource), "show databases");
Method mergeMethod = ShardingPreparedStatement.class.getDeclaredMethod("merge", MergeEngine.class);
mergeMethod.setAccessible(true);
mergeMethod.invoke(statement, mergeEngine);
......@@ -127,7 +132,7 @@ public final class MergeEventListenerTest {
@Test
public void assertStatementRouting() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
ShardingStatement statement = new ShardingStatement(new ShardingConnection(shardingContext));
ShardingStatement statement = new ShardingStatement(new ShardingConnection(shardingDataSource));
Method mergeMethod = ShardingStatement.class.getDeclaredMethod("merge", MergeEngine.class);
mergeMethod.setAccessible(true);
mergeMethod.invoke(statement, mergeEngine);
......@@ -138,7 +143,7 @@ public final class MergeEventListenerTest {
public void assertException() {
try {
MergeEngine errorMergeEngine = new DALMergeEngine(null, null, new ShowColumnsStatement(), null);
ShardingStatement statement = new ShardingStatement(new ShardingConnection(shardingContext));
ShardingStatement statement = new ShardingStatement(new ShardingConnection(shardingDataSource));
Method mergeMethod = ShardingStatement.class.getDeclaredMethod("merge", MergeEngine.class);
mergeMethod.setAccessible(true);
mergeMethod.invoke(statement, errorMergeEngine);
......
......@@ -30,6 +30,7 @@ import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.event.ShardingEventBusInstance;
import io.shardingsphere.core.jdbc.core.ShardingContext;
import io.shardingsphere.core.jdbc.core.connection.ShardingConnection;
import io.shardingsphere.core.jdbc.core.datasource.ShardingDataSource;
import io.shardingsphere.core.jdbc.core.statement.ShardingPreparedStatement;
import io.shardingsphere.core.jdbc.core.statement.ShardingStatement;
import io.shardingsphere.core.metadata.ShardingMetaData;
......@@ -66,6 +67,8 @@ public final class SqlRoutingEventListenerTest {
private ShardingContext shardingContext;
private ShardingDataSource shardingDataSource;
@BeforeClass
public static void init() {
ShardingTracer.init(TRACER);
......@@ -97,6 +100,8 @@ public final class SqlRoutingEventListenerTest {
when(shardingContext.getMetaData()).thenReturn(shardingMetaData);
when(shardingContext.getDatabaseType()).thenReturn(DatabaseType.MySQL);
when(shardingContext.isShowSQL()).thenReturn(true);
shardingDataSource = Mockito.mock(ShardingDataSource.class);
when(shardingDataSource.getShardingContext()).thenReturn(shardingContext);
}
private DataSource mockDataSource() throws SQLException {
......@@ -111,7 +116,7 @@ public final class SqlRoutingEventListenerTest {
@Test
public void assertPreparedStatementRouting() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
ShardingPreparedStatement statement = new ShardingPreparedStatement(new ShardingConnection(shardingContext), "select * from t_order");
ShardingPreparedStatement statement = new ShardingPreparedStatement(new ShardingConnection(shardingDataSource), "select * from t_order");
Method sqlRouteMethod = ShardingPreparedStatement.class.getDeclaredMethod("sqlRoute");
sqlRouteMethod.setAccessible(true);
sqlRouteMethod.invoke(statement);
......@@ -121,7 +126,7 @@ public final class SqlRoutingEventListenerTest {
@Test
public void assertStatementRouting() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
ShardingStatement statement = new ShardingStatement(new ShardingConnection(shardingContext));
ShardingStatement statement = new ShardingStatement(new ShardingConnection(shardingDataSource));
Method sqlRouteMethod = ShardingStatement.class.getDeclaredMethod("sqlRoute", String.class);
sqlRouteMethod.setAccessible(true);
sqlRouteMethod.invoke(statement, "select * from t_order");
......@@ -131,7 +136,7 @@ public final class SqlRoutingEventListenerTest {
@Test
public void assertException() {
try {
ShardingStatement statement = new ShardingStatement(new ShardingConnection(shardingContext));
ShardingStatement statement = new ShardingStatement(new ShardingConnection(shardingDataSource));
Method sqlRouteMethod = ShardingStatement.class.getDeclaredMethod("sqlRoute", String.class);
sqlRouteMethod.setAccessible(true);
sqlRouteMethod.invoke(statement, "111");
......
......@@ -20,6 +20,7 @@ package io.shardingsphere.opentracing.listener.merger;
import io.opentracing.tag.Tags;
import io.shardingsphere.core.jdbc.core.ShardingContext;
import io.shardingsphere.core.jdbc.core.connection.ShardingConnection;
import io.shardingsphere.core.jdbc.core.datasource.ShardingDataSource;
import io.shardingsphere.core.jdbc.core.statement.ShardingPreparedStatement;
import io.shardingsphere.core.jdbc.core.statement.ShardingStatement;
import io.shardingsphere.core.merger.MergeEngine;
......@@ -29,6 +30,7 @@ import io.shardingsphere.core.parsing.parser.dialect.mysql.statement.ShowDatabas
import io.shardingsphere.opentracing.fixture.ShardingContextBuilder;
import io.shardingsphere.opentracing.listener.BaseEventListenerTest;
import org.junit.Test;
import org.mockito.Mockito;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
......@@ -37,6 +39,7 @@ import java.sql.SQLException;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
public final class MergeEventListenerTest extends BaseEventListenerTest {
......@@ -44,14 +47,18 @@ public final class MergeEventListenerTest extends BaseEventListenerTest {
private final MergeEngine mergeEngine;
private final ShardingDataSource shardingDataSource;
public MergeEventListenerTest() throws SQLException {
shardingContext = ShardingContextBuilder.build();
mergeEngine = new DALMergeEngine(null, null, new ShowDatabasesStatement(), null);
shardingDataSource = Mockito.mock(ShardingDataSource.class);
when(shardingDataSource.getShardingContext()).thenReturn(shardingContext);
}
@Test
public void assertPreparedStatementRouting() throws InvocationTargetException, IllegalAccessException, NoSuchMethodException {
ShardingPreparedStatement statement = new ShardingPreparedStatement(new ShardingConnection(shardingContext), "show databases");
ShardingPreparedStatement statement = new ShardingPreparedStatement(new ShardingConnection(shardingDataSource), "show databases");
Method mergeMethod = ShardingPreparedStatement.class.getDeclaredMethod("merge", MergeEngine.class);
mergeMethod.setAccessible(true);
mergeMethod.invoke(statement, mergeEngine);
......@@ -60,7 +67,7 @@ public final class MergeEventListenerTest extends BaseEventListenerTest {
@Test
public void assertStatementRouting() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
ShardingStatement statement = new ShardingStatement(new ShardingConnection(shardingContext));
ShardingStatement statement = new ShardingStatement(new ShardingConnection(shardingDataSource));
Method mergeMethod = ShardingStatement.class.getDeclaredMethod("merge", MergeEngine.class);
mergeMethod.setAccessible(true);
mergeMethod.invoke(statement, mergeEngine);
......@@ -71,7 +78,7 @@ public final class MergeEventListenerTest extends BaseEventListenerTest {
public void assertException() {
try {
MergeEngine errorMergeEngine = new DALMergeEngine(null, null, new ShowColumnsStatement(), null);
ShardingStatement statement = new ShardingStatement(new ShardingConnection(shardingContext));
ShardingStatement statement = new ShardingStatement(new ShardingConnection(shardingDataSource));
Method mergeMethod = ShardingStatement.class.getDeclaredMethod("merge", MergeEngine.class);
mergeMethod.setAccessible(true);
mergeMethod.invoke(statement, errorMergeEngine);
......
......@@ -20,11 +20,13 @@ package io.shardingsphere.opentracing.listener.routing;
import io.opentracing.tag.Tags;
import io.shardingsphere.core.jdbc.core.ShardingContext;
import io.shardingsphere.core.jdbc.core.connection.ShardingConnection;
import io.shardingsphere.core.jdbc.core.datasource.ShardingDataSource;
import io.shardingsphere.core.jdbc.core.statement.ShardingPreparedStatement;
import io.shardingsphere.core.jdbc.core.statement.ShardingStatement;
import io.shardingsphere.opentracing.fixture.ShardingContextBuilder;
import io.shardingsphere.opentracing.listener.BaseEventListenerTest;
import org.junit.Test;
import org.mockito.Mockito;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
......@@ -33,18 +35,23 @@ import java.sql.SQLException;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
public final class RoutingEventListenerTest extends BaseEventListenerTest {
private final ShardingContext shardingContext;
private final ShardingDataSource shardingDataSource;
public RoutingEventListenerTest() throws SQLException {
shardingContext = ShardingContextBuilder.build();
shardingDataSource = Mockito.mock(ShardingDataSource.class);
when(shardingDataSource.getShardingContext()).thenReturn(shardingContext);
}
@Test
public void assertPreparedStatementRouting() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
ShardingPreparedStatement statement = new ShardingPreparedStatement(new ShardingConnection(shardingContext), "select * from t_order");
ShardingPreparedStatement statement = new ShardingPreparedStatement(new ShardingConnection(shardingDataSource), "select * from t_order");
Method sqlRouteMethod = ShardingPreparedStatement.class.getDeclaredMethod("sqlRoute");
sqlRouteMethod.setAccessible(true);
sqlRouteMethod.invoke(statement);
......@@ -54,7 +61,7 @@ public final class RoutingEventListenerTest extends BaseEventListenerTest {
@Test
public void assertStatementRouting() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
ShardingStatement statement = new ShardingStatement(new ShardingConnection(shardingContext));
ShardingStatement statement = new ShardingStatement(new ShardingConnection(shardingDataSource));
Method sqlRouteMethod = ShardingStatement.class.getDeclaredMethod("sqlRoute", String.class);
sqlRouteMethod.setAccessible(true);
sqlRouteMethod.invoke(statement, "select * from t_order");
......@@ -64,7 +71,7 @@ public final class RoutingEventListenerTest extends BaseEventListenerTest {
@Test
public void assertException() {
try {
ShardingStatement statement = new ShardingStatement(new ShardingConnection(shardingContext));
ShardingStatement statement = new ShardingStatement(new ShardingConnection(shardingDataSource));
Method sqlRouteMethod = ShardingStatement.class.getDeclaredMethod("sqlRoute", String.class);
sqlRouteMethod.setAccessible(true);
sqlRouteMethod.invoke(statement, "111");
......
......@@ -26,9 +26,9 @@ import io.shardingsphere.core.constant.properties.ShardingProperties;
import io.shardingsphere.core.constant.properties.ShardingPropertiesConstant;
import io.shardingsphere.core.constant.transaction.TransactionType;
import io.shardingsphere.core.event.ShardingEventBusInstance;
import io.shardingsphere.core.event.orche.config.ProxyConfigurationEventBusEvent;
import io.shardingsphere.core.event.orche.state.CircuitStateEventBusEvent;
import io.shardingsphere.core.event.orche.state.DisabledStateEventBusEvent;
import io.shardingsphere.jdbc.orchestration.internal.event.config.ProxyConfigurationEventBusEvent;
import io.shardingsphere.jdbc.orchestration.internal.event.state.CircuitStateEventBusEvent;
import io.shardingsphere.jdbc.orchestration.internal.event.state.DisabledStateEventBusEvent;
import io.shardingsphere.core.executor.ShardingExecuteEngine;
import io.shardingsphere.core.metadata.ShardingMetaData;
import io.shardingsphere.core.rule.DataSourceParameter;
......
......@@ -28,6 +28,7 @@ import io.shardingsphere.proxy.transport.mysql.packet.command.query.binary.Binar
import io.shardingsphere.proxy.transport.mysql.packet.command.query.binary.close.ComStmtClosePacketTest;
import io.shardingsphere.proxy.transport.mysql.packet.command.query.binary.execute.BinaryProtocolValueTest;
import io.shardingsphere.proxy.transport.mysql.packet.command.query.binary.execute.BinaryResultSetRowPacketTest;
import io.shardingsphere.proxy.transport.mysql.packet.command.query.binary.execute.ComStmtExecutePacketTest;
import io.shardingsphere.proxy.transport.mysql.packet.command.query.binary.execute.NullBitmapTest;
import io.shardingsphere.proxy.transport.mysql.packet.command.query.binary.prepare.ComStmtPrepareOKPacketTest;
import io.shardingsphere.proxy.transport.mysql.packet.command.query.text.TextResultSetRowPacketTest;
......@@ -53,7 +54,7 @@ import org.junit.runners.Suite.SuiteClasses;
ComQueryPacketTest.class,
ComStmtPrepareOKPacketTest.class,
BinaryResultSetRowPacketTest.class,
ComStmtExecutePacketTest.class,
ComStmtClosePacketTest.class,
ComInitDbPacketTest.class,
ComPingPacketTest.class,
......
......@@ -18,6 +18,7 @@
package io.shardingsphere.proxy.transport.mysql.packet.command.query.binary;
import io.shardingsphere.proxy.transport.mysql.packet.command.query.binary.fixture.BinaryStatementRegistryUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
......@@ -29,7 +30,8 @@ public final class BinaryStatementRegistryTest {
private final String sql = "SELECT * FROM tbl WHERE id=?";
@Before
public void setUp() throws ReflectiveOperationException {
@After
public void reset() throws ReflectiveOperationException {
BinaryStatementRegistryUtil.reset();
}
......
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingsphere.proxy.transport.mysql.packet.command.query.binary.execute;
import com.google.common.base.Optional;
import io.shardingsphere.proxy.backend.BackendHandler;
import io.shardingsphere.proxy.backend.ResultPacket;
import io.shardingsphere.proxy.backend.jdbc.connection.BackendConnection;
import io.shardingsphere.proxy.config.BackendNIOConfiguration;
import io.shardingsphere.proxy.config.RuleRegistry;
import io.shardingsphere.proxy.transport.common.packet.DatabasePacket;
import io.shardingsphere.proxy.transport.mysql.constant.ColumnType;
import io.shardingsphere.proxy.transport.mysql.packet.MySQLPacketPayload;
import io.shardingsphere.proxy.transport.mysql.packet.command.CommandResponsePackets;
import io.shardingsphere.proxy.transport.mysql.packet.command.query.binary.BinaryStatementRegistry;
import io.shardingsphere.proxy.transport.mysql.packet.command.query.binary.fixture.BinaryStatementRegistryUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.lang.reflect.Field;
import java.sql.SQLException;
import java.util.Collections;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public final class ComStmtExecutePacketTest {
@Mock
private MySQLPacketPayload payload;
@Mock
private BackendConnection backendConnection;
@Before
public void setUp() throws ReflectiveOperationException {
Field field = RuleRegistry.class.getDeclaredField("backendNIOConfig");
field.setAccessible(true);
field.set(RuleRegistry.getInstance(), new BackendNIOConfiguration(true, 1, 0));
}
@Before
@After
public void reset() throws ReflectiveOperationException {
BinaryStatementRegistryUtil.reset();
}
@Test
public void assertWrite() {
BinaryStatementRegistry.getInstance().register("SELECT id FROM tbl WHERE id=?", 1);
when(payload.readInt4()).thenReturn(1);
when(payload.readInt1()).thenReturn(0, 1);
ComStmtExecutePacket actual = new ComStmtExecutePacket(1, 1000, payload, backendConnection);
assertThat(actual.getSequenceId(), is(1));
actual.write(payload);
verify(payload, times(2)).writeInt4(1);
verify(payload, times(4)).writeInt1(1);
verify(payload).writeInt1(0);
verify(payload).writeStringLenenc("");
}
@Test
public void assertExecute() throws ReflectiveOperationException, SQLException {
BinaryStatementRegistry.getInstance().register("SELECT id FROM tbl WHERE id=?", 1);
BackendHandler backendHandler = mock(BackendHandler.class);
when(payload.readInt4()).thenReturn(1);
when(payload.readInt1()).thenReturn(0, 1);
CommandResponsePackets expectedCommandResponsePackets = new CommandResponsePackets();
when(backendHandler.execute()).thenReturn(expectedCommandResponsePackets);
when(backendHandler.next()).thenReturn(true, false);
when(backendHandler.getResultValue()).thenReturn(new ResultPacket(2, Collections.<Object>singletonList(99999L), 1, Collections.singletonList(ColumnType.MYSQL_TYPE_LONG)));
ComStmtExecutePacket packet = new ComStmtExecutePacket(1, 1000, payload, backendConnection);
setBackendHandler(packet, backendHandler);
Optional<CommandResponsePackets> actualCommandResponsePackets = packet.execute();
assertTrue(actualCommandResponsePackets.isPresent());
assertThat(actualCommandResponsePackets.get(), is(expectedCommandResponsePackets));
assertTrue(packet.next());
DatabasePacket actualResultValue = packet.getResultValue();
assertThat(actualResultValue.getSequenceId(), is(2));
assertThat(((BinaryResultSetRowPacket) actualResultValue).getData(), is(Collections.<Object>singletonList(99999L)));
assertFalse(packet.next());
verify(backendHandler).execute();
verify(backendHandler, times(2)).next();
verify(backendHandler).getResultValue();
}
private void setBackendHandler(final ComStmtExecutePacket packet, final BackendHandler backendHandler) throws ReflectiveOperationException {
Field field = ComStmtExecutePacket.class.getDeclaredField("backendHandler");
field.setAccessible(true);
field.set(packet, backendHandler);
}
}
......@@ -35,6 +35,7 @@ import io.shardingsphere.proxy.transport.mysql.packet.command.query.ColumnDefini
import io.shardingsphere.proxy.transport.mysql.packet.command.query.binary.fixture.BinaryStatementRegistryUtil;
import io.shardingsphere.proxy.transport.mysql.packet.generic.EofPacket;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
......@@ -61,6 +62,11 @@ public final class ComStmtPreparePacketTest {
@Before
public void setUp() throws ReflectiveOperationException {
setRuleRegistryMetaData();
}
@Before
@After
public void reset() throws ReflectiveOperationException {
BinaryStatementRegistryUtil.reset();
}
......
......@@ -54,6 +54,7 @@ import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
......@@ -66,9 +67,6 @@ public final class ComQueryPacketTest {
@Mock
private BackendConnection backendConnection;
@Mock
private BackendHandler backendHandler;
private Listener listener;
@Before
......@@ -105,6 +103,7 @@ public final class ComQueryPacketTest {
@Test
public void assertExecuteWithoutTransaction() throws SQLException, ReflectiveOperationException {
when(payload.readStringEOF()).thenReturn("SELECT id FROM tbl");
BackendHandler backendHandler = mock(BackendHandler.class);
when(backendHandler.next()).thenReturn(true, false);
when(backendHandler.getResultValue()).thenReturn(new ResultPacket(1, Collections.<Object>singletonList("id"), 1, Collections.singletonList(ColumnType.MYSQL_TYPE_VARCHAR)));
FieldCountPacket expectedFieldCountPacket = new FieldCountPacket(1, 1);
......@@ -112,7 +111,7 @@ public final class ComQueryPacketTest {
when(backendHandler.next()).thenReturn(true, false);
when(backendHandler.getResultValue()).thenReturn(new ResultPacket(2, Collections.<Object>singletonList(99999L), 1, Collections.singletonList(ColumnType.MYSQL_TYPE_LONG)));
ComQueryPacket packet = new ComQueryPacket(1, 1000, payload, backendConnection);
setBackendHandler(packet);
setBackendHandler(packet, backendHandler);
Optional<CommandResponsePackets> actual = packet.execute();
assertFalse(listener.isCalled());
assertTrue(actual.isPresent());
......@@ -124,7 +123,7 @@ public final class ComQueryPacketTest {
assertFalse(packet.next());
}
private void setBackendHandler(final ComQueryPacket packet) throws ReflectiveOperationException {
private void setBackendHandler(final ComQueryPacket packet, final BackendHandler backendHandler) throws ReflectiveOperationException {
Field field = ComQueryPacket.class.getDeclaredField("backendHandler");
field.setAccessible(true);
field.set(packet, backendHandler);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册