Unverified commit 1a3568ff, authored by 马子坤, committed by GitHub

Pipe: Improve performance for 10000+ pipes (#11021)

Improvements:

* Cache the File objects to reduce the size of TsFileInsertionEvents.

* Limit the frequency of flushing TsFiles when concurrently creating historical extractors (see the sketch after this list).

* If the cluster heartbeat doesn't need the PipeMetaList, do not enter PipeTaskAgent at all. This prevents the datanode from being marked UNKNOWN when many pipes are being created.
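
As a sketch of the second improvement: all historical extractors created for one data region share a single last-flush timestamp, so a burst of concurrent pipe creations triggers at most one flush per interval. A minimal, self-contained illustration of the pattern (names are illustrative, not the project's API):

import java.util.HashMap;
import java.util.Map;

public class FlushRateLimiter {
  // Shared across all extractor instances, keyed by data region id.
  private static final Map<Integer, Long> LAST_FLUSH_TIME_MS = new HashMap<>();
  private static final long MIN_FLUSH_INTERVAL_MS = 2000;

  // Runs flushAction only if this region has not been flushed within the
  // minimum interval; otherwise the call is a cheap no-op.
  public static void flushIfDue(int dataRegionId, Runnable flushAction) {
    synchronized (LAST_FLUSH_TIME_MS) {
      final long now = System.currentTimeMillis();
      if (now - LAST_FLUSH_TIME_MS.getOrDefault(dataRegionId, 0L) >= MIN_FLUSH_INTERVAL_MS) {
        flushAction.run();
        LAST_FLUSH_TIME_MS.put(dataRegionId, now);
      }
    }
  }
}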

---------
Co-authored-by: Steve Yurong Su <rong@apache.org>
Parent: 7d9ac670
@@ -36,7 +36,6 @@ import org.apache.iotdb.db.pipe.task.PipeBuilder;
 import org.apache.iotdb.db.pipe.task.PipeTask;
 import org.apache.iotdb.db.pipe.task.PipeTaskBuilder;
 import org.apache.iotdb.db.pipe.task.PipeTaskManager;
-import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
@@ -777,13 +776,7 @@ public class PipeTaskAgent {
 
   ///////////////////////// Heartbeat /////////////////////////
 
-  public synchronized void collectPipeMetaList(THeartbeatReq req, THeartbeatResp resp)
-      throws TException {
-    // If the pipe heartbeat is separated from the cluster heartbeat, then the lock doesn't
-    // need to be acquired
-    if (!req.isNeedPipeMetaList()) {
-      return;
-    }
+  public synchronized void collectPipeMetaList(THeartbeatResp resp) throws TException {
     // Try the lock instead of directly acquiring it, to avoid blocking the cluster heartbeat.
     // 10s is half of the HEARTBEAT_TIMEOUT_TIME defined in class BaseNodeCache in ConfigNode.
     if (!tryReadLockWithTimeOut(10)) {
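
For context on the comment above: tryReadLockWithTimeOut is presumably a thin wrapper over a timed read-lock attempt. A hedged sketch, assuming a ReentrantReadWriteLock; the actual helper in PipeTaskAgent may differ:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class TimedLockExample {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  // Hypothetical helper: try to take the read lock for up to the given number
  // of seconds instead of blocking indefinitely, so a slow pipe operation
  // cannot stall the cluster heartbeat past its timeout.
  public boolean tryReadLockWithTimeOut(long timeoutSeconds) {
    try {
      return lock.readLock().tryLock(timeoutSeconds, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status
      return false;
    }
  }
}
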
@@ -42,6 +42,8 @@ import java.io.IOException;
 import java.time.ZoneId;
 import java.util.ArrayDeque;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Queue;
 import java.util.stream.Collectors;
@@ -56,6 +58,9 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeHistoricalDataRegionTsFileExtractor.class);
 
+  private static final Map<Integer, Long> DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP = new HashMap<>();
+  private static final long PIPE_MIN_FLUSH_INTERVAL_IN_MS = 2000;
+
   private PipeTaskMeta pipeTaskMeta;
   private ProgressIndex startIndex;
@@ -85,6 +90,9 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
     startIndex = environment.getPipeTaskMeta().getProgressIndex();
 
     dataRegionId = environment.getRegionId();
+    synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
+      DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.putIfAbsent(dataRegionId, 0L);
+    }
 
     pattern = parameters.getStringOrDefault(EXTRACTOR_PATTERN_KEY, EXTRACTOR_PATTERN_DEFAULT_VALUE);
@@ -133,7 +141,14 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
     // If we don't invoke flushDataRegionAllTsFiles() in the realtime-only mode, the data generated
     // between the creation time of the pipe and the time when the pipe starts will be lost.
     if (historicalDataExtractionTimeLowerBound != Long.MIN_VALUE) {
-      flushDataRegionAllTsFiles();
+      synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
+        final long lastFlushedByPipeTime =
+            DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(dataRegionId);
+        if (System.currentTimeMillis() - lastFlushedByPipeTime >= PIPE_MIN_FLUSH_INTERVAL_IN_MS) {
+          flushDataRegionAllTsFiles();
+          DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(dataRegionId, System.currentTimeMillis());
+        }
+      }
     }
   }
@@ -163,7 +178,14 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
     dataRegion.writeLock("Pipe: start to extract historical TsFile");
     try {
-      dataRegion.syncCloseAllWorkingTsFileProcessors();
+      synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
+        final long lastFlushedByPipeTime =
+            DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(dataRegionId);
+        if (System.currentTimeMillis() - lastFlushedByPipeTime >= PIPE_MIN_FLUSH_INTERVAL_IN_MS) {
+          dataRegion.syncCloseAllWorkingTsFileProcessors();
+          DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(dataRegionId, System.currentTimeMillis());
+        }
+      }
 
       final TsFileManager tsFileManager = dataRegion.getTsFileManager();
       tsFileManager.readLock();
@@ -174,7 +196,10 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
           tsFileManager.getTsFileList(true).stream()
               .filter(
                   resource ->
-                      !startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
+                      // Some resources may not be closed, due to the control of
+                      // PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore them.
+                      resource.isClosed()
+                          && !startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
                           && isTsFileResourceOverlappedWithTimeRange(resource)
                           && isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
               .map(
@@ -193,7 +218,10 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
           tsFileManager.getTsFileList(false).stream()
               .filter(
                   resource ->
-                      !startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
+                      // Some resources may not be closed, due to the control of
+                      // PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore them.
+                      resource.isClosed()
+                          && !startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
                           && isTsFileResourceOverlappedWithTimeRange(resource)
                           && isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
              .map(
@@ -34,6 +34,9 @@ public class PipeTsFileResourceManager {
 
   private final Map<String, Integer> hardlinkOrCopiedFileToReferenceMap = new HashMap<>();
 
+  /** Cache the File objects here to avoid redundancy. */
+  private final Map<String, File> fileNameToFileMap = new HashMap<>();
+
   /**
    * given a file, create a hardlink or copy it to pipe dir, maintain a reference count for the
    * hardlink or copied file, and return the hardlink or copied file.
@@ -63,13 +66,14 @@ public class PipeTsFileResourceManager {
     // file in pipe dir. if so, increase reference count and return it
     final File hardlinkOrCopiedFile = getHardlinkOrCopiedFileInPipeDir(file);
     if (increaseReferenceIfExists(hardlinkOrCopiedFile.getPath())) {
-      return hardlinkOrCopiedFile;
+      return fileNameToFileMap.get(hardlinkOrCopiedFile.getPath());
     }
 
     // if the file is not a hardlink or copied file, and there is no related hardlink or copied
     // file in pipe dir, create a hardlink or copy it to pipe dir, maintain a reference count for
     // the hardlink or copied file, and return the hardlink or copied file.
     hardlinkOrCopiedFileToReferenceMap.put(hardlinkOrCopiedFile.getPath(), 1);
+    fileNameToFileMap.put(hardlinkOrCopiedFile.getPath(), hardlinkOrCopiedFile);
 
     // if the file is a tsfile, create a hardlink in pipe dir and return it.
     // otherwise, copy the file (.mod or .resource) to pipe dir and return it.
     return isTsFile
@@ -161,6 +165,7 @@ public class PipeTsFileResourceManager {
     if (updatedReference != null && updatedReference == 0) {
       Files.deleteIfExists(hardlinkOrCopiedFile.toPath());
       hardlinkOrCopiedFileToReferenceMap.remove(hardlinkOrCopiedFile.getPath());
+      fileNameToFileMap.remove(hardlinkOrCopiedFile.getPath());
     }
   }
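
The two PipeTsFileResourceManager hunks above amount to a reference-counted cache of File handles: repeated acquisitions of the same path return one shared File object instead of allocating a new one per TsFileInsertionEvent. A minimal sketch of the same pattern, with illustrative names rather than the project's API:

import java.io.File;
import java.util.HashMap;
import java.util.Map;

public class RefCountedFileCache {
  private final Map<String, Integer> referenceMap = new HashMap<>();
  private final Map<String, File> fileMap = new HashMap<>();

  // Every acquire for the same path returns the one cached File instance,
  // so all events holding the file share a single object.
  public synchronized File acquire(File file) {
    final String path = file.getPath();
    referenceMap.merge(path, 1, Integer::sum);
    return fileMap.computeIfAbsent(path, p -> file);
  }

  // Drops one reference; the cache entry is evicted when the count hits 0
  // (the real code also deletes the hardlink/copy at this point).
  public synchronized void release(File file) {
    final String path = file.getPath();
    final Integer updated = referenceMap.computeIfPresent(path, (p, c) -> c - 1);
    if (updated != null && updated == 0) {
      referenceMap.remove(path);
      fileMap.remove(path);
    }
  }
}
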
@@ -1206,7 +1206,9 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
     SchemaEngine.getInstance().updateAndFillSchemaCountMap(req.schemaQuotaCount, resp);
 
     // Update pipe meta if necessary
-    PipeAgent.task().collectPipeMetaList(req, resp);
+    if (req.isNeedPipeMetaList()) {
+      PipeAgent.task().collectPipeMetaList(resp);
+    }
     return resp;
   }
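
Why this matters: collectPipeMetaList is a synchronized method, so even the old early-return path first had to acquire the PipeTaskAgent monitor, which a long-running pipe operation could hold. Hoisting the flag check into the caller means heartbeats that carry no pipe-meta request never touch that monitor at all. A minimal sketch of the difference (illustrative names):

public class MonitorContentionExample {
  private final Object agent = new Object();

  // Before: every heartbeat blocks on the monitor, even just to return early.
  void collectBefore(boolean needPipeMetaList) {
    synchronized (agent) { // may wait behind a long-running pipe operation
      if (!needPipeMetaList) {
        return;
      }
      // ... collect pipe meta ...
    }
  }

  // After: the cheap flag check happens first; the monitor is taken only
  // when pipe meta is actually requested.
  void collectAfter(boolean needPipeMetaList) {
    if (!needPipeMetaList) {
      return;
    }
    synchronized (agent) {
      // ... collect pipe meta ...
    }
  }
}
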