未验证 提交 9090c5d3 编写于 作者: L Liao Lanyu 提交者: GitHub

Fix the issue that Drivers of query with limit clause can not be finished timely

上级 3ac87d35
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.exception.exchange;
public class GetTsBlockFromClosedOrAbortedChannelException extends IllegalStateException {
public GetTsBlockFromClosedOrAbortedChannelException(String s) {
super(s);
}
}
...@@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.execution.exchange; ...@@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.execution.exchange;
import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient; import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.db.queryengine.exception.exchange.GetTsBlockFromClosedOrAbortedChannelException;
import org.apache.iotdb.db.queryengine.execution.driver.DriverContext; import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelIndex; import org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelIndex;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelLocation; import org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelLocation;
...@@ -115,6 +116,10 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { ...@@ -115,6 +116,10 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
try { try {
ByteBuffer serializedTsBlock = sinkChannel.getSerializedTsBlock(i); ByteBuffer serializedTsBlock = sinkChannel.getSerializedTsBlock(i);
resp.addToTsBlocks(serializedTsBlock); resp.addToTsBlocks(serializedTsBlock);
} catch (GetTsBlockFromClosedOrAbortedChannelException e) {
// Return an empty block list to indicate that getting data block failed this time.
// The SourceHandle will deal with this signal depending on its state.
return new TGetDataBlockResponse(new ArrayList<>());
} catch (IllegalStateException | IOException e) { } catch (IllegalStateException | IOException e) {
throw new TException(e); throw new TException(e);
} }
......
...@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceCl ...@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceCl
import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.exception.exchange.GetTsBlockFromClosedOrAbortedChannelException;
import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SinkListener; import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SinkListener;
import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager; import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet; import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet;
...@@ -306,11 +307,11 @@ public class SinkChannel implements ISinkChannel { ...@@ -306,11 +307,11 @@ public class SinkChannel implements ISinkChannel {
public synchronized ByteBuffer getSerializedTsBlock(int sequenceId) throws IOException { public synchronized ByteBuffer getSerializedTsBlock(int sequenceId) throws IOException {
if (aborted || closed) { if (aborted || closed) {
LOGGER.warn( LOGGER.debug(
"SinkChannel still receive getting TsBlock request after being aborted={} or closed={}", "SinkChannel still receive getting TsBlock request after being aborted={} or closed={}",
aborted, aborted,
closed); closed);
throw new IllegalStateException("SinkChannel is aborted or closed. "); throw new GetTsBlockFromClosedOrAbortedChannelException("SinkChannel is aborted or closed. ");
} }
Pair<TsBlock, Long> pair = sequenceIdToTsBlock.get(sequenceId); Pair<TsBlock, Long> pair = sequenceIdToTsBlock.get(sequenceId);
if (pair == null || pair.left == null) { if (pair == null || pair.left == null) {
......
...@@ -666,7 +666,7 @@ public class SourceHandle implements ISourceHandle { ...@@ -666,7 +666,7 @@ public class SourceHandle implements ISourceHandle {
public void run() { public void run() {
try (SetThreadName sourceHandleName = new SetThreadName(threadName)) { try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
LOGGER.debug( LOGGER.debug(
"[SendCloseSinkChanelEvent] to [ShuffleSinkHandle: {}, index: {}]).", "[SendCloseSinkChannelEvent] to [ShuffleSinkHandle: {}, index: {}]).",
remoteFragmentInstanceId, remoteFragmentInstanceId,
indexOfUpstreamSinkHandle); indexOfUpstreamSinkHandle);
int attempt = 0; int attempt = 0;
...@@ -680,7 +680,7 @@ public class SourceHandle implements ISourceHandle { ...@@ -680,7 +680,7 @@ public class SourceHandle implements ISourceHandle {
break; break;
} catch (Throwable e) { } catch (Throwable e) {
LOGGER.warn( LOGGER.warn(
"[SendCloseSinkChanelEvent] to [ShuffleSinkHandle: {}, index: {}] failed.).", "[SendCloseSinkChannelEvent] to [ShuffleSinkHandle: {}, index: {}] failed.).",
remoteFragmentInstanceId, remoteFragmentInstanceId,
indexOfUpstreamSinkHandle); indexOfUpstreamSinkHandle);
if (attempt == MAX_ATTEMPT_TIMES) { if (attempt == MAX_ATTEMPT_TIMES) {
......
...@@ -88,8 +88,11 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker { ...@@ -88,8 +88,11 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
} }
for (FragmentInstanceId fragmentInstanceId : instanceIds) { for (FragmentInstanceId fragmentInstanceId : instanceIds) {
InstanceStateMetrics stateMetrics = instanceStateMap.get(fragmentInstanceId); InstanceStateMetrics stateMetrics = instanceStateMap.get(fragmentInstanceId);
if (stateMetrics != null if (stateMetrics == null
&& (stateMetrics.lastState == null || !stateMetrics.lastState.isDone())) { || stateMetrics.lastState == null
|| !stateMetrics.lastState.isDone()) {
// FI whose state has not been updated is considered to be unfinished.(In Query with limit
// clause, it's possible that the query is finished before the state of FI being recorded.)
res.add(fragmentInstanceId); res.add(fragmentInstanceId);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册