diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLErrorResponsePacket.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLErrorResponsePacket.java index 323371dcf9a9b05032bbdb3ce795bc685cbbc163..08382118efb93109ee016c65dc00b8b0cbae4be7 100644 --- a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLErrorResponsePacket.java +++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLErrorResponsePacket.java @@ -32,48 +32,48 @@ import java.util.Map.Entry; * @see ErrorResponse (B) */ public final class PostgreSQLErrorResponsePacket implements PostgreSQLPacket { - + public static final char FIELD_TYPE_SEVERITY = 'S'; - + public static final char FIELD_TYPE_SEVERITY2 = 'V'; - + public static final char FIELD_TYPE_CODE = 'C'; - + public static final char FIELD_TYPE_MESSAGE = 'M'; - + public static final char FIELD_TYPE_DETAIL = 'D'; - + public static final char FIELD_TYPE_HINT = 'H'; - + public static final char FIELD_TYPE_POSITION = 'P'; - + public static final char FIELD_TYPE_INTERNAL_POSITION = 'p'; - + public static final char FIELD_TYPE_INTERNAL_QUERY = 'q'; - + public static final char FIELD_TYPE_WHERE = 'W'; - + public static final char FIELD_TYPE_SCHEMA_NAME = 's'; - + public static final char FIELD_TYPE_TABLE_NAME = 't'; - + public static final char FIELD_TYPE_COLUMN_NAME = 'c'; - + public static final char FIELD_TYPE_DATA_TYPE_NAME = 'd'; - + public static final char FIELD_TYPE_CONSTRAINT_NAME = 'n'; - + public static final char FIELD_TYPE_FILE = 'F'; - + public static final char FIELD_TYPE_LINE = 'L'; - + public static final char FIELD_TYPE_ROUTINE = 'R'; - + @Getter private final char messageType = PostgreSQLCommandPacketType.ERROR_RESPONSE.getValue(); - + private final Map fields = new HashMap<>(); - + @Override public void write(final PostgreSQLPacketPayload payload) { for (Entry each : fields.entrySet()) { @@ -82,7 +82,7 @@ public final class PostgreSQLErrorResponsePacket implements PostgreSQLPacket { } payload.writeInt1(0); } - + /** * Add field. * diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/ProxyStateMachine.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/ProxyStateMachine.java index 42ab3fe96531ff2bea48661b781bd3a2d240f6f5..a6cd219e4c38f4195e3bc35af0d19ca15d4f60e4 100644 --- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/ProxyStateMachine.java +++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/ProxyStateMachine.java @@ -22,6 +22,8 @@ import lombok.AccessLevel; import lombok.NoArgsConstructor; import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection; import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine; +import org.apache.shardingsphere.proxy.frontend.state.impl.CircuitBreakProxyState; +import org.apache.shardingsphere.proxy.frontend.state.impl.LockProxyState; import org.apache.shardingsphere.proxy.frontend.state.impl.OKProxyState; import java.util.Map; @@ -40,6 +42,8 @@ public final class ProxyStateMachine { static { PROXY_STATE_MAP.put(ProxyStateType.OK, new OKProxyState()); + PROXY_STATE_MAP.put(ProxyStateType.LOCK, new LockProxyState()); + PROXY_STATE_MAP.put(ProxyStateType.CIRCUIT_BREAK, new CircuitBreakProxyState()); CURRENT_STATE.set(PROXY_STATE_MAP.get(ProxyStateType.OK)); } diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/CircuitBreakProxyState.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/CircuitBreakProxyState.java new file mode 100644 index 0000000000000000000000000000000000000000..edf9e92c8cec75460d708d20db93177682a1fa54 --- /dev/null +++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/CircuitBreakProxyState.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.proxy.frontend.state.impl; + +import com.google.common.eventbus.Subscribe; +import io.netty.channel.ChannelHandlerContext; +import org.apache.shardingsphere.db.protocol.packet.DatabasePacket; +import org.apache.shardingsphere.governance.core.event.GovernanceEventBus; +import org.apache.shardingsphere.governance.core.registry.event.CircuitStateChangedEvent; +import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection; +import org.apache.shardingsphere.proxy.backend.exception.CircuitBreakException; +import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine; +import org.apache.shardingsphere.proxy.frontend.state.ProxyState; +import org.apache.shardingsphere.proxy.frontend.state.ProxyStateMachine; +import org.apache.shardingsphere.proxy.frontend.state.ProxyStateType; + +import java.util.Optional; + +/** + * Circuit break proxy state. + */ +public final class CircuitBreakProxyState implements ProxyState { + + public CircuitBreakProxyState() { + GovernanceEventBus.getInstance().register(this); + } + + @Override + public void execute(final ChannelHandlerContext context, final Object message, final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine, final BackendConnection backendConnection) { + context.writeAndFlush(databaseProtocolFrontendEngine.getCommandExecuteEngine().getErrorPacket(new CircuitBreakException())); + Optional> databasePacket = databaseProtocolFrontendEngine.getCommandExecuteEngine().getOtherPacket(); + databasePacket.ifPresent(context::writeAndFlush); + } + + /** + * Renew circuit breaker state. + * + * @param event circuit state changed event + */ + @Subscribe + public synchronized void renew(final CircuitStateChangedEvent event) { + if (event.isCircuitBreak()) { + ProxyStateMachine.switchState(ProxyStateType.CIRCUIT_BREAK); + } else { + // TODO check previous state, maybe lock + ProxyStateMachine.switchState(ProxyStateType.OK); + } + } +} diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/LockProxyState.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/LockProxyState.java new file mode 100644 index 0000000000000000000000000000000000000000..727a62c87d51fb0c339d529444aee2a90e74443e --- /dev/null +++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/LockProxyState.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.proxy.frontend.state.impl; + +import io.netty.channel.ChannelHandlerContext; +import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection; +import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine; +import org.apache.shardingsphere.proxy.frontend.state.ProxyState; + +/** + * Lock proxy state. + */ +public final class LockProxyState implements ProxyState { + + @Override + public void execute(final ChannelHandlerContext context, final Object message, final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine, final BackendConnection backendConnection) { + throw new UnsupportedOperationException("LockProxyState"); + } +} diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java index a51bbbd94b43ccb2dcb30f7eb0b1438fa822f5fc..9ce7036170193251883363d74e742599f9eb2788 100644 --- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java +++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java @@ -24,11 +24,11 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.e import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket; import org.apache.shardingsphere.db.protocol.packet.DatabasePacket; import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry; +import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine; import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine; import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory; import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection; import org.apache.shardingsphere.proxy.backend.context.ProxyContext; -import org.apache.shardingsphere.proxy.backend.exception.CircuitBreakException; import org.apache.shardingsphere.proxy.backend.response.BackendResponse; import org.apache.shardingsphere.proxy.backend.response.query.QueryData; import org.apache.shardingsphere.proxy.backend.response.query.QueryResponse; @@ -36,7 +36,6 @@ import org.apache.shardingsphere.proxy.backend.response.update.UpdateResponse; import org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor; import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType; import org.apache.shardingsphere.proxy.frontend.mysql.command.query.builder.ResponsePacketBuilder; -import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine; import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement; import java.sql.SQLException; @@ -65,9 +64,6 @@ public final class MySQLComStmtExecuteExecutor implements QueryCommandExecutor { @Override public Collection> execute() throws SQLException { - if (ProxyContext.getInstance().getMetaDataContexts().isCircuitBreak()) { - throw new CircuitBreakException(); - } BackendResponse backendResponse = databaseCommunicationEngine.execute(); return backendResponse instanceof QueryResponse ? processQuery((QueryResponse) backendResponse) : processUpdate((UpdateResponse) backendResponse); } diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java index 89e1e76a70ce355c1c313580221a197aa8c10593..9ccdce5b74b4c27c029958ea383271a404f62152 100644 --- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java +++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java @@ -24,8 +24,6 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.que import org.apache.shardingsphere.db.protocol.packet.DatabasePacket; import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry; import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection; -import org.apache.shardingsphere.proxy.backend.context.ProxyContext; -import org.apache.shardingsphere.proxy.backend.exception.CircuitBreakException; import org.apache.shardingsphere.proxy.backend.response.BackendResponse; import org.apache.shardingsphere.proxy.backend.response.query.QueryResponse; import org.apache.shardingsphere.proxy.backend.response.update.UpdateResponse; @@ -56,9 +54,6 @@ public final class MySQLComQueryPacketExecutor implements QueryCommandExecutor { @Override public Collection> execute() throws SQLException { - if (ProxyContext.getInstance().getMetaDataContexts().isCircuitBreak()) { - throw new CircuitBreakException(); - } BackendResponse backendResponse = textProtocolBackendHandler.execute(); return backendResponse instanceof QueryResponse ? processQuery((QueryResponse) backendResponse) : processUpdate((UpdateResponse) backendResponse); } diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/PostgreSQLComBindExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/PostgreSQLComBindExecutor.java index c5955f6869abce99bf2b02dc16cf33417a5e16a2..d21fd6fea011022413123d3d85251ac1956930a6 100644 --- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/PostgreSQLComBindExecutor.java +++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/PostgreSQLComBindExecutor.java @@ -28,7 +28,6 @@ import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.bin import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLComBindPacket; import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.text.PostgreSQLDataRowPacket; import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket; -import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLErrorResponsePacket; import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry; import org.apache.shardingsphere.infra.executor.sql.QueryResult; import org.apache.shardingsphere.infra.executor.sql.raw.execute.result.query.QueryHeader; @@ -48,7 +47,6 @@ import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Optional; @@ -80,9 +78,6 @@ public final class PostgreSQLComBindExecutor implements QueryCommandExecutor { @Override public Collection> execute() throws SQLException { - if (ProxyContext.getInstance().getMetaDataContexts().isCircuitBreak()) { - return Collections.singletonList(new PostgreSQLErrorResponsePacket()); - } List> result = new LinkedList<>(); result.add(new PostgreSQLBindCompletePacket()); if (null == databaseCommunicationEngine) { diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/text/PostgreSQLComQueryExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/text/PostgreSQLComQueryExecutor.java index 7c2e52cdef77ebbb20096c3497345a365e1f7502..6dfb8c4048acdd5124ded556e25c9e030fb5c146 100644 --- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/text/PostgreSQLComQueryExecutor.java +++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/text/PostgreSQLComQueryExecutor.java @@ -25,12 +25,10 @@ import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.Pos import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.text.PostgreSQLComQueryPacket; import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.text.PostgreSQLDataRowPacket; import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket; -import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLErrorResponsePacket; import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry; import org.apache.shardingsphere.infra.executor.sql.QueryResult; import org.apache.shardingsphere.infra.executor.sql.raw.execute.result.query.QueryHeader; import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection; -import org.apache.shardingsphere.proxy.backend.context.ProxyContext; import org.apache.shardingsphere.proxy.backend.response.BackendResponse; import org.apache.shardingsphere.proxy.backend.response.query.QueryResponse; import org.apache.shardingsphere.proxy.backend.response.update.UpdateResponse; @@ -62,9 +60,6 @@ public final class PostgreSQLComQueryExecutor implements QueryCommandExecutor { @Override public Collection> execute() throws SQLException { - if (ProxyContext.getInstance().getMetaDataContexts().isCircuitBreak()) { - return Collections.singletonList(new PostgreSQLErrorResponsePacket()); - } BackendResponse backendResponse = textProtocolBackendHandler.execute(); if (backendResponse instanceof QueryResponse) { Optional result = createQueryPacket((QueryResponse) backendResponse);