diff --git a/iotdb-connector/flink-sql-iotdb-connector/pom.xml b/iotdb-connector/flink-sql-iotdb-connector/pom.xml
index 0a23c56989536c8df40eed193233ddf983741c1d..5335e4fbee811a95b8a5485d7d447285d7185c5f 100644
--- a/iotdb-connector/flink-sql-iotdb-connector/pom.xml
+++ b/iotdb-connector/flink-sql-iotdb-connector/pom.xml
@@ -28,7 +28,7 @@
../../pom.xml
flink-sql-iotdb-connector
- IoTDB: Connector: Apache Flink SQL Connector
+ IoTDB: Connector: Apache Flink SQL
1.3.0-SNAPSHOT
UTF-8
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeCronEventInjector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeCronEventInjector.java
new file mode 100644
index 0000000000000000000000000000000000000000..8ec9798e187b688c03442078845d7e0c5ecb8745
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeCronEventInjector.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent.runtime;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class PipeCronEventInjector {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(PipeCronEventInjector.class);
+
+ private static final int CRON_EVENT_INJECTOR_INTERVAL_SECONDS = 1;
+
+ private static final ScheduledExecutorService CRON_EVENT_INJECTOR_EXECUTOR =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ ThreadName.PIPE_RUNTIME_CRON_EVENT_INJECTOR.getName());
+
+ private Future> injectorFuture;
+
+ public synchronized void start() {
+ if (injectorFuture == null) {
+ injectorFuture =
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ CRON_EVENT_INJECTOR_EXECUTOR,
+ this::inject,
+ CRON_EVENT_INJECTOR_INTERVAL_SECONDS,
+ CRON_EVENT_INJECTOR_INTERVAL_SECONDS,
+ TimeUnit.SECONDS);
+ LOGGER.info("Pipe cron event injector is started successfully.");
+ }
+ }
+
+ private synchronized void inject() {
+ PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(false);
+ }
+
+ public synchronized void stop() {
+ if (injectorFuture != null) {
+ injectorFuture.cancel(false);
+ injectorFuture = null;
+ LOGGER.info("Pipe cron event injector is stopped successfully.");
+ }
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 0f91b34661143679053e59ba99769ec8842a1647..dacd51d4684ef683fc89c124ee903ab4d9b970d2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -43,7 +43,9 @@ public class PipeRuntimeAgent implements IService {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeRuntimeAgent.class);
private static final int DATA_NODE_ID = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
- private static final AtomicBoolean isShutdown = new AtomicBoolean(false);
+ private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+
+ private final PipeCronEventInjector pipeCronEventInjector = new PipeCronEventInjector();
private final SimpleConsensusProgressIndexAssigner simpleConsensusProgressIndexAssigner =
new SimpleConsensusProgressIndexAssigner();
@@ -66,6 +68,7 @@ public class PipeRuntimeAgent implements IService {
public synchronized void start() throws StartupException {
PipeConfig.getInstance().printAllConfigs();
PipeAgentLauncher.launchPipeTaskAgent();
+ pipeCronEventInjector.start();
isShutdown.set(false);
}
@@ -77,6 +80,7 @@ public class PipeRuntimeAgent implements IService {
}
isShutdown.set(true);
+ pipeCronEventInjector.stop();
PipeAgent.task().dropAllPipeTasks();
}
@@ -96,6 +100,7 @@ public class PipeRuntimeAgent implements IService {
}
////////////////////// Recover ProgressIndex Assigner //////////////////////
+
public void assignRecoverProgressIndexForTsFileRecovery(TsFileResource tsFileResource) {
tsFileResource.recoverProgressIndex(
new RecoverProgressIndex(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index 660b9d962e4479bdb4d1007600ef0a174b00c317..e54203f0f7cb4936502da19fbc3fe54c7d48f385 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -843,6 +843,6 @@ public class PipeTaskAgent {
}
resp.setPipeMetaList(pipeMetaBinaryList);
- PipeInsertionDataNodeListener.getInstance().listenToHeartbeat();
+ PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(true);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
index 3c870a9ba15d11ee664edcc002acdb4c16c4ce53..2a5b8e0a47af3cab94a138bd6a8ba45ab2403a4e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
@@ -33,7 +33,7 @@ public class PipeConnectorConstant {
public static final boolean CONNECTOR_IOTDB_BATCH_MODE_ENABLED_DEFAULT_VALUE = true;
public static final String CONNECTOR_IOTDB_BATCH_DELAY_KEY = "connector.batch.max-delay-seconds";
- public static final int CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE = 10;
+ public static final int CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE = 1;
public static final String CONNECTOR_IOTDB_BATCH_SIZE_KEY = "connector.batch.size-bytes";
public static final long CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE = 16 * MB;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index f194182b83270e7f86aa0e17b45fc01499100220..71d65a6b7cd6d2f074bf3bd5d8c7daa724e68f81 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -40,15 +40,19 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
private long timeProcessed;
private long timeTransferred;
- public PipeHeartbeatEvent(String dataRegionId) {
+ private final boolean shouldPrintMessage;
+
+ public PipeHeartbeatEvent(String dataRegionId, boolean shouldPrintMessage) {
super(null, null);
this.dataRegionId = dataRegionId;
+ this.shouldPrintMessage = shouldPrintMessage;
}
- public PipeHeartbeatEvent(String dataRegionId, long timePublished) {
+ public PipeHeartbeatEvent(String dataRegionId, long timePublished, boolean shouldPrintMessage) {
super(null, null);
this.dataRegionId = dataRegionId;
this.timePublished = timePublished;
+ this.shouldPrintMessage = shouldPrintMessage;
}
@Override
@@ -60,7 +64,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
public boolean internallyDecreaseResourceReferenceCount(String holderMessage) {
// PipeName == null indicates that the event is the raw event at disruptor,
// not the event copied and passed to the extractor
- if (pipeName != null && LOGGER.isInfoEnabled()) {
+ if (shouldPrintMessage && pipeName != null && LOGGER.isInfoEnabled()) {
LOGGER.info(this.toString());
}
return true;
@@ -74,7 +78,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
PipeTaskMeta pipeTaskMeta, String pattern) {
- return new PipeHeartbeatEvent(dataRegionId, timePublished);
+ return new PipeHeartbeatEvent(dataRegionId, timePublished, shouldPrintMessage);
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
index 72c1527987845a71f21355442a0bea044caa9884..3a95f9ccf26ee853a618dbea61e27801233831aa 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
@@ -49,8 +49,10 @@ public class PipeRealtimeEventFactory {
resource);
}
- public static PipeRealtimeEvent createRealtimeEvent(String dataRegionId) {
- return new PipeRealtimeEvent(new PipeHeartbeatEvent(dataRegionId), null, null, null);
+ public static PipeRealtimeEvent createRealtimeEvent(
+ String dataRegionId, boolean shouldPrintMessage) {
+ return new PipeRealtimeEvent(
+ new PipeHeartbeatEvent(dataRegionId, shouldPrintMessage), null, null, null);
}
private PipeRealtimeEventFactory() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
index 7446bef72acb024e4e0aa185dc353b0c2ce802a4..e3c7a62b6d9980d30964eef94b7c3eefeb3830ca 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
@@ -128,13 +128,15 @@ public class PipeInsertionDataNodeListener {
PipeRealtimeEventFactory.createRealtimeEvent(walEntryHandler, insertNode, tsFileResource));
}
- public void listenToHeartbeat() {
+ public void listenToHeartbeat(boolean shouldPrintMessage) {
if (listenToInsertNodeExtractorCount.get() == 0 && listenToTsFileExtractorCount.get() == 0) {
return;
}
dataRegionId2Assigner.forEach(
- (key, value) -> value.publishToAssign(PipeRealtimeEventFactory.createRealtimeEvent(key)));
+ (key, value) ->
+ value.publishToAssign(
+ PipeRealtimeEventFactory.createRealtimeEvent(key, shouldPrintMessage)));
}
/////////////////////////////// singleton ///////////////////////////////
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 024d1c4386c7475149e8abc5f6773ce37c8cc51a..e1cb4a3ff1b1160847fba54aa4ae1e3049cb249a 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -131,6 +131,7 @@ public enum ThreadName {
PIPE_RUNTIME_META_SYNCER("Pipe-Runtime-Meta-Syncer"),
PIPE_RUNTIME_HEARTBEAT("Pipe-Runtime-Heartbeat"),
PIPE_RUNTIME_PROCEDURE_SUBMITTER("Pipe-Runtime-Procedure-Submitter"),
+ PIPE_RUNTIME_CRON_EVENT_INJECTOR("Pipe-Runtime-Cron-Event-Injector"),
PIPE_ASYNC_CONNECTOR_CLIENT_POOL("Pipe-Async-Connector-Client-Pool"),
PIPE_WAL_RESOURCE_TTL_CHECKER("Pipe-WAL-Resource-TTL-Checker"),
PIPE_RECEIVER_AIR_GAP_AGENT("Pipe-Receiver-Air-Gap-Agent"),
@@ -267,6 +268,7 @@ public enum ThreadName {
PIPE_RUNTIME_META_SYNCER,
PIPE_RUNTIME_HEARTBEAT,
PIPE_RUNTIME_PROCEDURE_SUBMITTER,
+ PIPE_RUNTIME_CRON_EVENT_INJECTOR,
PIPE_ASYNC_CONNECTOR_CLIENT_POOL,
PIPE_WAL_RESOURCE_TTL_CHECKER,
PIPE_RECEIVER_AIR_GAP_AGENT,