未验证 提交 1ae952ce 编写于 作者: S Steve Yurong Su 提交者: GitHub

[IOTDB-6127] Pipe: buffered events in processor stage can not be consumed by connector (#10962)

上级 bd588e27
......@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.ZoneId;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Queue;
import java.util.stream.Collectors;
......@@ -168,7 +169,8 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
tsFileManager.readLock();
try {
pendingQueue = new ArrayDeque<>(tsFileManager.size(true) + tsFileManager.size(false));
pendingQueue.addAll(
final Collection<PipeTsFileInsertionEvent> sequenceFileInsertionEvents =
tsFileManager.getTsFileList(true).stream()
.filter(
resource ->
......@@ -184,8 +186,10 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
pattern,
historicalDataExtractionStartTime,
historicalDataExtractionEndTime))
.collect(Collectors.toList()));
pendingQueue.addAll(
.collect(Collectors.toList());
pendingQueue.addAll(sequenceFileInsertionEvents);
final Collection<PipeTsFileInsertionEvent> unsequenceFileInsertionEvents =
tsFileManager.getTsFileList(false).stream()
.filter(
resource ->
......@@ -201,11 +205,20 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
pattern,
historicalDataExtractionStartTime,
historicalDataExtractionEndTime))
.collect(Collectors.toList()));
.collect(Collectors.toList());
pendingQueue.addAll(unsequenceFileInsertionEvents);
pendingQueue.forEach(
event ->
event.increaseReferenceCount(
PipeHistoricalDataRegionTsFileExtractor.class.getName()));
LOGGER.info(
"Pipe: start to extract historical TsFile, data region {}, "
+ "sequence file count {}, unsequence file count {}",
dataRegionId,
sequenceFileInsertionEvents.size(),
unsequenceFileInsertionEvents.size());
} finally {
tsFileManager.readUnlock();
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.pipe.processor;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import java.io.IOException;
public class PipeDoNothingProcessor implements PipeProcessor {
@Override
public void validate(PipeParameterValidator validator) {
// do nothing
}
@Override
public void customize(
PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration) {
// do nothing
}
@Override
public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
throws IOException {
eventCollector.collect(tabletInsertionEvent);
}
@Override
public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector)
throws IOException {
eventCollector.collect(tsFileInsertionEvent);
}
@Override
public void process(Event event, EventCollector eventCollector) throws IOException {
eventCollector.collect(event);
}
@Override
public void close() {
// do nothing
}
}
......@@ -23,16 +23,11 @@ import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.apache.iotdb.pipe.api.event.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.LinkedList;
import java.util.Queue;
public class PipeEventCollector implements EventCollector {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeEventCollector.class);
private final BoundedBlockingPendingQueue<Event> pendingQueue;
private final Queue<Event> bufferQueue;
......@@ -64,4 +59,21 @@ public class PipeEventCollector implements EventCollector {
bufferQueue.offer(event);
}
}
/**
* Try to collect buffered events into pending queue.
*
* @return true if there are still buffered events after this operation, false otherwise.
*/
public synchronized boolean tryCollectBufferedEvents() {
while (!bufferQueue.isEmpty()) {
final Event bufferedEvent = bufferQueue.peek();
if (pendingQueue.waitedOffer(bufferedEvent)) {
bufferQueue.poll();
} else {
return true;
}
}
return false;
}
}
......@@ -21,13 +21,13 @@ package org.apache.iotdb.db.pipe.task.stage;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.config.constant.PipeProcessorConstant;
import org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskRuntimeEnvironment;
import org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor;
import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
import org.apache.iotdb.db.pipe.processor.PipeDoNothingProcessor;
import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
import org.apache.iotdb.db.pipe.task.connection.EventSupplier;
import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
......@@ -68,7 +68,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
PipeProcessorConstant.PROCESSOR_KEY,
BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName())
.equals(BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName())
? new PipeDoNothingProcessor()
? new DoNothingProcessor()
: PipeAgent.plugin().reflectProcessor(pipeProcessorParameters);
// validate and customize should be called before createSubtask. this allows extractor exposing
......
......@@ -53,10 +53,6 @@ public class PipeConnectorSubtask extends PipeSubtask {
private final BoundedBlockingPendingQueue<Event> inputPendingQueue;
private final PipeConnector outputPipeConnector;
// For heartbeat scheduling
private static final int HEARTBEAT_CHECK_INTERVAL = 1000;
private int executeOnceInvokedTimes;
// For thread pool to execute callbacks
protected final DecoratingLock callbackDecoratingLock = new DecoratingLock();
protected ExecutorService subtaskCallbackListeningExecutor;
......@@ -68,7 +64,6 @@ public class PipeConnectorSubtask extends PipeSubtask {
super(taskID);
this.inputPendingQueue = inputPendingQueue;
this.outputPipeConnector = outputPipeConnector;
executeOnceInvokedTimes = 0;
}
@Override
......@@ -94,15 +89,6 @@ public class PipeConnectorSubtask extends PipeSubtask {
@Override
protected synchronized boolean executeOnce() {
try {
if (executeOnceInvokedTimes++ % HEARTBEAT_CHECK_INTERVAL == 0) {
outputPipeConnector.heartbeat();
}
} catch (Exception e) {
throw new PipeConnectionException(
"PipeConnector: failed to connect to the target system.", e);
}
final Event event = lastEvent != null ? lastEvent : inputPendingQueue.waitedPoll();
// Record this event for retrying on connection failure or other exceptions
lastEvent = event;
......@@ -116,7 +102,14 @@ public class PipeConnectorSubtask extends PipeSubtask {
} else if (event instanceof TsFileInsertionEvent) {
outputPipeConnector.transfer((TsFileInsertionEvent) event);
} else if (event instanceof PipeHeartbeatEvent) {
outputPipeConnector.transfer(event);
try {
outputPipeConnector.heartbeat();
outputPipeConnector.transfer(event);
} catch (Exception e) {
throw new PipeConnectionException(
"PipeConnector: " + outputPipeConnector.getClass().getName() + " heartbeat failed",
e);
}
((PipeHeartbeatEvent) event).onTransferred();
} else {
outputPipeConnector.transfer(event);
......
......@@ -22,9 +22,9 @@ package org.apache.iotdb.db.pipe.task.subtask.processor;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.execution.scheduler.PipeSubtaskScheduler;
import org.apache.iotdb.db.pipe.task.connection.EventSupplier;
import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
......@@ -47,7 +47,7 @@ public class PipeProcessorSubtask extends PipeSubtask {
private final EventSupplier inputEventSupplier;
private final PipeProcessor pipeProcessor;
private final EventCollector outputEventCollector;
private final PipeEventCollector outputEventCollector;
private final AtomicBoolean isClosed;
......@@ -55,7 +55,7 @@ public class PipeProcessorSubtask extends PipeSubtask {
String taskID,
EventSupplier inputEventSupplier,
PipeProcessor pipeProcessor,
EventCollector outputEventCollector) {
PipeEventCollector outputEventCollector) {
super(taskID);
this.inputEventSupplier = inputEventSupplier;
this.pipeProcessor = pipeProcessor;
......@@ -89,7 +89,10 @@ public class PipeProcessorSubtask extends PipeSubtask {
// Record the last event for retry when exception occurs
lastEvent = event;
if (event == null) {
return false;
// Though there is no event to process, there may still be some buffered events
// in the outputEventCollector. Return true if there are still buffered events,
// false otherwise.
return outputEventCollector.tryCollectBufferedEvents();
}
try {
......
......@@ -21,9 +21,9 @@ package org.apache.iotdb.db.pipe.execution;
import org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor;
import org.apache.iotdb.db.pipe.task.connection.EventSupplier;
import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
import org.apache.iotdb.db.pipe.task.subtask.processor.PipeProcessorSubtask;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.junit.Before;
import org.mockito.Mockito;
......@@ -42,6 +42,6 @@ public class PipeProcessorSubtaskExecutorTest extends PipeSubtaskExecutorTest {
"PipeProcessorSubtaskExecutorTest",
mock(EventSupplier.class),
mock(PipeProcessor.class),
mock(EventCollector.class)));
mock(PipeEventCollector.class)));
}
}
......@@ -22,6 +22,7 @@ package org.apache.iotdb.db.storageengine.dataregion;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
......@@ -197,4 +198,24 @@ public class TsFileResourceProgressIndexTest {
throw new UnsupportedOperationException("method not implemented.");
}
}
@Test
public void testHybridProgressIndex() {
final IoTProgressIndex ioTProgressIndex = new IoTProgressIndex(1, 123L);
final RecoverProgressIndex recoverProgressIndex =
new RecoverProgressIndex(1, new SimpleProgressIndex(2, 2));
final HybridProgressIndex hybridProgressIndex = new HybridProgressIndex();
hybridProgressIndex.updateToMinimumIsAfterProgressIndex(ioTProgressIndex);
hybridProgressIndex.updateToMinimumIsAfterProgressIndex(recoverProgressIndex);
Assert.assertTrue(hybridProgressIndex.isAfter(new IoTProgressIndex(1, 100L)));
Assert.assertTrue(
hybridProgressIndex.isAfter(new RecoverProgressIndex(1, new SimpleProgressIndex(1, 2))));
Assert.assertFalse(hybridProgressIndex.isAfter(new IoTProgressIndex(1, 200L)));
Assert.assertFalse(hybridProgressIndex.isAfter(new IoTProgressIndex(2, 200L)));
Assert.assertFalse(
hybridProgressIndex.isAfter(new RecoverProgressIndex(1, new SimpleProgressIndex(2, 21))));
}
}
......@@ -159,7 +159,7 @@ public class CommonConfig {
private int pipeExtractorAssignerDisruptorRingBufferSize = 65536;
private int pipeExtractorMatcherCacheSize = 1024;
private int pipeExtractorPendingQueueCapacity = 16;
private int pipeExtractorPendingQueueCapacity = 256;
private int pipeExtractorPendingQueueTabletLimit = pipeExtractorPendingQueueCapacity / 2;
private int pipeDataStructureTabletRowSize = 2048;
......
......@@ -30,41 +30,38 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import java.io.IOException;
/** This class is a placeholder and should not be used. */
public class DoNothingProcessor implements PipeProcessor {
private static final String PLACEHOLDER_ERROR_MSG =
"This class is a placeholder and should not be used.";
@Override
public void validate(PipeParameterValidator validator) {
throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
// do nothing
}
@Override
public void customize(
PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration) {
throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
// do nothing
}
@Override
public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
throws IOException {
throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
eventCollector.collect(tabletInsertionEvent);
}
@Override
public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector)
throws IOException {
throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
eventCollector.collect(tsFileInsertionEvent);
}
@Override
public void process(Event event, EventCollector eventCollector) throws IOException {
throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
eventCollector.collect(event);
}
@Override
public void close() {
throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
// do nothing
}
}
......@@ -76,34 +76,34 @@ public class BuiltinPipePluginTest {
PipeProcessor processor = new DoNothingProcessor();
try {
processor.validate(mock(PipeParameterValidator.class));
Assert.fail();
} catch (Exception ignored) {
Assert.fail();
}
try {
processor.customize(
mock(PipeParameters.class), mock(PipeProcessorRuntimeConfiguration.class));
Assert.fail();
} catch (Exception ignored) {
Assert.fail();
}
try {
processor.process(mock(TabletInsertionEvent.class), mock(EventCollector.class));
Assert.fail();
} catch (Exception ignored) {
Assert.fail();
}
try {
processor.process(mock(TsFileInsertionEvent.class), mock(EventCollector.class));
Assert.fail();
} catch (Exception ignored) {
Assert.fail();
}
try {
processor.process(mock(Event.class), mock(EventCollector.class));
Assert.fail();
} catch (Exception ignored) {
Assert.fail();
}
try {
processor.close();
Assert.fail();
} catch (Exception ignored) {
Assert.fail();
}
PipeConnector connector = new DoNothingConnector();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册