From 1357cede80d67a8ecc0d8db75f629efd00d1f69d Mon Sep 17 00:00:00 2001 From: Steve Yurong Su Date: Tue, 29 Aug 2023 19:56:57 +0800 Subject: [PATCH] Pipe: fix some minor concurrent issues & collect pipe's logs into a separate file (#10990) * Pipe: fix some minor concurrent issues * Logback: seperate pipe's logs * enabled -> enable * Update IoTDBConfig.java --- .../resources/conf/logback-datanode.xml | 40 +++++++++---------- .../org/apache/iotdb/db/conf/IoTDBConfig.java | 3 +- .../constant/PipeConnectorConstant.java | 4 +- .../connector/protocol/IoTDBConnector.java | 7 ++-- .../tsfile/PipeTsFileInsertionEvent.java | 2 + .../realtime/epoch/TsFileEpochManager.java | 6 +-- .../dataregion/memtable/TsFileProcessor.java | 3 +- .../test/resources/datanode1conf/logback.xml | 40 +++++++++---------- .../test/resources/datanode2conf/logback.xml | 40 +++++++++---------- .../test/resources/datanode3conf/logback.xml | 40 +++++++++---------- 10 files changed, 94 insertions(+), 91 deletions(-) diff --git a/iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml b/iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml index 4f7373e1c2..9194c59623 100644 --- a/iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml +++ b/iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml @@ -163,23 +163,6 @@ INFO - - ${IOTDB_HOME}/logs/log_datanode_sync.log - - ${IOTDB_HOME}/logs/log-datanode-sync-%d{yyyyMMdd}.%i.log.gz - 10MB - 168 - 512MB - - true - - %d [%t] %-5p %C{25}:%L - %m %n - utf-8 - - - INFO - - ${IOTDB_HOME}/logs/log_datanode_audit.log @@ -265,6 +248,23 @@ INFO + + ${IOTDB_HOME}/logs/log_datanode_pipe.log + + ${IOTDB_HOME}/logs/log-datanode-pipe-%d{yyyyMMdd}.%i.log.gz + 10MB + 168 + 512MB + + true + + %d [%t] %-5p %C{25}:%L - %m %n + utf-8 + + + INFO + + @@ -280,9 +280,6 @@ - - - @@ -299,4 +296,7 @@ + + + diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 37a0301b94..b0099f687b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -558,7 +558,8 @@ public class IoTDBConfig { private long allocateMemoryForTimePartitionInfo = allocateMemoryForStorageEngine * 8 / 10 / 20; /** Memory allocated proportion for wal pipe cache */ - private long allocateMemoryForWALPipeCache = allocateMemoryForConsensus / 10; + private long allocateMemoryForWALPipeCache = + Math.min(allocateMemoryForConsensus / 2, 3 * getWalFileSizeThresholdInByte()); /** * If true, we will estimate each query's possible memory footprint before executing it and deny diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java index 2a5b8e0a47..a55bbbe471 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java @@ -29,8 +29,8 @@ public class PipeConnectorConstant { public static final String CONNECTOR_IOTDB_PORT_KEY = "connector.port"; public static final String CONNECTOR_IOTDB_NODE_URLS_KEY = "connector.node-urls"; - public static final String CONNECTOR_IOTDB_BATCH_MODE_ENABLED_KEY = "connector.batch.enabled"; - public static final boolean CONNECTOR_IOTDB_BATCH_MODE_ENABLED_DEFAULT_VALUE = true; + public static final String CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY = "connector.batch.enable"; + public static final boolean CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE = true; public static final String CONNECTOR_IOTDB_BATCH_DELAY_KEY = "connector.batch.max-delay-seconds"; public static final int CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE = 1; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java index 8de5706a95..1ca5db5ba1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java @@ -35,8 +35,8 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLED_DEFAULT_VALUE; -import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLED_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY; @@ -87,8 +87,7 @@ public abstract class IoTDBConnector implements PipeConnector { isTabletBatchModeEnabled = parameters.getBooleanOrDefault( - CONNECTOR_IOTDB_BATCH_MODE_ENABLED_KEY, - CONNECTOR_IOTDB_BATCH_MODE_ENABLED_DEFAULT_VALUE); + CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY, CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE); LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}", isTabletBatchModeEnabled); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 051e6d0437..d1d1adf1d4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -89,6 +89,8 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns }); } } + // check again after register close listener in case TsFile is closed during the process + isClosed.set(resource.isClosed()); } public void waitForTsFileClose() throws InterruptedException { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java index cb94b725d1..cfa5776098 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java @@ -29,8 +29,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collections; -import java.util.HashMap; -import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; public class TsFileEpochManager { @@ -39,7 +39,7 @@ public class TsFileEpochManager { private static final String[] EMPTY_MEASUREMENT_ARRAY = new String[0]; - private final Map filePath2Epoch = new HashMap<>(); + private final ConcurrentMap filePath2Epoch = new ConcurrentHashMap<>(); public PipeRealtimeEvent bindPipeTsFileInsertionEvent( PipeTsFileInsertionEvent event, TsFileResource resource) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index 45e755debb..3cd09e73e9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -84,6 +84,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -161,7 +162,7 @@ public class TsFileProcessor { "{}: {} get flushQueryLock write lock released"; /** close file listener. */ - private final List closeFileListeners = new ArrayList<>(); + private final List closeFileListeners = new CopyOnWriteArrayList<>(); /** flush file listener. */ private final List flushListeners = new ArrayList<>(); diff --git a/iotdb-core/datanode/src/test/resources/datanode1conf/logback.xml b/iotdb-core/datanode/src/test/resources/datanode1conf/logback.xml index fc779b7716..0317acebcd 100644 --- a/iotdb-core/datanode/src/test/resources/datanode1conf/logback.xml +++ b/iotdb-core/datanode/src/test/resources/datanode1conf/logback.xml @@ -163,23 +163,6 @@ INFO - - ${IOTDB_HOME}/logs/log_sync.log - - ${IOTDB_HOME}/logs/log-sync-%d{yyyyMMdd}.%i.log.gz - 10MB - 168 - 512MB - - true - - %d [%t] %-5p %C{25}:%L - %m %n - utf-8 - - - INFO - - ${IOTDB_HOME}/logs/log_audit.log @@ -265,6 +248,23 @@ INFO + + ${IOTDB_HOME}/logs/log_datanode_pipe.log + + ${IOTDB_HOME}/logs/log-datanode-pipe-%d{yyyyMMdd}.%i.log.gz + 10MB + 168 + 512MB + + true + + %d [%t] %-5p %C{25}:%L - %m %n + utf-8 + + + INFO + + @@ -280,9 +280,6 @@ - - - @@ -299,4 +296,7 @@ + + + diff --git a/iotdb-core/datanode/src/test/resources/datanode2conf/logback.xml b/iotdb-core/datanode/src/test/resources/datanode2conf/logback.xml index fc779b7716..0317acebcd 100644 --- a/iotdb-core/datanode/src/test/resources/datanode2conf/logback.xml +++ b/iotdb-core/datanode/src/test/resources/datanode2conf/logback.xml @@ -163,23 +163,6 @@ INFO - - ${IOTDB_HOME}/logs/log_sync.log - - ${IOTDB_HOME}/logs/log-sync-%d{yyyyMMdd}.%i.log.gz - 10MB - 168 - 512MB - - true - - %d [%t] %-5p %C{25}:%L - %m %n - utf-8 - - - INFO - - ${IOTDB_HOME}/logs/log_audit.log @@ -265,6 +248,23 @@ INFO + + ${IOTDB_HOME}/logs/log_datanode_pipe.log + + ${IOTDB_HOME}/logs/log-datanode-pipe-%d{yyyyMMdd}.%i.log.gz + 10MB + 168 + 512MB + + true + + %d [%t] %-5p %C{25}:%L - %m %n + utf-8 + + + INFO + + @@ -280,9 +280,6 @@ - - - @@ -299,4 +296,7 @@ + + + diff --git a/iotdb-core/datanode/src/test/resources/datanode3conf/logback.xml b/iotdb-core/datanode/src/test/resources/datanode3conf/logback.xml index fc779b7716..0317acebcd 100644 --- a/iotdb-core/datanode/src/test/resources/datanode3conf/logback.xml +++ b/iotdb-core/datanode/src/test/resources/datanode3conf/logback.xml @@ -163,23 +163,6 @@ INFO - - ${IOTDB_HOME}/logs/log_sync.log - - ${IOTDB_HOME}/logs/log-sync-%d{yyyyMMdd}.%i.log.gz - 10MB - 168 - 512MB - - true - - %d [%t] %-5p %C{25}:%L - %m %n - utf-8 - - - INFO - - ${IOTDB_HOME}/logs/log_audit.log @@ -265,6 +248,23 @@ INFO + + ${IOTDB_HOME}/logs/log_datanode_pipe.log + + ${IOTDB_HOME}/logs/log-datanode-pipe-%d{yyyyMMdd}.%i.log.gz + 10MB + 168 + 512MB + + true + + %d [%t] %-5p %C{25}:%L - %m %n + utf-8 + + + INFO + + @@ -280,9 +280,6 @@ - - - @@ -299,4 +296,7 @@ + + + -- GitLab