提交 e9c36fb5 编写于 作者: T terrymanu

pass DatabaseType to transaction module

上级 1e9caf28
......@@ -22,8 +22,8 @@ import io.shardingsphere.core.bootstrap.ShardingBootstrap;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.util.ReflectiveUtil;
import io.shardingsphere.shardingjdbc.jdbc.unsupported.AbstractUnsupportedOperationDataSource;
import io.shardingsphere.transaction.spi.xa.DataSourceMapConverter;
import io.shardingsphere.transaction.core.loader.SPIDataSourceMapConverter;
import io.shardingsphere.transaction.spi.xa.DataSourceMapConverter;
import lombok.Getter;
import lombok.Setter;
......@@ -63,7 +63,7 @@ public abstract class AbstractDataSourceAdapter extends AbstractUnsupportedOpera
public AbstractDataSourceAdapter(final Map<String, DataSource> dataSourceMap) throws SQLException {
this.dataSourceMap = dataSourceMap;
databaseType = getDatabaseType(dataSourceMap.values());
xaDataSourceMap = dataSourceMapConverter.convert(dataSourceMap, databaseType);
xaDataSourceMap = dataSourceMapConverter.convert(databaseType, dataSourceMap);
}
protected final DatabaseType getDatabaseType(final Collection<DataSource> dataSources) throws SQLException {
......
......@@ -29,7 +29,7 @@ import java.util.Map.Entry;
public final class FixedDataSourceMapConverter implements DataSourceMapConverter {
@Override
public Map<String, DataSource> convert(final Map<String, DataSource> dataSourceMap, final DatabaseType databaseType) {
public Map<String, DataSource> convert(final DatabaseType databaseType, final Map<String, DataSource> dataSourceMap) {
Map<String, DataSource> result = new HashMap<>(dataSourceMap.size(), 1);
for (Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
HikariDataSource dataSource = new HikariDataSource();
......
......@@ -25,8 +25,7 @@ import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public class SPIDataSourceMapConverterTest {
......@@ -35,14 +34,13 @@ public class SPIDataSourceMapConverterTest {
private Map<String, DataSource> dataSourceMap = new HashMap<>();
@Test
public void createBackendDatasourceSuccess() {
Map<String, DataSource> backendDatasourceMap = converter.convert(dataSourceMap, DatabaseType.MySQL);
assertThat(backendDatasourceMap != null, is(true));
assertThat(backendDatasourceMap.size(), is(0));
public void assertCreateBackendDatasourceSuccess() {
Map<String, DataSource> backendDatasourceMap = converter.convert(DatabaseType.MySQL, dataSourceMap);
assertTrue(backendDatasourceMap.isEmpty());
}
@Test
public void createBackendDatasourceFailed() {
public void assertCreateBackendDatasourceFailed() {
// TODO
}
}
......@@ -50,6 +50,6 @@ public final class JDBCXABackendDataSourceFactory implements JDBCBackendDataSour
@Override
public DataSource build(final String dataSourceName, final DataSourceParameter dataSourceParameter) {
XATransactionManager xaTransactionManager = XATransactionManagerSPILoader.getInstance().getTransactionManager();
return xaTransactionManager.wrapDataSource(XADataSourceFactory.build(DatabaseType.MySQL), dataSourceName, dataSourceParameter);
return xaTransactionManager.wrapDataSource(DatabaseType.MySQL, XADataSourceFactory.build(DatabaseType.MySQL), dataSourceName, dataSourceParameter);
}
}
......@@ -17,8 +17,9 @@
package io.shardingsphere.transaction.spi.xa;
import io.shardingsphere.transaction.core.internal.context.XATransactionContext;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.rule.DataSourceParameter;
import io.shardingsphere.transaction.core.internal.context.XATransactionContext;
import io.shardingsphere.transaction.core.internal.manager.ShardingTransactionManager;
import javax.sql.DataSource;
......@@ -41,12 +42,13 @@ public interface XATransactionManager extends ShardingTransactionManager<XATrans
/**
* Get specific {@link XADataSource} and enroll it with a JTA.
*
* @param databaseType database type
* @param xaDataSource XA data source
* @param dataSourceName data source name
* @param dataSourceParameter data source parameter
* @return XA data source
*/
DataSource wrapDataSource(XADataSource xaDataSource, String dataSourceName, DataSourceParameter dataSourceParameter);
DataSource wrapDataSource(DatabaseType databaseType, XADataSource xaDataSource, String dataSourceName, DataSourceParameter dataSourceParameter);
/**
* Get transaction manager for vendor provided.
......
......@@ -40,12 +40,12 @@ public final class XADataSourceMapConverter implements DataSourceMapConverter {
private final XATransactionManager xaTransactionManager = XATransactionManagerSPILoader.getInstance().getTransactionManager();
@Override
public Map<String, DataSource> convert(final Map<String, DataSource> dataSourceMap, final DatabaseType databaseType) {
public Map<String, DataSource> convert(final DatabaseType databaseType, final Map<String, DataSource> dataSourceMap) {
Map<String, DataSource> result = new HashMap<>(dataSourceMap.size(), 1);
try {
for (Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
DataSourceParameter parameter = DataSourceSwapperRegistry.getSwapper(entry.getValue().getClass()).swap(entry.getValue());
DataSource dataSource = xaTransactionManager.wrapDataSource(XADataSourceFactory.build(databaseType), entry.getKey(), parameter);
DataSource dataSource = xaTransactionManager.wrapDataSource(databaseType, XADataSourceFactory.build(databaseType), entry.getKey(), parameter);
result.put(entry.getKey(), dataSource);
}
return result;
......
......@@ -24,7 +24,6 @@ import lombok.NoArgsConstructor;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
/**
* XA Data source registry.
......@@ -54,19 +53,4 @@ public final class XADataSourceRegistry {
Preconditions.checkState(XA_DATA_SOURCE_NAMES.containsKey(databaseType), "Cannot support database type: `%s`", databaseType);
return XA_DATA_SOURCE_NAMES.get(databaseType);
}
/**
* Get database type.
*
* @param driverClassName driver class name
* @return database type
*/
public static DatabaseType getDatabaseType(final String driverClassName) {
for (Entry<DatabaseType, String> entry : XA_DATA_SOURCE_NAMES.entrySet()) {
if (entry.getValue().equals(driverClassName)) {
return entry.getKey();
}
}
throw new IllegalArgumentException(String.format("Cannot find database type for `%s`", driverClassName));
}
}
......@@ -20,8 +20,8 @@ package io.shardingsphere.transaction.xa.manager;
import com.atomikos.beans.PropertyException;
import com.atomikos.beans.PropertyUtils;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.rule.DataSourceParameter;
import io.shardingsphere.transaction.xa.convert.datasource.XADataSourceRegistry;
import io.shardingsphere.transaction.xa.convert.datasource.XAPropertiesFactory;
import javax.sql.DataSource;
......@@ -38,14 +38,15 @@ public final class AtomikosDataSourceBeanWrapper implements XADataSourceWrapper
private final AtomikosDataSourceBean delegate = new AtomikosDataSourceBean();
@Override
public DataSource wrap(final XADataSource xaDataSource, final String dataSourceName, final DataSourceParameter parameter) throws PropertyException {
setAtomikosDatasourceBean(xaDataSource, dataSourceName, parameter);
public DataSource wrap(final DatabaseType databaseType, final XADataSource xaDataSource, final String dataSourceName, final DataSourceParameter parameter) throws PropertyException {
setAtomikosDatasourceBean(databaseType, xaDataSource, dataSourceName, parameter);
return delegate;
}
private void setAtomikosDatasourceBean(final XADataSource xaDataSource, final String dataSourceName, final DataSourceParameter parameter) throws PropertyException {
private void setAtomikosDatasourceBean(
final DatabaseType databaseType, final XADataSource xaDataSource, final String dataSourceName, final DataSourceParameter parameter) throws PropertyException {
setPoolProperties(parameter);
setXAProperties(xaDataSource, dataSourceName, parameter);
setXAProperties(databaseType, xaDataSource, dataSourceName, parameter);
}
private void setPoolProperties(final DataSourceParameter parameter) {
......@@ -57,10 +58,10 @@ public final class AtomikosDataSourceBeanWrapper implements XADataSourceWrapper
delegate.setMaxIdleTime((int) parameter.getIdleTimeoutMilliseconds() / 1000);
}
private void setXAProperties(final XADataSource xaDataSource, final String dataSourceName, final DataSourceParameter parameter) throws PropertyException {
private void setXAProperties(final DatabaseType databaseType, final XADataSource xaDataSource, final String dataSourceName, final DataSourceParameter parameter) throws PropertyException {
delegate.setXaDataSourceClassName(xaDataSource.getClass().getName());
delegate.setUniqueResourceName(dataSourceName);
Properties xaProperties = XAPropertiesFactory.build(XADataSourceRegistry.getDatabaseType(xaDataSource.getClass().getName()), parameter);
Properties xaProperties = XAPropertiesFactory.build(databaseType, parameter);
PropertyUtils.setProperties(xaDataSource, xaProperties);
delegate.setXaProperties(xaProperties);
delegate.setXaDataSource(xaDataSource);
......
......@@ -18,6 +18,7 @@
package io.shardingsphere.transaction.xa.manager;
import com.atomikos.icatch.jta.UserTransactionManager;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.exception.ShardingException;
import io.shardingsphere.core.rule.DataSourceParameter;
import io.shardingsphere.transaction.core.internal.context.XATransactionContext;
......@@ -100,9 +101,9 @@ public final class AtomikosTransactionManager implements XATransactionManager {
}
@Override
public DataSource wrapDataSource(final XADataSource xaDataSource, final String dataSourceName, final DataSourceParameter dataSourceParameter) {
public DataSource wrapDataSource(final DatabaseType databaseType, final XADataSource xaDataSource, final String dataSourceName, final DataSourceParameter dataSourceParameter) {
try {
return new AtomikosDataSourceBeanWrapper().wrap(xaDataSource, dataSourceName, dataSourceParameter);
return new AtomikosDataSourceBeanWrapper().wrap(databaseType, xaDataSource, dataSourceName, dataSourceParameter);
} catch (final Exception ex) {
throw new ShardingException("Failed to wrap XADataSource to transactional datasource pool", ex);
}
......
......@@ -18,6 +18,7 @@
package io.shardingsphere.transaction.xa.manager;
import com.atomikos.beans.PropertyException;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.rule.DataSourceParameter;
import javax.sql.DataSource;
......@@ -33,11 +34,12 @@ public interface XADataSourceWrapper {
/**
* Get a wrapper datasource pool for XA.
*
* @param databaseType database type
* @param xaDataSource xa data source
* @param dataSourceName data source name
* @param parameter data source parameter
* @return wrapper xa data source pool
* @throws PropertyException property exception
*/
DataSource wrap(XADataSource xaDataSource, String dataSourceName, DataSourceParameter parameter) throws PropertyException;
DataSource wrap(DatabaseType databaseType, XADataSource xaDataSource, String dataSourceName, DataSourceParameter parameter) throws PropertyException;
}
......@@ -45,32 +45,32 @@ public class XADataSourceMapConverterTest {
private XADataSourceMapConverter xaDataSourceMapConverter = new XADataSourceMapConverter();
@Test
public void assertGetMysqlXATransactionalDataSourceSuccess() {
Map<String, DataSource> xaDataSourceMap = xaDataSourceMapConverter.convert(createDataSourceMap(PoolType.DBCP2, DatabaseType.MySQL), DatabaseType.MySQL);
public void assertGetH2XATransactionalDataSourceSuccess() {
Map<String, DataSource> xaDataSourceMap = xaDataSourceMapConverter.convert(DatabaseType.H2, createDataSourceMap(PoolType.DRUID, DatabaseType.H2));
assertThat(xaDataSourceMap.size(), is(2));
assertThat(xaDataSourceMap.get("ds1"), instanceOf(AtomikosDataSourceBean.class));
assertThat(xaDataSourceMap.get("ds2"), instanceOf(AtomikosDataSourceBean.class));
}
@Test
public void assertGetH2XATransactionalDataSourceSuccess() {
Map<String, DataSource> xaDataSourceMap = xaDataSourceMapConverter.convert(createDataSourceMap(PoolType.DRUID, DatabaseType.H2), DatabaseType.H2);
public void assertGetMySQLXATransactionalDataSourceSuccess() {
Map<String, DataSource> xaDataSourceMap = xaDataSourceMapConverter.convert(DatabaseType.MySQL, createDataSourceMap(PoolType.DBCP2, DatabaseType.MySQL));
assertThat(xaDataSourceMap.size(), is(2));
assertThat(xaDataSourceMap.get("ds1"), instanceOf(AtomikosDataSourceBean.class));
assertThat(xaDataSourceMap.get("ds2"), instanceOf(AtomikosDataSourceBean.class));
}
@Test
public void assertGetPGXATransactionalDataSourceSuccess() {
Map<String, DataSource> xaDataSourceMap = xaDataSourceMapConverter.convert(createDataSourceMap(PoolType.DRUID, DatabaseType.PostgreSQL), DatabaseType.PostgreSQL);
public void assertGetPostgreSQLXATransactionalDataSourceSuccess() {
Map<String, DataSource> xaDataSourceMap = xaDataSourceMapConverter.convert(DatabaseType.PostgreSQL, createDataSourceMap(PoolType.DRUID, DatabaseType.PostgreSQL));
assertThat(xaDataSourceMap.size(), is(2));
assertThat(xaDataSourceMap.get("ds1"), instanceOf(AtomikosDataSourceBean.class));
assertThat(xaDataSourceMap.get("ds2"), instanceOf(AtomikosDataSourceBean.class));
}
@Test
public void assertGetMSXATransactionalDataSourceSuccess() {
Map<String, DataSource> xaDataSourceMap = xaDataSourceMapConverter.convert(createDataSourceMap(PoolType.HIKARI, DatabaseType.SQLServer), DatabaseType.SQLServer);
public void assertGetSQLServerXATransactionalDataSourceSuccess() {
Map<String, DataSource> xaDataSourceMap = xaDataSourceMapConverter.convert(DatabaseType.SQLServer, createDataSourceMap(PoolType.HIKARI, DatabaseType.SQLServer));
assertThat(xaDataSourceMap.size(), is(2));
assertThat(xaDataSourceMap.get("ds1"), instanceOf(AtomikosDataSourceBean.class));
assertThat(xaDataSourceMap.get("ds2"), instanceOf(AtomikosDataSourceBean.class));
......@@ -79,9 +79,9 @@ public class XADataSourceMapConverterTest {
@Test
public void assertGetTransactionDataSourceFailed() {
XATransactionManager xaTransactionManager = mock(XATransactionManager.class);
doThrow(ShardingException.class).when(xaTransactionManager).wrapDataSource((XADataSource) any(), anyString(), (DataSourceParameter) any());
doThrow(ShardingException.class).when(xaTransactionManager).wrapDataSource((DatabaseType) any(), (XADataSource) any(), anyString(), (DataSourceParameter) any());
ReflectiveUtil.setProperty(xaDataSourceMapConverter, "xaTransactionManager", xaTransactionManager);
Map<String, DataSource> actualDataSourceMap = xaDataSourceMapConverter.convert(createDataSourceMap(PoolType.HIKARI, DatabaseType.SQLServer), DatabaseType.SQLServer);
Map<String, DataSource> actualDataSourceMap = xaDataSourceMapConverter.convert(DatabaseType.SQLServer, createDataSourceMap(PoolType.HIKARI, DatabaseType.SQLServer));
assertThat(actualDataSourceMap.size(), is(0));
}
......
......@@ -17,8 +17,9 @@
package io.shardingsphere.transaction.xa.fixture;
import io.shardingsphere.transaction.core.internal.context.XATransactionContext;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.rule.DataSourceParameter;
import io.shardingsphere.transaction.core.internal.context.XATransactionContext;
import io.shardingsphere.transaction.spi.xa.XATransactionManager;
import javax.sql.DataSource;
......@@ -49,7 +50,7 @@ public final class FixtureXATransactionManager implements XATransactionManager {
}
@Override
public DataSource wrapDataSource(final XADataSource xaDataSource, final String dataSourceName, final DataSourceParameter dataSourceParameter) {
public DataSource wrapDataSource(final DatabaseType databaseType, final XADataSource xaDataSource, final String dataSourceName, final DataSourceParameter dataSourceParameter) {
return null;
}
......
......@@ -50,7 +50,7 @@ public class AtomikosDataSourceBeanWrapperTest {
@Test
public void assertWrapToAtomikosDataSourceBean() throws PropertyException {
AtomikosDataSourceBeanWrapper atomikosDataSourceBeanWrapper = new AtomikosDataSourceBeanWrapper();
AtomikosDataSourceBean targetDataSource = (AtomikosDataSourceBean) atomikosDataSourceBeanWrapper.wrap(xaDataSource, "ds1", parameter);
AtomikosDataSourceBean targetDataSource = (AtomikosDataSourceBean) atomikosDataSourceBeanWrapper.wrap(DatabaseType.MySQL, xaDataSource, "ds1", parameter);
assertThat(targetDataSource, Matchers.instanceOf(AtomikosDataSourceBean.class));
assertThat(targetDataSource.getXaDataSource(), is(xaDataSource));
assertThat(targetDataSource.getXaDataSourceClassName(), is(XADataSourceRegistry.getXADataSourceClassName(DatabaseType.MySQL)));
......
......@@ -20,6 +20,7 @@ package io.shardingsphere.transaction.xa.manager;
import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.exception.ShardingException;
import io.shardingsphere.core.rule.DataSourceParameter;
import io.shardingsphere.transaction.core.internal.TransactionOperationType;
......@@ -130,7 +131,7 @@ public final class AtomikosTransactionManagerTest {
XADataSource xaDataSource = mock(XADataSource.class);
DataSourceParameter dataSourceParameter = new DataSourceParameter();
dataSourceParameter.setMaxPoolSize(10);
atomikosTransactionManager.wrapDataSource(xaDataSource, "ds_name", dataSourceParameter);
atomikosTransactionManager.wrapDataSource(DatabaseType.MySQL, xaDataSource, "ds_name", dataSourceParameter);
}
@Test
......@@ -141,7 +142,7 @@ public final class AtomikosTransactionManagerTest {
dataSourceParameter.setPassword("root");
dataSourceParameter.setUrl("db:url");
dataSourceParameter.setMaxPoolSize(10);
DataSource actual = atomikosTransactionManager.wrapDataSource(xaDataSource, "ds_name", dataSourceParameter);
DataSource actual = atomikosTransactionManager.wrapDataSource(DatabaseType.H2, xaDataSource, "ds_name", dataSourceParameter);
assertThat(actual, instanceOf(AtomikosDataSourceBean.class));
}
......@@ -152,6 +153,6 @@ public final class AtomikosTransactionManagerTest {
dataSourceParameter.setUrl("db:url");
dataSourceParameter.setMaxPoolSize(0);
XADataSource xaDataSource = new MysqlXADataSource();
atomikosTransactionManager.wrapDataSource(xaDataSource, "ds_name", dataSourceParameter);
atomikosTransactionManager.wrapDataSource(DatabaseType.MySQL, xaDataSource, "ds_name", dataSourceParameter);
}
}
......@@ -49,15 +49,8 @@ public final class SPIDataSourceMapConverter implements DataSourceMapConverter {
dataSourceMapConverter = converters.iterator().next();
}
/**
* Convert normal datasource to xa transactional datasource.
*
* @param dataSourceMap data source map
* @param databaseType database type
* @return xa transactional datasource map
*/
@Override
public Map<String, DataSource> convert(final Map<String, DataSource> dataSourceMap, final DatabaseType databaseType) {
return null != dataSourceMapConverter ? dataSourceMapConverter.convert(dataSourceMap, databaseType) : null;
public Map<String, DataSource> convert(final DatabaseType databaseType, final Map<String, DataSource> dataSourceMap) {
return null == dataSourceMapConverter ? null : dataSourceMapConverter.convert(databaseType, dataSourceMap);
}
}
......@@ -32,10 +32,10 @@ public interface DataSourceMapConverter {
/**
* Convert data source map.
*
* @param dataSourceMap data source map
* @param databaseType database type
* @param dataSourceMap data source map
* @return data source map
*/
Map<String, DataSource> convert(Map<String, DataSource> dataSourceMap, DatabaseType databaseType);
Map<String, DataSource> convert(DatabaseType databaseType, Map<String, DataSource> dataSourceMap);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册