未验证 提交 50365227 编写于 作者: ShardingSphere's avatar ShardingSphere 提交者: GitHub

Merge branch 'dev' into dev

......@@ -179,13 +179,9 @@ public final class InsertOptimizeEngine implements OptimizeEngine {
private void fillShardingCondition(final ShardingCondition shardingCondition, final Comparable<?> currentGeneratedKey) {
Column generateKeyColumn = shardingRule.findGenerateKeyColumn(insertStatement.getTables().getSingleTableName()).get();
if (isShardingColumn(generateKeyColumn)) {
if (shardingRule.isShardingColumn(generateKeyColumn)) {
shardingCondition.getShardingValues().add(new ListRouteValue<>(generateKeyColumn, new GeneratedKeyCondition(generateKeyColumn, -1, currentGeneratedKey).getConditionValues(parameters)));
}
insertStatement.setContainGenerateKey(true);
}
private boolean isShardingColumn(final Column generateKeyColumn) {
return shardingRule.getTableRule(generateKeyColumn.getTableName()).getAllShardingColumns().contains(generateKeyColumn.getName());
}
}
......@@ -121,8 +121,8 @@ public final class MySQLFrontendEngine implements DatabaseFrontendEngine {
private void writePackets(final ChannelHandlerContext context, final MySQLPacketPayload payload, final BackendConnection backendConnection) throws SQLException {
MySQLCommandPacketType commandPacketType = MySQLCommandPacketTypeLoader.getCommandPacketType(payload);
MySQLCommandPacket commandPacket = MySQLCommandPacketFactory.newInstance(commandPacketType, payload);
CommandPacketExecutor<MySQLPacket> commandPacketExecutor = MySQLCommandPacketExecutorFactory.newInstance(commandPacketType);
Collection<MySQLPacket> responsePackets = commandPacketExecutor.execute(backendConnection, commandPacket);
CommandPacketExecutor<MySQLPacket> commandPacketExecutor = MySQLCommandPacketExecutorFactory.newInstance(commandPacketType, commandPacket, backendConnection);
Collection<MySQLPacket> responsePackets = commandPacketExecutor.execute();
if (responsePackets.isEmpty()) {
return;
}
......
......@@ -127,8 +127,8 @@ public final class PostgreSQLFrontendEngine implements DatabaseFrontendEngine {
private void writePackets(final ChannelHandlerContext context, final PostgreSQLPacketPayload payload, final BackendConnection backendConnection) throws SQLException {
PostgreSQLCommandPacketType commandPacketType = PostgreSQLCommandPacketTypeLoader.getCommandPacketType(payload);
PostgreSQLCommandPacket commandPacket = PostgreSQLCommandPacketFactory.newInstance(commandPacketType, payload, backendConnection.getConnectionId());
CommandPacketExecutor<PostgreSQLPacket> commandPacketExecutor = PostgreSQLCommandPacketExecutorFactory.newInstance(commandPacketType);
Collection<PostgreSQLPacket> responsePackets = commandPacketExecutor.execute(backendConnection, commandPacket);
CommandPacketExecutor<PostgreSQLPacket> commandPacketExecutor = PostgreSQLCommandPacketExecutorFactory.newInstance(commandPacketType, commandPacket, backendConnection);
Collection<PostgreSQLPacket> responsePackets = commandPacketExecutor.execute();
if (commandPacket instanceof PostgreSQLComSyncPacket) {
context.write(new PostgreSQLCommandCompletePacket());
context.writeAndFlush(new PostgreSQLReadyForQueryPacket());
......
......@@ -17,8 +17,6 @@
package org.apache.shardingsphere.shardingproxy.transport.common.packet;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.CommandPacket;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.DatabasePacket;
import java.sql.SQLException;
......@@ -36,10 +34,8 @@ public interface CommandPacketExecutor<T extends DatabasePacket> {
/**
* Execute command.
*
* @param backendConnection backend connection
* @param commandPacket command packet
* @return database packets to be sent
* @throws SQLException SQL exception
*/
Collection<T> execute(BackendConnection backendConnection, CommandPacket commandPacket) throws SQLException;
Collection<T> execute() throws SQLException;
}
......@@ -23,7 +23,7 @@ import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.MySQLPacke
import org.apache.shardingsphere.shardingproxy.transport.mysql.payload.MySQLPacketPayload;
/**
* MySQL command packet.
* Command packet for MySQL.
*
* @author zhangliang
*/
......
......@@ -19,15 +19,22 @@ package org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.CommandPacket;
import org.apache.shardingsphere.shardingproxy.transport.common.packet.CommandPacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.MySQLPacket;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.admin.MySQLOKCommandPacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.admin.MySQLUnsupportedCommandPacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.admin.initdb.MySQLComInitDbPacket;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.admin.initdb.MySQLComInitDbPacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.query.binary.close.MySQLComStmtClosePacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.query.binary.execute.MySQLQueryComStmtExecutePacket;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.query.binary.execute.MySQLQueryComStmtExecutePacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.query.binary.prepare.MySQLComStmtPreparePacket;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.query.binary.prepare.MySQLComStmtPreparePacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.query.text.fieldlist.MySQLComFieldListPacket;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.query.text.fieldlist.MySQLComFieldListPacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.query.text.query.MySQLComQueryPacket;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.query.text.query.MySQLComQueryPacketExecutor;
/**
......@@ -42,22 +49,24 @@ public final class MySQLCommandPacketExecutorFactory {
* Create new instance of command packet executor.
*
* @param commandPacketType command packet type for MySQL
* @param commandPacket command packet for MySQL
* @param backendConnection backend connection
* @return command packet executor
*/
public static CommandPacketExecutor<MySQLPacket> newInstance(final MySQLCommandPacketType commandPacketType) {
public static CommandPacketExecutor<MySQLPacket> newInstance(final MySQLCommandPacketType commandPacketType, final CommandPacket commandPacket, final BackendConnection backendConnection) {
switch (commandPacketType) {
case COM_QUIT:
return new MySQLOKCommandPacketExecutor();
case COM_INIT_DB:
return new MySQLComInitDbPacketExecutor();
return new MySQLComInitDbPacketExecutor((MySQLComInitDbPacket) commandPacket, backendConnection);
case COM_FIELD_LIST:
return new MySQLComFieldListPacketExecutor();
return new MySQLComFieldListPacketExecutor((MySQLComFieldListPacket) commandPacket, backendConnection);
case COM_QUERY:
return new MySQLComQueryPacketExecutor();
return new MySQLComQueryPacketExecutor((MySQLComQueryPacket) commandPacket, backendConnection);
case COM_STMT_PREPARE:
return new MySQLComStmtPreparePacketExecutor();
return new MySQLComStmtPreparePacketExecutor((MySQLComStmtPreparePacket) commandPacket, backendConnection);
case COM_STMT_EXECUTE:
return new MySQLQueryComStmtExecutePacketExecutor();
return new MySQLQueryComStmtExecutePacketExecutor((MySQLQueryComStmtExecutePacket) commandPacket, backendConnection);
case COM_STMT_CLOSE:
return new MySQLComStmtClosePacketExecutor();
case COM_PING:
......
......@@ -17,8 +17,6 @@
package org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.admin;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.CommandPacket;
import org.apache.shardingsphere.shardingproxy.transport.common.packet.CommandPacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.MySQLPacket;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.generic.MySQLOKPacket;
......@@ -34,7 +32,7 @@ import java.util.Collections;
public final class MySQLOKCommandPacketExecutor implements CommandPacketExecutor<MySQLPacket> {
@Override
public Collection<MySQLPacket> execute(final BackendConnection backendConnection, final CommandPacket commandPacket) {
public Collection<MySQLPacket> execute() {
return Collections.<MySQLPacket>singletonList(new MySQLOKPacket(1));
}
}
......@@ -18,9 +18,7 @@
package org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.admin;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.shardingproxy.error.CommonErrorCode;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.CommandPacket;
import org.apache.shardingsphere.shardingproxy.transport.common.packet.CommandPacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.MySQLPacket;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.MySQLCommandPacketType;
......@@ -40,7 +38,7 @@ public final class MySQLUnsupportedCommandPacketExecutor implements CommandPacke
private final MySQLCommandPacketType type;
@Override
public Collection<MySQLPacket> execute(final BackendConnection backendConnection, final CommandPacket commandPacket) {
public Collection<MySQLPacket> execute() {
return Collections.<MySQLPacket>singletonList(new MySQLErrPacket(1, CommonErrorCode.UNSUPPORTED_COMMAND, type));
}
}
......@@ -17,9 +17,9 @@
package org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.admin.initdb;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.shardingproxy.backend.schema.LogicSchemas;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.CommandPacket;
import org.apache.shardingsphere.shardingproxy.transport.common.packet.CommandPacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.mysql.constant.MySQLServerErrorCode;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.MySQLPacket;
......@@ -34,11 +34,15 @@ import java.util.Collections;
*
* @author zhangliang
*/
@RequiredArgsConstructor
public final class MySQLComInitDbPacketExecutor implements CommandPacketExecutor<MySQLPacket> {
private final MySQLComInitDbPacket comInitDbPacket;
private final BackendConnection backendConnection;
@Override
public Collection<MySQLPacket> execute(final BackendConnection backendConnection, final CommandPacket commandPacket) {
MySQLComInitDbPacket comInitDbPacket = (MySQLComInitDbPacket) commandPacket;
public Collection<MySQLPacket> execute() {
if (LogicSchemas.getInstance().schemaExists(comInitDbPacket.getSchema())) {
backendConnection.setCurrentSchema(comInitDbPacket.getSchema());
return Collections.<MySQLPacket>singletonList(new MySQLOKPacket(1));
......
......@@ -17,8 +17,6 @@
package org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.query.binary.close;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.CommandPacket;
import org.apache.shardingsphere.shardingproxy.transport.common.packet.CommandPacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.MySQLPacket;
......@@ -30,12 +28,14 @@ import java.util.Collections;
*
* @author zhangliang
*/
@RequiredArgsConstructor
public final class MySQLComStmtClosePacketExecutor implements CommandPacketExecutor<MySQLPacket> {
private final MySQLComStmtClosePacket comStmtClosePacket;
@Override
public Collection<MySQLPacket> execute(final BackendConnection backendConnection, final CommandPacket commandPacket) {
MySQLComStmtClosePacket mySQLComStmtClosePacket = (MySQLComStmtClosePacket) commandPacket;
mySQLComStmtClosePacket.removeCachedStatement();
public Collection<MySQLPacket> execute() {
comStmtClosePacket.removeCachedStatement();
return Collections.emptyList();
}
}
......@@ -49,7 +49,6 @@ public final class MySQLQueryComStmtExecutePacket extends MySQLCommandPacket {
private final int statementId;
@Getter
private final MySQLBinaryStatement binaryStatement;
private final int flags;
......@@ -58,6 +57,9 @@ public final class MySQLQueryComStmtExecutePacket extends MySQLCommandPacket {
private final MySQLNewParametersBoundFlag newParametersBoundFlag;
@Getter
private final String sql;
@Getter
private final List<Object> parameters;
......@@ -76,6 +78,7 @@ public final class MySQLQueryComStmtExecutePacket extends MySQLCommandPacket {
if (MySQLNewParametersBoundFlag.PARAMETER_TYPE_EXIST == newParametersBoundFlag) {
binaryStatement.setParameterTypes(getParameterTypes(payload, parametersCount));
}
sql = binaryStatement.getSql();
parameters = getParameters(payload, parametersCount);
}
......
......@@ -28,7 +28,6 @@ import org.apache.shardingsphere.shardingproxy.backend.response.query.QueryRespo
import org.apache.shardingsphere.shardingproxy.backend.response.update.UpdateResponse;
import org.apache.shardingsphere.shardingproxy.context.GlobalContext;
import org.apache.shardingsphere.shardingproxy.error.CommonErrorCode;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.CommandPacket;
import org.apache.shardingsphere.shardingproxy.transport.common.packet.QueryCommandPacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.mysql.constant.MySQLColumnType;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.MySQLPacket;
......@@ -53,17 +52,19 @@ import java.util.List;
*/
public final class MySQLQueryComStmtExecutePacketExecutor implements QueryCommandPacketExecutor<MySQLPacket> {
private DatabaseCommunicationEngine databaseCommunicationEngine;
private final DatabaseCommunicationEngine databaseCommunicationEngine;
private boolean isQuery;
private volatile boolean isQuery;
private int currentSequenceId;
@Override
public Collection<MySQLPacket> execute(final BackendConnection backendConnection, final CommandPacket commandPacket) {
MySQLQueryComStmtExecutePacket comStmtExecutePacket = (MySQLQueryComStmtExecutePacket) commandPacket;
public MySQLQueryComStmtExecutePacketExecutor(final MySQLQueryComStmtExecutePacket comStmtExecutePacket, final BackendConnection backendConnection) {
databaseCommunicationEngine = DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(
backendConnection.getLogicSchema(), comStmtExecutePacket.getBinaryStatement().getSql(), comStmtExecutePacket.getParameters(), backendConnection);
backendConnection.getLogicSchema(), comStmtExecutePacket.getSql(), comStmtExecutePacket.getParameters(), backendConnection);
}
@Override
public Collection<MySQLPacket> execute() {
if (GlobalContext.getInstance().isCircuitBreak()) {
return Collections.<MySQLPacket>singletonList(new MySQLErrPacket(1, CommonErrorCode.CIRCUIT_BREAK_MODE));
}
......
......@@ -27,7 +27,6 @@ import org.apache.shardingsphere.shardingproxy.backend.schema.LogicSchema;
import org.apache.shardingsphere.shardingproxy.backend.schema.LogicSchemas;
import org.apache.shardingsphere.shardingproxy.backend.schema.MasterSlaveSchema;
import org.apache.shardingsphere.shardingproxy.backend.schema.ShardingSchema;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.CommandPacket;
import org.apache.shardingsphere.shardingproxy.transport.common.packet.CommandPacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.mysql.constant.MySQLColumnType;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.MySQLPacket;
......@@ -47,10 +46,20 @@ public final class MySQLComStmtPreparePacketExecutor implements CommandPacketExe
private static final MySQLBinaryStatementRegistry PREPARED_STATEMENT_REGISTRY = MySQLBinaryStatementRegistry.getInstance();
private final MySQLComStmtPreparePacket comStmtPreparePacket;
private final LogicSchema logicSchema;
private final String schemaName;
public MySQLComStmtPreparePacketExecutor(final MySQLComStmtPreparePacket comStmtPreparePacket, final BackendConnection backendConnection) {
this.comStmtPreparePacket = comStmtPreparePacket;
logicSchema = backendConnection.getLogicSchema();
schemaName = backendConnection.getSchemaName();
}
@Override
public Collection<MySQLPacket> execute(final BackendConnection backendConnection, final CommandPacket commandPacket) {
MySQLComStmtPreparePacket comStmtPreparePacket = (MySQLComStmtPreparePacket) commandPacket;
LogicSchema logicSchema = backendConnection.getLogicSchema();
public Collection<MySQLPacket> execute() {
// TODO we should use none-sharding parsing engine in future.
SQLParsingEngine sqlParsingEngine = new SQLParsingEngine(
LogicSchemas.getInstance().getDatabaseType(), comStmtPreparePacket.getSql(), getShardingRule(logicSchema), logicSchema.getMetaData().getTable());
......@@ -62,7 +71,7 @@ public final class MySQLComStmtPreparePacketExecutor implements CommandPacketExe
++currentSequenceId, PREPARED_STATEMENT_REGISTRY.register(comStmtPreparePacket.getSql(), parametersIndex), getNumColumns(sqlStatement), parametersIndex, 0));
for (int i = 0; i < parametersIndex; i++) {
// TODO add column name
result.add(new MySQLColumnDefinition41Packet(++currentSequenceId, backendConnection.getSchemaName(),
result.add(new MySQLColumnDefinition41Packet(++currentSequenceId, schemaName,
sqlStatement.getTables().isSingleTable() ? sqlStatement.getTables().getSingleTableName() : "", "", "", "", 100, MySQLColumnType.MYSQL_TYPE_VARCHAR, 0));
}
if (parametersIndex > 0) {
......
......@@ -22,7 +22,6 @@ import org.apache.shardingsphere.shardingproxy.backend.communication.DatabaseCom
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.shardingproxy.backend.response.BackendResponse;
import org.apache.shardingsphere.shardingproxy.backend.response.error.ErrorResponse;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.CommandPacket;
import org.apache.shardingsphere.shardingproxy.transport.common.packet.CommandPacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.mysql.constant.MySQLColumnType;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.MySQLPacket;
......@@ -44,24 +43,30 @@ public final class MySQLComFieldListPacketExecutor implements CommandPacketExecu
private static final String SQL = "SHOW COLUMNS FROM %s FROM %s";
private DatabaseCommunicationEngine databaseCommunicationEngine;
private final MySQLComFieldListPacket comFieldListPacket;
private final String schemaName;
private final DatabaseCommunicationEngine databaseCommunicationEngine;
public MySQLComFieldListPacketExecutor(final MySQLComFieldListPacket comFieldListPacket, final BackendConnection backendConnection) {
this.comFieldListPacket = comFieldListPacket;
schemaName = backendConnection.getSchemaName();
databaseCommunicationEngine = DatabaseCommunicationEngineFactory.getInstance().newTextProtocolInstance(backendConnection.getLogicSchema(), getShowColumnsSQL(), backendConnection);
}
@Override
public Collection<MySQLPacket> execute(final BackendConnection backendConnection, final CommandPacket commandPacket) throws SQLException {
MySQLComFieldListPacket comFieldListPacket = (MySQLComFieldListPacket) commandPacket;
String schemaName = backendConnection.getSchemaName();
databaseCommunicationEngine = DatabaseCommunicationEngineFactory.getInstance().newTextProtocolInstance(
backendConnection.getLogicSchema(), getShowColumnsSQL(schemaName, comFieldListPacket), backendConnection);
public Collection<MySQLPacket> execute() throws SQLException {
BackendResponse backendResponse = databaseCommunicationEngine.execute();
return backendResponse instanceof ErrorResponse ? Collections.<MySQLPacket>singletonList(MySQLErrPacketFactory.newInstance(1, ((ErrorResponse) backendResponse).getCause()))
: getColumnDefinition41Packets(schemaName, comFieldListPacket);
: getColumnDefinition41Packets();
}
private String getShowColumnsSQL(final String schemaName, final MySQLComFieldListPacket comFieldListPacket) {
private String getShowColumnsSQL() {
return String.format(SQL, comFieldListPacket.getTable(), schemaName);
}
private Collection<MySQLPacket> getColumnDefinition41Packets(final String schemaName, final MySQLComFieldListPacket comFieldListPacket) throws SQLException {
private Collection<MySQLPacket> getColumnDefinition41Packets() throws SQLException {
Collection<MySQLPacket> result = new LinkedList<>();
int currentSequenceId = 0;
while (databaseCommunicationEngine.next()) {
......
......@@ -27,7 +27,6 @@ import org.apache.shardingsphere.shardingproxy.backend.text.TextProtocolBackendH
import org.apache.shardingsphere.shardingproxy.backend.text.TextProtocolBackendHandlerFactory;
import org.apache.shardingsphere.shardingproxy.context.GlobalContext;
import org.apache.shardingsphere.shardingproxy.error.CommonErrorCode;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.CommandPacket;
import org.apache.shardingsphere.shardingproxy.transport.common.packet.QueryCommandPacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.MySQLPacket;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.query.MySQLColumnDefinition41Packet;
......@@ -51,16 +50,18 @@ import java.util.List;
*/
public final class MySQLComQueryPacketExecutor implements QueryCommandPacketExecutor<MySQLPacket> {
private TextProtocolBackendHandler textProtocolBackendHandler;
private final TextProtocolBackendHandler textProtocolBackendHandler;
private boolean isQuery;
private volatile boolean isQuery;
private int currentSequenceId;
@Override
public Collection<MySQLPacket> execute(final BackendConnection backendConnection, final CommandPacket commandPacket) {
MySQLComQueryPacket comQueryPacket = (MySQLComQueryPacket) commandPacket;
public MySQLComQueryPacketExecutor(final MySQLComQueryPacket comQueryPacket, final BackendConnection backendConnection) {
textProtocolBackendHandler = TextProtocolBackendHandlerFactory.newInstance(comQueryPacket.getSql(), backendConnection);
}
@Override
public Collection<MySQLPacket> execute() {
if (GlobalContext.getInstance().isCircuitBreak()) {
return Collections.<MySQLPacket>singletonList(new MySQLErrPacket(1, CommonErrorCode.CIRCUIT_BREAK_MODE));
}
......
......@@ -21,9 +21,9 @@ import org.apache.shardingsphere.shardingproxy.transport.api.packet.CommandPacke
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.PostgreSQLPacket;
/**
* PostgreSQL command packet.
* Command packet for PostgreSQL.
*
* @author zhangyonglun
*/
public interface PostgreSQLCommandPacket extends PostgreSQLPacket, CommandPacket {
public abstract class PostgreSQLCommandPacket implements PostgreSQLPacket, CommandPacket {
}
......@@ -19,14 +19,18 @@ package org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.comm
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.shardingproxy.transport.common.packet.CommandPacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.PostgreSQLPacket;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.admin.PostgreSQLUnsupportedCommandPacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.query.binary.bind.PostgreSQLComBindPacket;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.query.binary.bind.PostgreSQLComBindPacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.query.binary.describe.PostgreSQLComDescribePacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.query.binary.execute.PostgreSQLComExecutePacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.query.binary.parse.PostgreSQLComParsePacket;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.query.binary.parse.PostgreSQLComParsePacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.query.binary.sync.PostgreSQLComSyncPacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.query.text.PostgreSQLComQueryPacket;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.query.text.PostgreSQLComQueryPacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.generic.PostgreSQLComTerminationPacketExecutor;
......@@ -42,16 +46,19 @@ public final class PostgreSQLCommandPacketExecutorFactory {
* Create new instance of command packet executor.
*
* @param commandPacketType command packet type for PostgreSQL
* @param commandPacket command packet for PostgreSQL
* @param backendConnection backend connection
* @return command packet executor
*/
public static CommandPacketExecutor<PostgreSQLPacket> newInstance(final PostgreSQLCommandPacketType commandPacketType) {
public static CommandPacketExecutor<PostgreSQLPacket> newInstance(
final PostgreSQLCommandPacketType commandPacketType, final PostgreSQLCommandPacket commandPacket, final BackendConnection backendConnection) {
switch (commandPacketType) {
case QUERY:
return new PostgreSQLComQueryPacketExecutor();
return new PostgreSQLComQueryPacketExecutor((PostgreSQLComQueryPacket) commandPacket, backendConnection);
case PARSE:
return new PostgreSQLComParsePacketExecutor();
return new PostgreSQLComParsePacketExecutor((PostgreSQLComParsePacket) commandPacket, backendConnection);
case BIND:
return new PostgreSQLComBindPacketExecutor();
return new PostgreSQLComBindPacketExecutor((PostgreSQLComBindPacket) commandPacket, backendConnection);
case DESCRIBE:
return new PostgreSQLComDescribePacketExecutor();
case EXECUTE:
......
......@@ -54,7 +54,7 @@ public final class PostgreSQLCommandPacketFactory {
case QUERY:
return new PostgreSQLComQueryPacket(payload);
case PARSE:
return new PostgreSQLComParsePacket(payload, connectionId);
return new PostgreSQLComParsePacket(payload);
case BIND:
return new PostgreSQLComBindPacket(payload, connectionId);
case DESCRIBE:
......
......@@ -28,7 +28,7 @@ import org.apache.shardingsphere.shardingproxy.transport.postgresql.payload.Post
* @author zhangyonglun
*/
@RequiredArgsConstructor
public final class PostgreSQLUnsupportedCommandPacket implements PostgreSQLCommandPacket {
public final class PostgreSQLUnsupportedCommandPacket extends PostgreSQLCommandPacket {
@Getter
private final char messageType;
......
......@@ -17,8 +17,6 @@
package org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.admin;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.CommandPacket;
import org.apache.shardingsphere.shardingproxy.transport.common.packet.CommandPacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.PostgreSQLPacket;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.generic.PostgreSQLErrorResponsePacket;
......@@ -35,7 +33,7 @@ import java.util.Collections;
public final class PostgreSQLUnsupportedCommandPacketExecutor implements CommandPacketExecutor<PostgreSQLPacket> {
@Override
public Collection<PostgreSQLPacket> execute(final BackendConnection backendConnection, final CommandPacket commandPacket) {
public Collection<PostgreSQLPacket> execute() {
return Collections.<PostgreSQLPacket>singletonList(new PostgreSQLErrorResponsePacket());
}
}
......@@ -22,27 +22,29 @@ import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.comma
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.PostgreSQLCommandPacketType;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.query.binary.BinaryStatementRegistry;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.query.binary.PostgreSQLBinaryStatement;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.query.binary.PostgreSQLBinaryStatementParameterType;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.query.binary.bind.protocol.PostgreSQLBinaryProtocolValue;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.query.binary.bind.protocol.PostgreSQLBinaryProtocolValueFactory;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.payload.PostgreSQLPacketPayload;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* PostgreSQL command bind packet.
* Command bind packet for PostgreSQL.
*
* @author zhangyonglun
*/
@Getter
public final class PostgreSQLComBindPacket implements PostgreSQLCommandPacket {
public final class PostgreSQLComBindPacket extends PostgreSQLCommandPacket {
private final String statementId;
private final PostgreSQLBinaryStatement binaryStatement;
private final String sql;
private List<Object> parameters;
private final List<Object> parameters;
private final boolean binaryRowData;
......@@ -54,10 +56,9 @@ public final class PostgreSQLComBindPacket implements PostgreSQLCommandPacket {
for (int i = 0; i < parameterFormatsLength; i++) {
payload.readInt2();
}
binaryStatement = BinaryStatementRegistry.getInstance().get(connectionId).getBinaryStatement(statementId);
if (null != binaryStatement && null != binaryStatement.getSql()) {
parameters = getParameters(payload);
}
PostgreSQLBinaryStatement binaryStatement = BinaryStatementRegistry.getInstance().get(connectionId).getBinaryStatement(statementId);
sql = null == binaryStatement ? null : binaryStatement.getSql();
parameters = null == sql ? Collections.emptyList() : getParameters(payload, binaryStatement.getParameterTypes());
int resultFormatsLength = payload.readInt2();
binaryRowData = resultFormatsLength > 0;
for (int i = 0; i < resultFormatsLength; i++) {
......@@ -65,12 +66,12 @@ public final class PostgreSQLComBindPacket implements PostgreSQLCommandPacket {
}
}
private List<Object> getParameters(final PostgreSQLPacketPayload payload) throws SQLException {
private List<Object> getParameters(final PostgreSQLPacketPayload payload, final List<PostgreSQLBinaryStatementParameterType> parameterTypes) throws SQLException {
int parametersCount = payload.readInt2();
List<Object> result = new ArrayList<>(parametersCount);
for (int parameterIndex = 0; parameterIndex < parametersCount; parameterIndex++) {
payload.readInt4();
PostgreSQLBinaryProtocolValue binaryProtocolValue = PostgreSQLBinaryProtocolValueFactory.getBinaryProtocolValue(binaryStatement.getParameterTypes().get(parameterIndex).getColumnType());
PostgreSQLBinaryProtocolValue binaryProtocolValue = PostgreSQLBinaryProtocolValueFactory.getBinaryProtocolValue(parameterTypes.get(parameterIndex).getColumnType());
result.add(binaryProtocolValue.read(payload));
}
return result;
......
......@@ -28,7 +28,6 @@ import org.apache.shardingsphere.shardingproxy.backend.response.query.QueryHeade
import org.apache.shardingsphere.shardingproxy.backend.response.query.QueryResponse;
import org.apache.shardingsphere.shardingproxy.backend.response.update.UpdateResponse;
import org.apache.shardingsphere.shardingproxy.context.GlobalContext;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.CommandPacket;
import org.apache.shardingsphere.shardingproxy.transport.common.packet.QueryCommandPacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.constant.PostgreSQLColumnType;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.PostgreSQLPacket;
......@@ -46,43 +45,47 @@ import java.util.LinkedList;
import java.util.List;
/**
* PostgreSQL command bind packet executor.
* Command bind packet executor for PostgreSQL.
*
* @author zhangyonglun
* @author zhangliang
*/
public final class PostgreSQLComBindPacketExecutor implements QueryCommandPacketExecutor<PostgreSQLPacket> {
private DatabaseCommunicationEngine databaseCommunicationEngine;
private final PostgreSQLComBindPacket comBindPacket;
private final DatabaseCommunicationEngine databaseCommunicationEngine;
private boolean isBinaryRowData;
private volatile boolean isQuery;
private boolean isQuery;
public PostgreSQLComBindPacketExecutor(final PostgreSQLComBindPacket comBindPacket, final BackendConnection backendConnection) {
this.comBindPacket = comBindPacket;
databaseCommunicationEngine = null == comBindPacket.getSql() ? null
: DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(
backendConnection.getLogicSchema(), comBindPacket.getSql(), comBindPacket.getParameters(), backendConnection);
}
@Override
public Collection<PostgreSQLPacket> execute(final BackendConnection backendConnection, final CommandPacket commandPacket) {
PostgreSQLComBindPacket comBindPacket = (PostgreSQLComBindPacket) commandPacket;
public Collection<PostgreSQLPacket> execute() {
if (GlobalContext.getInstance().isCircuitBreak()) {
return Collections.<PostgreSQLPacket>singletonList(new PostgreSQLErrorResponsePacket());
}
List<PostgreSQLPacket> result = new LinkedList<>();
result.add(new PostgreSQLBindCompletePacket());
isBinaryRowData = comBindPacket.isBinaryRowData();
if (null != comBindPacket.getBinaryStatement() && null != comBindPacket.getBinaryStatement().getSql()) {
databaseCommunicationEngine = DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(
backendConnection.getLogicSchema(), comBindPacket.getBinaryStatement().getSql(), comBindPacket.getParameters(), backendConnection);
BackendResponse backendResponse = databaseCommunicationEngine.execute();
if (backendResponse instanceof ErrorResponse) {
result.add(createErrorPacket((ErrorResponse) backendResponse));
}
if (backendResponse instanceof UpdateResponse) {
result.add(createUpdatePacket((UpdateResponse) backendResponse));
}
if (backendResponse instanceof QueryResponse) {
Optional<PostgreSQLRowDescriptionPacket> postgreSQLRowDescriptionPacket = createQueryPacket((QueryResponse) backendResponse);
if (postgreSQLRowDescriptionPacket.isPresent()) {
result.add(postgreSQLRowDescriptionPacket.get());
}
if (null == databaseCommunicationEngine) {
return result;
}
BackendResponse backendResponse = databaseCommunicationEngine.execute();
if (backendResponse instanceof ErrorResponse) {
result.add(createErrorPacket((ErrorResponse) backendResponse));
}
if (backendResponse instanceof UpdateResponse) {
result.add(createUpdatePacket((UpdateResponse) backendResponse));
}
if (backendResponse instanceof QueryResponse) {
Optional<PostgreSQLRowDescriptionPacket> postgreSQLRowDescriptionPacket = createQueryPacket((QueryResponse) backendResponse);
if (postgreSQLRowDescriptionPacket.isPresent()) {
result.add(postgreSQLRowDescriptionPacket.get());
}
}
return result;
......@@ -99,7 +102,7 @@ public final class PostgreSQLComBindPacketExecutor implements QueryCommandPacket
private Optional<PostgreSQLRowDescriptionPacket> createQueryPacket(final QueryResponse queryResponse) {
List<PostgreSQLColumnDescription> columnDescriptions = getPostgreSQLColumnDescriptions(queryResponse);
isQuery = !columnDescriptions.isEmpty();
if (columnDescriptions.isEmpty() || isBinaryRowData) {
if (columnDescriptions.isEmpty() || comBindPacket.isBinaryRowData()) {
return Optional.absent();
}
return Optional.of(new PostgreSQLRowDescriptionPacket(columnDescriptions.size(), columnDescriptions));
......@@ -127,7 +130,7 @@ public final class PostgreSQLComBindPacketExecutor implements QueryCommandPacket
@Override
public PostgreSQLPacket getQueryData() throws SQLException {
QueryData queryData = databaseCommunicationEngine.getQueryData();
return isBinaryRowData ? new PostgreSQLBinaryResultSetRowPacket(queryData.getData(), getPostgreSQLColumnTypes(queryData)) : new PostgreSQLDataRowPacket(queryData.getData());
return comBindPacket.isBinaryRowData() ? new PostgreSQLBinaryResultSetRowPacket(queryData.getData(), getPostgreSQLColumnTypes(queryData)) : new PostgreSQLDataRowPacket(queryData.getData());
}
private List<PostgreSQLColumnType> getPostgreSQLColumnTypes(final QueryData queryData) {
......
......@@ -23,12 +23,12 @@ import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.comma
import org.apache.shardingsphere.shardingproxy.transport.postgresql.payload.PostgreSQLPacketPayload;
/**
* PostgreSQL command describe packet.
* Command describe packet for PostgreSQL.
*
* @author zhangyonglun
*/
@Getter
public final class PostgreSQLComDescribePacket implements PostgreSQLCommandPacket {
public final class PostgreSQLComDescribePacket extends PostgreSQLCommandPacket {
public PostgreSQLComDescribePacket(final PostgreSQLPacketPayload payload) {
payload.readInt4();
......
......@@ -17,8 +17,6 @@
package org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.query.binary.describe;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.CommandPacket;
import org.apache.shardingsphere.shardingproxy.transport.common.packet.CommandPacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.PostgreSQLPacket;
......@@ -26,7 +24,7 @@ import java.util.Collection;
import java.util.Collections;
/**
* PostgreSQL command describe packet executor.
* Command describe packet executor for PostgreSQL.
*
* @author zhangyonglun
* @author zhangliang
......@@ -34,7 +32,7 @@ import java.util.Collections;
public final class PostgreSQLComDescribePacketExecutor implements CommandPacketExecutor<PostgreSQLPacket> {
@Override
public Collection<PostgreSQLPacket> execute(final BackendConnection backendConnection, final CommandPacket commandPacket) {
public Collection<PostgreSQLPacket> execute() {
return Collections.emptyList();
}
}
......@@ -22,11 +22,11 @@ import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.comma
import org.apache.shardingsphere.shardingproxy.transport.postgresql.payload.PostgreSQLPacketPayload;
/**
* PostgreSQL command execute packet.
* Command execute packet for PostgreSQL.
*
* @author zhangyonglun
*/
public final class PostgreSQLComExecutePacket implements PostgreSQLCommandPacket {
public final class PostgreSQLComExecutePacket extends PostgreSQLCommandPacket {
public PostgreSQLComExecutePacket(final PostgreSQLPacketPayload payload) {
payload.readInt4();
......
......@@ -17,8 +17,6 @@
package org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.query.binary.execute;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.CommandPacket;
import org.apache.shardingsphere.shardingproxy.transport.common.packet.CommandPacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.PostgreSQLPacket;
......@@ -26,7 +24,7 @@ import java.util.Collection;
import java.util.Collections;
/**
* PostgreSQL command execute packet executor.
* Command execute packet executor for PostgreSQL.
*
* @author zhangyonglun
* @author zhangliang
......@@ -34,7 +32,7 @@ import java.util.Collections;
public final class PostgreSQLComExecutePacketExecutor implements CommandPacketExecutor<PostgreSQLPacket> {
@Override
public Collection<PostgreSQLPacket> execute(final BackendConnection backendConnection, final CommandPacket commandPacket) {
public Collection<PostgreSQLPacket> execute() {
return Collections.emptyList();
}
}
......@@ -21,45 +21,41 @@ import lombok.Getter;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.constant.PostgreSQLColumnType;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.PostgreSQLCommandPacket;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.PostgreSQLCommandPacketType;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.query.binary.BinaryStatementRegistry;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.query.binary.ConnectionScopeBinaryStatementRegistry;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.query.binary.PostgreSQLBinaryStatementParameterType;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.payload.PostgreSQLPacketPayload;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* PostgreSQL command parse packet.
* Command parse packet for PostgreSQL.
*
* @author zhangyonglun
*/
@Getter
public final class PostgreSQLComParsePacket implements PostgreSQLCommandPacket {
public final class PostgreSQLComParsePacket extends PostgreSQLCommandPacket {
private final ConnectionScopeBinaryStatementRegistry binaryStatementRegistry;
private String statementId;
private final String statementId;
private final String sql;
private final List<PostgreSQLBinaryStatementParameterType> binaryStatementParameterTypes = new ArrayList<>(64);
private final List<PostgreSQLBinaryStatementParameterType> binaryStatementParameterTypes;
public PostgreSQLComParsePacket(final PostgreSQLPacketPayload payload, final int connectionId) {
binaryStatementRegistry = BinaryStatementRegistry.getInstance().get(connectionId);
public PostgreSQLComParsePacket(final PostgreSQLPacketPayload payload) {
payload.readInt4();
statementId = payload.readStringNul();
sql = alterSQLToJDBCStyle(payload.readStringNul());
if (!sql.isEmpty()) {
getParameterTypes(payload);
}
binaryStatementParameterTypes = sql.isEmpty() ? Collections.<PostgreSQLBinaryStatementParameterType>emptyList() : getParameterTypes(payload);
}
private void getParameterTypes(final PostgreSQLPacketPayload payload) {
private List<PostgreSQLBinaryStatementParameterType> getParameterTypes(final PostgreSQLPacketPayload payload) {
int parameterCount = payload.readInt2();
List<PostgreSQLBinaryStatementParameterType> result = new ArrayList<>(parameterCount);
for (int i = 0; i < parameterCount; i++) {
binaryStatementParameterTypes.add(new PostgreSQLBinaryStatementParameterType(PostgreSQLColumnType.valueOf(payload.readInt4())));
result.add(new PostgreSQLBinaryStatementParameterType(PostgreSQLColumnType.valueOf(payload.readInt4())));
}
return result;
}
private String alterSQLToJDBCStyle(final String sql) {
......
......@@ -25,9 +25,10 @@ import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.connec
import org.apache.shardingsphere.shardingproxy.backend.schema.LogicSchema;
import org.apache.shardingsphere.shardingproxy.backend.schema.MasterSlaveSchema;
import org.apache.shardingsphere.shardingproxy.backend.schema.ShardingSchema;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.CommandPacket;
import org.apache.shardingsphere.shardingproxy.transport.common.packet.CommandPacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.PostgreSQLPacket;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.query.binary.BinaryStatementRegistry;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.query.binary.ConnectionScopeBinaryStatementRegistry;
import java.util.Collection;
import java.util.Collections;
......@@ -40,16 +41,26 @@ import java.util.Collections;
*/
public final class PostgreSQLComParsePacketExecutor implements CommandPacketExecutor<PostgreSQLPacket> {
private final PostgreSQLComParsePacket comParsePacket;
private final LogicSchema logicSchema;
private final ConnectionScopeBinaryStatementRegistry binaryStatementRegistry;
public PostgreSQLComParsePacketExecutor(final PostgreSQLComParsePacket comParsePacket, final BackendConnection backendConnection) {
this.comParsePacket = comParsePacket;
logicSchema = backendConnection.getLogicSchema();
binaryStatementRegistry = BinaryStatementRegistry.getInstance().get(backendConnection.getConnectionId());
}
@Override
public Collection<PostgreSQLPacket> execute(final BackendConnection backendConnection, final CommandPacket commandPacket) {
PostgreSQLComParsePacket comParsePacket = (PostgreSQLComParsePacket) commandPacket;
LogicSchema logicSchema = backendConnection.getLogicSchema();
public Collection<PostgreSQLPacket> execute() {
// TODO we should use none-sharding parsing engine in future.
SQLParsingEngine sqlParsingEngine = new SQLParsingEngine(DatabaseType.PostgreSQL, comParsePacket.getSql(), getShardingRule(logicSchema), logicSchema.getMetaData().getTable());
if (!comParsePacket.getSql().isEmpty()) {
SQLStatement sqlStatement = sqlParsingEngine.parse(true);
int parametersIndex = sqlStatement.getParametersIndex();
comParsePacket.getBinaryStatementRegistry().register(comParsePacket.getStatementId(), comParsePacket.getSql(), parametersIndex, comParsePacket.getBinaryStatementParameterTypes());
binaryStatementRegistry.register(comParsePacket.getStatementId(), comParsePacket.getSql(), parametersIndex, comParsePacket.getBinaryStatementParameterTypes());
}
return Collections.<PostgreSQLPacket>singletonList(new PostgreSQLParseCompletePacket());
}
......
......@@ -22,11 +22,11 @@ import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.comma
import org.apache.shardingsphere.shardingproxy.transport.postgresql.payload.PostgreSQLPacketPayload;
/**
* PostgreSQL command sync packet.
* Command sync packet for PostgreSQL.
*
* @author zhangyonglun
*/
public final class PostgreSQLComSyncPacket implements PostgreSQLCommandPacket {
public final class PostgreSQLComSyncPacket extends PostgreSQLCommandPacket {
public PostgreSQLComSyncPacket(final PostgreSQLPacketPayload payload) {
payload.readInt4();
......
......@@ -17,8 +17,6 @@
package org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.query.binary.sync;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.CommandPacket;
import org.apache.shardingsphere.shardingproxy.transport.common.packet.CommandPacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.PostgreSQLPacket;
......@@ -26,7 +24,7 @@ import java.util.Collection;
import java.util.Collections;
/**
* PostgreSQL command sync packet executor.
* Command sync packet executor for PostgreSQL.
*
* @author zhangyonglun
* @author zhangliang
......@@ -34,7 +32,7 @@ import java.util.Collections;
public final class PostgreSQLComSyncPacketExecutor implements CommandPacketExecutor<PostgreSQLPacket> {
@Override
public Collection<PostgreSQLPacket> execute(final BackendConnection backendConnection, final CommandPacket commandPacket) {
public Collection<PostgreSQLPacket> execute() {
return Collections.emptyList();
}
}
......@@ -23,12 +23,12 @@ import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.comma
import org.apache.shardingsphere.shardingproxy.transport.postgresql.payload.PostgreSQLPacketPayload;
/**
* PostgreSQL command query packet.
* Command query packet for PostgreSQL.
*
* @author zhangyonglun
*/
@Getter
public final class PostgreSQLComQueryPacket implements PostgreSQLCommandPacket {
public final class PostgreSQLComQueryPacket extends PostgreSQLCommandPacket {
private final String sql;
......
......@@ -27,7 +27,6 @@ import org.apache.shardingsphere.shardingproxy.backend.response.update.UpdateRes
import org.apache.shardingsphere.shardingproxy.backend.text.TextProtocolBackendHandler;
import org.apache.shardingsphere.shardingproxy.backend.text.TextProtocolBackendHandlerFactory;
import org.apache.shardingsphere.shardingproxy.context.GlobalContext;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.CommandPacket;
import org.apache.shardingsphere.shardingproxy.transport.common.packet.QueryCommandPacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.PostgreSQLPacket;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.query.PostgreSQLColumnDescription;
......@@ -42,21 +41,23 @@ import java.util.LinkedList;
import java.util.List;
/**
* PostgreSQL command query packet executor.
* Command query packet executor for PostgreSQL.
*
* @author zhangyonglun
* @author zhangliang
*/
public final class PostgreSQLComQueryPacketExecutor implements QueryCommandPacketExecutor<PostgreSQLPacket> {
private TextProtocolBackendHandler textProtocolBackendHandler;
private final TextProtocolBackendHandler textProtocolBackendHandler;
private boolean isQuery;
private volatile boolean isQuery;
@Override
public Collection<PostgreSQLPacket> execute(final BackendConnection backendConnection, final CommandPacket commandPacket) {
PostgreSQLComQueryPacket comQueryPacket = (PostgreSQLComQueryPacket) commandPacket;
public PostgreSQLComQueryPacketExecutor(final PostgreSQLComQueryPacket comQueryPacket, final BackendConnection backendConnection) {
textProtocolBackendHandler = TextProtocolBackendHandlerFactory.newInstance(comQueryPacket.getSql(), backendConnection);
}
@Override
public Collection<PostgreSQLPacket> execute() {
if (GlobalContext.getInstance().isCircuitBreak()) {
return Collections.<PostgreSQLPacket>singletonList(new PostgreSQLErrorResponsePacket());
}
......
......@@ -22,11 +22,11 @@ import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.comma
import org.apache.shardingsphere.shardingproxy.transport.postgresql.payload.PostgreSQLPacketPayload;
/**
* PostgreSQL command termination packet.
* Command termination packet for PostgreSQL.
*
* @author zhangyonglun
*/
public final class PostgreSQLComTerminationPacket implements PostgreSQLCommandPacket {
public final class PostgreSQLComTerminationPacket extends PostgreSQLCommandPacket {
public PostgreSQLComTerminationPacket(final PostgreSQLPacketPayload payload) {
payload.readInt4();
......
......@@ -17,8 +17,6 @@
package org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.generic;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.CommandPacket;
import org.apache.shardingsphere.shardingproxy.transport.common.packet.CommandPacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.PostgreSQLPacket;
......@@ -26,14 +24,14 @@ import java.util.Collection;
import java.util.Collections;
/**
* PostgreSQL command termination packet executor.
* Command termination packet executor for PostgreSQL.
*
* @author zhangliang
*/
public final class PostgreSQLComTerminationPacketExecutor implements CommandPacketExecutor<PostgreSQLPacket> {
@Override
public Collection<PostgreSQLPacket> execute(final BackendConnection backendConnection, final CommandPacket commandPacket) {
public Collection<PostgreSQLPacket> execute() {
return Collections.emptyList();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册