PostgreSQLComBindExecutor.java 8.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

18
package org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.bind;
19

20
import lombok.Getter;
21 22 23 24 25 26 27 28 29 30 31
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
import org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLColumnType;
import org.apache.shardingsphere.db.protocol.postgresql.packet.PostgreSQLPacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLColumnDescription;
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLRowDescriptionPacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLBinaryResultSetRowPacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLBindCompletePacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLComBindPacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.text.PostgreSQLDataRowPacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLErrorResponsePacket;
32
import org.apache.shardingsphere.infra.context.SchemaContext;
33
import org.apache.shardingsphere.infra.executor.sql.QueryResult;
L
Liang Zhang 已提交
34
import org.apache.shardingsphere.infra.executor.sql.raw.execute.result.query.QueryHeader;
35 36 37 38 39 40 41 42
import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.response.BackendResponse;
import org.apache.shardingsphere.proxy.backend.response.error.ErrorResponse;
import org.apache.shardingsphere.proxy.backend.response.query.QueryData;
import org.apache.shardingsphere.proxy.backend.response.query.QueryResponse;
import org.apache.shardingsphere.proxy.backend.response.update.UpdateResponse;
43
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
44
import org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
45
import org.apache.shardingsphere.proxy.frontend.postgresql.PostgreSQLErrPacketFactory;
T
tristaZero 已提交
46
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
47

L
Liang Zhang 已提交
48
import java.sql.ResultSetMetaData;
49 50 51 52 53 54
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
L
Liang Zhang 已提交
55
import java.util.Optional;
56 57

/**
58
 * Command bind executor for PostgreSQL.
59
 */
60
public final class PostgreSQLComBindExecutor implements QueryCommandExecutor {
61
    
62
    private final PostgreSQLComBindPacket packet;
63 64
            
    private final DatabaseCommunicationEngine databaseCommunicationEngine;
65
    
66 67
    @Getter
    private volatile boolean isQueryResponse;
68
    
69 70 71 72
    @Getter
    private volatile boolean isUpdateResponse;
    
    @Getter
73 74
    private volatile boolean isErrorResponse;
    
75 76
    public PostgreSQLComBindExecutor(final PostgreSQLComBindPacket packet, final BackendConnection backendConnection) {
        this.packet = packet;
L
Liang Zhang 已提交
77
        SchemaContext schemaContext = ProxyContext.getInstance().getSchema(backendConnection.getSchema());
78 79
        if (null != packet.getSql() && null != schemaContext) {
            SQLStatement sqlStatement = schemaContext.getRuntimeContext().getSqlParserEngine().parse(packet.getSql(), true);
T
tristaZero 已提交
80 81 82 83 84
            databaseCommunicationEngine =
                    DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(sqlStatement, packet.getSql(), packet.getParameters(), backendConnection);
        } else {
            databaseCommunicationEngine = null;
        }
85
    }
86 87
    
    @Override
L
Liang Zhang 已提交
88
    public Collection<DatabasePacket<?>> execute() {
L
Liang Zhang 已提交
89
        if (ProxyContext.getInstance().getSchemaContexts().isCircuitBreak()) {
L
Liang Zhang 已提交
90
            return Collections.singletonList(new PostgreSQLErrorResponsePacket());
91
        }
L
Liang Zhang 已提交
92
        List<DatabasePacket<?>> result = new LinkedList<>();
93
        result.add(new PostgreSQLBindCompletePacket());
94 95 96
        if (null == databaseCommunicationEngine) {
            return result;
        }
97
        BackendResponse backendResponse = getBackendResponse();
98 99
        if (backendResponse instanceof QueryResponse) {
            createQueryPacket((QueryResponse) backendResponse).ifPresent(result::add);
100 101
        }
        if (backendResponse instanceof UpdateResponse) {
102
            isUpdateResponse = true;
103 104
            result.add(createUpdatePacket((UpdateResponse) backendResponse));
        }
105 106 107
        if (backendResponse instanceof ErrorResponse) {
            isErrorResponse = true;
            result.add(createErrorPacket((ErrorResponse) backendResponse));
108
        }
109
        return result;
110 111
    }
    
112 113 114 115 116 117 118 119 120 121 122 123
    private BackendResponse getBackendResponse() {
        BackendResponse result;
        try {
            result = databaseCommunicationEngine.execute();
        // CHECKSTYLE:OFF
        } catch (final Exception ex) {
        // CHECKSTYLE:OFF
            result = new ErrorResponse(ex);
        }
        return result;
    }
    
124 125
    private Optional<PostgreSQLRowDescriptionPacket> createQueryPacket(final QueryResponse queryResponse) {
        List<PostgreSQLColumnDescription> columnDescriptions = getPostgreSQLColumnDescriptions(queryResponse);
126
        isQueryResponse = !columnDescriptions.isEmpty();
127
        if (columnDescriptions.isEmpty() || packet.isBinaryRowData()) {
L
Liang Zhang 已提交
128
            return Optional.empty();
129 130 131 132 133 134
        }
        return Optional.of(new PostgreSQLRowDescriptionPacket(columnDescriptions.size(), columnDescriptions));
    }
    
    private List<PostgreSQLColumnDescription> getPostgreSQLColumnDescriptions(final QueryResponse queryResponse) {
        List<PostgreSQLColumnDescription> result = new LinkedList<>();
135
        List<QueryResult> queryResults = queryResponse.getQueryResults();
136
        ResultSetMetaData resultSetMetaData = queryResults.isEmpty() ? null : queryResults.get(0).getResultSetMetaData();
137 138
        int columnIndex = 0;
        for (QueryHeader each : queryResponse.getQueryHeaders()) {
139
            result.add(new PostgreSQLColumnDescription(each.getColumnName(), ++columnIndex, each.getColumnType(), each.getColumnLength(), resultSetMetaData));
140 141 142 143
        }
        return result;
    }
    
144 145 146 147 148 149 150 151
    private PostgreSQLCommandCompletePacket createUpdatePacket(final UpdateResponse updateResponse) {
        return new PostgreSQLCommandCompletePacket(updateResponse.getType(), updateResponse.getUpdateCount());
    }
    
    private PostgreSQLErrorResponsePacket createErrorPacket(final ErrorResponse errorResponse) {
        return PostgreSQLErrPacketFactory.newInstance(errorResponse.getCause());
    }
    
152 153 154 155 156 157 158 159
    @Override
    public boolean next() throws SQLException {
        return null != databaseCommunicationEngine && databaseCommunicationEngine.next();
    }
    
    @Override
    public PostgreSQLPacket getQueryData() throws SQLException {
        QueryData queryData = databaseCommunicationEngine.getQueryData();
160
        return packet.isBinaryRowData() ? new PostgreSQLBinaryResultSetRowPacket(queryData.getData(), getPostgreSQLColumnTypes(queryData)) : new PostgreSQLDataRowPacket(queryData.getData());
161 162 163 164 165 166 167 168 169 170
    }
    
    private List<PostgreSQLColumnType> getPostgreSQLColumnTypes(final QueryData queryData) {
        List<PostgreSQLColumnType> result = new ArrayList<>(queryData.getColumnTypes().size());
        for (int i = 0; i < queryData.getColumnTypes().size(); i++) {
            result.add(PostgreSQLColumnType.valueOfJDBCType(queryData.getColumnTypes().get(i)));
        }
        return result;
    }
}