diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java index 26ac2d5f03140060460250d2f0209e1b3f650945..d95034a207b65dadd3769494e4bde45ca0dc45de 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java @@ -135,7 +135,7 @@ public interface PipeConnector extends PipePlugin { } /** - * This method is used to transfer the Event. + * This method is used to transfer the generic events, including HeartbeatEvent. * * @param event Event to be transferred * @throws PipeConnectionException if the connection is broken diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java index 4f3ecad35e450d1b59943b885f96c741caddcf80..56244574214ca0ca9452c6fe2019a9db9f112f7b 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java @@ -26,7 +26,7 @@ import org.apache.iotdb.tsfile.write.record.Tablet; import java.util.function.BiConsumer; -/** TabletInsertionEvent is used to define the event of data insertion. */ +/** {@link TabletInsertionEvent} is used to define the event of data insertion. */ public interface TabletInsertionEvent extends Event { /** diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java index 298bf803b549d36adccc087f11c89fbe9a2ed33e..815199e64671ae6077c2a318a3fd1e2d6ddcf685 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java @@ -22,8 +22,8 @@ package org.apache.iotdb.pipe.api.event.dml.insertion; import org.apache.iotdb.pipe.api.event.Event; /** - * TsFileInsertionEvent is used to define the event of writing TsFile. Event data stores in disks, - * which is compressed and encoded, and requires IO cost for computational processing. + * {@link TsFileInsertionEvent} is used to define the event of writing TsFile. Event data stores in + * disks, which is compressed and encoded, and requires IO cost for computational processing. */ public interface TsFileInsertionEvent extends Event { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java index e9ecb26d40256646776c2e25c24a86e7b0d9e37a..660b9d962e4479bdb4d1007600ef0a174b00c317 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java @@ -31,6 +31,7 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.agent.PipeAgent; +import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener; import org.apache.iotdb.db.pipe.task.PipeBuilder; import org.apache.iotdb.db.pipe.task.PipeTask; import org.apache.iotdb.db.pipe.task.PipeTaskBuilder; @@ -841,5 +842,7 @@ public class PipeTaskAgent { throw new TException(e); } resp.setPipeMetaList(pipeMetaBinaryList); + + PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java index 40968f4fa9f4df02d827c3fa162483cfb98500c0..dc82ef0565062b967aa8e383be3969c6637075f0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java @@ -29,6 +29,7 @@ import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransfer import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq; import org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnector; import org.apache.iotdb.db.pipe.event.EnrichedEvent; +import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; @@ -249,7 +250,9 @@ public class IoTDBAirGapConnector extends IoTDBConnector { @Override public void transfer(Event event) { - LOGGER.warn("IoTDBAirGapConnector does not support transfer generic event: {}.", event); + if (!(event instanceof PipeHeartbeatEvent)) { + LOGGER.warn("IoTDBAirGapConnector does not support transfer generic event: {}.", event); + } } private void doTransfer( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java index 5b251a4947d5bc95bf99a50f841466377b28bb70..2fa427628978109f445842ad4150af172d922699 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java @@ -28,6 +28,7 @@ import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.db.pipe.connector.payload.legacy.TsFilePipeData; import org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnectorClient; +import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; @@ -196,7 +197,9 @@ public class IoTDBLegacyPipeConnector implements PipeConnector { @Override public void transfer(Event event) throws Exception { - LOGGER.warn("IoTDBLegacyPipeConnector does not support transfer generic event: {}.", event); + if (!(event instanceof PipeHeartbeatEvent)) { + LOGGER.warn("IoTDBLegacyPipeConnector does not support transfer generic event: {}.", event); + } } private void doTransfer(PipeInsertNodeTabletInsertionEvent pipeInsertNodeInsertionEvent) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java index c649485fa8664b213928e314fbaa77eeeaec5e0a..02647965f8bebe39f6b5e4a138e724f43257c03d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java @@ -23,9 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.ClientPoolFactory; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; -import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; -import org.apache.iotdb.commons.concurrent.ThreadName; -import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftAsyncPipeTransferBatchReqBuilder; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq; @@ -38,6 +35,7 @@ import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTran import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTsFileInsertionEventHandler; import org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnector; import org.apache.iotdb.db.pipe.event.EnrichedEvent; +import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; @@ -65,10 +63,7 @@ import java.io.IOException; import java.util.Comparator; import java.util.Optional; import java.util.PriorityQueue; -import java.util.concurrent.Future; import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -86,10 +81,6 @@ public class IoTDBThriftAsyncConnector extends IoTDBConnector { private final IClientManager asyncPipeDataTransferClientManager; - private static final AtomicReference RETRY_TRIGGER = - new AtomicReference<>(); - private static final int RETRY_TRIGGER_INTERVAL_MINUTES = 1; - private final AtomicReference> retryTriggerFuture = new AtomicReference<>(); private final IoTDBThriftSyncConnector retryConnector = new IoTDBThriftSyncConnector(); private final PriorityBlockingQueue> retryEventQueue = new PriorityBlockingQueue<>(11, Comparator.comparing(o -> o.left)); @@ -350,7 +341,9 @@ public class IoTDBThriftAsyncConnector extends IoTDBConnector { transferQueuedEventsIfNecessary(); transferBatchedEventsIfNecessary(); - LOGGER.warn("IoTDBThriftAsyncConnector does not support transfer generic event: {}.", event); + if (!(event instanceof PipeHeartbeatEvent)) { + LOGGER.warn("IoTDBThriftAsyncConnector does not support transfer generic event: {}.", event); + } } private AsyncPipeDataTransferServiceClient borrowClient(TEndPoint targetNodeUrl) @@ -543,46 +536,12 @@ public class IoTDBThriftAsyncConnector extends IoTDBConnector { * @param event event to retry */ public void addFailureEventToRetryQueue(long requestCommitId, Event event) { - if (RETRY_TRIGGER.get() == null) { - synchronized (IoTDBThriftAsyncConnector.class) { - if (RETRY_TRIGGER.get() == null) { - RETRY_TRIGGER.set( - IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( - ThreadName.PIPE_ASYNC_CONNECTOR_RETRY_TRIGGER.getName())); - } - } - } - - if (retryTriggerFuture.get() == null) { - synchronized (IoTDBThriftAsyncConnector.class) { - if (retryTriggerFuture.get() == null) { - retryTriggerFuture.set( - ScheduledExecutorUtil.safelyScheduleWithFixedDelay( - RETRY_TRIGGER.get(), - () -> { - try { - transferQueuedEventsIfNecessary(); - } catch (Exception e) { - LOGGER.warn("Failed to trigger retry.", e); - } - }, - RETRY_TRIGGER_INTERVAL_MINUTES, - RETRY_TRIGGER_INTERVAL_MINUTES, - TimeUnit.MINUTES)); - } - } - } - retryEventQueue.offer(new Pair<>(requestCommitId, event)); } @Override // synchronized to avoid close connector when transfer event public synchronized void close() throws Exception { - if (retryTriggerFuture.get() != null) { - retryTriggerFuture.get().cancel(false); - } - retryConnector.close(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java index 5a1e4c88db106ed80f96f5fc0ca02594b096a20e..e266e0180a08e6bd386f9af11ec05fedd8ec70f0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java @@ -32,6 +32,7 @@ import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransfer import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq; import org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnector; import org.apache.iotdb.db.pipe.event.EnrichedEvent; +import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; @@ -258,8 +259,15 @@ public class IoTDBThriftSyncConnector extends IoTDBConnector { } @Override - public void transfer(Event event) { - LOGGER.warn("IoTDBThriftSyncConnector does not support transfer generic event: {}.", event); + public void transfer(Event event) throws TException, IOException { + // in order to commit in order + if (isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty()) { + doTransfer(clients.get(nextClientIndex())); + } + + if (!(event instanceof PipeHeartbeatEvent)) { + LOGGER.warn("IoTDBThriftSyncConnector does not support transfer generic event: {}.", event); + } } private void doTransfer(IoTDBThriftSyncConnectorClient client) throws IOException, TException { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..f194182b83270e7f86aa0e17b45fc01499100220 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.event.common.heartbeat; + +import org.apache.iotdb.commons.consensus.index.ProgressIndex; +import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; +import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; +import org.apache.iotdb.db.pipe.event.EnrichedEvent; +import org.apache.iotdb.db.utils.DateTimeUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PipeHeartbeatEvent extends EnrichedEvent { + + private static final Logger LOGGER = LoggerFactory.getLogger(PipeHeartbeatEvent.class); + + private final String dataRegionId; + private String pipeName; + + private long timePublished; + private long timeAssigned; + private long timeProcessed; + private long timeTransferred; + + public PipeHeartbeatEvent(String dataRegionId) { + super(null, null); + this.dataRegionId = dataRegionId; + } + + public PipeHeartbeatEvent(String dataRegionId, long timePublished) { + super(null, null); + this.dataRegionId = dataRegionId; + this.timePublished = timePublished; + } + + @Override + public boolean internallyIncreaseResourceReferenceCount(String holderMessage) { + return true; + } + + @Override + public boolean internallyDecreaseResourceReferenceCount(String holderMessage) { + // PipeName == null indicates that the event is the raw event at disruptor, + // not the event copied and passed to the extractor + if (pipeName != null && LOGGER.isInfoEnabled()) { + LOGGER.info(this.toString()); + } + return true; + } + + @Override + public ProgressIndex getProgressIndex() { + return new MinimumProgressIndex(); + } + + @Override + public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( + PipeTaskMeta pipeTaskMeta, String pattern) { + return new PipeHeartbeatEvent(dataRegionId, timePublished); + } + + @Override + public boolean isGeneratedByPipe() { + return false; + } + + /////////////////////////////// Delay Reporting /////////////////////////////// + + public void bindPipeName(String pipeName) { + this.pipeName = pipeName; + } + + public void onPublished() { + timePublished = System.currentTimeMillis(); + } + + public void onAssigned() { + timeAssigned = System.currentTimeMillis(); + } + + public void onProcessed() { + timeProcessed = System.currentTimeMillis(); + } + + public void onTransferred() { + timeTransferred = System.currentTimeMillis(); + } + + @Override + public String toString() { + final String errorMessage = "error"; + + final String publishedToAssignedMessage = + timeAssigned != 0 ? (timeAssigned - timePublished) + "ms" : errorMessage; + final String assignedToProcessedMessage = + timeProcessed != 0 ? (timeProcessed - timeAssigned) + "ms" : errorMessage; + final String processedToTransferredMessage = + timeTransferred != 0 ? (timeTransferred - timeProcessed) + "ms" : errorMessage; + final String totalTimeMessage = + timeTransferred != 0 ? (timeTransferred - timePublished) + "ms" : errorMessage; + + return "PipeHeartbeatEvent{" + + "pipeName='" + + pipeName + + "', dataRegionId=" + + dataRegionId + + ", startTime=" + + DateTimeUtils.convertLongToDate(timePublished, "ms") + + ", publishedToAssigned=" + + publishedToAssignedMessage + + ", assignedToProcessed=" + + assignedToProcessedMessage + + ", processedToTransferred=" + + processedToTransferredMessage + + ", totalTimeCost=" + + totalTimeMessage + + "}"; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java index 6af720da2f2515597f76ac9403dd577a79ead405..72c1527987845a71f21355442a0bea044caa9884 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.event.realtime; +import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpochManager; @@ -48,6 +49,10 @@ public class PipeRealtimeEventFactory { resource); } + public static PipeRealtimeEvent createRealtimeEvent(String dataRegionId) { + return new PipeRealtimeEvent(new PipeHeartbeatEvent(dataRegionId), null, null, null); + } + private PipeRealtimeEventFactory() { // factory class, do not instantiate } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java index 22658fb3b422693e1367e43a239e8be92b1cbbad..a393662ed7f50a95df32dfe25c9b861eba259e2e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java @@ -34,6 +34,7 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor { protected String pattern; protected boolean isForwardingPipeRequests; + protected String pipeName; protected String dataRegionId; protected PipeTaskMeta pipeTaskMeta; @@ -60,6 +61,7 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor { final PipeTaskExtractorRuntimeEnvironment environment = (PipeTaskExtractorRuntimeEnvironment) configuration.getRuntimeEnvironment(); + pipeName = environment.getPipeName(); dataRegionId = String.valueOf(environment.getRegionId()); pipeTaskMeta = environment.getPipeTaskMeta(); } @@ -89,6 +91,10 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor { return isForwardingPipeRequests; } + public final String getPipeName() { + return pipeName; + } + public final PipeTaskMeta getPipeTaskMeta() { return pipeTaskMeta; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java index 9b8ff792be4a6c15c556685b19ae13af87494839..b9e8f275e289a3c63636bfd2f195c1dee03786a4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.extractor.realtime; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.db.pipe.agent.PipeAgent; +import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpoch; import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue; @@ -53,6 +54,8 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio extractTabletInsertion(event); } else if (eventToExtract instanceof TsFileInsertionEvent) { extractTsFileInsertion(event); + } else if (eventToExtract instanceof PipeHeartbeatEvent) { + extractHeartbeat(event); } else { throw new UnsupportedOperationException( String.format( @@ -151,6 +154,24 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio } } + private void extractHeartbeat(PipeRealtimeEvent event) { + if (!pendingQueue.waitedOffer(event)) { + // this would not happen, but just in case. + // pendingQueue is unbounded, so it should never reach capacity. + LOGGER.error( + "extract: pending queue of PipeRealtimeDataRegionTsFileExtractor {} " + + "has reached capacity, discard heartbeat event {}", + this, + event); + + // Do not report exception since the PipeHeartbeatEvent doesn't affect the correction of + // pipe progress. + + // ignore this event. + event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName()); + } + } + private boolean isApproachingCapacity() { return pendingQueue.size() >= PipeConfig.getInstance().getPipeExtractorPendingQueueTabletLimit(); @@ -169,6 +190,8 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio suppliedEvent = supplyTabletInsertion(realtimeEvent); } else if (eventToSupply instanceof TsFileInsertionEvent) { suppliedEvent = supplyTsFileInsertion(realtimeEvent); + } else if (eventToSupply instanceof PipeHeartbeatEvent) { + suppliedEvent = supplyHeartbeat(realtimeEvent); } else { throw new UnsupportedOperationException( String.format( @@ -253,6 +276,23 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio return null; } + private Event supplyHeartbeat(PipeRealtimeEvent event) { + if (event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName())) { + return event.getEvent(); + } else { + // this would not happen, but just in case. + LOGGER.error( + "Heartbeat Event {} can not be supplied because " + + "the reference count can not be increased", + event.getEvent()); + + // Do not report exception since the PipeHeartbeatEvent doesn't affect the correction of pipe + // progress. + + return null; + } + } + @Override public void close() throws Exception { super.close(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java index 156c889171072b6de58f49be7535784c7d5a9208..14e899300b858adbac7b13fe8b3c0407974636f2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.extractor.realtime; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException; import org.apache.iotdb.db.pipe.agent.PipeAgent; +import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpoch; import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue; @@ -45,6 +46,11 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx @Override public void extract(PipeRealtimeEvent event) { + if (event.getEvent() instanceof PipeHeartbeatEvent) { + extractHeartbeat(event); + return; + } + event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TABLET); if (!(event.getEvent() instanceof TabletInsertionEvent)) { @@ -68,6 +74,24 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx } } + private void extractHeartbeat(PipeRealtimeEvent event) { + if (!pendingQueue.waitedOffer(event)) { + // this would not happen, but just in case. + // pendingQueue is unbounded, so it should never reach capacity. + LOGGER.error( + "extract: pending queue of PipeRealtimeDataRegionTsFileExtractor {} " + + "has reached capacity, discard heartbeat event {}", + this, + event); + + // Do not report exception since the PipeHeartbeatEvent doesn't affect the correction of + // pipe progress. + + // ignore this event. + event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName()); + } + } + @Override public boolean isNeedListenToTsFile() { return false; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java index cae2e6f3c1b71f824d26799cf3d32fb35f29c190..9546e35906dbf7cabc716e3d9a2a4b4c890bc405 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.extractor.realtime; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException; import org.apache.iotdb.db.pipe.agent.PipeAgent; +import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpoch; import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue; @@ -45,6 +46,11 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio @Override public void extract(PipeRealtimeEvent event) { + if (event.getEvent() instanceof PipeHeartbeatEvent) { + extractHeartbeat(event); + return; + } + event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE); if (!(event.getEvent() instanceof TsFileInsertionEvent)) { @@ -68,6 +74,24 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio } } + private void extractHeartbeat(PipeRealtimeEvent event) { + if (!pendingQueue.waitedOffer(event)) { + // This would not happen, but just in case. + // Pending is unbounded, so it should never reach capacity. + LOGGER.error( + "extract: pending queue of PipeRealtimeDataRegionTsFileExtractor {} " + + "has reached capacity, discard heartbeat event {}", + this, + event); + + // Do not report exception since the PipeHeartbeatEvent doesn't affect the correction of + // pipe progress. + + // Ignore the event. + event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName()); + } + } + @Override public boolean isNeedListenToTsFile() { return true; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java index 1177634be9c72b94deca1f18f2b91b1c480f7bac..c50e15bc3923defa10854086db5c29f85bf257c3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.pipe.extractor.realtime.assigner; +import org.apache.iotdb.db.pipe.event.EnrichedEvent; +import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor; import org.apache.iotdb.db.pipe.extractor.realtime.matcher.CachedSchemaPatternMatcher; @@ -39,7 +41,12 @@ public class PipeDataRegionAssigner { public void publishToAssign(PipeRealtimeEvent event) { event.increaseReferenceCount(PipeDataRegionAssigner.class.getName()); + disruptor.publish(event); + + if (event.getEvent() instanceof PipeHeartbeatEvent) { + ((PipeHeartbeatEvent) event.getEvent()).onPublished(); + } } public void assignToExtractor(PipeRealtimeEvent event, long sequence, boolean endOfBatch) { @@ -54,8 +61,15 @@ public class PipeDataRegionAssigner { final PipeRealtimeEvent copiedEvent = event.shallowCopySelfAndBindPipeTaskMetaForProgressReport( extractor.getPipeTaskMeta(), extractor.getPattern()); + copiedEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName()); extractor.extract(copiedEvent); + + final EnrichedEvent innerEvent = copiedEvent.getEvent(); + if (innerEvent instanceof PipeHeartbeatEvent) { + ((PipeHeartbeatEvent) innerEvent).bindPipeName(extractor.getPipeName()); + ((PipeHeartbeatEvent) innerEvent).onAssigned(); + } }); event.gcSchemaInfo(); event.decreaseReferenceCount(PipeDataRegionAssigner.class.getName()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java index 35648b1bb5c4e10be7cbd019781256838fd316f7..7446bef72acb024e4e0aa185dc353b0c2ce802a4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java @@ -128,6 +128,15 @@ public class PipeInsertionDataNodeListener { PipeRealtimeEventFactory.createRealtimeEvent(walEntryHandler, insertNode, tsFileResource)); } + public void listenToHeartbeat() { + if (listenToInsertNodeExtractorCount.get() == 0 && listenToTsFileExtractorCount.get() == 0) { + return; + } + + dataRegionId2Assigner.forEach( + (key, value) -> value.publishToAssign(PipeRealtimeEventFactory.createRealtimeEvent(key))); + } + /////////////////////////////// singleton /////////////////////////////// private PipeInsertionDataNodeListener() {} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/matcher/CachedSchemaPatternMatcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/matcher/CachedSchemaPatternMatcher.java index bc9621d31df588ceed2ebc834cb6617a30d42e71..32c0fb2e9d0af1f281f710c8d1328ffd1d8d98c6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/matcher/CachedSchemaPatternMatcher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/matcher/CachedSchemaPatternMatcher.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.extractor.realtime.matcher; import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; @@ -94,6 +95,11 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { return matchedExtractors; } + // HeartbeatEvent will be assigned to all extractors + if (event.getEvent() instanceof PipeHeartbeatEvent) { + return extractors; + } + for (final Map.Entry entry : event.getSchemaInfo().entrySet()) { final String device = entry.getKey(); final String[] measurements = entry.getValue(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java index da756d9d60527f64d99e41cae18d9c5d2b02ac68..31c73e6c468b173990cdb728c74c279e3dd2bd9a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.task.subtask.connector; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.db.pipe.event.EnrichedEvent; +import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.execution.scheduler.PipeSubtaskScheduler; import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue; import org.apache.iotdb.db.pipe.task.subtask.DecoratingLock; @@ -114,6 +115,9 @@ public class PipeConnectorSubtask extends PipeSubtask { outputPipeConnector.transfer((TabletInsertionEvent) event); } else if (event instanceof TsFileInsertionEvent) { outputPipeConnector.transfer((TsFileInsertionEvent) event); + } else if (event instanceof PipeHeartbeatEvent) { + outputPipeConnector.transfer(event); + ((PipeHeartbeatEvent) event).onTransferred(); } else { outputPipeConnector.transfer(event); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java index ab3fac5e60055ae906e092a835c5cf3d8a3b5880..0c7b96238c1add0669c717b66ff31fa8f9ede2f8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.task.subtask.processor; +import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.execution.scheduler.PipeSubtaskScheduler; import org.apache.iotdb.db.pipe.task.connection.EventSupplier; import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask; @@ -85,7 +86,7 @@ public class PipeProcessorSubtask extends PipeSubtask { @Override protected synchronized boolean executeOnce() throws Exception { final Event event = lastEvent != null ? lastEvent : inputEventSupplier.supply(); - // record the last event for retry when exception occurs + // Record the last event for retry when exception occurs lastEvent = event; if (event == null) { return false; @@ -96,6 +97,9 @@ public class PipeProcessorSubtask extends PipeSubtask { pipeProcessor.process((TabletInsertionEvent) event, outputEventCollector); } else if (event instanceof TsFileInsertionEvent) { pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector); + } else if (event instanceof PipeHeartbeatEvent) { + pipeProcessor.process(event, outputEventCollector); + ((PipeHeartbeatEvent) event).onProcessed(); } else { pipeProcessor.process(event, outputEventCollector); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java index ebc7a2ef5119c8fa21561476ff46f92c0e5eaab8..024d1c4386c7475149e8abc5f6773ce37c8cc51a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java @@ -132,7 +132,6 @@ public enum ThreadName { PIPE_RUNTIME_HEARTBEAT("Pipe-Runtime-Heartbeat"), PIPE_RUNTIME_PROCEDURE_SUBMITTER("Pipe-Runtime-Procedure-Submitter"), PIPE_ASYNC_CONNECTOR_CLIENT_POOL("Pipe-Async-Connector-Client-Pool"), - PIPE_ASYNC_CONNECTOR_RETRY_TRIGGER("Pipe-Async-Connector-Retry-Trigger"), PIPE_WAL_RESOURCE_TTL_CHECKER("Pipe-WAL-Resource-TTL-Checker"), PIPE_RECEIVER_AIR_GAP_AGENT("Pipe-Receiver-Air-Gap-Agent"), WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager"), @@ -269,7 +268,6 @@ public enum ThreadName { PIPE_RUNTIME_HEARTBEAT, PIPE_RUNTIME_PROCEDURE_SUBMITTER, PIPE_ASYNC_CONNECTOR_CLIENT_POOL, - PIPE_ASYNC_CONNECTOR_RETRY_TRIGGER, PIPE_WAL_RESOURCE_TTL_CHECKER, PIPE_RECEIVER_AIR_GAP_AGENT, WINDOW_EVALUATION_SERVICE,