提交 b28c9fb6 编写于 作者: T terrymanu

MySQLResultCache => FutureRegistry

上级 788d3c3f
......@@ -33,7 +33,9 @@ import io.shardingsphere.core.routing.SQLRouteResult;
import io.shardingsphere.core.routing.StatementRoutingEngine;
import io.shardingsphere.core.routing.router.masterslave.MasterSlaveRouter;
import io.shardingsphere.proxy.backend.AbstractBackendHandler;
import io.shardingsphere.proxy.backend.BackendExecutorContext;
import io.shardingsphere.proxy.backend.ResultPacket;
import io.shardingsphere.proxy.backend.netty.future.FutureRegistry;
import io.shardingsphere.proxy.backend.netty.mysql.MySQLQueryResult;
import io.shardingsphere.proxy.config.ProxyTableMetaDataConnectionManager;
import io.shardingsphere.proxy.config.RuleRegistry;
......@@ -44,9 +46,7 @@ import io.shardingsphere.proxy.transport.mysql.packet.command.CommandResponsePac
import io.shardingsphere.proxy.transport.mysql.packet.command.query.text.query.ComQueryPacket;
import io.shardingsphere.proxy.transport.mysql.packet.generic.ErrPacket;
import io.shardingsphere.proxy.transport.mysql.packet.generic.OKPacket;
import io.shardingsphere.proxy.backend.BackendExecutorContext;
import io.shardingsphere.proxy.util.MySQLResultCache;
import io.shardingsphere.proxy.util.SynchronizedFuture;
import io.shardingsphere.proxy.backend.netty.future.SynchronizedFuture;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
......@@ -101,10 +101,10 @@ public final class NettyBackendHandler extends AbstractBackendHandler {
private CommandResponsePackets executeForMasterSlave() throws InterruptedException, ExecutionException, TimeoutException {
String dataSourceName = new MasterSlaveRouter(RULE_REGISTRY.getMasterSlaveRule(), RULE_REGISTRY.isShowSQL()).route(sql).iterator().next();
synchronizedFuture = new SynchronizedFuture(1);
MySQLResultCache.getInstance().putFuture(connectionId, synchronizedFuture);
executeCommand(dataSourceName, sql);
FutureRegistry.getInstance().put(connectionId, synchronizedFuture);
executeSQL(dataSourceName, sql);
List<QueryResult> queryResults = synchronizedFuture.get(RULE_REGISTRY.getBackendNIOConfig().getConnectionTimeoutSeconds(), TimeUnit.SECONDS);
MySQLResultCache.getInstance().deleteFuture(connectionId);
FutureRegistry.getInstance().delete(connectionId);
List<CommandResponsePackets> packets = new LinkedList<>();
for (QueryResult each : queryResults) {
packets.add(((MySQLQueryResult) each).getCommandResponsePackets());
......@@ -120,12 +120,12 @@ public final class NettyBackendHandler extends AbstractBackendHandler {
return new CommandResponsePackets(new OKPacket(1));
}
synchronizedFuture = new SynchronizedFuture(routeResult.getExecutionUnits().size());
MySQLResultCache.getInstance().putFuture(connectionId, synchronizedFuture);
FutureRegistry.getInstance().put(connectionId, synchronizedFuture);
for (SQLExecutionUnit each : routeResult.getExecutionUnits()) {
executeCommand(each.getDataSource(), each.getSqlUnit().getSql());
executeSQL(each.getDataSource(), each.getSqlUnit().getSql());
}
List<QueryResult> queryResults = synchronizedFuture.get(RULE_REGISTRY.getBackendNIOConfig().getConnectionTimeoutSeconds(), TimeUnit.SECONDS);
MySQLResultCache.getInstance().deleteFuture(connectionId);
FutureRegistry.getInstance().delete(connectionId);
List<CommandResponsePackets> packets = Lists.newArrayListWithCapacity(queryResults.size());
for (QueryResult each : queryResults) {
......@@ -149,7 +149,7 @@ public final class NettyBackendHandler extends AbstractBackendHandler {
return result;
}
private void executeCommand(final String dataSourceName, final String sql) throws InterruptedException, ExecutionException, TimeoutException {
private void executeSQL(final String dataSourceName, final String sql) throws InterruptedException, ExecutionException, TimeoutException {
if (!channelMap.containsKey(dataSourceName)) {
channelMap.put(dataSourceName, new ArrayList<Channel>());
}
......
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingsphere.proxy.util;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import java.util.concurrent.TimeUnit;
/**
* Cache for synchronized future.
*
* @author wangkai
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class MySQLResultCache {
private static final MySQLResultCache INSTANCE = new MySQLResultCache();
//TODO expire time should be set.
private final Cache<Integer, SynchronizedFuture> resultCache = CacheBuilder.newBuilder().expireAfterWrite(60, TimeUnit.SECONDS).build();
/**
* Get instance of MySQL result cache.
*
* @return instance of MySQL result cache
*/
public static MySQLResultCache getInstance() {
return INSTANCE;
}
/**
* Put synchronized future by connection id.
*
* @param connectionId MySQL connection id
* @param synchronizedFuture multiple result set
*/
public void putFuture(final int connectionId, final SynchronizedFuture synchronizedFuture) {
resultCache.put(connectionId, synchronizedFuture);
}
/**
* Get synchronized future by connection id.
*
* @param connectionId MySQL connection id
* @return multiple result set
*/
public SynchronizedFuture getFuture(final int connectionId) {
return resultCache.getIfPresent(connectionId);
}
/**
* Delete synchronized future by connection id.
*
* @param connectionId MySQL connection id
*/
public void deleteFuture(final int connectionId) {
resultCache.invalidate(connectionId);
}
}
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingsphere.proxy.backend.netty.future;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import java.util.concurrent.TimeUnit;
/**
* Future registry.
*
* @author wangkai
* @author zhangliang
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class FutureRegistry {
private static final FutureRegistry INSTANCE = new FutureRegistry();
//TODO expire time should be set.
private final Cache<Integer, SynchronizedFuture> resultCache = CacheBuilder.newBuilder().expireAfterWrite(60, TimeUnit.SECONDS).build();
/**
* Get instance of future registry.
*
* @return instance of future registry
*/
public static FutureRegistry getInstance() {
return INSTANCE;
}
/**
* Put synchronized future by connection ID.
*
* @param connectionId database connection ID
* @param synchronizedFuture synchronized future
*/
public void put(final int connectionId, final SynchronizedFuture synchronizedFuture) {
resultCache.put(connectionId, synchronizedFuture);
}
/**
* Get synchronized future by connection ID.
*
* @param connectionId database connection ID
* @return synchronized future
*/
public SynchronizedFuture get(final int connectionId) {
return resultCache.getIfPresent(connectionId);
}
/**
* Delete synchronized future by connection ID.
*
* @param connectionId database connection ID
*/
public void delete(final int connectionId) {
resultCache.invalidate(connectionId);
}
}
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingsphere.proxy.util;
import com.google.common.collect.Lists;
import io.shardingsphere.core.merger.QueryResult;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* Synchronized future for get multiple netty returns.
*
* @author wangkai
* @author linjiaqi
*/
@Slf4j
public class SynchronizedFuture implements Future<List<QueryResult>> {
private final CountDownLatch latch;
private final List<QueryResult> responses;
private boolean isDone;
public SynchronizedFuture(final int resultSize) {
latch = new CountDownLatch(resultSize);
responses = Lists.newArrayListWithCapacity(resultSize);
}
@Override
public boolean cancel(final boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return isDone;
}
@Override
public List<QueryResult> get() throws InterruptedException {
latch.await();
return responses;
}
/**
* Get responses for waiting time.
*
* @param timeout wait timeout
* @param unit time unit
* @return responses
*/
@Override
public List<QueryResult> get(final long timeout, final TimeUnit unit) {
try {
latch.await(timeout, unit);
isDone = true;
} catch (final InterruptedException ex) {
log.error(ex.getMessage(), ex);
}
return responses;
}
/**
* Set response and count down.
*
* @param response SQL command result
*/
public void setResponse(final QueryResult response) {
responses.add(response);
latch.countDown();
}
}
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.shardingsphere.proxy.backend.netty.future;
import com.google.common.collect.Lists;
import io.shardingsphere.core.merger.QueryResult;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* Synchronized future for get multiple netty returns.
*
* @author wangkai
* @author linjiaqi
*/
@Slf4j
public class SynchronizedFuture implements Future<List<QueryResult>> {
private final CountDownLatch latch;
private final List<QueryResult> responses;
private boolean isDone;
public SynchronizedFuture(final int resultSize) {
latch = new CountDownLatch(resultSize);
responses = Lists.newArrayListWithCapacity(resultSize);
}
@Override
public boolean cancel(final boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return isDone;
}
@Override
public List<QueryResult> get() throws InterruptedException {
latch.await();
return responses;
}
/**
* Get responses for waiting time.
*
* @param timeout wait timeout
* @param unit time unit
* @return responses
*/
@Override
public List<QueryResult> get(final long timeout, final TimeUnit unit) {
try {
latch.await(timeout, unit);
isDone = true;
} catch (final InterruptedException ex) {
log.error(ex.getMessage(), ex);
}
return responses;
}
/**
* Set response and count down.
*
* @param response SQL command result
*/
public void setResponse(final QueryResult response) {
responses.add(response);
latch.countDown();
}
}
......@@ -35,7 +35,7 @@ import io.shardingsphere.proxy.transport.mysql.packet.generic.ErrPacket;
import io.shardingsphere.proxy.transport.mysql.packet.generic.OKPacket;
import io.shardingsphere.proxy.transport.mysql.packet.handshake.HandshakePacket;
import io.shardingsphere.proxy.transport.mysql.packet.handshake.HandshakeResponse41Packet;
import io.shardingsphere.proxy.util.MySQLResultCache;
import io.shardingsphere.proxy.backend.netty.future.FutureRegistry;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
......@@ -190,8 +190,8 @@ public class MySQLBackendHandler extends CommandResponsePacketsHandler {
private void setResponse(final ChannelHandlerContext context) {
int connectionId = ChannelRegistry.getInstance().getConnectionId(context.channel().id().asShortText());
if (MySQLResultCache.getInstance().getFuture(connectionId) != null) {
MySQLResultCache.getInstance().getFuture(connectionId).setResponse(resultMap.get(connectionId));
if (FutureRegistry.getInstance().get(connectionId) != null) {
FutureRegistry.getInstance().get(connectionId).setResponse(resultMap.get(connectionId));
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册