提交 89cc0a08 编写于 作者: T terrymanu

Move CIRCUIT_BREAK state in to state machine

上级 87a7bf4b
......@@ -32,48 +32,48 @@ import java.util.Map.Entry;
* @see <a href="https://www.postgresql.org/docs/12/protocol-message-formats.html">ErrorResponse (B)</a>
*/
public final class PostgreSQLErrorResponsePacket implements PostgreSQLPacket {
public static final char FIELD_TYPE_SEVERITY = 'S';
public static final char FIELD_TYPE_SEVERITY2 = 'V';
public static final char FIELD_TYPE_CODE = 'C';
public static final char FIELD_TYPE_MESSAGE = 'M';
public static final char FIELD_TYPE_DETAIL = 'D';
public static final char FIELD_TYPE_HINT = 'H';
public static final char FIELD_TYPE_POSITION = 'P';
public static final char FIELD_TYPE_INTERNAL_POSITION = 'p';
public static final char FIELD_TYPE_INTERNAL_QUERY = 'q';
public static final char FIELD_TYPE_WHERE = 'W';
public static final char FIELD_TYPE_SCHEMA_NAME = 's';
public static final char FIELD_TYPE_TABLE_NAME = 't';
public static final char FIELD_TYPE_COLUMN_NAME = 'c';
public static final char FIELD_TYPE_DATA_TYPE_NAME = 'd';
public static final char FIELD_TYPE_CONSTRAINT_NAME = 'n';
public static final char FIELD_TYPE_FILE = 'F';
public static final char FIELD_TYPE_LINE = 'L';
public static final char FIELD_TYPE_ROUTINE = 'R';
@Getter
private final char messageType = PostgreSQLCommandPacketType.ERROR_RESPONSE.getValue();
private final Map<Character, String> fields = new HashMap<>();
@Override
public void write(final PostgreSQLPacketPayload payload) {
for (Entry<Character, String> each : fields.entrySet()) {
......@@ -82,7 +82,7 @@ public final class PostgreSQLErrorResponsePacket implements PostgreSQLPacket {
}
payload.writeInt1(0);
}
/**
* Add field.
*
......
......@@ -18,10 +18,14 @@
package org.apache.shardingsphere.proxy.frontend.state.impl;
import io.netty.channel.ChannelHandlerContext;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.exception.CircuitBreakException;
import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine;
import org.apache.shardingsphere.proxy.frontend.state.ProxyState;
import java.util.Optional;
/**
* Circuit break proxy state.
*/
......@@ -29,6 +33,8 @@ public final class CircuitBreakProxyState implements ProxyState {
@Override
public void execute(final ChannelHandlerContext context, final Object message, final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine, final BackendConnection backendConnection) {
throw new UnsupportedOperationException("CircuitBreakProxyState");
context.writeAndFlush(databaseProtocolFrontendEngine.getCommandExecuteEngine().getErrorPacket(new CircuitBreakException()));
Optional<DatabasePacket<?>> databasePacket = databaseProtocolFrontendEngine.getCommandExecuteEngine().getOtherPacket();
databasePacket.ifPresent(context::writeAndFlush);
}
}
......@@ -24,11 +24,11 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.e
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.exception.CircuitBreakException;
import org.apache.shardingsphere.proxy.backend.response.BackendResponse;
import org.apache.shardingsphere.proxy.backend.response.query.QueryData;
import org.apache.shardingsphere.proxy.backend.response.query.QueryResponse;
......@@ -36,7 +36,6 @@ import org.apache.shardingsphere.proxy.backend.response.update.UpdateResponse;
import org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
import org.apache.shardingsphere.proxy.frontend.mysql.command.query.builder.ResponsePacketBuilder;
import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import java.sql.SQLException;
......@@ -65,9 +64,6 @@ public final class MySQLComStmtExecuteExecutor implements QueryCommandExecutor {
@Override
public Collection<DatabasePacket<?>> execute() throws SQLException {
if (ProxyContext.getInstance().getMetaDataContexts().isCircuitBreak()) {
throw new CircuitBreakException();
}
BackendResponse backendResponse = databaseCommunicationEngine.execute();
return backendResponse instanceof QueryResponse ? processQuery((QueryResponse) backendResponse) : processUpdate((UpdateResponse) backendResponse);
}
......
......@@ -24,8 +24,6 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.que
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.exception.CircuitBreakException;
import org.apache.shardingsphere.proxy.backend.response.BackendResponse;
import org.apache.shardingsphere.proxy.backend.response.query.QueryResponse;
import org.apache.shardingsphere.proxy.backend.response.update.UpdateResponse;
......@@ -56,9 +54,6 @@ public final class MySQLComQueryPacketExecutor implements QueryCommandExecutor {
@Override
public Collection<DatabasePacket<?>> execute() throws SQLException {
if (ProxyContext.getInstance().getMetaDataContexts().isCircuitBreak()) {
throw new CircuitBreakException();
}
BackendResponse backendResponse = textProtocolBackendHandler.execute();
return backendResponse instanceof QueryResponse ? processQuery((QueryResponse) backendResponse) : processUpdate((UpdateResponse) backendResponse);
}
......
......@@ -28,7 +28,6 @@ import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.bin
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLComBindPacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.text.PostgreSQLDataRowPacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLErrorResponsePacket;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.executor.sql.QueryResult;
import org.apache.shardingsphere.infra.executor.sql.raw.execute.result.query.QueryHeader;
......@@ -48,7 +47,6 @@ import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
......@@ -80,9 +78,6 @@ public final class PostgreSQLComBindExecutor implements QueryCommandExecutor {
@Override
public Collection<DatabasePacket<?>> execute() throws SQLException {
if (ProxyContext.getInstance().getMetaDataContexts().isCircuitBreak()) {
return Collections.singletonList(new PostgreSQLErrorResponsePacket());
}
List<DatabasePacket<?>> result = new LinkedList<>();
result.add(new PostgreSQLBindCompletePacket());
if (null == databaseCommunicationEngine) {
......
......@@ -25,12 +25,10 @@ import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.Pos
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.text.PostgreSQLComQueryPacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.text.PostgreSQLDataRowPacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLErrorResponsePacket;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.executor.sql.QueryResult;
import org.apache.shardingsphere.infra.executor.sql.raw.execute.result.query.QueryHeader;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.response.BackendResponse;
import org.apache.shardingsphere.proxy.backend.response.query.QueryResponse;
import org.apache.shardingsphere.proxy.backend.response.update.UpdateResponse;
......@@ -62,9 +60,6 @@ public final class PostgreSQLComQueryExecutor implements QueryCommandExecutor {
@Override
public Collection<DatabasePacket<?>> execute() throws SQLException {
if (ProxyContext.getInstance().getMetaDataContexts().isCircuitBreak()) {
return Collections.singletonList(new PostgreSQLErrorResponsePacket());
}
BackendResponse backendResponse = textProtocolBackendHandler.execute();
if (backendResponse instanceof QueryResponse) {
Optional<PostgreSQLRowDescriptionPacket> result = createQueryPacket((QueryResponse) backendResponse);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册