提交 cad9f69a 编写于 作者: T terrymanu

split ShardingTransactionHandler.doInTransaction to begin, commit & rollback

上级 25953f9b
......@@ -30,7 +30,6 @@ import io.shardingsphere.spi.root.RootInvokeHook;
import io.shardingsphere.spi.root.SPIRootInvokeHook;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.api.TransactionTypeHolder;
import io.shardingsphere.transaction.core.TransactionOperationType;
import io.shardingsphere.transaction.core.loader.ShardingTransactionHandlerRegistry;
import io.shardingsphere.transaction.spi.ShardingTransactionHandler;
import lombok.Getter;
......@@ -181,7 +180,7 @@ public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOpera
if (TransactionType.LOCAL == transactionType) {
setAutoCommitForLocalTransaction(autoCommit);
} else if (!autoCommit) {
shardingTransactionHandler.doInTransaction(TransactionOperationType.BEGIN);
shardingTransactionHandler.begin();
}
}
......@@ -201,7 +200,7 @@ public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOpera
if (TransactionType.LOCAL == transactionType) {
commitForLocalTransaction();
} else {
shardingTransactionHandler.doInTransaction(TransactionOperationType.COMMIT);
shardingTransactionHandler.commit();
}
}
......@@ -220,7 +219,7 @@ public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOpera
if (TransactionType.LOCAL == transactionType) {
rollbackForLocalTransaction();
} else {
shardingTransactionHandler.doInTransaction(TransactionOperationType.ROLLBACK);
shardingTransactionHandler.rollback();
}
}
......
......@@ -44,19 +44,18 @@ public final class FixedBaseShardingTransactionHandler extends ShardingTransacti
}
@Override
public void doInTransaction(final TransactionOperationType transactionOperationType) {
switch (transactionOperationType) {
case BEGIN:
INVOKES.put("begin", transactionOperationType);
return;
case COMMIT:
INVOKES.put("commit", transactionOperationType);
return;
case ROLLBACK:
INVOKES.put("rollback", transactionOperationType);
return;
default:
}
public void begin() {
INVOKES.put("begin", TransactionOperationType.BEGIN);
}
@Override
public void commit() {
INVOKES.put("commit", TransactionOperationType.COMMIT);
}
@Override
public void rollback() {
INVOKES.put("rollback", TransactionOperationType.ROLLBACK);
}
@Override
......
......@@ -44,19 +44,18 @@ public final class FixedXAShardingTransactionHandler extends ShardingTransaction
}
@Override
public void doInTransaction(final TransactionOperationType transactionOperationType) {
switch (transactionOperationType) {
case BEGIN:
INVOKES.put("begin", transactionOperationType);
return;
case COMMIT:
INVOKES.put("commit", transactionOperationType);
return;
case ROLLBACK:
INVOKES.put("rollback", transactionOperationType);
return;
default:
}
public void begin() {
INVOKES.put("begin", TransactionOperationType.BEGIN);
}
@Override
public void commit() {
INVOKES.put("commit", TransactionOperationType.COMMIT);
}
@Override
public void rollback() {
INVOKES.put("rollback", TransactionOperationType.ROLLBACK);
}
@Override
......
......@@ -41,7 +41,19 @@ public final class TransactionBackendHandler extends AbstractBackendHandler {
@Override
protected CommandResponsePackets execute0() throws Exception {
backendTransactionManager.doInTransaction(operationType);
switch (operationType) {
case BEGIN:
backendTransactionManager.begin();
break;
case COMMIT:
backendTransactionManager.commit();
break;
case ROLLBACK:
backendTransactionManager.rollback();
break;
default:
throw new UnsupportedOperationException(operationType.name());
}
return new CommandResponsePackets(new OKPacket(1));
}
}
......@@ -17,9 +17,9 @@
package io.shardingsphere.shardingproxy.backend.jdbc.connection;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.core.TransactionOperationType;
import io.shardingsphere.transaction.core.loader.ShardingTransactionHandlerRegistry;
import io.shardingsphere.transaction.spi.ShardingTransactionHandler;
import lombok.RequiredArgsConstructor;
......@@ -37,23 +37,47 @@ public final class BackendTransactionManager implements TransactionManager {
private final BackendConnection connection;
@Override
public void doInTransaction(final TransactionOperationType operationType) throws SQLException {
TransactionType transactionType = connection.getTransactionType();
ShardingTransactionHandler shardingTransactionHandler = ShardingTransactionHandlerRegistry.getHandler(transactionType);
if (null != transactionType && transactionType != TransactionType.LOCAL) {
Preconditions.checkNotNull(shardingTransactionHandler, String.format("Cannot find transaction manager of [%s]", transactionType));
}
if (TransactionOperationType.BEGIN == operationType && !connection.getStateHandler().isInTransaction()) {
public void begin() {
Optional<ShardingTransactionHandler> shardingTransactionHandler = getShardingTransactionHandler(connection);
if (!connection.getStateHandler().isInTransaction()) {
connection.getStateHandler().getAndSetStatus(ConnectionStatus.TRANSACTION);
connection.releaseConnections(false);
}
if (TransactionType.LOCAL == transactionType) {
new LocalTransactionManager(connection).doInTransaction(operationType);
} else if (TransactionType.XA == transactionType) {
shardingTransactionHandler.doInTransaction(operationType);
if (TransactionOperationType.BEGIN != operationType) {
connection.getStateHandler().getAndSetStatus(ConnectionStatus.TERMINATED);
}
if (!shardingTransactionHandler.isPresent()) {
new LocalTransactionManager(connection).begin();
} else if (TransactionType.XA == shardingTransactionHandler.get().getTransactionType()) {
shardingTransactionHandler.get().begin();
}
}
@Override
public void commit() throws SQLException {
Optional<ShardingTransactionHandler> shardingTransactionHandler = getShardingTransactionHandler(connection);
if (!shardingTransactionHandler.isPresent()) {
new LocalTransactionManager(connection).commit();
} else if (TransactionType.XA == shardingTransactionHandler.get().getTransactionType()) {
shardingTransactionHandler.get().commit();
connection.getStateHandler().getAndSetStatus(ConnectionStatus.TERMINATED);
}
}
@Override
public void rollback() throws SQLException {
Optional<ShardingTransactionHandler> shardingTransactionHandler = getShardingTransactionHandler(connection);
if (!shardingTransactionHandler.isPresent()) {
new LocalTransactionManager(connection).rollback();
} else if (TransactionType.XA == shardingTransactionHandler.get().getTransactionType()) {
shardingTransactionHandler.get().rollback();
connection.getStateHandler().getAndSetStatus(ConnectionStatus.TERMINATED);
}
}
private Optional<ShardingTransactionHandler> getShardingTransactionHandler(final BackendConnection connection) {
TransactionType transactionType = connection.getTransactionType();
ShardingTransactionHandler result = ShardingTransactionHandlerRegistry.getHandler(transactionType);
if (null != transactionType && transactionType != TransactionType.LOCAL) {
Preconditions.checkNotNull(result, String.format("Cannot find transaction manager of [%s]", transactionType));
}
return Optional.fromNullable(result);
}
}
......@@ -17,7 +17,6 @@
package io.shardingsphere.shardingproxy.backend.jdbc.connection;
import io.shardingsphere.transaction.core.TransactionOperationType;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
......@@ -37,26 +36,12 @@ public final class LocalTransactionManager implements TransactionManager {
private final BackendConnection connection;
@Override
public void doInTransaction(final TransactionOperationType operationType) throws SQLException {
switch (operationType) {
case BEGIN:
setAutoCommit();
break;
case COMMIT:
commit();
break;
case ROLLBACK:
rollback();
break;
default:
}
}
private void setAutoCommit() {
public void begin() {
recordMethodInvocation(Connection.class, "setAutoCommit", new Class[]{boolean.class}, new Object[]{false});
}
private void commit() throws SQLException {
@Override
public void commit() throws SQLException {
if (connection.getStateHandler().isInTransaction()) {
Collection<SQLException> exceptions = new LinkedList<>();
exceptions.addAll(commitConnections());
......@@ -65,7 +50,8 @@ public final class LocalTransactionManager implements TransactionManager {
}
}
private void rollback() throws SQLException {
@Override
public void rollback() throws SQLException {
if (connection.getStateHandler().isInTransaction()) {
Collection<SQLException> exceptions = new LinkedList<>();
exceptions.addAll(rollbackConnections());
......
......@@ -17,8 +17,6 @@
package io.shardingsphere.shardingproxy.backend.jdbc.connection;
import io.shardingsphere.transaction.core.TransactionOperationType;
import java.sql.SQLException;
/**
......@@ -29,10 +27,23 @@ import java.sql.SQLException;
public interface TransactionManager {
/**
* Handle proxy transaction.
* Begin transaction.
*
* @throws SQLException SQL Exception
*/
void begin() throws SQLException;
/**
* Commit transaction.
*
* @throws SQLException SQL Exception
*/
void commit() throws SQLException;
/**
* Rollback transaction.
*
* @param operationType transaction operation type
* @throws SQLException SQL Exception
*/
void doInTransaction(TransactionOperationType operationType) throws SQLException;
void rollback() throws SQLException;
}
......@@ -22,7 +22,6 @@ import io.shardingsphere.core.exception.ShardingException;
import io.shardingsphere.shardingproxy.backend.MockGlobalRegistryUtil;
import io.shardingsphere.shardingproxy.backend.jdbc.datasource.JDBCBackendDataSource;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.core.TransactionOperationType;
import lombok.SneakyThrows;
import org.junit.Before;
import org.junit.Test;
......@@ -229,16 +228,16 @@ public final class BackendConnectionTest {
}
@Test(expected = ShardingException.class)
public void assertFailedSwitchTransactionTypeWhileBegin() throws SQLException {
public void assertFailedSwitchTransactionTypeWhileBegin() {
BackendTransactionManager transactionManager = new BackendTransactionManager(backendConnection);
transactionManager.doInTransaction(TransactionOperationType.BEGIN);
transactionManager.begin();
backendConnection.setTransactionType(TransactionType.XA);
}
@Test(expected = ShardingException.class)
public void assertFailedSwitchLogicSchemaWhileBegin() throws SQLException {
public void assertFailedSwitchLogicSchemaWhileBegin() {
BackendTransactionManager transactionManager = new BackendTransactionManager(backendConnection);
transactionManager.doInTransaction(TransactionOperationType.BEGIN);
transactionManager.begin();
backendConnection.setCurrentSchema("newSchema");
}
}
......@@ -18,7 +18,6 @@
package io.shardingsphere.shardingproxy.backend.jdbc.connection;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.core.TransactionOperationType;
import org.junit.Test;
import java.sql.Connection;
......@@ -40,13 +39,13 @@ public class BackendTransactionManagerTest {
@Test
public void assertLocalTransactionCommit() throws SQLException {
MockConnectionUtil.setCachedConnections(backendConnection, "ds1", 2);
backendTransactionManager.doInTransaction(TransactionOperationType.BEGIN);
backendTransactionManager.begin();
assertThat(backendConnection.getStateHandler().getStatus(), is(ConnectionStatus.TRANSACTION));
assertThat(backendConnection.getMethodInvocations().size(), is(1));
assertThat(backendConnection.getMethodInvocations().iterator().next().getArguments(), is(new Object[]{false}));
assertTrue(backendConnection.getCachedConnections().isEmpty());
MockConnectionUtil.setCachedConnections(backendConnection, "ds1", 2);
backendTransactionManager.doInTransaction(TransactionOperationType.COMMIT);
backendTransactionManager.commit();
Iterator<Connection> iterator = backendConnection.getCachedConnections().values().iterator();
verify(iterator.next()).commit();
verify(iterator.next()).commit();
......@@ -55,11 +54,11 @@ public class BackendTransactionManagerTest {
@Test
public void assertLocalTransactionCommitWithException() throws SQLException {
backendTransactionManager.doInTransaction(TransactionOperationType.BEGIN);
backendTransactionManager.begin();
MockConnectionUtil.setCachedConnections(backendConnection, "ds1", 2);
MockConnectionUtil.mockThrowException(backendConnection.getCachedConnections().values());
try {
backendTransactionManager.doInTransaction(TransactionOperationType.COMMIT);
backendTransactionManager.commit();
} catch (final SQLException ex) {
assertThat(ex.getNextException().getNextException(), instanceOf(SQLException.class));
}
......@@ -68,9 +67,9 @@ public class BackendTransactionManagerTest {
@Test
public void assertLocalTransactionRollback() throws SQLException {
backendTransactionManager.doInTransaction(TransactionOperationType.BEGIN);
backendTransactionManager.begin();
MockConnectionUtil.setCachedConnections(backendConnection, "ds1", 2);
backendTransactionManager.doInTransaction(TransactionOperationType.ROLLBACK);
backendTransactionManager.rollback();
Iterator<Connection> iterator = backendConnection.getCachedConnections().values().iterator();
verify(iterator.next()).rollback();
verify(iterator.next()).rollback();
......@@ -79,11 +78,11 @@ public class BackendTransactionManagerTest {
@Test
public void assertLocalTransactionRollbackWithException() throws SQLException {
backendTransactionManager.doInTransaction(TransactionOperationType.BEGIN);
backendTransactionManager.begin();
MockConnectionUtil.setCachedConnections(backendConnection, "ds1", 2);
MockConnectionUtil.mockThrowException(backendConnection.getCachedConnections().values());
try {
backendTransactionManager.doInTransaction(TransactionOperationType.ROLLBACK);
backendTransactionManager.rollback();
} catch (final SQLException ex) {
assertThat(ex.getNextException().getNextException(), instanceOf(SQLException.class));
}
......@@ -94,22 +93,22 @@ public class BackendTransactionManagerTest {
public void assertXATransactionCommit() throws SQLException {
backendConnection.setCurrentSchema("schema");
backendConnection.setTransactionType(TransactionType.XA);
backendTransactionManager.doInTransaction(TransactionOperationType.BEGIN);
backendTransactionManager.begin();
assertTrue(backendConnection.getMethodInvocations().isEmpty());
assertThat(backendConnection.getStateHandler().getStatus(), is(ConnectionStatus.TRANSACTION));
backendTransactionManager.doInTransaction(TransactionOperationType.COMMIT);
backendTransactionManager.commit();
assertThat(backendConnection.getStateHandler().getStatus(), is(ConnectionStatus.TERMINATED));
backendTransactionManager.doInTransaction(TransactionOperationType.BEGIN);
backendTransactionManager.begin();
}
@Test
public void assertXATransactionRollback() throws SQLException {
backendConnection.setCurrentSchema("schema");
backendConnection.setTransactionType(TransactionType.XA);
backendTransactionManager.doInTransaction(TransactionOperationType.BEGIN);
backendTransactionManager.begin();
assertTrue(backendConnection.getMethodInvocations().isEmpty());
assertThat(backendConnection.getStateHandler().getStatus(), is(ConnectionStatus.TRANSACTION));
backendTransactionManager.doInTransaction(TransactionOperationType.ROLLBACK);
backendTransactionManager.rollback();
assertThat(backendConnection.getStateHandler().getStatus(), is(ConnectionStatus.TERMINATED));
}
}
......@@ -34,29 +34,23 @@ public final class FixedXAShardingTransactionHandler extends ShardingTransaction
private static final Map<String, TransactionOperationType> INVOKES = new HashMap<>();
/**
* Get invoke map.
*
* @return map
*/
static Map<String, TransactionOperationType> getInvokes() {
return INVOKES;
}
@Override
public void doInTransaction(final TransactionOperationType transactionOperationType) {
switch (transactionOperationType) {
case BEGIN:
INVOKES.put("begin", transactionOperationType);
return;
case COMMIT:
INVOKES.put("commit", transactionOperationType);
return;
case ROLLBACK:
INVOKES.put("rollback", transactionOperationType);
return;
default:
}
public void begin() {
INVOKES.put("begin", TransactionOperationType.BEGIN);
}
@Override
public void commit() {
INVOKES.put("commit", TransactionOperationType.COMMIT);
}
@Override
public void rollback() {
INVOKES.put("rollback", TransactionOperationType.ROLLBACK);
}
@Override
......
......@@ -18,7 +18,6 @@
package io.shardingsphere.transaction.core.handler;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.transaction.core.TransactionOperationType;
import io.shardingsphere.transaction.spi.ShardingTransactionHandler;
import javax.sql.DataSource;
......@@ -34,24 +33,19 @@ import java.util.Map;
*/
public abstract class ShardingTransactionHandlerAdapter implements ShardingTransactionHandler {
/**
* Default implement for do in transaction.
*/
@Override
@SuppressWarnings("unchecked")
public void doInTransaction(final TransactionOperationType transactionOperationType) {
switch (transactionOperationType) {
case BEGIN:
getShardingTransactionManager().begin();
break;
case COMMIT:
getShardingTransactionManager().commit();
break;
case ROLLBACK:
getShardingTransactionManager().rollback();
break;
default:
}
public void begin() {
getShardingTransactionManager().begin();
}
@Override
public void commit() {
getShardingTransactionManager().commit();
}
@Override
public void rollback() {
getShardingTransactionManager().rollback();
}
/**
......
......@@ -19,7 +19,6 @@ package io.shardingsphere.transaction.spi;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.core.TransactionOperationType;
import io.shardingsphere.transaction.core.manager.ShardingTransactionManager;
import javax.sql.DataSource;
......@@ -36,11 +35,19 @@ import java.util.Map;
public interface ShardingTransactionHandler {
/**
* Do transaction operation using specific transaction manager.
*
* @param transactionOperationType transaction operation type
* Begin transaction.
*/
void begin();
/**
* Commit transaction.
*/
void commit();
/**
* Rollback transaction.
*/
void doInTransaction(TransactionOperationType transactionOperationType);
void rollback();
/**
* Get sharding transaction manager.
......
......@@ -19,7 +19,6 @@ package io.shardingsphere.transaction.core.handler;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.transaction.api.TransactionType;
import io.shardingsphere.transaction.core.TransactionOperationType;
import io.shardingsphere.transaction.core.manager.ShardingTransactionManager;
import org.junit.Test;
import org.junit.runner.RunWith;
......@@ -40,20 +39,20 @@ public class ShardingTransactionHandlerAdapterTest {
private ShardingTransactionManager shardingTransactionManager = fixedShardingTransactionHandler.getShardingTransactionManager();
@Test
public void assertDoXATransactionBegin() {
fixedShardingTransactionHandler.doInTransaction(TransactionOperationType.BEGIN);
public void assertBeginForXATransaction() {
fixedShardingTransactionHandler.begin();
verify(shardingTransactionManager).begin();
}
@Test
public void assertDoXATransactionCommit() {
fixedShardingTransactionHandler.doInTransaction(TransactionOperationType.COMMIT);
public void assertCommitForXATransaction() {
fixedShardingTransactionHandler.commit();
verify(shardingTransactionManager).commit();
}
@Test
public void assertDoXATransactionRollback() {
fixedShardingTransactionHandler.doInTransaction(TransactionOperationType.ROLLBACK);
public void assertRollbackXATransaction() {
fixedShardingTransactionHandler.rollback();
verify(shardingTransactionManager).rollback();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册