/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.shardingsphere.proxy.frontend.mysql.command; import io.netty.channel.ChannelHandlerContext; import lombok.SneakyThrows; import org.apache.shardingsphere.db.protocol.mysql.packet.command.MySQLCommandPacket; import org.apache.shardingsphere.db.protocol.mysql.packet.command.MySQLCommandPacketFactory; import org.apache.shardingsphere.db.protocol.mysql.packet.command.MySQLCommandPacketType; import org.apache.shardingsphere.db.protocol.mysql.packet.command.MySQLCommandPacketTypeLoader; import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLEofPacket; import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload; import org.apache.shardingsphere.db.protocol.packet.CommandPacket; import org.apache.shardingsphere.db.protocol.packet.CommandPacketType; import org.apache.shardingsphere.db.protocol.packet.DatabasePacket; import org.apache.shardingsphere.db.protocol.payload.PacketPayload; import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey; import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection; import org.apache.shardingsphere.proxy.backend.schema.ProxySchemaContexts; import org.apache.shardingsphere.proxy.frontend.api.CommandExecutor; import org.apache.shardingsphere.proxy.frontend.api.QueryCommandExecutor; import org.apache.shardingsphere.proxy.frontend.engine.CommandExecuteEngine; import org.apache.shardingsphere.proxy.frontend.mysql.MySQLErrPacketFactory; import java.sql.SQLException; import java.util.Optional; /** * Command execute engine for MySQL. */ public final class MySQLCommandExecuteEngine implements CommandExecuteEngine { @Override public MySQLCommandPacketType getCommandPacketType(final PacketPayload payload) { return MySQLCommandPacketTypeLoader.getCommandPacketType((MySQLPacketPayload) payload); } @Override public MySQLCommandPacket getCommandPacket(final PacketPayload payload, final CommandPacketType type, final BackendConnection backendConnection) throws SQLException { return MySQLCommandPacketFactory.newInstance((MySQLCommandPacketType) type, (MySQLPacketPayload) payload); } @Override public CommandExecutor getCommandExecutor(final CommandPacketType type, final CommandPacket packet, final BackendConnection backendConnection) { return MySQLCommandExecutorFactory.newInstance((MySQLCommandPacketType) type, packet, backendConnection); } @Override public DatabasePacket getErrorPacket(final Exception cause) { return MySQLErrPacketFactory.newInstance(1, cause); } @Override public Optional> getOtherPacket() { return Optional.empty(); } @Override @SneakyThrows public void writeQueryData(final ChannelHandlerContext context, final BackendConnection backendConnection, final QueryCommandExecutor queryCommandExecutor, final int headerPackagesCount) { if (!queryCommandExecutor.isQueryResponse() || !context.channel().isActive()) { return; } int count = 0; int flushThreshold = ProxySchemaContexts.getInstance().getSchemaContexts().getProps().getValue(ConfigurationPropertyKey.PROXY_FRONTEND_FLUSH_THRESHOLD); int currentSequenceId = 0; while (queryCommandExecutor.next()) { count++; while (!context.channel().isWritable() && context.channel().isActive()) { context.flush(); backendConnection.getResourceSynchronizer().doAwaitUntil(); } DatabasePacket dataValue = queryCommandExecutor.getQueryData(); context.write(dataValue); if (flushThreshold == count) { context.flush(); count = 0; } currentSequenceId++; } context.write(new MySQLEofPacket(++currentSequenceId + headerPackagesCount)); } }