提交 b1c8e4af 编写于 作者: T terrymanu

for #1941, remove generic for CommandExecutor

上级 ca9f5a65
......@@ -37,7 +37,6 @@ import org.apache.shardingsphere.shardingproxy.transport.api.packet.CommandPacke
import org.apache.shardingsphere.shardingproxy.transport.api.packet.DatabasePacket;
import org.apache.shardingsphere.shardingproxy.transport.api.payload.PacketPayload;
import org.apache.shardingsphere.shardingproxy.transport.mysql.constant.MySQLServerErrorCode;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.MySQLPacket;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.MySQLCommandPacket;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.MySQLCommandPacketFactory;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.MySQLCommandPacketType;
......@@ -67,7 +66,7 @@ import java.util.Collection;
@Slf4j
public final class MySQLFrontendEngine implements DatabaseFrontendEngine {
private final MySQLAuthenticationHandler mysqlAuthenticationHandler = new MySQLAuthenticationHandler();
private final MySQLAuthenticationHandler authenticationHandler = new MySQLAuthenticationHandler();
@Override
public PacketPayload createPacketPayload(final ByteBuf message) {
......@@ -88,14 +87,14 @@ public final class MySQLFrontendEngine implements DatabaseFrontendEngine {
public void handshake(final ChannelHandlerContext context, final BackendConnection backendConnection) {
int connectionId = MySQLConnectionIdGenerator.getInstance().nextId();
backendConnection.setConnectionId(connectionId);
context.writeAndFlush(new MySQLHandshakePacket(connectionId, mysqlAuthenticationHandler.getAuthPluginData()));
context.writeAndFlush(new MySQLHandshakePacket(connectionId, authenticationHandler.getAuthPluginData()));
}
@Override
public boolean auth(final ChannelHandlerContext context, final ByteBuf message, final BackendConnection backendConnection) {
try (MySQLPacketPayload payload = new MySQLPacketPayload(message)) {
MySQLHandshakeResponse41Packet response41 = new MySQLHandshakeResponse41Packet(payload);
if (mysqlAuthenticationHandler.login(response41.getUsername(), response41.getAuthResponse())) {
if (authenticationHandler.login(response41.getUsername(), response41.getAuthResponse())) {
if (!Strings.isNullOrEmpty(response41.getDatabase()) && !LogicSchemas.getInstance().schemaExists(response41.getDatabase())) {
context.writeAndFlush(new MySQLErrPacket(response41.getSequenceId() + 1, MySQLServerErrorCode.ER_BAD_DB_ERROR, response41.getDatabase()));
return true;
......@@ -122,7 +121,7 @@ public final class MySQLFrontendEngine implements DatabaseFrontendEngine {
}
@Override
public CommandExecutor<MySQLPacket> getCommandExecutor(final CommandPacketType type, final CommandPacket packet, final BackendConnection backendConnection) {
public CommandExecutor getCommandExecutor(final CommandPacketType type, final CommandPacket packet, final BackendConnection backendConnection) {
return MySQLCommandExecutorFactory.newInstance((MySQLCommandPacketType) type, packet, backendConnection);
}
......@@ -144,24 +143,24 @@ public final class MySQLFrontendEngine implements DatabaseFrontendEngine {
}
private void writePackets(final ChannelHandlerContext context, final MySQLPacketPayload payload, final BackendConnection backendConnection) throws SQLException {
MySQLCommandPacketType type = getCommandPacketType(payload);
MySQLCommandPacket commandPacket = getCommandPacket(payload, type, backendConnection);
CommandExecutor<MySQLPacket> commandExecutor = getCommandExecutor(type, commandPacket, backendConnection);
Collection<MySQLPacket> responsePackets = commandExecutor.execute();
CommandPacketType type = getCommandPacketType(payload);
CommandPacket commandPacket = getCommandPacket(payload, type, backendConnection);
CommandExecutor commandExecutor = getCommandExecutor(type, commandPacket, backendConnection);
Collection<DatabasePacket> responsePackets = commandExecutor.execute();
if (responsePackets.isEmpty()) {
return;
}
for (MySQLPacket each : responsePackets) {
for (DatabasePacket each : responsePackets) {
context.write(each);
}
if (commandExecutor instanceof QueryCommandExecutor) {
writeQueryData(context, backendConnection, (QueryCommandExecutor<MySQLPacket>) commandExecutor, responsePackets.size());
writeQueryData(context, backendConnection, (QueryCommandExecutor) commandExecutor, responsePackets.size());
}
}
@Override
public void writeQueryData(final ChannelHandlerContext context,
final BackendConnection backendConnection, final QueryCommandExecutor<?> queryCommandExecutor, final int sequenceIdOffset) throws SQLException {
final BackendConnection backendConnection, final QueryCommandExecutor queryCommandExecutor, final int headerPackagesCount) throws SQLException {
if (!queryCommandExecutor.isQuery() || !context.channel().isActive()) {
return;
}
......@@ -182,7 +181,7 @@ public final class MySQLFrontendEngine implements DatabaseFrontendEngine {
}
currentSequenceId++;
}
context.write(new MySQLEofPacket(++currentSequenceId + sequenceIdOffset));
context.write(new MySQLEofPacket(++currentSequenceId + headerPackagesCount));
}
@Override
......
......@@ -31,7 +31,6 @@ import org.apache.shardingsphere.shardingproxy.frontend.mysql.executor.query.bin
import org.apache.shardingsphere.shardingproxy.frontend.mysql.executor.query.text.fieldlist.MySQLComFieldListPacketExecutor;
import org.apache.shardingsphere.shardingproxy.frontend.mysql.executor.query.text.query.MySQLComQueryPacketExecutor;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.CommandPacket;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.MySQLPacket;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.MySQLCommandPacketType;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.admin.initdb.MySQLComInitDbPacket;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.query.binary.close.MySQLComStmtClosePacket;
......@@ -56,7 +55,7 @@ public final class MySQLCommandExecutorFactory {
* @param backendConnection backend connection
* @return command executor
*/
public static CommandExecutor<MySQLPacket> newInstance(final MySQLCommandPacketType commandPacketType, final CommandPacket commandPacket, final BackendConnection backendConnection) {
public static CommandExecutor newInstance(final MySQLCommandPacketType commandPacketType, final CommandPacket commandPacket, final BackendConnection backendConnection) {
switch (commandPacketType) {
case COM_QUIT:
return new MySQLComQuitExecutor();
......
......@@ -21,8 +21,8 @@ import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.shardingproxy.backend.schema.LogicSchemas;
import org.apache.shardingsphere.shardingproxy.frontend.api.CommandExecutor;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.DatabasePacket;
import org.apache.shardingsphere.shardingproxy.transport.mysql.constant.MySQLServerErrorCode;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.MySQLPacket;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.admin.initdb.MySQLComInitDbPacket;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.generic.MySQLErrPacket;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.generic.MySQLOKPacket;
......@@ -36,18 +36,18 @@ import java.util.Collections;
* @author zhangliang
*/
@RequiredArgsConstructor
public final class MySQLComInitDbExecutor implements CommandExecutor<MySQLPacket> {
public final class MySQLComInitDbExecutor implements CommandExecutor {
private final MySQLComInitDbPacket packet;
private final BackendConnection backendConnection;
@Override
public Collection<MySQLPacket> execute() {
public Collection<DatabasePacket> execute() {
if (LogicSchemas.getInstance().schemaExists(packet.getSchema())) {
backendConnection.setCurrentSchema(packet.getSchema());
return Collections.<MySQLPacket>singletonList(new MySQLOKPacket(1));
return Collections.<DatabasePacket>singletonList(new MySQLOKPacket(1));
}
return Collections.<MySQLPacket>singletonList(new MySQLErrPacket(1, MySQLServerErrorCode.ER_BAD_DB_ERROR, packet.getSchema()));
return Collections.<DatabasePacket>singletonList(new MySQLErrPacket(1, MySQLServerErrorCode.ER_BAD_DB_ERROR, packet.getSchema()));
}
}
......@@ -17,9 +17,8 @@
package org.apache.shardingsphere.shardingproxy.frontend.mysql.executor.admin.ping;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.shardingproxy.frontend.api.CommandExecutor;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.MySQLPacket;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.DatabasePacket;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.generic.MySQLOKPacket;
import java.util.Collection;
......@@ -30,11 +29,10 @@ import java.util.Collections;
*
* @author zhangliang
*/
@RequiredArgsConstructor
public final class MySQLComPingExecutor implements CommandExecutor<MySQLPacket> {
public final class MySQLComPingExecutor implements CommandExecutor {
@Override
public Collection<MySQLPacket> execute() {
return Collections.<MySQLPacket>singletonList(new MySQLOKPacket(1));
public Collection<DatabasePacket> execute() {
return Collections.<DatabasePacket>singletonList(new MySQLOKPacket(1));
}
}
......@@ -17,9 +17,8 @@
package org.apache.shardingsphere.shardingproxy.frontend.mysql.executor.admin.quit;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.shardingproxy.frontend.api.CommandExecutor;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.MySQLPacket;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.DatabasePacket;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.generic.MySQLOKPacket;
import java.util.Collection;
......@@ -30,11 +29,10 @@ import java.util.Collections;
*
* @author zhangliang
*/
@RequiredArgsConstructor
public final class MySQLComQuitExecutor implements CommandExecutor<MySQLPacket> {
public final class MySQLComQuitExecutor implements CommandExecutor {
@Override
public Collection<MySQLPacket> execute() {
return Collections.<MySQLPacket>singletonList(new MySQLOKPacket(1));
public Collection<DatabasePacket> execute() {
return Collections.<DatabasePacket>singletonList(new MySQLOKPacket(1));
}
}
......@@ -20,7 +20,7 @@ package org.apache.shardingsphere.shardingproxy.frontend.mysql.executor.generic;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.shardingproxy.error.CommonErrorCode;
import org.apache.shardingsphere.shardingproxy.frontend.api.CommandExecutor;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.MySQLPacket;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.DatabasePacket;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.MySQLCommandPacketType;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.generic.MySQLErrPacket;
......@@ -33,12 +33,12 @@ import java.util.Collections;
* @author zhangliang
*/
@RequiredArgsConstructor
public final class MySQLUnsupportedCommandExecutor implements CommandExecutor<MySQLPacket> {
public final class MySQLUnsupportedCommandExecutor implements CommandExecutor {
private final MySQLCommandPacketType type;
@Override
public Collection<MySQLPacket> execute() {
return Collections.<MySQLPacket>singletonList(new MySQLErrPacket(1, CommonErrorCode.UNSUPPORTED_COMMAND, type));
public Collection<DatabasePacket> execute() {
return Collections.<DatabasePacket>singletonList(new MySQLErrPacket(1, CommonErrorCode.UNSUPPORTED_COMMAND, type));
}
}
......@@ -19,7 +19,7 @@ package org.apache.shardingsphere.shardingproxy.frontend.mysql.executor.query.bi
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.shardingproxy.frontend.api.CommandExecutor;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.MySQLPacket;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.DatabasePacket;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.query.binary.close.MySQLComStmtClosePacket;
import java.util.Collection;
......@@ -31,12 +31,12 @@ import java.util.Collections;
* @author zhangliang
*/
@RequiredArgsConstructor
public final class MySQLComStmtCloseExecutor implements CommandExecutor<MySQLPacket> {
public final class MySQLComStmtCloseExecutor implements CommandExecutor {
private final MySQLComStmtClosePacket packet;
@Override
public Collection<MySQLPacket> execute() {
public Collection<DatabasePacket> execute() {
packet.removeCachedStatement();
return Collections.emptyList();
}
......
......@@ -29,6 +29,7 @@ import org.apache.shardingsphere.shardingproxy.backend.response.update.UpdateRes
import org.apache.shardingsphere.shardingproxy.context.GlobalContext;
import org.apache.shardingsphere.shardingproxy.error.CommonErrorCode;
import org.apache.shardingsphere.shardingproxy.frontend.api.QueryCommandExecutor;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.DatabasePacket;
import org.apache.shardingsphere.shardingproxy.transport.mysql.constant.MySQLColumnType;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.MySQLPacket;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.query.MySQLColumnDefinition41Packet;
......@@ -53,7 +54,7 @@ import java.util.List;
* @author zhangyonglun
* @author zhangliang
*/
public final class MySQLQueryComStmtExecuteExecutor implements QueryCommandExecutor<MySQLPacket> {
public final class MySQLQueryComStmtExecuteExecutor implements QueryCommandExecutor {
private final DatabaseCommunicationEngine databaseCommunicationEngine;
......@@ -67,16 +68,16 @@ public final class MySQLQueryComStmtExecuteExecutor implements QueryCommandExecu
}
@Override
public Collection<MySQLPacket> execute() {
public Collection<DatabasePacket> execute() {
if (GlobalContext.getInstance().isCircuitBreak()) {
return Collections.<MySQLPacket>singletonList(new MySQLErrPacket(1, CommonErrorCode.CIRCUIT_BREAK_MODE));
return Collections.<DatabasePacket>singletonList(new MySQLErrPacket(1, CommonErrorCode.CIRCUIT_BREAK_MODE));
}
BackendResponse backendResponse = databaseCommunicationEngine.execute();
if (backendResponse instanceof ErrorResponse) {
return Collections.<MySQLPacket>singletonList(createErrorPacket(((ErrorResponse) backendResponse).getCause()));
return Collections.<DatabasePacket>singletonList(createErrorPacket(((ErrorResponse) backendResponse).getCause()));
}
if (backendResponse instanceof UpdateResponse) {
return Collections.<MySQLPacket>singletonList(createUpdatePacket((UpdateResponse) backendResponse));
return Collections.<DatabasePacket>singletonList(createUpdatePacket((UpdateResponse) backendResponse));
}
isQuery = true;
return createQueryPacket((QueryResponse) backendResponse);
......@@ -90,8 +91,8 @@ public final class MySQLQueryComStmtExecuteExecutor implements QueryCommandExecu
return new MySQLOKPacket(1, updateResponse.getUpdateCount(), updateResponse.getLastInsertId());
}
private Collection<MySQLPacket> createQueryPacket(final QueryResponse backendResponse) {
Collection<MySQLPacket> result = new LinkedList<>();
private Collection<DatabasePacket> createQueryPacket(final QueryResponse backendResponse) {
Collection<DatabasePacket> result = new LinkedList<>();
List<QueryHeader> queryHeader = backendResponse.getQueryHeaders();
result.add(new MySQLFieldCountPacket(++currentSequenceId, queryHeader.size()));
for (QueryHeader each : queryHeader) {
......
......@@ -28,8 +28,8 @@ import org.apache.shardingsphere.shardingproxy.backend.schema.LogicSchemas;
import org.apache.shardingsphere.shardingproxy.backend.schema.MasterSlaveSchema;
import org.apache.shardingsphere.shardingproxy.backend.schema.ShardingSchema;
import org.apache.shardingsphere.shardingproxy.frontend.api.CommandExecutor;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.DatabasePacket;
import org.apache.shardingsphere.shardingproxy.transport.mysql.constant.MySQLColumnType;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.MySQLPacket;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.query.MySQLColumnDefinition41Packet;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.query.binary.MySQLBinaryStatementRegistry;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.query.binary.prepare.MySQLComStmtPrepareOKPacket;
......@@ -45,7 +45,7 @@ import java.util.LinkedList;
* @author zhangyonglun
* @author zhangliang
*/
public final class MySQLComStmtPrepareExecutor implements CommandExecutor<MySQLPacket> {
public final class MySQLComStmtPrepareExecutor implements CommandExecutor {
private static final MySQLBinaryStatementRegistry PREPARED_STATEMENT_REGISTRY = MySQLBinaryStatementRegistry.getInstance();
......@@ -62,11 +62,11 @@ public final class MySQLComStmtPrepareExecutor implements CommandExecutor<MySQLP
}
@Override
public Collection<MySQLPacket> execute() {
public Collection<DatabasePacket> execute() {
// TODO we should use none-sharding parsing engine in future.
SQLParsingEngine sqlParsingEngine = new SQLParsingEngine(
LogicSchemas.getInstance().getDatabaseType(), packet.getSql(), getShardingRule(logicSchema), logicSchema.getMetaData().getTable());
Collection<MySQLPacket> result = new LinkedList<>();
Collection<DatabasePacket> result = new LinkedList<>();
int currentSequenceId = 0;
SQLStatement sqlStatement = sqlParsingEngine.parse(true);
int parametersIndex = sqlStatement.getParametersIndex();
......
......@@ -23,8 +23,8 @@ import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.connec
import org.apache.shardingsphere.shardingproxy.backend.response.BackendResponse;
import org.apache.shardingsphere.shardingproxy.backend.response.error.ErrorResponse;
import org.apache.shardingsphere.shardingproxy.frontend.api.CommandExecutor;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.DatabasePacket;
import org.apache.shardingsphere.shardingproxy.transport.mysql.constant.MySQLColumnType;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.MySQLPacket;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.query.MySQLColumnDefinition41Packet;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.query.text.fieldlist.MySQLComFieldListPacket;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.generic.MySQLEofPacket;
......@@ -40,7 +40,7 @@ import java.util.LinkedList;
*
* @author zhangliang
*/
public final class MySQLComFieldListPacketExecutor implements CommandExecutor<MySQLPacket> {
public final class MySQLComFieldListPacketExecutor implements CommandExecutor {
private static final String SQL = "SHOW COLUMNS FROM %s FROM %s";
......@@ -57,9 +57,9 @@ public final class MySQLComFieldListPacketExecutor implements CommandExecutor<My
}
@Override
public Collection<MySQLPacket> execute() throws SQLException {
public Collection<DatabasePacket> execute() throws SQLException {
BackendResponse backendResponse = databaseCommunicationEngine.execute();
return backendResponse instanceof ErrorResponse ? Collections.<MySQLPacket>singletonList(MySQLErrPacketFactory.newInstance(1, ((ErrorResponse) backendResponse).getCause()))
return backendResponse instanceof ErrorResponse ? Collections.<DatabasePacket>singletonList(MySQLErrPacketFactory.newInstance(1, ((ErrorResponse) backendResponse).getCause()))
: getColumnDefinition41Packets();
}
......@@ -67,8 +67,8 @@ public final class MySQLComFieldListPacketExecutor implements CommandExecutor<My
return String.format(SQL, packet.getTable(), schemaName);
}
private Collection<MySQLPacket> getColumnDefinition41Packets() throws SQLException {
Collection<MySQLPacket> result = new LinkedList<>();
private Collection<DatabasePacket> getColumnDefinition41Packets() throws SQLException {
Collection<DatabasePacket> result = new LinkedList<>();
int currentSequenceId = 0;
while (databaseCommunicationEngine.next()) {
String columnName = databaseCommunicationEngine.getQueryData().getData().get(0).toString();
......
......@@ -28,6 +28,7 @@ import org.apache.shardingsphere.shardingproxy.backend.text.TextProtocolBackendH
import org.apache.shardingsphere.shardingproxy.context.GlobalContext;
import org.apache.shardingsphere.shardingproxy.error.CommonErrorCode;
import org.apache.shardingsphere.shardingproxy.frontend.api.QueryCommandExecutor;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.DatabasePacket;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.MySQLPacket;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.query.MySQLColumnDefinition41Packet;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.query.MySQLFieldCountPacket;
......@@ -49,7 +50,7 @@ import java.util.List;
*
* @author zhangliang
*/
public final class MySQLComQueryPacketExecutor implements QueryCommandExecutor<MySQLPacket> {
public final class MySQLComQueryPacketExecutor implements QueryCommandExecutor {
private final TextProtocolBackendHandler textProtocolBackendHandler;
......@@ -62,16 +63,16 @@ public final class MySQLComQueryPacketExecutor implements QueryCommandExecutor<M
}
@Override
public Collection<MySQLPacket> execute() {
public Collection<DatabasePacket> execute() {
if (GlobalContext.getInstance().isCircuitBreak()) {
return Collections.<MySQLPacket>singletonList(new MySQLErrPacket(1, CommonErrorCode.CIRCUIT_BREAK_MODE));
return Collections.<DatabasePacket>singletonList(new MySQLErrPacket(1, CommonErrorCode.CIRCUIT_BREAK_MODE));
}
BackendResponse backendResponse = textProtocolBackendHandler.execute();
if (backendResponse instanceof ErrorResponse) {
return Collections.<MySQLPacket>singletonList(createErrorPacket(((ErrorResponse) backendResponse).getCause()));
return Collections.<DatabasePacket>singletonList(createErrorPacket(((ErrorResponse) backendResponse).getCause()));
}
if (backendResponse instanceof UpdateResponse) {
return Collections.<MySQLPacket>singletonList(createUpdatePacket((UpdateResponse) backendResponse));
return Collections.<DatabasePacket>singletonList(createUpdatePacket((UpdateResponse) backendResponse));
}
isQuery = true;
return createQueryPackets((QueryResponse) backendResponse);
......@@ -85,8 +86,8 @@ public final class MySQLComQueryPacketExecutor implements QueryCommandExecutor<M
return new MySQLOKPacket(1, updateResponse.getUpdateCount(), updateResponse.getLastInsertId());
}
private Collection<MySQLPacket> createQueryPackets(final QueryResponse backendResponse) {
Collection<MySQLPacket> result = new LinkedList<>();
private Collection<DatabasePacket> createQueryPackets(final QueryResponse backendResponse) {
Collection<DatabasePacket> result = new LinkedList<>();
List<QueryHeader> queryHeader = backendResponse.getQueryHeaders();
result.add(new MySQLFieldCountPacket(++currentSequenceId, queryHeader.size()));
for (QueryHeader each : queryHeader) {
......
......@@ -35,7 +35,6 @@ import org.apache.shardingsphere.shardingproxy.transport.api.packet.CommandPacke
import org.apache.shardingsphere.shardingproxy.transport.api.packet.CommandPacketType;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.DatabasePacket;
import org.apache.shardingsphere.shardingproxy.transport.api.payload.PacketPayload;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.PostgreSQLPacket;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.PostgreSQLCommandPacket;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.PostgreSQLCommandPacketFactory;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.PostgreSQLCommandPacketType;
......@@ -129,7 +128,7 @@ public final class PostgreSQLFrontendEngine implements DatabaseFrontendEngine {
}
@Override
public CommandExecutor<PostgreSQLPacket> getCommandExecutor(final CommandPacketType type, final CommandPacket packet, final BackendConnection backendConnection) {
public CommandExecutor getCommandExecutor(final CommandPacketType type, final CommandPacket packet, final BackendConnection backendConnection) {
return PostgreSQLCommandExecutorFactory.newInstance((PostgreSQLCommandPacketType) type, (PostgreSQLCommandPacket) packet, backendConnection);
}
......@@ -148,10 +147,10 @@ public final class PostgreSQLFrontendEngine implements DatabaseFrontendEngine {
}
private void writePackets(final ChannelHandlerContext context, final PostgreSQLPacketPayload payload, final BackendConnection backendConnection) throws SQLException {
PostgreSQLCommandPacketType type = getCommandPacketType(payload);
PostgreSQLCommandPacket commandPacket = getCommandPacket(payload, type, backendConnection);
CommandExecutor<PostgreSQLPacket> commandExecutor = getCommandExecutor(type, commandPacket, backendConnection);
Collection<PostgreSQLPacket> responsePackets = commandExecutor.execute();
CommandPacketType type = getCommandPacketType(payload);
CommandPacket commandPacket = getCommandPacket(payload, type, backendConnection);
CommandExecutor commandExecutor = getCommandExecutor(type, commandPacket, backendConnection);
Collection<DatabasePacket> responsePackets = commandExecutor.execute();
if (commandPacket instanceof PostgreSQLComSyncPacket) {
context.write(new PostgreSQLCommandCompletePacket());
context.writeAndFlush(new PostgreSQLReadyForQueryPacket());
......@@ -160,11 +159,11 @@ public final class PostgreSQLFrontendEngine implements DatabaseFrontendEngine {
if (responsePackets.isEmpty()) {
return;
}
for (PostgreSQLPacket each : responsePackets) {
for (DatabasePacket each : responsePackets) {
context.write(each);
}
if (commandExecutor instanceof QueryCommandExecutor) {
writeQueryData(context, backendConnection, (QueryCommandExecutor<PostgreSQLPacket>) commandExecutor, 0);
writeQueryData(context, backendConnection, (QueryCommandExecutor) commandExecutor, responsePackets.size());
}
if (commandPacket instanceof PostgreSQLComQueryPacket) {
context.write(new PostgreSQLCommandCompletePacket());
......@@ -174,7 +173,7 @@ public final class PostgreSQLFrontendEngine implements DatabaseFrontendEngine {
@Override
public void writeQueryData(final ChannelHandlerContext context,
final BackendConnection backendConnection, final QueryCommandExecutor<?> queryCommandExecutor, final int sequenceIdOffset) throws SQLException {
final BackendConnection backendConnection, final QueryCommandExecutor queryCommandExecutor, final int headerPackagesCount) throws SQLException {
if (queryCommandExecutor.isQuery() && !context.channel().isActive()) {
return;
}
......
......@@ -29,7 +29,6 @@ import org.apache.shardingsphere.shardingproxy.frontend.postgresql.executor.quer
import org.apache.shardingsphere.shardingproxy.frontend.postgresql.executor.query.binary.parse.PostgreSQLComParseExecutor;
import org.apache.shardingsphere.shardingproxy.frontend.postgresql.executor.query.binary.sync.PostgreSQLComSyncExecutor;
import org.apache.shardingsphere.shardingproxy.frontend.postgresql.executor.query.text.PostgreSQLComQueryExecutor;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.PostgreSQLPacket;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.PostgreSQLCommandPacket;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.PostgreSQLCommandPacketType;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.query.binary.bind.PostgreSQLComBindPacket;
......@@ -52,8 +51,7 @@ public final class PostgreSQLCommandExecutorFactory {
* @param backendConnection backend connection
* @return command executor
*/
public static CommandExecutor<PostgreSQLPacket> newInstance(
final PostgreSQLCommandPacketType commandPacketType, final PostgreSQLCommandPacket commandPacket, final BackendConnection backendConnection) {
public static CommandExecutor newInstance(final PostgreSQLCommandPacketType commandPacketType, final PostgreSQLCommandPacket commandPacket, final BackendConnection backendConnection) {
switch (commandPacketType) {
case QUERY:
return new PostgreSQLComQueryExecutor((PostgreSQLComQueryPacket) commandPacket, backendConnection);
......
......@@ -18,7 +18,7 @@
package org.apache.shardingsphere.shardingproxy.frontend.postgresql.executor.generic;
import org.apache.shardingsphere.shardingproxy.frontend.api.CommandExecutor;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.PostgreSQLPacket;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.DatabasePacket;
import java.util.Collection;
import java.util.Collections;
......@@ -28,10 +28,10 @@ import java.util.Collections;
*
* @author zhangliang
*/
public final class PostgreSQLComTerminationExecutor implements CommandExecutor<PostgreSQLPacket> {
public final class PostgreSQLComTerminationExecutor implements CommandExecutor {
@Override
public Collection<PostgreSQLPacket> execute() {
public Collection<DatabasePacket> execute() {
return Collections.emptyList();
}
}
......@@ -18,7 +18,7 @@
package org.apache.shardingsphere.shardingproxy.frontend.postgresql.executor.generic;
import org.apache.shardingsphere.shardingproxy.frontend.api.CommandExecutor;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.PostgreSQLPacket;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.DatabasePacket;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.generic.PostgreSQLErrorResponsePacket;
import java.util.Collection;
......@@ -30,10 +30,10 @@ import java.util.Collections;
* @author zhangyonglun
* @author zhangliang
*/
public final class PostgreSQLUnsupportedCommandExecutor implements CommandExecutor<PostgreSQLPacket> {
public final class PostgreSQLUnsupportedCommandExecutor implements CommandExecutor {
@Override
public Collection<PostgreSQLPacket> execute() {
return Collections.<PostgreSQLPacket>singletonList(new PostgreSQLErrorResponsePacket());
public Collection<DatabasePacket> execute() {
return Collections.<DatabasePacket>singletonList(new PostgreSQLErrorResponsePacket());
}
}
......@@ -29,6 +29,7 @@ import org.apache.shardingsphere.shardingproxy.backend.response.query.QueryRespo
import org.apache.shardingsphere.shardingproxy.backend.response.update.UpdateResponse;
import org.apache.shardingsphere.shardingproxy.context.GlobalContext;
import org.apache.shardingsphere.shardingproxy.frontend.api.QueryCommandExecutor;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.DatabasePacket;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.constant.PostgreSQLColumnType;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.PostgreSQLPacket;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.query.PostgreSQLColumnDescription;
......@@ -53,7 +54,7 @@ import java.util.List;
* @author zhangyonglun
* @author zhangliang
*/
public final class PostgreSQLComBindExecutor implements QueryCommandExecutor<PostgreSQLPacket> {
public final class PostgreSQLComBindExecutor implements QueryCommandExecutor {
private final PostgreSQLComBindPacket packet;
......@@ -68,11 +69,11 @@ public final class PostgreSQLComBindExecutor implements QueryCommandExecutor<Pos
}
@Override
public Collection<PostgreSQLPacket> execute() {
public Collection<DatabasePacket> execute() {
if (GlobalContext.getInstance().isCircuitBreak()) {
return Collections.<PostgreSQLPacket>singletonList(new PostgreSQLErrorResponsePacket());
return Collections.<DatabasePacket>singletonList(new PostgreSQLErrorResponsePacket());
}
List<PostgreSQLPacket> result = new LinkedList<>();
List<DatabasePacket> result = new LinkedList<>();
result.add(new PostgreSQLBindCompletePacket());
if (null == databaseCommunicationEngine) {
return result;
......
......@@ -18,7 +18,7 @@
package org.apache.shardingsphere.shardingproxy.frontend.postgresql.executor.query.binary.describe;
import org.apache.shardingsphere.shardingproxy.frontend.api.CommandExecutor;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.PostgreSQLPacket;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.DatabasePacket;
import java.util.Collection;
import java.util.Collections;
......@@ -29,10 +29,10 @@ import java.util.Collections;
* @author zhangyonglun
* @author zhangliang
*/
public final class PostgreSQLComDescribeExecutor implements CommandExecutor<PostgreSQLPacket> {
public final class PostgreSQLComDescribeExecutor implements CommandExecutor {
@Override
public Collection<PostgreSQLPacket> execute() {
public Collection<DatabasePacket> execute() {
return Collections.emptyList();
}
}
......@@ -18,7 +18,7 @@
package org.apache.shardingsphere.shardingproxy.frontend.postgresql.executor.query.binary.execute;
import org.apache.shardingsphere.shardingproxy.frontend.api.CommandExecutor;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.PostgreSQLPacket;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.DatabasePacket;
import java.util.Collection;
import java.util.Collections;
......@@ -29,10 +29,10 @@ import java.util.Collections;
* @author zhangyonglun
* @author zhangliang
*/
public final class PostgreSQLComExecuteExecutor implements CommandExecutor<PostgreSQLPacket> {
public final class PostgreSQLComExecuteExecutor implements CommandExecutor {
@Override
public Collection<PostgreSQLPacket> execute() {
public Collection<DatabasePacket> execute() {
return Collections.emptyList();
}
}
......@@ -26,7 +26,7 @@ import org.apache.shardingsphere.shardingproxy.backend.schema.LogicSchema;
import org.apache.shardingsphere.shardingproxy.backend.schema.MasterSlaveSchema;
import org.apache.shardingsphere.shardingproxy.backend.schema.ShardingSchema;
import org.apache.shardingsphere.shardingproxy.frontend.api.CommandExecutor;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.PostgreSQLPacket;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.DatabasePacket;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.query.binary.BinaryStatementRegistry;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.query.binary.ConnectionScopeBinaryStatementRegistry;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.query.binary.parse.PostgreSQLComParsePacket;
......@@ -41,7 +41,7 @@ import java.util.Collections;
* @author zhangyonglun
* @author zhangliang
*/
public final class PostgreSQLComParseExecutor implements CommandExecutor<PostgreSQLPacket> {
public final class PostgreSQLComParseExecutor implements CommandExecutor {
private final PostgreSQLComParsePacket packet;
......@@ -56,7 +56,7 @@ public final class PostgreSQLComParseExecutor implements CommandExecutor<Postgre
}
@Override
public Collection<PostgreSQLPacket> execute() {
public Collection<DatabasePacket> execute() {
// TODO we should use none-sharding parsing engine in future.
SQLParsingEngine sqlParsingEngine = new SQLParsingEngine(DatabaseType.PostgreSQL, packet.getSql(), getShardingRule(logicSchema), logicSchema.getMetaData().getTable());
if (!packet.getSql().isEmpty()) {
......@@ -64,7 +64,7 @@ public final class PostgreSQLComParseExecutor implements CommandExecutor<Postgre
int parametersIndex = sqlStatement.getParametersIndex();
binaryStatementRegistry.register(packet.getStatementId(), packet.getSql(), parametersIndex, packet.getBinaryStatementParameterTypes());
}
return Collections.<PostgreSQLPacket>singletonList(new PostgreSQLParseCompletePacket());
return Collections.<DatabasePacket>singletonList(new PostgreSQLParseCompletePacket());
}
private ShardingRule getShardingRule(final LogicSchema logicSchema) {
......
......@@ -18,7 +18,7 @@
package org.apache.shardingsphere.shardingproxy.frontend.postgresql.executor.query.binary.sync;
import org.apache.shardingsphere.shardingproxy.frontend.api.CommandExecutor;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.PostgreSQLPacket;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.DatabasePacket;
import java.util.Collection;
import java.util.Collections;
......@@ -29,10 +29,10 @@ import java.util.Collections;
* @author zhangyonglun
* @author zhangliang
*/
public final class PostgreSQLComSyncExecutor implements CommandExecutor<PostgreSQLPacket> {
public final class PostgreSQLComSyncExecutor implements CommandExecutor {
@Override
public Collection<PostgreSQLPacket> execute() {
public Collection<DatabasePacket> execute() {
return Collections.emptyList();
}
}
......@@ -28,6 +28,7 @@ import org.apache.shardingsphere.shardingproxy.backend.text.TextProtocolBackendH
import org.apache.shardingsphere.shardingproxy.backend.text.TextProtocolBackendHandlerFactory;
import org.apache.shardingsphere.shardingproxy.context.GlobalContext;
import org.apache.shardingsphere.shardingproxy.frontend.api.QueryCommandExecutor;
import org.apache.shardingsphere.shardingproxy.transport.api.packet.DatabasePacket;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.PostgreSQLPacket;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.query.PostgreSQLColumnDescription;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.query.PostgreSQLRowDescriptionPacket;
......@@ -48,7 +49,7 @@ import java.util.List;
* @author zhangyonglun
* @author zhangliang
*/
public final class PostgreSQLComQueryExecutor implements QueryCommandExecutor<PostgreSQLPacket> {
public final class PostgreSQLComQueryExecutor implements QueryCommandExecutor {
private final TextProtocolBackendHandler textProtocolBackendHandler;
......@@ -59,19 +60,19 @@ public final class PostgreSQLComQueryExecutor implements QueryCommandExecutor<Po
}
@Override
public Collection<PostgreSQLPacket> execute() {
public Collection<DatabasePacket> execute() {
if (GlobalContext.getInstance().isCircuitBreak()) {
return Collections.<PostgreSQLPacket>singletonList(new PostgreSQLErrorResponsePacket());
return Collections.<DatabasePacket>singletonList(new PostgreSQLErrorResponsePacket());
}
BackendResponse backendResponse = textProtocolBackendHandler.execute();
if (backendResponse instanceof ErrorResponse) {
return Collections.<PostgreSQLPacket>singletonList(createErrorPacket((ErrorResponse) backendResponse));
return Collections.<DatabasePacket>singletonList(createErrorPacket((ErrorResponse) backendResponse));
}
if (backendResponse instanceof UpdateResponse) {
return Collections.<PostgreSQLPacket>singletonList(createUpdatePacket((UpdateResponse) backendResponse));
return Collections.<DatabasePacket>singletonList(createUpdatePacket((UpdateResponse) backendResponse));
}
Optional<PostgreSQLRowDescriptionPacket> result = createQueryPacket((QueryResponse) backendResponse);
return result.isPresent() ? Collections.<PostgreSQLPacket>singletonList(result.get()) : Collections.<PostgreSQLPacket>emptyList();
return result.isPresent() ? Collections.<DatabasePacket>singletonList(result.get()) : Collections.<DatabasePacket>emptyList();
}
private PostgreSQLErrorResponsePacket createErrorPacket(final ErrorResponse errorResponse) {
......
......@@ -26,10 +26,8 @@ import java.util.Collection;
* Command executor.
*
* @author zhangliang
*
* @param <T> type of database packet
*/
public interface CommandExecutor<T extends DatabasePacket> {
public interface CommandExecutor {
/**
* Execute command.
......@@ -37,5 +35,5 @@ public interface CommandExecutor<T extends DatabasePacket> {
* @return database packets to be sent
* @throws SQLException SQL exception
*/
Collection<T> execute() throws SQLException;
Collection<DatabasePacket> execute() throws SQLException;
}
......@@ -25,10 +25,8 @@ import java.sql.SQLException;
* Query command executor.
*
* @author zhangliang
*
* @param <T> type of database packet
*/
public interface QueryCommandExecutor<T extends DatabasePacket> extends CommandExecutor<T> {
public interface QueryCommandExecutor extends CommandExecutor {
/**
* Judge is query SQL or not.
......@@ -51,5 +49,5 @@ public interface QueryCommandExecutor<T extends DatabasePacket> extends CommandE
* @return database packet of query data
* @throws SQLException SQL exception
*/
T getQueryData() throws SQLException;
DatabasePacket getQueryData() throws SQLException;
}
......@@ -115,13 +115,14 @@ public interface DatabaseFrontendEngine {
/**
* Write query data.
*
* @param context channel handler context
* @param backendConnection backend connection
* @param queryCommandExecutor query command executor
* @param sequenceIdOffset sequence ID offset
* @param headerPackagesCount count of header packages
* @throws SQLException SQL exception
*/
void writeQueryData(ChannelHandlerContext context, BackendConnection backendConnection, QueryCommandExecutor<?> queryCommandExecutor, int sequenceIdOffset) throws SQLException;
void writeQueryData(ChannelHandlerContext context, BackendConnection backendConnection, QueryCommandExecutor queryCommandExecutor, int headerPackagesCount) throws SQLException;
/**
* Release resource.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册