未验证 提交 08f6c848 编写于 作者: ShardingSphere's avatar ShardingSphere 提交者: GitHub

Merge pull request #1726 from cherrylzhao/dev

#1694 refactor sharding transaction 2PC XA 
......@@ -40,7 +40,7 @@
<commons-codec.version>1.10</commons-codec.version>
<javax.transaction.version>1.1</javax.transaction.version>
<atomikos.version>4.0.4</atomikos.version>
<atomikos.version>4.0.6</atomikos.version>
<curator.version>2.10.0</curator.version>
<grpc.version>1.7.0</grpc.version>
......
......@@ -31,6 +31,7 @@ public enum PoolType {
HIKARI("com.zaxxer.hikari.HikariDataSource"),
DRUID("com.alibaba.druid.pool.DruidDataSource"),
DRUID_XA("com.alibaba.druid.pool.xa.DruidXADataSource"),
DBCP2("org.apache.commons.dbcp2.BasicDataSource"),
DBCP2_TOMCAT("org.apache.tomcat.dbcp.dbcp2.BasicDataSource"),
UNKNOWN("");
......
......@@ -37,10 +37,13 @@ public class ReflectiveUtil {
*/
@SuppressWarnings("unchecked")
public static Method findMethod(final Object target, final String methodName, final Class<?>... parameterTypes) throws NoSuchMethodException {
Method result;
Class clazz = target.getClass();
while (null != clazz) {
try {
return clazz.getDeclaredMethod(methodName, parameterTypes);
result = clazz.getDeclaredMethod(methodName, parameterTypes);
result.setAccessible(true);
return result;
} catch (NoSuchMethodException ignored) {
}
clazz = clazz.getSuperclass();
......
......@@ -31,9 +31,6 @@ import io.shardingsphere.spi.root.SPIRootInvokeHook;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.api.TransactionTypeHolder;
import io.shardingsphere.transaction.core.TransactionOperationType;
import io.shardingsphere.transaction.core.context.SagaTransactionContext;
import io.shardingsphere.transaction.core.context.ShardingTransactionContext;
import io.shardingsphere.transaction.core.context.XATransactionContext;
import io.shardingsphere.transaction.core.loader.ShardingTransactionHandlerRegistry;
import io.shardingsphere.transaction.spi.ShardingTransactionHandler;
import lombok.Getter;
......@@ -79,12 +76,12 @@ public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOpera
private final TransactionType transactionType;
private final ShardingTransactionHandler<ShardingTransactionContext> shardingTransactionHandler;
private final ShardingTransactionHandler shardingTransactionHandler;
protected AbstractConnectionAdapter(final TransactionType transactionType) {
rootInvokeHook.start();
this.transactionType = transactionType;
shardingTransactionHandler = ShardingTransactionHandlerRegistry.getInstance().getHandler(transactionType);
shardingTransactionHandler = ShardingTransactionHandlerRegistry.getHandler(transactionType);
if (TransactionType.LOCAL != transactionType) {
Preconditions.checkNotNull(shardingTransactionHandler, "Cannot find transaction manager of [%s]", transactionType);
}
......@@ -123,13 +120,13 @@ public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOpera
} else if (!connections.isEmpty()) {
result = new ArrayList<>(connectionSize);
result.addAll(connections);
List<Connection> newConnections = createConnections(connectionMode, dataSource, connectionSize - connections.size());
List<Connection> newConnections = createConnections(dataSourceName, connectionMode, dataSource, connectionSize - connections.size());
result.addAll(newConnections);
synchronized (cachedConnections) {
cachedConnections.putAll(dataSourceName, newConnections);
}
} else {
result = new ArrayList<>(createConnections(connectionMode, dataSource, connectionSize));
result = new ArrayList<>(createConnections(dataSourceName, connectionMode, dataSource, connectionSize));
synchronized (cachedConnections) {
cachedConnections.putAll(dataSourceName, result);
}
......@@ -138,23 +135,23 @@ public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOpera
}
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
private List<Connection> createConnections(final ConnectionMode connectionMode, final DataSource dataSource, final int connectionSize) throws SQLException {
private List<Connection> createConnections(final String dataSourceName, final ConnectionMode connectionMode, final DataSource dataSource, final int connectionSize) throws SQLException {
if (1 == connectionSize) {
return Collections.singletonList(createConnection(dataSource));
return Collections.singletonList(createConnection(dataSourceName, dataSource));
}
if (ConnectionMode.CONNECTION_STRICTLY == connectionMode) {
return createConnections(dataSource, connectionSize);
return createConnections(dataSourceName, dataSource, connectionSize);
}
synchronized (dataSource) {
return createConnections(dataSource, connectionSize);
return createConnections(dataSourceName, dataSource, connectionSize);
}
}
private List<Connection> createConnections(final DataSource dataSource, final int connectionSize) throws SQLException {
private List<Connection> createConnections(final String dataSourceName, final DataSource dataSource, final int connectionSize) throws SQLException {
List<Connection> result = new ArrayList<>(connectionSize);
for (int i = 0; i < connectionSize; i++) {
try {
result.add(createConnection(dataSource));
result.add(createConnection(dataSourceName, dataSource));
} catch (final SQLException ex) {
for (Connection each : result) {
each.close();
......@@ -165,8 +162,13 @@ public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOpera
return result;
}
private Connection createConnection(final DataSource dataSource) throws SQLException {
Connection result = dataSource.getConnection();
private Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException {
Connection result;
if (null != shardingTransactionHandler) {
result = shardingTransactionHandler.createConnection(dataSourceName, dataSource);
} else {
result = dataSource.getConnection();
}
replayMethodsInvocation(result);
return result;
}
......@@ -190,14 +192,11 @@ public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOpera
connection.setAutoCommit(autoCommit);
}
});
}
if (autoCommit) {
return;
}
if (TransactionType.XA == transactionType) {
shardingTransactionHandler.doInTransaction(new XATransactionContext(TransactionOperationType.BEGIN));
} else if (TransactionType.BASE == transactionType) {
shardingTransactionHandler.doInTransaction(new SagaTransactionContext(TransactionOperationType.BEGIN, this));
} else {
if (autoCommit) {
return;
}
shardingTransactionHandler.doInTransaction(TransactionOperationType.BEGIN);
}
}
......@@ -211,10 +210,8 @@ public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOpera
connection.commit();
}
});
} else if (TransactionType.XA == transactionType) {
shardingTransactionHandler.doInTransaction(new XATransactionContext(TransactionOperationType.COMMIT));
} else if (TransactionType.BASE == transactionType) {
shardingTransactionHandler.doInTransaction(new SagaTransactionContext(TransactionOperationType.COMMIT));
} else {
shardingTransactionHandler.doInTransaction(TransactionOperationType.COMMIT);
}
}
......@@ -228,10 +225,8 @@ public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOpera
connection.rollback();
}
});
} else if (TransactionType.XA == transactionType) {
shardingTransactionHandler.doInTransaction(new XATransactionContext(TransactionOperationType.ROLLBACK));
} else if (TransactionType.BASE == transactionType) {
shardingTransactionHandler.doInTransaction(new SagaTransactionContext(TransactionOperationType.ROLLBACK));
} else {
shardingTransactionHandler.doInTransaction(TransactionOperationType.ROLLBACK);
}
}
......
......@@ -20,8 +20,9 @@ package io.shardingsphere.shardingjdbc.jdbc.adapter;
import com.google.common.base.Preconditions;
import io.shardingsphere.core.bootstrap.ShardingBootstrap;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.util.ReflectiveUtil;
import io.shardingsphere.shardingjdbc.jdbc.unsupported.AbstractUnsupportedOperationDataSource;
import io.shardingsphere.transaction.core.datasource.ShardingTransactionalDataSource;
import io.shardingsphere.transaction.core.loader.ShardingTransactionHandlerRegistry;
import lombok.Getter;
import lombok.Setter;
......@@ -50,13 +51,14 @@ public abstract class AbstractDataSourceAdapter extends AbstractUnsupportedOpera
private final DatabaseType databaseType;
private final ShardingTransactionalDataSource shardingTransactionalDataSources;
private final Map<String, DataSource> dataSourceMap;
private PrintWriter logWriter = new PrintWriter(System.out);
public AbstractDataSourceAdapter(final Map<String, DataSource> dataSourceMap) throws SQLException {
databaseType = getDatabaseType(dataSourceMap.values());
shardingTransactionalDataSources = new ShardingTransactionalDataSource(databaseType, dataSourceMap);
ShardingTransactionHandlerRegistry.registerTransactionResource(databaseType, dataSourceMap);
this.dataSourceMap = dataSourceMap;
}
protected final DatabaseType getDatabaseType(final Collection<DataSource> dataSources) throws SQLException {
......@@ -78,15 +80,6 @@ public abstract class AbstractDataSourceAdapter extends AbstractUnsupportedOpera
}
}
/**
* Get data source map.
*
* @return data source map
*/
public final Map<String, DataSource> getDataSourceMap() {
return shardingTransactionalDataSources.getOriginalDataSourceMap();
}
@Override
public final Logger getParentLogger() {
return Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
......@@ -99,6 +92,11 @@ public abstract class AbstractDataSourceAdapter extends AbstractUnsupportedOpera
@Override
public void close() {
shardingTransactionalDataSources.close();
for (DataSource each : dataSourceMap.values()) {
try {
ReflectiveUtil.findMethod(each, "close").invoke(each);
} catch (final ReflectiveOperationException ignored) {
}
}
}
}
......@@ -81,6 +81,6 @@ public class MasterSlaveDataSource extends AbstractDataSourceAdapter {
@Override
public final MasterSlaveConnection getConnection() {
return new MasterSlaveConnection(this, getShardingTransactionalDataSources().getDataSourceMap(), TransactionTypeHolder.get());
return new MasterSlaveConnection(this, getDataSourceMap(), TransactionTypeHolder.get());
}
}
......@@ -67,7 +67,7 @@ public class ShardingDataSource extends AbstractDataSourceAdapter {
@Override
public final ShardingConnection getConnection() {
return new ShardingConnection(getShardingTransactionalDataSources().getDataSourceMap(), shardingContext, TransactionTypeHolder.get());
return new ShardingConnection(getDataSourceMap(), shardingContext, TransactionTypeHolder.get());
}
@Override
......
......@@ -25,6 +25,7 @@ import io.shardingsphere.shardingjdbc.jdbc.core.fixed.FixedXAShardingTransaction
import io.shardingsphere.shardingjdbc.jdbc.util.JDBCTestSQL;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.api.TransactionTypeHolder;
import io.shardingsphere.transaction.core.TransactionOperationType;
import lombok.SneakyThrows;
import org.junit.After;
import org.junit.Test;
......@@ -35,7 +36,6 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
......@@ -103,7 +103,7 @@ public final class ConnectionAdapterTest extends AbstractShardingJDBCDatabaseAnd
TransactionTypeHolder.set(TransactionType.XA);
try (ShardingConnection actual = getShardingDataSource().getConnection()) {
actual.commit();
assertNotNull(FixedXAShardingTransactionHandler.getInvokes().get("commit"));
assertThat(FixedXAShardingTransactionHandler.getInvokes().get("commit"), is(TransactionOperationType.COMMIT));
}
}
......@@ -122,7 +122,7 @@ public final class ConnectionAdapterTest extends AbstractShardingJDBCDatabaseAnd
TransactionTypeHolder.set(TransactionType.XA);
try (ShardingConnection actual = getShardingDataSource().getConnection()) {
actual.rollback();
assertNotNull(FixedXAShardingTransactionHandler.getInvokes().get("rollback"));
assertThat(FixedXAShardingTransactionHandler.getInvokes().get("rollback"), is(TransactionOperationType.ROLLBACK));
}
}
......
......@@ -29,7 +29,7 @@ import io.shardingsphere.shardingjdbc.jdbc.core.fixed.FixedBaseShardingTransacti
import io.shardingsphere.shardingjdbc.jdbc.core.fixed.FixedXAShardingTransactionHandler;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.api.TransactionTypeHolder;
import io.shardingsphere.transaction.core.context.ShardingTransactionContext;
import io.shardingsphere.transaction.core.TransactionOperationType;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
......@@ -42,7 +42,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
......@@ -111,21 +110,21 @@ public final class ShardingConnectionTest {
public void assertXATransactionOperation() throws SQLException {
connection = new ShardingConnection(dataSourceMap, shardingContext, TransactionType.XA);
connection.setAutoCommit(false);
assertThat(FixedXAShardingTransactionHandler.getInvokes().get("begin"), instanceOf(ShardingTransactionContext.class));
assertThat(FixedXAShardingTransactionHandler.getInvokes().get("begin"), is(TransactionOperationType.BEGIN));
connection.commit();
assertThat(FixedXAShardingTransactionHandler.getInvokes().get("commit"), instanceOf(ShardingTransactionContext.class));
assertThat(FixedXAShardingTransactionHandler.getInvokes().get("commit"), is(TransactionOperationType.COMMIT));
connection.rollback();
assertThat(FixedXAShardingTransactionHandler.getInvokes().get("rollback"), instanceOf(ShardingTransactionContext.class));
assertThat(FixedXAShardingTransactionHandler.getInvokes().get("rollback"), is(TransactionOperationType.ROLLBACK));
}
@Test
public void assertBaseTransactionOperation() throws SQLException {
connection = new ShardingConnection(dataSourceMap, shardingContext, TransactionType.BASE);
connection.setAutoCommit(false);
assertThat(FixedBaseShardingTransactionHandler.getInvokes().get("begin"), instanceOf(ShardingTransactionContext.class));
assertThat(FixedBaseShardingTransactionHandler.getInvokes().get("begin"), is(TransactionOperationType.BEGIN));
connection.commit();
assertThat(FixedBaseShardingTransactionHandler.getInvokes().get("commit"), instanceOf(ShardingTransactionContext.class));
assertThat(FixedBaseShardingTransactionHandler.getInvokes().get("commit"), is(TransactionOperationType.COMMIT));
connection.rollback();
assertThat(FixedBaseShardingTransactionHandler.getInvokes().get("rollback"), instanceOf(ShardingTransactionContext.class));
assertThat(FixedBaseShardingTransactionHandler.getInvokes().get("rollback"), is(TransactionOperationType.ROLLBACK));
}
}
......@@ -163,7 +163,7 @@ public final class ShardingDataSourceTest {
dataSourceMap.put("ds", dataSource);
TransactionTypeHolder.set(TransactionType.XA);
ShardingDataSource shardingDataSource = createShardingDataSource(dataSourceMap);
assertThat(shardingDataSource.getShardingTransactionalDataSources().getDataSourceMap().size(), is(1));
assertThat(shardingDataSource.getDataSourceMap().size(), is(1));
ShardingConnection shardingConnection = shardingDataSource.getConnection();
assertThat(shardingConnection.getDataSourceMap().size(), is(1));
}
......
......@@ -18,8 +18,9 @@
package io.shardingsphere.shardingjdbc.jdbc.core.fixed;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.core.context.ShardingTransactionContext;
import io.shardingsphere.transaction.spi.ShardingTransactionHandler;
import io.shardingsphere.transaction.core.TransactionOperationType;
import io.shardingsphere.transaction.core.handler.ShardingTransactionHandlerAdapter;
import io.shardingsphere.transaction.core.manager.ShardingTransactionManager;
import java.util.HashMap;
import java.util.Map;
......@@ -29,35 +30,40 @@ import java.util.Map;
*
* @author zhaojun
*/
public final class FixedBaseShardingTransactionHandler implements ShardingTransactionHandler {
public final class FixedBaseShardingTransactionHandler extends ShardingTransactionHandlerAdapter {
private static final Map<String, Object> INVOKES = new HashMap<>();
private static final Map<String, TransactionOperationType> INVOKES = new HashMap<>();
/**
* Get invoke map.
*
* @return map
*/
public static Map<String, Object> getInvokes() {
public static Map<String, TransactionOperationType> getInvokes() {
return INVOKES;
}
@Override
public void doInTransaction(final ShardingTransactionContext context) {
switch (context.getOperationType()) {
public void doInTransaction(final TransactionOperationType transactionOperationType) {
switch (transactionOperationType) {
case BEGIN:
INVOKES.put("begin", context);
INVOKES.put("begin", transactionOperationType);
return;
case COMMIT:
INVOKES.put("commit", context);
INVOKES.put("commit", transactionOperationType);
return;
case ROLLBACK:
INVOKES.put("rollback", context);
INVOKES.put("rollback", transactionOperationType);
return;
default:
}
}
@Override
public ShardingTransactionManager getShardingTransactionManager() {
return null;
}
@Override
public TransactionType getTransactionType() {
return TransactionType.BASE;
......
......@@ -18,8 +18,9 @@
package io.shardingsphere.shardingjdbc.jdbc.core.fixed;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.core.context.ShardingTransactionContext;
import io.shardingsphere.transaction.spi.ShardingTransactionHandler;
import io.shardingsphere.transaction.core.TransactionOperationType;
import io.shardingsphere.transaction.core.handler.ShardingTransactionHandlerAdapter;
import io.shardingsphere.transaction.core.manager.ShardingTransactionManager;
import java.util.HashMap;
import java.util.Map;
......@@ -29,35 +30,40 @@ import java.util.Map;
*
* @author zhaojun
*/
public final class FixedXAShardingTransactionHandler implements ShardingTransactionHandler {
public final class FixedXAShardingTransactionHandler extends ShardingTransactionHandlerAdapter {
private static final Map<String, Object> INVOKES = new HashMap<>();
private static final Map<String, TransactionOperationType> INVOKES = new HashMap<>();
/**
* Get invoke map.
*
* @return map
*/
public static Map<String, Object> getInvokes() {
public static Map<String, TransactionOperationType> getInvokes() {
return INVOKES;
}
@Override
public void doInTransaction(final ShardingTransactionContext context) {
switch (context.getOperationType()) {
public void doInTransaction(final TransactionOperationType transactionOperationType) {
switch (transactionOperationType) {
case BEGIN:
INVOKES.put("begin", context);
INVOKES.put("begin", transactionOperationType);
return;
case COMMIT:
INVOKES.put("commit", context);
INVOKES.put("commit", transactionOperationType);
return;
case ROLLBACK:
INVOKES.put("rollback", context);
INVOKES.put("rollback", transactionOperationType);
return;
default:
}
}
@Override
public ShardingTransactionManager getShardingTransactionManager() {
return null;
}
@Override
public TransactionType getTransactionType() {
return TransactionType.XA;
......
......@@ -20,8 +20,6 @@ package io.shardingsphere.shardingproxy.backend.jdbc.connection;
import com.google.common.base.Preconditions;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.core.TransactionOperationType;
import io.shardingsphere.transaction.core.context.ShardingTransactionContext;
import io.shardingsphere.transaction.core.context.XATransactionContext;
import io.shardingsphere.transaction.core.loader.ShardingTransactionHandlerRegistry;
import io.shardingsphere.transaction.spi.ShardingTransactionHandler;
import lombok.RequiredArgsConstructor;
......@@ -41,7 +39,7 @@ public final class BackendTransactionManager implements TransactionManager {
@Override
public void doInTransaction(final TransactionOperationType operationType) throws SQLException {
TransactionType transactionType = connection.getTransactionType();
ShardingTransactionHandler<ShardingTransactionContext> shardingTransactionHandler = ShardingTransactionHandlerRegistry.getInstance().getHandler(transactionType);
ShardingTransactionHandler shardingTransactionHandler = ShardingTransactionHandlerRegistry.getHandler(transactionType);
if (null != transactionType && transactionType != TransactionType.LOCAL) {
Preconditions.checkNotNull(shardingTransactionHandler, String.format("Cannot find transaction manager of [%s]", transactionType));
}
......@@ -52,7 +50,7 @@ public final class BackendTransactionManager implements TransactionManager {
if (TransactionType.LOCAL == transactionType) {
new LocalTransactionManager(connection).doInTransaction(operationType);
} else if (TransactionType.XA == transactionType) {
shardingTransactionHandler.doInTransaction(new XATransactionContext(operationType));
shardingTransactionHandler.doInTransaction(operationType);
if (TransactionOperationType.BEGIN != operationType) {
connection.getStateHandler().getAndSetStatus(ConnectionStatus.TERMINATED);
}
......
......@@ -20,7 +20,7 @@ package io.shardingsphere.shardingproxy.backend.jdbc.datasource;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.rule.DataSourceParameter;
import io.shardingsphere.transaction.spi.xa.XATransactionManager;
import io.shardingsphere.transaction.xa.convert.datasource.XADataSourceFactory;
import io.shardingsphere.transaction.xa.jta.datasource.XADataSourceFactory;
import io.shardingsphere.transaction.xa.manager.XATransactionManagerSPILoader;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
......
......@@ -34,7 +34,7 @@ import io.shardingsphere.shardingproxy.transport.mysql.packet.command.query.Fiel
import io.shardingsphere.shardingproxy.transport.mysql.packet.command.query.text.TextResultSetRowPacket;
import io.shardingsphere.shardingproxy.transport.mysql.packet.generic.OKPacket;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.core.context.ShardingTransactionContext;
import io.shardingsphere.transaction.core.TransactionOperationType;
import lombok.SneakyThrows;
import org.hamcrest.CoreMatchers;
import org.junit.After;
......@@ -50,7 +50,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
......@@ -145,7 +144,7 @@ public final class ComQueryPacketTest {
Optional<CommandResponsePackets> actual = packet.execute();
assertTrue(actual.isPresent());
assertOKPacket(actual.get());
assertThat(FixedXAShardingTransactionHandler.getInvokes().get("rollback"), instanceOf(ShardingTransactionContext.class));
assertThat(FixedXAShardingTransactionHandler.getInvokes().get("rollback"), is(TransactionOperationType.ROLLBACK));
}
@Test
......@@ -157,7 +156,7 @@ public final class ComQueryPacketTest {
Optional<CommandResponsePackets> actual = packet.execute();
assertTrue(actual.isPresent());
assertOKPacket(actual.get());
assertThat(FixedXAShardingTransactionHandler.getInvokes().get("commit"), instanceOf(ShardingTransactionContext.class));
assertThat(FixedXAShardingTransactionHandler.getInvokes().get("commit"), is(TransactionOperationType.COMMIT));
}
private void assertOKPacket(final CommandResponsePackets actual) {
......
......@@ -18,8 +18,9 @@
package io.shardingsphere.shardingproxy.transport.mysql.packet.command.query.text.query;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.core.context.ShardingTransactionContext;
import io.shardingsphere.transaction.spi.ShardingTransactionHandler;
import io.shardingsphere.transaction.core.TransactionOperationType;
import io.shardingsphere.transaction.core.handler.ShardingTransactionHandlerAdapter;
import io.shardingsphere.transaction.core.manager.ShardingTransactionManager;
import java.util.HashMap;
import java.util.Map;
......@@ -29,35 +30,40 @@ import java.util.Map;
*
* @author zhaojun
*/
public final class FixedXAShardingTransactionHandler implements ShardingTransactionHandler {
public final class FixedXAShardingTransactionHandler extends ShardingTransactionHandlerAdapter {
private static final Map<String, Object> INVOKES = new HashMap<>();
private static final Map<String, TransactionOperationType> INVOKES = new HashMap<>();
/**
* Get invoke map.
*
* @return map
*/
static Map<String, Object> getInvokes() {
static Map<String, TransactionOperationType> getInvokes() {
return INVOKES;
}
@Override
public void doInTransaction(final ShardingTransactionContext context) {
switch (context.getOperationType()) {
public void doInTransaction(final TransactionOperationType transactionOperationType) {
switch (transactionOperationType) {
case BEGIN:
INVOKES.put("begin", context);
INVOKES.put("begin", transactionOperationType);
return;
case COMMIT:
INVOKES.put("commit", context);
INVOKES.put("commit", transactionOperationType);
return;
case ROLLBACK:
INVOKES.put("rollback", context);
INVOKES.put("rollback", transactionOperationType);
return;
default:
}
}
@Override
public ShardingTransactionManager getShardingTransactionManager() {
return null;
}
@Override
public TransactionType getTransactionType() {
return TransactionType.XA;
......
......@@ -19,7 +19,6 @@ package io.shardingsphere.transaction.spi.xa;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.rule.DataSourceParameter;
import io.shardingsphere.transaction.core.context.XATransactionContext;
import io.shardingsphere.transaction.core.manager.ShardingTransactionManager;
import javax.sql.DataSource;
......@@ -32,7 +31,12 @@ import javax.transaction.TransactionManager;
* @author zhangliang
* @author zhaojun
*/
public interface XATransactionManager extends ShardingTransactionManager<XATransactionContext> {
public interface XATransactionManager extends ShardingTransactionManager {
/**
* Startup XA transaction manager.
*/
void startup();
/**
* destroy the transaction manager and could be helpful with shutdown gracefully.
......@@ -56,4 +60,21 @@ public interface XATransactionManager extends ShardingTransactionManager<XATrans
* @return transaction manager
*/
TransactionManager getUnderlyingTransactionManager();
/**
* Register recovery resource.
*
* @param dataSourceName data source name
* @param xaDataSource XA data source
*/
void registerRecoveryResource(String dataSourceName, XADataSource xaDataSource);
/**
* Remove recovery resource.
*
* @param dataSourceName data source name
* @param xaDataSource XA data source
*/
void removeRecoveryResource(String dataSourceName, XADataSource xaDataSource);
}
......@@ -46,5 +46,10 @@
<artifactId>commons-dbcp2</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
......@@ -22,7 +22,7 @@ import io.shardingsphere.core.rule.DataSourceParameter;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.spi.TransactionalDataSourceConverter;
import io.shardingsphere.transaction.spi.xa.XATransactionManager;
import io.shardingsphere.transaction.xa.convert.datasource.XADataSourceFactory;
import io.shardingsphere.transaction.xa.jta.datasource.XADataSourceFactory;
import io.shardingsphere.transaction.xa.convert.swap.DataSourceSwapperRegistry;
import io.shardingsphere.transaction.xa.manager.XATransactionManagerSPILoader;
......
......@@ -30,7 +30,8 @@ import java.util.Collections;
*/
public final class DruidParameterSwapper extends DataSourceSwapperAdapter {
private static final String DRUID_CLASS_NAME = "com.alibaba.druid.pool.DruidDataSource";
private static final String
DRUID_CLASS_NAME = "com.alibaba.druid.pool.DruidDataSource";
@Override
protected void convertProperties(final AdvancedMapUpdater<String, Object> updater) {
......
......@@ -17,26 +17,92 @@
package io.shardingsphere.transaction.xa.handler;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.exception.ShardingException;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.core.context.XATransactionContext;
import io.shardingsphere.transaction.core.handler.ShardingTransactionHandlerAdapter;
import io.shardingsphere.transaction.core.manager.ShardingTransactionManager;
import io.shardingsphere.transaction.spi.xa.XATransactionManager;
import io.shardingsphere.transaction.xa.jta.connection.ShardingXAConnection;
import io.shardingsphere.transaction.xa.jta.datasource.ShardingXADataSource;
import io.shardingsphere.transaction.xa.manager.XATransactionManagerSPILoader;
import lombok.extern.slf4j.Slf4j;
import javax.sql.DataSource;
import javax.transaction.RollbackException;
import javax.transaction.Status;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
/**
* XA sharding transaction handler.
*
* @author zhaojun
*/
public final class XAShardingTransactionHandler extends ShardingTransactionHandlerAdapter<XATransactionContext> {
@Slf4j
public final class XAShardingTransactionHandler extends ShardingTransactionHandlerAdapter {
private final Map<String, ShardingXADataSource> cachedShardingXADataSourceMap = new HashMap<>();
private final XATransactionManager xaTransactionManager = XATransactionManagerSPILoader.getInstance().getTransactionManager();
@Override
protected ShardingTransactionManager getShardingTransactionManager() {
return XATransactionManagerSPILoader.getInstance().getTransactionManager();
public ShardingTransactionManager getShardingTransactionManager() {
return xaTransactionManager;
}
@Override
public TransactionType getTransactionType() {
return TransactionType.XA;
}
@Override
public void registerTransactionalResource(final DatabaseType databaseType, final Map<String, DataSource> dataSourceMap) {
for (Map.Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
DataSource dataSource = entry.getValue();
if (dataSource instanceof AtomikosDataSourceBean) {
continue;
}
ShardingXADataSource shardingXADataSource = new ShardingXADataSource(databaseType, entry.getKey(), entry.getValue());
cachedShardingXADataSourceMap.put(entry.getKey(), shardingXADataSource);
xaTransactionManager.registerRecoveryResource(entry.getKey(), shardingXADataSource.getXaDataSource());
}
xaTransactionManager.startup();
}
@Override
public void clearTransactionalResource() {
if (!cachedShardingXADataSourceMap.isEmpty()) {
for (ShardingXADataSource each : cachedShardingXADataSourceMap.values()) {
xaTransactionManager.removeRecoveryResource(each.getResourceName(), each.getXaDataSource());
}
}
cachedShardingXADataSourceMap.clear();
}
@Override
public Connection createConnection(final String dataSourceName, final DataSource dataSource) {
Connection result;
ShardingXADataSource shardingXADataSource = cachedShardingXADataSourceMap.get(dataSourceName);
try {
Transaction transaction = xaTransactionManager.getUnderlyingTransactionManager().getTransaction();
if (null != transaction && Status.STATUS_NO_TRANSACTION != transaction.getStatus()) {
ShardingXAConnection shardingXAConnection = shardingXADataSource.getXAConnection();
transaction.enlistResource(shardingXAConnection.getXAResource());
result = shardingXAConnection.getConnection();
} else {
result = shardingXADataSource.getConnectionFromOriginalDataSource();
}
} catch (final SQLException | RollbackException | SystemException ex) {
log.error("Failed to synchronize transactional resource");
throw new ShardingException(ex);
}
return result;
}
}
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingsphere.transaction.xa.jta;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
/**
* Sharding XA resource.
*
* @author zhaojun
*/
@RequiredArgsConstructor
@Getter
public final class ShardingXAResource implements XAResource {
private final String resourceName;
private final XAResource delegate;
@Override
public void commit(final Xid xid, final boolean b) throws XAException {
delegate.commit(xid, b);
}
@Override
public void end(final Xid xid, final int i) throws XAException {
delegate.end(xid, i);
}
@Override
public void forget(final Xid xid) throws XAException {
delegate.forget(xid);
}
@Override
public int getTransactionTimeout() throws XAException {
return delegate.getTransactionTimeout();
}
@Override
public boolean isSameRM(final XAResource xaResource) {
ShardingXAResource shardingXAResource = (ShardingXAResource) xaResource;
return resourceName.equals(shardingXAResource.getResourceName());
}
@Override
public int prepare(final Xid xid) throws XAException {
return delegate.prepare(xid);
}
@Override
public Xid[] recover(final int i) throws XAException {
return delegate.recover(i);
}
@Override
public void rollback(final Xid xid) throws XAException {
delegate.rollback(xid);
}
@Override
public boolean setTransactionTimeout(final int i) throws XAException {
return delegate.setTransactionTimeout(i);
}
@Override
public void start(final Xid xid, final int i) throws XAException {
delegate.start(xid, i);
}
}
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingsphere.transaction.xa.jta.connection;
import io.shardingsphere.transaction.xa.jta.ShardingXAResource;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import javax.sql.ConnectionEventListener;
import javax.sql.StatementEventListener;
import javax.sql.XAConnection;
import javax.transaction.xa.XAResource;
import java.sql.Connection;
import java.sql.SQLException;
/**
* Sharding XA Connection.
*
* @author zhaojun
*/
@RequiredArgsConstructor
@Getter
public final class ShardingXAConnection implements XAConnection {
private final String resourceName;
private final XAConnection delegate;
@Override
public XAResource getXAResource() throws SQLException {
return new ShardingXAResource(resourceName, delegate.getXAResource());
}
@Override
public Connection getConnection() throws SQLException {
return delegate.getConnection();
}
@Override
public void close() throws SQLException {
delegate.close();
}
@Override
public void addConnectionEventListener(final ConnectionEventListener listener) {
delegate.addConnectionEventListener(listener);
}
@Override
public void removeConnectionEventListener(final ConnectionEventListener listener) {
delegate.removeConnectionEventListener(listener);
}
@Override
public void addStatementEventListener(final StatementEventListener listener) {
delegate.addStatementEventListener(listener);
}
@Override
public void removeStatementEventListener(final StatementEventListener listener) {
delegate.removeStatementEventListener(listener);
}
}
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingsphere.transaction.xa.jta.connection;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.transaction.xa.jta.connection.dialect.H2ShardingXAConnectionWrapper;
import io.shardingsphere.transaction.xa.jta.connection.dialect.MySQLShardingXAConnectionWrapper;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import javax.sql.XADataSource;
import java.sql.Connection;
/**
* Sharding XA connection wrapper.
*
* @author zhaojun
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class ShardingXAConnectionFactory {
/**
* Create a sharding XA connection from normal connection.
*
* @param databaseType database type
* @param connection normal connection
* @param resourceName resource name
* @param xaDataSource XA data source
* @return sharding XA connection
*/
public static ShardingXAConnection createShardingXAConnection(final DatabaseType databaseType, final String resourceName, final XADataSource xaDataSource, final Connection connection) {
switch (databaseType) {
case MySQL:
return new MySQLShardingXAConnectionWrapper().wrap(resourceName, xaDataSource, connection);
case H2:
return new H2ShardingXAConnectionWrapper().wrap(resourceName, xaDataSource, connection);
default:
throw new UnsupportedOperationException(String.format("Cannot support database type: `%s`", databaseType));
}
}
}
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingsphere.transaction.xa.jta.connection;
import javax.sql.XADataSource;
import java.sql.Connection;
/**
* Sharding XA connection wrapper.
*
* @author zhaojun
*/
public interface ShardingXAConnectionWrapper {
/**
* Wrap a normal connection to sharding XA connection.
*
* @param resourceName resource name
* @param xaDataSource XA data source
* @param connection connection
* @return sharding XA connection
*/
ShardingXAConnection wrap(String resourceName, XADataSource xaDataSource, Connection connection);
}
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingsphere.transaction.xa.jta.connection.dialect;
import io.shardingsphere.core.exception.ShardingException;
import io.shardingsphere.transaction.xa.jta.connection.ShardingXAConnection;
import io.shardingsphere.transaction.xa.jta.connection.ShardingXAConnectionWrapper;
import lombok.extern.slf4j.Slf4j;
import org.h2.jdbc.JdbcConnection;
import org.h2.jdbcx.JdbcDataSourceFactory;
import org.h2.jdbcx.JdbcXAConnection;
import org.h2.message.TraceObject;
import javax.sql.XADataSource;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.sql.Connection;
import java.sql.SQLException;
/**
* H2 sharding XA connection wrapper.
*
* @author zhaojun
*/
@Slf4j
public final class H2ShardingXAConnectionWrapper implements ShardingXAConnectionWrapper {
private static final int XA_DATA_SOURCE = 13;
private static final Constructor<JdbcXAConnection> CONSTRUCTOR = getH2JdbcXAConstructor();
private static final Method NEXT_ID = getNextIdMethod();
private static final JdbcDataSourceFactory FACTORY = new JdbcDataSourceFactory();
private static Constructor<JdbcXAConnection> getH2JdbcXAConstructor() {
try {
Constructor<JdbcXAConnection> h2XAConnectionConstructor = JdbcXAConnection.class.getDeclaredConstructor(JdbcDataSourceFactory.class, Integer.TYPE, JdbcConnection.class);
h2XAConnectionConstructor.setAccessible(true);
return h2XAConnectionConstructor;
} catch (final NoSuchMethodException ex) {
throw new ShardingException("Could not find constructor of H2 XA connection");
}
}
private static Method getNextIdMethod() {
try {
Method method = TraceObject.class.getDeclaredMethod("getNextId", Integer.TYPE);
method.setAccessible(true);
return method;
} catch (final NoSuchMethodException ex) {
throw new ShardingException("Could not find getNextId of H2 XA DataSource");
}
}
@Override
public ShardingXAConnection wrap(final String resourceName, final XADataSource xaDataSource, final Connection connection) {
try {
Connection h2PhysicalConnection = (Connection) connection.unwrap(Class.forName("org.h2.jdbc.JdbcConnection"));
JdbcXAConnection jdbcXAConnection = CONSTRUCTOR.newInstance(FACTORY, NEXT_ID.invoke(null, XA_DATA_SOURCE), h2PhysicalConnection);
return new ShardingXAConnection(resourceName, jdbcXAConnection);
} catch (final ClassNotFoundException | SQLException | IllegalAccessException | InvocationTargetException | InstantiationException ex) {
log.error("Failed to wrap a connection to ShardingXAConnection");
throw new ShardingException(ex);
}
}
}
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingsphere.transaction.xa.jta.connection.dialect;
import io.shardingsphere.core.exception.ShardingException;
import io.shardingsphere.core.util.ReflectiveUtil;
import io.shardingsphere.transaction.xa.jta.connection.ShardingXAConnection;
import io.shardingsphere.transaction.xa.jta.connection.ShardingXAConnectionWrapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import javax.sql.XAConnection;
import javax.sql.XADataSource;
import java.lang.reflect.Method;
import java.sql.Connection;
/**
* MySQL sharding XA connection wrapper.
*
* @author zhaojun
*/
@RequiredArgsConstructor
@Slf4j
public final class MySQLShardingXAConnectionWrapper implements ShardingXAConnectionWrapper {
@Override
public ShardingXAConnection wrap(final String resourceName, final XADataSource xaDataSource, final Connection connection) {
try {
Connection mysqlPhysicalConnection = (Connection) connection.unwrap(Class.forName("com.mysql.jdbc.Connection"));
Method wrapConnectionMethod = ReflectiveUtil.findMethod(xaDataSource, "wrapConnection", Connection.class);
XAConnection xaConnection = (XAConnection) wrapConnectionMethod.invoke(xaDataSource, mysqlPhysicalConnection);
return new ShardingXAConnection(resourceName, xaConnection);
} catch (final Exception ex) {
log.error("Failed to wrap a connection to ShardingXAConnection");
throw new ShardingException(ex);
}
}
}
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingsphere.transaction.xa.jta.datasource;
import javax.sql.XAConnection;
import javax.sql.XADataSource;
import java.io.PrintWriter;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.logging.Logger;
/**
* Abstract unsupported sharding XA data source.
*
* @author zhaojun
*/
public abstract class AbstractUnsupportedShardingXADataSource implements XADataSource {
@Override
public final XAConnection getXAConnection(final String user, final String password) throws SQLException {
throw new SQLFeatureNotSupportedException("getXAConnection by user and password");
}
@Override
public final PrintWriter getLogWriter() throws SQLException {
throw new SQLFeatureNotSupportedException("getLogWriter");
}
@Override
public final void setLogWriter(final PrintWriter out) throws SQLException {
throw new SQLFeatureNotSupportedException("setLogWriter");
}
@Override
public final void setLoginTimeout(final int seconds) throws SQLException {
throw new SQLFeatureNotSupportedException("setLoginTimeout");
}
@Override
public final int getLoginTimeout() throws SQLException {
throw new SQLFeatureNotSupportedException("getLoginTimeout");
}
@Override
public final Logger getParentLogger() throws SQLFeatureNotSupportedException {
throw new SQLFeatureNotSupportedException("getParentLogger");
}
}
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingsphere.transaction.xa.jta.datasource;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.transaction.xa.jta.connection.ShardingXAConnection;
import io.shardingsphere.transaction.xa.jta.connection.ShardingXAConnectionFactory;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import javax.sql.DataSource;
import javax.sql.XADataSource;
import java.sql.Connection;
import java.sql.SQLException;
/**
* Sharding XA data source.
*
* @author zhaojun
*/
@Getter
@Slf4j
public final class ShardingXADataSource extends AbstractUnsupportedShardingXADataSource {
private final DatabaseType databaseType;
private final String resourceName;
private final DataSource originalDataSource;
private final XADataSource xaDataSource;
private boolean isOriginalXADataSource;
public ShardingXADataSource(final DatabaseType databaseType, final String resourceName, final DataSource dataSource) {
this.databaseType = databaseType;
this.resourceName = resourceName;
this.originalDataSource = dataSource;
if (dataSource instanceof XADataSource) {
this.xaDataSource = (XADataSource) dataSource;
this.isOriginalXADataSource = true;
} else {
this.xaDataSource = XADataSourceFactory.build(databaseType, dataSource);
}
}
@Override
public ShardingXAConnection getXAConnection() throws SQLException {
return isOriginalXADataSource ? new ShardingXAConnection(resourceName, xaDataSource.getXAConnection())
: ShardingXAConnectionFactory.createShardingXAConnection(databaseType, resourceName, xaDataSource, originalDataSource.getConnection());
}
/**
* Get connection from original data source.
*
* @return connection
* @throws SQLException SQL exception
*/
public Connection getConnectionFromOriginalDataSource() throws SQLException {
return originalDataSource.getConnection();
}
}
......@@ -15,16 +15,23 @@
* </p>
*/
package io.shardingsphere.transaction.xa.convert.datasource;
package io.shardingsphere.transaction.xa.jta.datasource;
import com.atomikos.beans.PropertyException;
import com.atomikos.beans.PropertyUtils;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.exception.ShardingException;
import io.shardingsphere.core.rule.DataSourceParameter;
import io.shardingsphere.transaction.xa.convert.swap.DataSourceSwapperRegistry;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import javax.sql.DataSource;
import javax.sql.XADataSource;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* XA data source factory.
......@@ -32,6 +39,7 @@ import java.util.Map;
* @author zhaojun
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Slf4j
public final class XADataSourceFactory {
private static final Map<DatabaseType, String> XA_DRIVER_CLASS_NAMES = new HashMap<>(DatabaseType.values().length, 1);
......@@ -51,6 +59,30 @@ public final class XADataSourceFactory {
* @return XA DataSource instance
*/
public static XADataSource build(final DatabaseType databaseType) {
return newXADataSourceInstance(databaseType);
}
/**
* Create XA data source through general data source.
*
* @param databaseType database type
* @param dataSource data source
* @return XA data source
*/
public static XADataSource build(final DatabaseType databaseType, final DataSource dataSource) {
try {
DataSourceParameter dataSourceParameter = DataSourceSwapperRegistry.getSwapper(dataSource.getClass()).swap(dataSource);
XADataSource xaDataSource = newXADataSourceInstance(databaseType);
Properties xaProperties = XAPropertiesFactory.createXAProperties(databaseType).build(dataSourceParameter);
PropertyUtils.setProperties(xaDataSource, xaProperties);
return xaDataSource;
} catch (final PropertyException ex) {
log.error("Failed to create ShardingXADataSource");
throw new ShardingException(ex);
}
}
private static XADataSource newXADataSourceInstance(final DatabaseType databaseType) {
String xaDataSourceClassName = XA_DRIVER_CLASS_NAMES.get(databaseType);
Class xaDataSourceClass;
try {
......
......@@ -15,14 +15,14 @@
* </p>
*/
package io.shardingsphere.transaction.xa.convert.datasource;
package io.shardingsphere.transaction.xa.jta.datasource;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.transaction.xa.convert.datasource.dialect.H2XAProperties;
import io.shardingsphere.transaction.xa.convert.datasource.dialect.MySQLXAProperties;
import io.shardingsphere.transaction.xa.convert.datasource.dialect.OracleXAProperties;
import io.shardingsphere.transaction.xa.convert.datasource.dialect.PostgreSQLXAProperties;
import io.shardingsphere.transaction.xa.convert.datasource.dialect.SQLServerXAProperties;
import io.shardingsphere.transaction.xa.jta.datasource.dialect.H2XAProperties;
import io.shardingsphere.transaction.xa.jta.datasource.dialect.MySQLXAProperties;
import io.shardingsphere.transaction.xa.jta.datasource.dialect.OracleXAProperties;
import io.shardingsphere.transaction.xa.jta.datasource.dialect.PostgreSQLXAProperties;
import io.shardingsphere.transaction.xa.jta.datasource.dialect.SQLServerXAProperties;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
......
......@@ -15,11 +15,11 @@
* </p>
*/
package io.shardingsphere.transaction.xa.convert.datasource.dialect;
package io.shardingsphere.transaction.xa.jta.datasource.dialect;
import com.google.common.base.Optional;
import io.shardingsphere.core.rule.DataSourceParameter;
import io.shardingsphere.transaction.xa.convert.datasource.XAProperties;
import io.shardingsphere.transaction.xa.jta.datasource.XAProperties;
import java.util.Properties;
......
......@@ -15,11 +15,11 @@
* </p>
*/
package io.shardingsphere.transaction.xa.convert.datasource.dialect;
package io.shardingsphere.transaction.xa.jta.datasource.dialect;
import com.google.common.base.Optional;
import io.shardingsphere.core.rule.DataSourceParameter;
import io.shardingsphere.transaction.xa.convert.datasource.XAProperties;
import io.shardingsphere.transaction.xa.jta.datasource.XAProperties;
import java.util.Properties;
......
......@@ -15,12 +15,12 @@
* </p>
*/
package io.shardingsphere.transaction.xa.convert.datasource.dialect;
package io.shardingsphere.transaction.xa.jta.datasource.dialect;
import com.google.common.base.Optional;
import io.shardingsphere.core.metadata.datasource.dialect.OracleDataSourceMetaData;
import io.shardingsphere.core.rule.DataSourceParameter;
import io.shardingsphere.transaction.xa.convert.datasource.XAProperties;
import io.shardingsphere.transaction.xa.jta.datasource.XAProperties;
import java.util.Properties;
......
......@@ -15,12 +15,12 @@
* </p>
*/
package io.shardingsphere.transaction.xa.convert.datasource.dialect;
package io.shardingsphere.transaction.xa.jta.datasource.dialect;
import com.google.common.base.Optional;
import io.shardingsphere.core.metadata.datasource.dialect.PostgreSQLDataSourceMetaData;
import io.shardingsphere.core.rule.DataSourceParameter;
import io.shardingsphere.transaction.xa.convert.datasource.XAProperties;
import io.shardingsphere.transaction.xa.jta.datasource.XAProperties;
import java.util.Properties;
......
......@@ -15,12 +15,12 @@
* </p>
*/
package io.shardingsphere.transaction.xa.convert.datasource.dialect;
package io.shardingsphere.transaction.xa.jta.datasource.dialect;
import com.google.common.base.Optional;
import io.shardingsphere.core.metadata.datasource.dialect.SQLServerDataSourceMetaData;
import io.shardingsphere.core.rule.DataSourceParameter;
import io.shardingsphere.transaction.xa.convert.datasource.XAProperties;
import io.shardingsphere.transaction.xa.jta.datasource.XAProperties;
import java.util.Properties;
......
......@@ -22,7 +22,7 @@ import com.atomikos.beans.PropertyUtils;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.rule.DataSourceParameter;
import io.shardingsphere.transaction.xa.convert.datasource.XAPropertiesFactory;
import io.shardingsphere.transaction.xa.jta.datasource.XAPropertiesFactory;
import javax.sql.DataSource;
import javax.sql.XADataSource;
......
......@@ -17,11 +17,12 @@
package io.shardingsphere.transaction.xa.manager;
import com.atomikos.icatch.config.UserTransactionService;
import com.atomikos.icatch.config.UserTransactionServiceImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.exception.ShardingException;
import io.shardingsphere.core.rule.DataSourceParameter;
import io.shardingsphere.transaction.core.context.XATransactionContext;
import io.shardingsphere.transaction.spi.xa.XATransactionManager;
import javax.sql.DataSource;
......@@ -41,29 +42,22 @@ import javax.transaction.TransactionManager;
*/
public final class AtomikosTransactionManager implements XATransactionManager {
private final UserTransactionManager underlyingTransactionManager;
private final UserTransactionManager underlyingTransactionManager = new UserTransactionManager();
public AtomikosTransactionManager() {
underlyingTransactionManager = new UserTransactionManager();
init();
}
private final UserTransactionService userTransactionService = new UserTransactionServiceImp();
private void init() {
try {
underlyingTransactionManager.init();
} catch (SystemException ex) {
throw new ShardingException(ex);
}
@Override
public void startup() {
userTransactionService.init();
}
@Override
public void destroy() {
underlyingTransactionManager.setForceShutdown(true);
underlyingTransactionManager.close();
userTransactionService.shutdown(true);
}
@Override
public void begin(final XATransactionContext transactionContext) throws ShardingException {
public void begin() throws ShardingException {
try {
underlyingTransactionManager.begin();
} catch (final SystemException | NotSupportedException ex) {
......@@ -72,7 +66,7 @@ public final class AtomikosTransactionManager implements XATransactionManager {
}
@Override
public void commit(final XATransactionContext transactionContext) throws ShardingException {
public void commit() throws ShardingException {
try {
underlyingTransactionManager.commit();
} catch (final RollbackException | HeuristicMixedException | HeuristicRollbackException | SystemException ex) {
......@@ -81,7 +75,7 @@ public final class AtomikosTransactionManager implements XATransactionManager {
}
@Override
public void rollback(final XATransactionContext transactionContext) throws ShardingException {
public void rollback() throws ShardingException {
try {
if (Status.STATUS_NO_TRANSACTION != getStatus()) {
underlyingTransactionManager.rollback();
......@@ -113,4 +107,15 @@ public final class AtomikosTransactionManager implements XATransactionManager {
public TransactionManager getUnderlyingTransactionManager() {
return underlyingTransactionManager;
}
@Override
public void registerRecoveryResource(final String dataSourceName, final XADataSource xaDataSource) {
userTransactionService.registerResource(new AtomikosXARecoverableResource(dataSourceName, xaDataSource));
}
@Override
public void removeRecoveryResource(final String dataSourceName, final XADataSource xaDataSource) {
userTransactionService.removeResource(new AtomikosXARecoverableResource(dataSourceName, xaDataSource));
}
}
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingsphere.transaction.xa.manager;
import com.atomikos.datasource.xa.jdbc.JdbcTransactionalResource;
import io.shardingsphere.transaction.xa.jta.ShardingXAResource;
import javax.sql.XADataSource;
import javax.transaction.xa.XAResource;
/**
* Sharding recovery resource.
*
* @author zhaojun
*/
public final class AtomikosXARecoverableResource extends JdbcTransactionalResource {
private final String resourceName;
public AtomikosXARecoverableResource(final String serverName, final XADataSource xaDataSource) {
super(serverName, xaDataSource);
resourceName = serverName;
}
@Override
public boolean usesXAResource(final XAResource xaResource) {
return resourceName.equals(((ShardingXAResource) xaResource).getResourceName());
}
}
com.atomikos.icatch.serial_jta_transactions = false
com.atomikos.icatch.automatic_resource_registration = false
com.atomikos.icatch.default_jta_timeout = 300000
com.atomikos.icatch.max_actives = 10000
com.atomikos.icatch.checkpoint_interval = 50000
......
......@@ -17,14 +17,9 @@
package io.shardingsphere.transaction.xa.convert;
import io.shardingsphere.transaction.xa.convert.datasource.XADataSourceFactoryTest;
import io.shardingsphere.transaction.xa.convert.datasource.XAPropertiesFactoryTest;
import io.shardingsphere.transaction.xa.convert.datasource.dialect.H2XAPropertiesTest;
import io.shardingsphere.transaction.xa.convert.datasource.dialect.MySQLXAPropertiesTest;
import io.shardingsphere.transaction.xa.convert.datasource.dialect.OracleXAPropertiesTest;
import io.shardingsphere.transaction.xa.convert.datasource.dialect.PostgreSQLXAPropertiesTest;
import io.shardingsphere.transaction.xa.convert.datasource.dialect.SQLServerXAPropertiesTest;
import io.shardingsphere.transaction.xa.convert.swap.DataSourceSwapperRegistryTest;
import io.shardingsphere.transaction.xa.jta.datasource.XADataSourceFactoryTest;
import io.shardingsphere.transaction.xa.jta.datasource.XAPropertiesFactoryTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
......@@ -33,11 +28,6 @@ import org.junit.runners.Suite;
XADataSourceMapConverterTest.class,
XADataSourceFactoryTest.class,
XAPropertiesFactoryTest.class,
H2XAPropertiesTest.class,
MySQLXAPropertiesTest.class,
PostgreSQLXAPropertiesTest.class,
OracleXAPropertiesTest.class,
SQLServerXAPropertiesTest.class,
DataSourceSwapperRegistryTest.class
})
public final class AllConvertTests {
......
......@@ -18,6 +18,7 @@
package io.shardingsphere.transaction.xa.fixture;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.xa.DruidXADataSource;
import com.zaxxer.hikari.HikariDataSource;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.constant.PoolType;
......@@ -50,6 +51,8 @@ public final class DataSourceUtils {
return newHikariDataSource(databaseType, databaseName);
case DRUID:
return newDruidDataSource(databaseType, databaseName);
case DRUID_XA:
return newDruidXADataSource(databaseType, databaseName);
default:
return Mockito.mock(DataSource.class);
}
......@@ -71,17 +74,27 @@ public final class DataSourceUtils {
private static DruidDataSource newDruidDataSource(final DatabaseType databaseType, final String databaseName) {
DruidDataSource result = new DruidDataSource();
result.setUrl(getUrl(databaseType, databaseName));
result.setUsername("root");
result.setPassword("root");
result.setMaxActive(10);
result.setMinIdle(2);
result.setMaxWait(15 * 1000);
result.setMinEvictableIdleTimeMillis(40 * 1000);
result.setTimeBetweenEvictionRunsMillis(20 * 1000);
configDruidDataSource(result, databaseType, databaseName);
return result;
}
private static DruidXADataSource newDruidXADataSource(final DatabaseType databaseType, final String databaseName) {
DruidXADataSource result = new DruidXADataSource();
configDruidDataSource(result, databaseType, databaseName);
return result;
}
private static void configDruidDataSource(final DruidDataSource druidDataSource, final DatabaseType databaseType, final String databaseName) {
druidDataSource.setUrl(getUrl(databaseType, databaseName));
druidDataSource.setUsername("root");
druidDataSource.setPassword("root");
druidDataSource.setMaxActive(10);
druidDataSource.setMinIdle(2);
druidDataSource.setMaxWait(15 * 1000);
druidDataSource.setMinEvictableIdleTimeMillis(40 * 1000);
druidDataSource.setTimeBetweenEvictionRunsMillis(20 * 1000);
}
private static HikariDataSource newHikariDataSource(final DatabaseType databaseType, final String databaseName) {
HikariDataSource result = new HikariDataSource();
result.setJdbcUrl(getUrl(databaseType, databaseName));
......
......@@ -19,7 +19,6 @@ package io.shardingsphere.transaction.xa.fixture;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.rule.DataSourceParameter;
import io.shardingsphere.transaction.core.context.XATransactionContext;
import io.shardingsphere.transaction.spi.xa.XATransactionManager;
import javax.sql.DataSource;
......@@ -28,20 +27,24 @@ import javax.transaction.Status;
import javax.transaction.TransactionManager;
public final class FixtureXATransactionManager implements XATransactionManager {
@Override
public void startup() {
}
@Override
public void destroy() {
}
@Override
public void begin(final XATransactionContext transactionContext) {
public void begin() {
}
@Override
public void commit(final XATransactionContext transactionContext) {
public void commit() {
}
@Override
public void rollback(final XATransactionContext transactionContext) {
public void rollback() {
}
@Override
......@@ -58,4 +61,14 @@ public final class FixtureXATransactionManager implements XATransactionManager {
public TransactionManager getUnderlyingTransactionManager() {
return null;
}
@Override
public void registerRecoveryResource(final String dataSourceName, final XADataSource xaDataSource) {
}
@Override
public void removeRecoveryResource(final String dataSourceName, final XADataSource xaDataSource) {
}
}
......@@ -17,33 +17,76 @@
package io.shardingsphere.transaction.xa.handler;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.constant.PoolType;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.core.TransactionOperationType;
import io.shardingsphere.transaction.core.context.XATransactionContext;
import io.shardingsphere.transaction.core.manager.ShardingTransactionManager;
import io.shardingsphere.transaction.xa.manager.AtomikosTransactionManager;
import io.shardingsphere.transaction.spi.xa.XATransactionManager;
import io.shardingsphere.transaction.xa.fixture.DataSourceUtils;
import io.shardingsphere.transaction.xa.jta.connection.ShardingXAConnection;
import io.shardingsphere.transaction.xa.jta.datasource.ShardingXADataSource;
import lombok.SneakyThrows;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import javax.sql.DataSource;
import javax.sql.XADataSource;
import javax.transaction.Status;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
import java.lang.reflect.Field;
import java.sql.Connection;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class XAShardingTransactionHandlerTest {
private XAShardingTransactionHandler xaShardingTransactionHandler = new XAShardingTransactionHandler();
private XATransactionContext beginContext = new XATransactionContext(TransactionOperationType.BEGIN);
@Mock
private XATransactionManager xaTransactionManager;
private XATransactionContext commitContext = new XATransactionContext(TransactionOperationType.COMMIT);
@Mock
private TransactionManager transactionManager;
private XATransactionContext rollbackContext = new XATransactionContext(TransactionOperationType.ROLLBACK);
@Mock
private Transaction transaction;
@Before
@SneakyThrows
public void setUp() {
setMockXATransactionManager(xaShardingTransactionHandler, xaTransactionManager);
when(xaTransactionManager.getUnderlyingTransactionManager()).thenReturn(transactionManager);
when(transactionManager.getTransaction()).thenReturn(transaction);
}
@SneakyThrows
private void setMockXATransactionManager(final XAShardingTransactionHandler xaShardingTransactionHandler, final XATransactionManager xaTransactionManager) {
Field field = xaShardingTransactionHandler.getClass().getDeclaredField("xaTransactionManager");
field.setAccessible(true);
field.set(xaShardingTransactionHandler, xaTransactionManager);
}
@Test
public void assertGetTransactionManager() {
ShardingTransactionManager shardingTransactionManager = xaShardingTransactionHandler.getShardingTransactionManager();
assertThat(shardingTransactionManager, instanceOf(AtomikosTransactionManager.class));
assertThat(shardingTransactionManager, instanceOf(XATransactionManager.class));
}
@Test
......@@ -52,62 +95,103 @@ public class XAShardingTransactionHandlerTest {
}
@Test
public void assertDoXATransactionBegin() throws InterruptedException {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
xaShardingTransactionHandler.doInTransaction(beginContext);
int actualStatus = xaShardingTransactionHandler.getShardingTransactionManager().getStatus();
assertThat(actualStatus, is(Status.STATUS_ACTIVE));
}
});
thread.start();
thread.join();
public void assertRegisterXATransactionalDataSource() {
Map<String, DataSource> dataSourceMap = createDataSourceMap(PoolType.DRUID_XA, DatabaseType.MySQL);
xaShardingTransactionHandler.registerTransactionalResource(DatabaseType.MySQL, dataSourceMap);
for (Map.Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
verify(xaTransactionManager).registerRecoveryResource(entry.getKey(), (XADataSource) entry.getValue());
}
}
@Test
public void assertRegisterAtomikosDataSourceBean() {
Map<String, DataSource> dataSourceMap = createAtomikosDataSourceBeanMap();
xaShardingTransactionHandler.registerTransactionalResource(DatabaseType.MySQL, dataSourceMap);
verify(xaTransactionManager, times(0)).registerRecoveryResource(anyString(), any(XADataSource.class));
}
@Test
public void assertRegisterNoneXATransactionalDAtaSource() {
Map<String, DataSource> dataSourceMap = createDataSourceMap(PoolType.HIKARI, DatabaseType.MySQL);
xaShardingTransactionHandler.registerTransactionalResource(DatabaseType.MySQL, dataSourceMap);
Map<String, ShardingXADataSource> cachedXADatasourceMap = getCachedShardingXADataSourceMap();
assertThat(cachedXADatasourceMap.size(), is(2));
}
@Test
public void assertDoXATransactionCommit() throws InterruptedException {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
xaShardingTransactionHandler.doInTransaction(beginContext);
xaShardingTransactionHandler.doInTransaction(commitContext);
int actualStatus = xaShardingTransactionHandler.getShardingTransactionManager().getStatus();
assertThat(actualStatus, is(Status.STATUS_NO_TRANSACTION));
}
});
thread.start();
thread.join();
@SneakyThrows
public void assertCreateNoneTransactionalConnection() {
when(transaction.getStatus()).thenReturn(Status.STATUS_NO_TRANSACTION);
DataSource dataSource = mock(DataSource.class);
setCachedShardingXADataSourceMap("ds1");
ShardingXADataSource shardingXADataSource = getCachedShardingXADataSourceMap().get("ds1");
xaShardingTransactionHandler.createConnection("ds1", dataSource);
verify(shardingXADataSource).getConnectionFromOriginalDataSource();
}
@Test
public void assertDoXATransactionRollback() throws InterruptedException {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
xaShardingTransactionHandler.doInTransaction(beginContext);
xaShardingTransactionHandler.doInTransaction(rollbackContext);
int actualStatus = xaShardingTransactionHandler.getShardingTransactionManager().getStatus();
assertThat(actualStatus, is(Status.STATUS_NO_TRANSACTION));
}
});
thread.start();
thread.join();
@SneakyThrows
public void assertCreateTransactionalConnection() {
when(transaction.getStatus()).thenReturn(Status.STATUS_ACTIVE);
DataSource dataSource = mock(DataSource.class);
setCachedShardingXADataSourceMap("ds1");
Connection actual = xaShardingTransactionHandler.createConnection("ds1", dataSource);
assertThat(actual, instanceOf(Connection.class));
verify(transaction).enlistResource(any(XAResource.class));
}
@Test
public void assertDoXATransactionCommitRollback() throws InterruptedException {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
xaShardingTransactionHandler.doInTransaction(beginContext);
xaShardingTransactionHandler.doInTransaction(commitContext);
xaShardingTransactionHandler.doInTransaction(rollbackContext);
int actualStatus = xaShardingTransactionHandler.getShardingTransactionManager().getStatus();
assertThat(actualStatus, is(Status.STATUS_NO_TRANSACTION));
}
});
thread.start();
thread.join();
public void assertClearTransactionalDataSource() {
setCachedShardingXADataSourceMap("ds1");
xaShardingTransactionHandler.clearTransactionalResource();
Map<String, ShardingXADataSource> cachedShardingXADataSourceMap = getCachedShardingXADataSourceMap();
verify(xaTransactionManager).removeRecoveryResource(anyString(), any(XADataSource.class));
assertThat(cachedShardingXADataSourceMap.size(), is(0));
}
@SneakyThrows
@SuppressWarnings("unchecked")
private Map<String, ShardingXADataSource> getCachedShardingXADataSourceMap() {
Field field = xaShardingTransactionHandler.getClass().getDeclaredField("cachedShardingXADataSourceMap");
field.setAccessible(true);
return (Map<String, ShardingXADataSource>) field.get(xaShardingTransactionHandler);
}
@SneakyThrows
private void setCachedShardingXADataSourceMap(final String datasourceName) {
Field field = xaShardingTransactionHandler.getClass().getDeclaredField("cachedShardingXADataSourceMap");
field.setAccessible(true);
field.set(xaShardingTransactionHandler, createMockShardingXADataSourceMap(datasourceName));
}
@SneakyThrows
private Map<String, ShardingXADataSource> createMockShardingXADataSourceMap(final String datasourceName) {
ShardingXADataSource shardingXADataSource = mock(ShardingXADataSource.class);
ShardingXAConnection shardingXAConnection = mock(ShardingXAConnection.class);
XADataSource xaDataSource = mock(XADataSource.class);
XAResource xaResource = mock(XAResource.class);
Connection connection = mock(Connection.class);
when(shardingXAConnection.getConnection()).thenReturn(connection);
when(shardingXAConnection.getXAResource()).thenReturn(xaResource);
when(shardingXADataSource.getXAConnection()).thenReturn(shardingXAConnection);
when(shardingXADataSource.getResourceName()).thenReturn(datasourceName);
when(shardingXADataSource.getXaDataSource()).thenReturn(xaDataSource);
Map<String, ShardingXADataSource> result = new HashMap<>();
result.put(datasourceName, shardingXADataSource);
return result;
}
private Map<String, DataSource> createDataSourceMap(final PoolType poolType, final DatabaseType databaseType) {
Map<String, DataSource> result = new HashMap<>();
result.put("ds1", DataSourceUtils.build(poolType, databaseType, "demo_ds_1"));
result.put("ds2", DataSourceUtils.build(poolType, databaseType, "demo_ds_2"));
return result;
}
private Map<String, DataSource> createAtomikosDataSourceBeanMap() {
Map<String, DataSource> result = new HashMap<>();
result.put("ds1", new AtomikosDataSourceBean());
result.put("ds2", new AtomikosDataSourceBean());
return result;
}
}
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingsphere.transaction.xa.jta;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.verify;
@RunWith(MockitoJUnitRunner.class)
public class ShardingXAResourceTest {
@Mock
private XAResource xaResource;
@Mock
private Xid xid;
private ShardingXAResource shardingXAResource;
@Before
public void setUp() {
shardingXAResource = new ShardingXAResource("ds1", xaResource);
}
@Test
public void assertCommit() throws XAException {
shardingXAResource.commit(xid, true);
verify(xaResource).commit(xid, true);
}
@Test
public void assertEnd() throws XAException {
shardingXAResource.end(xid, 1);
verify(xaResource).end(xid, 1);
}
@Test
public void assertForget() throws XAException {
shardingXAResource.forget(xid);
verify(xaResource).forget(xid);
}
@Test
public void assertGetTransactionTimeout() throws XAException {
shardingXAResource.getTransactionTimeout();
verify(xaResource).getTransactionTimeout();
}
@Test
public void assertIsSameRM() {
assertTrue(shardingXAResource.isSameRM(new ShardingXAResource("ds1", xaResource)));
}
@Test
public void assertPrepare() throws XAException {
shardingXAResource.prepare(xid);
verify(xaResource).prepare(xid);
}
@Test
public void assertRecover() throws XAException {
shardingXAResource.recover(1);
verify(xaResource).recover(1);
}
@Test
public void assertRollback() throws XAException {
shardingXAResource.rollback(xid);
verify(xaResource).rollback(xid);
}
@Test
public void assertSetTransactionTimeout() throws XAException {
shardingXAResource.setTransactionTimeout(1);
verify(xaResource).setTransactionTimeout(1);
}
@Test
public void assertStart() throws XAException {
shardingXAResource.start(xid, 1);
verify(xaResource).start(xid, 1);
}
}
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingsphere.transaction.xa.jta.connection;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.exception.ShardingException;
import lombok.SneakyThrows;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import javax.sql.XADataSource;
import java.sql.Connection;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
@RunWith(MockitoJUnitRunner.class)
public class ShardingXAConnectionFactoryTest {
@Mock
private XADataSource xaDataSource;
@Mock
private Connection connection;
@Test(expected = ShardingException.class)
public void assertCreateMysqlMySQLShardingXAConnection() {
ShardingXAConnectionFactory.createShardingXAConnection(DatabaseType.MySQL, "ds1", xaDataSource, connection);
}
@Test
@SneakyThrows
public void assertCreateH2ShardingXAConnection() {
ShardingXAConnection actual = ShardingXAConnectionFactory.createShardingXAConnection(DatabaseType.H2, "ds1", xaDataSource, connection);
assertThat(actual.getResourceName(), is("ds1"));
}
}
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingsphere.transaction.xa.jta.connection;
import io.shardingsphere.transaction.xa.jta.ShardingXAResource;
import lombok.SneakyThrows;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import javax.sql.ConnectionEventListener;
import javax.sql.StatementEventListener;
import javax.sql.XAConnection;
import javax.transaction.xa.XAResource;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@RunWith(MockitoJUnitRunner.class)
public class ShardingXAConnectionTest {
@Mock
private XAConnection xaConnection;
private ShardingXAConnection shardingXAConnection;
@Before
public void setUp() {
shardingXAConnection = new ShardingXAConnection("ds1", xaConnection);
}
@Test
@SneakyThrows
public void assertGetConnection() {
shardingXAConnection.getConnection();
verify(xaConnection).getConnection();
}
@Test
@SneakyThrows
public void assertGetXAResource() {
XAResource actual = shardingXAConnection.getXAResource();
assertThat(actual, instanceOf(ShardingXAResource.class));
}
@Test
@SneakyThrows
public void close() {
shardingXAConnection.close();
verify(xaConnection).close();
}
@Test
public void assertAddConnectionEventListener() {
shardingXAConnection.addConnectionEventListener(mock(ConnectionEventListener.class));
verify(xaConnection).addConnectionEventListener(any(ConnectionEventListener.class));
}
@Test
public void assertRemoveConnectionEventListener() {
shardingXAConnection.removeConnectionEventListener(mock(ConnectionEventListener.class));
verify(xaConnection).removeConnectionEventListener(any(ConnectionEventListener.class));
}
@Test
public void assertAddStatementEventListener() {
shardingXAConnection.addStatementEventListener(mock(StatementEventListener.class));
verify(xaConnection).addStatementEventListener(any(StatementEventListener.class));
}
@Test
public void removeStatementEventListener() {
shardingXAConnection.removeStatementEventListener(mock(StatementEventListener.class));
verify(xaConnection).removeStatementEventListener(any(StatementEventListener.class));
}
}
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingsphere.transaction.xa.jta.connection.dialect;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.constant.PoolType;
import io.shardingsphere.transaction.xa.fixture.DataSourceUtils;
import io.shardingsphere.transaction.xa.jta.connection.ShardingXAConnection;
import io.shardingsphere.transaction.xa.jta.datasource.XADataSourceFactory;
import lombok.SneakyThrows;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import javax.sql.DataSource;
import javax.sql.XADataSource;
import javax.transaction.xa.XAResource;
import java.sql.Connection;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.anyObject;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class MySQLShardingXAConnectionWrapperTest {
private XADataSource xaDataSource;
@Mock
private Connection connection;
@Before
@SneakyThrows
@SuppressWarnings("unchecked")
public void setUp() {
Connection mysqlConnection = (Connection) mock(Class.forName("com.mysql.jdbc.Connection"));
DataSource dataSource = DataSourceUtils.build(PoolType.HIKARI, DatabaseType.MySQL, "ds1");
xaDataSource = XADataSourceFactory.build(DatabaseType.MySQL, dataSource);
when(connection.unwrap((Class<Object>) anyObject())).thenReturn(mysqlConnection);
}
@Test
@SneakyThrows
public void assertCreateMySQLConnection() {
ShardingXAConnection actual = new MySQLShardingXAConnectionWrapper().wrap("ds1", xaDataSource, connection);
assertThat(actual.getXAResource(), instanceOf(XAResource.class));
assertThat(actual.getConnection(), instanceOf(Connection.class));
assertThat(actual.getResourceName(), is("ds1"));
}
}
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingsphere.transaction.xa.jta.datasource;
import com.alibaba.druid.pool.xa.DruidPooledXAConnection;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.constant.PoolType;
import io.shardingsphere.transaction.xa.fixture.DataSourceUtils;
import io.shardingsphere.transaction.xa.jta.connection.ShardingXAConnection;
import lombok.SneakyThrows;
import org.h2.jdbcx.JdbcDataSource;
import org.h2.jdbcx.JdbcXAConnection;
import org.junit.Test;
import javax.sql.DataSource;
import javax.sql.XADataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public class ShardingXADataSourceTest {
@Test
public void assertBuildShardingXADataSourceOfXA() {
DataSource dataSource = DataSourceUtils.build(PoolType.DRUID_XA, DatabaseType.MySQL, "ds1");
ShardingXADataSource actual = new ShardingXADataSource(DatabaseType.MySQL, "ds1", dataSource);
assertThat(actual.getDatabaseType(), is(DatabaseType.MySQL));
assertThat(actual.getResourceName(), is("ds1"));
assertThat(actual.getOriginalDataSource(), is(dataSource));
assertTrue(actual.isOriginalXADataSource());
assertThat(actual.getXaDataSource(), is((XADataSource) dataSource));
}
@Test
public void assertBuildShardingXADataSourceOfNoneXA() {
DataSource dataSource = DataSourceUtils.build(PoolType.HIKARI, DatabaseType.H2, "ds1");
ShardingXADataSource actual = new ShardingXADataSource(DatabaseType.H2, "ds1", dataSource);
assertThat(actual.getDatabaseType(), is(DatabaseType.H2));
assertThat(actual.getResourceName(), is("ds1"));
assertFalse(actual.isOriginalXADataSource());
assertThat(actual.getOriginalDataSource(), is(dataSource));
assertThat(actual.getXaDataSource(), instanceOf(JdbcDataSource.class));
JdbcDataSource jdbcDataSource = (JdbcDataSource) actual.getXaDataSource();
assertThat(jdbcDataSource.getUser(), is("root"));
assertThat(jdbcDataSource.getPassword(), is("root"));
}
@Test
@SneakyThrows
public void assertGetXAConnectionOfXA() {
DataSource dataSource = DataSourceUtils.build(PoolType.DRUID_XA, DatabaseType.H2, "ds1");
ShardingXADataSource shardingXADataSource = new ShardingXADataSource(DatabaseType.H2, "ds1", dataSource);
ShardingXAConnection actual = shardingXADataSource.getXAConnection();
assertThat(actual.getConnection(), instanceOf(Connection.class));
assertThat(actual.getResourceName(), is("ds1"));
assertThat(actual.getDelegate(), instanceOf(DruidPooledXAConnection.class));
}
@Test
@SneakyThrows
public void assertGetXAConnectionOfNoneXA() {
DataSource dataSource = DataSourceUtils.build(PoolType.HIKARI, DatabaseType.H2, "ds1");
ShardingXADataSource shardingXADataSource = new ShardingXADataSource(DatabaseType.H2, "ds1", dataSource);
ShardingXAConnection actual = shardingXADataSource.getXAConnection();
assertThat(actual.getConnection(), instanceOf(Connection.class));
assertThat(actual.getResourceName(), is("ds1"));
assertThat(actual.getDelegate(), instanceOf(JdbcXAConnection.class));
}
@Test
@SneakyThrows
public void assertGetConnectionFromOriginalDataSource() {
DataSource dataSource = DataSourceUtils.build(PoolType.HIKARI, DatabaseType.H2, "ds1");
ShardingXADataSource shardingXADataSource = new ShardingXADataSource(DatabaseType.H2, "ds1", dataSource);
Connection actual = shardingXADataSource.getConnectionFromOriginalDataSource();
assertThat(actual, instanceOf(Connection.class));
}
@Test(expected = SQLFeatureNotSupportedException.class)
public void assertGetLoginTimeout() throws SQLException {
DataSource dataSource = DataSourceUtils.build(PoolType.DRUID_XA, DatabaseType.H2, "ds1");
ShardingXADataSource shardingXADataSource = new ShardingXADataSource(DatabaseType.H2, "ds1", dataSource);
shardingXADataSource.getLoginTimeout();
}
@Test(expected = SQLFeatureNotSupportedException.class)
public void assertSetLogWriter() throws SQLException {
DataSource dataSource = DataSourceUtils.build(PoolType.DRUID_XA, DatabaseType.H2, "ds1");
ShardingXADataSource shardingXADataSource = new ShardingXADataSource(DatabaseType.H2, "ds1", dataSource);
shardingXADataSource.setLogWriter(null);
}
@Test(expected = SQLFeatureNotSupportedException.class)
public void assertSetLoginTimeout() throws SQLException {
DataSource dataSource = DataSourceUtils.build(PoolType.DRUID_XA, DatabaseType.H2, "ds1");
ShardingXADataSource shardingXADataSource = new ShardingXADataSource(DatabaseType.H2, "ds1", dataSource);
shardingXADataSource.setLoginTimeout(10);
}
@Test(expected = SQLFeatureNotSupportedException.class)
public void assertGetParentLogger() throws SQLException {
DataSource dataSource = DataSourceUtils.build(PoolType.DRUID_XA, DatabaseType.H2, "ds1");
ShardingXADataSource shardingXADataSource = new ShardingXADataSource(DatabaseType.H2, "ds1", dataSource);
shardingXADataSource.getParentLogger();
}
@Test(expected = SQLFeatureNotSupportedException.class)
public void assertGetLogWriter() throws SQLException {
DataSource dataSource = DataSourceUtils.build(PoolType.DRUID_XA, DatabaseType.H2, "ds1");
ShardingXADataSource shardingXADataSource = new ShardingXADataSource(DatabaseType.H2, "ds1", dataSource);
shardingXADataSource.getLogWriter();
}
@Test(expected = SQLFeatureNotSupportedException.class)
public void assertGetXAConnectionByUserAndPassword() throws SQLException {
DataSource dataSource = DataSourceUtils.build(PoolType.DRUID_XA, DatabaseType.H2, "ds1");
ShardingXADataSource shardingXADataSource = new ShardingXADataSource(DatabaseType.H2, "ds1", dataSource);
shardingXADataSource.getXAConnection("root", "root");
}
}
......@@ -15,7 +15,7 @@
* </p>
*/
package io.shardingsphere.transaction.xa.convert.datasource;
package io.shardingsphere.transaction.xa.jta.datasource;
import com.microsoft.sqlserver.jdbc.SQLServerXADataSource;
import io.shardingsphere.core.constant.DatabaseType;
......
......@@ -15,14 +15,14 @@
* </p>
*/
package io.shardingsphere.transaction.xa.convert.datasource;
package io.shardingsphere.transaction.xa.jta.datasource;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.transaction.xa.convert.datasource.dialect.H2XAProperties;
import io.shardingsphere.transaction.xa.convert.datasource.dialect.MySQLXAProperties;
import io.shardingsphere.transaction.xa.convert.datasource.dialect.OracleXAProperties;
import io.shardingsphere.transaction.xa.convert.datasource.dialect.PostgreSQLXAProperties;
import io.shardingsphere.transaction.xa.convert.datasource.dialect.SQLServerXAProperties;
import io.shardingsphere.transaction.xa.jta.datasource.dialect.H2XAProperties;
import io.shardingsphere.transaction.xa.jta.datasource.dialect.MySQLXAProperties;
import io.shardingsphere.transaction.xa.jta.datasource.dialect.OracleXAProperties;
import io.shardingsphere.transaction.xa.jta.datasource.dialect.PostgreSQLXAProperties;
import io.shardingsphere.transaction.xa.jta.datasource.dialect.SQLServerXAProperties;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.instanceOf;
......
......@@ -15,7 +15,7 @@
* </p>
*/
package io.shardingsphere.transaction.xa.convert.datasource.dialect;
package io.shardingsphere.transaction.xa.jta.datasource.dialect;
import io.shardingsphere.core.rule.DataSourceParameter;
import lombok.Getter;
......
......@@ -21,7 +21,7 @@ import com.atomikos.beans.PropertyException;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.rule.DataSourceParameter;
import io.shardingsphere.transaction.xa.convert.datasource.XADataSourceFactory;
import io.shardingsphere.transaction.xa.jta.datasource.XADataSourceFactory;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.junit.Before;
......
......@@ -17,18 +17,16 @@
package io.shardingsphere.transaction.xa.manager;
import com.atomikos.icatch.config.UserTransactionService;
import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.exception.ShardingException;
import io.shardingsphere.core.rule.DataSourceParameter;
import io.shardingsphere.transaction.core.TransactionOperationType;
import io.shardingsphere.transaction.core.context.XATransactionContext;
import io.shardingsphere.transaction.xa.fixture.ReflectiveUtil;
import lombok.SneakyThrows;
import org.h2.jdbcx.JdbcDataSource;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
......@@ -39,11 +37,11 @@ import javax.sql.DataSource;
import javax.sql.XADataSource;
import javax.transaction.Status;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
......@@ -52,66 +50,70 @@ import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public final class AtomikosTransactionManagerTest {
private AtomikosTransactionManager atomikosTransactionManager = new AtomikosTransactionManager();
@Mock
private UserTransactionManager userTransactionManager;
private AtomikosTransactionManager atomikosTransactionManager = new AtomikosTransactionManager();
@Mock
private UserTransactionService userTransactionService;
private TransactionManager underlyingTransactionManager = atomikosTransactionManager.getUnderlyingTransactionManager();
@Mock
private XADataSource xaDataSource;
@Before
@SneakyThrows
public void setUp() {
ReflectiveUtil.setProperty(atomikosTransactionManager, "underlyingTransactionManager", userTransactionManager);
ReflectiveUtil.setProperty(atomikosTransactionManager, "userTransactionService", userTransactionService);
}
@After
public void tearDown() {
ReflectiveUtil.setProperty(atomikosTransactionManager, "underlyingTransactionManager", underlyingTransactionManager);
atomikosTransactionManager.destroy();
@Test
public void assertStartup() {
atomikosTransactionManager.startup();
verify(userTransactionService).init();
}
@Test(expected = ShardingException.class)
@SneakyThrows
public void assertUnderlyingTransactionManagerInitFailed() {
doThrow(SystemException.class).when(userTransactionManager).init();
ReflectiveUtil.methodInvoke(atomikosTransactionManager, "init");
@Test
public void assertShutdown() {
atomikosTransactionManager.destroy();
verify(userTransactionService).shutdown(true);
}
@Test
public void assertBeginWithoutException() throws Exception {
atomikosTransactionManager.begin(new XATransactionContext(TransactionOperationType.BEGIN));
atomikosTransactionManager.begin();
verify(userTransactionManager).begin();
}
@Test(expected = ShardingException.class)
public void assertBeginWithException() throws Exception {
doThrow(SystemException.class).when(userTransactionManager).begin();
atomikosTransactionManager.begin(new XATransactionContext(TransactionOperationType.BEGIN));
atomikosTransactionManager.begin();
}
@Test
public void assertCommitWithoutException() throws Exception {
atomikosTransactionManager.commit(new XATransactionContext(TransactionOperationType.COMMIT));
atomikosTransactionManager.commit();
verify(userTransactionManager).commit();
}
@Test(expected = ShardingException.class)
public void assertCommitWithException() throws Exception {
doThrow(SystemException.class).when(userTransactionManager).commit();
atomikosTransactionManager.commit(new XATransactionContext(TransactionOperationType.COMMIT));
atomikosTransactionManager.commit();
}
@Test
public void assertRollbackWithoutException() throws Exception {
atomikosTransactionManager.rollback(new XATransactionContext(TransactionOperationType.ROLLBACK));
atomikosTransactionManager.rollback();
verify(userTransactionManager).rollback();
}
@Test(expected = ShardingException.class)
public void assertRollbackWithException() throws Exception {
doThrow(SystemException.class).when(userTransactionManager).rollback();
atomikosTransactionManager.rollback(new XATransactionContext(TransactionOperationType.ROLLBACK));
atomikosTransactionManager.rollback();
}
@Test
......@@ -155,4 +157,16 @@ public final class AtomikosTransactionManagerTest {
XADataSource xaDataSource = new MysqlXADataSource();
atomikosTransactionManager.wrapDataSource(DatabaseType.MySQL, xaDataSource, "ds_name", dataSourceParameter);
}
@Test
public void assertRegisterRecoveryResource() {
atomikosTransactionManager.registerRecoveryResource("ds1", xaDataSource);
verify(userTransactionService).registerResource(any(AtomikosXARecoverableResource.class));
}
@Test
public void assertRemoveRecoveryResource() {
atomikosTransactionManager.removeRecoveryResource("ds1", xaDataSource);
verify(userTransactionService).removeResource(any(AtomikosXARecoverableResource.class));
}
}
......@@ -15,33 +15,37 @@
* </p>
*/
package io.shardingsphere.transaction.core.datasource;
package io.shardingsphere.transaction.xa.manager;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.api.TransactionTypeHolder;
import io.shardingsphere.transaction.xa.jta.ShardingXAResource;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
import javax.sql.XADataSource;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
public final class ShardingTransactionalDataSourceTest {
@RunWith(MockitoJUnitRunner.class)
public class AtomikosXARecoverableResourceTest {
private Map<String, DataSource> dataSourceMap = new HashMap<>();
@Mock
private ShardingXAResource shardingXAResource;
@Test
public void assertGetDataSourceMapWithConvertedDataSourceMap() {
ShardingTransactionalDataSource actual = new ShardingTransactionalDataSource(DatabaseType.H2, dataSourceMap);
TransactionTypeHolder.set(TransactionType.XA);
assertTrue(dataSourceMap != actual.getDataSourceMap());
@Mock
private XADataSource xaDataSource;
@Before
public void setUp() {
when(shardingXAResource.getResourceName()).thenReturn("ds1");
}
@Test
public void assertGetDataSourceMapWithoutConvertedDataSourceMap() {
ShardingTransactionalDataSource actual = new ShardingTransactionalDataSource(DatabaseType.H2, dataSourceMap);
assertTrue(dataSourceMap == actual.getDataSourceMap());
public void assertUseXAResource() {
AtomikosXARecoverableResource atomikosXARecoverableResource = new AtomikosXARecoverableResource("ds1", xaDataSource);
assertTrue(atomikosXARecoverableResource.usesXAResource(shardingXAResource));
}
}
......@@ -21,6 +21,8 @@ import io.shardingsphere.transaction.core.TransactionOperationType;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import java.sql.Connection;
/**
* XA transaction context.
*
......@@ -31,4 +33,14 @@ import lombok.RequiredArgsConstructor;
public final class XATransactionContext implements ShardingTransactionContext {
private final TransactionOperationType operationType;
private final Connection connection;
private final String datasourceName;
public XATransactionContext(final TransactionOperationType operationType) {
this.operationType = operationType;
connection = null;
datasourceName = null;
}
}
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingsphere.transaction.core.datasource;
import com.google.common.base.Optional;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.util.ReflectiveUtil;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.api.TransactionTypeHolder;
import io.shardingsphere.transaction.core.loader.TransactionalDataSourceConverterSPILoader;
import io.shardingsphere.transaction.spi.TransactionalDataSourceConverter;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
/**
* Sharding transactional data sources.
*
* @author zhangliang
*/
@RequiredArgsConstructor
public final class ShardingTransactionalDataSource implements AutoCloseable {
@Getter
private final Map<String, DataSource> originalDataSourceMap;
private final Map<TransactionType, Map<String, DataSource>> transactionalDataSourceMap;
public ShardingTransactionalDataSource(final DatabaseType databaseType, final Map<String, DataSource> dataSourceMap) {
originalDataSourceMap = dataSourceMap;
transactionalDataSourceMap = new HashMap<>(TransactionType.values().length, 1);
for (TransactionType each : TransactionType.values()) {
Optional<TransactionalDataSourceConverter> converter = TransactionalDataSourceConverterSPILoader.findConverter(each);
if (converter.isPresent()) {
transactionalDataSourceMap.put(each, converter.get().convert(databaseType, dataSourceMap));
}
}
}
/**
* Get data source map via transaction type from threadlocal.
*
* @return data source map
*/
public Map<String, DataSource> getDataSourceMap() {
return transactionalDataSourceMap.containsKey(TransactionTypeHolder.get()) ? transactionalDataSourceMap.get(TransactionTypeHolder.get()) : originalDataSourceMap;
}
@Override
public void close() {
close(originalDataSourceMap);
for (Entry<TransactionType, Map<String, DataSource>> entry : transactionalDataSourceMap.entrySet()) {
close(entry.getValue());
}
}
private void close(final Map<String, DataSource> dataSourceMap) {
for (DataSource each : dataSourceMap.values()) {
try {
ReflectiveUtil.findMethod(each, "close").invoke(each);
} catch (final ReflectiveOperationException ignored) {
}
}
}
}
......@@ -17,35 +17,64 @@
package io.shardingsphere.transaction.core.handler;
import io.shardingsphere.transaction.core.context.ShardingTransactionContext;
import io.shardingsphere.transaction.core.manager.ShardingTransactionManager;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.transaction.core.TransactionOperationType;
import io.shardingsphere.transaction.spi.ShardingTransactionHandler;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;
/**
* Abstract class for sharding transaction handler.
*
* @author zhaojun
*
* @param <T> type of sharding transaction context
*/
public abstract class ShardingTransactionHandlerAdapter<T extends ShardingTransactionContext> implements ShardingTransactionHandler<T> {
public abstract class ShardingTransactionHandlerAdapter implements ShardingTransactionHandler {
/**
* Default implement for do in transaction.
*/
@Override
@SuppressWarnings("unchecked")
public final void doInTransaction(final T context) {
switch (context.getOperationType()) {
public void doInTransaction(final TransactionOperationType transactionOperationType) {
switch (transactionOperationType) {
case BEGIN:
getShardingTransactionManager().begin(context);
getShardingTransactionManager().begin();
break;
case COMMIT:
getShardingTransactionManager().commit(context);
getShardingTransactionManager().commit();
break;
case ROLLBACK:
getShardingTransactionManager().rollback(context);
getShardingTransactionManager().rollback();
break;
default:
}
}
protected abstract ShardingTransactionManager getShardingTransactionManager();
/**
* Default implement for register transaction resource.
*/
@Override
public void registerTransactionalResource(final DatabaseType databaseType, final Map<String, DataSource> dataSourceMap) {
// adapter
}
/**
* Default implement for clear transactional resource.
*/
@Override
public void clearTransactionalResource() {
// adapter
}
/**
* Default implement for create connection.
*/
@Override
public Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException {
return dataSource.getConnection();
}
}
......@@ -17,13 +17,14 @@
package io.shardingsphere.transaction.core.loader;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.core.context.ShardingTransactionContext;
import io.shardingsphere.transaction.spi.ShardingTransactionHandler;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
......@@ -37,9 +38,7 @@ import java.util.ServiceLoader;
@Slf4j
public final class ShardingTransactionHandlerRegistry {
private static final Map<TransactionType, ShardingTransactionHandler<ShardingTransactionContext>> TRANSACTION_HANDLER_MAP = new HashMap<>();
private static final ShardingTransactionHandlerRegistry INSTANCE = new ShardingTransactionHandlerRegistry();
private static final Map<TransactionType, ShardingTransactionHandler> TRANSACTION_HANDLER_MAP = new HashMap<>();
static {
load();
......@@ -56,26 +55,28 @@ public final class ShardingTransactionHandlerRegistry {
each.getTransactionType(), TRANSACTION_HANDLER_MAP.get(each.getTransactionType()).getClass().getName());
continue;
}
TRANSACTION_HANDLER_MAP.put(each.getTransactionType(), (ShardingTransactionHandler<ShardingTransactionContext>) each);
TRANSACTION_HANDLER_MAP.put(each.getTransactionType(), each);
}
}
/**
* Get instance of sharding transaction handler registry.
*
* @return sharding transaction handler registry
*/
public static ShardingTransactionHandlerRegistry getInstance() {
return INSTANCE;
}
/**
* Get transaction handler by type.
*
* @param transactionType transaction type
* @return sharding transaction handler implement
*/
public ShardingTransactionHandler<ShardingTransactionContext> getHandler(final TransactionType transactionType) {
public static ShardingTransactionHandler getHandler(final TransactionType transactionType) {
return TRANSACTION_HANDLER_MAP.get(transactionType);
}
/**
* Register transaction resource.
* @param databaseType database type
* @param dataSourceMap data source map
*/
public static void registerTransactionResource(final DatabaseType databaseType, final Map<String, DataSource> dataSourceMap) {
for (Map.Entry<TransactionType, ShardingTransactionHandler> entry : TRANSACTION_HANDLER_MAP.entrySet()) {
entry.getValue().registerTransactionalResource(databaseType, dataSourceMap);
}
}
}
......@@ -17,14 +17,12 @@
package io.shardingsphere.transaction.core.manager;
import io.shardingsphere.transaction.core.context.ShardingTransactionContext;
/**
* BASE transaction manager.
*
* @author yangyi
*/
public interface BASETransactionManager<T extends ShardingTransactionContext> extends ShardingTransactionManager<T> {
public interface BASETransactionManager extends ShardingTransactionManager {
/**
* Get transaction id in current thread.
......
......@@ -18,7 +18,6 @@
package io.shardingsphere.transaction.core.manager;
import io.shardingsphere.core.exception.ShardingException;
import io.shardingsphere.transaction.core.context.ShardingTransactionContext;
/**
* Sharding transaction manager.
......@@ -26,33 +25,29 @@ import io.shardingsphere.transaction.core.context.ShardingTransactionContext;
* @author zhaojun
* @author zhangliang
*
* @param <T> transaction context type
*/
public interface ShardingTransactionManager<T extends ShardingTransactionContext> {
public interface ShardingTransactionManager {
/**
* Begin transaction.
*
* @param transactionContext transaction context
* @throws ShardingException sharding exception
*/
void begin(T transactionContext) throws ShardingException;
void begin() throws ShardingException;
/**
* Commit transaction.
*
* @param transactionContext transaction context
* @throws ShardingException sharding exception
*/
void commit(T transactionContext) throws ShardingException;
void commit() throws ShardingException;
/**
* Rollback transaction.
*
* @param transactionContext transaction context
* @throws ShardingException sharding exception
*/
void rollback(T transactionContext) throws ShardingException;
void rollback() throws ShardingException;
/**
* Obtain the status of the transaction associated with the current thread.
......
......@@ -17,24 +17,37 @@
package io.shardingsphere.transaction.spi;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.core.context.ShardingTransactionContext;
import io.shardingsphere.transaction.core.TransactionOperationType;
import io.shardingsphere.transaction.core.manager.ShardingTransactionManager;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;
/**
* Sharding transaction handler SPI.
*
* @author zhaojun
*
* @param <T> type of sharding transaction context
*/
public interface ShardingTransactionHandler<T extends ShardingTransactionContext> {
public interface ShardingTransactionHandler {
/**
* Do transaction operation using specific transaction manager.
*
* @param context sharding transaction context
* @param transactionOperationType transaction operation type
*/
void doInTransaction(TransactionOperationType transactionOperationType);
/**
* Get sharding transaction manager.
*
* @return sharding transaction manager
*/
void doInTransaction(T context);
ShardingTransactionManager getShardingTransactionManager();
/**
* Get transaction type.
......@@ -42,4 +55,27 @@ public interface ShardingTransactionHandler<T extends ShardingTransactionContext
* @return transaction type
*/
TransactionType getTransactionType();
/**
* Register transaction data source.
*
* @param databaseType database type
* @param dataSourceMap data source map
*/
void registerTransactionalResource(DatabaseType databaseType, Map<String, DataSource> dataSourceMap);
/**
* Clear transactional resource.
*/
void clearTransactionalResource();
/**
* Create transactional connection.
*
* @param dataSourceName data source name
* @param dataSource data source
* @return connection
* @throws SQLException SQL exception
*/
Connection createConnection(String dataSourceName, DataSource dataSource) throws SQLException;
}
......@@ -18,7 +18,6 @@
package io.shardingsphere.transaction;
import io.shardingsphere.transaction.api.TransactionTypeHolderTest;
import io.shardingsphere.transaction.core.datasource.ShardingTransactionalDataSourceTest;
import io.shardingsphere.transaction.core.loader.TransactionalDataSourceConverterSPILoaderTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
......@@ -27,8 +26,7 @@ import org.junit.runners.Suite.SuiteClasses;
@RunWith(Suite.class)
@SuiteClasses({
TransactionTypeHolderTest.class,
TransactionalDataSourceConverterSPILoaderTest.class,
ShardingTransactionalDataSourceTest.class
TransactionalDataSourceConverterSPILoaderTest.class
})
public final class AllTransactionTests {
}
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingsphere.transaction.core.handler;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.core.TransactionOperationType;
import io.shardingsphere.transaction.core.manager.ShardingTransactionManager;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import javax.sql.DataSource;
import java.sql.Connection;
import java.util.Map;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@RunWith(MockitoJUnitRunner.class)
public class ShardingTransactionHandlerAdapterTest {
private FixedShardingTransactionHandler fixedShardingTransactionHandler = new FixedShardingTransactionHandler();
private ShardingTransactionManager shardingTransactionManager = fixedShardingTransactionHandler.getShardingTransactionManager();
@Test
public void assertDoXATransactionBegin() {
fixedShardingTransactionHandler.doInTransaction(TransactionOperationType.BEGIN);
verify(shardingTransactionManager).begin();
}
@Test
public void assertDoXATransactionCommit() {
fixedShardingTransactionHandler.doInTransaction(TransactionOperationType.COMMIT);
verify(shardingTransactionManager).commit();
}
@Test
public void assertDoXATransactionRollback() {
fixedShardingTransactionHandler.doInTransaction(TransactionOperationType.ROLLBACK);
verify(shardingTransactionManager).rollback();
}
private static final class FixedShardingTransactionHandler extends ShardingTransactionHandlerAdapter {
private ShardingTransactionManager shardingTransactionManager = mock(ShardingTransactionManager.class);
@Override
public ShardingTransactionManager getShardingTransactionManager() {
return shardingTransactionManager;
}
@Override
public TransactionType getTransactionType() {
return null;
}
@Override
public void registerTransactionalResource(final DatabaseType databaseType, final Map<String, DataSource> dataSourceMap) {
}
@Override
public void clearTransactionalResource() {
}
@Override
public Connection createConnection(final String dataSourceName, final DataSource dataSource) {
return null;
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册