未验证 提交 7f7ff1c4 编写于 作者: L Liang Zhang 提交者: GitHub

Rename to ConnectionStatusManager (#7424)

* Refactor ConnectionStatusHandler

* Rename ConnectionStatusManager

* Move ConnectionStatus to status package
上级 57798ff6
......@@ -105,7 +105,7 @@ public final class JDBCDatabaseCommunicationEngine implements DatabaseCommunicat
}
private boolean isExecuteDDLInXATransaction(final SQLStatement sqlStatement) {
return TransactionType.XA == connection.getTransactionType() && sqlStatement instanceof DDLStatement && connection.getStatusHandler().isInTransaction();
return TransactionType.XA == connection.getTransactionType() && sqlStatement instanceof DDLStatement && connection.getStatusManager().isInTransaction();
}
private String getTableName(final SQLStatementContext<?> sqlStatementContext) {
......
......@@ -32,6 +32,7 @@ import org.apache.shardingsphere.infra.executor.sql.resourced.jdbc.group.Stateme
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.spi.type.TypedSPIRegistry;
import org.apache.shardingsphere.masterslave.route.engine.impl.MasterVisitedManager;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.status.ConnectionStatusManager;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.StatementMemoryStrictlyFetchSizeSetter;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.transaction.core.TransactionType;
......@@ -82,7 +83,7 @@ public final class BackendConnection implements JDBCExecutionConnection, AutoClo
private final ResourceLock resourceLock = new ResourceLock();
private final ConnectionStatusHandler statusHandler = new ConnectionStatusHandler(resourceLock);
private final ConnectionStatusManager statusManager = new ConnectionStatusManager(resourceLock);
public BackendConnection(final TransactionType transactionType) {
this.transactionType = transactionType;
......@@ -117,7 +118,7 @@ public final class BackendConnection implements JDBCExecutionConnection, AutoClo
private boolean isSwitchFailed() {
int retryCount = 0;
while (statusHandler.isInTransaction() && retryCount < MAXIMUM_RETRY_COUNT) {
while (statusManager.isInTransaction() && retryCount < MAXIMUM_RETRY_COUNT) {
resourceLock.doAwait();
++retryCount;
log.info("Current transaction have not terminated, retry count:[{}].", retryCount);
......@@ -127,7 +128,7 @@ public final class BackendConnection implements JDBCExecutionConnection, AutoClo
@Override
public List<Connection> getConnections(final String dataSourceName, final int connectionSize, final ConnectionMode connectionMode) throws SQLException {
return statusHandler.isInTransaction()
return statusManager.isInTransaction()
? getConnectionsWithTransaction(dataSourceName, connectionSize, connectionMode) : getConnectionsWithoutTransaction(dataSourceName, connectionSize, connectionMode);
}
......@@ -213,7 +214,7 @@ public final class BackendConnection implements JDBCExecutionConnection, AutoClo
* @return true or false
*/
public boolean isSerialExecute() {
return statusHandler.isInTransaction() && (TransactionType.LOCAL == transactionType || TransactionType.XA == transactionType);
return statusManager.isInTransaction() && (TransactionType.LOCAL == transactionType || TransactionType.XA == transactionType);
}
/**
......@@ -259,10 +260,10 @@ public final class BackendConnection implements JDBCExecutionConnection, AutoClo
MasterVisitedManager.clear();
exceptions.addAll(closeResultSets());
exceptions.addAll(closeStatements());
if (!statusHandler.isInTransaction() || forceClose || TransactionType.BASE == transactionType) {
if (!statusManager.isInTransaction() || forceClose || TransactionType.BASE == transactionType) {
exceptions.addAll(releaseConnections(forceClose));
}
statusHandler.doNotifyIfNecessary();
statusManager.switchToReleased();
throwSQLExceptionIfNecessary(exceptions);
}
......@@ -296,7 +297,7 @@ public final class BackendConnection implements JDBCExecutionConnection, AutoClo
Collection<SQLException> result = new LinkedList<>();
for (Connection each : cachedConnections.values()) {
try {
if (forceRollback && statusHandler.isInTransaction()) {
if (forceRollback && statusManager.isInTransaction()) {
each.rollback();
}
each.close();
......
......@@ -47,8 +47,8 @@ public final class BackendTransactionManager implements TransactionManager {
@Override
public void begin() {
if (!connection.getStatusHandler().isInTransaction()) {
connection.getStatusHandler().switchInTransactionStatus();
if (!connection.getStatusManager().isInTransaction()) {
connection.getStatusManager().switchToInTransaction();
connection.releaseConnections(false);
}
if (TransactionType.LOCAL == transactionType || null == shardingTransactionManager) {
......@@ -60,7 +60,7 @@ public final class BackendTransactionManager implements TransactionManager {
@Override
public void commit() throws SQLException {
if (connection.getStatusHandler().isInTransaction()) {
if (connection.getStatusManager().isInTransaction()) {
try {
if (TransactionType.LOCAL == transactionType || null == shardingTransactionManager) {
localTransactionManager.commit();
......@@ -68,14 +68,14 @@ public final class BackendTransactionManager implements TransactionManager {
shardingTransactionManager.commit();
}
} finally {
connection.getStatusHandler().switchUsingStatus();
connection.getStatusManager().switchToUsing();
}
}
}
@Override
public void rollback() throws SQLException {
if (connection.getStatusHandler().isInTransaction()) {
if (connection.getStatusManager().isInTransaction()) {
try {
if (TransactionType.LOCAL == transactionType || null == shardingTransactionManager) {
localTransactionManager.rollback();
......@@ -83,7 +83,7 @@ public final class BackendTransactionManager implements TransactionManager {
shardingTransactionManager.rollback();
}
} finally {
connection.getStatusHandler().switchUsingStatus();
connection.getStatusManager().switchToUsing();
}
}
}
......
......@@ -40,7 +40,7 @@ public final class LocalTransactionManager implements TransactionManager {
@Override
public void commit() throws SQLException {
if (connection.getStatusHandler().isInTransaction()) {
if (connection.getStatusManager().isInTransaction()) {
Collection<SQLException> exceptions = new LinkedList<>(commitConnections());
throwSQLExceptionIfNecessary(exceptions);
}
......@@ -48,7 +48,7 @@ public final class LocalTransactionManager implements TransactionManager {
@Override
public void rollback() throws SQLException {
if (connection.getStatusHandler().isInTransaction()) {
if (connection.getStatusManager().isInTransaction()) {
Collection<SQLException> exceptions = new LinkedList<>(rollbackConnections());
throwSQLExceptionIfNecessary(exceptions);
}
......
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.shardingsphere.proxy.backend.communication.jdbc.connection;
package org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.status;
/**
* Connection status.
......
......@@ -15,17 +15,18 @@
* limitations under the License.
*/
package org.apache.shardingsphere.proxy.backend.communication.jdbc.connection;
package org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.status;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.ResourceLock;
import java.util.concurrent.atomic.AtomicReference;
/**
* Connection status handler.
* Connection status manager.
*/
@RequiredArgsConstructor
public final class ConnectionStatusHandler {
public final class ConnectionStatusManager {
private final AtomicReference<ConnectionStatus> status = new AtomicReference<>(ConnectionStatus.RELEASED);
......@@ -34,14 +35,14 @@ public final class ConnectionStatusHandler {
/**
* Switch connection status to using.
*/
public void switchUsingStatus() {
public void switchToUsing() {
status.set(ConnectionStatus.USING);
}
/**
* Switch connection status to in transaction.
*/
public void switchInTransactionStatus() {
public void switchToInTransaction() {
status.set(ConnectionStatus.IN_TRANSACTION);
}
......@@ -55,9 +56,9 @@ public final class ConnectionStatusHandler {
}
/**
* Notify connection to finish wait if necessary.
* Switch connection status to released.
*/
void doNotifyIfNecessary() {
public void switchToReleased() {
if (status.compareAndSet(ConnectionStatus.USING, ConnectionStatus.RELEASED)) {
resourceLock.doNotify();
}
......
......@@ -92,7 +92,7 @@ public final class TextProtocolBackendHandlerFactory {
}
if (tclStatement instanceof SetAutoCommitStatement) {
if (((SetAutoCommitStatement) tclStatement).isAutoCommit()) {
return backendConnection.getStatusHandler().isInTransaction() ? new TransactionBackendHandler(TransactionOperationType.COMMIT, backendConnection) : new SkipBackendHandler();
return backendConnection.getStatusManager().isInTransaction() ? new TransactionBackendHandler(TransactionOperationType.COMMIT, backendConnection) : new SkipBackendHandler();
}
return new TransactionBackendHandler(TransactionOperationType.BEGIN, backendConnection);
}
......
......@@ -21,7 +21,7 @@ import lombok.SneakyThrows;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseTypes;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.ConnectionStatusHandler;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.status.ConnectionStatusManager;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandlerFactory;
......@@ -124,9 +124,9 @@ public final class TextProtocolBackendHandlerFactoryTest {
@Test
public void assertNewInstanceWithSetAutoCommitToOnForInTransaction() {
ConnectionStatusHandler statusHandler = mock(ConnectionStatusHandler.class);
when(backendConnection.getStatusHandler()).thenReturn(statusHandler);
when(statusHandler.isInTransaction()).thenReturn(true);
ConnectionStatusManager statusManager = mock(ConnectionStatusManager.class);
when(backendConnection.getStatusManager()).thenReturn(statusManager);
when(statusManager.isInTransaction()).thenReturn(true);
String sql = "SET AUTOCOMMIT=1";
TextProtocolBackendHandler actual = TextProtocolBackendHandlerFactory.newInstance(databaseType, sql, backendConnection);
assertThat(actual, instanceOf(TransactionBackendHandler.class));
......@@ -134,9 +134,9 @@ public final class TextProtocolBackendHandlerFactoryTest {
@Test
public void assertNewInstanceWithScopeSetAutoCommitToOnForInTransaction() {
ConnectionStatusHandler statusHandler = mock(ConnectionStatusHandler.class);
when(backendConnection.getStatusHandler()).thenReturn(statusHandler);
when(statusHandler.isInTransaction()).thenReturn(true);
ConnectionStatusManager statusManager = mock(ConnectionStatusManager.class);
when(backendConnection.getStatusManager()).thenReturn(statusManager);
when(statusManager.isInTransaction()).thenReturn(true);
String sql = "SET @@SESSION.AUTOCOMMIT = ON";
TextProtocolBackendHandler actual = TextProtocolBackendHandlerFactory.newInstance(databaseType, sql, backendConnection);
assertThat(actual, instanceOf(TransactionBackendHandler.class));
......@@ -145,9 +145,9 @@ public final class TextProtocolBackendHandlerFactoryTest {
@Test
public void assertNewInstanceWithSetAutoCommitToOnForNotInTransaction() {
String sql = "SET AUTOCOMMIT=1";
ConnectionStatusHandler statusHandler = mock(ConnectionStatusHandler.class);
when(backendConnection.getStatusHandler()).thenReturn(statusHandler);
when(statusHandler.isInTransaction()).thenReturn(false);
ConnectionStatusManager statusManager = mock(ConnectionStatusManager.class);
when(backendConnection.getStatusManager()).thenReturn(statusManager);
when(statusManager.isInTransaction()).thenReturn(false);
TextProtocolBackendHandler actual = TextProtocolBackendHandlerFactory.newInstance(databaseType, sql, backendConnection);
assertThat(actual, instanceOf(SkipBackendHandler.class));
}
......
......@@ -128,44 +128,44 @@ public final class BackendConnectionTest {
@Test
public void assertGetConnectionCacheIsEmpty() throws SQLException {
backendConnection.getStatusHandler().switchInTransactionStatus();
backendConnection.getStatusManager().switchToInTransaction();
when(backendDataSource.getConnections(anyString(), anyString(), eq(2), any())).thenReturn(MockConnectionUtil.mockNewConnections(2));
List<Connection> actualConnections = backendConnection.getConnections("ds1", 2, ConnectionMode.MEMORY_STRICTLY);
assertThat(actualConnections.size(), is(2));
assertThat(backendConnection.getConnectionSize(), is(2));
assertTrue(backendConnection.getStatusHandler().isInTransaction());
assertTrue(backendConnection.getStatusManager().isInTransaction());
}
@Test
public void assertGetConnectionSizeLessThanCache() throws SQLException {
backendConnection.getStatusHandler().switchInTransactionStatus();
backendConnection.getStatusManager().switchToInTransaction();
MockConnectionUtil.setCachedConnections(backendConnection, "ds1", 10);
List<Connection> actualConnections = backendConnection.getConnections("ds1", 2, ConnectionMode.MEMORY_STRICTLY);
assertThat(actualConnections.size(), is(2));
assertThat(backendConnection.getConnectionSize(), is(10));
assertTrue(backendConnection.getStatusHandler().isInTransaction());
assertTrue(backendConnection.getStatusManager().isInTransaction());
}
@Test
public void assertGetConnectionSizeGreaterThanCache() throws SQLException {
backendConnection.getStatusHandler().switchInTransactionStatus();
backendConnection.getStatusManager().switchToInTransaction();
MockConnectionUtil.setCachedConnections(backendConnection, "ds1", 10);
when(backendDataSource.getConnections(anyString(), anyString(), eq(2), any())).thenReturn(MockConnectionUtil.mockNewConnections(2));
List<Connection> actualConnections = backendConnection.getConnections("ds1", 12, ConnectionMode.MEMORY_STRICTLY);
assertThat(actualConnections.size(), is(12));
assertThat(backendConnection.getConnectionSize(), is(12));
assertTrue(backendConnection.getStatusHandler().isInTransaction());
assertTrue(backendConnection.getStatusManager().isInTransaction());
}
@Test
public void assertGetConnectionWithMethodInvocation() throws SQLException {
backendConnection.getStatusHandler().switchInTransactionStatus();
backendConnection.getStatusManager().switchToInTransaction();
when(backendDataSource.getConnections(anyString(), anyString(), eq(2), any())).thenReturn(MockConnectionUtil.mockNewConnections(2));
setMethodInvocation();
List<Connection> actualConnections = backendConnection.getConnections("ds1", 2, ConnectionMode.MEMORY_STRICTLY);
verify(backendConnection.getMethodInvocations().iterator().next(), times(2)).invoke(any());
assertThat(actualConnections.size(), is(2));
assertTrue(backendConnection.getStatusHandler().isInTransaction());
assertTrue(backendConnection.getStatusManager().isInTransaction());
}
@SneakyThrows(ReflectiveOperationException.class)
......@@ -192,11 +192,11 @@ public final class BackendConnectionTest {
@SneakyThrows
private void assertOneThreadResult() {
backendConnection.getStatusHandler().switchInTransactionStatus();
backendConnection.getStatusManager().switchToInTransaction();
List<Connection> actualConnections = backendConnection.getConnections("ds1", 12, ConnectionMode.MEMORY_STRICTLY);
assertThat(actualConnections.size(), is(12));
assertThat(backendConnection.getConnectionSize(), is(12));
assertTrue(backendConnection.getStatusHandler().isInTransaction());
assertTrue(backendConnection.getStatusManager().isInTransaction());
}
@Test
......@@ -206,7 +206,7 @@ public final class BackendConnectionTest {
backendConnection.setCurrentSchema(String.format(SCHEMA_PATTERN, 0));
when(backendDataSource.getConnections(anyString(), anyString(), eq(12), any())).thenReturn(MockConnectionUtil.mockNewConnections(12));
backendConnection.getConnections("ds1", 12, ConnectionMode.MEMORY_STRICTLY);
backendConnection.getStatusHandler().switchUsingStatus();
backendConnection.getStatusManager().switchToUsing();
mockResultSetAndStatement(backendConnection);
actual = backendConnection;
}
......@@ -223,7 +223,7 @@ public final class BackendConnectionTest {
backendConnection.setCurrentSchema(String.format(SCHEMA_PATTERN, 0));
MockConnectionUtil.setCachedConnections(backendConnection, "ds1", 10);
when(backendDataSource.getConnections(anyString(), anyString(), eq(2), any())).thenReturn(MockConnectionUtil.mockNewConnections(2));
backendConnection.getStatusHandler().switchInTransactionStatus();
backendConnection.getStatusManager().switchToInTransaction();
backendConnection.getConnections("ds1", 12, ConnectionMode.MEMORY_STRICTLY);
mockResultSetAndStatement(backendConnection);
actual = backendConnection;
......@@ -240,10 +240,10 @@ public final class BackendConnectionTest {
try (BackendConnection backendConnection = new BackendConnection(TransactionType.LOCAL)) {
backendConnection.setCurrentSchema(String.format(SCHEMA_PATTERN, 0));
backendConnection.setTransactionType(TransactionType.XA);
backendConnection.getStatusHandler().switchInTransactionStatus();
backendConnection.getStatusManager().switchToInTransaction();
MockConnectionUtil.setCachedConnections(backendConnection, "ds1", 10);
backendConnection.getConnections("ds1", 12, ConnectionMode.MEMORY_STRICTLY);
backendConnection.getStatusHandler().switchUsingStatus();
backendConnection.getStatusManager().switchToUsing();
mockResultSetAndStatement(backendConnection);
mockResultSetAndStatementException(backendConnection);
actual = backendConnection;
......
......@@ -18,6 +18,7 @@
package org.apache.shardingsphere.proxy.backend.communication.jdbc.connection;
import lombok.SneakyThrows;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.status.ConnectionStatusManager;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.transaction.ShardingTransactionManagerEngine;
import org.apache.shardingsphere.transaction.context.TransactionContexts;
......@@ -45,7 +46,7 @@ public final class BackendTransactionManagerTest {
private BackendConnection backendConnection;
@Mock
private ConnectionStatusHandler statusHandler;
private ConnectionStatusManager statusManager;
@Mock
private LocalTransactionManager localTransactionManager;
......@@ -59,7 +60,7 @@ public final class BackendTransactionManagerTest {
public void setUp() {
setTransactionContexts();
when(backendConnection.getSchemaName()).thenReturn("schema");
when(backendConnection.getStatusHandler()).thenReturn(statusHandler);
when(backendConnection.getStatusManager()).thenReturn(statusManager);
}
@SneakyThrows(ReflectiveOperationException.class)
......@@ -81,7 +82,7 @@ public final class BackendTransactionManagerTest {
public void assertBeginForLocalTransaction() {
newBackendTransactionManager(TransactionType.LOCAL, false);
backendTransactionManager.begin();
verify(statusHandler).switchInTransactionStatus();
verify(statusManager).switchToInTransaction();
verify(backendConnection).releaseConnections(false);
verify(localTransactionManager).begin();
}
......@@ -90,7 +91,7 @@ public final class BackendTransactionManagerTest {
public void assertBeginForDistributedTransaction() {
newBackendTransactionManager(TransactionType.XA, true);
backendTransactionManager.begin();
verify(statusHandler, times(0)).switchInTransactionStatus();
verify(statusManager, times(0)).switchToInTransaction();
verify(backendConnection, times(0)).releaseConnections(false);
verify(shardingTransactionManager).begin();
}
......@@ -99,7 +100,7 @@ public final class BackendTransactionManagerTest {
public void assertCommitForLocalTransaction() throws SQLException {
newBackendTransactionManager(TransactionType.LOCAL, true);
backendTransactionManager.commit();
verify(statusHandler).switchUsingStatus();
verify(statusManager).switchToUsing();
verify(localTransactionManager).commit();
}
......@@ -107,7 +108,7 @@ public final class BackendTransactionManagerTest {
public void assertCommitForDistributedTransaction() throws SQLException {
newBackendTransactionManager(TransactionType.XA, true);
backendTransactionManager.commit();
verify(statusHandler).switchUsingStatus();
verify(statusManager).switchToUsing();
verify(shardingTransactionManager).commit();
}
......@@ -115,7 +116,7 @@ public final class BackendTransactionManagerTest {
public void assertCommitWithoutTransaction() throws SQLException {
newBackendTransactionManager(TransactionType.LOCAL, false);
backendTransactionManager.commit();
verify(statusHandler, times(0)).switchUsingStatus();
verify(statusManager, times(0)).switchToUsing();
verify(localTransactionManager, times(0)).commit();
verify(shardingTransactionManager, times(0)).commit();
}
......@@ -124,7 +125,7 @@ public final class BackendTransactionManagerTest {
public void assertRollbackForLocalTransaction() throws SQLException {
newBackendTransactionManager(TransactionType.LOCAL, true);
backendTransactionManager.rollback();
verify(statusHandler).switchUsingStatus();
verify(statusManager).switchToUsing();
verify(localTransactionManager).rollback();
}
......@@ -132,7 +133,7 @@ public final class BackendTransactionManagerTest {
public void assertRollbackForDistributedTransaction() throws SQLException {
newBackendTransactionManager(TransactionType.XA, true);
backendTransactionManager.rollback();
verify(statusHandler).switchUsingStatus();
verify(statusManager).switchToUsing();
verify(shardingTransactionManager).rollback();
}
......@@ -140,14 +141,14 @@ public final class BackendTransactionManagerTest {
public void assertRollbackWithoutTransaction() throws SQLException {
newBackendTransactionManager(TransactionType.LOCAL, false);
backendTransactionManager.rollback();
verify(statusHandler, times(0)).switchUsingStatus();
verify(statusManager, times(0)).switchToUsing();
verify(localTransactionManager, times(0)).rollback();
verify(shardingTransactionManager, times(0)).rollback();
}
private void newBackendTransactionManager(final TransactionType transactionType, final boolean inTransaction) {
when(backendConnection.getTransactionType()).thenReturn(transactionType);
when(statusHandler.isInTransaction()).thenReturn(inTransaction);
when(statusManager.isInTransaction()).thenReturn(inTransaction);
backendTransactionManager = new BackendTransactionManager(backendConnection);
setLocalTransactionManager();
}
......
......@@ -15,19 +15,20 @@
* limitations under the License.
*/
package org.apache.shardingsphere.proxy.backend.communication.jdbc.connection;
package org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.status;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.ResourceLock;
import org.junit.Test;
public final class ConnectionStatusHandlerTest {
public final class ConnectionStatusManagerTest {
private final ConnectionStatusHandler connectionStatusHandler = new ConnectionStatusHandler(new ResourceLock());
private final ConnectionStatusManager connectionStatusManager = new ConnectionStatusManager(new ResourceLock());
@Test
public void assertWaitUntilConnectionReleaseForNoneTransaction() throws InterruptedException {
Thread waitThread = new Thread(() -> {
connectionStatusHandler.switchInTransactionStatus();
connectionStatusHandler.waitUntilConnectionReleasedIfNecessary();
connectionStatusManager.switchToInTransaction();
connectionStatusManager.waitUntilConnectionReleasedIfNecessary();
});
Thread notifyThread = new Thread(() -> {
try {
......@@ -35,7 +36,7 @@ public final class ConnectionStatusHandlerTest {
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
}
connectionStatusHandler.doNotifyIfNecessary();
connectionStatusManager.switchToReleased();
});
waitThread.start();
notifyThread.start();
......@@ -46,8 +47,8 @@ public final class ConnectionStatusHandlerTest {
@Test
public void assertWaitUntilConnectionReleaseForTransaction() throws InterruptedException {
Thread waitThread = new Thread(() -> {
connectionStatusHandler.switchUsingStatus();
connectionStatusHandler.waitUntilConnectionReleasedIfNecessary();
connectionStatusManager.switchToUsing();
connectionStatusManager.waitUntilConnectionReleasedIfNecessary();
});
Thread notifyThread = new Thread(() -> {
try {
......@@ -55,7 +56,7 @@ public final class ConnectionStatusHandlerTest {
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
}
connectionStatusHandler.doNotifyIfNecessary();
connectionStatusManager.switchToReleased();
});
waitThread.start();
notifyThread.start();
......
......@@ -28,7 +28,7 @@ import org.apache.shardingsphere.db.protocol.payload.PacketPayload;
import org.apache.shardingsphere.infra.hook.RootInvokeHook;
import org.apache.shardingsphere.infra.hook.SPIRootInvokeHook;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.ConnectionStatusHandler;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.status.ConnectionStatusManager;
import org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
import org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
import org.apache.shardingsphere.proxy.frontend.exception.ExpectedExceptions;
......@@ -67,9 +67,9 @@ public final class CommandExecutorTask implements Runnable {
boolean isNeedFlush = false;
try (BackendConnection backendConnection = this.backendConnection;
PacketPayload payload = databaseProtocolFrontendEngine.getCodecEngine().createPacketPayload((ByteBuf) message)) {
ConnectionStatusHandler statusHandler = backendConnection.getStatusHandler();
statusHandler.waitUntilConnectionReleasedIfNecessary();
statusHandler.switchUsingStatus();
ConnectionStatusManager statusManager = backendConnection.getStatusManager();
statusManager.waitUntilConnectionReleasedIfNecessary();
statusManager.switchToUsing();
isNeedFlush = executeCommand(context, payload, backendConnection);
connectionSize = backendConnection.getConnectionSize();
// CHECKSTYLE:OFF
......
......@@ -25,7 +25,7 @@ import org.apache.shardingsphere.db.protocol.packet.CommandPacketType;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
import org.apache.shardingsphere.db.protocol.payload.PacketPayload;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.ConnectionStatusHandler;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.status.ConnectionStatusManager;
import org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
import org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
import org.apache.shardingsphere.proxy.frontend.context.FrontendContext;
......@@ -63,7 +63,7 @@ public final class CommandExecutorTaskTest {
private ChannelHandlerContext handlerContext;
@Mock
private ConnectionStatusHandler statusHandler;
private ConnectionStatusManager statusHandler;
@Mock
private CommandExecuteEngine executeEngine;
......@@ -97,13 +97,13 @@ public final class CommandExecutorTaskTest {
when(executeEngine.getCommandExecutor(eq(commandPacketType), eq(commandPacket), eq(backendConnection))).thenReturn(queryCommandExecutor);
when(executeEngine.getCommandPacketType(eq(payload))).thenReturn(commandPacketType);
when(engine.getCommandExecuteEngine()).thenReturn(executeEngine);
when(backendConnection.getStatusHandler()).thenReturn(statusHandler);
when(backendConnection.getStatusManager()).thenReturn(statusHandler);
when(codecEngine.createPacketPayload(eq(message))).thenReturn(payload);
when(engine.getCodecEngine()).thenReturn(codecEngine);
CommandExecutorTask actual = new CommandExecutorTask(engine, backendConnection, handlerContext, message);
actual.run();
verify(statusHandler).waitUntilConnectionReleasedIfNecessary();
verify(statusHandler).switchUsingStatus();
verify(statusHandler).switchToUsing();
}
@Test
......@@ -114,13 +114,13 @@ public final class CommandExecutorTaskTest {
when(executeEngine.getCommandExecutor(eq(commandPacketType), eq(commandPacket), eq(backendConnection))).thenReturn(queryCommandExecutor);
when(executeEngine.getCommandPacketType(eq(payload))).thenReturn(commandPacketType);
when(engine.getCommandExecuteEngine()).thenReturn(executeEngine);
when(backendConnection.getStatusHandler()).thenReturn(statusHandler);
when(backendConnection.getStatusManager()).thenReturn(statusHandler);
when(codecEngine.createPacketPayload(eq(message))).thenReturn(payload);
when(engine.getCodecEngine()).thenReturn(codecEngine);
CommandExecutorTask actual = new CommandExecutorTask(engine, backendConnection, handlerContext, message);
actual.run();
verify(statusHandler).waitUntilConnectionReleasedIfNecessary();
verify(statusHandler).switchUsingStatus();
verify(statusHandler).switchToUsing();
verify(handlerContext).write(databasePacket);
verify(handlerContext).flush();
verify(executeEngine).writeQueryData(handlerContext, backendConnection, queryCommandExecutor, 1);
......@@ -136,13 +136,13 @@ public final class CommandExecutorTaskTest {
when(executeEngine.getCommandExecutor(eq(commandPacketType), eq(commandPacket), eq(backendConnection))).thenReturn(commandExecutor);
when(executeEngine.getCommandPacketType(eq(payload))).thenReturn(commandPacketType);
when(engine.getCommandExecuteEngine()).thenReturn(executeEngine);
when(backendConnection.getStatusHandler()).thenReturn(statusHandler);
when(backendConnection.getStatusManager()).thenReturn(statusHandler);
when(codecEngine.createPacketPayload(eq(message))).thenReturn(payload);
when(engine.getCodecEngine()).thenReturn(codecEngine);
CommandExecutorTask actual = new CommandExecutorTask(engine, backendConnection, handlerContext, message);
actual.run();
verify(statusHandler).waitUntilConnectionReleasedIfNecessary();
verify(statusHandler).switchUsingStatus();
verify(statusHandler).switchToUsing();
verify(handlerContext).write(databasePacket);
verify(handlerContext).flush();
}
......@@ -150,7 +150,7 @@ public final class CommandExecutorTaskTest {
@Test
public void assertRunWithError() {
RuntimeException mockException = new RuntimeException("mock");
when(backendConnection.getStatusHandler()).thenThrow(mockException);
when(backendConnection.getStatusManager()).thenThrow(mockException);
when(codecEngine.createPacketPayload(message)).thenReturn(payload);
when(engine.getCodecEngine()).thenReturn(codecEngine);
when(executeEngine.getErrorPacket(eq(mockException))).thenReturn(databasePacket);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册