diff --git a/iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml b/iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml
index 4f7373e1c2a2e84bf7de5fb53eba71367bc259e5..9194c5962308e3ebd0cd125e62087d478402a8b7 100644
--- a/iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml
+++ b/iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml
@@ -163,23 +163,6 @@
INFO
-
- ${IOTDB_HOME}/logs/log_datanode_sync.log
-
- ${IOTDB_HOME}/logs/log-datanode-sync-%d{yyyyMMdd}.%i.log.gz
- 10MB
- 168
- 512MB
-
- true
-
- %d [%t] %-5p %C{25}:%L - %m %n
- utf-8
-
-
- INFO
-
-
${IOTDB_HOME}/logs/log_datanode_audit.log
@@ -265,6 +248,23 @@
INFO
+
+ ${IOTDB_HOME}/logs/log_datanode_pipe.log
+
+ ${IOTDB_HOME}/logs/log-datanode-pipe-%d{yyyyMMdd}.%i.log.gz
+ 10MB
+ 168
+ 512MB
+
+ true
+
+ %d [%t] %-5p %C{25}:%L - %m %n
+ utf-8
+
+
+ INFO
+
+
@@ -280,9 +280,6 @@
-
-
-
@@ -299,4 +296,7 @@
+
+
+
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 37a0301b941a55c2757279f7b7a0049095d5842f..b0099f687b082e68595b6fc1d9b0b86d14852d05 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -558,7 +558,8 @@ public class IoTDBConfig {
private long allocateMemoryForTimePartitionInfo = allocateMemoryForStorageEngine * 8 / 10 / 20;
/** Memory allocated proportion for wal pipe cache */
- private long allocateMemoryForWALPipeCache = allocateMemoryForConsensus / 10;
+ private long allocateMemoryForWALPipeCache =
+ Math.min(allocateMemoryForConsensus / 2, 3 * getWalFileSizeThresholdInByte());
/**
* If true, we will estimate each query's possible memory footprint before executing it and deny
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
index 2a5b8e0a47af3cab94a138bd6a8ba45ab2403a4e..a55bbbe47164f8516a004ae1d5b23bc7322a0875 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
@@ -29,8 +29,8 @@ public class PipeConnectorConstant {
public static final String CONNECTOR_IOTDB_PORT_KEY = "connector.port";
public static final String CONNECTOR_IOTDB_NODE_URLS_KEY = "connector.node-urls";
- public static final String CONNECTOR_IOTDB_BATCH_MODE_ENABLED_KEY = "connector.batch.enabled";
- public static final boolean CONNECTOR_IOTDB_BATCH_MODE_ENABLED_DEFAULT_VALUE = true;
+ public static final String CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY = "connector.batch.enable";
+ public static final boolean CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE = true;
public static final String CONNECTOR_IOTDB_BATCH_DELAY_KEY = "connector.batch.max-delay-seconds";
public static final int CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE = 1;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
index 8de5706a95882b415fe5f54628cd71e8c33d22d4..1ca5db5ba15190221ea60c4193787e9c4894dc5f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
@@ -35,8 +35,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLED_DEFAULT_VALUE;
-import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLED_KEY;
+import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE;
+import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY;
import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY;
import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
@@ -87,8 +87,7 @@ public abstract class IoTDBConnector implements PipeConnector {
isTabletBatchModeEnabled =
parameters.getBooleanOrDefault(
- CONNECTOR_IOTDB_BATCH_MODE_ENABLED_KEY,
- CONNECTOR_IOTDB_BATCH_MODE_ENABLED_DEFAULT_VALUE);
+ CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY, CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE);
LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}", isTabletBatchModeEnabled);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 051e6d0437c7dc6ff41a5817adc0fc99c3957ae8..d1d1adf1d4eb05799ef739222389e6efbf0f2af7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -89,6 +89,8 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
});
}
}
+ // check again after register close listener in case TsFile is closed during the process
+ isClosed.set(resource.isClosed());
}
public void waitForTsFileClose() throws InterruptedException {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java
index cb94b725d1059e6a87a02ca623137d12b214f995..cfa57760985b616264b085f39ae23b12e3a52d7e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java
@@ -29,8 +29,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
public class TsFileEpochManager {
@@ -39,7 +39,7 @@ public class TsFileEpochManager {
private static final String[] EMPTY_MEASUREMENT_ARRAY = new String[0];
- private final Map filePath2Epoch = new HashMap<>();
+ private final ConcurrentMap filePath2Epoch = new ConcurrentHashMap<>();
public PipeRealtimeEvent bindPipeTsFileInsertionEvent(
PipeTsFileInsertionEvent event, TsFileResource resource) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 45e755debb38d54f1f6619a5ca54a64b04f87b38..3cd09e73e9234a749550164344021fb7a135529e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -84,6 +84,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -161,7 +162,7 @@ public class TsFileProcessor {
"{}: {} get flushQueryLock write lock released";
/** close file listener. */
- private final List closeFileListeners = new ArrayList<>();
+ private final List closeFileListeners = new CopyOnWriteArrayList<>();
/** flush file listener. */
private final List flushListeners = new ArrayList<>();
diff --git a/iotdb-core/datanode/src/test/resources/datanode1conf/logback.xml b/iotdb-core/datanode/src/test/resources/datanode1conf/logback.xml
index fc779b7716b108ffeb6f4f66d18864d2b180de2f..0317acebcd519467d1287b857a48ebd5b6af68aa 100644
--- a/iotdb-core/datanode/src/test/resources/datanode1conf/logback.xml
+++ b/iotdb-core/datanode/src/test/resources/datanode1conf/logback.xml
@@ -163,23 +163,6 @@
INFO
-
- ${IOTDB_HOME}/logs/log_sync.log
-
- ${IOTDB_HOME}/logs/log-sync-%d{yyyyMMdd}.%i.log.gz
- 10MB
- 168
- 512MB
-
- true
-
- %d [%t] %-5p %C{25}:%L - %m %n
- utf-8
-
-
- INFO
-
-
${IOTDB_HOME}/logs/log_audit.log
@@ -265,6 +248,23 @@
INFO
+
+ ${IOTDB_HOME}/logs/log_datanode_pipe.log
+
+ ${IOTDB_HOME}/logs/log-datanode-pipe-%d{yyyyMMdd}.%i.log.gz
+ 10MB
+ 168
+ 512MB
+
+ true
+
+ %d [%t] %-5p %C{25}:%L - %m %n
+ utf-8
+
+
+ INFO
+
+
@@ -280,9 +280,6 @@
-
-
-
@@ -299,4 +296,7 @@
+
+
+
diff --git a/iotdb-core/datanode/src/test/resources/datanode2conf/logback.xml b/iotdb-core/datanode/src/test/resources/datanode2conf/logback.xml
index fc779b7716b108ffeb6f4f66d18864d2b180de2f..0317acebcd519467d1287b857a48ebd5b6af68aa 100644
--- a/iotdb-core/datanode/src/test/resources/datanode2conf/logback.xml
+++ b/iotdb-core/datanode/src/test/resources/datanode2conf/logback.xml
@@ -163,23 +163,6 @@
INFO
-
- ${IOTDB_HOME}/logs/log_sync.log
-
- ${IOTDB_HOME}/logs/log-sync-%d{yyyyMMdd}.%i.log.gz
- 10MB
- 168
- 512MB
-
- true
-
- %d [%t] %-5p %C{25}:%L - %m %n
- utf-8
-
-
- INFO
-
-
${IOTDB_HOME}/logs/log_audit.log
@@ -265,6 +248,23 @@
INFO
+
+ ${IOTDB_HOME}/logs/log_datanode_pipe.log
+
+ ${IOTDB_HOME}/logs/log-datanode-pipe-%d{yyyyMMdd}.%i.log.gz
+ 10MB
+ 168
+ 512MB
+
+ true
+
+ %d [%t] %-5p %C{25}:%L - %m %n
+ utf-8
+
+
+ INFO
+
+
@@ -280,9 +280,6 @@
-
-
-
@@ -299,4 +296,7 @@
+
+
+
diff --git a/iotdb-core/datanode/src/test/resources/datanode3conf/logback.xml b/iotdb-core/datanode/src/test/resources/datanode3conf/logback.xml
index fc779b7716b108ffeb6f4f66d18864d2b180de2f..0317acebcd519467d1287b857a48ebd5b6af68aa 100644
--- a/iotdb-core/datanode/src/test/resources/datanode3conf/logback.xml
+++ b/iotdb-core/datanode/src/test/resources/datanode3conf/logback.xml
@@ -163,23 +163,6 @@
INFO
-
- ${IOTDB_HOME}/logs/log_sync.log
-
- ${IOTDB_HOME}/logs/log-sync-%d{yyyyMMdd}.%i.log.gz
- 10MB
- 168
- 512MB
-
- true
-
- %d [%t] %-5p %C{25}:%L - %m %n
- utf-8
-
-
- INFO
-
-
${IOTDB_HOME}/logs/log_audit.log
@@ -265,6 +248,23 @@
INFO
+
+ ${IOTDB_HOME}/logs/log_datanode_pipe.log
+
+ ${IOTDB_HOME}/logs/log-datanode-pipe-%d{yyyyMMdd}.%i.log.gz
+ 10MB
+ 168
+ 512MB
+
+ true
+
+ %d [%t] %-5p %C{25}:%L - %m %n
+ utf-8
+
+
+ INFO
+
+
@@ -280,9 +280,6 @@
-
-
-
@@ -299,4 +296,7 @@
+
+
+