提交 ef553f6b 编写于 作者: T terrymanu

for #1898, merge ExecuteResponse & BackendResponse

上级 5b247bad
......@@ -32,15 +32,13 @@ import org.apache.shardingsphere.shardingproxy.backend.communication.DatabaseCom
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.connection.ConnectionStatus;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.JDBCExecuteEngine;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.response.ExecuteQueryResponse;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.response.ExecuteResponse;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.response.ExecuteUpdateResponse;
import org.apache.shardingsphere.shardingproxy.backend.result.BackendResponse;
import org.apache.shardingsphere.shardingproxy.backend.result.common.FailureResponse;
import org.apache.shardingsphere.shardingproxy.backend.result.common.SuccessResponse;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryData;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryHeader;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryHeaderResponse;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryResponse;
import org.apache.shardingsphere.shardingproxy.backend.result.update.UpdateResponse;
import org.apache.shardingsphere.shardingproxy.runtime.GlobalRegistry;
import org.apache.shardingsphere.shardingproxy.runtime.schema.LogicSchema;
import org.apache.shardingsphere.shardingproxy.runtime.schema.ShardingSchema;
......@@ -71,7 +69,7 @@ public final class JDBCDatabaseCommunicationEngine implements DatabaseCommunicat
private final DatabaseType databaseType = GlobalRegistry.getInstance().getDatabaseType();
private ExecuteResponse executeResponse;
private BackendResponse response;
private MergedResult mergedResult;
......@@ -94,7 +92,7 @@ public final class JDBCDatabaseCommunicationEngine implements DatabaseCommunicat
return new FailureResponse(
MySQLServerErrorCode.ER_ERROR_ON_MODIFYING_GTID_EXECUTED_TABLE, sqlStatement.getTables().isSingleTable() ? sqlStatement.getTables().getSingleTableName() : "unknown_table");
}
executeResponse = executeEngine.execute(routeResult);
response = executeEngine.execute(routeResult);
if (logicSchema instanceof ShardingSchema) {
logicSchema.refreshTableMetaData(routeResult.getSqlStatement());
}
......@@ -112,15 +110,15 @@ public final class JDBCDatabaseCommunicationEngine implements DatabaseCommunicat
}
private BackendResponse merge(final SQLStatement sqlStatement) throws SQLException {
if (executeResponse instanceof ExecuteUpdateResponse) {
return ((ExecuteUpdateResponse) executeResponse).getBackendResponse(!isAllBroadcastTables(sqlStatement));
if (response instanceof UpdateResponse) {
return ((UpdateResponse) response).getResponse(!isAllBroadcastTables(sqlStatement));
}
mergedResult = MergeEngineFactory.newInstance(
databaseType, getShardingRule(), sqlStatement, logicSchema.getMetaData().getTable(), ((ExecuteQueryResponse) executeResponse).getQueryResults()).merge();
databaseType, getShardingRule(), sqlStatement, logicSchema.getMetaData().getTable(), ((QueryResponse) response).getQueryResults()).merge();
if (mergedResult instanceof ShowTablesMergedResult) {
((ShowTablesMergedResult) mergedResult).resetColumnLabel(logicSchema.getName());
}
return getQueryHeaderResponseWithoutDerivedColumns(((ExecuteQueryResponse) executeResponse).getQueryHeaders());
return getQueryHeaderResponseWithoutDerivedColumns(((QueryResponse) response).getQueryHeaders());
}
private boolean isAllBroadcastTables(final SQLStatement sqlStatement) {
......@@ -131,7 +129,7 @@ public final class JDBCDatabaseCommunicationEngine implements DatabaseCommunicat
return logicSchema instanceof ShardingSchema ? ((ShardingSchema) logicSchema).getShardingRule() : new ShardingRule(new ShardingRuleConfiguration(), logicSchema.getDataSources().keySet());
}
private QueryHeaderResponse getQueryHeaderResponseWithoutDerivedColumns(final List<QueryHeader> queryHeaders) {
private QueryResponse getQueryHeaderResponseWithoutDerivedColumns(final List<QueryHeader> queryHeaders) {
List<QueryHeader> derivedColumnQueryHeaders = new LinkedList<>();
for (QueryHeader each : queryHeaders) {
if (DerivedColumn.isDerivedColumn(each.getColumnLabel())) {
......@@ -139,7 +137,7 @@ public final class JDBCDatabaseCommunicationEngine implements DatabaseCommunicat
}
}
queryHeaders.removeAll(derivedColumnQueryHeaders);
return new QueryHeaderResponse(queryHeaders);
return new QueryResponse(queryHeaders);
}
@Override
......@@ -149,7 +147,7 @@ public final class JDBCDatabaseCommunicationEngine implements DatabaseCommunicat
@Override
public QueryData getQueryData() throws SQLException {
List<QueryHeader> queryHeaders = ((ExecuteQueryResponse) executeResponse).getQueryHeaders();
List<QueryHeader> queryHeaders = ((QueryResponse) response).getQueryHeaders();
List<Object> row = new ArrayList<>(queryHeaders.size());
for (int columnIndex = 1; columnIndex <= queryHeaders.size(); columnIndex++) {
row.add(mergedResult.getValue(columnIndex, Object.class));
......
......@@ -29,13 +29,13 @@ import org.apache.shardingsphere.core.routing.SQLRouteResult;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.callback.ProxyJDBCExecutePrepareCallback;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.callback.ProxySQLExecuteCallback;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.response.ExecuteQueryResponse;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.response.ExecuteResponse;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.response.ExecuteUpdateResponse;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.response.unit.ExecuteQueryResponseUnit;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.response.unit.ExecuteResponseUnit;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.response.ExecuteQueryResponseUnit;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.response.ExecuteResponseUnit;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.wrapper.JDBCExecutorWrapper;
import org.apache.shardingsphere.shardingproxy.backend.result.BackendResponse;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryHeader;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryResponse;
import org.apache.shardingsphere.shardingproxy.backend.result.update.UpdateResponse;
import org.apache.shardingsphere.shardingproxy.runtime.ExecutorContext;
import org.apache.shardingsphere.shardingproxy.runtime.GlobalRegistry;
......@@ -72,7 +72,7 @@ public final class JDBCExecuteEngine implements SQLExecuteEngine {
@SuppressWarnings("unchecked")
@Override
public ExecuteResponse execute(final SQLRouteResult routeResult) throws SQLException {
public BackendResponse execute(final SQLRouteResult routeResult) throws SQLException {
boolean isReturnGeneratedKeys = routeResult.getSqlStatement() instanceof InsertStatement;
boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
Collection<ShardingExecuteGroup<StatementExecuteUnit>> sqlExecuteGroups = sqlExecutePrepareTemplate.getExecuteUnitGroups(
......@@ -82,11 +82,11 @@ public final class JDBCExecuteEngine implements SQLExecuteEngine {
new ProxySQLExecuteCallback(backendConnection, jdbcExecutorWrapper, isExceptionThrown, isReturnGeneratedKeys, false));
ExecuteResponseUnit firstExecuteResponseUnit = executeResponseUnits.iterator().next();
return firstExecuteResponseUnit instanceof ExecuteQueryResponseUnit
? getExecuteQueryResponse(((ExecuteQueryResponseUnit) firstExecuteResponseUnit).getQueryHeaders(), executeResponseUnits) : new ExecuteUpdateResponse(executeResponseUnits);
? getExecuteQueryResponse(((ExecuteQueryResponseUnit) firstExecuteResponseUnit).getQueryHeaders(), executeResponseUnits) : new UpdateResponse(executeResponseUnits);
}
private ExecuteResponse getExecuteQueryResponse(final List<QueryHeader> queryHeaders, final Collection<ExecuteResponseUnit> executeResponseUnits) {
ExecuteQueryResponse result = new ExecuteQueryResponse(queryHeaders);
private BackendResponse getExecuteQueryResponse(final List<QueryHeader> queryHeaders, final Collection<ExecuteResponseUnit> executeResponseUnits) {
QueryResponse result = new QueryResponse(queryHeaders);
for (ExecuteResponseUnit each : executeResponseUnits) {
result.getQueryResults().add(((ExecuteQueryResponseUnit) each).getQueryResult());
}
......
......@@ -18,7 +18,7 @@
package org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute;
import org.apache.shardingsphere.core.routing.SQLRouteResult;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.response.ExecuteResponse;
import org.apache.shardingsphere.shardingproxy.backend.result.BackendResponse;
import java.sql.SQLException;
......@@ -36,5 +36,5 @@ public interface SQLExecuteEngine {
* @return execute response
* @throws SQLException SQL exception
*/
ExecuteResponse execute(SQLRouteResult routeResult) throws SQLException;
BackendResponse execute(SQLRouteResult routeResult) throws SQLException;
}
......@@ -25,9 +25,9 @@ import org.apache.shardingsphere.core.merger.QueryResult;
import org.apache.shardingsphere.core.routing.RouteUnit;
import org.apache.shardingsphere.core.rule.ShardingRule;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.response.unit.ExecuteQueryResponseUnit;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.response.unit.ExecuteResponseUnit;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.response.unit.ExecuteUpdateResponseUnit;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.response.ExecuteQueryResponseUnit;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.response.ExecuteResponseUnit;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.response.ExecuteUpdateResponseUnit;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.wrapper.JDBCExecutorWrapper;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryHeader;
import org.apache.shardingsphere.shardingproxy.runtime.GlobalRegistry;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.response;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.core.merger.QueryResult;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryHeader;
import java.util.LinkedList;
import java.util.List;
/**
* Execute query response.
*
* @author zhangliang
*/
@RequiredArgsConstructor
@Getter
public final class ExecuteQueryResponse implements ExecuteResponse {
private final List<QueryHeader> queryHeaders;
private final List<QueryResult> queryResults = new LinkedList<>();
}
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.response.unit;
package org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.response;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.response;
/**
* Execute response.
*
* @author zhangliang
*/
public interface ExecuteResponse {
}
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.response.unit;
package org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.response;
/**
* Execute response unit.
......
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.response.unit;
package org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.response;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
......
......@@ -19,18 +19,22 @@ package org.apache.shardingsphere.shardingproxy.backend.result.query;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.core.merger.QueryResult;
import org.apache.shardingsphere.shardingproxy.backend.result.BackendResponse;
import java.util.LinkedList;
import java.util.List;
/**
* Query header response.
* Query response.
*
* @author zhangliang
*/
@RequiredArgsConstructor
@Getter
public final class QueryHeaderResponse implements BackendResponse {
public final class QueryResponse implements BackendResponse {
private final List<QueryHeader> queryHeaders;
private final List<QueryResult> queryResults = new LinkedList<>();
}
......@@ -15,10 +15,10 @@
* limitations under the License.
*/
package org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.response;
package org.apache.shardingsphere.shardingproxy.backend.result.update;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.response.unit.ExecuteResponseUnit;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.response.unit.ExecuteUpdateResponseUnit;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.response.ExecuteResponseUnit;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.response.ExecuteUpdateResponseUnit;
import org.apache.shardingsphere.shardingproxy.backend.result.BackendResponse;
import org.apache.shardingsphere.shardingproxy.backend.result.common.SuccessResponse;
......@@ -27,17 +27,17 @@ import java.util.LinkedList;
import java.util.List;
/**
* Execute update response.
* Update response.
*
* @author zhangliang
*/
public final class ExecuteUpdateResponse implements ExecuteResponse {
public final class UpdateResponse implements BackendResponse {
private final List<Integer> updateCounts = new LinkedList<>();
private final List<Long> lastInsertIds = new LinkedList<>();
public ExecuteUpdateResponse(final Collection<ExecuteResponseUnit> responseUnits) {
public UpdateResponse(final Collection<ExecuteResponseUnit> responseUnits) {
for (ExecuteResponseUnit each : responseUnits) {
updateCounts.add(((ExecuteUpdateResponseUnit) each).getUpdateCount());
lastInsertIds.add(((ExecuteUpdateResponseUnit) each).getLastInsertId());
......@@ -45,12 +45,12 @@ public final class ExecuteUpdateResponse implements ExecuteResponse {
}
/**
* Get backend response.
* Get response.
*
* @param isMerge is need merge
* @return backend response
*/
public BackendResponse getBackendResponse(final boolean isMerge) {
public BackendResponse getResponse(final boolean isMerge) {
return isMerge ? new SuccessResponse(mergeUpdateCount(), mergeLastInsertId()) : new SuccessResponse(updateCounts.get(0), lastInsertIds.get(0));
}
......
......@@ -22,7 +22,7 @@ import org.apache.shardingsphere.core.merger.dal.show.ShowDatabasesMergedResult;
import org.apache.shardingsphere.shardingproxy.backend.result.BackendResponse;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryData;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryHeader;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryHeaderResponse;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryResponse;
import org.apache.shardingsphere.shardingproxy.backend.text.TextProtocolBackendHandler;
import org.apache.shardingsphere.shardingproxy.runtime.GlobalRegistry;
......@@ -43,7 +43,7 @@ public final class ShowDatabasesBackendHandler implements TextProtocolBackendHan
@Override
public BackendResponse execute() {
mergedResult = new ShowDatabasesMergedResult(GlobalRegistry.getInstance().getSchemaNames());
return new QueryHeaderResponse(Collections.singletonList(new QueryHeader("", "", "", "Database", 100, Types.VARCHAR, 0)));
return new QueryResponse(Collections.singletonList(new QueryHeader("", "", "", "Database", 100, Types.VARCHAR, 0)));
}
@Override
......
......@@ -25,7 +25,7 @@ import org.apache.shardingsphere.shardingproxy.backend.result.BackendResponse;
import org.apache.shardingsphere.shardingproxy.backend.result.common.FailureResponse;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryData;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryHeader;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryHeaderResponse;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryResponse;
import org.apache.shardingsphere.shardingproxy.backend.text.TextProtocolBackendHandler;
import java.sql.SQLException;
......@@ -69,7 +69,7 @@ public final class ShardingCTLShowBackendHandler implements TextProtocolBackendH
private BackendResponse createResponsePackets(final String columnName, final Object... values) {
mergedResult = new ShowShardingCTLMergedResult(Arrays.asList(values));
return new QueryHeaderResponse(Collections.singletonList(new QueryHeader("", "", columnName, columnName, 100, Types.VARCHAR, 0)));
return new QueryResponse(Collections.singletonList(new QueryHeader("", "", columnName, columnName, 100, Types.VARCHAR, 0)));
}
@Override
......
......@@ -29,7 +29,7 @@ import org.apache.shardingsphere.shardingproxy.backend.result.common.FailureResp
import org.apache.shardingsphere.shardingproxy.backend.result.common.SuccessResponse;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryData;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryHeader;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryHeaderResponse;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryResponse;
import org.apache.shardingsphere.shardingproxy.runtime.GlobalRegistry;
import org.apache.shardingsphere.shardingproxy.transport.common.packet.DatabasePacket;
import org.apache.shardingsphere.shardingproxy.transport.common.packet.command.CommandResponsePackets;
......@@ -162,7 +162,7 @@ public final class MySQLQueryComStmtExecutePacket implements MySQLQueryCommandPa
if (backendResponse instanceof FailureResponse) {
return Optional.of(new CommandResponsePackets(createDatabaseFailurePacket((FailureResponse) backendResponse)));
}
Collection<DataHeaderPacket> dataHeaderPackets = createDataHeaderPackets(((QueryHeaderResponse) backendResponse).getQueryHeaders());
Collection<DataHeaderPacket> dataHeaderPackets = createDataHeaderPackets(((QueryResponse) backendResponse).getQueryHeaders());
dataHeaderEofSequenceId = dataHeaderPackets.size() + 2;
return Optional.<CommandResponsePackets>of(new QueryResponsePackets(dataHeaderPackets, dataHeaderEofSequenceId));
}
......
......@@ -26,7 +26,7 @@ import org.apache.shardingsphere.shardingproxy.backend.result.common.FailureResp
import org.apache.shardingsphere.shardingproxy.backend.result.common.SuccessResponse;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryData;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryHeader;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryHeaderResponse;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryResponse;
import org.apache.shardingsphere.shardingproxy.backend.text.TextProtocolBackendHandler;
import org.apache.shardingsphere.shardingproxy.backend.text.TextProtocolBackendHandlerFactory;
import org.apache.shardingsphere.shardingproxy.runtime.GlobalRegistry;
......@@ -100,7 +100,7 @@ public final class MySQLComPacketQuery implements MySQLQueryCommandPacket {
if (backendResponse instanceof FailureResponse) {
return Optional.of(new CommandResponsePackets(createDatabaseFailurePacket((FailureResponse) backendResponse)));
}
Collection<DataHeaderPacket> dataHeaderPackets = createDataHeaderPackets((QueryHeaderResponse) backendResponse);
Collection<DataHeaderPacket> dataHeaderPackets = createDataHeaderPackets((QueryResponse) backendResponse);
dataHeaderEofSequenceId = dataHeaderPackets.size() + 2;
return Optional.<CommandResponsePackets>of(new QueryResponsePackets(dataHeaderPackets, dataHeaderEofSequenceId));
}
......@@ -113,10 +113,10 @@ public final class MySQLComPacketQuery implements MySQLQueryCommandPacket {
return new DatabaseFailurePacket(1, failureResponse.getErrorCode(), failureResponse.getSqlState(), failureResponse.getErrorMessage());
}
private Collection<DataHeaderPacket> createDataHeaderPackets(final QueryHeaderResponse queryHeaderResponse) {
private Collection<DataHeaderPacket> createDataHeaderPackets(final QueryResponse queryResponse) {
Collection<DataHeaderPacket> result = new LinkedList<>();
int sequenceId = 1;
for (QueryHeader each : queryHeaderResponse.getQueryHeaders()) {
for (QueryHeader each : queryResponse.getQueryHeaders()) {
result.add(new DataHeaderPacket(
++sequenceId, each.getSchema(), each.getTable(), each.getTable(), each.getColumnLabel(), each.getColumnName(), each.getColumnLength(), each.getColumnType(), each.getDecimals()));
}
......
......@@ -28,7 +28,7 @@ import org.apache.shardingsphere.shardingproxy.backend.result.common.FailureResp
import org.apache.shardingsphere.shardingproxy.backend.result.common.SuccessResponse;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryData;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryHeader;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryHeaderResponse;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryResponse;
import org.apache.shardingsphere.shardingproxy.runtime.GlobalRegistry;
import org.apache.shardingsphere.shardingproxy.transport.common.packet.DatabasePacket;
import org.apache.shardingsphere.shardingproxy.transport.common.packet.command.CommandResponsePackets;
......@@ -122,7 +122,7 @@ public final class PostgreSQLComBindPacket implements PostgreSQLQueryCommandPack
if (backendResponse instanceof FailureResponse) {
return Optional.of(new CommandResponsePackets(createDatabaseFailurePacket((FailureResponse) backendResponse)));
}
Collection<DataHeaderPacket> dataHeaderPackets = createDataHeaderPackets(((QueryHeaderResponse) backendResponse).getQueryHeaders());
Collection<DataHeaderPacket> dataHeaderPackets = createDataHeaderPackets(((QueryResponse) backendResponse).getQueryHeaders());
return Optional.<CommandResponsePackets>of(new QueryResponsePackets(dataHeaderPackets, dataHeaderPackets.size() + 2));
}
return Optional.of(result);
......
......@@ -25,7 +25,7 @@ import org.apache.shardingsphere.shardingproxy.backend.result.BackendResponse;
import org.apache.shardingsphere.shardingproxy.backend.result.common.FailureResponse;
import org.apache.shardingsphere.shardingproxy.backend.result.common.SuccessResponse;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryHeader;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryHeaderResponse;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryResponse;
import org.apache.shardingsphere.shardingproxy.backend.text.TextProtocolBackendHandler;
import org.apache.shardingsphere.shardingproxy.backend.text.TextProtocolBackendHandlerFactory;
import org.apache.shardingsphere.shardingproxy.runtime.GlobalRegistry;
......@@ -83,7 +83,7 @@ public final class PostgreSQLComQueryPacket implements PostgreSQLQueryCommandPac
if (backendResponse instanceof FailureResponse) {
return Optional.of(new CommandResponsePackets(createDatabaseFailurePacket((FailureResponse) backendResponse)));
}
Collection<DataHeaderPacket> dataHeaderPackets = createDataHeaderPackets((QueryHeaderResponse) backendResponse);
Collection<DataHeaderPacket> dataHeaderPackets = createDataHeaderPackets((QueryResponse) backendResponse);
return Optional.<CommandResponsePackets>of(new QueryResponsePackets(dataHeaderPackets, dataHeaderPackets.size() + 2));
}
......@@ -95,10 +95,10 @@ public final class PostgreSQLComQueryPacket implements PostgreSQLQueryCommandPac
return new DatabaseFailurePacket(1, failureResponse.getErrorCode(), failureResponse.getSqlState(), failureResponse.getErrorMessage());
}
private Collection<DataHeaderPacket> createDataHeaderPackets(final QueryHeaderResponse queryHeaderResponse) {
private Collection<DataHeaderPacket> createDataHeaderPackets(final QueryResponse queryResponse) {
Collection<DataHeaderPacket> result = new LinkedList<>();
int sequenceId = 1;
for (QueryHeader each : queryHeaderResponse.getQueryHeaders()) {
for (QueryHeader each : queryResponse.getQueryHeaders()) {
result.add(new DataHeaderPacket(
++sequenceId, each.getSchema(), each.getTable(), each.getTable(), each.getColumnLabel(), each.getColumnName(), each.getColumnLength(), each.getColumnType(), each.getDecimals()));
}
......
......@@ -8,51 +8,51 @@
#
######################################################################################################
#
#schemaName: sharding_db
#
#dataSources:
# ds_0:
# url: jdbc:mysql://127.0.0.1:3306/demo_ds_0?serverTimezone=UTC&useSSL=false
# username: root
# password:
# connectionTimeoutMilliseconds: 30000
# idleTimeoutMilliseconds: 60000
# maxLifetimeMilliseconds: 1800000
# maxPoolSize: 50
# ds_1:
# url: jdbc:mysql://127.0.0.1:3306/demo_ds_1?serverTimezone=UTC&useSSL=false
# username: root
# password:
# connectionTimeoutMilliseconds: 30000
# idleTimeoutMilliseconds: 60000
# maxLifetimeMilliseconds: 1800000
# maxPoolSize: 50
#
#shardingRule:
# tables:
# t_order:
# actualDataNodes: ds_${0..1}.t_order_${0..1}
# tableStrategy:
# inline:
# shardingColumn: order_id
# algorithmExpression: t_order_${order_id % 2}
# keyGenerator:
# type: SNOWFLAKE
# column: order_id
# t_order_item:
# actualDataNodes: ds_${0..1}.t_order_item_${0..1}
# tableStrategy:
# inline:
# shardingColumn: order_id
# algorithmExpression: t_order_item_${order_id % 2}
# keyGenerator:
# type: SNOWFLAKE
# column: order_item_id
# bindingTables:
# - t_order,t_order_item
# defaultDatabaseStrategy:
# inline:
# shardingColumn: user_id
# algorithmExpression: ds_${user_id % 2}
# defaultTableStrategy:
# none:
schemaName: sharding_db
dataSources:
ds_0:
url: jdbc:mysql://127.0.0.1:3306/demo_ds_0?serverTimezone=UTC&useSSL=false
username: root
password:
connectionTimeoutMilliseconds: 30000
idleTimeoutMilliseconds: 60000
maxLifetimeMilliseconds: 1800000
maxPoolSize: 50
ds_1:
url: jdbc:mysql://127.0.0.1:3306/demo_ds_1?serverTimezone=UTC&useSSL=false
username: root
password:
connectionTimeoutMilliseconds: 30000
idleTimeoutMilliseconds: 60000
maxLifetimeMilliseconds: 1800000
maxPoolSize: 50
shardingRule:
tables:
t_order:
actualDataNodes: ds_${0..1}.t_order_${0..1}
tableStrategy:
inline:
shardingColumn: order_id
algorithmExpression: t_order_${order_id % 2}
keyGenerator:
type: SNOWFLAKE
column: order_id
t_order_item:
actualDataNodes: ds_${0..1}.t_order_item_${0..1}
tableStrategy:
inline:
shardingColumn: order_id
algorithmExpression: t_order_item_${order_id % 2}
keyGenerator:
type: SNOWFLAKE
column: order_item_id
bindingTables:
- t_order,t_order_item
defaultDatabaseStrategy:
inline:
shardingColumn: user_id
algorithmExpression: ds_${user_id % 2}
defaultTableStrategy:
none:
......@@ -11,18 +11,18 @@
# serverLists: localhost:2181
# namespace: orchestration
#
#authentication:
# username: root
# password: root
#
#props:
# max.connections.size.per.query: 1
# acceptor.size: 16 # The default value is available processors count * 2.
# executor.size: 16 # Infinite by default.
# proxy.frontend.flush.threshold: 128 # The default value is 128.
# # LOCAL: Proxy will run with LOCAL transaction.
# # XA: Proxy will run with XA transaction.
# # BASE: Proxy will run with B.A.S.E transaction.
# proxy.transaction.type: LOCAL
# proxy.opentracing.enabled: false
# sql.show: false
authentication:
username: root
password: root
props:
max.connections.size.per.query: 1
acceptor.size: 16 # The default value is available processors count * 2.
executor.size: 16 # Infinite by default.
proxy.frontend.flush.threshold: 128 # The default value is 128.
# LOCAL: Proxy will run with LOCAL transaction.
# XA: Proxy will run with XA transaction.
# BASE: Proxy will run with B.A.S.E transaction.
proxy.transaction.type: LOCAL
proxy.opentracing.enabled: false
sql.show: false
......@@ -19,7 +19,7 @@ package org.apache.shardingsphere.shardingproxy.backend.text.admin;
import org.apache.shardingsphere.shardingproxy.backend.MockGlobalRegistryUtil;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryData;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryHeaderResponse;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryResponse;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
......@@ -44,8 +44,8 @@ public final class ShowDatabasesBackendHandlerTest {
@Test
public void assertExecuteShowDatabaseBackendHandler() {
QueryHeaderResponse actual = (QueryHeaderResponse) showDatabasesBackendHandler.execute();
assertThat(actual, instanceOf(QueryHeaderResponse.class));
QueryResponse actual = (QueryResponse) showDatabasesBackendHandler.execute();
assertThat(actual, instanceOf(QueryResponse.class));
assertThat(actual.getQueryHeaders().size(), is(1));
}
......
......@@ -21,7 +21,7 @@ import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.connec
import org.apache.shardingsphere.shardingproxy.backend.result.BackendResponse;
import org.apache.shardingsphere.shardingproxy.backend.result.common.FailureResponse;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryData;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryHeaderResponse;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryResponse;
import org.apache.shardingsphere.transaction.core.TransactionType;
import org.hamcrest.CoreMatchers;
import org.junit.Test;
......@@ -42,8 +42,8 @@ public final class ShardingCTLShowBackendHandlerTest {
backendConnection.setCurrentSchema("schema");
ShardingCTLShowBackendHandler backendHandler = new ShardingCTLShowBackendHandler("sctl:show transaction_type", backendConnection);
BackendResponse actual = backendHandler.execute();
assertThat(actual, instanceOf(QueryHeaderResponse.class));
assertThat(((QueryHeaderResponse) actual).getQueryHeaders().size(), is(1));
assertThat(actual, instanceOf(QueryResponse.class));
assertThat(((QueryResponse) actual).getQueryHeaders().size(), is(1));
backendHandler.next();
QueryData queryData = backendHandler.getQueryData();
assertThat(queryData.getData().iterator().next(), CoreMatchers.<Object>is("LOCAL"));
......@@ -54,8 +54,8 @@ public final class ShardingCTLShowBackendHandlerTest {
backendConnection.setCurrentSchema("schema");
ShardingCTLShowBackendHandler backendHandler = new ShardingCTLShowBackendHandler("sctl:show cached_connections", backendConnection);
BackendResponse actual = backendHandler.execute();
assertThat(actual, instanceOf(QueryHeaderResponse.class));
assertThat(((QueryHeaderResponse) actual).getQueryHeaders().size(), is(1));
assertThat(actual, instanceOf(QueryResponse.class));
assertThat(((QueryResponse) actual).getQueryHeaders().size(), is(1));
backendHandler.next();
QueryData queryData = backendHandler.getQueryData();
assertThat(queryData.getData().iterator().next(), CoreMatchers.<Object>is(0));
......
......@@ -22,7 +22,7 @@ import lombok.SneakyThrows;
import org.apache.shardingsphere.shardingproxy.backend.communication.DatabaseCommunicationEngine;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryData;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryHeaderResponse;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryResponse;
import org.apache.shardingsphere.shardingproxy.transport.common.packet.DatabasePacket;
import org.apache.shardingsphere.shardingproxy.transport.common.packet.command.CommandResponsePackets;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.MySQLPacketPayload;
......@@ -84,7 +84,7 @@ public final class MySQLComStmtExecutePacketTest {
DatabaseCommunicationEngine databaseCommunicationEngine = mock(DatabaseCommunicationEngine.class);
when(payload.readInt4()).thenReturn(1);
when(payload.readInt1()).thenReturn(0, 1);
when(databaseCommunicationEngine.execute()).thenReturn(mock(QueryHeaderResponse.class));
when(databaseCommunicationEngine.execute()).thenReturn(mock(QueryResponse.class));
when(databaseCommunicationEngine.next()).thenReturn(true, false);
when(databaseCommunicationEngine.getQueryData()).thenReturn(new QueryData(Collections.singletonList(Types.BIGINT), Collections.<Object>singletonList(99999L)));
MySQLQueryComStmtExecutePacket packet = new MySQLQueryComStmtExecutePacket(1, payload, backendConnection);
......
......@@ -23,7 +23,7 @@ import org.apache.shardingsphere.core.constant.ShardingConstant;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.connection.ConnectionStatus;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryData;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryHeaderResponse;
import org.apache.shardingsphere.shardingproxy.backend.result.query.QueryResponse;
import org.apache.shardingsphere.shardingproxy.backend.text.TextProtocolBackendHandler;
import org.apache.shardingsphere.shardingproxy.runtime.GlobalRegistry;
import org.apache.shardingsphere.shardingproxy.runtime.schema.ShardingSchema;
......@@ -101,8 +101,8 @@ public final class MySQLComQueryPacketTest {
public void assertExecuteWithoutTransaction() throws SQLException {
when(payload.readStringEOF()).thenReturn("SELECT id FROM tbl");
MySQLComPacketQuery packet = new MySQLComPacketQuery(1, payload, backendConnection);
QueryHeaderResponse queryHeaderResponse = mock(QueryHeaderResponse.class);
setBackendHandler(packet, queryHeaderResponse);
QueryResponse queryResponse = mock(QueryResponse.class);
setBackendHandler(packet, queryResponse);
Optional<CommandResponsePackets> actual = packet.execute();
assertTrue(actual.isPresent());
assertTrue(packet.next());
......@@ -112,11 +112,11 @@ public final class MySQLComQueryPacketTest {
}
@SneakyThrows
private void setBackendHandler(final MySQLComPacketQuery packet, final QueryHeaderResponse queryHeaderResponse) {
private void setBackendHandler(final MySQLComPacketQuery packet, final QueryResponse queryResponse) {
TextProtocolBackendHandler textProtocolBackendHandler = mock(TextProtocolBackendHandler.class);
when(textProtocolBackendHandler.next()).thenReturn(true, false);
when(textProtocolBackendHandler.getQueryData()).thenReturn(new QueryData(Collections.singletonList(Types.VARCHAR), Collections.<Object>singletonList("id")));
when(textProtocolBackendHandler.execute()).thenReturn(queryHeaderResponse);
when(textProtocolBackendHandler.execute()).thenReturn(queryResponse);
when(textProtocolBackendHandler.next()).thenReturn(true, false);
when(textProtocolBackendHandler.getQueryData()).thenReturn(new QueryData(Collections.singletonList(Types.BIGINT), Collections.<Object>singletonList(99999L)));
Field field = MySQLComPacketQuery.class.getDeclaredField("textProtocolBackendHandler");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册