PostgreSQLComQueryExecutor.java 6.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

18
package org.apache.shardingsphere.proxy.frontend.postgresql.command.query.text;
19

20
import lombok.Getter;
21 22 23 24 25 26 27 28
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.PostgreSQLPacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLColumnDescription;
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLRowDescriptionPacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.text.PostgreSQLComQueryPacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.text.PostgreSQLDataRowPacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLErrorResponsePacket;
29
import org.apache.shardingsphere.infra.database.type.DatabaseTypes;
30
import org.apache.shardingsphere.infra.executor.sql.QueryResult;
L
Liang Zhang 已提交
31
import org.apache.shardingsphere.infra.executor.sql.raw.execute.result.query.QueryHeader;
32 33 34 35 36
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.response.BackendResponse;
import org.apache.shardingsphere.proxy.backend.response.error.ErrorResponse;
import org.apache.shardingsphere.proxy.backend.response.query.QueryResponse;
import org.apache.shardingsphere.proxy.backend.response.update.UpdateResponse;
37
import org.apache.shardingsphere.proxy.backend.schema.ProxySchemaContexts;
38 39 40 41
import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandlerFactory;
import org.apache.shardingsphere.proxy.frontend.api.QueryCommandExecutor;
import org.apache.shardingsphere.proxy.frontend.postgresql.PostgreSQLErrPacketFactory;
42

L
Liang Zhang 已提交
43
import java.sql.ResultSetMetaData;
44 45 46 47 48
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
L
Liang Zhang 已提交
49
import java.util.Optional;
50 51

/**
52
 * Command query executor for PostgreSQL.
53
 */
54
public final class PostgreSQLComQueryExecutor implements QueryCommandExecutor {
55
    
56
    private final TextProtocolBackendHandler textProtocolBackendHandler;
57
    
58 59
    @Getter
    private volatile boolean isQueryResponse;
60
    
61 62 63 64
    @Getter
    private volatile boolean isUpdateResponse;
    
    @Getter
65 66
    private volatile boolean isErrorResponse;
    
67
    public PostgreSQLComQueryExecutor(final PostgreSQLComQueryPacket comQueryPacket, final BackendConnection backendConnection) {
68
        textProtocolBackendHandler = TextProtocolBackendHandlerFactory.newInstance(DatabaseTypes.getActualDatabaseType("PostgreSQL"), comQueryPacket.getSql(), backendConnection);
69 70 71
    }
    
    @Override
L
Liang Zhang 已提交
72
    public Collection<DatabasePacket<?>> execute() throws SQLException {
73
        if (ProxySchemaContexts.getInstance().getSchemaContexts().isCircuitBreak()) {
L
Liang Zhang 已提交
74
            return Collections.singletonList(new PostgreSQLErrorResponsePacket());
75
        }
76
        BackendResponse backendResponse = getBackendResponse();
77 78 79
        if (backendResponse instanceof QueryResponse) {
            Optional<PostgreSQLRowDescriptionPacket> result = createQueryPacket((QueryResponse) backendResponse);
            return result.<List<DatabasePacket<?>>>map(Collections::singletonList).orElseGet(Collections::emptyList);
80 81
        }
        if (backendResponse instanceof UpdateResponse) {
82
            isUpdateResponse = true;
L
Liang Zhang 已提交
83
            return Collections.singletonList(createUpdatePacket((UpdateResponse) backendResponse));
84
        }
85 86
        isErrorResponse = true;
        return Collections.singletonList(createErrorPacket((ErrorResponse) backendResponse));
87 88
    }
    
89 90 91 92 93 94 95 96 97 98 99 100
    private BackendResponse getBackendResponse() {
        BackendResponse result;
        try {
            result = textProtocolBackendHandler.execute();
        // CHECKSTYLE:OFF
        } catch (final Exception ex) {
        // CHECKSTYLE:OFF
            result = new ErrorResponse(ex);
        }
        return result;
    }
    
101 102
    private Optional<PostgreSQLRowDescriptionPacket> createQueryPacket(final QueryResponse queryResponse) {
        List<PostgreSQLColumnDescription> columnDescriptions = getPostgreSQLColumnDescriptions(queryResponse);
103
        isQueryResponse = !columnDescriptions.isEmpty();
104
        if (columnDescriptions.isEmpty()) {
L
Liang Zhang 已提交
105
            return Optional.empty();
106 107 108 109 110 111
        }
        return Optional.of(new PostgreSQLRowDescriptionPacket(columnDescriptions.size(), columnDescriptions));
    }
    
    private List<PostgreSQLColumnDescription> getPostgreSQLColumnDescriptions(final QueryResponse queryResponse) {
        List<PostgreSQLColumnDescription> result = new LinkedList<>();
112
        List<QueryResult> queryResults = queryResponse.getQueryResults();
113
        ResultSetMetaData resultSetMetaData = queryResults.isEmpty() ? null : queryResults.get(0).getResultSetMetaData();
114 115
        int columnIndex = 0;
        for (QueryHeader each : queryResponse.getQueryHeaders()) {
116
            result.add(new PostgreSQLColumnDescription(each.getColumnName(), ++columnIndex, each.getColumnType(), each.getColumnLength(), resultSetMetaData));
117 118 119 120
        }
        return result;
    }
    
121 122 123 124 125 126 127 128
    private PostgreSQLCommandCompletePacket createUpdatePacket(final UpdateResponse updateResponse) {
        return new PostgreSQLCommandCompletePacket(updateResponse.getType(), updateResponse.getUpdateCount());
    }
    
    private PostgreSQLErrorResponsePacket createErrorPacket(final ErrorResponse errorResponse) {
        return PostgreSQLErrPacketFactory.newInstance(errorResponse.getCause());
    }
    
129 130 131 132 133 134 135 136 137 138
    @Override
    public boolean next() throws SQLException {
        return textProtocolBackendHandler.next();
    }
    
    @Override
    public PostgreSQLPacket getQueryData() throws SQLException {
        return new PostgreSQLDataRowPacket(textProtocolBackendHandler.getQueryData().getData());
    }
}