From 1ae952ce14c400dfdea984b299ee57abf723f274 Mon Sep 17 00:00:00 2001 From: Steve Yurong Su Date: Mon, 28 Aug 2023 01:15:03 +0800 Subject: [PATCH] [IOTDB-6127] Pipe: buffered events in processor stage can not be consumed by connector (#10962) --- ...peHistoricalDataRegionTsFileExtractor.java | 21 ++++-- .../processor/PipeDoNothingProcessor.java | 67 ------------------- .../task/connection/PipeEventCollector.java | 22 ++++-- .../task/stage/PipeTaskProcessorStage.java | 4 +- .../connector/PipeConnectorSubtask.java | 23 +++---- .../processor/PipeProcessorSubtask.java | 11 +-- .../PipeProcessorSubtaskExecutorTest.java | 4 +- .../TsFileResourceProgressIndexTest.java | 21 ++++++ .../iotdb/commons/conf/CommonConfig.java | 2 +- .../builtin/processor/DoNothingProcessor.java | 15 ++--- .../plugin/builtin/BuiltinPipePluginTest.java | 12 ++-- 11 files changed, 87 insertions(+), 115 deletions(-) delete mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java index 506dd6adcd..c522cd6eb6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.time.ZoneId; import java.util.ArrayDeque; +import java.util.Collection; import java.util.Queue; import java.util.stream.Collectors; @@ -168,7 +169,8 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa tsFileManager.readLock(); try { pendingQueue = new ArrayDeque<>(tsFileManager.size(true) + tsFileManager.size(false)); - pendingQueue.addAll( + + final Collection sequenceFileInsertionEvents = tsFileManager.getTsFileList(true).stream() .filter( resource -> @@ -184,8 +186,10 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa pattern, historicalDataExtractionStartTime, historicalDataExtractionEndTime)) - .collect(Collectors.toList())); - pendingQueue.addAll( + .collect(Collectors.toList()); + pendingQueue.addAll(sequenceFileInsertionEvents); + + final Collection unsequenceFileInsertionEvents = tsFileManager.getTsFileList(false).stream() .filter( resource -> @@ -201,11 +205,20 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa pattern, historicalDataExtractionStartTime, historicalDataExtractionEndTime)) - .collect(Collectors.toList())); + .collect(Collectors.toList()); + pendingQueue.addAll(unsequenceFileInsertionEvents); + pendingQueue.forEach( event -> event.increaseReferenceCount( PipeHistoricalDataRegionTsFileExtractor.class.getName())); + + LOGGER.info( + "Pipe: start to extract historical TsFile, data region {}, " + + "sequence file count {}, unsequence file count {}", + dataRegionId, + sequenceFileInsertionEvents.size(), + unsequenceFileInsertionEvents.size()); } finally { tsFileManager.readUnlock(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java deleted file mode 100644 index e1de3e1333..0000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.pipe.processor; - -import org.apache.iotdb.pipe.api.PipeProcessor; -import org.apache.iotdb.pipe.api.collector.EventCollector; -import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration; -import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; -import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; -import org.apache.iotdb.pipe.api.event.Event; -import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; -import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; - -import java.io.IOException; - -public class PipeDoNothingProcessor implements PipeProcessor { - - @Override - public void validate(PipeParameterValidator validator) { - // do nothing - } - - @Override - public void customize( - PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration) { - // do nothing - } - - @Override - public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector) - throws IOException { - eventCollector.collect(tabletInsertionEvent); - } - - @Override - public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector) - throws IOException { - eventCollector.collect(tsFileInsertionEvent); - } - - @Override - public void process(Event event, EventCollector eventCollector) throws IOException { - eventCollector.collect(event); - } - - @Override - public void close() { - // do nothing - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java index 8ec84529d0..bf57908c71 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java @@ -23,16 +23,11 @@ import org.apache.iotdb.db.pipe.event.EnrichedEvent; import org.apache.iotdb.pipe.api.collector.EventCollector; import org.apache.iotdb.pipe.api.event.Event; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.LinkedList; import java.util.Queue; public class PipeEventCollector implements EventCollector { - private static final Logger LOGGER = LoggerFactory.getLogger(PipeEventCollector.class); - private final BoundedBlockingPendingQueue pendingQueue; private final Queue bufferQueue; @@ -64,4 +59,21 @@ public class PipeEventCollector implements EventCollector { bufferQueue.offer(event); } } + + /** + * Try to collect buffered events into pending queue. + * + * @return true if there are still buffered events after this operation, false otherwise. + */ + public synchronized boolean tryCollectBufferedEvents() { + while (!bufferQueue.isEmpty()) { + final Event bufferedEvent = bufferQueue.peek(); + if (pendingQueue.waitedOffer(bufferedEvent)) { + bufferQueue.poll(); + } else { + return true; + } + } + return false; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java index d187bab7e5..894e7d1499 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java @@ -21,13 +21,13 @@ package org.apache.iotdb.db.pipe.task.stage; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin; +import org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor; import org.apache.iotdb.db.pipe.agent.PipeAgent; import org.apache.iotdb.db.pipe.config.constant.PipeProcessorConstant; import org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration; import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskRuntimeEnvironment; import org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor; import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager; -import org.apache.iotdb.db.pipe.processor.PipeDoNothingProcessor; import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue; import org.apache.iotdb.db.pipe.task.connection.EventSupplier; import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector; @@ -68,7 +68,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage { PipeProcessorConstant.PROCESSOR_KEY, BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName()) .equals(BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName()) - ? new PipeDoNothingProcessor() + ? new DoNothingProcessor() : PipeAgent.plugin().reflectProcessor(pipeProcessorParameters); // validate and customize should be called before createSubtask. this allows extractor exposing diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java index 31c73e6c46..c4ad416b1c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java @@ -53,10 +53,6 @@ public class PipeConnectorSubtask extends PipeSubtask { private final BoundedBlockingPendingQueue inputPendingQueue; private final PipeConnector outputPipeConnector; - // For heartbeat scheduling - private static final int HEARTBEAT_CHECK_INTERVAL = 1000; - private int executeOnceInvokedTimes; - // For thread pool to execute callbacks protected final DecoratingLock callbackDecoratingLock = new DecoratingLock(); protected ExecutorService subtaskCallbackListeningExecutor; @@ -68,7 +64,6 @@ public class PipeConnectorSubtask extends PipeSubtask { super(taskID); this.inputPendingQueue = inputPendingQueue; this.outputPipeConnector = outputPipeConnector; - executeOnceInvokedTimes = 0; } @Override @@ -94,15 +89,6 @@ public class PipeConnectorSubtask extends PipeSubtask { @Override protected synchronized boolean executeOnce() { - try { - if (executeOnceInvokedTimes++ % HEARTBEAT_CHECK_INTERVAL == 0) { - outputPipeConnector.heartbeat(); - } - } catch (Exception e) { - throw new PipeConnectionException( - "PipeConnector: failed to connect to the target system.", e); - } - final Event event = lastEvent != null ? lastEvent : inputPendingQueue.waitedPoll(); // Record this event for retrying on connection failure or other exceptions lastEvent = event; @@ -116,7 +102,14 @@ public class PipeConnectorSubtask extends PipeSubtask { } else if (event instanceof TsFileInsertionEvent) { outputPipeConnector.transfer((TsFileInsertionEvent) event); } else if (event instanceof PipeHeartbeatEvent) { - outputPipeConnector.transfer(event); + try { + outputPipeConnector.heartbeat(); + outputPipeConnector.transfer(event); + } catch (Exception e) { + throw new PipeConnectionException( + "PipeConnector: " + outputPipeConnector.getClass().getName() + " heartbeat failed", + e); + } ((PipeHeartbeatEvent) event).onTransferred(); } else { outputPipeConnector.transfer(event); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java index 0c7b96238c..1e0feb2abe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java @@ -22,9 +22,9 @@ package org.apache.iotdb.db.pipe.task.subtask.processor; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.execution.scheduler.PipeSubtaskScheduler; import org.apache.iotdb.db.pipe.task.connection.EventSupplier; +import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector; import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask; import org.apache.iotdb.pipe.api.PipeProcessor; -import org.apache.iotdb.pipe.api.collector.EventCollector; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; @@ -47,7 +47,7 @@ public class PipeProcessorSubtask extends PipeSubtask { private final EventSupplier inputEventSupplier; private final PipeProcessor pipeProcessor; - private final EventCollector outputEventCollector; + private final PipeEventCollector outputEventCollector; private final AtomicBoolean isClosed; @@ -55,7 +55,7 @@ public class PipeProcessorSubtask extends PipeSubtask { String taskID, EventSupplier inputEventSupplier, PipeProcessor pipeProcessor, - EventCollector outputEventCollector) { + PipeEventCollector outputEventCollector) { super(taskID); this.inputEventSupplier = inputEventSupplier; this.pipeProcessor = pipeProcessor; @@ -89,7 +89,10 @@ public class PipeProcessorSubtask extends PipeSubtask { // Record the last event for retry when exception occurs lastEvent = event; if (event == null) { - return false; + // Though there is no event to process, there may still be some buffered events + // in the outputEventCollector. Return true if there are still buffered events, + // false otherwise. + return outputEventCollector.tryCollectBufferedEvents(); } try { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeProcessorSubtaskExecutorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeProcessorSubtaskExecutorTest.java index 143025f495..eea84c9bb6 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeProcessorSubtaskExecutorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeProcessorSubtaskExecutorTest.java @@ -21,9 +21,9 @@ package org.apache.iotdb.db.pipe.execution; import org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor; import org.apache.iotdb.db.pipe.task.connection.EventSupplier; +import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector; import org.apache.iotdb.db.pipe.task.subtask.processor.PipeProcessorSubtask; import org.apache.iotdb.pipe.api.PipeProcessor; -import org.apache.iotdb.pipe.api.collector.EventCollector; import org.junit.Before; import org.mockito.Mockito; @@ -42,6 +42,6 @@ public class PipeProcessorSubtaskExecutorTest extends PipeSubtaskExecutorTest { "PipeProcessorSubtaskExecutorTest", mock(EventSupplier.class), mock(PipeProcessor.class), - mock(EventCollector.class))); + mock(PipeEventCollector.class))); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java index 62b9caa2f3..0dd80260ce 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.storageengine.dataregion; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.ProgressIndexType; import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex; +import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -197,4 +198,24 @@ public class TsFileResourceProgressIndexTest { throw new UnsupportedOperationException("method not implemented."); } } + + @Test + public void testHybridProgressIndex() { + final IoTProgressIndex ioTProgressIndex = new IoTProgressIndex(1, 123L); + final RecoverProgressIndex recoverProgressIndex = + new RecoverProgressIndex(1, new SimpleProgressIndex(2, 2)); + final HybridProgressIndex hybridProgressIndex = new HybridProgressIndex(); + + hybridProgressIndex.updateToMinimumIsAfterProgressIndex(ioTProgressIndex); + hybridProgressIndex.updateToMinimumIsAfterProgressIndex(recoverProgressIndex); + + Assert.assertTrue(hybridProgressIndex.isAfter(new IoTProgressIndex(1, 100L))); + Assert.assertTrue( + hybridProgressIndex.isAfter(new RecoverProgressIndex(1, new SimpleProgressIndex(1, 2)))); + + Assert.assertFalse(hybridProgressIndex.isAfter(new IoTProgressIndex(1, 200L))); + Assert.assertFalse(hybridProgressIndex.isAfter(new IoTProgressIndex(2, 200L))); + Assert.assertFalse( + hybridProgressIndex.isAfter(new RecoverProgressIndex(1, new SimpleProgressIndex(2, 21)))); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index abc39f11ee..4226adb5b6 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -159,7 +159,7 @@ public class CommonConfig { private int pipeExtractorAssignerDisruptorRingBufferSize = 65536; private int pipeExtractorMatcherCacheSize = 1024; - private int pipeExtractorPendingQueueCapacity = 16; + private int pipeExtractorPendingQueueCapacity = 256; private int pipeExtractorPendingQueueTabletLimit = pipeExtractorPendingQueueCapacity / 2; private int pipeDataStructureTabletRowSize = 2048; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java index 535481712a..0c70cb6620 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java @@ -30,41 +30,38 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; import java.io.IOException; -/** This class is a placeholder and should not be used. */ public class DoNothingProcessor implements PipeProcessor { - private static final String PLACEHOLDER_ERROR_MSG = - "This class is a placeholder and should not be used."; @Override public void validate(PipeParameterValidator validator) { - throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG); + // do nothing } @Override public void customize( PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration) { - throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG); + // do nothing } @Override public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector) throws IOException { - throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG); + eventCollector.collect(tabletInsertionEvent); } @Override public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector) throws IOException { - throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG); + eventCollector.collect(tsFileInsertionEvent); } @Override public void process(Event event, EventCollector eventCollector) throws IOException { - throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG); + eventCollector.collect(event); } @Override public void close() { - throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG); + // do nothing } } diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePluginTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePluginTest.java index df76817362..89e893c734 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePluginTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePluginTest.java @@ -76,34 +76,34 @@ public class BuiltinPipePluginTest { PipeProcessor processor = new DoNothingProcessor(); try { processor.validate(mock(PipeParameterValidator.class)); - Assert.fail(); } catch (Exception ignored) { + Assert.fail(); } try { processor.customize( mock(PipeParameters.class), mock(PipeProcessorRuntimeConfiguration.class)); - Assert.fail(); } catch (Exception ignored) { + Assert.fail(); } try { processor.process(mock(TabletInsertionEvent.class), mock(EventCollector.class)); - Assert.fail(); } catch (Exception ignored) { + Assert.fail(); } try { processor.process(mock(TsFileInsertionEvent.class), mock(EventCollector.class)); - Assert.fail(); } catch (Exception ignored) { + Assert.fail(); } try { processor.process(mock(Event.class), mock(EventCollector.class)); - Assert.fail(); } catch (Exception ignored) { + Assert.fail(); } try { processor.close(); - Assert.fail(); } catch (Exception ignored) { + Assert.fail(); } PipeConnector connector = new DoNothingConnector(); -- GitLab