未验证 提交 c4b72fc7 编写于 作者: C Caideyipi 提交者: GitHub

[IOTDB-6126] Pipe: Added PipeHeartbeatEvent to trigger delay reporting, batch...

[IOTDB-6126] Pipe: Added PipeHeartbeatEvent to trigger delay reporting, batch transferring and async retrying (#10940)
Co-authored-by: NSteve Yurong Su <rong@apache.org>
上级 489be30b
......@@ -135,7 +135,7 @@ public interface PipeConnector extends PipePlugin {
}
/**
* This method is used to transfer the Event.
* This method is used to transfer the generic events, including HeartbeatEvent.
*
* @param event Event to be transferred
* @throws PipeConnectionException if the connection is broken
......
......@@ -26,7 +26,7 @@ import org.apache.iotdb.tsfile.write.record.Tablet;
import java.util.function.BiConsumer;
/** TabletInsertionEvent is used to define the event of data insertion. */
/** {@link TabletInsertionEvent} is used to define the event of data insertion. */
public interface TabletInsertionEvent extends Event {
/**
......
......@@ -22,8 +22,8 @@ package org.apache.iotdb.pipe.api.event.dml.insertion;
import org.apache.iotdb.pipe.api.event.Event;
/**
* TsFileInsertionEvent is used to define the event of writing TsFile. Event data stores in disks,
* which is compressed and encoded, and requires IO cost for computational processing.
* {@link TsFileInsertionEvent} is used to define the event of writing TsFile. Event data stores in
* disks, which is compressed and encoded, and requires IO cost for computational processing.
*/
public interface TsFileInsertionEvent extends Event {
......
......@@ -31,6 +31,7 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.pipe.task.PipeBuilder;
import org.apache.iotdb.db.pipe.task.PipeTask;
import org.apache.iotdb.db.pipe.task.PipeTaskBuilder;
......@@ -841,5 +842,7 @@ public class PipeTaskAgent {
throw new TException(e);
}
resp.setPipeMetaList(pipeMetaBinaryList);
PipeInsertionDataNodeListener.getInstance().listenToHeartbeat();
}
}
......@@ -29,6 +29,7 @@ import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransfer
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
import org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnector;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
......@@ -249,7 +250,9 @@ public class IoTDBAirGapConnector extends IoTDBConnector {
@Override
public void transfer(Event event) {
LOGGER.warn("IoTDBAirGapConnector does not support transfer generic event: {}.", event);
if (!(event instanceof PipeHeartbeatEvent)) {
LOGGER.warn("IoTDBAirGapConnector does not support transfer generic event: {}.", event);
}
}
private void doTransfer(
......
......@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.connector.payload.legacy.TsFilePipeData;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnectorClient;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
......@@ -196,7 +197,9 @@ public class IoTDBLegacyPipeConnector implements PipeConnector {
@Override
public void transfer(Event event) throws Exception {
LOGGER.warn("IoTDBLegacyPipeConnector does not support transfer generic event: {}.", event);
if (!(event instanceof PipeHeartbeatEvent)) {
LOGGER.warn("IoTDBLegacyPipeConnector does not support transfer generic event: {}.", event);
}
}
private void doTransfer(PipeInsertNodeTabletInsertionEvent pipeInsertNodeInsertionEvent)
......
......@@ -23,9 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftAsyncPipeTransferBatchReqBuilder;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
......@@ -38,6 +35,7 @@ import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTran
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTsFileInsertionEventHandler;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnector;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
......@@ -65,10 +63,7 @@ import java.io.IOException;
import java.util.Comparator;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
......@@ -86,10 +81,6 @@ public class IoTDBThriftAsyncConnector extends IoTDBConnector {
private final IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>
asyncPipeDataTransferClientManager;
private static final AtomicReference<ScheduledExecutorService> RETRY_TRIGGER =
new AtomicReference<>();
private static final int RETRY_TRIGGER_INTERVAL_MINUTES = 1;
private final AtomicReference<Future<?>> retryTriggerFuture = new AtomicReference<>();
private final IoTDBThriftSyncConnector retryConnector = new IoTDBThriftSyncConnector();
private final PriorityBlockingQueue<Pair<Long, Event>> retryEventQueue =
new PriorityBlockingQueue<>(11, Comparator.comparing(o -> o.left));
......@@ -350,7 +341,9 @@ public class IoTDBThriftAsyncConnector extends IoTDBConnector {
transferQueuedEventsIfNecessary();
transferBatchedEventsIfNecessary();
LOGGER.warn("IoTDBThriftAsyncConnector does not support transfer generic event: {}.", event);
if (!(event instanceof PipeHeartbeatEvent)) {
LOGGER.warn("IoTDBThriftAsyncConnector does not support transfer generic event: {}.", event);
}
}
private AsyncPipeDataTransferServiceClient borrowClient(TEndPoint targetNodeUrl)
......@@ -543,46 +536,12 @@ public class IoTDBThriftAsyncConnector extends IoTDBConnector {
* @param event event to retry
*/
public void addFailureEventToRetryQueue(long requestCommitId, Event event) {
if (RETRY_TRIGGER.get() == null) {
synchronized (IoTDBThriftAsyncConnector.class) {
if (RETRY_TRIGGER.get() == null) {
RETRY_TRIGGER.set(
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
ThreadName.PIPE_ASYNC_CONNECTOR_RETRY_TRIGGER.getName()));
}
}
}
if (retryTriggerFuture.get() == null) {
synchronized (IoTDBThriftAsyncConnector.class) {
if (retryTriggerFuture.get() == null) {
retryTriggerFuture.set(
ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
RETRY_TRIGGER.get(),
() -> {
try {
transferQueuedEventsIfNecessary();
} catch (Exception e) {
LOGGER.warn("Failed to trigger retry.", e);
}
},
RETRY_TRIGGER_INTERVAL_MINUTES,
RETRY_TRIGGER_INTERVAL_MINUTES,
TimeUnit.MINUTES));
}
}
}
retryEventQueue.offer(new Pair<>(requestCommitId, event));
}
@Override
// synchronized to avoid close connector when transfer event
public synchronized void close() throws Exception {
if (retryTriggerFuture.get() != null) {
retryTriggerFuture.get().cancel(false);
}
retryConnector.close();
}
}
......@@ -32,6 +32,7 @@ import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransfer
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
import org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnector;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
......@@ -258,8 +259,15 @@ public class IoTDBThriftSyncConnector extends IoTDBConnector {
}
@Override
public void transfer(Event event) {
LOGGER.warn("IoTDBThriftSyncConnector does not support transfer generic event: {}.", event);
public void transfer(Event event) throws TException, IOException {
// in order to commit in order
if (isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty()) {
doTransfer(clients.get(nextClientIndex()));
}
if (!(event instanceof PipeHeartbeatEvent)) {
LOGGER.warn("IoTDBThriftSyncConnector does not support transfer generic event: {}.", event);
}
}
private void doTransfer(IoTDBThriftSyncConnectorClient client) throws IOException, TException {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.pipe.event.common.heartbeat;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PipeHeartbeatEvent extends EnrichedEvent {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeHeartbeatEvent.class);
private final String dataRegionId;
private String pipeName;
private long timePublished;
private long timeAssigned;
private long timeProcessed;
private long timeTransferred;
public PipeHeartbeatEvent(String dataRegionId) {
super(null, null);
this.dataRegionId = dataRegionId;
}
public PipeHeartbeatEvent(String dataRegionId, long timePublished) {
super(null, null);
this.dataRegionId = dataRegionId;
this.timePublished = timePublished;
}
@Override
public boolean internallyIncreaseResourceReferenceCount(String holderMessage) {
return true;
}
@Override
public boolean internallyDecreaseResourceReferenceCount(String holderMessage) {
// PipeName == null indicates that the event is the raw event at disruptor,
// not the event copied and passed to the extractor
if (pipeName != null && LOGGER.isInfoEnabled()) {
LOGGER.info(this.toString());
}
return true;
}
@Override
public ProgressIndex getProgressIndex() {
return new MinimumProgressIndex();
}
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
PipeTaskMeta pipeTaskMeta, String pattern) {
return new PipeHeartbeatEvent(dataRegionId, timePublished);
}
@Override
public boolean isGeneratedByPipe() {
return false;
}
/////////////////////////////// Delay Reporting ///////////////////////////////
public void bindPipeName(String pipeName) {
this.pipeName = pipeName;
}
public void onPublished() {
timePublished = System.currentTimeMillis();
}
public void onAssigned() {
timeAssigned = System.currentTimeMillis();
}
public void onProcessed() {
timeProcessed = System.currentTimeMillis();
}
public void onTransferred() {
timeTransferred = System.currentTimeMillis();
}
@Override
public String toString() {
final String errorMessage = "error";
final String publishedToAssignedMessage =
timeAssigned != 0 ? (timeAssigned - timePublished) + "ms" : errorMessage;
final String assignedToProcessedMessage =
timeProcessed != 0 ? (timeProcessed - timeAssigned) + "ms" : errorMessage;
final String processedToTransferredMessage =
timeTransferred != 0 ? (timeTransferred - timeProcessed) + "ms" : errorMessage;
final String totalTimeMessage =
timeTransferred != 0 ? (timeTransferred - timePublished) + "ms" : errorMessage;
return "PipeHeartbeatEvent{"
+ "pipeName='"
+ pipeName
+ "', dataRegionId="
+ dataRegionId
+ ", startTime="
+ DateTimeUtils.convertLongToDate(timePublished, "ms")
+ ", publishedToAssigned="
+ publishedToAssignedMessage
+ ", assignedToProcessed="
+ assignedToProcessedMessage
+ ", processedToTransferred="
+ processedToTransferredMessage
+ ", totalTimeCost="
+ totalTimeMessage
+ "}";
}
}
......@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.event.realtime;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpochManager;
......@@ -48,6 +49,10 @@ public class PipeRealtimeEventFactory {
resource);
}
public static PipeRealtimeEvent createRealtimeEvent(String dataRegionId) {
return new PipeRealtimeEvent(new PipeHeartbeatEvent(dataRegionId), null, null, null);
}
private PipeRealtimeEventFactory() {
// factory class, do not instantiate
}
......
......@@ -34,6 +34,7 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor {
protected String pattern;
protected boolean isForwardingPipeRequests;
protected String pipeName;
protected String dataRegionId;
protected PipeTaskMeta pipeTaskMeta;
......@@ -60,6 +61,7 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor {
final PipeTaskExtractorRuntimeEnvironment environment =
(PipeTaskExtractorRuntimeEnvironment) configuration.getRuntimeEnvironment();
pipeName = environment.getPipeName();
dataRegionId = String.valueOf(environment.getRegionId());
pipeTaskMeta = environment.getPipeTaskMeta();
}
......@@ -89,6 +91,10 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor {
return isForwardingPipeRequests;
}
public final String getPipeName() {
return pipeName;
}
public final PipeTaskMeta getPipeTaskMeta() {
return pipeTaskMeta;
}
......
......@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.extractor.realtime;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpoch;
import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue;
......@@ -53,6 +54,8 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
extractTabletInsertion(event);
} else if (eventToExtract instanceof TsFileInsertionEvent) {
extractTsFileInsertion(event);
} else if (eventToExtract instanceof PipeHeartbeatEvent) {
extractHeartbeat(event);
} else {
throw new UnsupportedOperationException(
String.format(
......@@ -151,6 +154,24 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
}
}
private void extractHeartbeat(PipeRealtimeEvent event) {
if (!pendingQueue.waitedOffer(event)) {
// this would not happen, but just in case.
// pendingQueue is unbounded, so it should never reach capacity.
LOGGER.error(
"extract: pending queue of PipeRealtimeDataRegionTsFileExtractor {} "
+ "has reached capacity, discard heartbeat event {}",
this,
event);
// Do not report exception since the PipeHeartbeatEvent doesn't affect the correction of
// pipe progress.
// ignore this event.
event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
}
}
private boolean isApproachingCapacity() {
return pendingQueue.size()
>= PipeConfig.getInstance().getPipeExtractorPendingQueueTabletLimit();
......@@ -169,6 +190,8 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
suppliedEvent = supplyTabletInsertion(realtimeEvent);
} else if (eventToSupply instanceof TsFileInsertionEvent) {
suppliedEvent = supplyTsFileInsertion(realtimeEvent);
} else if (eventToSupply instanceof PipeHeartbeatEvent) {
suppliedEvent = supplyHeartbeat(realtimeEvent);
} else {
throw new UnsupportedOperationException(
String.format(
......@@ -253,6 +276,23 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
return null;
}
private Event supplyHeartbeat(PipeRealtimeEvent event) {
if (event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName())) {
return event.getEvent();
} else {
// this would not happen, but just in case.
LOGGER.error(
"Heartbeat Event {} can not be supplied because "
+ "the reference count can not be increased",
event.getEvent());
// Do not report exception since the PipeHeartbeatEvent doesn't affect the correction of pipe
// progress.
return null;
}
}
@Override
public void close() throws Exception {
super.close();
......
......@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.extractor.realtime;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpoch;
import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue;
......@@ -45,6 +46,11 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx
@Override
public void extract(PipeRealtimeEvent event) {
if (event.getEvent() instanceof PipeHeartbeatEvent) {
extractHeartbeat(event);
return;
}
event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TABLET);
if (!(event.getEvent() instanceof TabletInsertionEvent)) {
......@@ -68,6 +74,24 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx
}
}
private void extractHeartbeat(PipeRealtimeEvent event) {
if (!pendingQueue.waitedOffer(event)) {
// this would not happen, but just in case.
// pendingQueue is unbounded, so it should never reach capacity.
LOGGER.error(
"extract: pending queue of PipeRealtimeDataRegionTsFileExtractor {} "
+ "has reached capacity, discard heartbeat event {}",
this,
event);
// Do not report exception since the PipeHeartbeatEvent doesn't affect the correction of
// pipe progress.
// ignore this event.
event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
}
}
@Override
public boolean isNeedListenToTsFile() {
return false;
......
......@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.extractor.realtime;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpoch;
import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue;
......@@ -45,6 +46,11 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio
@Override
public void extract(PipeRealtimeEvent event) {
if (event.getEvent() instanceof PipeHeartbeatEvent) {
extractHeartbeat(event);
return;
}
event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE);
if (!(event.getEvent() instanceof TsFileInsertionEvent)) {
......@@ -68,6 +74,24 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio
}
}
private void extractHeartbeat(PipeRealtimeEvent event) {
if (!pendingQueue.waitedOffer(event)) {
// This would not happen, but just in case.
// Pending is unbounded, so it should never reach capacity.
LOGGER.error(
"extract: pending queue of PipeRealtimeDataRegionTsFileExtractor {} "
+ "has reached capacity, discard heartbeat event {}",
this,
event);
// Do not report exception since the PipeHeartbeatEvent doesn't affect the correction of
// pipe progress.
// Ignore the event.
event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());
}
}
@Override
public boolean isNeedListenToTsFile() {
return true;
......
......@@ -19,6 +19,8 @@
package org.apache.iotdb.db.pipe.extractor.realtime.assigner;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor;
import org.apache.iotdb.db.pipe.extractor.realtime.matcher.CachedSchemaPatternMatcher;
......@@ -39,7 +41,12 @@ public class PipeDataRegionAssigner {
public void publishToAssign(PipeRealtimeEvent event) {
event.increaseReferenceCount(PipeDataRegionAssigner.class.getName());
disruptor.publish(event);
if (event.getEvent() instanceof PipeHeartbeatEvent) {
((PipeHeartbeatEvent) event.getEvent()).onPublished();
}
}
public void assignToExtractor(PipeRealtimeEvent event, long sequence, boolean endOfBatch) {
......@@ -54,8 +61,15 @@ public class PipeDataRegionAssigner {
final PipeRealtimeEvent copiedEvent =
event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
extractor.getPipeTaskMeta(), extractor.getPattern());
copiedEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName());
extractor.extract(copiedEvent);
final EnrichedEvent innerEvent = copiedEvent.getEvent();
if (innerEvent instanceof PipeHeartbeatEvent) {
((PipeHeartbeatEvent) innerEvent).bindPipeName(extractor.getPipeName());
((PipeHeartbeatEvent) innerEvent).onAssigned();
}
});
event.gcSchemaInfo();
event.decreaseReferenceCount(PipeDataRegionAssigner.class.getName());
......
......@@ -128,6 +128,15 @@ public class PipeInsertionDataNodeListener {
PipeRealtimeEventFactory.createRealtimeEvent(walEntryHandler, insertNode, tsFileResource));
}
public void listenToHeartbeat() {
if (listenToInsertNodeExtractorCount.get() == 0 && listenToTsFileExtractorCount.get() == 0) {
return;
}
dataRegionId2Assigner.forEach(
(key, value) -> value.publishToAssign(PipeRealtimeEventFactory.createRealtimeEvent(key)));
}
/////////////////////////////// singleton ///////////////////////////////
private PipeInsertionDataNodeListener() {}
......
......@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.extractor.realtime.matcher;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
......@@ -94,6 +95,11 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher {
return matchedExtractors;
}
// HeartbeatEvent will be assigned to all extractors
if (event.getEvent() instanceof PipeHeartbeatEvent) {
return extractors;
}
for (final Map.Entry<String, String[]> entry : event.getSchemaInfo().entrySet()) {
final String device = entry.getKey();
final String[] measurements = entry.getValue();
......
......@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.task.subtask.connector;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.execution.scheduler.PipeSubtaskScheduler;
import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
import org.apache.iotdb.db.pipe.task.subtask.DecoratingLock;
......@@ -114,6 +115,9 @@ public class PipeConnectorSubtask extends PipeSubtask {
outputPipeConnector.transfer((TabletInsertionEvent) event);
} else if (event instanceof TsFileInsertionEvent) {
outputPipeConnector.transfer((TsFileInsertionEvent) event);
} else if (event instanceof PipeHeartbeatEvent) {
outputPipeConnector.transfer(event);
((PipeHeartbeatEvent) event).onTransferred();
} else {
outputPipeConnector.transfer(event);
}
......
......@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.task.subtask.processor;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.execution.scheduler.PipeSubtaskScheduler;
import org.apache.iotdb.db.pipe.task.connection.EventSupplier;
import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
......@@ -85,7 +86,7 @@ public class PipeProcessorSubtask extends PipeSubtask {
@Override
protected synchronized boolean executeOnce() throws Exception {
final Event event = lastEvent != null ? lastEvent : inputEventSupplier.supply();
// record the last event for retry when exception occurs
// Record the last event for retry when exception occurs
lastEvent = event;
if (event == null) {
return false;
......@@ -96,6 +97,9 @@ public class PipeProcessorSubtask extends PipeSubtask {
pipeProcessor.process((TabletInsertionEvent) event, outputEventCollector);
} else if (event instanceof TsFileInsertionEvent) {
pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector);
} else if (event instanceof PipeHeartbeatEvent) {
pipeProcessor.process(event, outputEventCollector);
((PipeHeartbeatEvent) event).onProcessed();
} else {
pipeProcessor.process(event, outputEventCollector);
}
......
......@@ -132,7 +132,6 @@ public enum ThreadName {
PIPE_RUNTIME_HEARTBEAT("Pipe-Runtime-Heartbeat"),
PIPE_RUNTIME_PROCEDURE_SUBMITTER("Pipe-Runtime-Procedure-Submitter"),
PIPE_ASYNC_CONNECTOR_CLIENT_POOL("Pipe-Async-Connector-Client-Pool"),
PIPE_ASYNC_CONNECTOR_RETRY_TRIGGER("Pipe-Async-Connector-Retry-Trigger"),
PIPE_WAL_RESOURCE_TTL_CHECKER("Pipe-WAL-Resource-TTL-Checker"),
PIPE_RECEIVER_AIR_GAP_AGENT("Pipe-Receiver-Air-Gap-Agent"),
WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager"),
......@@ -269,7 +268,6 @@ public enum ThreadName {
PIPE_RUNTIME_HEARTBEAT,
PIPE_RUNTIME_PROCEDURE_SUBMITTER,
PIPE_ASYNC_CONNECTOR_CLIENT_POOL,
PIPE_ASYNC_CONNECTOR_RETRY_TRIGGER,
PIPE_WAL_RESOURCE_TTL_CHECKER,
PIPE_RECEIVER_AIR_GAP_AGENT,
WINDOW_EVALUATION_SERVICE,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册