提交 b388432a 编写于 作者: T terrymanu

ShardingTransactionHandler => ShardingTransactionEngine

上级 f5299f3b
......@@ -30,8 +30,8 @@ import io.shardingsphere.spi.root.RootInvokeHook;
import io.shardingsphere.spi.root.SPIRootInvokeHook;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.api.TransactionTypeHolder;
import io.shardingsphere.transaction.core.loader.ShardingTransactionHandlerRegistry;
import io.shardingsphere.transaction.spi.ShardingTransactionHandler;
import io.shardingsphere.transaction.core.loader.ShardingTransactionEngineRegistry;
import io.shardingsphere.transaction.spi.ShardingTransactionEngine;
import lombok.Getter;
import javax.sql.DataSource;
......@@ -75,14 +75,14 @@ public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOpera
private final TransactionType transactionType;
private final ShardingTransactionHandler shardingTransactionHandler;
private final ShardingTransactionEngine shardingTransactionEngine;
protected AbstractConnectionAdapter(final TransactionType transactionType) {
rootInvokeHook.start();
this.transactionType = transactionType;
shardingTransactionHandler = ShardingTransactionHandlerRegistry.getHandler(transactionType);
shardingTransactionEngine = ShardingTransactionEngineRegistry.getShardingTransactionEngine(transactionType);
if (TransactionType.LOCAL != transactionType) {
Preconditions.checkNotNull(shardingTransactionHandler, "Cannot find transaction manager of [%s]", transactionType);
Preconditions.checkNotNull(shardingTransactionEngine, "Cannot find transaction manager of [%s]", transactionType);
}
}
......@@ -162,7 +162,7 @@ public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOpera
}
private Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException {
Connection result = null == shardingTransactionHandler ? dataSource.getConnection() : shardingTransactionHandler.createConnection(dataSourceName, dataSource);
Connection result = null == shardingTransactionEngine ? dataSource.getConnection() : shardingTransactionEngine.createConnection(dataSourceName, dataSource);
replayMethodsInvocation(result);
return result;
}
......@@ -180,7 +180,7 @@ public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOpera
if (TransactionType.LOCAL == transactionType) {
setAutoCommitForLocalTransaction(autoCommit);
} else if (!autoCommit) {
shardingTransactionHandler.begin();
shardingTransactionEngine.begin();
}
}
......@@ -200,7 +200,7 @@ public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOpera
if (TransactionType.LOCAL == transactionType) {
commitForLocalTransaction();
} else {
shardingTransactionHandler.commit();
shardingTransactionEngine.commit();
}
}
......@@ -219,7 +219,7 @@ public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOpera
if (TransactionType.LOCAL == transactionType) {
rollbackForLocalTransaction();
} else {
shardingTransactionHandler.rollback();
shardingTransactionEngine.rollback();
}
}
......
......@@ -22,7 +22,7 @@ import io.shardingsphere.core.bootstrap.ShardingBootstrap;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.util.ReflectiveUtil;
import io.shardingsphere.shardingjdbc.jdbc.unsupported.AbstractUnsupportedOperationDataSource;
import io.shardingsphere.transaction.core.loader.ShardingTransactionHandlerRegistry;
import io.shardingsphere.transaction.core.loader.ShardingTransactionEngineRegistry;
import lombok.Getter;
import lombok.Setter;
......@@ -57,7 +57,7 @@ public abstract class AbstractDataSourceAdapter extends AbstractUnsupportedOpera
public AbstractDataSourceAdapter(final Map<String, DataSource> dataSourceMap) throws SQLException {
databaseType = getDatabaseType(dataSourceMap.values());
ShardingTransactionHandlerRegistry.registerTransactionResource(databaseType, dataSourceMap);
ShardingTransactionEngineRegistry.registerTransactionResource(databaseType, dataSourceMap);
this.dataSourceMap = dataSourceMap;
}
......
......@@ -20,8 +20,8 @@ package io.shardingsphere.shardingjdbc.jdbc.adapter;
import com.google.common.collect.Multimap;
import io.shardingsphere.shardingjdbc.common.base.AbstractShardingJDBCDatabaseAndTableTest;
import io.shardingsphere.shardingjdbc.jdbc.core.connection.ShardingConnection;
import io.shardingsphere.shardingjdbc.jdbc.core.fixed.FixedBaseShardingTransactionHandler;
import io.shardingsphere.shardingjdbc.jdbc.core.fixed.FixedXAShardingTransactionHandler;
import io.shardingsphere.shardingjdbc.jdbc.core.fixture.FixedBaseShardingTransactionEngine;
import io.shardingsphere.shardingjdbc.jdbc.core.fixture.FixedXAShardingTransactionEngine;
import io.shardingsphere.shardingjdbc.jdbc.util.JDBCTestSQL;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.api.TransactionTypeHolder;
......@@ -47,8 +47,8 @@ public final class ConnectionAdapterTest extends AbstractShardingJDBCDatabaseAnd
@After
public void tearDown() {
TransactionTypeHolder.clear();
FixedXAShardingTransactionHandler.getInvokes().clear();
FixedBaseShardingTransactionHandler.getInvokes().clear();
FixedXAShardingTransactionEngine.getInvokes().clear();
FixedBaseShardingTransactionEngine.getInvokes().clear();
}
@Test
......@@ -75,7 +75,7 @@ public final class ConnectionAdapterTest extends AbstractShardingJDBCDatabaseAnd
TransactionTypeHolder.set(TransactionType.XA);
try (ShardingConnection actual = getShardingDataSource().getConnection()) {
actual.setAutoCommit(true);
assertNull(FixedXAShardingTransactionHandler.getInvokes().get("begin"));
assertNull(FixedXAShardingTransactionEngine.getInvokes().get("begin"));
}
}
......@@ -84,7 +84,7 @@ public final class ConnectionAdapterTest extends AbstractShardingJDBCDatabaseAnd
TransactionTypeHolder.set(TransactionType.BASE);
try (ShardingConnection actual = getShardingDataSource().getConnection()) {
actual.setAutoCommit(true);
assertNull(FixedBaseShardingTransactionHandler.getInvokes().get("begin"));
assertNull(FixedBaseShardingTransactionEngine.getInvokes().get("begin"));
}
}
......@@ -103,7 +103,7 @@ public final class ConnectionAdapterTest extends AbstractShardingJDBCDatabaseAnd
TransactionTypeHolder.set(TransactionType.XA);
try (ShardingConnection actual = getShardingDataSource().getConnection()) {
actual.commit();
assertThat(FixedXAShardingTransactionHandler.getInvokes().get("commit"), is(TransactionOperationType.COMMIT));
assertThat(FixedXAShardingTransactionEngine.getInvokes().get("commit"), is(TransactionOperationType.COMMIT));
}
}
......@@ -122,7 +122,7 @@ public final class ConnectionAdapterTest extends AbstractShardingJDBCDatabaseAnd
TransactionTypeHolder.set(TransactionType.XA);
try (ShardingConnection actual = getShardingDataSource().getConnection()) {
actual.rollback();
assertThat(FixedXAShardingTransactionHandler.getInvokes().get("rollback"), is(TransactionOperationType.ROLLBACK));
assertThat(FixedXAShardingTransactionEngine.getInvokes().get("rollback"), is(TransactionOperationType.ROLLBACK));
}
}
......
......@@ -25,8 +25,8 @@ import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.shardingjdbc.fixture.TestDataSource;
import io.shardingsphere.shardingjdbc.jdbc.core.ShardingContext;
import io.shardingsphere.shardingjdbc.jdbc.core.datasource.MasterSlaveDataSource;
import io.shardingsphere.shardingjdbc.jdbc.core.fixed.FixedBaseShardingTransactionHandler;
import io.shardingsphere.shardingjdbc.jdbc.core.fixed.FixedXAShardingTransactionHandler;
import io.shardingsphere.shardingjdbc.jdbc.core.fixture.FixedBaseShardingTransactionEngine;
import io.shardingsphere.shardingjdbc.jdbc.core.fixture.FixedXAShardingTransactionEngine;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.api.TransactionTypeHolder;
import io.shardingsphere.transaction.core.TransactionOperationType;
......@@ -90,8 +90,8 @@ public final class ShardingConnectionTest {
try {
connection.close();
TransactionTypeHolder.clear();
FixedXAShardingTransactionHandler.getInvokes().clear();
FixedBaseShardingTransactionHandler.getInvokes().clear();
FixedXAShardingTransactionEngine.getInvokes().clear();
FixedBaseShardingTransactionEngine.getInvokes().clear();
} catch (final SQLException ignored) {
}
}
......@@ -110,21 +110,21 @@ public final class ShardingConnectionTest {
public void assertXATransactionOperation() throws SQLException {
connection = new ShardingConnection(dataSourceMap, shardingContext, TransactionType.XA);
connection.setAutoCommit(false);
assertThat(FixedXAShardingTransactionHandler.getInvokes().get("begin"), is(TransactionOperationType.BEGIN));
assertThat(FixedXAShardingTransactionEngine.getInvokes().get("begin"), is(TransactionOperationType.BEGIN));
connection.commit();
assertThat(FixedXAShardingTransactionHandler.getInvokes().get("commit"), is(TransactionOperationType.COMMIT));
assertThat(FixedXAShardingTransactionEngine.getInvokes().get("commit"), is(TransactionOperationType.COMMIT));
connection.rollback();
assertThat(FixedXAShardingTransactionHandler.getInvokes().get("rollback"), is(TransactionOperationType.ROLLBACK));
assertThat(FixedXAShardingTransactionEngine.getInvokes().get("rollback"), is(TransactionOperationType.ROLLBACK));
}
@Test
public void assertBaseTransactionOperation() throws SQLException {
connection = new ShardingConnection(dataSourceMap, shardingContext, TransactionType.BASE);
connection.setAutoCommit(false);
assertThat(FixedBaseShardingTransactionHandler.getInvokes().get("begin"), is(TransactionOperationType.BEGIN));
assertThat(FixedBaseShardingTransactionEngine.getInvokes().get("begin"), is(TransactionOperationType.BEGIN));
connection.commit();
assertThat(FixedBaseShardingTransactionHandler.getInvokes().get("commit"), is(TransactionOperationType.COMMIT));
assertThat(FixedBaseShardingTransactionEngine.getInvokes().get("commit"), is(TransactionOperationType.COMMIT));
connection.rollback();
assertThat(FixedBaseShardingTransactionHandler.getInvokes().get("rollback"), is(TransactionOperationType.ROLLBACK));
assertThat(FixedBaseShardingTransactionEngine.getInvokes().get("rollback"), is(TransactionOperationType.ROLLBACK));
}
}
......@@ -26,7 +26,7 @@ import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.rule.ShardingRule;
import io.shardingsphere.shardingjdbc.api.MasterSlaveDataSourceFactory;
import io.shardingsphere.shardingjdbc.jdbc.core.connection.ShardingConnection;
import io.shardingsphere.shardingjdbc.jdbc.core.fixed.FixedXAShardingTransactionHandler;
import io.shardingsphere.shardingjdbc.jdbc.core.fixture.FixedXAShardingTransactionEngine;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.api.TransactionTypeHolder;
import org.junit.After;
......@@ -178,13 +178,13 @@ public final class ShardingDataSourceTest {
ShardingConnection shardingConnection = shardingDataSource.getConnection();
assertThat(shardingConnection.getDataSourceMap().size(), is(1));
assertThat(shardingConnection.getTransactionType(), is(TransactionType.XA));
assertThat(shardingConnection.getShardingTransactionHandler(), instanceOf(FixedXAShardingTransactionHandler.class));
assertThat(shardingConnection.getShardingTransactionEngine(), instanceOf(FixedXAShardingTransactionEngine.class));
TransactionTypeHolder.set(TransactionType.LOCAL);
shardingConnection = shardingDataSource.getConnection();
assertThat(shardingConnection.getConnection("ds"), is(dataSource.getConnection()));
assertThat(shardingConnection.getDataSourceMap(), is(dataSourceMap));
assertThat(shardingConnection.getTransactionType(), is(TransactionType.LOCAL));
assertThat(shardingConnection.getShardingTransactionHandler() == null, is(true));
assertThat(shardingConnection.getShardingTransactionEngine() == null, is(true));
}
private ShardingDataSource createShardingDataSource(final Map<String, DataSource> dataSourceMap) throws SQLException {
......
......@@ -15,12 +15,12 @@
* </p>
*/
package io.shardingsphere.shardingjdbc.jdbc.core.fixed;
package io.shardingsphere.shardingjdbc.jdbc.core.fixture;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.core.TransactionOperationType;
import io.shardingsphere.transaction.spi.ShardingTransactionHandler;
import io.shardingsphere.transaction.spi.ShardingTransactionEngine;
import javax.sql.DataSource;
import java.sql.Connection;
......@@ -29,11 +29,11 @@ import java.util.HashMap;
import java.util.Map;
/**
* Fixed base sharding transaction handler.
* Fixed base sharding transaction engine.
*
* @author zhaojun
*/
public final class FixedBaseShardingTransactionHandler implements ShardingTransactionHandler {
public final class FixedBaseShardingTransactionEngine implements ShardingTransactionEngine {
private static final Map<String, TransactionOperationType> INVOKES = new HashMap<>();
......@@ -52,30 +52,30 @@ public final class FixedBaseShardingTransactionHandler implements ShardingTransa
}
@Override
public void begin() {
INVOKES.put("begin", TransactionOperationType.BEGIN);
public void registerTransactionalResource(final DatabaseType databaseType, final Map<String, DataSource> dataSourceMap) {
}
@Override
public void commit() {
INVOKES.put("commit", TransactionOperationType.COMMIT);
public void clearTransactionalResources() {
}
@Override
public void rollback() {
INVOKES.put("rollback", TransactionOperationType.ROLLBACK);
public Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException {
return dataSource.getConnection();
}
@Override
public void registerTransactionalResource(final DatabaseType databaseType, final Map<String, DataSource> dataSourceMap) {
public void begin() {
INVOKES.put("begin", TransactionOperationType.BEGIN);
}
@Override
public void clearTransactionalResource() {
public void commit() {
INVOKES.put("commit", TransactionOperationType.COMMIT);
}
@Override
public Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException {
return dataSource.getConnection();
public void rollback() {
INVOKES.put("rollback", TransactionOperationType.ROLLBACK);
}
}
......@@ -15,12 +15,12 @@
* </p>
*/
package io.shardingsphere.shardingjdbc.jdbc.core.fixed;
package io.shardingsphere.shardingjdbc.jdbc.core.fixture;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.core.TransactionOperationType;
import io.shardingsphere.transaction.spi.ShardingTransactionHandler;
import io.shardingsphere.transaction.spi.ShardingTransactionEngine;
import javax.sql.DataSource;
import java.sql.Connection;
......@@ -29,11 +29,11 @@ import java.util.HashMap;
import java.util.Map;
/**
* Fixed base sharding transaction handler.
* Fixed base sharding transaction engine.
*
* @author zhaojun
*/
public final class FixedXAShardingTransactionHandler implements ShardingTransactionHandler {
public final class FixedXAShardingTransactionEngine implements ShardingTransactionEngine {
private static final Map<String, TransactionOperationType> INVOKES = new HashMap<>();
......@@ -52,30 +52,30 @@ public final class FixedXAShardingTransactionHandler implements ShardingTransact
}
@Override
public void begin() {
INVOKES.put("begin", TransactionOperationType.BEGIN);
public void registerTransactionalResource(final DatabaseType databaseType, final Map<String, DataSource> dataSourceMap) {
}
@Override
public void commit() {
INVOKES.put("commit", TransactionOperationType.COMMIT);
public void clearTransactionalResources() {
}
@Override
public void rollback() {
INVOKES.put("rollback", TransactionOperationType.ROLLBACK);
public Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException {
return dataSource.getConnection();
}
@Override
public void registerTransactionalResource(final DatabaseType databaseType, final Map<String, DataSource> dataSourceMap) {
public void begin() {
INVOKES.put("begin", TransactionOperationType.BEGIN);
}
@Override
public void clearTransactionalResource() {
public void commit() {
INVOKES.put("commit", TransactionOperationType.COMMIT);
}
@Override
public Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException {
return dataSource.getConnection();
public void rollback() {
INVOKES.put("rollback", TransactionOperationType.ROLLBACK);
}
}
io.shardingsphere.shardingjdbc.jdbc.core.fixture.FixedBaseShardingTransactionEngine
io.shardingsphere.shardingjdbc.jdbc.core.fixture.FixedXAShardingTransactionEngine
io.shardingsphere.shardingjdbc.jdbc.core.fixed.FixedBaseShardingTransactionHandler
io.shardingsphere.shardingjdbc.jdbc.core.fixed.FixedXAShardingTransactionHandler
......@@ -20,8 +20,8 @@ package io.shardingsphere.shardingproxy.backend.jdbc.connection;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.core.loader.ShardingTransactionHandlerRegistry;
import io.shardingsphere.transaction.spi.ShardingTransactionHandler;
import io.shardingsphere.transaction.core.loader.ShardingTransactionEngineRegistry;
import io.shardingsphere.transaction.spi.ShardingTransactionEngine;
import lombok.RequiredArgsConstructor;
import java.sql.SQLException;
......@@ -38,43 +38,43 @@ public final class BackendTransactionManager implements TransactionManager {
@Override
public void begin() {
Optional<ShardingTransactionHandler> shardingTransactionHandler = getShardingTransactionHandler(connection);
Optional<ShardingTransactionEngine> shardingTransactionEngine = getShardingTransactionEngine(connection);
if (!connection.getStateHandler().isInTransaction()) {
connection.getStateHandler().getAndSetStatus(ConnectionStatus.TRANSACTION);
connection.releaseConnections(false);
}
if (!shardingTransactionHandler.isPresent()) {
if (!shardingTransactionEngine.isPresent()) {
new LocalTransactionManager(connection).begin();
} else if (TransactionType.XA == shardingTransactionHandler.get().getTransactionType()) {
shardingTransactionHandler.get().begin();
} else if (TransactionType.XA == shardingTransactionEngine.get().getTransactionType()) {
shardingTransactionEngine.get().begin();
}
}
@Override
public void commit() throws SQLException {
Optional<ShardingTransactionHandler> shardingTransactionHandler = getShardingTransactionHandler(connection);
if (!shardingTransactionHandler.isPresent()) {
Optional<ShardingTransactionEngine> shardingTransactionEngine = getShardingTransactionEngine(connection);
if (!shardingTransactionEngine.isPresent()) {
new LocalTransactionManager(connection).commit();
} else if (TransactionType.XA == shardingTransactionHandler.get().getTransactionType()) {
shardingTransactionHandler.get().commit();
} else if (TransactionType.XA == shardingTransactionEngine.get().getTransactionType()) {
shardingTransactionEngine.get().commit();
connection.getStateHandler().getAndSetStatus(ConnectionStatus.TERMINATED);
}
}
@Override
public void rollback() throws SQLException {
Optional<ShardingTransactionHandler> shardingTransactionHandler = getShardingTransactionHandler(connection);
if (!shardingTransactionHandler.isPresent()) {
Optional<ShardingTransactionEngine> shardingTransactionEngine = getShardingTransactionEngine(connection);
if (!shardingTransactionEngine.isPresent()) {
new LocalTransactionManager(connection).rollback();
} else if (TransactionType.XA == shardingTransactionHandler.get().getTransactionType()) {
shardingTransactionHandler.get().rollback();
} else if (TransactionType.XA == shardingTransactionEngine.get().getTransactionType()) {
shardingTransactionEngine.get().rollback();
connection.getStateHandler().getAndSetStatus(ConnectionStatus.TERMINATED);
}
}
private Optional<ShardingTransactionHandler> getShardingTransactionHandler(final BackendConnection connection) {
private Optional<ShardingTransactionEngine> getShardingTransactionEngine(final BackendConnection connection) {
TransactionType transactionType = connection.getTransactionType();
ShardingTransactionHandler result = ShardingTransactionHandlerRegistry.getHandler(transactionType);
ShardingTransactionEngine result = ShardingTransactionEngineRegistry.getShardingTransactionEngine(transactionType);
if (null != transactionType && transactionType != TransactionType.LOCAL) {
Preconditions.checkNotNull(result, String.format("Cannot find transaction manager of [%s]", transactionType));
}
......
......@@ -35,7 +35,7 @@ public class TransactionBackendHandlerTest {
private BackendConnection backendConnection = new BackendConnection(TransactionType.LOCAL);
@Test
public void assertTransactionHandlerExecute() {
public void assertExecute() {
TransactionBackendHandler transactionBackendHandler = new TransactionBackendHandler(TransactionOperationType.BEGIN, backendConnection);
CommandResponsePackets actual = transactionBackendHandler.execute();
assertThat(actual.getHeadPacket(), instanceOf(OKPacket.class));
......
......@@ -74,7 +74,7 @@ public final class ComQueryPacketTest {
@After
public void tearDown() {
FixedXAShardingTransactionHandler.getInvokes().clear();
FixedXAShardingTransactionEngine.getInvokes().clear();
}
@SneakyThrows
......@@ -144,7 +144,7 @@ public final class ComQueryPacketTest {
Optional<CommandResponsePackets> actual = packet.execute();
assertTrue(actual.isPresent());
assertOKPacket(actual.get());
assertThat(FixedXAShardingTransactionHandler.getInvokes().get("rollback"), is(TransactionOperationType.ROLLBACK));
assertThat(FixedXAShardingTransactionEngine.getInvokes().get("rollback"), is(TransactionOperationType.ROLLBACK));
}
@Test
......@@ -156,7 +156,7 @@ public final class ComQueryPacketTest {
Optional<CommandResponsePackets> actual = packet.execute();
assertTrue(actual.isPresent());
assertOKPacket(actual.get());
assertThat(FixedXAShardingTransactionHandler.getInvokes().get("commit"), is(TransactionOperationType.COMMIT));
assertThat(FixedXAShardingTransactionEngine.getInvokes().get("commit"), is(TransactionOperationType.COMMIT));
}
private void assertOKPacket(final CommandResponsePackets actual) {
......
......@@ -20,7 +20,7 @@ package io.shardingsphere.shardingproxy.transport.mysql.packet.command.query.tex
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.core.TransactionOperationType;
import io.shardingsphere.transaction.spi.ShardingTransactionHandler;
import io.shardingsphere.transaction.spi.ShardingTransactionEngine;
import javax.sql.DataSource;
import java.sql.Connection;
......@@ -29,11 +29,11 @@ import java.util.HashMap;
import java.util.Map;
/**
* Fixed base sharding transaction handler.
* Fixed base sharding transaction engine.
*
* @author zhaojun
*/
public final class FixedXAShardingTransactionHandler implements ShardingTransactionHandler {
public final class FixedXAShardingTransactionEngine implements ShardingTransactionEngine {
private static final Map<String, TransactionOperationType> INVOKES = new HashMap<>();
......@@ -47,30 +47,30 @@ public final class FixedXAShardingTransactionHandler implements ShardingTransact
}
@Override
public void begin() {
INVOKES.put("begin", TransactionOperationType.BEGIN);
public void registerTransactionalResource(final DatabaseType databaseType, final Map<String, DataSource> dataSourceMap) {
}
@Override
public void commit() {
INVOKES.put("commit", TransactionOperationType.COMMIT);
public void clearTransactionalResources() {
}
@Override
public void rollback() {
INVOKES.put("rollback", TransactionOperationType.ROLLBACK);
public Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException {
return dataSource.getConnection();
}
@Override
public void registerTransactionalResource(final DatabaseType databaseType, final Map<String, DataSource> dataSourceMap) {
public void begin() {
INVOKES.put("begin", TransactionOperationType.BEGIN);
}
@Override
public void clearTransactionalResource() {
public void commit() {
INVOKES.put("commit", TransactionOperationType.COMMIT);
}
@Override
public Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException {
return dataSource.getConnection();
public void rollback() {
INVOKES.put("rollback", TransactionOperationType.ROLLBACK);
}
}
io.shardingsphere.shardingproxy.transport.mysql.packet.command.query.text.query.FixedXAShardingTransactionHandler
io.shardingsphere.shardingproxy.transport.mysql.packet.command.query.text.query.FixedXAShardingTransactionEngine
......
......@@ -21,7 +21,7 @@ import com.atomikos.jdbc.AtomikosDataSourceBean;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.exception.ShardingException;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.spi.ShardingTransactionHandler;
import io.shardingsphere.transaction.spi.ShardingTransactionEngine;
import io.shardingsphere.transaction.spi.xa.XATransactionManager;
import io.shardingsphere.transaction.xa.jta.connection.ShardingXAConnection;
import io.shardingsphere.transaction.xa.jta.datasource.ShardingXADataSource;
......@@ -40,12 +40,12 @@ import java.util.Map;
import java.util.Map.Entry;
/**
* XA sharding transaction handler.
* Sharding transaction engine for XA.
*
* @author zhaojun
*/
@Slf4j
public final class XAShardingTransactionHandler implements ShardingTransactionHandler {
public final class XAShardingTransactionEngine implements ShardingTransactionEngine {
private final Map<String, ShardingXADataSource> cachedShardingXADataSourceMap = new HashMap<>();
......@@ -56,21 +56,6 @@ public final class XAShardingTransactionHandler implements ShardingTransactionHa
return TransactionType.XA;
}
@Override
public void begin() {
xaTransactionManager.begin();
}
@Override
public void commit() {
xaTransactionManager.commit();
}
@Override
public void rollback() {
xaTransactionManager.rollback();
}
@Override
public void registerTransactionalResource(final DatabaseType databaseType, final Map<String, DataSource> dataSourceMap) {
for (Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
......@@ -86,7 +71,7 @@ public final class XAShardingTransactionHandler implements ShardingTransactionHa
}
@Override
public void clearTransactionalResource() {
public void clearTransactionalResources() {
if (!cachedShardingXADataSourceMap.isEmpty()) {
for (ShardingXADataSource each : cachedShardingXADataSourceMap.values()) {
xaTransactionManager.removeRecoveryResource(each.getResourceName(), each.getXaDataSource());
......@@ -114,5 +99,19 @@ public final class XAShardingTransactionHandler implements ShardingTransactionHa
}
return result;
}
}
@Override
public void begin() {
xaTransactionManager.begin();
}
@Override
public void commit() {
xaTransactionManager.commit();
}
@Override
public void rollback() {
xaTransactionManager.rollback();
}
}
......@@ -18,7 +18,7 @@
package io.shardingsphere.transaction.xa;
import io.shardingsphere.transaction.xa.convert.AllConvertTests;
import io.shardingsphere.transaction.xa.handler.XAShardingTransactionHandlerTest;
import io.shardingsphere.transaction.xa.handler.XAShardingTransactionEngineTest;
import io.shardingsphere.transaction.xa.manager.AllManagerTests;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
......@@ -27,7 +27,7 @@ import org.junit.runners.Suite.SuiteClasses;
@RunWith(Suite.class)
@SuiteClasses({
AllManagerTests.class,
XAShardingTransactionHandlerTest.class,
XAShardingTransactionEngineTest.class,
AllConvertTests.class
})
public final class AllTests {
......
......@@ -55,9 +55,9 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class XAShardingTransactionHandlerTest {
public class XAShardingTransactionEngineTest {
private XAShardingTransactionHandler xaShardingTransactionHandler = new XAShardingTransactionHandler();
private XAShardingTransactionEngine xaShardingTransactionEngine = new XAShardingTransactionEngine();
@Mock
private XATransactionManager xaTransactionManager;
......@@ -71,27 +71,27 @@ public class XAShardingTransactionHandlerTest {
@Before
@SneakyThrows
public void setUp() {
setMockXATransactionManager(xaShardingTransactionHandler, xaTransactionManager);
setMockXATransactionManager(xaShardingTransactionEngine, xaTransactionManager);
when(xaTransactionManager.getUnderlyingTransactionManager()).thenReturn(transactionManager);
when(transactionManager.getTransaction()).thenReturn(transaction);
}
@SneakyThrows
private void setMockXATransactionManager(final XAShardingTransactionHandler xaShardingTransactionHandler, final XATransactionManager xaTransactionManager) {
Field field = xaShardingTransactionHandler.getClass().getDeclaredField("xaTransactionManager");
private void setMockXATransactionManager(final XAShardingTransactionEngine xaShardingTransactionEngine, final XATransactionManager xaTransactionManager) {
Field field = xaShardingTransactionEngine.getClass().getDeclaredField("xaTransactionManager");
field.setAccessible(true);
field.set(xaShardingTransactionHandler, xaTransactionManager);
field.set(xaShardingTransactionEngine, xaTransactionManager);
}
@Test
public void assertGetTransactionType() {
assertThat(xaShardingTransactionHandler.getTransactionType(), is(TransactionType.XA));
assertThat(xaShardingTransactionEngine.getTransactionType(), is(TransactionType.XA));
}
@Test
public void assertRegisterXATransactionalDataSource() {
Map<String, DataSource> dataSourceMap = createDataSourceMap(PoolType.DRUID_XA, DatabaseType.MySQL);
xaShardingTransactionHandler.registerTransactionalResource(DatabaseType.MySQL, dataSourceMap);
xaShardingTransactionEngine.registerTransactionalResource(DatabaseType.MySQL, dataSourceMap);
for (Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
verify(xaTransactionManager).registerRecoveryResource(entry.getKey(), (XADataSource) entry.getValue());
}
......@@ -100,14 +100,14 @@ public class XAShardingTransactionHandlerTest {
@Test
public void assertRegisterAtomikosDataSourceBean() {
Map<String, DataSource> dataSourceMap = createAtomikosDataSourceBeanMap();
xaShardingTransactionHandler.registerTransactionalResource(DatabaseType.MySQL, dataSourceMap);
xaShardingTransactionEngine.registerTransactionalResource(DatabaseType.MySQL, dataSourceMap);
verify(xaTransactionManager, times(0)).registerRecoveryResource(anyString(), any(XADataSource.class));
}
@Test
public void assertRegisterNoneXATransactionalDAtaSource() {
Map<String, DataSource> dataSourceMap = createDataSourceMap(PoolType.HIKARI, DatabaseType.MySQL);
xaShardingTransactionHandler.registerTransactionalResource(DatabaseType.MySQL, dataSourceMap);
xaShardingTransactionEngine.registerTransactionalResource(DatabaseType.MySQL, dataSourceMap);
Map<String, ShardingXADataSource> cachedXADatasourceMap = getCachedShardingXADataSourceMap();
assertThat(cachedXADatasourceMap.size(), is(2));
}
......@@ -119,7 +119,7 @@ public class XAShardingTransactionHandlerTest {
DataSource dataSource = mock(DataSource.class);
setCachedShardingXADataSourceMap("ds1");
ShardingXADataSource shardingXADataSource = getCachedShardingXADataSourceMap().get("ds1");
xaShardingTransactionHandler.createConnection("ds1", dataSource);
xaShardingTransactionEngine.createConnection("ds1", dataSource);
verify(shardingXADataSource).getConnectionFromOriginalDataSource();
}
......@@ -129,7 +129,7 @@ public class XAShardingTransactionHandlerTest {
when(transaction.getStatus()).thenReturn(Status.STATUS_ACTIVE);
DataSource dataSource = mock(DataSource.class);
setCachedShardingXADataSourceMap("ds1");
Connection actual = xaShardingTransactionHandler.createConnection("ds1", dataSource);
Connection actual = xaShardingTransactionEngine.createConnection("ds1", dataSource);
assertThat(actual, instanceOf(Connection.class));
verify(transaction).enlistResource(any(XAResource.class));
}
......@@ -137,7 +137,7 @@ public class XAShardingTransactionHandlerTest {
@Test
public void assertClearTransactionalDataSource() {
setCachedShardingXADataSourceMap("ds1");
xaShardingTransactionHandler.clearTransactionalResource();
xaShardingTransactionEngine.clearTransactionalResources();
Map<String, ShardingXADataSource> cachedShardingXADataSourceMap = getCachedShardingXADataSourceMap();
verify(xaTransactionManager).removeRecoveryResource(anyString(), any(XADataSource.class));
assertThat(cachedShardingXADataSourceMap.size(), is(0));
......@@ -146,16 +146,16 @@ public class XAShardingTransactionHandlerTest {
@SneakyThrows
@SuppressWarnings("unchecked")
private Map<String, ShardingXADataSource> getCachedShardingXADataSourceMap() {
Field field = xaShardingTransactionHandler.getClass().getDeclaredField("cachedShardingXADataSourceMap");
Field field = xaShardingTransactionEngine.getClass().getDeclaredField("cachedShardingXADataSourceMap");
field.setAccessible(true);
return (Map<String, ShardingXADataSource>) field.get(xaShardingTransactionHandler);
return (Map<String, ShardingXADataSource>) field.get(xaShardingTransactionEngine);
}
@SneakyThrows
private void setCachedShardingXADataSourceMap(final String datasourceName) {
Field field = xaShardingTransactionHandler.getClass().getDeclaredField("cachedShardingXADataSourceMap");
Field field = xaShardingTransactionEngine.getClass().getDeclaredField("cachedShardingXADataSourceMap");
field.setAccessible(true);
field.set(xaShardingTransactionHandler, createMockShardingXADataSourceMap(datasourceName));
field.set(xaShardingTransactionEngine, createMockShardingXADataSourceMap(datasourceName));
}
@SneakyThrows
......
......@@ -19,7 +19,7 @@ package io.shardingsphere.transaction.core.loader;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.spi.ShardingTransactionHandler;
import io.shardingsphere.transaction.spi.ShardingTransactionEngine;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
......@@ -37,37 +37,37 @@ import java.util.ServiceLoader;
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Slf4j
public final class ShardingTransactionHandlerRegistry {
public final class ShardingTransactionEngineRegistry {
private static final Map<TransactionType, ShardingTransactionHandler> TRANSACTION_HANDLER_MAP = new HashMap<>();
private static final Map<TransactionType, ShardingTransactionEngine> TRANSACTION_ENGINES = new HashMap<>();
static {
load();
}
/**
* Load sharding transaction handler.
* Load sharding transaction engines.
*/
@SuppressWarnings("unchecked")
private static void load() {
for (ShardingTransactionHandler each : ServiceLoader.load(ShardingTransactionHandler.class)) {
if (TRANSACTION_HANDLER_MAP.containsKey(each.getTransactionType())) {
log.warn("Find more than one {} transaction handler implementation class, use `{}` now",
each.getTransactionType(), TRANSACTION_HANDLER_MAP.get(each.getTransactionType()).getClass().getName());
for (ShardingTransactionEngine each : ServiceLoader.load(ShardingTransactionEngine.class)) {
if (TRANSACTION_ENGINES.containsKey(each.getTransactionType())) {
log.warn("Find more than one {} transaction engine implementation class, use `{}` now",
each.getTransactionType(), TRANSACTION_ENGINES.get(each.getTransactionType()).getClass().getName());
continue;
}
TRANSACTION_HANDLER_MAP.put(each.getTransactionType(), each);
TRANSACTION_ENGINES.put(each.getTransactionType(), each);
}
}
/**
* Get transaction handler by type.
* Get sharding transaction engine.
*
* @param transactionType transaction type
* @return sharding transaction handler implement
* @return sharding transaction engine
*/
public static ShardingTransactionHandler getHandler(final TransactionType transactionType) {
return TRANSACTION_HANDLER_MAP.get(transactionType);
public static ShardingTransactionEngine getShardingTransactionEngine(final TransactionType transactionType) {
return TRANSACTION_ENGINES.get(transactionType);
}
/**
......@@ -77,7 +77,7 @@ public final class ShardingTransactionHandlerRegistry {
* @param dataSourceMap data source map
*/
public static void registerTransactionResource(final DatabaseType databaseType, final Map<String, DataSource> dataSourceMap) {
for (Entry<TransactionType, ShardingTransactionHandler> entry : TRANSACTION_HANDLER_MAP.entrySet()) {
for (Entry<TransactionType, ShardingTransactionEngine> entry : TRANSACTION_ENGINES.entrySet()) {
entry.getValue().registerTransactionalResource(databaseType, dataSourceMap);
}
}
......
......@@ -26,12 +26,12 @@ import java.sql.SQLException;
import java.util.Map;
/**
* Sharding transaction handler SPI.
* Sharding transaction engine.
*
* @author zhaojun
*
*/
public interface ShardingTransactionHandler {
public interface ShardingTransactionEngine {
/**
* Get transaction type.
......@@ -40,21 +40,6 @@ public interface ShardingTransactionHandler {
*/
TransactionType getTransactionType();
/**
* Begin transaction.
*/
void begin();
/**
* Commit transaction.
*/
void commit();
/**
* Rollback transaction.
*/
void rollback();
/**
* Register transaction data source.
*
......@@ -64,9 +49,9 @@ public interface ShardingTransactionHandler {
void registerTransactionalResource(DatabaseType databaseType, Map<String, DataSource> dataSourceMap);
/**
* Clear transactional resource.
* Clear transactional resources.
*/
void clearTransactionalResource();
void clearTransactionalResources();
/**
* Create transactional connection.
......@@ -77,4 +62,19 @@ public interface ShardingTransactionHandler {
* @throws SQLException SQL exception
*/
Connection createConnection(String dataSourceName, DataSource dataSource) throws SQLException;
/**
* Begin transaction.
*/
void begin();
/**
* Commit transaction.
*/
void commit();
/**
* Rollback transaction.
*/
void rollback();
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册