MySQLCommandExecuteEngine.java 4.7 KB
Newer Older
T
terrymanu 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

18
package org.apache.shardingsphere.proxy.frontend.mysql.command;
T
terrymanu 已提交
19 20

import io.netty.channel.ChannelHandlerContext;
21
import lombok.SneakyThrows;
22 23 24 25 26 27 28 29 30 31
import org.apache.shardingsphere.db.protocol.mysql.packet.command.MySQLCommandPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.MySQLCommandPacketFactory;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.MySQLCommandPacketType;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.MySQLCommandPacketTypeLoader;
import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLEofPacket;
import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
import org.apache.shardingsphere.db.protocol.packet.CommandPacket;
import org.apache.shardingsphere.db.protocol.packet.CommandPacketType;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
import org.apache.shardingsphere.db.protocol.payload.PacketPayload;
32
import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
33
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
34
import org.apache.shardingsphere.proxy.backend.schema.ProxySchemaContexts;
35 36 37 38
import org.apache.shardingsphere.proxy.frontend.api.CommandExecutor;
import org.apache.shardingsphere.proxy.frontend.api.QueryCommandExecutor;
import org.apache.shardingsphere.proxy.frontend.engine.CommandExecuteEngine;
import org.apache.shardingsphere.proxy.frontend.mysql.MySQLErrPacketFactory;
T
terrymanu 已提交
39 40

import java.sql.SQLException;
41
import java.util.Optional;
T
terrymanu 已提交
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63

/**
 * Command execute engine for MySQL.
 */
public final class MySQLCommandExecuteEngine implements CommandExecuteEngine {
    
    @Override
    public MySQLCommandPacketType getCommandPacketType(final PacketPayload payload) {
        return MySQLCommandPacketTypeLoader.getCommandPacketType((MySQLPacketPayload) payload);
    }
    
    @Override
    public MySQLCommandPacket getCommandPacket(final PacketPayload payload, final CommandPacketType type, final BackendConnection backendConnection) throws SQLException {
        return MySQLCommandPacketFactory.newInstance((MySQLCommandPacketType) type, (MySQLPacketPayload) payload);
    }
    
    @Override
    public CommandExecutor getCommandExecutor(final CommandPacketType type, final CommandPacket packet, final BackendConnection backendConnection) {
        return MySQLCommandExecutorFactory.newInstance((MySQLCommandPacketType) type, packet, backendConnection);
    }
    
    @Override
L
Liang Zhang 已提交
64
    public DatabasePacket<?> getErrorPacket(final Exception cause) {
T
terrymanu 已提交
65 66 67
        return MySQLErrPacketFactory.newInstance(1, cause);
    }
    
68
    @Override
L
Liang Zhang 已提交
69
    public Optional<DatabasePacket<?>> getOtherPacket() {
70 71 72
        return Optional.empty();
    }
    
T
terrymanu 已提交
73
    @Override
74
    @SneakyThrows
T
terrymanu 已提交
75
    public void writeQueryData(final ChannelHandlerContext context,
76
                               final BackendConnection backendConnection, final QueryCommandExecutor queryCommandExecutor, final int headerPackagesCount) {
77
        if (!queryCommandExecutor.isQueryResponse() || !context.channel().isActive()) {
T
terrymanu 已提交
78 79 80
            return;
        }
        int count = 0;
L
Liang Zhang 已提交
81
        int flushThreshold = ProxySchemaContexts.getInstance().getSchemaContexts().getProps().<Integer>getValue(ConfigurationPropertyKey.PROXY_FRONTEND_FLUSH_THRESHOLD);
T
terrymanu 已提交
82 83 84 85 86
        int currentSequenceId = 0;
        while (queryCommandExecutor.next()) {
            count++;
            while (!context.channel().isWritable() && context.channel().isActive()) {
                context.flush();
87
                backendConnection.getResourceSynchronizer().doAwaitUntil();
T
terrymanu 已提交
88
            }
L
Liang Zhang 已提交
89
            DatabasePacket<?> dataValue = queryCommandExecutor.getQueryData();
T
terrymanu 已提交
90 91 92 93 94 95 96 97 98 99
            context.write(dataValue);
            if (flushThreshold == count) {
                context.flush();
                count = 0;
            }
            currentSequenceId++;
        }
        context.write(new MySQLEofPacket(++currentSequenceId + headerPackagesCount));
    }
}