未验证 提交 1357cede 编写于 作者: S Steve Yurong Su 提交者: GitHub

Pipe: fix some minor concurrent issues & collect pipe's logs into a separate file (#10990)

* Pipe: fix some minor concurrent issues

* Logback: seperate pipe's logs

* enabled -> enable

* Update IoTDBConfig.java
上级 d1197455
......@@ -163,23 +163,6 @@
<level>INFO</level>
</filter>
</appender>
<appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="SYNC">
<file>${IOTDB_HOME}/logs/log_datanode_sync.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${IOTDB_HOME}/logs/log-datanode-sync-%d{yyyyMMdd}.%i.log.gz</fileNamePattern>
<maxFileSize>10MB</maxFileSize>
<maxHistory>168</maxHistory>
<totalSizeCap>512MB</totalSizeCap>
</rollingPolicy>
<append>true</append>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d [%t] %-5p %C{25}:%L - %m %n</pattern>
<charset>utf-8</charset>
</encoder>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
</appender>
<appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="AUDIT">
<file>${IOTDB_HOME}/logs/log_datanode_audit.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
......@@ -265,6 +248,23 @@
<level>INFO</level>
</filter>
</appender>
<appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="PIPE">
<file>${IOTDB_HOME}/logs/log_datanode_pipe.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${IOTDB_HOME}/logs/log-datanode-pipe-%d{yyyyMMdd}.%i.log.gz</fileNamePattern>
<maxFileSize>10MB</maxFileSize>
<maxHistory>168</maxHistory>
<totalSizeCap>512MB</totalSizeCap>
</rollingPolicy>
<append>true</append>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d [%t] %-5p %C{25}:%L - %m %n</pattern>
<charset>utf-8</charset>
</encoder>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
</appender>
<root level="info">
<appender-ref ref="FILETRACE"/>
<appender-ref ref="FILEDEBUG"/>
......@@ -280,9 +280,6 @@
<logger level="info" name="org.apache.iotdb.db.cost.statistic">
<appender-ref ref="FILE_COST_MEASURE"/>
</logger>
<logger level="info" name="org.apache.iotdb.db.sync">
<appender-ref ref="SYNC"/>
</logger>
<logger level="info" name="IoTDB_AUDIT_LOGGER">
<appender-ref ref="AUDIT"/>
</logger>
......@@ -299,4 +296,7 @@
<logger level="info" name="COMPACTION">
<appender-ref ref="COMPACTION"/>
</logger>
<logger level="info" name="org.apache.iotdb.db.pipe">
<appender-ref ref="PIPE"/>
</logger>
</configuration>
......@@ -558,7 +558,8 @@ public class IoTDBConfig {
private long allocateMemoryForTimePartitionInfo = allocateMemoryForStorageEngine * 8 / 10 / 20;
/** Memory allocated proportion for wal pipe cache */
private long allocateMemoryForWALPipeCache = allocateMemoryForConsensus / 10;
private long allocateMemoryForWALPipeCache =
Math.min(allocateMemoryForConsensus / 2, 3 * getWalFileSizeThresholdInByte());
/**
* If true, we will estimate each query's possible memory footprint before executing it and deny
......
......@@ -29,8 +29,8 @@ public class PipeConnectorConstant {
public static final String CONNECTOR_IOTDB_PORT_KEY = "connector.port";
public static final String CONNECTOR_IOTDB_NODE_URLS_KEY = "connector.node-urls";
public static final String CONNECTOR_IOTDB_BATCH_MODE_ENABLED_KEY = "connector.batch.enabled";
public static final boolean CONNECTOR_IOTDB_BATCH_MODE_ENABLED_DEFAULT_VALUE = true;
public static final String CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY = "connector.batch.enable";
public static final boolean CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE = true;
public static final String CONNECTOR_IOTDB_BATCH_DELAY_KEY = "connector.batch.max-delay-seconds";
public static final int CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE = 1;
......
......@@ -35,8 +35,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLED_DEFAULT_VALUE;
import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLED_KEY;
import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE;
import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY;
import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY;
import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
......@@ -87,8 +87,7 @@ public abstract class IoTDBConnector implements PipeConnector {
isTabletBatchModeEnabled =
parameters.getBooleanOrDefault(
CONNECTOR_IOTDB_BATCH_MODE_ENABLED_KEY,
CONNECTOR_IOTDB_BATCH_MODE_ENABLED_DEFAULT_VALUE);
CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY, CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE);
LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}", isTabletBatchModeEnabled);
}
}
......@@ -89,6 +89,8 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
});
}
}
// check again after register close listener in case TsFile is closed during the process
isClosed.set(resource.isClosed());
}
public void waitForTsFileClose() throws InterruptedException {
......
......@@ -29,8 +29,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
public class TsFileEpochManager {
......@@ -39,7 +39,7 @@ public class TsFileEpochManager {
private static final String[] EMPTY_MEASUREMENT_ARRAY = new String[0];
private final Map<String, TsFileEpoch> filePath2Epoch = new HashMap<>();
private final ConcurrentMap<String, TsFileEpoch> filePath2Epoch = new ConcurrentHashMap<>();
public PipeRealtimeEvent bindPipeTsFileInsertionEvent(
PipeTsFileInsertionEvent event, TsFileResource resource) {
......
......@@ -84,6 +84,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
......@@ -161,7 +162,7 @@ public class TsFileProcessor {
"{}: {} get flushQueryLock write lock released";
/** close file listener. */
private final List<CloseFileListener> closeFileListeners = new ArrayList<>();
private final List<CloseFileListener> closeFileListeners = new CopyOnWriteArrayList<>();
/** flush file listener. */
private final List<FlushListener> flushListeners = new ArrayList<>();
......
......@@ -163,23 +163,6 @@
<level>INFO</level>
</filter>
</appender>
<appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="SYNC">
<file>${IOTDB_HOME}/logs/log_sync.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${IOTDB_HOME}/logs/log-sync-%d{yyyyMMdd}.%i.log.gz</fileNamePattern>
<maxFileSize>10MB</maxFileSize>
<maxHistory>168</maxHistory>
<totalSizeCap>512MB</totalSizeCap>
</rollingPolicy>
<append>true</append>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d [%t] %-5p %C{25}:%L - %m %n</pattern>
<charset>utf-8</charset>
</encoder>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
</appender>
<appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="AUDIT">
<file>${IOTDB_HOME}/logs/log_audit.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
......@@ -265,6 +248,23 @@
<level>INFO</level>
</filter>
</appender>
<appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="PIPE">
<file>${IOTDB_HOME}/logs/log_datanode_pipe.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${IOTDB_HOME}/logs/log-datanode-pipe-%d{yyyyMMdd}.%i.log.gz</fileNamePattern>
<maxFileSize>10MB</maxFileSize>
<maxHistory>168</maxHistory>
<totalSizeCap>512MB</totalSizeCap>
</rollingPolicy>
<append>true</append>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d [%t] %-5p %C{25}:%L - %m %n</pattern>
<charset>utf-8</charset>
</encoder>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
</appender>
<root level="info">
<appender-ref ref="FILETRACE"/>
<appender-ref ref="FILEDEBUG"/>
......@@ -280,9 +280,6 @@
<logger level="info" name="org.apache.iotdb.db.cost.statistic">
<appender-ref ref="FILE_COST_MEASURE"/>
</logger>
<logger level="info" name="org.apache.iotdb.db.sync">
<appender-ref ref="SYNC"/>
</logger>
<logger level="info" name="IoTDB_AUDIT_LOGGER">
<appender-ref ref="AUDIT"/>
</logger>
......@@ -299,4 +296,7 @@
<logger level="info" name="COMPACTION">
<appender-ref ref="COMPACTION"/>
</logger>
<logger level="info" name="org.apache.iotdb.db.pipe">
<appender-ref ref="PIPE"/>
</logger>
</configuration>
......@@ -163,23 +163,6 @@
<level>INFO</level>
</filter>
</appender>
<appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="SYNC">
<file>${IOTDB_HOME}/logs/log_sync.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${IOTDB_HOME}/logs/log-sync-%d{yyyyMMdd}.%i.log.gz</fileNamePattern>
<maxFileSize>10MB</maxFileSize>
<maxHistory>168</maxHistory>
<totalSizeCap>512MB</totalSizeCap>
</rollingPolicy>
<append>true</append>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d [%t] %-5p %C{25}:%L - %m %n</pattern>
<charset>utf-8</charset>
</encoder>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
</appender>
<appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="AUDIT">
<file>${IOTDB_HOME}/logs/log_audit.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
......@@ -265,6 +248,23 @@
<level>INFO</level>
</filter>
</appender>
<appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="PIPE">
<file>${IOTDB_HOME}/logs/log_datanode_pipe.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${IOTDB_HOME}/logs/log-datanode-pipe-%d{yyyyMMdd}.%i.log.gz</fileNamePattern>
<maxFileSize>10MB</maxFileSize>
<maxHistory>168</maxHistory>
<totalSizeCap>512MB</totalSizeCap>
</rollingPolicy>
<append>true</append>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d [%t] %-5p %C{25}:%L - %m %n</pattern>
<charset>utf-8</charset>
</encoder>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
</appender>
<root level="info">
<appender-ref ref="FILETRACE"/>
<appender-ref ref="FILEDEBUG"/>
......@@ -280,9 +280,6 @@
<logger level="info" name="org.apache.iotdb.db.cost.statistic">
<appender-ref ref="FILE_COST_MEASURE"/>
</logger>
<logger level="info" name="org.apache.iotdb.db.sync">
<appender-ref ref="SYNC"/>
</logger>
<logger level="info" name="IoTDB_AUDIT_LOGGER">
<appender-ref ref="AUDIT"/>
</logger>
......@@ -299,4 +296,7 @@
<logger level="info" name="COMPACTION">
<appender-ref ref="COMPACTION"/>
</logger>
<logger level="info" name="org.apache.iotdb.db.pipe">
<appender-ref ref="PIPE"/>
</logger>
</configuration>
......@@ -163,23 +163,6 @@
<level>INFO</level>
</filter>
</appender>
<appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="SYNC">
<file>${IOTDB_HOME}/logs/log_sync.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${IOTDB_HOME}/logs/log-sync-%d{yyyyMMdd}.%i.log.gz</fileNamePattern>
<maxFileSize>10MB</maxFileSize>
<maxHistory>168</maxHistory>
<totalSizeCap>512MB</totalSizeCap>
</rollingPolicy>
<append>true</append>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d [%t] %-5p %C{25}:%L - %m %n</pattern>
<charset>utf-8</charset>
</encoder>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
</appender>
<appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="AUDIT">
<file>${IOTDB_HOME}/logs/log_audit.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
......@@ -265,6 +248,23 @@
<level>INFO</level>
</filter>
</appender>
<appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="PIPE">
<file>${IOTDB_HOME}/logs/log_datanode_pipe.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${IOTDB_HOME}/logs/log-datanode-pipe-%d{yyyyMMdd}.%i.log.gz</fileNamePattern>
<maxFileSize>10MB</maxFileSize>
<maxHistory>168</maxHistory>
<totalSizeCap>512MB</totalSizeCap>
</rollingPolicy>
<append>true</append>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d [%t] %-5p %C{25}:%L - %m %n</pattern>
<charset>utf-8</charset>
</encoder>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
</appender>
<root level="info">
<appender-ref ref="FILETRACE"/>
<appender-ref ref="FILEDEBUG"/>
......@@ -280,9 +280,6 @@
<logger level="info" name="org.apache.iotdb.db.cost.statistic">
<appender-ref ref="FILE_COST_MEASURE"/>
</logger>
<logger level="info" name="org.apache.iotdb.db.sync">
<appender-ref ref="SYNC"/>
</logger>
<logger level="info" name="IoTDB_AUDIT_LOGGER">
<appender-ref ref="AUDIT"/>
</logger>
......@@ -299,4 +296,7 @@
<logger level="info" name="COMPACTION">
<appender-ref ref="COMPACTION"/>
</logger>
<logger level="info" name="org.apache.iotdb.db.pipe">
<appender-ref ref="PIPE"/>
</logger>
</configuration>
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册