/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.bind; import lombok.Getter; import org.apache.shardingsphere.db.protocol.packet.DatabasePacket; import org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLColumnType; import org.apache.shardingsphere.db.protocol.postgresql.packet.PostgreSQLPacket; import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLColumnDescription; import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLRowDescriptionPacket; import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLBinaryResultSetRowPacket; import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLBindCompletePacket; import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLComBindPacket; import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.text.PostgreSQLDataRowPacket; import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket; import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLErrorResponsePacket; import org.apache.shardingsphere.infra.executor.sql.QueryResult; import org.apache.shardingsphere.infra.executor.sql.raw.execute.result.query.QueryHeader; import org.apache.shardingsphere.kernel.context.SchemaContext; import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine; import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory; import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection; import org.apache.shardingsphere.proxy.backend.response.BackendResponse; import org.apache.shardingsphere.proxy.backend.response.error.ErrorResponse; import org.apache.shardingsphere.proxy.backend.response.query.QueryData; import org.apache.shardingsphere.proxy.backend.response.query.QueryResponse; import org.apache.shardingsphere.proxy.backend.response.update.UpdateResponse; import org.apache.shardingsphere.proxy.backend.schema.ProxySchemaContexts; import org.apache.shardingsphere.proxy.frontend.api.QueryCommandExecutor; import org.apache.shardingsphere.proxy.frontend.postgresql.PostgreSQLErrPacketFactory; import org.apache.shardingsphere.sql.parser.sql.statement.SQLStatement; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Optional; /** * Command bind executor for PostgreSQL. */ public final class PostgreSQLComBindExecutor implements QueryCommandExecutor { private final PostgreSQLComBindPacket packet; private final DatabaseCommunicationEngine databaseCommunicationEngine; @Getter private volatile boolean isQueryResponse; @Getter private volatile boolean isUpdateResponse; @Getter private volatile boolean isErrorResponse; public PostgreSQLComBindExecutor(final PostgreSQLComBindPacket packet, final BackendConnection backendConnection) { this.packet = packet; SchemaContext schemaContext = ProxySchemaContexts.getInstance().getSchema(backendConnection.getSchema()); if (null != packet.getSql() && null != schemaContext) { SQLStatement sqlStatement = schemaContext.getRuntimeContext().getSqlParserEngine().parse(packet.getSql(), true); databaseCommunicationEngine = DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(sqlStatement, packet.getSql(), packet.getParameters(), backendConnection); } else { databaseCommunicationEngine = null; } } @Override public Collection> execute() { if (ProxySchemaContexts.getInstance().getSchemaContexts().isCircuitBreak()) { return Collections.singletonList(new PostgreSQLErrorResponsePacket()); } List> result = new LinkedList<>(); result.add(new PostgreSQLBindCompletePacket()); if (null == databaseCommunicationEngine) { return result; } BackendResponse backendResponse = databaseCommunicationEngine.execute(); if (backendResponse instanceof ErrorResponse) { isErrorResponse = true; result.add(createErrorPacket((ErrorResponse) backendResponse)); } if (backendResponse instanceof UpdateResponse) { isUpdateResponse = true; result.add(createUpdatePacket((UpdateResponse) backendResponse)); } if (backendResponse instanceof QueryResponse) { createQueryPacket((QueryResponse) backendResponse).ifPresent(result::add); } return result; } private PostgreSQLErrorResponsePacket createErrorPacket(final ErrorResponse errorResponse) { return PostgreSQLErrPacketFactory.newInstance(errorResponse.getCause()); } private PostgreSQLCommandCompletePacket createUpdatePacket(final UpdateResponse updateResponse) { return new PostgreSQLCommandCompletePacket(updateResponse.getType(), updateResponse.getUpdateCount()); } private Optional createQueryPacket(final QueryResponse queryResponse) { List columnDescriptions = getPostgreSQLColumnDescriptions(queryResponse); isQueryResponse = !columnDescriptions.isEmpty(); if (columnDescriptions.isEmpty() || packet.isBinaryRowData()) { return Optional.empty(); } return Optional.of(new PostgreSQLRowDescriptionPacket(columnDescriptions.size(), columnDescriptions)); } private List getPostgreSQLColumnDescriptions(final QueryResponse queryResponse) { List result = new LinkedList<>(); List queryResults = queryResponse.getQueryResults(); ResultSetMetaData resultSetMetaData = !queryResults.isEmpty() ? queryResults.get(0).getResultSetMetaData() : null; int columnIndex = 0; for (QueryHeader each : queryResponse.getQueryHeaders()) { result.add(new PostgreSQLColumnDescription(each.getColumnName(), ++columnIndex, each.getColumnType(), each.getColumnLength(), resultSetMetaData)); } return result; } @Override public boolean next() throws SQLException { return null != databaseCommunicationEngine && databaseCommunicationEngine.next(); } @Override public PostgreSQLPacket getQueryData() throws SQLException { QueryData queryData = databaseCommunicationEngine.getQueryData(); return packet.isBinaryRowData() ? new PostgreSQLBinaryResultSetRowPacket(queryData.getData(), getPostgreSQLColumnTypes(queryData)) : new PostgreSQLDataRowPacket(queryData.getData()); } private List getPostgreSQLColumnTypes(final QueryData queryData) { List result = new ArrayList<>(queryData.getColumnTypes().size()); for (int i = 0; i < queryData.getColumnTypes().size(); i++) { result.add(PostgreSQLColumnType.valueOfJDBCType(queryData.getColumnTypes().get(i))); } return result; } }