未验证 提交 daebc419 编写于 作者: S Steve Yurong Su 提交者: GitHub

Pipe: Increase the injection frequency of HeartBeatEvent to reduce the delay...

Pipe: Increase the injection frequency of HeartBeatEvent to reduce the delay in log transferring (#10970)
上级 3073fe63
...@@ -28,7 +28,7 @@ ...@@ -28,7 +28,7 @@
<relativePath>../../pom.xml</relativePath> <relativePath>../../pom.xml</relativePath>
</parent> </parent>
<artifactId>flink-sql-iotdb-connector</artifactId> <artifactId>flink-sql-iotdb-connector</artifactId>
<name>IoTDB: Connector: Apache Flink SQL Connector</name> <name>IoTDB: Connector: Apache Flink SQL</name>
<version>1.3.0-SNAPSHOT</version> <version>1.3.0-SNAPSHOT</version>
<properties> <properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.pipe.agent.runtime;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class PipeCronEventInjector {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeCronEventInjector.class);
private static final int CRON_EVENT_INJECTOR_INTERVAL_SECONDS = 1;
private static final ScheduledExecutorService CRON_EVENT_INJECTOR_EXECUTOR =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
ThreadName.PIPE_RUNTIME_CRON_EVENT_INJECTOR.getName());
private Future<?> injectorFuture;
public synchronized void start() {
if (injectorFuture == null) {
injectorFuture =
ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
CRON_EVENT_INJECTOR_EXECUTOR,
this::inject,
CRON_EVENT_INJECTOR_INTERVAL_SECONDS,
CRON_EVENT_INJECTOR_INTERVAL_SECONDS,
TimeUnit.SECONDS);
LOGGER.info("Pipe cron event injector is started successfully.");
}
}
private synchronized void inject() {
PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(false);
}
public synchronized void stop() {
if (injectorFuture != null) {
injectorFuture.cancel(false);
injectorFuture = null;
LOGGER.info("Pipe cron event injector is stopped successfully.");
}
}
}
...@@ -43,7 +43,9 @@ public class PipeRuntimeAgent implements IService { ...@@ -43,7 +43,9 @@ public class PipeRuntimeAgent implements IService {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeRuntimeAgent.class); private static final Logger LOGGER = LoggerFactory.getLogger(PipeRuntimeAgent.class);
private static final int DATA_NODE_ID = IoTDBDescriptor.getInstance().getConfig().getDataNodeId(); private static final int DATA_NODE_ID = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
private static final AtomicBoolean isShutdown = new AtomicBoolean(false); private final AtomicBoolean isShutdown = new AtomicBoolean(false);
private final PipeCronEventInjector pipeCronEventInjector = new PipeCronEventInjector();
private final SimpleConsensusProgressIndexAssigner simpleConsensusProgressIndexAssigner = private final SimpleConsensusProgressIndexAssigner simpleConsensusProgressIndexAssigner =
new SimpleConsensusProgressIndexAssigner(); new SimpleConsensusProgressIndexAssigner();
...@@ -66,6 +68,7 @@ public class PipeRuntimeAgent implements IService { ...@@ -66,6 +68,7 @@ public class PipeRuntimeAgent implements IService {
public synchronized void start() throws StartupException { public synchronized void start() throws StartupException {
PipeConfig.getInstance().printAllConfigs(); PipeConfig.getInstance().printAllConfigs();
PipeAgentLauncher.launchPipeTaskAgent(); PipeAgentLauncher.launchPipeTaskAgent();
pipeCronEventInjector.start();
isShutdown.set(false); isShutdown.set(false);
} }
...@@ -77,6 +80,7 @@ public class PipeRuntimeAgent implements IService { ...@@ -77,6 +80,7 @@ public class PipeRuntimeAgent implements IService {
} }
isShutdown.set(true); isShutdown.set(true);
pipeCronEventInjector.stop();
PipeAgent.task().dropAllPipeTasks(); PipeAgent.task().dropAllPipeTasks();
} }
...@@ -96,6 +100,7 @@ public class PipeRuntimeAgent implements IService { ...@@ -96,6 +100,7 @@ public class PipeRuntimeAgent implements IService {
} }
////////////////////// Recover ProgressIndex Assigner ////////////////////// ////////////////////// Recover ProgressIndex Assigner //////////////////////
public void assignRecoverProgressIndexForTsFileRecovery(TsFileResource tsFileResource) { public void assignRecoverProgressIndexForTsFileRecovery(TsFileResource tsFileResource) {
tsFileResource.recoverProgressIndex( tsFileResource.recoverProgressIndex(
new RecoverProgressIndex( new RecoverProgressIndex(
......
...@@ -843,6 +843,6 @@ public class PipeTaskAgent { ...@@ -843,6 +843,6 @@ public class PipeTaskAgent {
} }
resp.setPipeMetaList(pipeMetaBinaryList); resp.setPipeMetaList(pipeMetaBinaryList);
PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(); PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(true);
} }
} }
...@@ -33,7 +33,7 @@ public class PipeConnectorConstant { ...@@ -33,7 +33,7 @@ public class PipeConnectorConstant {
public static final boolean CONNECTOR_IOTDB_BATCH_MODE_ENABLED_DEFAULT_VALUE = true; public static final boolean CONNECTOR_IOTDB_BATCH_MODE_ENABLED_DEFAULT_VALUE = true;
public static final String CONNECTOR_IOTDB_BATCH_DELAY_KEY = "connector.batch.max-delay-seconds"; public static final String CONNECTOR_IOTDB_BATCH_DELAY_KEY = "connector.batch.max-delay-seconds";
public static final int CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE = 10; public static final int CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE = 1;
public static final String CONNECTOR_IOTDB_BATCH_SIZE_KEY = "connector.batch.size-bytes"; public static final String CONNECTOR_IOTDB_BATCH_SIZE_KEY = "connector.batch.size-bytes";
public static final long CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE = 16 * MB; public static final long CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE = 16 * MB;
......
...@@ -40,15 +40,19 @@ public class PipeHeartbeatEvent extends EnrichedEvent { ...@@ -40,15 +40,19 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
private long timeProcessed; private long timeProcessed;
private long timeTransferred; private long timeTransferred;
public PipeHeartbeatEvent(String dataRegionId) { private final boolean shouldPrintMessage;
public PipeHeartbeatEvent(String dataRegionId, boolean shouldPrintMessage) {
super(null, null); super(null, null);
this.dataRegionId = dataRegionId; this.dataRegionId = dataRegionId;
this.shouldPrintMessage = shouldPrintMessage;
} }
public PipeHeartbeatEvent(String dataRegionId, long timePublished) { public PipeHeartbeatEvent(String dataRegionId, long timePublished, boolean shouldPrintMessage) {
super(null, null); super(null, null);
this.dataRegionId = dataRegionId; this.dataRegionId = dataRegionId;
this.timePublished = timePublished; this.timePublished = timePublished;
this.shouldPrintMessage = shouldPrintMessage;
} }
@Override @Override
...@@ -60,7 +64,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent { ...@@ -60,7 +64,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
public boolean internallyDecreaseResourceReferenceCount(String holderMessage) { public boolean internallyDecreaseResourceReferenceCount(String holderMessage) {
// PipeName == null indicates that the event is the raw event at disruptor, // PipeName == null indicates that the event is the raw event at disruptor,
// not the event copied and passed to the extractor // not the event copied and passed to the extractor
if (pipeName != null && LOGGER.isInfoEnabled()) { if (shouldPrintMessage && pipeName != null && LOGGER.isInfoEnabled()) {
LOGGER.info(this.toString()); LOGGER.info(this.toString());
} }
return true; return true;
...@@ -74,7 +78,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent { ...@@ -74,7 +78,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
@Override @Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
PipeTaskMeta pipeTaskMeta, String pattern) { PipeTaskMeta pipeTaskMeta, String pattern) {
return new PipeHeartbeatEvent(dataRegionId, timePublished); return new PipeHeartbeatEvent(dataRegionId, timePublished, shouldPrintMessage);
} }
@Override @Override
......
...@@ -49,8 +49,10 @@ public class PipeRealtimeEventFactory { ...@@ -49,8 +49,10 @@ public class PipeRealtimeEventFactory {
resource); resource);
} }
public static PipeRealtimeEvent createRealtimeEvent(String dataRegionId) { public static PipeRealtimeEvent createRealtimeEvent(
return new PipeRealtimeEvent(new PipeHeartbeatEvent(dataRegionId), null, null, null); String dataRegionId, boolean shouldPrintMessage) {
return new PipeRealtimeEvent(
new PipeHeartbeatEvent(dataRegionId, shouldPrintMessage), null, null, null);
} }
private PipeRealtimeEventFactory() { private PipeRealtimeEventFactory() {
......
...@@ -128,13 +128,15 @@ public class PipeInsertionDataNodeListener { ...@@ -128,13 +128,15 @@ public class PipeInsertionDataNodeListener {
PipeRealtimeEventFactory.createRealtimeEvent(walEntryHandler, insertNode, tsFileResource)); PipeRealtimeEventFactory.createRealtimeEvent(walEntryHandler, insertNode, tsFileResource));
} }
public void listenToHeartbeat() { public void listenToHeartbeat(boolean shouldPrintMessage) {
if (listenToInsertNodeExtractorCount.get() == 0 && listenToTsFileExtractorCount.get() == 0) { if (listenToInsertNodeExtractorCount.get() == 0 && listenToTsFileExtractorCount.get() == 0) {
return; return;
} }
dataRegionId2Assigner.forEach( dataRegionId2Assigner.forEach(
(key, value) -> value.publishToAssign(PipeRealtimeEventFactory.createRealtimeEvent(key))); (key, value) ->
value.publishToAssign(
PipeRealtimeEventFactory.createRealtimeEvent(key, shouldPrintMessage)));
} }
/////////////////////////////// singleton /////////////////////////////// /////////////////////////////// singleton ///////////////////////////////
......
...@@ -131,6 +131,7 @@ public enum ThreadName { ...@@ -131,6 +131,7 @@ public enum ThreadName {
PIPE_RUNTIME_META_SYNCER("Pipe-Runtime-Meta-Syncer"), PIPE_RUNTIME_META_SYNCER("Pipe-Runtime-Meta-Syncer"),
PIPE_RUNTIME_HEARTBEAT("Pipe-Runtime-Heartbeat"), PIPE_RUNTIME_HEARTBEAT("Pipe-Runtime-Heartbeat"),
PIPE_RUNTIME_PROCEDURE_SUBMITTER("Pipe-Runtime-Procedure-Submitter"), PIPE_RUNTIME_PROCEDURE_SUBMITTER("Pipe-Runtime-Procedure-Submitter"),
PIPE_RUNTIME_CRON_EVENT_INJECTOR("Pipe-Runtime-Cron-Event-Injector"),
PIPE_ASYNC_CONNECTOR_CLIENT_POOL("Pipe-Async-Connector-Client-Pool"), PIPE_ASYNC_CONNECTOR_CLIENT_POOL("Pipe-Async-Connector-Client-Pool"),
PIPE_WAL_RESOURCE_TTL_CHECKER("Pipe-WAL-Resource-TTL-Checker"), PIPE_WAL_RESOURCE_TTL_CHECKER("Pipe-WAL-Resource-TTL-Checker"),
PIPE_RECEIVER_AIR_GAP_AGENT("Pipe-Receiver-Air-Gap-Agent"), PIPE_RECEIVER_AIR_GAP_AGENT("Pipe-Receiver-Air-Gap-Agent"),
...@@ -267,6 +268,7 @@ public enum ThreadName { ...@@ -267,6 +268,7 @@ public enum ThreadName {
PIPE_RUNTIME_META_SYNCER, PIPE_RUNTIME_META_SYNCER,
PIPE_RUNTIME_HEARTBEAT, PIPE_RUNTIME_HEARTBEAT,
PIPE_RUNTIME_PROCEDURE_SUBMITTER, PIPE_RUNTIME_PROCEDURE_SUBMITTER,
PIPE_RUNTIME_CRON_EVENT_INJECTOR,
PIPE_ASYNC_CONNECTOR_CLIENT_POOL, PIPE_ASYNC_CONNECTOR_CLIENT_POOL,
PIPE_WAL_RESOURCE_TTL_CHECKER, PIPE_WAL_RESOURCE_TTL_CHECKER,
PIPE_RECEIVER_AIR_GAP_AGENT, PIPE_RECEIVER_AIR_GAP_AGENT,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册