提交 6adf0cf6 编写于 作者: T terrymanu

refactor transaction SPI

上级 2f63835e
......@@ -57,7 +57,7 @@ public abstract class AbstractDataSourceAdapter extends AbstractUnsupportedOpera
public AbstractDataSourceAdapter(final Map<String, DataSource> dataSourceMap) throws SQLException {
databaseType = getDatabaseType(dataSourceMap.values());
ShardingTransactionEngineRegistry.registerTransactionResource(databaseType, dataSourceMap);
ShardingTransactionEngineRegistry.init(databaseType, dataSourceMap);
this.dataSourceMap = dataSourceMap;
}
......@@ -91,12 +91,13 @@ public abstract class AbstractDataSourceAdapter extends AbstractUnsupportedOpera
}
@Override
public void close() {
public void close() throws Exception {
for (DataSource each : dataSourceMap.values()) {
try {
ReflectiveUtil.findMethod(each, "close").invoke(each);
} catch (final ReflectiveOperationException ignored) {
}
}
ShardingTransactionEngineRegistry.close();
}
}
......@@ -71,7 +71,7 @@ public class ShardingDataSource extends AbstractDataSourceAdapter {
}
@Override
public final void close() {
public final void close() throws Exception {
super.close();
shardingContext.close();
}
......
......@@ -59,7 +59,7 @@ public abstract class AbstractSQLTest {
ex.printStackTrace();
}
}
private static void createJdbcSchema(final DatabaseType dbType) {
try {
Connection conn;
......@@ -113,7 +113,7 @@ public abstract class AbstractSQLTest {
}
@AfterClass
public static void clear() {
public static void clear() throws Exception {
if (shardingDataSource == null) {
return;
}
......
......@@ -39,11 +39,7 @@ public abstract class AbstractShardingTransactionEngineFixture implements Shardi
private static Collection<TransactionOperationType> invocations = new LinkedList<>();
@Override
public final void registerTransactionalResources(final DatabaseType databaseType, final Map<String, DataSource> dataSourceMap) {
}
@Override
public final void clearTransactionalResources() {
public final void init(final DatabaseType databaseType, final Map<String, DataSource> dataSourceMap) {
}
@Override
......@@ -70,4 +66,8 @@ public abstract class AbstractShardingTransactionEngineFixture implements Shardi
public final void rollback() {
invocations.add(TransactionOperationType.ROLLBACK);
}
@Override
public final void close() {
}
}
......@@ -88,7 +88,7 @@ public class OrchestrationMasterSlaveDataSource extends AbstractOrchestrationDat
}
@Override
public final void close() {
public final void close() throws Exception {
dataSource.close();
getShardingOrchestrationFacade().close();
}
......
......@@ -87,7 +87,7 @@ public class OrchestrationShardingDataSource extends AbstractOrchestrationDataSo
}
@Override
public final void close() {
public final void close() throws Exception {
dataSource.close();
getShardingOrchestrationFacade().close();
}
......
......@@ -30,10 +30,7 @@ import org.junit.runners.Parameterized;
import javax.sql.DataSource;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collection;
......@@ -57,7 +54,7 @@ public class YamlOrchestrationMasterSlaveIntegrateTest extends AbstractYamlDataS
}
@Test
public void assertWithDataSource() throws SQLException, URISyntaxException, IOException {
public void assertWithDataSource() throws Exception {
File yamlFile = new File(YamlOrchestrationMasterSlaveIntegrateTest.class.getResource(filePath).toURI());
DataSource dataSource;
if (hasDataSource) {
......
......@@ -30,10 +30,7 @@ import org.junit.runners.Parameterized;
import javax.sql.DataSource;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collection;
......@@ -57,7 +54,7 @@ public class YamlOrchestrationShardingIntegrateTest extends AbstractYamlDataSour
}
@Test
public void assertWithDataSource() throws SQLException, URISyntaxException, IOException {
public void assertWithDataSource() throws Exception {
File yamlFile = new File(YamlOrchestrationShardingIntegrateTest.class.getResource(filePath).toURI());
DataSource dataSource;
if (hasDataSource) {
......
......@@ -30,10 +30,7 @@ import org.junit.runners.Parameterized;
import javax.sql.DataSource;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collection;
......@@ -60,7 +57,7 @@ public class YamlOrchestrationShardingWithMasterSlaveIntegrateTest extends Abstr
}
@Test
public void assertWithDataSource() throws SQLException, URISyntaxException, IOException {
public void assertWithDataSource() throws Exception {
File yamlFile = new File(YamlOrchestrationShardingWithMasterSlaveIntegrateTest.class.getResource(filePath).toURI());
DataSource dataSource;
if (hasDataSource) {
......
......@@ -40,16 +40,12 @@ public final class ShardingTransactionEngineFixture implements ShardingTransacti
private static Collection<TransactionOperationType> invocations = new LinkedList<>();
@Override
public TransactionType getTransactionType() {
return TransactionType.XA;
}
@Override
public void registerTransactionalResources(final DatabaseType databaseType, final Map<String, DataSource> dataSourceMap) {
public void init(final DatabaseType databaseType, final Map<String, DataSource> dataSourceMap) {
}
@Override
public void clearTransactionalResources() {
public TransactionType getTransactionType() {
return TransactionType.XA;
}
@Override
......@@ -76,4 +72,8 @@ public final class ShardingTransactionEngineFixture implements ShardingTransacti
public void rollback() {
invocations.add(TransactionOperationType.ROLLBACK);
}
@Override
public void close() {
}
}
......@@ -46,12 +46,7 @@ public final class XAShardingTransactionEngine implements ShardingTransactionEng
private final XATransactionManager xaTransactionManager = XATransactionManagerLoader.getInstance().getTransactionManager();
@Override
public TransactionType getTransactionType() {
return TransactionType.XA;
}
@Override
public void registerTransactionalResources(final DatabaseType databaseType, final Map<String, DataSource> dataSourceMap) {
public void init(final DatabaseType databaseType, final Map<String, DataSource> dataSourceMap) {
for (Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
DataSource dataSource = entry.getValue();
if (dataSource instanceof AtomikosDataSourceBean) {
......@@ -62,15 +57,12 @@ public final class XAShardingTransactionEngine implements ShardingTransactionEng
cachedShardingXADataSourceMap.put(resourceName, shardingXADataSource);
xaTransactionManager.registerRecoveryResource(resourceName, shardingXADataSource.getXaDataSource());
}
xaTransactionManager.startup();
xaTransactionManager.init();
}
@Override
public void clearTransactionalResources() {
for (ShardingXADataSource each : cachedShardingXADataSourceMap.values()) {
xaTransactionManager.removeRecoveryResource(each.getResourceName(), each.getXaDataSource());
}
cachedShardingXADataSourceMap.clear();
public TransactionType getTransactionType() {
return TransactionType.XA;
}
@SneakyThrows
......@@ -101,4 +93,13 @@ public final class XAShardingTransactionEngine implements ShardingTransactionEng
public void rollback() {
xaTransactionManager.rollback();
}
@Override
public void close() throws Exception {
for (ShardingXADataSource each : cachedShardingXADataSourceMap.values()) {
xaTransactionManager.removeRecoveryResource(each.getResourceName(), each.getXaDataSource());
}
cachedShardingXADataSourceMap.clear();
xaTransactionManager.close();
}
}
......@@ -41,13 +41,6 @@ public final class XATransactionManagerLoader {
private XATransactionManagerLoader() {
transactionManager = load();
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
transactionManager.destroy();
}
}));
}
private XATransactionManager load() {
......
......@@ -39,7 +39,7 @@ public final class AtomikosTransactionManager implements XATransactionManager {
private final UserTransactionService userTransactionService = new UserTransactionServiceImp();
@Override
public void startup() {
public void init() {
userTransactionService.init();
}
......@@ -87,7 +87,7 @@ public final class AtomikosTransactionManager implements XATransactionManager {
}
@Override
public void destroy() {
public void close() {
userTransactionService.shutdown(true);
}
}
......@@ -82,7 +82,7 @@ public class XAShardingTransactionEngineTest {
@Test
public void assertRegisterXATransactionalDataSources() {
Map<String, DataSource> dataSourceMap = createDataSourceMap(DruidXADataSource.class, DatabaseType.MySQL);
xaShardingTransactionEngine.registerTransactionalResources(DatabaseType.MySQL, dataSourceMap);
xaShardingTransactionEngine.init(DatabaseType.MySQL, dataSourceMap);
for (Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
verify(xaTransactionManager).registerRecoveryResource(entry.getKey(), (XADataSource) entry.getValue());
}
......@@ -91,14 +91,14 @@ public class XAShardingTransactionEngineTest {
@Test
public void assertRegisterAtomikosDataSourceBeans() {
Map<String, DataSource> dataSourceMap = createAtomikosDataSourceBeanMap();
xaShardingTransactionEngine.registerTransactionalResources(DatabaseType.MySQL, dataSourceMap);
xaShardingTransactionEngine.init(DatabaseType.MySQL, dataSourceMap);
verify(xaTransactionManager, times(0)).registerRecoveryResource(anyString(), any(XADataSource.class));
}
@Test
public void assertRegisterNoneXATransactionalDAtaSources() {
Map<String, DataSource> dataSourceMap = createDataSourceMap(HikariDataSource.class, DatabaseType.MySQL);
xaShardingTransactionEngine.registerTransactionalResources(DatabaseType.MySQL, dataSourceMap);
xaShardingTransactionEngine.init(DatabaseType.MySQL, dataSourceMap);
Map<String, ShardingXADataSource> cachedXADatasourceMap = getCachedShardingXADataSourceMap();
assertThat(cachedXADatasourceMap.size(), is(2));
}
......@@ -124,9 +124,9 @@ public class XAShardingTransactionEngineTest {
}
@Test
public void assertClearTransactionalDataSources() {
public void assertClose() throws Exception {
setCachedShardingXADataSourceMap("ds1");
xaShardingTransactionEngine.clearTransactionalResources();
xaShardingTransactionEngine.close();
Map<String, ShardingXADataSource> cachedShardingXADataSourceMap = getCachedShardingXADataSourceMap();
verify(xaTransactionManager).removeRecoveryResource(anyString(), any(XADataSource.class));
assertThat(cachedShardingXADataSourceMap.size(), is(0));
......
......@@ -26,7 +26,7 @@ import javax.transaction.xa.XAResource;
public final class FixtureXATransactionManager implements XATransactionManager {
@Override
public void startup() {
public void init() {
}
@Override
......@@ -59,6 +59,6 @@ public final class FixtureXATransactionManager implements XATransactionManager {
}
@Override
public void destroy() {
public void close() {
}
}
......@@ -56,8 +56,8 @@ public final class AtomikosTransactionManagerTest {
}
@Test
public void assertStartup() {
atomikosTransactionManager.startup();
public void assertInit() {
atomikosTransactionManager.init();
verify(userTransactionService).init();
}
......@@ -101,8 +101,8 @@ public final class AtomikosTransactionManagerTest {
}
@Test
public void assertShutdown() {
atomikosTransactionManager.destroy();
public void assertClose() {
atomikosTransactionManager.close();
verify(userTransactionService).shutdown(true);
}
}
......@@ -28,18 +28,18 @@ import javax.transaction.xa.XAResource;
* @author zhangliang
* @author zhaojun
*/
public interface XATransactionManager extends ShardingTransactionManager {
public interface XATransactionManager extends ShardingTransactionManager, AutoCloseable {
/**
* Startup XA transaction manager.
* Initialize XA transaction manager.
*/
void startup();
void init();
/**
* Register recovery resource.
*
* @param dataSourceName data source name
* @param xaDataSource XA data source
* @param xaDataSource XA data source
*/
void registerRecoveryResource(String dataSourceName, XADataSource xaDataSource);
......@@ -57,9 +57,4 @@ public interface XATransactionManager extends ShardingTransactionManager {
* @param xaResource XA resource
*/
void enlistResource(XAResource xaResource);
/**
* Destroy the transaction manager and could be helpful with shutdown gracefully.
*/
void destroy();
}
......@@ -70,14 +70,25 @@ public final class ShardingTransactionEngineRegistry {
}
/**
* Register transaction resource.
* Initialize sharding transaction engines.
*
* @param databaseType database type
* @param dataSourceMap data source map
*/
public static void registerTransactionResource(final DatabaseType databaseType, final Map<String, DataSource> dataSourceMap) {
public static void init(final DatabaseType databaseType, final Map<String, DataSource> dataSourceMap) {
for (Entry<TransactionType, ShardingTransactionEngine> entry : ENGINES.entrySet()) {
entry.getValue().registerTransactionalResources(databaseType, dataSourceMap);
entry.getValue().init(databaseType, dataSourceMap);
}
}
/**
* Close sharding transaction engines.
*
* @throws Exception exception
*/
public static void close() throws Exception {
for (Entry<TransactionType, ShardingTransactionEngine> entry : ENGINES.entrySet()) {
entry.getValue().close();
}
}
}
......@@ -31,27 +31,22 @@ import java.util.Map;
* @author zhaojun
*
*/
public interface ShardingTransactionEngine {
public interface ShardingTransactionEngine extends AutoCloseable {
/**
* Get transaction type.
*
* @return transaction type
*/
TransactionType getTransactionType();
/**
* Register transaction data sources.
* Initialize sharding transaction engine.
*
* @param databaseType database type
* @param dataSourceMap data source map
*/
void registerTransactionalResources(DatabaseType databaseType, Map<String, DataSource> dataSourceMap);
void init(DatabaseType databaseType, Map<String, DataSource> dataSourceMap);
/**
* Clear transactional resources.
* Get transaction type.
*
* @return transaction type
*/
void clearTransactionalResources();
TransactionType getTransactionType();
/**
* Judge is in transaction or not.
......
......@@ -42,7 +42,7 @@ public final class ShardingTransactionEngineRegistryTest {
Runnable caller = mock(Runnable.class);
ShardingTransactionEngineFixture shardingTransactionEngine = (ShardingTransactionEngineFixture) ShardingTransactionEngineRegistry.getEngine(TransactionType.XA);
shardingTransactionEngine.setCaller(caller);
ShardingTransactionEngineRegistry.registerTransactionResource(DatabaseType.H2, mock(Map.class));
ShardingTransactionEngineRegistry.init(DatabaseType.H2, mock(Map.class));
verify(caller).run();
}
}
......@@ -28,16 +28,12 @@ import java.util.Map;
public final class OtherShardingTransactionEngineFixture implements ShardingTransactionEngine {
@Override
public TransactionType getTransactionType() {
return TransactionType.XA;
}
@Override
public void registerTransactionalResources(final DatabaseType databaseType, final Map<String, DataSource> dataSourceMap) {
public void init(final DatabaseType databaseType, final Map<String, DataSource> dataSourceMap) {
}
@Override
public void clearTransactionalResources() {
public TransactionType getTransactionType() {
return TransactionType.XA;
}
@Override
......@@ -61,4 +57,8 @@ public final class OtherShardingTransactionEngineFixture implements ShardingTran
@Override
public void rollback() {
}
@Override
public void close() {
}
}
......@@ -32,17 +32,13 @@ public final class ShardingTransactionEngineFixture implements ShardingTransacti
private Runnable caller;
@Override
public TransactionType getTransactionType() {
return TransactionType.XA;
}
@Override
public void registerTransactionalResources(final DatabaseType databaseType, final Map<String, DataSource> dataSourceMap) {
public void init(final DatabaseType databaseType, final Map<String, DataSource> dataSourceMap) {
caller.run();
}
@Override
public void clearTransactionalResources() {
public TransactionType getTransactionType() {
return TransactionType.XA;
}
@Override
......@@ -66,4 +62,8 @@ public final class ShardingTransactionEngineFixture implements ShardingTransacti
@Override
public void rollback() {
}
@Override
public void close() {
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册