diff --git a/pom.xml b/pom.xml
index 728e235162e1b41d7f193245450754670f99e795..315159972ed7cf5502cd8175c7c1a061c4db9fd5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -40,7 +40,7 @@
1.101.1
- 4.0.4
+ 4.0.62.10.01.7.0
diff --git a/sharding-core/src/main/java/io/shardingsphere/core/constant/PoolType.java b/sharding-core/src/main/java/io/shardingsphere/core/constant/PoolType.java
index 7dd846ecd1d1255d3d1ba1c34eac4a072644d9e5..5b11f8342aa1f0c2aa9327a07228d8b07acb701b 100644
--- a/sharding-core/src/main/java/io/shardingsphere/core/constant/PoolType.java
+++ b/sharding-core/src/main/java/io/shardingsphere/core/constant/PoolType.java
@@ -31,6 +31,7 @@ public enum PoolType {
HIKARI("com.zaxxer.hikari.HikariDataSource"),
DRUID("com.alibaba.druid.pool.DruidDataSource"),
+ DRUID_XA("com.alibaba.druid.pool.xa.DruidXADataSource"),
DBCP2("org.apache.commons.dbcp2.BasicDataSource"),
DBCP2_TOMCAT("org.apache.tomcat.dbcp.dbcp2.BasicDataSource"),
UNKNOWN("");
diff --git a/sharding-core/src/main/java/io/shardingsphere/core/util/ReflectiveUtil.java b/sharding-core/src/main/java/io/shardingsphere/core/util/ReflectiveUtil.java
index 70ce055f268d8f5d16a9b2818e5eb24c01776429..f97551b15672ed09ee66d4e12aea3cee01586f91 100644
--- a/sharding-core/src/main/java/io/shardingsphere/core/util/ReflectiveUtil.java
+++ b/sharding-core/src/main/java/io/shardingsphere/core/util/ReflectiveUtil.java
@@ -37,10 +37,13 @@ public class ReflectiveUtil {
*/
@SuppressWarnings("unchecked")
public static Method findMethod(final Object target, final String methodName, final Class>... parameterTypes) throws NoSuchMethodException {
+ Method result;
Class clazz = target.getClass();
while (null != clazz) {
try {
- return clazz.getDeclaredMethod(methodName, parameterTypes);
+ result = clazz.getDeclaredMethod(methodName, parameterTypes);
+ result.setAccessible(true);
+ return result;
} catch (NoSuchMethodException ignored) {
}
clazz = clazz.getSuperclass();
diff --git a/sharding-jdbc/sharding-jdbc-core/src/main/java/io/shardingsphere/shardingjdbc/jdbc/adapter/AbstractConnectionAdapter.java b/sharding-jdbc/sharding-jdbc-core/src/main/java/io/shardingsphere/shardingjdbc/jdbc/adapter/AbstractConnectionAdapter.java
index 5e11c3cd9d3e292944c61ee217c333cdb2057413..676a1eb739f27959e27046f430ae00cc7f5eb063 100644
--- a/sharding-jdbc/sharding-jdbc-core/src/main/java/io/shardingsphere/shardingjdbc/jdbc/adapter/AbstractConnectionAdapter.java
+++ b/sharding-jdbc/sharding-jdbc-core/src/main/java/io/shardingsphere/shardingjdbc/jdbc/adapter/AbstractConnectionAdapter.java
@@ -31,9 +31,6 @@ import io.shardingsphere.spi.root.SPIRootInvokeHook;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.api.TransactionTypeHolder;
import io.shardingsphere.transaction.core.TransactionOperationType;
-import io.shardingsphere.transaction.core.context.SagaTransactionContext;
-import io.shardingsphere.transaction.core.context.ShardingTransactionContext;
-import io.shardingsphere.transaction.core.context.XATransactionContext;
import io.shardingsphere.transaction.core.loader.ShardingTransactionHandlerRegistry;
import io.shardingsphere.transaction.spi.ShardingTransactionHandler;
import lombok.Getter;
@@ -79,12 +76,12 @@ public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOpera
private final TransactionType transactionType;
- private final ShardingTransactionHandler shardingTransactionHandler;
+ private final ShardingTransactionHandler shardingTransactionHandler;
protected AbstractConnectionAdapter(final TransactionType transactionType) {
rootInvokeHook.start();
this.transactionType = transactionType;
- shardingTransactionHandler = ShardingTransactionHandlerRegistry.getInstance().getHandler(transactionType);
+ shardingTransactionHandler = ShardingTransactionHandlerRegistry.getHandler(transactionType);
if (TransactionType.LOCAL != transactionType) {
Preconditions.checkNotNull(shardingTransactionHandler, "Cannot find transaction manager of [%s]", transactionType);
}
@@ -123,13 +120,13 @@ public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOpera
} else if (!connections.isEmpty()) {
result = new ArrayList<>(connectionSize);
result.addAll(connections);
- List newConnections = createConnections(connectionMode, dataSource, connectionSize - connections.size());
+ List newConnections = createConnections(dataSourceName, connectionMode, dataSource, connectionSize - connections.size());
result.addAll(newConnections);
synchronized (cachedConnections) {
cachedConnections.putAll(dataSourceName, newConnections);
}
} else {
- result = new ArrayList<>(createConnections(connectionMode, dataSource, connectionSize));
+ result = new ArrayList<>(createConnections(dataSourceName, connectionMode, dataSource, connectionSize));
synchronized (cachedConnections) {
cachedConnections.putAll(dataSourceName, result);
}
@@ -138,23 +135,23 @@ public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOpera
}
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
- private List createConnections(final ConnectionMode connectionMode, final DataSource dataSource, final int connectionSize) throws SQLException {
+ private List createConnections(final String dataSourceName, final ConnectionMode connectionMode, final DataSource dataSource, final int connectionSize) throws SQLException {
if (1 == connectionSize) {
- return Collections.singletonList(createConnection(dataSource));
+ return Collections.singletonList(createConnection(dataSourceName, dataSource));
}
if (ConnectionMode.CONNECTION_STRICTLY == connectionMode) {
- return createConnections(dataSource, connectionSize);
+ return createConnections(dataSourceName, dataSource, connectionSize);
}
synchronized (dataSource) {
- return createConnections(dataSource, connectionSize);
+ return createConnections(dataSourceName, dataSource, connectionSize);
}
}
- private List createConnections(final DataSource dataSource, final int connectionSize) throws SQLException {
+ private List createConnections(final String dataSourceName, final DataSource dataSource, final int connectionSize) throws SQLException {
List result = new ArrayList<>(connectionSize);
for (int i = 0; i < connectionSize; i++) {
try {
- result.add(createConnection(dataSource));
+ result.add(createConnection(dataSourceName, dataSource));
} catch (final SQLException ex) {
for (Connection each : result) {
each.close();
@@ -165,8 +162,13 @@ public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOpera
return result;
}
- private Connection createConnection(final DataSource dataSource) throws SQLException {
- Connection result = dataSource.getConnection();
+ private Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException {
+ Connection result;
+ if (null != shardingTransactionHandler) {
+ result = shardingTransactionHandler.createConnection(dataSourceName, dataSource);
+ } else {
+ result = dataSource.getConnection();
+ }
replayMethodsInvocation(result);
return result;
}
@@ -190,14 +192,11 @@ public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOpera
connection.setAutoCommit(autoCommit);
}
});
- }
- if (autoCommit) {
- return;
- }
- if (TransactionType.XA == transactionType) {
- shardingTransactionHandler.doInTransaction(new XATransactionContext(TransactionOperationType.BEGIN));
- } else if (TransactionType.BASE == transactionType) {
- shardingTransactionHandler.doInTransaction(new SagaTransactionContext(TransactionOperationType.BEGIN, this));
+ } else {
+ if (autoCommit) {
+ return;
+ }
+ shardingTransactionHandler.doInTransaction(TransactionOperationType.BEGIN);
}
}
@@ -211,10 +210,8 @@ public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOpera
connection.commit();
}
});
- } else if (TransactionType.XA == transactionType) {
- shardingTransactionHandler.doInTransaction(new XATransactionContext(TransactionOperationType.COMMIT));
- } else if (TransactionType.BASE == transactionType) {
- shardingTransactionHandler.doInTransaction(new SagaTransactionContext(TransactionOperationType.COMMIT));
+ } else {
+ shardingTransactionHandler.doInTransaction(TransactionOperationType.COMMIT);
}
}
@@ -228,10 +225,8 @@ public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOpera
connection.rollback();
}
});
- } else if (TransactionType.XA == transactionType) {
- shardingTransactionHandler.doInTransaction(new XATransactionContext(TransactionOperationType.ROLLBACK));
- } else if (TransactionType.BASE == transactionType) {
- shardingTransactionHandler.doInTransaction(new SagaTransactionContext(TransactionOperationType.ROLLBACK));
+ } else {
+ shardingTransactionHandler.doInTransaction(TransactionOperationType.ROLLBACK);
}
}
diff --git a/sharding-jdbc/sharding-jdbc-core/src/main/java/io/shardingsphere/shardingjdbc/jdbc/adapter/AbstractDataSourceAdapter.java b/sharding-jdbc/sharding-jdbc-core/src/main/java/io/shardingsphere/shardingjdbc/jdbc/adapter/AbstractDataSourceAdapter.java
index bf370fdf2bb59acd243eb7e3e8d82200a4eec783..b0c52fbcc6e365c59f419ea50e5af7fc4845486c 100644
--- a/sharding-jdbc/sharding-jdbc-core/src/main/java/io/shardingsphere/shardingjdbc/jdbc/adapter/AbstractDataSourceAdapter.java
+++ b/sharding-jdbc/sharding-jdbc-core/src/main/java/io/shardingsphere/shardingjdbc/jdbc/adapter/AbstractDataSourceAdapter.java
@@ -20,8 +20,9 @@ package io.shardingsphere.shardingjdbc.jdbc.adapter;
import com.google.common.base.Preconditions;
import io.shardingsphere.core.bootstrap.ShardingBootstrap;
import io.shardingsphere.core.constant.DatabaseType;
+import io.shardingsphere.core.util.ReflectiveUtil;
import io.shardingsphere.shardingjdbc.jdbc.unsupported.AbstractUnsupportedOperationDataSource;
-import io.shardingsphere.transaction.core.datasource.ShardingTransactionalDataSource;
+import io.shardingsphere.transaction.core.loader.ShardingTransactionHandlerRegistry;
import lombok.Getter;
import lombok.Setter;
@@ -50,13 +51,14 @@ public abstract class AbstractDataSourceAdapter extends AbstractUnsupportedOpera
private final DatabaseType databaseType;
- private final ShardingTransactionalDataSource shardingTransactionalDataSources;
+ private final Map dataSourceMap;
private PrintWriter logWriter = new PrintWriter(System.out);
public AbstractDataSourceAdapter(final Map dataSourceMap) throws SQLException {
databaseType = getDatabaseType(dataSourceMap.values());
- shardingTransactionalDataSources = new ShardingTransactionalDataSource(databaseType, dataSourceMap);
+ ShardingTransactionHandlerRegistry.registerTransactionResource(databaseType, dataSourceMap);
+ this.dataSourceMap = dataSourceMap;
}
protected final DatabaseType getDatabaseType(final Collection dataSources) throws SQLException {
@@ -78,15 +80,6 @@ public abstract class AbstractDataSourceAdapter extends AbstractUnsupportedOpera
}
}
- /**
- * Get data source map.
- *
- * @return data source map
- */
- public final Map getDataSourceMap() {
- return shardingTransactionalDataSources.getOriginalDataSourceMap();
- }
-
@Override
public final Logger getParentLogger() {
return Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
@@ -99,6 +92,11 @@ public abstract class AbstractDataSourceAdapter extends AbstractUnsupportedOpera
@Override
public void close() {
- shardingTransactionalDataSources.close();
+ for (DataSource each : dataSourceMap.values()) {
+ try {
+ ReflectiveUtil.findMethod(each, "close").invoke(each);
+ } catch (final ReflectiveOperationException ignored) {
+ }
+ }
}
}
diff --git a/sharding-jdbc/sharding-jdbc-core/src/main/java/io/shardingsphere/shardingjdbc/jdbc/core/datasource/MasterSlaveDataSource.java b/sharding-jdbc/sharding-jdbc-core/src/main/java/io/shardingsphere/shardingjdbc/jdbc/core/datasource/MasterSlaveDataSource.java
index 36a3c634752af497340d549f8a74a2f248511b30..6a0adf07f69c920cc42333ffa8e5dd753cc734d2 100644
--- a/sharding-jdbc/sharding-jdbc-core/src/main/java/io/shardingsphere/shardingjdbc/jdbc/core/datasource/MasterSlaveDataSource.java
+++ b/sharding-jdbc/sharding-jdbc-core/src/main/java/io/shardingsphere/shardingjdbc/jdbc/core/datasource/MasterSlaveDataSource.java
@@ -81,6 +81,6 @@ public class MasterSlaveDataSource extends AbstractDataSourceAdapter {
@Override
public final MasterSlaveConnection getConnection() {
- return new MasterSlaveConnection(this, getShardingTransactionalDataSources().getDataSourceMap(), TransactionTypeHolder.get());
+ return new MasterSlaveConnection(this, getDataSourceMap(), TransactionTypeHolder.get());
}
}
diff --git a/sharding-jdbc/sharding-jdbc-core/src/main/java/io/shardingsphere/shardingjdbc/jdbc/core/datasource/ShardingDataSource.java b/sharding-jdbc/sharding-jdbc-core/src/main/java/io/shardingsphere/shardingjdbc/jdbc/core/datasource/ShardingDataSource.java
index 9f35d30e6dd90ececa0e114155f274f3d8707abf..f735e6e4442c0fc9e127084a63b5c9d2798af2d8 100644
--- a/sharding-jdbc/sharding-jdbc-core/src/main/java/io/shardingsphere/shardingjdbc/jdbc/core/datasource/ShardingDataSource.java
+++ b/sharding-jdbc/sharding-jdbc-core/src/main/java/io/shardingsphere/shardingjdbc/jdbc/core/datasource/ShardingDataSource.java
@@ -67,7 +67,7 @@ public class ShardingDataSource extends AbstractDataSourceAdapter {
@Override
public final ShardingConnection getConnection() {
- return new ShardingConnection(getShardingTransactionalDataSources().getDataSourceMap(), shardingContext, TransactionTypeHolder.get());
+ return new ShardingConnection(getDataSourceMap(), shardingContext, TransactionTypeHolder.get());
}
@Override
diff --git a/sharding-jdbc/sharding-jdbc-core/src/test/java/io/shardingsphere/shardingjdbc/jdbc/adapter/ConnectionAdapterTest.java b/sharding-jdbc/sharding-jdbc-core/src/test/java/io/shardingsphere/shardingjdbc/jdbc/adapter/ConnectionAdapterTest.java
index 02eb11e47a00c70699e11539c45cae8e5ebee536..25ef7cfbbb8d270900572772c9db00c2044bc5fa 100644
--- a/sharding-jdbc/sharding-jdbc-core/src/test/java/io/shardingsphere/shardingjdbc/jdbc/adapter/ConnectionAdapterTest.java
+++ b/sharding-jdbc/sharding-jdbc-core/src/test/java/io/shardingsphere/shardingjdbc/jdbc/adapter/ConnectionAdapterTest.java
@@ -25,6 +25,7 @@ import io.shardingsphere.shardingjdbc.jdbc.core.fixed.FixedXAShardingTransaction
import io.shardingsphere.shardingjdbc.jdbc.util.JDBCTestSQL;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.api.TransactionTypeHolder;
+import io.shardingsphere.transaction.core.TransactionOperationType;
import lombok.SneakyThrows;
import org.junit.After;
import org.junit.Test;
@@ -35,7 +36,6 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -103,7 +103,7 @@ public final class ConnectionAdapterTest extends AbstractShardingJDBCDatabaseAnd
TransactionTypeHolder.set(TransactionType.XA);
try (ShardingConnection actual = getShardingDataSource().getConnection()) {
actual.commit();
- assertNotNull(FixedXAShardingTransactionHandler.getInvokes().get("commit"));
+ assertThat(FixedXAShardingTransactionHandler.getInvokes().get("commit"), is(TransactionOperationType.COMMIT));
}
}
@@ -122,7 +122,7 @@ public final class ConnectionAdapterTest extends AbstractShardingJDBCDatabaseAnd
TransactionTypeHolder.set(TransactionType.XA);
try (ShardingConnection actual = getShardingDataSource().getConnection()) {
actual.rollback();
- assertNotNull(FixedXAShardingTransactionHandler.getInvokes().get("rollback"));
+ assertThat(FixedXAShardingTransactionHandler.getInvokes().get("rollback"), is(TransactionOperationType.ROLLBACK));
}
}
diff --git a/sharding-jdbc/sharding-jdbc-core/src/test/java/io/shardingsphere/shardingjdbc/jdbc/core/connection/ShardingConnectionTest.java b/sharding-jdbc/sharding-jdbc-core/src/test/java/io/shardingsphere/shardingjdbc/jdbc/core/connection/ShardingConnectionTest.java
index 8a63cb1fb1c59db0b4d7db1932467ec25d16de1f..c07bdf44a160541ade508594dfe71d2399e67b66 100644
--- a/sharding-jdbc/sharding-jdbc-core/src/test/java/io/shardingsphere/shardingjdbc/jdbc/core/connection/ShardingConnectionTest.java
+++ b/sharding-jdbc/sharding-jdbc-core/src/test/java/io/shardingsphere/shardingjdbc/jdbc/core/connection/ShardingConnectionTest.java
@@ -29,7 +29,7 @@ import io.shardingsphere.shardingjdbc.jdbc.core.fixed.FixedBaseShardingTransacti
import io.shardingsphere.shardingjdbc.jdbc.core.fixed.FixedXAShardingTransactionHandler;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.api.TransactionTypeHolder;
-import io.shardingsphere.transaction.core.context.ShardingTransactionContext;
+import io.shardingsphere.transaction.core.TransactionOperationType;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -42,7 +42,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
-import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
@@ -111,21 +110,21 @@ public final class ShardingConnectionTest {
public void assertXATransactionOperation() throws SQLException {
connection = new ShardingConnection(dataSourceMap, shardingContext, TransactionType.XA);
connection.setAutoCommit(false);
- assertThat(FixedXAShardingTransactionHandler.getInvokes().get("begin"), instanceOf(ShardingTransactionContext.class));
+ assertThat(FixedXAShardingTransactionHandler.getInvokes().get("begin"), is(TransactionOperationType.BEGIN));
connection.commit();
- assertThat(FixedXAShardingTransactionHandler.getInvokes().get("commit"), instanceOf(ShardingTransactionContext.class));
+ assertThat(FixedXAShardingTransactionHandler.getInvokes().get("commit"), is(TransactionOperationType.COMMIT));
connection.rollback();
- assertThat(FixedXAShardingTransactionHandler.getInvokes().get("rollback"), instanceOf(ShardingTransactionContext.class));
+ assertThat(FixedXAShardingTransactionHandler.getInvokes().get("rollback"), is(TransactionOperationType.ROLLBACK));
}
@Test
public void assertBaseTransactionOperation() throws SQLException {
connection = new ShardingConnection(dataSourceMap, shardingContext, TransactionType.BASE);
connection.setAutoCommit(false);
- assertThat(FixedBaseShardingTransactionHandler.getInvokes().get("begin"), instanceOf(ShardingTransactionContext.class));
+ assertThat(FixedBaseShardingTransactionHandler.getInvokes().get("begin"), is(TransactionOperationType.BEGIN));
connection.commit();
- assertThat(FixedBaseShardingTransactionHandler.getInvokes().get("commit"), instanceOf(ShardingTransactionContext.class));
+ assertThat(FixedBaseShardingTransactionHandler.getInvokes().get("commit"), is(TransactionOperationType.COMMIT));
connection.rollback();
- assertThat(FixedBaseShardingTransactionHandler.getInvokes().get("rollback"), instanceOf(ShardingTransactionContext.class));
+ assertThat(FixedBaseShardingTransactionHandler.getInvokes().get("rollback"), is(TransactionOperationType.ROLLBACK));
}
}
diff --git a/sharding-jdbc/sharding-jdbc-core/src/test/java/io/shardingsphere/shardingjdbc/jdbc/core/datasource/ShardingDataSourceTest.java b/sharding-jdbc/sharding-jdbc-core/src/test/java/io/shardingsphere/shardingjdbc/jdbc/core/datasource/ShardingDataSourceTest.java
index eed3c92eca080859a6bf5fe3941179c6f91fff99..6d4e2a2cbf83d6a4215fec5ddedaf291109507ac 100644
--- a/sharding-jdbc/sharding-jdbc-core/src/test/java/io/shardingsphere/shardingjdbc/jdbc/core/datasource/ShardingDataSourceTest.java
+++ b/sharding-jdbc/sharding-jdbc-core/src/test/java/io/shardingsphere/shardingjdbc/jdbc/core/datasource/ShardingDataSourceTest.java
@@ -163,7 +163,7 @@ public final class ShardingDataSourceTest {
dataSourceMap.put("ds", dataSource);
TransactionTypeHolder.set(TransactionType.XA);
ShardingDataSource shardingDataSource = createShardingDataSource(dataSourceMap);
- assertThat(shardingDataSource.getShardingTransactionalDataSources().getDataSourceMap().size(), is(1));
+ assertThat(shardingDataSource.getDataSourceMap().size(), is(1));
ShardingConnection shardingConnection = shardingDataSource.getConnection();
assertThat(shardingConnection.getDataSourceMap().size(), is(1));
}
diff --git a/sharding-jdbc/sharding-jdbc-core/src/test/java/io/shardingsphere/shardingjdbc/jdbc/core/fixed/FixedBaseShardingTransactionHandler.java b/sharding-jdbc/sharding-jdbc-core/src/test/java/io/shardingsphere/shardingjdbc/jdbc/core/fixed/FixedBaseShardingTransactionHandler.java
index 52e92b199cc99a77e753e07c0f064655884d63e3..627b066f8aaeea25a81de4cee46187d452fb65ab 100644
--- a/sharding-jdbc/sharding-jdbc-core/src/test/java/io/shardingsphere/shardingjdbc/jdbc/core/fixed/FixedBaseShardingTransactionHandler.java
+++ b/sharding-jdbc/sharding-jdbc-core/src/test/java/io/shardingsphere/shardingjdbc/jdbc/core/fixed/FixedBaseShardingTransactionHandler.java
@@ -18,8 +18,9 @@
package io.shardingsphere.shardingjdbc.jdbc.core.fixed;
import io.shardingsphere.transaction.api.TransactionType;
-import io.shardingsphere.transaction.core.context.ShardingTransactionContext;
-import io.shardingsphere.transaction.spi.ShardingTransactionHandler;
+import io.shardingsphere.transaction.core.TransactionOperationType;
+import io.shardingsphere.transaction.core.handler.ShardingTransactionHandlerAdapter;
+import io.shardingsphere.transaction.core.manager.ShardingTransactionManager;
import java.util.HashMap;
import java.util.Map;
@@ -29,35 +30,40 @@ import java.util.Map;
*
* @author zhaojun
*/
-public final class FixedBaseShardingTransactionHandler implements ShardingTransactionHandler {
+public final class FixedBaseShardingTransactionHandler extends ShardingTransactionHandlerAdapter {
- private static final Map INVOKES = new HashMap<>();
+ private static final Map INVOKES = new HashMap<>();
/**
* Get invoke map.
*
* @return map
*/
- public static Map getInvokes() {
+ public static Map getInvokes() {
return INVOKES;
}
@Override
- public void doInTransaction(final ShardingTransactionContext context) {
- switch (context.getOperationType()) {
+ public void doInTransaction(final TransactionOperationType transactionOperationType) {
+ switch (transactionOperationType) {
case BEGIN:
- INVOKES.put("begin", context);
+ INVOKES.put("begin", transactionOperationType);
return;
case COMMIT:
- INVOKES.put("commit", context);
+ INVOKES.put("commit", transactionOperationType);
return;
case ROLLBACK:
- INVOKES.put("rollback", context);
+ INVOKES.put("rollback", transactionOperationType);
return;
default:
}
}
+ @Override
+ public ShardingTransactionManager getShardingTransactionManager() {
+ return null;
+ }
+
@Override
public TransactionType getTransactionType() {
return TransactionType.BASE;
diff --git a/sharding-jdbc/sharding-jdbc-core/src/test/java/io/shardingsphere/shardingjdbc/jdbc/core/fixed/FixedXAShardingTransactionHandler.java b/sharding-jdbc/sharding-jdbc-core/src/test/java/io/shardingsphere/shardingjdbc/jdbc/core/fixed/FixedXAShardingTransactionHandler.java
index 987f47cfceb5e44e262f676f3385968100a689e6..6c1d25d516156b03a912fb5594cfefa8eb9151a5 100644
--- a/sharding-jdbc/sharding-jdbc-core/src/test/java/io/shardingsphere/shardingjdbc/jdbc/core/fixed/FixedXAShardingTransactionHandler.java
+++ b/sharding-jdbc/sharding-jdbc-core/src/test/java/io/shardingsphere/shardingjdbc/jdbc/core/fixed/FixedXAShardingTransactionHandler.java
@@ -18,8 +18,9 @@
package io.shardingsphere.shardingjdbc.jdbc.core.fixed;
import io.shardingsphere.transaction.api.TransactionType;
-import io.shardingsphere.transaction.core.context.ShardingTransactionContext;
-import io.shardingsphere.transaction.spi.ShardingTransactionHandler;
+import io.shardingsphere.transaction.core.TransactionOperationType;
+import io.shardingsphere.transaction.core.handler.ShardingTransactionHandlerAdapter;
+import io.shardingsphere.transaction.core.manager.ShardingTransactionManager;
import java.util.HashMap;
import java.util.Map;
@@ -29,35 +30,40 @@ import java.util.Map;
*
* @author zhaojun
*/
-public final class FixedXAShardingTransactionHandler implements ShardingTransactionHandler {
+public final class FixedXAShardingTransactionHandler extends ShardingTransactionHandlerAdapter {
- private static final Map INVOKES = new HashMap<>();
+ private static final Map INVOKES = new HashMap<>();
/**
* Get invoke map.
*
* @return map
*/
- public static Map getInvokes() {
+ public static Map getInvokes() {
return INVOKES;
}
@Override
- public void doInTransaction(final ShardingTransactionContext context) {
- switch (context.getOperationType()) {
+ public void doInTransaction(final TransactionOperationType transactionOperationType) {
+ switch (transactionOperationType) {
case BEGIN:
- INVOKES.put("begin", context);
+ INVOKES.put("begin", transactionOperationType);
return;
case COMMIT:
- INVOKES.put("commit", context);
+ INVOKES.put("commit", transactionOperationType);
return;
case ROLLBACK:
- INVOKES.put("rollback", context);
+ INVOKES.put("rollback", transactionOperationType);
return;
default:
}
}
+ @Override
+ public ShardingTransactionManager getShardingTransactionManager() {
+ return null;
+ }
+
@Override
public TransactionType getTransactionType() {
return TransactionType.XA;
diff --git a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/jdbc/connection/BackendTransactionManager.java b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/jdbc/connection/BackendTransactionManager.java
index cb0685f25e5a8c47ee22aae956197ea338a143b0..845f86a7aeaaf3e7758426f8a163096ad0b14bd9 100644
--- a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/jdbc/connection/BackendTransactionManager.java
+++ b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/jdbc/connection/BackendTransactionManager.java
@@ -20,8 +20,6 @@ package io.shardingsphere.shardingproxy.backend.jdbc.connection;
import com.google.common.base.Preconditions;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.core.TransactionOperationType;
-import io.shardingsphere.transaction.core.context.ShardingTransactionContext;
-import io.shardingsphere.transaction.core.context.XATransactionContext;
import io.shardingsphere.transaction.core.loader.ShardingTransactionHandlerRegistry;
import io.shardingsphere.transaction.spi.ShardingTransactionHandler;
import lombok.RequiredArgsConstructor;
@@ -41,7 +39,7 @@ public final class BackendTransactionManager implements TransactionManager {
@Override
public void doInTransaction(final TransactionOperationType operationType) throws SQLException {
TransactionType transactionType = connection.getTransactionType();
- ShardingTransactionHandler shardingTransactionHandler = ShardingTransactionHandlerRegistry.getInstance().getHandler(transactionType);
+ ShardingTransactionHandler shardingTransactionHandler = ShardingTransactionHandlerRegistry.getHandler(transactionType);
if (null != transactionType && transactionType != TransactionType.LOCAL) {
Preconditions.checkNotNull(shardingTransactionHandler, String.format("Cannot find transaction manager of [%s]", transactionType));
}
@@ -52,7 +50,7 @@ public final class BackendTransactionManager implements TransactionManager {
if (TransactionType.LOCAL == transactionType) {
new LocalTransactionManager(connection).doInTransaction(operationType);
} else if (TransactionType.XA == transactionType) {
- shardingTransactionHandler.doInTransaction(new XATransactionContext(operationType));
+ shardingTransactionHandler.doInTransaction(operationType);
if (TransactionOperationType.BEGIN != operationType) {
connection.getStateHandler().getAndSetStatus(ConnectionStatus.TERMINATED);
}
diff --git a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/jdbc/datasource/JDBCXABackendDataSourceFactory.java b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/jdbc/datasource/JDBCXABackendDataSourceFactory.java
index d7b45438e157012edf230398e074bea824d43460..6062a0ed0859870bf0d90055b32909eae12f7840 100644
--- a/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/jdbc/datasource/JDBCXABackendDataSourceFactory.java
+++ b/sharding-proxy/src/main/java/io/shardingsphere/shardingproxy/backend/jdbc/datasource/JDBCXABackendDataSourceFactory.java
@@ -20,7 +20,7 @@ package io.shardingsphere.shardingproxy.backend.jdbc.datasource;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.rule.DataSourceParameter;
import io.shardingsphere.transaction.spi.xa.XATransactionManager;
-import io.shardingsphere.transaction.xa.convert.datasource.XADataSourceFactory;
+import io.shardingsphere.transaction.xa.jta.datasource.XADataSourceFactory;
import io.shardingsphere.transaction.xa.manager.XATransactionManagerSPILoader;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
diff --git a/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/query/ComQueryPacketTest.java b/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/query/ComQueryPacketTest.java
index b80103fb3645ecfadec74a90c39f5fc4c520a99a..f4f6ab733dbdcdc7fe46b484bdf0b3ee2d0775c5 100644
--- a/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/query/ComQueryPacketTest.java
+++ b/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/query/ComQueryPacketTest.java
@@ -34,7 +34,7 @@ import io.shardingsphere.shardingproxy.transport.mysql.packet.command.query.Fiel
import io.shardingsphere.shardingproxy.transport.mysql.packet.command.query.text.TextResultSetRowPacket;
import io.shardingsphere.shardingproxy.transport.mysql.packet.generic.OKPacket;
import io.shardingsphere.transaction.api.TransactionType;
-import io.shardingsphere.transaction.core.context.ShardingTransactionContext;
+import io.shardingsphere.transaction.core.TransactionOperationType;
import lombok.SneakyThrows;
import org.hamcrest.CoreMatchers;
import org.junit.After;
@@ -50,7 +50,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
@@ -145,7 +144,7 @@ public final class ComQueryPacketTest {
Optional actual = packet.execute();
assertTrue(actual.isPresent());
assertOKPacket(actual.get());
- assertThat(FixedXAShardingTransactionHandler.getInvokes().get("rollback"), instanceOf(ShardingTransactionContext.class));
+ assertThat(FixedXAShardingTransactionHandler.getInvokes().get("rollback"), is(TransactionOperationType.ROLLBACK));
}
@Test
@@ -157,7 +156,7 @@ public final class ComQueryPacketTest {
Optional actual = packet.execute();
assertTrue(actual.isPresent());
assertOKPacket(actual.get());
- assertThat(FixedXAShardingTransactionHandler.getInvokes().get("commit"), instanceOf(ShardingTransactionContext.class));
+ assertThat(FixedXAShardingTransactionHandler.getInvokes().get("commit"), is(TransactionOperationType.COMMIT));
}
private void assertOKPacket(final CommandResponsePackets actual) {
diff --git a/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/query/FixedXAShardingTransactionHandler.java b/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/query/FixedXAShardingTransactionHandler.java
index e7acbd4671fd2d0e6775e8e24a44432d1bfb666b..e5440c8db6b64b48533839ae5576e4f5195a6eb5 100644
--- a/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/query/FixedXAShardingTransactionHandler.java
+++ b/sharding-proxy/src/test/java/io/shardingsphere/shardingproxy/transport/mysql/packet/command/query/text/query/FixedXAShardingTransactionHandler.java
@@ -18,8 +18,9 @@
package io.shardingsphere.shardingproxy.transport.mysql.packet.command.query.text.query;
import io.shardingsphere.transaction.api.TransactionType;
-import io.shardingsphere.transaction.core.context.ShardingTransactionContext;
-import io.shardingsphere.transaction.spi.ShardingTransactionHandler;
+import io.shardingsphere.transaction.core.TransactionOperationType;
+import io.shardingsphere.transaction.core.handler.ShardingTransactionHandlerAdapter;
+import io.shardingsphere.transaction.core.manager.ShardingTransactionManager;
import java.util.HashMap;
import java.util.Map;
@@ -29,35 +30,40 @@ import java.util.Map;
*
* @author zhaojun
*/
-public final class FixedXAShardingTransactionHandler implements ShardingTransactionHandler {
+public final class FixedXAShardingTransactionHandler extends ShardingTransactionHandlerAdapter {
- private static final Map INVOKES = new HashMap<>();
+ private static final Map INVOKES = new HashMap<>();
/**
* Get invoke map.
*
* @return map
*/
- static Map getInvokes() {
+ static Map getInvokes() {
return INVOKES;
}
@Override
- public void doInTransaction(final ShardingTransactionContext context) {
- switch (context.getOperationType()) {
+ public void doInTransaction(final TransactionOperationType transactionOperationType) {
+ switch (transactionOperationType) {
case BEGIN:
- INVOKES.put("begin", context);
+ INVOKES.put("begin", transactionOperationType);
return;
case COMMIT:
- INVOKES.put("commit", context);
+ INVOKES.put("commit", transactionOperationType);
return;
case ROLLBACK:
- INVOKES.put("rollback", context);
+ INVOKES.put("rollback", transactionOperationType);
return;
default:
}
}
+ @Override
+ public ShardingTransactionManager getShardingTransactionManager() {
+ return null;
+ }
+
@Override
public TransactionType getTransactionType() {
return TransactionType.XA;
diff --git a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-spi/src/main/java/io/shardingsphere/transaction/spi/xa/XATransactionManager.java b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-spi/src/main/java/io/shardingsphere/transaction/spi/xa/XATransactionManager.java
index ebeb844ba291adefd2c07cf9d9544e77bb3e3015..88a86854ce87e83009772979ec08bd1e9c8e6553 100644
--- a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-spi/src/main/java/io/shardingsphere/transaction/spi/xa/XATransactionManager.java
+++ b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-spi/src/main/java/io/shardingsphere/transaction/spi/xa/XATransactionManager.java
@@ -19,7 +19,6 @@ package io.shardingsphere.transaction.spi.xa;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.rule.DataSourceParameter;
-import io.shardingsphere.transaction.core.context.XATransactionContext;
import io.shardingsphere.transaction.core.manager.ShardingTransactionManager;
import javax.sql.DataSource;
@@ -32,7 +31,12 @@ import javax.transaction.TransactionManager;
* @author zhangliang
* @author zhaojun
*/
-public interface XATransactionManager extends ShardingTransactionManager {
+public interface XATransactionManager extends ShardingTransactionManager {
+
+ /**
+ * Startup XA transaction manager.
+ */
+ void startup();
/**
* destroy the transaction manager and could be helpful with shutdown gracefully.
@@ -56,4 +60,21 @@ public interface XATransactionManager extends ShardingTransactionManagercommons-dbcp2
test
+
+ com.h2database
+ h2
+ provided
+
diff --git a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/convert/XADataSourceConverter.java b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/convert/XADataSourceConverter.java
index 2fbd2193096255a74149a706738d29ab00362477..a084c26c0d332f40635550a2fc5ff316471b40bd 100644
--- a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/convert/XADataSourceConverter.java
+++ b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/convert/XADataSourceConverter.java
@@ -22,7 +22,7 @@ import io.shardingsphere.core.rule.DataSourceParameter;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.spi.TransactionalDataSourceConverter;
import io.shardingsphere.transaction.spi.xa.XATransactionManager;
-import io.shardingsphere.transaction.xa.convert.datasource.XADataSourceFactory;
+import io.shardingsphere.transaction.xa.jta.datasource.XADataSourceFactory;
import io.shardingsphere.transaction.xa.convert.swap.DataSourceSwapperRegistry;
import io.shardingsphere.transaction.xa.manager.XATransactionManagerSPILoader;
diff --git a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/convert/swap/impl/DruidParameterSwapper.java b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/convert/swap/impl/DruidParameterSwapper.java
index 4bfb0f028397c7d314bcaefe086fc2e5f769f1b0..b21d621ed26b0a6af3de06c7ec1e5544263a27ae 100644
--- a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/convert/swap/impl/DruidParameterSwapper.java
+++ b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/convert/swap/impl/DruidParameterSwapper.java
@@ -30,7 +30,8 @@ import java.util.Collections;
*/
public final class DruidParameterSwapper extends DataSourceSwapperAdapter {
- private static final String DRUID_CLASS_NAME = "com.alibaba.druid.pool.DruidDataSource";
+ private static final String
+ DRUID_CLASS_NAME = "com.alibaba.druid.pool.DruidDataSource";
@Override
protected void convertProperties(final AdvancedMapUpdater updater) {
diff --git a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/handler/XAShardingTransactionHandler.java b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/handler/XAShardingTransactionHandler.java
index 0fdb62fc7d0ed7e166d63f5c55e212f17f40df88..55777e4802afcb5261a5bee0bf9a54e01e665ac8 100644
--- a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/handler/XAShardingTransactionHandler.java
+++ b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/handler/XAShardingTransactionHandler.java
@@ -17,26 +17,92 @@
package io.shardingsphere.transaction.xa.handler;
+import com.atomikos.jdbc.AtomikosDataSourceBean;
+import io.shardingsphere.core.constant.DatabaseType;
+import io.shardingsphere.core.exception.ShardingException;
import io.shardingsphere.transaction.api.TransactionType;
-import io.shardingsphere.transaction.core.context.XATransactionContext;
import io.shardingsphere.transaction.core.handler.ShardingTransactionHandlerAdapter;
import io.shardingsphere.transaction.core.manager.ShardingTransactionManager;
+import io.shardingsphere.transaction.spi.xa.XATransactionManager;
+import io.shardingsphere.transaction.xa.jta.connection.ShardingXAConnection;
+import io.shardingsphere.transaction.xa.jta.datasource.ShardingXADataSource;
import io.shardingsphere.transaction.xa.manager.XATransactionManagerSPILoader;
+import lombok.extern.slf4j.Slf4j;
+
+import javax.sql.DataSource;
+import javax.transaction.RollbackException;
+import javax.transaction.Status;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
/**
* XA sharding transaction handler.
*
* @author zhaojun
*/
-public final class XAShardingTransactionHandler extends ShardingTransactionHandlerAdapter {
+@Slf4j
+public final class XAShardingTransactionHandler extends ShardingTransactionHandlerAdapter {
+
+ private final Map cachedShardingXADataSourceMap = new HashMap<>();
+
+ private final XATransactionManager xaTransactionManager = XATransactionManagerSPILoader.getInstance().getTransactionManager();
@Override
- protected ShardingTransactionManager getShardingTransactionManager() {
- return XATransactionManagerSPILoader.getInstance().getTransactionManager();
+ public ShardingTransactionManager getShardingTransactionManager() {
+ return xaTransactionManager;
}
@Override
public TransactionType getTransactionType() {
return TransactionType.XA;
}
+
+ @Override
+ public void registerTransactionalResource(final DatabaseType databaseType, final Map dataSourceMap) {
+ for (Map.Entry entry : dataSourceMap.entrySet()) {
+ DataSource dataSource = entry.getValue();
+ if (dataSource instanceof AtomikosDataSourceBean) {
+ continue;
+ }
+ ShardingXADataSource shardingXADataSource = new ShardingXADataSource(databaseType, entry.getKey(), entry.getValue());
+ cachedShardingXADataSourceMap.put(entry.getKey(), shardingXADataSource);
+ xaTransactionManager.registerRecoveryResource(entry.getKey(), shardingXADataSource.getXaDataSource());
+ }
+ xaTransactionManager.startup();
+ }
+
+ @Override
+ public void clearTransactionalResource() {
+ if (!cachedShardingXADataSourceMap.isEmpty()) {
+ for (ShardingXADataSource each : cachedShardingXADataSourceMap.values()) {
+ xaTransactionManager.removeRecoveryResource(each.getResourceName(), each.getXaDataSource());
+ }
+ }
+ cachedShardingXADataSourceMap.clear();
+ }
+
+ @Override
+ public Connection createConnection(final String dataSourceName, final DataSource dataSource) {
+ Connection result;
+ ShardingXADataSource shardingXADataSource = cachedShardingXADataSourceMap.get(dataSourceName);
+ try {
+ Transaction transaction = xaTransactionManager.getUnderlyingTransactionManager().getTransaction();
+ if (null != transaction && Status.STATUS_NO_TRANSACTION != transaction.getStatus()) {
+ ShardingXAConnection shardingXAConnection = shardingXADataSource.getXAConnection();
+ transaction.enlistResource(shardingXAConnection.getXAResource());
+ result = shardingXAConnection.getConnection();
+ } else {
+ result = shardingXADataSource.getConnectionFromOriginalDataSource();
+ }
+ } catch (final SQLException | RollbackException | SystemException ex) {
+ log.error("Failed to synchronize transactional resource");
+ throw new ShardingException(ex);
+ }
+ return result;
+ }
}
+
diff --git a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/ShardingXAResource.java b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/ShardingXAResource.java
new file mode 100644
index 0000000000000000000000000000000000000000..ea0ee2b162429306c3a2f0f9035c678ccd4140c7
--- /dev/null
+++ b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/ShardingXAResource.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2016-2018 shardingsphere.io.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package io.shardingsphere.transaction.xa.jta;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+/**
+ * Sharding XA resource.
+ *
+ * @author zhaojun
+ */
+@RequiredArgsConstructor
+@Getter
+public final class ShardingXAResource implements XAResource {
+
+ private final String resourceName;
+
+ private final XAResource delegate;
+
+ @Override
+ public void commit(final Xid xid, final boolean b) throws XAException {
+ delegate.commit(xid, b);
+ }
+
+ @Override
+ public void end(final Xid xid, final int i) throws XAException {
+ delegate.end(xid, i);
+ }
+
+ @Override
+ public void forget(final Xid xid) throws XAException {
+ delegate.forget(xid);
+ }
+
+ @Override
+ public int getTransactionTimeout() throws XAException {
+ return delegate.getTransactionTimeout();
+ }
+
+ @Override
+ public boolean isSameRM(final XAResource xaResource) {
+ ShardingXAResource shardingXAResource = (ShardingXAResource) xaResource;
+ return resourceName.equals(shardingXAResource.getResourceName());
+ }
+
+ @Override
+ public int prepare(final Xid xid) throws XAException {
+ return delegate.prepare(xid);
+ }
+
+ @Override
+ public Xid[] recover(final int i) throws XAException {
+ return delegate.recover(i);
+ }
+
+ @Override
+ public void rollback(final Xid xid) throws XAException {
+ delegate.rollback(xid);
+ }
+
+ @Override
+ public boolean setTransactionTimeout(final int i) throws XAException {
+ return delegate.setTransactionTimeout(i);
+ }
+
+ @Override
+ public void start(final Xid xid, final int i) throws XAException {
+ delegate.start(xid, i);
+ }
+}
diff --git a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/connection/ShardingXAConnection.java b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/connection/ShardingXAConnection.java
new file mode 100644
index 0000000000000000000000000000000000000000..fd12d965fe9f26496ac65b073222d97969123a28
--- /dev/null
+++ b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/connection/ShardingXAConnection.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2016-2018 shardingsphere.io.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package io.shardingsphere.transaction.xa.jta.connection;
+
+import io.shardingsphere.core.constant.DatabaseType;
+import io.shardingsphere.transaction.xa.jta.connection.dialect.H2ShardingXAConnectionWrapper;
+import io.shardingsphere.transaction.xa.jta.connection.dialect.MySQLShardingXAConnectionWrapper;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import javax.sql.XADataSource;
+import java.sql.Connection;
+
+/**
+ * Sharding XA connection wrapper.
+ *
+ * @author zhaojun
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class ShardingXAConnectionFactory {
+
+ /**
+ * Create a sharding XA connection from normal connection.
+ *
+ * @param databaseType database type
+ * @param connection normal connection
+ * @param resourceName resource name
+ * @param xaDataSource XA data source
+ * @return sharding XA connection
+ */
+ public static ShardingXAConnection createShardingXAConnection(final DatabaseType databaseType, final String resourceName, final XADataSource xaDataSource, final Connection connection) {
+ switch (databaseType) {
+ case MySQL:
+ return new MySQLShardingXAConnectionWrapper().wrap(resourceName, xaDataSource, connection);
+ case H2:
+ return new H2ShardingXAConnectionWrapper().wrap(resourceName, xaDataSource, connection);
+ default:
+ throw new UnsupportedOperationException(String.format("Cannot support database type: `%s`", databaseType));
+ }
+ }
+}
diff --git a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/connection/ShardingXAConnectionWrapper.java b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/connection/ShardingXAConnectionWrapper.java
new file mode 100644
index 0000000000000000000000000000000000000000..be85f99f3f09316ce3817e6ab1f1b2c15ff7cfce
--- /dev/null
+++ b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/connection/ShardingXAConnectionWrapper.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2016-2018 shardingsphere.io.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package io.shardingsphere.transaction.xa.jta.connection;
+
+import javax.sql.XADataSource;
+import java.sql.Connection;
+
+/**
+ * Sharding XA connection wrapper.
+ *
+ * @author zhaojun
+ */
+public interface ShardingXAConnectionWrapper {
+
+ /**
+ * Wrap a normal connection to sharding XA connection.
+ *
+ * @param resourceName resource name
+ * @param xaDataSource XA data source
+ * @param connection connection
+ * @return sharding XA connection
+ */
+ ShardingXAConnection wrap(String resourceName, XADataSource xaDataSource, Connection connection);
+}
diff --git a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/connection/dialect/H2ShardingXAConnectionWrapper.java b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/connection/dialect/H2ShardingXAConnectionWrapper.java
new file mode 100644
index 0000000000000000000000000000000000000000..6b166e0a0014686b6f0d76dac51e5bc511b23398
--- /dev/null
+++ b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/connection/dialect/H2ShardingXAConnectionWrapper.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2016-2018 shardingsphere.io.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package io.shardingsphere.transaction.xa.jta.datasource;
+
+import javax.sql.XAConnection;
+import javax.sql.XADataSource;
+import java.io.PrintWriter;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.logging.Logger;
+
+/**
+ * Abstract unsupported sharding XA data source.
+ *
+ * @author zhaojun
+ */
+public abstract class AbstractUnsupportedShardingXADataSource implements XADataSource {
+
+ @Override
+ public final XAConnection getXAConnection(final String user, final String password) throws SQLException {
+ throw new SQLFeatureNotSupportedException("getXAConnection by user and password");
+ }
+
+ @Override
+ public final PrintWriter getLogWriter() throws SQLException {
+ throw new SQLFeatureNotSupportedException("getLogWriter");
+ }
+
+ @Override
+ public final void setLogWriter(final PrintWriter out) throws SQLException {
+ throw new SQLFeatureNotSupportedException("setLogWriter");
+ }
+
+ @Override
+ public final void setLoginTimeout(final int seconds) throws SQLException {
+ throw new SQLFeatureNotSupportedException("setLoginTimeout");
+ }
+
+ @Override
+ public final int getLoginTimeout() throws SQLException {
+ throw new SQLFeatureNotSupportedException("getLoginTimeout");
+ }
+
+ @Override
+ public final Logger getParentLogger() throws SQLFeatureNotSupportedException {
+ throw new SQLFeatureNotSupportedException("getParentLogger");
+ }
+}
diff --git a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/datasource/ShardingXADataSource.java b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/datasource/ShardingXADataSource.java
new file mode 100644
index 0000000000000000000000000000000000000000..cc31a9fc4dba67dc58243e94cc729887065c3ecf
--- /dev/null
+++ b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/datasource/ShardingXADataSource.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2016-2018 shardingsphere.io.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package io.shardingsphere.transaction.xa.jta.datasource;
+
+import io.shardingsphere.core.constant.DatabaseType;
+import io.shardingsphere.transaction.xa.jta.connection.ShardingXAConnection;
+import io.shardingsphere.transaction.xa.jta.connection.ShardingXAConnectionFactory;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import javax.sql.DataSource;
+import javax.sql.XADataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+/**
+ * Sharding XA data source.
+ *
+ * @author zhaojun
+ */
+@Getter
+@Slf4j
+public final class ShardingXADataSource extends AbstractUnsupportedShardingXADataSource {
+
+ private final DatabaseType databaseType;
+
+ private final String resourceName;
+
+ private final DataSource originalDataSource;
+
+ private final XADataSource xaDataSource;
+
+ private boolean isOriginalXADataSource;
+
+ public ShardingXADataSource(final DatabaseType databaseType, final String resourceName, final DataSource dataSource) {
+ this.databaseType = databaseType;
+ this.resourceName = resourceName;
+ this.originalDataSource = dataSource;
+ if (dataSource instanceof XADataSource) {
+ this.xaDataSource = (XADataSource) dataSource;
+ this.isOriginalXADataSource = true;
+ } else {
+ this.xaDataSource = XADataSourceFactory.build(databaseType, dataSource);
+ }
+ }
+
+ @Override
+ public ShardingXAConnection getXAConnection() throws SQLException {
+ return isOriginalXADataSource ? new ShardingXAConnection(resourceName, xaDataSource.getXAConnection())
+ : ShardingXAConnectionFactory.createShardingXAConnection(databaseType, resourceName, xaDataSource, originalDataSource.getConnection());
+ }
+
+ /**
+ * Get connection from original data source.
+ *
+ * @return connection
+ * @throws SQLException SQL exception
+ */
+ public Connection getConnectionFromOriginalDataSource() throws SQLException {
+ return originalDataSource.getConnection();
+ }
+}
diff --git a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/convert/datasource/XADataSourceFactory.java b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/datasource/XADataSourceFactory.java
similarity index 65%
rename from sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/convert/datasource/XADataSourceFactory.java
rename to sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/datasource/XADataSourceFactory.java
index 163aa56b63717d4f1152627c950b0359e4395538..227b96d8a7c5c55596d33be01ac0f85775834b53 100644
--- a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/convert/datasource/XADataSourceFactory.java
+++ b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/datasource/XADataSourceFactory.java
@@ -15,16 +15,23 @@
*
*/
-package io.shardingsphere.transaction.xa.convert.datasource;
+package io.shardingsphere.transaction.xa.jta.datasource;
+import com.atomikos.beans.PropertyException;
+import com.atomikos.beans.PropertyUtils;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.exception.ShardingException;
+import io.shardingsphere.core.rule.DataSourceParameter;
+import io.shardingsphere.transaction.xa.convert.swap.DataSourceSwapperRegistry;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import javax.sql.DataSource;
import javax.sql.XADataSource;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
/**
* XA data source factory.
@@ -32,6 +39,7 @@ import java.util.Map;
* @author zhaojun
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
+@Slf4j
public final class XADataSourceFactory {
private static final Map XA_DRIVER_CLASS_NAMES = new HashMap<>(DatabaseType.values().length, 1);
@@ -51,6 +59,30 @@ public final class XADataSourceFactory {
* @return XA DataSource instance
*/
public static XADataSource build(final DatabaseType databaseType) {
+ return newXADataSourceInstance(databaseType);
+ }
+
+ /**
+ * Create XA data source through general data source.
+ *
+ * @param databaseType database type
+ * @param dataSource data source
+ * @return XA data source
+ */
+ public static XADataSource build(final DatabaseType databaseType, final DataSource dataSource) {
+ try {
+ DataSourceParameter dataSourceParameter = DataSourceSwapperRegistry.getSwapper(dataSource.getClass()).swap(dataSource);
+ XADataSource xaDataSource = newXADataSourceInstance(databaseType);
+ Properties xaProperties = XAPropertiesFactory.createXAProperties(databaseType).build(dataSourceParameter);
+ PropertyUtils.setProperties(xaDataSource, xaProperties);
+ return xaDataSource;
+ } catch (final PropertyException ex) {
+ log.error("Failed to create ShardingXADataSource");
+ throw new ShardingException(ex);
+ }
+ }
+
+ private static XADataSource newXADataSourceInstance(final DatabaseType databaseType) {
String xaDataSourceClassName = XA_DRIVER_CLASS_NAMES.get(databaseType);
Class xaDataSourceClass;
try {
diff --git a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/convert/datasource/XAProperties.java b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/datasource/XAProperties.java
similarity index 94%
rename from sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/convert/datasource/XAProperties.java
rename to sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/datasource/XAProperties.java
index 599b021cc3bd19fa1e3a5ba3b9b36e08d4c0cd39..923db0c7060d98373b7bea934126718995893870 100644
--- a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/convert/datasource/XAProperties.java
+++ b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/datasource/XAProperties.java
@@ -15,7 +15,7 @@
*
*/
-package io.shardingsphere.transaction.xa.convert.datasource;
+package io.shardingsphere.transaction.xa.jta.datasource;
import io.shardingsphere.core.rule.DataSourceParameter;
diff --git a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/convert/datasource/XAPropertiesFactory.java b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/datasource/XAPropertiesFactory.java
similarity index 76%
rename from sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/convert/datasource/XAPropertiesFactory.java
rename to sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/datasource/XAPropertiesFactory.java
index 5316734ca4fc6c357b8a8008f9acbada7777f1c9..65d2ed4a133397c25867400f63eb903c440a9907 100644
--- a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/convert/datasource/XAPropertiesFactory.java
+++ b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/datasource/XAPropertiesFactory.java
@@ -15,14 +15,14 @@
*
*/
-package io.shardingsphere.transaction.xa.convert.datasource;
+package io.shardingsphere.transaction.xa.jta.datasource;
import io.shardingsphere.core.constant.DatabaseType;
-import io.shardingsphere.transaction.xa.convert.datasource.dialect.H2XAProperties;
-import io.shardingsphere.transaction.xa.convert.datasource.dialect.MySQLXAProperties;
-import io.shardingsphere.transaction.xa.convert.datasource.dialect.OracleXAProperties;
-import io.shardingsphere.transaction.xa.convert.datasource.dialect.PostgreSQLXAProperties;
-import io.shardingsphere.transaction.xa.convert.datasource.dialect.SQLServerXAProperties;
+import io.shardingsphere.transaction.xa.jta.datasource.dialect.H2XAProperties;
+import io.shardingsphere.transaction.xa.jta.datasource.dialect.MySQLXAProperties;
+import io.shardingsphere.transaction.xa.jta.datasource.dialect.OracleXAProperties;
+import io.shardingsphere.transaction.xa.jta.datasource.dialect.PostgreSQLXAProperties;
+import io.shardingsphere.transaction.xa.jta.datasource.dialect.SQLServerXAProperties;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
diff --git a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/convert/datasource/dialect/H2XAProperties.java b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/datasource/dialect/H2XAProperties.java
similarity index 89%
rename from sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/convert/datasource/dialect/H2XAProperties.java
rename to sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/datasource/dialect/H2XAProperties.java
index a0be0ae90ce7fead661c7c41d65c0edc040923e2..6f542dfbb55e0f7e76333c57fc7130fd724159d9 100644
--- a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/convert/datasource/dialect/H2XAProperties.java
+++ b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/datasource/dialect/H2XAProperties.java
@@ -15,11 +15,11 @@
*
*/
-package io.shardingsphere.transaction.xa.convert.datasource.dialect;
+package io.shardingsphere.transaction.xa.jta.datasource.dialect;
import com.google.common.base.Optional;
import io.shardingsphere.core.rule.DataSourceParameter;
-import io.shardingsphere.transaction.xa.convert.datasource.XAProperties;
+import io.shardingsphere.transaction.xa.jta.datasource.XAProperties;
import java.util.Properties;
diff --git a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/convert/datasource/dialect/MySQLXAProperties.java b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/datasource/dialect/MySQLXAProperties.java
similarity index 94%
rename from sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/convert/datasource/dialect/MySQLXAProperties.java
rename to sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/datasource/dialect/MySQLXAProperties.java
index d611a2671b80e4622c9dc79ff57c0bb062282163..770a656f6ddbb631a3ccd5bcadebfb6f8ce9a919 100644
--- a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/convert/datasource/dialect/MySQLXAProperties.java
+++ b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/datasource/dialect/MySQLXAProperties.java
@@ -15,11 +15,11 @@
*
*/
-package io.shardingsphere.transaction.xa.convert.datasource.dialect;
+package io.shardingsphere.transaction.xa.jta.datasource.dialect;
import com.google.common.base.Optional;
import io.shardingsphere.core.rule.DataSourceParameter;
-import io.shardingsphere.transaction.xa.convert.datasource.XAProperties;
+import io.shardingsphere.transaction.xa.jta.datasource.XAProperties;
import java.util.Properties;
diff --git a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/convert/datasource/dialect/OracleXAProperties.java b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/datasource/dialect/OracleXAProperties.java
similarity index 92%
rename from sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/convert/datasource/dialect/OracleXAProperties.java
rename to sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/datasource/dialect/OracleXAProperties.java
index 038e6004a7ba0a57763592e3b711257a9ea83bd6..9ad111f9bfdd0cc16a913bfca238a5f2905e6b33 100644
--- a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/convert/datasource/dialect/OracleXAProperties.java
+++ b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/datasource/dialect/OracleXAProperties.java
@@ -15,12 +15,12 @@
*
*/
-package io.shardingsphere.transaction.xa.convert.datasource.dialect;
+package io.shardingsphere.transaction.xa.jta.datasource.dialect;
import com.google.common.base.Optional;
import io.shardingsphere.core.metadata.datasource.dialect.OracleDataSourceMetaData;
import io.shardingsphere.core.rule.DataSourceParameter;
-import io.shardingsphere.transaction.xa.convert.datasource.XAProperties;
+import io.shardingsphere.transaction.xa.jta.datasource.XAProperties;
import java.util.Properties;
diff --git a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/convert/datasource/dialect/PostgreSQLXAProperties.java b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/datasource/dialect/PostgreSQLXAProperties.java
similarity index 92%
rename from sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/convert/datasource/dialect/PostgreSQLXAProperties.java
rename to sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/datasource/dialect/PostgreSQLXAProperties.java
index ab84293bc12dbd690813a66e9dda2b6e9c82e765..bd691018c5d38deb73f5b9491ca00d065f936e8b 100644
--- a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/convert/datasource/dialect/PostgreSQLXAProperties.java
+++ b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/datasource/dialect/PostgreSQLXAProperties.java
@@ -15,12 +15,12 @@
*
*/
-package io.shardingsphere.transaction.xa.convert.datasource.dialect;
+package io.shardingsphere.transaction.xa.jta.datasource.dialect;
import com.google.common.base.Optional;
import io.shardingsphere.core.metadata.datasource.dialect.PostgreSQLDataSourceMetaData;
import io.shardingsphere.core.rule.DataSourceParameter;
-import io.shardingsphere.transaction.xa.convert.datasource.XAProperties;
+import io.shardingsphere.transaction.xa.jta.datasource.XAProperties;
import java.util.Properties;
diff --git a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/convert/datasource/dialect/SQLServerXAProperties.java b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/datasource/dialect/SQLServerXAProperties.java
similarity index 92%
rename from sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/convert/datasource/dialect/SQLServerXAProperties.java
rename to sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/datasource/dialect/SQLServerXAProperties.java
index 71fdf8a4bc7d1b1dee7442e8edf1b5a0b84a8541..b78ab1cd290bc5c1207a3e0fb1070797953a529f 100644
--- a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/convert/datasource/dialect/SQLServerXAProperties.java
+++ b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/jta/datasource/dialect/SQLServerXAProperties.java
@@ -15,12 +15,12 @@
*
*/
-package io.shardingsphere.transaction.xa.convert.datasource.dialect;
+package io.shardingsphere.transaction.xa.jta.datasource.dialect;
import com.google.common.base.Optional;
import io.shardingsphere.core.metadata.datasource.dialect.SQLServerDataSourceMetaData;
import io.shardingsphere.core.rule.DataSourceParameter;
-import io.shardingsphere.transaction.xa.convert.datasource.XAProperties;
+import io.shardingsphere.transaction.xa.jta.datasource.XAProperties;
import java.util.Properties;
diff --git a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/manager/AtomikosDataSourceBeanWrapper.java b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/manager/AtomikosDataSourceBeanWrapper.java
index 76e43f774029b2dc0be194646a3f3fb1bcf3a541..982e7090d6232f7326cffcd90c13db24d97d5914 100644
--- a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/manager/AtomikosDataSourceBeanWrapper.java
+++ b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/manager/AtomikosDataSourceBeanWrapper.java
@@ -22,7 +22,7 @@ import com.atomikos.beans.PropertyUtils;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.rule.DataSourceParameter;
-import io.shardingsphere.transaction.xa.convert.datasource.XAPropertiesFactory;
+import io.shardingsphere.transaction.xa.jta.datasource.XAPropertiesFactory;
import javax.sql.DataSource;
import javax.sql.XADataSource;
diff --git a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/manager/AtomikosTransactionManager.java b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/manager/AtomikosTransactionManager.java
index 8a3b2091373c8460882c154564e01f6432429342..571ecb0d64d29a089baa2d5bbe81779a16faa5a1 100644
--- a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/manager/AtomikosTransactionManager.java
+++ b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/manager/AtomikosTransactionManager.java
@@ -17,11 +17,12 @@
package io.shardingsphere.transaction.xa.manager;
+import com.atomikos.icatch.config.UserTransactionService;
+import com.atomikos.icatch.config.UserTransactionServiceImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.exception.ShardingException;
import io.shardingsphere.core.rule.DataSourceParameter;
-import io.shardingsphere.transaction.core.context.XATransactionContext;
import io.shardingsphere.transaction.spi.xa.XATransactionManager;
import javax.sql.DataSource;
@@ -41,29 +42,22 @@ import javax.transaction.TransactionManager;
*/
public final class AtomikosTransactionManager implements XATransactionManager {
- private final UserTransactionManager underlyingTransactionManager;
+ private final UserTransactionManager underlyingTransactionManager = new UserTransactionManager();
- public AtomikosTransactionManager() {
- underlyingTransactionManager = new UserTransactionManager();
- init();
- }
+ private final UserTransactionService userTransactionService = new UserTransactionServiceImp();
- private void init() {
- try {
- underlyingTransactionManager.init();
- } catch (SystemException ex) {
- throw new ShardingException(ex);
- }
+ @Override
+ public void startup() {
+ userTransactionService.init();
}
@Override
public void destroy() {
- underlyingTransactionManager.setForceShutdown(true);
- underlyingTransactionManager.close();
+ userTransactionService.shutdown(true);
}
@Override
- public void begin(final XATransactionContext transactionContext) throws ShardingException {
+ public void begin() throws ShardingException {
try {
underlyingTransactionManager.begin();
} catch (final SystemException | NotSupportedException ex) {
@@ -72,7 +66,7 @@ public final class AtomikosTransactionManager implements XATransactionManager {
}
@Override
- public void commit(final XATransactionContext transactionContext) throws ShardingException {
+ public void commit() throws ShardingException {
try {
underlyingTransactionManager.commit();
} catch (final RollbackException | HeuristicMixedException | HeuristicRollbackException | SystemException ex) {
@@ -81,7 +75,7 @@ public final class AtomikosTransactionManager implements XATransactionManager {
}
@Override
- public void rollback(final XATransactionContext transactionContext) throws ShardingException {
+ public void rollback() throws ShardingException {
try {
if (Status.STATUS_NO_TRANSACTION != getStatus()) {
underlyingTransactionManager.rollback();
@@ -113,4 +107,15 @@ public final class AtomikosTransactionManager implements XATransactionManager {
public TransactionManager getUnderlyingTransactionManager() {
return underlyingTransactionManager;
}
+
+ @Override
+ public void registerRecoveryResource(final String dataSourceName, final XADataSource xaDataSource) {
+ userTransactionService.registerResource(new AtomikosXARecoverableResource(dataSourceName, xaDataSource));
+ }
+
+ @Override
+ public void removeRecoveryResource(final String dataSourceName, final XADataSource xaDataSource) {
+ userTransactionService.removeResource(new AtomikosXARecoverableResource(dataSourceName, xaDataSource));
+ }
}
+
diff --git a/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/manager/AtomikosXARecoverableResource.java b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/manager/AtomikosXARecoverableResource.java
new file mode 100644
index 0000000000000000000000000000000000000000..f2606cbda7e6baecad5dce54a9f2b33d2a884211
--- /dev/null
+++ b/sharding-transaction/sharding-transaction-2pc/sharding-transaction-2pc-xa/src/main/java/io/shardingsphere/transaction/xa/manager/AtomikosXARecoverableResource.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2016-2018 shardingsphere.io.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *