未验证 提交 6a255beb 编写于 作者: Z Zhang Yonglun 提交者: GitHub

Add state machine implement for proxy (#8177)

......@@ -23,18 +23,15 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.db.protocol.payload.PacketPayload;
import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
import org.apache.shardingsphere.replicaquery.route.engine.impl.PrimaryVisitedManager;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.frontend.auth.AuthenticationResult;
import org.apache.shardingsphere.proxy.frontend.command.CommandExecutorTask;
import org.apache.shardingsphere.proxy.frontend.executor.ChannelThreadExecutorGroup;
import org.apache.shardingsphere.proxy.frontend.executor.CommandExecutorSelector;
import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine;
import org.apache.shardingsphere.proxy.frontend.state.ProxyStateMachine;
import org.apache.shardingsphere.replicaquery.route.engine.impl.PrimaryVisitedManager;
import org.apache.shardingsphere.transaction.core.TransactionType;
import java.util.concurrent.ExecutorService;
/**
* Frontend channel inbound handler.
*/
......@@ -65,11 +62,7 @@ public final class FrontendChannelInboundHandler extends ChannelInboundHandlerAd
authorized = auth(context, (ByteBuf) message);
return;
}
boolean supportHint = ProxyContext.getInstance().getMetaDataContexts().getProps().<Boolean>getValue(ConfigurationPropertyKey.PROXY_HINT_ENABLED);
boolean isOccupyThreadForPerConnection = databaseProtocolFrontendEngine.getFrontendContext().isOccupyThreadForPerConnection();
ExecutorService executorService = CommandExecutorSelector.getExecutorService(
isOccupyThreadForPerConnection, supportHint, backendConnection.getTransactionStatus().getTransactionType(), context.channel().id());
executorService.execute(new CommandExecutorTask(databaseProtocolFrontendEngine, backendConnection, context, message));
ProxyStateMachine.execute(context, message, databaseProtocolFrontendEngine, backendConnection);
}
private boolean auth(final ChannelHandlerContext context, final ByteBuf message) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.proxy.frontend.state;
import io.netty.channel.ChannelHandlerContext;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine;
/**
* Proxy state.
*/
public interface ProxyState {
/**
* Execute command.
*
* @param context channel handler context
* @param message message
* @param databaseProtocolFrontendEngine database protocol frontend engine
* @param backendConnection backend connection
*/
void execute(ChannelHandlerContext context, Object message, DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine, BackendConnection backendConnection);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.proxy.frontend.state;
import io.netty.channel.ChannelHandlerContext;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine;
import org.apache.shardingsphere.proxy.frontend.state.impl.OKProxyState;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
/**
* Proxy state machine.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ProxyStateMachine {
private static final Map<ProxyStateType, ProxyState> PROXY_STATE_MAP = new ConcurrentHashMap<>(3, 1);
private static final AtomicReference<ProxyState> CURRENT_STATE = new AtomicReference<>();
static {
PROXY_STATE_MAP.put(ProxyStateType.OK, new OKProxyState());
CURRENT_STATE.set(PROXY_STATE_MAP.get(ProxyStateType.OK));
}
/**
* Execute command.
*
* @param context channel handler context
* @param message message
* @param databaseProtocolFrontendEngine database protocol frontend engine
* @param backendConnection backend connection
*/
public static void execute(final ChannelHandlerContext context, final Object message,
final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine, final BackendConnection backendConnection) {
CURRENT_STATE.get().execute(context, message, databaseProtocolFrontendEngine, backendConnection);
}
/**
* Switch proxy state.
*
* @param type proxy state type
*/
public static void switchState(final ProxyStateType type) {
CURRENT_STATE.set(PROXY_STATE_MAP.get(type));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.proxy.frontend.state;
/**
* Proxy state type.
*/
public enum ProxyStateType {
OK, LOCK, CIRCUIT_BREAK
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.proxy.frontend.state.impl;
import io.netty.channel.ChannelHandlerContext;
import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.frontend.command.CommandExecutorTask;
import org.apache.shardingsphere.proxy.frontend.executor.CommandExecutorSelector;
import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine;
import org.apache.shardingsphere.proxy.frontend.state.ProxyState;
import java.util.concurrent.ExecutorService;
/**
* OK proxy state.
*/
public final class OKProxyState implements ProxyState {
@Override
public void execute(final ChannelHandlerContext context, final Object message, final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine, final BackendConnection backendConnection) {
boolean supportHint = ProxyContext.getInstance().getMetaDataContexts().getProps().<Boolean>getValue(ConfigurationPropertyKey.PROXY_HINT_ENABLED);
boolean isOccupyThreadForPerConnection = databaseProtocolFrontendEngine.getFrontendContext().isOccupyThreadForPerConnection();
ExecutorService executorService = CommandExecutorSelector.getExecutorService(
isOccupyThreadForPerConnection, supportHint, backendConnection.getTransactionStatus().getTransactionType(), context.channel().id());
executorService.execute(new CommandExecutorTask(databaseProtocolFrontendEngine, backendConnection, context, message));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册