Commit 4fb28184 authored by qiaojialin

fix conflict after merging master

......@@ -25,6 +25,7 @@ import java.sql.SQLException;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.jdbc.Constant;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.web.grafana.bean.TimeValues;
import org.apache.iotdb.web.grafana.dao.BasicDao;
......@@ -56,7 +57,7 @@ public class BasicDaoImpl implements BasicDao {
ConnectionCallback<Object> connectionCallback = new ConnectionCallback<Object>() {
public Object doInConnection(Connection connection) throws SQLException {
DatabaseMetaData databaseMetaData = connection.getMetaData();
ResultSet resultSet = databaseMetaData.getColumns(null, null, "root.*", null);
ResultSet resultSet = databaseMetaData.getColumns(Constant.CATALOG_TIMESERIES, "root.*", "root.*", null);
logger.info("Start to get timeseries");
List<String> columnsName = new ArrayList<>();
while (resultSet.next()) {
......
......@@ -47,7 +47,11 @@ goto :eof
REM -----------------------------------------------------------------------------
:okClasspath
"%JAVA_HOME%\bin\java" %JAVA_OPTS% -cp "%CLASSPATH%" %MAIN_CLASS% %*
set PARAMETERS=%*
if "%PARAMETERS%" == "" set PARAMETERS=-h 127.0.0.1 -p 6667 -u root -pw root
"%JAVA_HOME%\bin\java" %JAVA_OPTS% -cp "%CLASSPATH%" %MAIN_CLASS% %PARAMETERS%
goto finally
......
......@@ -48,6 +48,13 @@ else
JAVA=java
fi
exec "$JAVA" -cp "$CLASSPATH" "$MAIN_CLASS" "$@"
PARAMETERS="$@"
if [ $# -eq 0 ]
then
PARAMETERS="-h 127.0.0.1 -p 6667 -u root -pw root"
fi
exec "$JAVA" -cp "$CLASSPATH" "$MAIN_CLASS" $PARAMETERS
exit $?
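With this change, launching the CLI with no arguments falls back to the defaults above. Illustration only (the launcher script's actual name is not shown in this diff):

$ start-client.sh
# now behaves the same as
$ start-client.sh -h 127.0.0.1 -p 6667 -u root -pw root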
......@@ -25,6 +25,18 @@ rpc_address=0.0.0.0
rpc_port=6667
####################
### Dynamic Parameter Adapter Configuration
####################
# Whether the dynamic parameter adapter is enabled. It is recommended to enable the parameter adapter.
# The adapter can dynamically adjust the following two parameters according to the memory load of the system:
# 1. tsfile_size_threshold, which is introduced below.
# 2. memtable_size_threshold, which is introduced below.
# By dynamically adjusting these two parameters, the probability of system memory explosion is greatly reduced.
# When this parameter is set to true, the system will refuse to create timeseries or add storage groups under high system load.
enable_parameter_adapter=true
####################
### Write Ahead Log Configuration
####################
......@@ -100,6 +112,10 @@ force_wal_period_in_ms=10
### Memory Control Configuration
####################
# Memory Allocation Ratio: Write, Read, and Free Memory.
# The parameter form is a:b:c, where a, b and c are integers, for example: 1:1:1, 6:3:1
write_read_free_memory_proportion=6:3:1
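# As a rough illustration (not part of the original file): with the 6:3:1 proportion above and,
# say, an 8 GB JVM max heap, about 8 GB * 6/10 = 4.8 GB would be allocated for writing,
# 8 GB * 3/10 = 2.4 GB for reading, and the remaining ~0.8 GB left free.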
# The maximum concurrent thread number for merging
# Increasing this value will increase IO and CPU consumption
# Decreasing this value will increase disk usage when there is much unsequence data, which will reduce read speed
......@@ -111,11 +127,9 @@ fetch_size=10000
# Size of the log buffer in each log node (in bytes).
# If WAL is enabled and the size of an insert plan is smaller than this parameter, then the insert plan will be rejected by WAL
# If this is set to a value smaller than 0, the default value 16777216 is used
wal_buffer_size=16777216
# Total number of memtables in the memtable pool; it should be set to at least twice the number of storage groups.
memtable_number=20
# time zone of server side
# default value is +08:00
# eg. +08:00, -01:00
......@@ -124,6 +138,9 @@ time_zone=+08:00
# When a TsFile's file size (in byte) exceeds this, the TsFile is forced closed. The default threshold is 512 MB.
tsfile_size_threshold=536870912
# When a memTable's size (in byte) exceeds this, the memtable is flushed to disk. The default threshold is 128 MB.
memtable_size_threshold=134217728
# How many threads can concurrently flush. When <= 0, use CPU core number.
concurrent_flush_thread=0
......
......@@ -34,6 +34,8 @@ import org.apache.iotdb.db.auth.entity.PathPrivilege;
import org.apache.iotdb.db.auth.entity.Role;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.utils.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class stores each role in a separate sequential file. Role file schema: Int32 role name
......@@ -45,7 +47,7 @@ import org.apache.iotdb.db.utils.IOUtils;
* privilege[n][2] ... Int32 privilege[n][kn]
*/
public class LocalFileRoleAccessor implements IRoleAccessor {
private static final Logger logger = LoggerFactory.getLogger(LocalFileRoleAccessor.class);
private static final String TEMP_SUFFIX = ".temp";
private static final String STRING_ENCODING = "utf-8";
......@@ -161,6 +163,10 @@ public class LocalFileRoleAccessor implements IRoleAccessor {
@Override
public void reset() {
new File(roleDirPath).mkdirs();
if (new File(roleDirPath).mkdirs()) {
logger.info("role info dir {} is created", roleDirPath);
} else if (!new File(roleDirPath).exists()) {
logger.error("role info dir {} can not be created", roleDirPath);
}
}
}
......@@ -26,6 +26,7 @@ import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
......@@ -203,6 +204,10 @@ public class LocalFileUserAccessor implements IUserAccessor {
@Override
public void reset() {
new File(userDirPath).mkdirs();
if (new File(userDirPath).mkdirs()) {
logger.info("user info dir {} is created", userDirPath);
} else if (!new File(userDirPath).exists()) {
logger.error("user info dir {} can not be created", userDirPath);
}
}
}
......@@ -37,11 +37,27 @@ public class IoTDBConfig {
private static final String DEFAULT_MULTI_DIR_STRATEGY = "MaxDiskUsableSpaceFirstStrategy";
private String rpcAddress = "0.0.0.0";
/**
* Port which the JDBC server listens to.
*/
private int rpcPort = 6667;
/**
* Memory allocated for the read process
*/
private long allocateMemoryForWrite = Runtime.getRuntime().maxMemory() * 6 / 10;
/**
* Memory allocated for the write process
*/
private long allocateMemoryForRead = Runtime.getRuntime().maxMemory() * 3 / 10;
/**
* Is dynamic parameter adapter enable.
*/
private boolean enableParameterAdapter = true;
/**
* Is the write ahead log enable.
*/
......@@ -97,8 +113,10 @@ public class IoTDBConfig {
*/
private String indexFileDir = "data/index";
private int memtableNumber = 20;
/**
* Maximum MemTable number in MemTable pool.
*/
private int maxMemtableNumber = 20;
/**
* The maximum concurrent thread number for merging. When the value <=0 or > CPU core number, use
......@@ -123,6 +141,11 @@ public class IoTDBConfig {
*/
private long tsFileSizeThreshold = 512 * 1024 * 1024L;
/**
* When a memTable's size (in byte) exceeds this, the memtable is flushed to disk.
*/
private long memtableSizeThreshold = 128 * 1024 * 1024L;
/**
* The statMonitor writes statistics info into IoTDB every backLoopPeriodSec secs. The default
* value is 5s.
......@@ -356,12 +379,12 @@ public class IoTDBConfig {
this.fetchSize = fetchSize;
}
public int getMemtableNumber() {
return memtableNumber;
public int getMaxMemtableNumber() {
return maxMemtableNumber;
}
void setMemtableNumber(int memtableNumber) {
this.memtableNumber = memtableNumber;
public void setMaxMemtableNumber(int maxMemtableNumber) {
this.maxMemtableNumber = maxMemtableNumber;
}
public int getConcurrentFlushThread() {
......@@ -380,7 +403,7 @@ public class IoTDBConfig {
return tsFileSizeThreshold;
}
void setTsFileSizeThreshold(long tsFileSizeThreshold) {
public void setTsFileSizeThreshold(long tsFileSizeThreshold) {
this.tsFileSizeThreshold = tsFileSizeThreshold;
}
......@@ -512,6 +535,30 @@ public class IoTDBConfig {
this.chunkBufferPoolEnable = chunkBufferPoolEnable;
}
public boolean isEnableParameterAdapter() {
return enableParameterAdapter;
}
public void setEnableParameterAdapter(boolean enableParameterAdapter) {
this.enableParameterAdapter = enableParameterAdapter;
}
public long getAllocateMemoryForWrite() {
return allocateMemoryForWrite;
}
public void setAllocateMemoryForWrite(long allocateMemoryForWrite) {
this.allocateMemoryForWrite = allocateMemoryForWrite;
}
public long getAllocateMemoryForRead() {
return allocateMemoryForRead;
}
public void setAllocateMemoryForRead(long allocateMemoryForRead) {
this.allocateMemoryForRead = allocateMemoryForRead;
}
public boolean isEnablePerformanceStat() {
return enablePerformanceStat;
}
......@@ -535,4 +582,12 @@ public class IoTDBConfig {
public void setPerformance_stat_memory_in_kb(int performance_stat_memory_in_kb) {
this.performance_stat_memory_in_kb = performance_stat_memory_in_kb;
}
public long getMemtableSizeThreshold() {
return memtableSizeThreshold;
}
public void setMemtableSizeThreshold(long memtableSizeThreshold) {
this.memtableSizeThreshold = memtableSizeThreshold;
}
}
......@@ -60,4 +60,5 @@ public class IoTDBConstant {
public static final String ROLE = "Role";
public static final String USER = "User";
public static final String PRIVILEGE = "Privilege";
}
......@@ -123,6 +123,12 @@ public class IoTDBDescriptor {
conf.setRpcPort(Integer.parseInt(properties.getProperty("rpc_port",
Integer.toString(conf.getRpcPort()))));
conf.setEnableParameterAdapter(
Boolean.parseBoolean(properties.getProperty("enable_parameter_adapter",
Boolean.toString(conf.isEnableParameterAdapter()))));
initMemoryAllocate(properties);
conf.setEnableWal(Boolean.parseBoolean(properties.getProperty("enable_wal",
Boolean.toString(conf.isEnableWal()))));
......@@ -135,10 +141,6 @@ public class IoTDBDescriptor {
conf.setWalFolder(properties.getProperty("wal_dir", conf.getWalFolder()));
conf.setMemtableNumber(Integer
.parseInt(properties.getProperty("memtable_number",
Integer.toString(conf.getMemtableNumber()))));
conf.setFlushWalThreshold(Integer
.parseInt(properties.getProperty("flush_wal_threshold",
Integer.toString(conf.getFlushWalThreshold()))));
......@@ -146,8 +148,12 @@ public class IoTDBDescriptor {
conf.setForceWalPeriodInMs(Long
.parseLong(properties.getProperty("force_wal_period_in_ms",
Long.toString(conf.getForceWalPeriodInMs()))));
conf.setWalBufferSize(Integer.parseInt(properties.getProperty("wal_buffer_size",
Integer.toString(conf.getWalBufferSize()))));
int walBufferSize = Integer.parseInt(properties.getProperty("wal_buffer_size",
Integer.toString(conf.getWalBufferSize())));
if (walBufferSize > 0) {
conf.setWalBufferSize(walBufferSize);
}
conf.setMultiDirStrategyClassName(properties.getProperty("mult_dir_strategy",
conf.getMultiDirStrategyClassName()));
......@@ -163,16 +169,28 @@ public class IoTDBDescriptor {
conf.setFetchSize(Integer.parseInt(properties.getProperty("fetch_size",
Integer.toString(conf.getFetchSize()))));
conf.setTsFileSizeThreshold(Long.parseLong(properties
long tsfileSizeThreshold = Long.parseLong(properties
.getProperty("tsfile_size_threshold",
Long.toString(conf.getTsFileSizeThreshold())).trim()));
Long.toString(conf.getTsFileSizeThreshold())).trim());
if (tsfileSizeThreshold > 0) {
conf.setTsFileSizeThreshold(tsfileSizeThreshold);
}
long memTableSizeThreshold = Long.parseLong(properties
.getProperty("memtable_size_threshold",
Long.toString(conf.getMemtableSizeThreshold())).trim());
if (memTableSizeThreshold > 0) {
conf.setMemtableSizeThreshold(memTableSizeThreshold);
}
conf.setSyncEnable(Boolean
.parseBoolean(properties.getProperty("is_sync_enable",
Boolean.toString(conf.isSyncEnable()))));
conf.setSyncServerPort(Integer
.parseInt(properties.getProperty("sync_server_port",
Integer.toString(conf.getSyncServerPort())).trim()));
conf.setUpdateHistoricalDataPossibility(Boolean.parseBoolean(
properties.getProperty("update_historical_data_possibility",
Boolean.toString(conf.isSyncEnable()))));
......@@ -181,6 +199,7 @@ public class IoTDBDescriptor {
conf.setConcurrentFlushThread(Integer
.parseInt(properties.getProperty("concurrent_flush_thread",
Integer.toString(conf.getConcurrentFlushThread()))));
if (conf.getConcurrentFlushThread() <= 0) {
conf.setConcurrentFlushThread(Runtime.getRuntime().availableProcessors());
}
......@@ -225,6 +244,22 @@ public class IoTDBDescriptor {
}
}
private void initMemoryAllocate(Properties properties) {
String memoryAllocateProportion = properties.getProperty("write_read_free_memory_proportion");
if (memoryAllocateProportion != null) {
String[] proportions = memoryAllocateProportion.split(":");
int proportionSum = 0;
for (String proportion : proportions) {
proportionSum += Integer.parseInt(proportion.trim());
}
long maxMemoryAvailable = Runtime.getRuntime().maxMemory();
conf.setAllocateMemoryForWrite(
maxMemoryAvailable * Integer.parseInt(proportions[0].trim()) / proportionSum);
conf.setAllocateMemoryForRead(
maxMemoryAvailable * Integer.parseInt(proportions[1].trim()) / proportionSum);
}
}
private static class IoTDBDescriptorHolder {
private static final IoTDBDescriptor INSTANCE = new IoTDBDescriptor();
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.conf.adapter;
import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.utils.FilePathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class is used to count, compute and persist the compression ratio of tsfiles. Whenever the
* task of closing a file ends, the compression ratio of the file is calculated based on the total
* MemTable size and the total size of the tsfile on disk. {@code compressionRatioSum} records the
* sum of these compression ratios, and {@Code calcTimes} records the number of closed file tasks.
* When the compression rate of the current system is obtained, the average compression ratio is
* returned as the result, that is {@code compressionRatioSum}/{@Code calcTimes}. At the same time,
* each time the compression ratio statistics are updated, these two parameters are persisted on
* disk for system recovery.
*/
public class CompressionRatio {
private static final Logger LOGGER = LoggerFactory.getLogger(CompressionRatio.class);
private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
static final String COMPRESSION_RATIO_DIR = "compression_ratio";
private static final String FILE_PREFIX = "Ratio-";
private static final String SEPARATOR = "-";
static final String RATIO_FILE_PATH_FORMAT = FILE_PREFIX + "%f" + SEPARATOR + "%d";
private static final double DEFAULT_COMPRESSION_RATIO = 2.0;
/**
* The total sum of all compression ratios.
*/
private double compressionRatioSum;
/**
* The number of compression ratios.
*/
private long calcTimes;
private File directory;
private CompressionRatio() {
directory = new File(
FilePathUtils.regularizePath(CONFIG.getSystemDir()) + COMPRESSION_RATIO_DIR);
try {
restore();
} catch (IOException e) {
LOGGER.error("Can not restore CompressionRatio", e);
}
}
/**
* Whenever the task of closing a file ends, the compression ratio of the file is calculated and
* this method is called.
*
* @param currentCompressionRatio the compression ratio of the closing file.
*/
public synchronized void updateRatio(double currentCompressionRatio) throws IOException {
File oldFile = new File(directory,
String.format(RATIO_FILE_PATH_FORMAT, compressionRatioSum, calcTimes));
compressionRatioSum += currentCompressionRatio;
calcTimes++;
File newFile = new File(directory,
String.format(RATIO_FILE_PATH_FORMAT, compressionRatioSum, calcTimes));
persist(oldFile, newFile);
if (CONFIG.isEnableParameterAdapter()) {
IoTDBConfigDynamicAdapter.getInstance().tryToAdaptParameters();
}
}
/**
* Get the average compression ratio for all closed files
*/
synchronized double getRatio() {
return calcTimes == 0 ? DEFAULT_COMPRESSION_RATIO : compressionRatioSum / calcTimes;
}
private void persist(File oldFile, File newFile) throws IOException {
checkDirectoryExist();
if (!oldFile.exists()) {
newFile.createNewFile();
LOGGER.debug("Old ratio file {} doesn't exist, force create ratio file {}",
oldFile.getAbsolutePath(), newFile.getAbsolutePath());
} else {
FileUtils.moveFile(oldFile, newFile);
LOGGER.debug("Compression ratio file updated, previous: {}, current: {}",
oldFile.getAbsolutePath(), newFile.getAbsolutePath());
}
}
private void checkDirectoryExist() throws IOException {
if (!directory.exists()) {
FileUtils.forceMkdir(directory);
}
}
/**
* Restore compression ratio statistics from disk when the system restarts
*/
void restore() throws IOException {
checkDirectoryExist();
File[] ratioFiles = directory.listFiles((dir, name) -> name.startsWith(FILE_PREFIX));
if (ratioFiles != null && ratioFiles.length > 0) {
long maxTimes = 0;
double maxCompressionRatioSum = 0;
int maxRatioIndex = 0;
for (int i = 0; i < ratioFiles.length; i++) {
String[] splits = ratioFiles[i].getName().split("-");
long times = Long.parseLong(splits[2]);
if (times > maxTimes) {
maxTimes = times;
maxCompressionRatioSum = Double.parseDouble(splits[1]);
maxRatioIndex = i;
}
}
calcTimes = maxTimes;
compressionRatioSum = maxCompressionRatioSum;
LOGGER.debug(
"After restoring from compression ratio file, compressionRatioSum = {}, calcTimes = {}",
compressionRatioSum, calcTimes);
for (int i = 0; i < ratioFiles.length; i++) {
if (i != maxRatioIndex) {
ratioFiles[i].delete();
}
}
}
}
/**
* Only for test
*/
void reset() {
calcTimes = 0;
compressionRatioSum = 0;
}
public double getCompressionRatioSum() {
return compressionRatioSum;
}
long getCalcTimes() {
return calcTimes;
}
public static CompressionRatio getInstance() {
return CompressionRatioHolder.INSTANCE;
}
private static class CompressionRatioHolder {
private static final CompressionRatio INSTANCE = new CompressionRatio();
private CompressionRatioHolder() {
}
}
}
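A minimal sketch of how this class is driven elsewhere in this commit (see the TsFileProcessor change further below); the variable names are illustrative:

// when a TsFile is closed, record memtable-bytes / disk-bytes (updateRatio may throw IOException)
double compressionRatio = ((double) totalMemTableSize) / writer.getPos();
CompressionRatio.getInstance().updateRatio(compressionRatio);
// the adapter (same package) later reads the running average, which is 2.0 until the first file is closed
double averageRatio = CompressionRatio.getInstance().getRatio();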
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.conf.adapter;
import org.apache.iotdb.db.exception.ConfigAdjusterException;
public interface IDynamicAdapter {
/**
* Adjust parameters of maxMemtableNumber, memtableSizeThreshold and tsFileSizeThreshold.
* @return {@code true} if it has appropriate parameters and adjusts them successfully;
* {@code false} if adjusting the parameters failed
*/
boolean tryToAdaptParameters();
/**
* Add or delete storage groups
*
* @param diff it's positive if adding new storage groups; it's negative if deleting old storage
* groups.
*/
void addOrDeleteStorageGroup(int diff) throws ConfigAdjusterException;
/**
* Add or delete timeseries
*
* @param diff it's positive if creating new timeseries; it's negative if deleting old timeseries.
*/
void addOrDeleteTimeSeries(int diff) throws ConfigAdjusterException;
}
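For orientation, a sketch of the call sites added by this commit (see the MManager changes below); simplified, without the surrounding try/catch that rethrows MetadataErrorException:

// register one new timeseries with the adapter before it is added to the MTree
IoTDBConfigDynamicAdapter.getInstance().addOrDeleteTimeSeries(1);
// register one new storage group when a storage level is set
IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(1);
// both calls throw ConfigAdjusterException if the system load is too high to adapt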
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.conf.adapter;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.ConfigAdjusterException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.rescon.PrimitiveArrayPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class dynamically adjusts some important parameters of the system, such as the speed at
* which MemTables are flushed to disk and the speed at which files are sealed, as the load
* changes while the system is running.
*
* There are three dynamically adjustable parameters: maxMemTableNum, memtableSize and
* tsFileSizeThreshold.
*
* 1. maxMemTableNum. This parameter represents the number of MemTables available in the MemTable
* pool, which is closely related to the number of storage groups. When a storage group is added or
* deleted, the parameter also adds or deletes two MemTables. The reason for using two MemTables is
* that when the system runs stably, the flush operation is faster than data writing, so one is used
* for the flush process and the other is used for data writing. Otherwise, the system would have to
* limit the speed of data writing to maintain stability.
*
* 2. memtableSize. This parameter determines the threshold value for the MemTable in memory to be
* flushed into disk. When the system load increases, the parameter should be set smaller so that
* the data in memory can be flushed into disk as soon as possible.
*
* 3. tsFileSizeThreshold. This parameter determines how quickly a tsfile is sealed, and therefore
* the maximum size of the metadata information maintained in memory. When the system load
* increases, the parameter should be set smaller so that files are sealed sooner and the memory
* occupied by the corresponding metadata information is released sooner.
*
* The following equation is used to adjust the dynamic parameters of the data:
*
* Abbreviation of parameters:
* 1 memtableSize: m
* 2 maxMemTableNum: Nm
* 3 maxSeriesNumberAmongStorageGroup: Ns
* 4 tsFileSizeThreshold: Sf
* 5 CompressionRatio: c
* 6 chunk metadata size: a
* 7 static memory: b
* 8 allocate memory for write: S
*
* The equation: m * Nm + Nm * Ns * Sf / (m * a * c) + b = S
* Namely: MemTable data memory size + chunk metadata memory size + static memory size = memory size for write
*
*/
public class IoTDBConfigDynamicAdapter implements IDynamicAdapter {
private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBConfigDynamicAdapter.class);
private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
// static parameter section
/**
* When the size of the adjusted MemTable decreases by more than this parameter, trigger the global
* flush operation and flush all MemTables that meet the flush condition to disk.
private static final double FLUSH_THRESHOLD = 0.2;
/**
* Maximum amount of memory allocated for write process.
*/
private static final long ALLOCATE_MEMORY_FOR_WRITE = CONFIG.getAllocateMemoryForWrite();
/**
* Metadata size per timeseries; the default value is 2 KB.
*/
private static final long TIMESERIES_METADATA_SIZE_IN_BYTE = 2L * 1024;
/**
* Metadata size per chunk; the default value is 1.5 KB.
*/
private static final long CHUNK_METADATA_SIZE_IN_BYTE = 1536L;
/**
* Average queue length in memtable pool
*/
static final int MEM_TABLE_AVERAGE_QUEUE_LEN = 5;
// static memory section
/**
* Static memory includes all timeseries metadata; it equals
* TIMESERIES_METADATA_SIZE_IN_BYTE * totalTimeseriesNum, in bytes.
*
* Currently, we think that static memory only consists of time series metadata information.
* We ignore the memory occupied by the tsfile information maintained in memory,
* because we think that this part occupies very little memory.
*/
private long staticMemory;
private int totalTimeseries;
// MemTable section
private int maxMemTableNum = MEM_TABLE_AVERAGE_QUEUE_LEN;
private long currentMemTableSize;
// Adapter section
private boolean initialized = false;
@Override
public synchronized boolean tryToAdaptParameters() {
boolean canAdjust = true;
long memtableSizeInByte = calcMemTableSize();
long memTableSizeFloorThreshold = getMemTableSizeFloorThreshold();
long tsFileSize = CONFIG.getTsFileSizeThreshold();
if (memtableSizeInByte < memTableSizeFloorThreshold) {
LOGGER.debug("memtableSizeInByte {} is smaller than memTableSizeFloorThreshold {}",
memtableSizeInByte, memTableSizeFloorThreshold);
tsFileSize = calcTsFileSize(memTableSizeFloorThreshold);
memtableSizeInByte = memTableSizeFloorThreshold + ((tsFileSize - memTableSizeFloorThreshold) >> 1);
if (tsFileSize < memTableSizeFloorThreshold) {
canAdjust = false;
}
}
if (canAdjust) {
CONFIG.setMaxMemtableNumber(maxMemTableNum);
CONFIG.setTsFileSizeThreshold(tsFileSize);
CONFIG.setMemtableSizeThreshold(memtableSizeInByte);
LOGGER.debug(
"After adjusting, max memTable num is {}, tsFile threshold is {}, memtableSize is {}, memTableSizeFloorThreshold is {}",
maxMemTableNum, tsFileSize, memtableSizeInByte, memTableSizeFloorThreshold);
currentMemTableSize = memtableSizeInByte;
}
if (!initialized) {
CONFIG.setMaxMemtableNumber(maxMemTableNum);
return true;
}
return canAdjust;
}
/**
* Calculate the appropriate MemTable size.
* The computing method is described in the class-level comment.
*
* @return MemTable byte size. If the value is -1, there is no valid solution.
*/
private int calcMemTableSize() {
double ratio = CompressionRatio.getInstance().getRatio();
// when the unit is byte, it is likely to cause Long type overflow,
// so when b is larger than Integer.MAX_VALUE, use the unit KB.
double a = ratio * maxMemTableNum;
double b = (ALLOCATE_MEMORY_FOR_WRITE - staticMemory) * ratio;
int magnification = b > Integer.MAX_VALUE ? 1024 : 1;
b /= magnification;
double c = (double) CONFIG.getTsFileSizeThreshold() * maxMemTableNum * CHUNK_METADATA_SIZE_IN_BYTE
* MManager
.getInstance().getMaximalSeriesNumberAmongStorageGroups() / magnification / magnification;
double tempValue = b * b - 4 * a * c;
double memTableSize = ((b + Math.sqrt(tempValue)) / (2 * a));
return tempValue < 0 ? -1 : (int) (memTableSize * magnification);
}
/**
* Calculate the appropriate TsFile size based on the MemTable size.
* The computing method is described in the class-level comment.
*
* @param memTableSize MemTable size
* @return Tsfile byte threshold
*/
private long calcTsFileSize(long memTableSize) {
return (long) ((ALLOCATE_MEMORY_FOR_WRITE - maxMemTableNum * memTableSize - staticMemory) * CompressionRatio
.getInstance().getRatio()
* memTableSize / (maxMemTableNum * CHUNK_METADATA_SIZE_IN_BYTE * MManager.getInstance()
.getMaximalSeriesNumberAmongStorageGroups()));
}
/**
* Get the floor threshold MemTable size. For Primitive Array, we think that the maximum memory
* occupied by each value is 8 bytes. The reason for multiplying 2 is that the timestamp also
* takes 8 bytes.
*/
private int getMemTableSizeFloorThreshold() {
return MManager.getInstance().getMaximalSeriesNumberAmongStorageGroups()
* PrimitiveArrayPool.ARRAY_SIZE * Long.BYTES * 2;
}
/**
* TODO: Currently IoTDB only supports adding a storage group.
*/
@Override
public void addOrDeleteStorageGroup(int diff) throws ConfigAdjusterException {
maxMemTableNum += 4 * diff;
if(!CONFIG.isEnableParameterAdapter()){
CONFIG.setMaxMemtableNumber(maxMemTableNum);
return;
}
if (!tryToAdaptParameters()) {
maxMemTableNum -= 4 * diff;
throw new ConfigAdjusterException(
"The IoTDB system load is too large to create storage group.");
}
}
@Override
public void addOrDeleteTimeSeries(int diff) throws ConfigAdjusterException {
if(!CONFIG.isEnableParameterAdapter()){
return;
}
totalTimeseries += diff;
staticMemory += diff * TIMESERIES_METADATA_SIZE_IN_BYTE;
if (!tryToAdaptParameters()) {
totalTimeseries -= diff;
staticMemory -= diff * TIMESERIES_METADATA_SIZE_IN_BYTE;
throw new ConfigAdjusterException("The IoTDB system load is too large to add timeseries.");
}
}
public void setInitialized(boolean initialized) {
this.initialized = initialized;
}
long getCurrentMemTableSize() {
return currentMemTableSize;
}
int getTotalTimeseries() {
return totalTimeseries;
}
/**
* Only for test
*/
public void reset() {
totalTimeseries = 0;
staticMemory = 0;
maxMemTableNum = MEM_TABLE_AVERAGE_QUEUE_LEN;
initialized = false;
}
private IoTDBConfigDynamicAdapter() {
}
public static IoTDBConfigDynamicAdapter getInstance() {
return IoTDBConfigAdapterHolder.INSTANCE;
}
private static class IoTDBConfigAdapterHolder {
private static final IoTDBConfigDynamicAdapter INSTANCE = new IoTDBConfigDynamicAdapter();
private IoTDBConfigAdapterHolder() {
}
}
}
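As a reading aid (not part of the source): using the values it actually builds, calcMemTableSize above solves the quadratic a * m^2 - b * m + c = 0 with a = compression ratio * maxMemTableNum, b = (ALLOCATE_MEMORY_FOR_WRITE - staticMemory) * compression ratio, and c = tsFileSizeThreshold * maxMemTableNum * CHUNK_METADATA_SIZE_IN_BYTE * maximal series number among storage groups, and returns the larger root m = (b + sqrt(b^2 - 4ac)) / (2a). When b would exceed Integer.MAX_VALUE the computation switches to KB units, and a negative discriminant (returned as -1) means there is no valid MemTable size under the current load.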
......@@ -77,6 +77,9 @@ public class DirectoryManager {
File file = new File(folder);
if (file.mkdirs()) {
logger.info("folder {} doesn't exist, create it", file.getPath());
} else {
logger.info("create folder {} failed. Is the folder existed: {}", file.getPath(),
file.exists());
}
}
}
......
......@@ -18,12 +18,9 @@
*/
package org.apache.iotdb.db.cost.statistic;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
......@@ -158,15 +155,12 @@ public class Measurement implements MeasurementMBean, IService {
public void startStatistics() {
stateChangeLock.lock();
try {
if (isEnableStat) {
return;
}
isEnableStat = true;
if (consumeFuture != null) {
if (consumeFuture != null && !consumeFuture.isCancelled()) {
logger.info("The consuming task in measurement stat module is already running...");
} else {
consumeFuture = service.scheduleWithFixedDelay(
new Measurement.DisplayRunnable(), 20, displayIntervalInMs, TimeUnit.MILLISECONDS);
} else {
logger.info("The consuming task in measurement stat module is already running...");
}
} catch (Exception e) {
LOGGER.error("Find error when start performance statistic thread, because {}", e);
......@@ -179,11 +173,10 @@ public class Measurement implements MeasurementMBean, IService {
public void startContinuousPrintStatistics() {
stateChangeLock.lock();
try {
if (isEnableStat) {
return;
}
isEnableStat = true;
if (displayFuture != null) {
if (displayFuture != null && !displayFuture.isCancelled()) {
logger.info("The display task in measurement stat module is already running...");
} else {
displayFuture = service.scheduleWithFixedDelay(
new Measurement.DisplayRunnable(), 20, displayIntervalInMs, TimeUnit.MILLISECONDS);
}
......@@ -204,9 +197,6 @@ public class Measurement implements MeasurementMBean, IService {
public void stopPrintStatistic() {
stateChangeLock.lock();
try {
if (!isEnableStat) {
return;
}
displayFuture = cancelFuture(displayFuture);
} catch (Exception e) {
LOGGER.error("Find error when stop display thread, because {}", e);
......@@ -219,9 +209,6 @@ public class Measurement implements MeasurementMBean, IService {
public void stopStatistic() {
stateChangeLock.lock();
try {
if (!isEnableStat) {
return;
}
isEnableStat = false;
displayFuture = cancelFuture(displayFuture);
consumeFuture = cancelFuture(consumeFuture);
......@@ -261,11 +248,13 @@ public class Measurement implements MeasurementMBean, IService {
public void start() throws StartupException {
// start display thread and consumer threads.
logger.info("start the consuming task in the measurement stats module...");
this.clearStatisticalState();
if (service.isShutdown()) {
service = IoTDBThreadPoolFactory.newScheduledThreadPool(
2, ThreadName.TIME_COST_STATSTIC.getName());
}
this.clearStatisticalState();
//we have to check again because someone may change the value.
isEnableStat = IoTDBDescriptor.getInstance().getConfig().isEnablePerformanceStat();
if (isEnableStat) {
consumeFuture = service.schedule(new QueueConsumerThread(), 0, TimeUnit.MILLISECONDS);
}
......
......@@ -30,10 +30,10 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.MetadataErrorException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.StorageEngineFailureException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
......@@ -55,8 +55,8 @@ public class StorageEngine implements IService {
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
/**
* a folder (system/storage_groups/ by default) that persist system info. Each Storage Processor will have a
* subfolder under the systemDir.
* a folder (system/storage_groups/ by default) that persist system info. Each Storage Processor
* will have a subfolder under the systemDir.
*/
private final String systemDir;
......@@ -85,7 +85,7 @@ public class StorageEngine implements IService {
*/
try {
List<String> storageGroups = MManager.getInstance().getAllStorageGroupNames();
for (String storageGroup: storageGroups) {
for (String storageGroup : storageGroups) {
StorageGroupProcessor processor = new StorageGroupProcessor(systemDir, storageGroup);
logger.info("Storage Group Processor {} is recovered successfully", storageGroup);
processorMap.put(storageGroup, processor);
......@@ -126,12 +126,10 @@ public class StorageEngine implements IService {
logger.debug("construct a processor instance, the storage group is {}, Thread is {}",
storageGroupName, Thread.currentThread().getId());
processor = new StorageGroupProcessor(systemDir, storageGroupName);
synchronized (processorMap) {
processorMap.put(storageGroupName, processor);
}
}
}
}
return processor;
} catch (PathErrorException | ProcessorException e) {
logger.error("Fail to get StorageGroupProcessor {}", storageGroupName, e);
......@@ -174,26 +172,20 @@ public class StorageEngine implements IService {
* only for unit test
*/
public void asyncFlushAndSealAllFiles() {
synchronized (processorMap) {
for (StorageGroupProcessor storageGroupProcessor : processorMap.values()) {
storageGroupProcessor.putAllWorkingTsFileProcessorIntoClosingList();
}
}
}
/**
* flush command
* Sync asyncCloseOneProcessor all file node processors.
* flush command Sync asyncCloseOneProcessor all file node processors.
*/
public void syncCloseAllProcessor() {
logger.info("Start closing all storage group processor");
synchronized (processorMap){
for(StorageGroupProcessor processor: processorMap.values()){
for (StorageGroupProcessor processor : processorMap.values()) {
processor.waitForAllCurrentTsFileProcessorsClosed();
}
}
}
/**
* update data.
......@@ -230,8 +222,8 @@ public class StorageEngine implements IService {
}
/**
* end query on a given deviceId. If some TsFile has been merged and this query is the
* last query using it, the TsFile can be deleted safely.
* end query on a given deviceId. If some TsFile has been merged and this query is the last query
* using it, the TsFile can be deleted safely.
*/
public void endQuery(String deviceId, int token) throws StorageEngineException {
// TODO implement it when developing the merge function
......@@ -257,7 +249,8 @@ public class StorageEngine implements IService {
* @param appendFile the appended tsfile information
*/
@SuppressWarnings("unused") // reimplement sync module
public boolean appendFileToStorageGroupProcessor(String storageGroupName, TsFileResource appendFile,
public boolean appendFileToStorageGroupProcessor(String storageGroupName,
TsFileResource appendFile,
String appendFilePath) throws StorageEngineException {
// TODO reimplement sync module
return true;
......@@ -286,8 +279,8 @@ public class StorageEngine implements IService {
}
/**
* delete all data files (both memory data and file on disk) in a storage group.
* It is used when there is no timeseries (which are all deleted) in this storage group)
* delete all data files (both memory data and file on disk) in a storage group. It is used when
* there is no timeseries (which are all deleted) in this storage group)
*/
public void deleteAllDataFilesInOneStorageGroup(String storageGroupName) {
if (processorMap.containsKey(storageGroupName)) {
......
......@@ -54,7 +54,7 @@ public class ChunkBufferPool {
//we use the memtable number * maximal series number in one StorageGroup * 2 as the capacity
int capacity =
2 * MManager.getInstance().getMaximalSeriesNumberAmongStorageGroups() * IoTDBDescriptor
.getInstance().getConfig().getMemtableNumber() + 100000;
.getInstance().getConfig().getMaxMemtableNumber() + 100000;
if (availableChunkBuffer.isEmpty() && size < capacity) {
size++;
return new ChunkBuffer(schema);
......@@ -93,7 +93,7 @@ public class ChunkBufferPool {
//we use the memtable number * maximal series number in one StorageGroup as the capacity
int capacity =
MManager.getInstance().getMaximalSeriesNumberAmongStorageGroups() * IoTDBDescriptor
.getInstance().getConfig().getMemtableNumber();
.getInstance().getConfig().getMaxMemtableNumber();
if (size > capacity) {
size --;
} else {
......
......@@ -53,7 +53,7 @@ public class ReadOnlyMemChunk implements TimeValuePairSorter {
this.initialized = false;
this.props = props;
if (props.containsKey(Encoder.MAX_POINT_NUMBER)) {
this.floatPrecision = Integer.valueOf(props.get(Encoder.MAX_POINT_NUMBER));
this.floatPrecision = Integer.parseInt(props.get(Encoder.MAX_POINT_NUMBER));
}
}
......
......@@ -43,7 +43,7 @@ public class FlushManager {
}
/**
* Add BufferWriteProcessor to asyncFlush manager
* Add BufferWriteProcessor to asyncTryToFlush manager
*/
@SuppressWarnings("squid:S2445")
void registerTsFileProcessor(TsFileProcessor tsFileProcessor) {
......
......@@ -33,7 +33,6 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.StorageEngine;
......@@ -44,8 +43,8 @@ import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
......@@ -73,8 +72,8 @@ import org.slf4j.LoggerFactory;
* (1) when inserting data into the TsFileProcessor, and the TsFileProcessor shouldFlush() (or
* shouldClose())<br/>
*
* (2) someone calls waitForAllCurrentTsFileProcessorsClosed().
* (up to now, only flush command from cli will call this method)<br/>
* (2) someone calls waitForAllCurrentTsFileProcessorsClosed(). (up to now, only flush command from
* cli will call this method)<br/>
*
* UnSequence data has the similar process as above.
*
......@@ -115,13 +114,13 @@ public class StorageGroupProcessor {
private TsFileProcessor workUnSequenceTsFileProcessor = null;
private CopyOnReadLinkedList<TsFileProcessor> closingUnSequenceTsFileProcessor = new CopyOnReadLinkedList<>();
/**
* device -> global latest timestamp of each device
* latestTimeForEachDevice caches non-flushed changes upon timestamps of each device, and is used
* to update latestFlushedTimeForEachDevice when a flush is issued.
* device -> global latest timestamp of each device latestTimeForEachDevice caches non-flushed
* changes upon timestamps of each device, and is used to update latestFlushedTimeForEachDevice
* when a flush is issued.
*/
private Map<String, Long> latestTimeForEachDevice = new HashMap<>();
/**
* device -> largest timestamp of the latest memtable to be submitted to asyncFlush
* device -> largest timestamp of the latest memtable to be submitted to asyncTryToFlush
* latestFlushedTimeForEachDevice determines whether a data point should be put into a sequential
* file or an unsequential file. Data of some device with timestamp less than or equals to the
* device's latestFlushedTime should go into an unsequential file.
......@@ -129,22 +128,22 @@ public class StorageGroupProcessor {
private Map<String, Long> latestFlushedTimeForEachDevice = new HashMap<>();
private String storageGroupName;
/**
* versionController assigns a version for each MemTable and deletion/update such that after
* they are persisted, the order of insertions, deletions and updates can be re-determined.
* versionController assigns a version for each MemTable and deletion/update such that after they
* are persisted, the order of insertions, deletions and updates can be re-determined.
*/
private VersionController versionController;
/**
* mergeDeleteLock is to be used in the merge process. Concurrent deletion and merge may result in
* losing some deletion in the merged new file, so a lock is necessary.
* TODO reconsidering this when implementing the merge process.
* losing some deletion in the merged new file, so a lock is necessary. TODO reconsidering this
* when implementing the merge process.
*/
@SuppressWarnings("unused") // to be used in merge
private ReentrantLock mergeDeleteLock = new ReentrantLock();
/**
* This is the modification file of the result of the current merge. Because the merged file
* may be invisible at this moment, without this, deletion/update during merge could be lost.
* This is the modification file of the result of the current merge. Because the merged file may
* be invisible at this moment, without this, deletion/update during merge could be lost.
*/
private ModificationFile mergingModification;
......@@ -161,6 +160,9 @@ public class StorageGroupProcessor {
if (storageGroupSysDir.mkdirs()) {
logger.info("Storage Group system Directory {} doesn't exist, create it",
storageGroupSysDir.getPath());
} else if (!storageGroupSysDir.exists()) {
logger.error("create Storage Group system Directory {} failed",
storageGroupSysDir.getPath());
}
versionController = new SimpleFileVersionController(storageGroupSysDir.getPath());
......@@ -315,7 +317,7 @@ public class StorageGroupProcessor {
latestTimeForEachDevice.put(insertPlan.getDeviceId(), insertPlan.getTime());
}
// check memtable size and may asyncFlush the work memtable
// check memtable size and may asyncTryToFlush the work memtable
if (tsFileProcessor.shouldFlush()) {
logger.info("The memtable size {} reaches the threshold, async flush it to tsfile: {}",
tsFileProcessor.getWorkMemTableMemory(),
......@@ -507,7 +509,8 @@ public class StorageGroupProcessor {
/**
* Delete data whose timestamp <= 'timestamp' and belongs to the timeseries deviceId.measurementId.
* Delete data whose timestamp <= 'timestamp' and belongs to the timeseries
* deviceId.measurementId.
*
* @param deviceId the deviceId of the timeseries to be deleted.
* @param measurementId the measurementId of the timeseries to be deleted.
......@@ -640,6 +643,7 @@ public class StorageGroupProcessor {
@FunctionalInterface
public interface CloseTsFileCallBack {
void call(TsFileProcessor caller) throws TsFileProcessorException, IOException;
}
......
......@@ -30,12 +30,12 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.adapter.CompressionRatio;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.memtable.NotifyFlushMemTable;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
import org.apache.iotdb.db.engine.memtable.MemTableFlushTask;
import org.apache.iotdb.db.rescon.MemTablePool;
import org.apache.iotdb.db.engine.memtable.NotifyFlushMemTable;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
......@@ -46,10 +46,10 @@ import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.qp.constant.DatetimeUtils;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.rescon.MemTablePool;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
......@@ -78,16 +78,15 @@ public class TsFileProcessor {
private ReadWriteLock flushQueryLock = new ReentrantReadWriteLock();
/**
* It is set by the StorageGroupProcessor and checked by flush threads.
* (If shouldClose == true and its flushingMemTables are all flushed, then the flush thread will
* close this file.)
* It is set by the StorageGroupProcessor and checked by flush threads. (If shouldClose == true
* and its flushingMemTables are all flushed, then the flush thread will close this file.)
*/
private volatile boolean shouldClose;
private IMemTable workMemTable;
/**
* sync this object in query() and asyncFlush()
* sync this object in query() and asyncTryToFlush()
*/
private final ConcurrentLinkedDeque<IMemTable> flushingMemTables = new ConcurrentLinkedDeque<>();
......@@ -108,6 +107,8 @@ public class TsFileProcessor {
private boolean sequence;
private long totalMemTableSize;
TsFileProcessor(String storageGroupName, File tsfile, FileSchema fileSchema,
VersionController versionController,
CloseTsFileCallBack closeTsFileCallback,
......@@ -192,7 +193,8 @@ public class TsFileProcessor {
boolean shouldFlush() {
return workMemTable.memSize() > TSFileConfig.groupSizeInByte;
return workMemTable != null && workMemTable.memSize() > IoTDBDescriptor.getInstance()
.getConfig().getMemtableSizeThreshold();
}
......@@ -274,7 +276,6 @@ public class TsFileProcessor {
flushQueryLock.writeLock().unlock();
}
synchronized (tmpMemTable) {
try {
long startWait = System.currentTimeMillis();
......@@ -334,6 +335,9 @@ public class TsFileProcessor {
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
getLogNode().notifyStartFlush();
}
if (!tobeFlushed.isSignalMemTable()) {
totalMemTableSize += tobeFlushed.memSize();
}
workMemTable = null;
FlushManager.getInstance().registerTsFileProcessor(this);
}
......@@ -400,6 +404,17 @@ public class TsFileProcessor {
if (shouldClose && flushingMemTables.isEmpty()) {
try {
writer.mark();
try {
double compressionRatio = ((double) totalMemTableSize) / writer.getPos();
logger.debug("totalMemTableSize: {}, writer.getPos(): {}", totalMemTableSize,
writer.getPos());
if (compressionRatio == 0) {
logger.error("compressionRatio = 0, please check the log.");
}
CompressionRatio.getInstance().updateRatio(compressionRatio);
} catch (IOException e) {
logger.error("update compression ratio failed", e);
}
endFile();
} catch (IOException | TsFileProcessorException e) {
logger.error("meet error when flush FileMetadata to {}, change system mode to read-only", tsFileResource.getFile().getAbsolutePath());
......@@ -504,7 +519,8 @@ public class TsFileProcessor {
if (flushingMemTable.isSignalMemTable()) {
continue;
}
ReadOnlyMemChunk memChunk = flushingMemTable.query(deviceId, measurementId, dataType, props);
ReadOnlyMemChunk memChunk = flushingMemTable
.query(deviceId, measurementId, dataType, props);
if (memChunk != null) {
memSeriesLazyMerger.addMemSeries(memChunk);
}
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.exception;
public class ConfigAdjusterException extends Exception {
public ConfigAdjusterException() {
}
public ConfigAdjusterException(String message) {
super(message);
}
public ConfigAdjusterException(String message, Throwable cause) {
super(message, cause);
}
public ConfigAdjusterException(Throwable cause) {
super(cause);
}
}
......@@ -33,7 +33,9 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.ConfigAdjusterException;
import org.apache.iotdb.db.exception.MetadataErrorException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.monitor.MonitorConstants;
......@@ -77,11 +79,16 @@ public class MManager {
private MManager() {
schemaDir = IoTDBDescriptor.getInstance().getConfig().getSystemDir() + File.separator + "schema";
schemaDir =
IoTDBDescriptor.getInstance().getConfig().getSystemDir() + File.separator + "schema";
File systemFolder = new File(schemaDir);
if (!systemFolder.exists()) {
systemFolder.mkdirs();
if (systemFolder.mkdirs()) {
logger.info("create system folder {}", systemFolder.getAbsolutePath());
} else {
logger.info("create system folder {} failed.", systemFolder.getAbsolutePath());
}
}
logFilePath = schemaDir + File.separator + MetadataConstant.METADATA_LOG;
writeToLog = false;
......@@ -114,9 +121,6 @@ public class MManager {
}
}
};
init();
initialized = true;
}
public static MManager getInstance() {
......@@ -125,12 +129,14 @@ public class MManager {
//Because the writer will be used later and should not be closed here.
@SuppressWarnings("squid:S2093")
private void init() {
public void init() {
if(initialized){
return;
}
lock.writeLock().lock();
File logFile = new File(logFilePath);
try {
initFromLog(logFile);
seriesNumberInStorageGroups = mgraph.countSeriesNumberInEachStorageGroup();
......@@ -147,6 +153,7 @@ public class MManager {
} finally {
lock.writeLock().unlock();
}
initialized = true;
}
......@@ -155,7 +162,7 @@ public class MManager {
// init the metadata from the operation log
mgraph = new MGraph(ROOT_NAME);
if (logFile.exists()) {
try( FileReader fr = new FileReader(logFile);
try (FileReader fr = new FileReader(logFile);
BufferedReader br = new BufferedReader(fr)) {
String cmd;
while ((cmd = br.readLine()) != null) {
......@@ -239,7 +246,11 @@ public class MManager {
File logFile = new File(logFilePath);
File metadataDir = new File(schemaDir);
if (!metadataDir.exists()) {
metadataDir.mkdirs();
if (metadataDir.mkdirs()) {
logger.info("create schema folder {}.", metadataDir);
} else {
logger.info("create schema folder {} failed.", metadataDir);
}
}
FileWriter fileWriter;
fileWriter = new FileWriter(logFile, true);
......@@ -261,8 +272,8 @@ public class MManager {
* @param dataType the datetype {@code DataType} for the timeseries
* @param encoding the encoding function {@code Encoding} for the timeseries
* @param compressor the compressor function {@code Compressor} for the time series
* @return whether the measurement occurs for the first time in this storage group (if true,
* the measurement should be registered to the StorageEngine too)
* @return whether the measurement occurs for the first time in this storage group (if true, the
* measurement should be registered to the StorageEngine too)
*/
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
public boolean addPathToMTree(Path path, TSDataType dataType, TSEncoding encoding,
......@@ -282,6 +293,11 @@ public class MManager {
} catch (PathErrorException e) {
throw new MetadataErrorException(e);
}
try {
IoTDBConfigDynamicAdapter.getInstance().addOrDeleteTimeSeries(1);
} catch (ConfigAdjusterException e) {
throw new MetadataErrorException(e);
}
// the two map is stored in the storage group node
Map<String, MeasurementSchema> schemaMap = getStorageGroupSchemaMap(fileNodePath);
Map<String, Integer> numSchemaMap = getStorageGroupNumSchemaMap(fileNodePath);
......@@ -332,7 +348,6 @@ public class MManager {
CompressionType compressor, Map<String, String> props)
throws PathErrorException, IOException {
lock.writeLock().lock();
try {
mgraph.addPathToMTree(path, dataType, encoding, compressor, props);
......@@ -416,10 +431,10 @@ public class MManager {
/**
* delete given paths from metadata.
*
* @param deletePathList list of paths to be deleted
* @return a set contains StorageGroups that contain no more timeseries
* after this deletion and files of such StorageGroups should be deleted to reclaim disk space.
* @throws MetadataErrorException
* @return a set contains StorageGroups that contain no more timeseries after this deletion and
* files of such StorageGroups should be deleted to reclaim disk space.
*/
public Set<String> deletePaths(List<Path> deletePathList)
throws MetadataErrorException {
......@@ -428,6 +443,11 @@ public class MManager {
Set<String> emptyStorageGroups = new HashSet<>();
for (String p : fullPath) {
try {
IoTDBConfigDynamicAdapter.getInstance().addOrDeleteTimeSeries(-1);
} catch (ConfigAdjusterException e) {
throw new MetadataErrorException(e);
}
String emptiedStorageGroup = deletePath(p);
if (emptiedStorageGroup != null) {
emptyStorageGroups.add(emptiedStorageGroup);
......@@ -511,17 +531,11 @@ public class MManager {
* function for setting storage level of the given path to mTree.
*/
public void setStorageLevelToMTree(String path) throws MetadataErrorException {
lock.writeLock().lock();
try {
checkAndGetDataTypeCache.clear();
mNodeCache.clear();
// if (current storage groups + the new storage group + the statistic storage group) * 2 > total memtable number
if ((seriesNumberInStorageGroups.size() + 2) * 2 > IoTDBDescriptor.getInstance().getConfig()
.getMemtableNumber()) {
throw new PathErrorException(
"too many storage groups, please increase the number of memtable");
}
IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(1);
mgraph.setStorageLevel(path);
seriesNumberInStorageGroups.put(path, 0);
if (writeToLog) {
......@@ -530,15 +544,23 @@ public class MManager {
writer.newLine();
writer.flush();
}
} catch (IOException | PathErrorException e) {
} catch (IOException | ConfigAdjusterException e) {
throw new MetadataErrorException(e);
} finally{
} catch (PathErrorException e) {
try {
IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(-1);
} catch (ConfigAdjusterException ex) {
throw new MetadataErrorException(ex);
}
throw new MetadataErrorException(e);
} finally {
lock.writeLock().unlock();
}
}
/**
* function for checking if the given path is storage level of mTree or not.
*
* @apiNote :for cluster
*/
boolean checkStorageLevelOfMTree(String path) {
......@@ -749,10 +771,9 @@ public class MManager {
}
/**
* @deprecated Get all MeasurementSchemas for given delta object type.
*
* @param path A seriesPath represented one Delta object
* @return a list contains all column schema
* @deprecated Get all MeasurementSchemas for given delta object type.
*/
@Deprecated
public List<MeasurementSchema> getSchemaForOneType(String path) throws PathErrorException {
......@@ -1018,8 +1039,8 @@ public class MManager {
}
/**
* Get MeasurementSchema for given seriesPath. Notice: Path must be a complete Path from root to leaf
* node.
* Get MeasurementSchema for given seriesPath. Notice: Path must be a complete Path from root to
* leaf node.
*/
private MeasurementSchema getSchemaForOnePath(String path) throws PathErrorException {
......@@ -1172,14 +1193,23 @@ public class MManager {
}
}
/**
* Only for test
*/
public void setMaxSeriesNumberAmongStorageGroup(int maxSeriesNumberAmongStorageGroup) {
this.maxSeriesNumberAmongStorageGroup = maxSeriesNumberAmongStorageGroup;
}
public int getMaximalSeriesNumberAmongStorageGroups() {
return maxSeriesNumberAmongStorageGroup;
}
private static class MManagerHolder {
private MManagerHolder(){
private MManagerHolder() {
//allowed to do nothing
}
private static final MManager INSTANCE = new MManager();
}
......
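Taken together, the MManager changes follow one register-then-roll-back pattern: every metadata mutation first reports its delta to IoTDBConfigDynamicAdapter (+1 for a new time series or storage group, -1 for a deletion), any ConfigAdjusterException is surfaced as a MetadataErrorException, and setStorageLevelToMTree returns the reserved capacity if the MTree update itself fails. A minimal sketch of that pattern, using the adapter API from this diff and a hypothetical MetadataStore stand-in for the mgraph update:

```java
// Sketch only, not the actual MManager code. MetadataStore is a hypothetical stand-in.
interface MetadataStore {
  void setStorageLevel(String path) throws PathErrorException;
}

void createStorageGroupSketch(String path, MetadataStore store) throws MetadataErrorException {
  try {
    // Reserve adapter capacity first; the adapter may refuse under high system load.
    IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(1);
  } catch (ConfigAdjusterException e) {
    throw new MetadataErrorException(e);
  }
  try {
    store.setStorageLevel(path);
  } catch (PathErrorException e) {
    try {
      // The metadata change failed, so give the reserved capacity back.
      IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(-1);
    } catch (ConfigAdjusterException ex) {
      throw new MetadataErrorException(ex);
    }
    throw new MetadataErrorException(e);
  }
}
```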
......@@ -20,6 +20,7 @@ package org.apache.iotdb.db.rescon;
import java.util.ArrayDeque;
import java.util.Deque;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
......@@ -28,16 +29,12 @@ import org.slf4j.LoggerFactory;
public class MemTablePool {
private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
private static final Logger logger = LoggerFactory.getLogger(MemTablePool.class);
private static final Deque<IMemTable> availableMemTables = new ArrayDeque<>();
/**
* >= number of storage group * 2
* TODO check this parameter to ensure that capacity * MaxMemTable Size < JVM memory / 2
*/
private static int capacity = IoTDBDescriptor.getInstance().getConfig().getMemtableNumber();
private int size = 0;
private static final int WAIT_TIME = 2000;
......@@ -47,7 +44,7 @@ public class MemTablePool {
public IMemTable getAvailableMemTable(Object applier) {
synchronized (availableMemTables) {
if (availableMemTables.isEmpty() && size < capacity) {
if (availableMemTables.isEmpty() && size < CONFIG.getMaxMemtableNumber()) {
size++;
logger.info("generated a new memtable for {}, system memtable size: {}, stack size: {}",
applier, size, availableMemTables.size());
......@@ -84,6 +81,14 @@ public class MemTablePool {
return;
}
synchronized (availableMemTables) {
// Because of dynamic parameter adjustment, the max number of memtables may decrease.
if (size > CONFIG.getMaxMemtableNumber()) {
logger.debug(
"Currently there are {} MemTables, the max number of MemTables is {}, discard this MemTable.",
size, CONFIG.getMaxMemtableNumber());
size--;
return;
}
memTable.clear();
availableMemTables.push(memTable);
availableMemTables.notify();
......@@ -91,6 +96,10 @@ public class MemTablePool {
}
}
public int getSize() {
return size;
}
public static MemTablePool getInstance() {
return InstanceHolder.INSTANCE;
}
......
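MemTablePool no longer caches a fixed capacity; it reads CONFIG.getMaxMemtableNumber() on every call, and putBack discards a memtable whenever the adapter has lowered the limit below the number of tables already handed out. A self-contained sketch of that behaviour, with illustrative names (BoundedPool, capacitySupplier) that are not part of IoTDB:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.IntSupplier;
import java.util.function.Supplier;

// Pool whose capacity is re-read on every call, so it can shrink at runtime.
class BoundedPool<T> {
  private final Deque<T> available = new ArrayDeque<>();
  private final IntSupplier capacitySupplier; // e.g. () -> CONFIG.getMaxMemtableNumber()
  private int size = 0;

  BoundedPool(IntSupplier capacitySupplier) {
    this.capacitySupplier = capacitySupplier;
  }

  synchronized T get(Supplier<T> factory) throws InterruptedException {
    while (true) {
      if (!available.isEmpty()) {
        return available.pop();
      }
      if (size < capacitySupplier.getAsInt()) {
        size++;
        return factory.get();
      }
      wait(2000); // like MemTablePool's WAIT_TIME, wake up when something is put back
    }
  }

  synchronized void putBack(T item) {
    // If the limit was lowered in the meantime, drop the instance instead of pooling it.
    if (size > capacitySupplier.getAsInt()) {
      size--;
      return;
    }
    available.push(item);
    notifyAll();
  }
}
```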
......@@ -21,13 +21,15 @@ package org.apache.iotdb.db.service;
import org.apache.iotdb.db.concurrent.IoTDBDefaultThreadExceptionHandler;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter;
import org.apache.iotdb.db.cost.statistic.Measurement;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.exception.builder.ExceptionBuilder;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.monitor.StatMonitor;
import org.apache.iotdb.db.sync.receiver.SyncServerManager;
import org.apache.iotdb.db.rescon.TVListAllocator;
import org.apache.iotdb.db.sync.receiver.SyncServerManager;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -86,6 +88,7 @@ public class IoTDB implements IoTDBMBean {
StatMonitor.getInstance().recovery();
}
initMManager();
registerManager.register(StorageEngine.getInstance());
registerManager.register(MultiFileLogNodeManager.getInstance());
registerManager.register(JMXService.getInstance());
......@@ -110,6 +113,18 @@ public class IoTDB implements IoTDBMBean {
logger.info("IoTDB is deactivated.");
}
private void initMManager() {
MManager.getInstance().init();
IoTDBConfigDynamicAdapter.getInstance().setInitialized(true);
logger.debug(
"After initializing, max memTable num is {}, tsFile threshold is {}, memtableSize is {}",
IoTDBDescriptor.getInstance().getConfig().getMaxMemtableNumber(),
IoTDBDescriptor.getInstance().getConfig().getTsFileSizeThreshold(),
IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold());
}
@Override
public void stop() {
deactivate();
......
......@@ -166,8 +166,8 @@ public class JDBCService implements JDBCServiceMBean, IService {
reset();
logger.info("{}: close {} successfully", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("{}: close {} failed because {}", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName(), e);
Thread.currentThread().interrupt();
}
}
......
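The JDBCService change only reorders the logging and the call to Thread.currentThread().interrupt(); in either order, the point is that a caught InterruptedException that is not rethrown must restore the interrupt flag so callers can still observe it. A minimal sketch of that idiom (awaitQuietly is an illustrative name, not IoTDB code):

```java
// Log the interruption, then restore the flag for the caller.
static void awaitQuietly(Thread worker, org.slf4j.Logger log) {
  try {
    worker.join(5000);
  } catch (InterruptedException e) {
    log.error("interrupted while waiting for {}", worker.getName(), e);
    Thread.currentThread().interrupt();
  }
}
```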
......@@ -70,7 +70,9 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
this.identifier = identifier;
this.logDirectory =
DirectoryManager.getInstance().getWALFolder() + File.separator + this.identifier;
new File(logDirectory).mkdirs();
if (new File(logDirectory).mkdirs()) {
logger.info("create the WAL folder {}." + logDirectory);
}
}
@Override
......@@ -237,7 +239,9 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
private void nextFileWriter() {
fileId++;
File newFile = new File(logDirectory, WAL_FILE_NAME + fileId);
newFile.getParentFile().mkdirs();
if (newFile.getParentFile().mkdirs()) {
logger.info("create WAL parent folder {}.", newFile.getParent());
}
currentFileWriter = new LogWriter(newFile);
}
......
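The mkdirs() calls are now checked because File.mkdirs() returns false both when the directory already exists and when creation actually fails; logging only on true avoids misleading messages, while a stricter variant also distinguishes the failure case. A sketch with a hypothetical ensureDir helper:

```java
import java.io.File;
import java.io.IOException;

// Create the directory if needed and fail loudly when it cannot be created.
static void ensureDir(File dir, org.slf4j.Logger log) throws IOException {
  if (dir.mkdirs()) {
    log.info("created WAL folder {}.", dir.getAbsolutePath());
  } else if (!dir.isDirectory()) {
    throw new IOException("cannot create WAL folder " + dir.getAbsolutePath());
  }
}
```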
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.conf.adapter;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.FilePathUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class CompressionRatioTest {
private static IoTDB daemon;
private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
private CompressionRatio compressionRatio = CompressionRatio.getInstance();
private static final String directory = FilePathUtils.regularizePath(CONFIG.getSystemDir())
+ CompressionRatio.COMPRESSION_RATIO_DIR;
@Before
public void setUp() throws Exception {
EnvironmentUtils.closeStatMonitor();
daemon = IoTDB.getInstance();
daemon.active();
EnvironmentUtils.envSetUp();
FileUtils.forceMkdir(new File(directory));
compressionRatio.reset();
compressionRatio.restore();
IoTDBDescriptor.getInstance().getConfig().setEnableParameterAdapter(true);
}
@After
public void tearDown() throws Exception {
daemon.stop();
EnvironmentUtils.cleanEnv();
}
@Test
public void testCompressionRatio() throws IOException {
double compressionRatioSum = 0;
int calcuTimes = 0;
if (new File(directory, String.format(CompressionRatio.RATIO_FILE_PATH_FORMAT, compressionRatioSum, calcuTimes)).exists()) {
fail();
}
double compressionRatio = 10;
for (int i = 0; i < 500; i += compressionRatio) {
this.compressionRatio.updateRatio(compressionRatio);
if (new File(directory, String.format(CompressionRatio.RATIO_FILE_PATH_FORMAT, compressionRatioSum, calcuTimes)).exists()) {
fail();
}
calcuTimes++;
compressionRatioSum += compressionRatio;
if (!new File(directory, String.format(CompressionRatio.RATIO_FILE_PATH_FORMAT, compressionRatioSum, calcuTimes)).exists()) {
fail();
}
assertEquals(0, Double
.compare(compressionRatioSum / calcuTimes, this.compressionRatio.getRatio()));
}
}
@Test
public void testRestore() throws IOException {
double compressionRatioSum = 0;
int calcuTimes = 0;
if (new File(directory, String.format(CompressionRatio.RATIO_FILE_PATH_FORMAT, compressionRatioSum, calcuTimes)).exists()) {
fail();
}
int compressionRatio = 10;
for (int i = 0; i < 100; i += compressionRatio) {
this.compressionRatio.updateRatio(compressionRatio);
if (new File(directory, String.format(CompressionRatio.RATIO_FILE_PATH_FORMAT, compressionRatioSum, calcuTimes)).exists()) {
fail();
}
calcuTimes++;
compressionRatioSum += compressionRatio;
if (!new File(directory, String.format(CompressionRatio.RATIO_FILE_PATH_FORMAT, compressionRatioSum, calcuTimes)).exists()) {
fail();
}
assertEquals(0, Double
.compare(compressionRatioSum / calcuTimes, this.compressionRatio.getRatio()));
}
this.compressionRatio.restore();
assertEquals(10, this.compressionRatio.getCalcTimes());
assertEquals(0, Double
.compare(compressionRatioSum / calcuTimes, this.compressionRatio.getRatio()));
}
}
\ No newline at end of file
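These assertions only hold if CompressionRatio keeps a running mean: the persisted file name encodes the accumulated ratio sum and the update count, and getRatio() returns their quotient. For example, after the first five updateRatio(10) calls the file name encodes sum 50 and count 5, and getRatio() yields 50 / 5 = 10.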
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.conf.adapter;
import static org.junit.Assert.assertEquals;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.ConfigAdjusterException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class IoTDBConfigDynamicAdapterTest {
private static IoTDB daemon;
private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
private long oldTsFileThreshold = CONFIG.getTsFileSizeThreshold();
private int oldMaxMemTableNumber = CONFIG.getMaxMemtableNumber();
private long oldGroupSizeInByte = CONFIG.getMemtableSizeThreshold();
@Before
public void setUp() throws Exception {
EnvironmentUtils.closeStatMonitor();
daemon = IoTDB.getInstance();
daemon.active();
EnvironmentUtils.envSetUp();
IoTDBDescriptor.getInstance().getConfig().setEnableParameterAdapter(true);
}
@After
public void tearDown() throws Exception {
daemon.stop();
EnvironmentUtils.cleanEnv();
CONFIG.setMaxMemtableNumber(oldMaxMemTableNumber);
CONFIG.setTsFileSizeThreshold(oldTsFileThreshold);
CONFIG.setMemtableSizeThreshold(oldGroupSizeInByte);
MManager.getInstance().setMaxSeriesNumberAmongStorageGroup(0);
IoTDBConfigDynamicAdapter.getInstance().reset();
}
@Test
public void addOrDeleteStorageGroup() throws ConfigAdjusterException {
System.out.println(
"System total memory : " + Runtime.getRuntime().maxMemory() / IoTDBConstant.MB
+ "MB");
int memTableNum = IoTDBConfigDynamicAdapter.MEM_TABLE_AVERAGE_QUEUE_LEN;
for (int i = 1; i < 100; i++) {
IoTDBConfigDynamicAdapter.getInstance().addOrDeleteTimeSeries(1);
}
MManager.getInstance().setMaxSeriesNumberAmongStorageGroup(100);
for (int i = 1; i < 1000000; i++) {
try {
IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(1);
memTableNum += 4;
assertEquals(IoTDBConfigDynamicAdapter.getInstance().getCurrentMemTableSize(),
CONFIG.getMemtableSizeThreshold());
assertEquals(CONFIG.getMaxMemtableNumber(), memTableNum);
} catch (ConfigAdjusterException e) {
assertEquals("The IoTDB system load is too large to create storage group.", e.getMessage());
System.out.println("it has created " + i + " storage groups.");
assertEquals(CONFIG.getMaxMemtableNumber(), memTableNum);
break;
}
}
}
@Test
public void addOrDeleteTimeSeries() throws ConfigAdjusterException {
System.out.println(
"System total memory : " + Runtime.getRuntime().maxMemory() / IoTDBConstant.MB
+ "MB");
int totalTimeseries = 0;
for (int i = 1; i < 100; i++) {
IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(1);
}
MManager.getInstance().setMaxSeriesNumberAmongStorageGroup(100);
for (int i = 1; i < 1000000; i++) {
try {
IoTDBConfigDynamicAdapter.getInstance().addOrDeleteTimeSeries(1);
if (i % 10 == 0) {
MManager.getInstance().setMaxSeriesNumberAmongStorageGroup(i);
}
totalTimeseries += 1;
assertEquals(IoTDBConfigDynamicAdapter.getInstance().getCurrentMemTableSize(),
CONFIG.getMemtableSizeThreshold());
assertEquals(IoTDBConfigDynamicAdapter.getInstance().getTotalTimeseries(),
totalTimeseries);
} catch (ConfigAdjusterException e) {
assertEquals("The IoTDB system load is too large to add timeseries.", e.getMessage());
System.out.println("it has added " + i + " timeseries.");
assertEquals(IoTDBConfigDynamicAdapter.getInstance().getTotalTimeseries(),
totalTimeseries);
break;
}
}
}
@Test
public void addOrDeleteTimeSeriesSyso() throws ConfigAdjusterException {
int sgNum = 1;
for (int i = 0; i < 30; i++) {
IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(sgNum);
}
int i = 1;
try {
for (; i <= 280 * 3200; i++) {
IoTDBConfigDynamicAdapter.getInstance().addOrDeleteTimeSeries(1);
MManager.getInstance().setMaxSeriesNumberAmongStorageGroup(i / 30 + 1);
}
} catch (ConfigAdjusterException e) {
assertEquals("The IoTDB system load is too large to add timeseries.", e.getMessage());
}
int j = 0;
try {
while (true) {
j++;
IoTDBConfigDynamicAdapter.getInstance().addOrDeleteTimeSeries(1);
MManager.getInstance().setMaxSeriesNumberAmongStorageGroup(MManager.getInstance().getMaximalSeriesNumberAmongStorageGroups() + 1);
}
} catch (ConfigAdjusterException e) {
System.out.println(j);
assertEquals("The IoTDB system load is too large to add timeseries.", e.getMessage());
}
}
}
\ No newline at end of file
......@@ -84,7 +84,7 @@ public class PerformanceStatTest {
}
@Test
public void testSwith() {
public void testSwitch() {
Measurement measurement = Measurement.INSTANCE;
try {
measurement.start();
......
......@@ -36,6 +36,7 @@ public class MetadataManagerHelper {
public static void initMetadata() {
mmanager = MManager.getInstance();
mmanager.init();
mmanager.clear();
try {
mmanager.setStorageLevelToMTree("root.vehicle.d0");
......
......@@ -29,6 +29,7 @@ public class MemTablePoolTest {
private ConcurrentLinkedQueue<IMemTable> memTables;
private Thread thread = new ReturnThread();
private volatile boolean isFinished = false;
@Before
public void setUp() throws Exception {
......@@ -38,7 +39,9 @@ public class MemTablePoolTest {
@After
public void tearDown() throws Exception {
thread.interrupt();
isFinished = true;
thread.join();
System.out.println(MemTablePool.getInstance().getSize());
}
@Test
......@@ -57,35 +60,32 @@ public class MemTablePoolTest {
long start = System.currentTimeMillis();
TreeMap<Long, Long> treeMap = new TreeMap<>();
for (int i = 0; i < 1000000; i++) {
treeMap.put((long)i, (long)i);
treeMap.put((long) i, (long) i);
}
start = System.currentTimeMillis() - start;
System.out.println("time cost: " + start);
}
class ReturnThread extends Thread{
class ReturnThread extends Thread {
@Override
public void run() {
while (true) {
if(isInterrupted()){
if (isInterrupted()) {
break;
}
IMemTable memTable = memTables.poll();
if (memTable == null) {
if (isFinished) {
break;
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
memTables.remove(memTable);
MemTablePool.getInstance().putBack(memTable, "test case");
}
......
......@@ -53,6 +53,7 @@ import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DoubleDataPoint;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
public class DeletionFileNodeTest {
......
......@@ -46,6 +46,7 @@ import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DoubleDataPoint;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
public class DeletionQueryTest {
......
......@@ -26,6 +26,7 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.iotdb.tsfile.read.common.Path;
import org.junit.Ignore;
import org.junit.Test;
public class ModificationFileTest {
......
......@@ -128,7 +128,7 @@ public class IoTDBAggregationSmallDataIT {
daemon.active();
EnvironmentUtils.envSetUp();
Thread.sleep(5000);
//Thread.sleep(5000);
insertSQL();
}
......
......@@ -28,6 +28,7 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
import org.apache.iotdb.db.service.IoTDB;
......@@ -75,6 +76,7 @@ public class IoTDBEngineTimeGeneratorIT {
tsFileConfig.maxNumberOfPointsInPage = 100;
tsFileConfig.pageSizeInByte = 1024 * 1024 * 150;
tsFileConfig.groupSizeInByte = 1024 * 1024 * 100;
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(1024 * 1024 * 100);
daemon = IoTDB.getInstance();
daemon.active();
......@@ -92,6 +94,7 @@ public class IoTDBEngineTimeGeneratorIT {
tsFileConfig.maxNumberOfPointsInPage = maxNumberOfPointsInPage;
tsFileConfig.pageSizeInByte = pageSizeInByte;
tsFileConfig.groupSizeInByte = groupSizeInByte;
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(groupSizeInByte);
EnvironmentUtils.cleanEnv();
}
......
......@@ -27,6 +27,7 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
......@@ -64,6 +65,7 @@ public class IoTDBLargeDataIT {
tsFileConfig.maxNumberOfPointsInPage = 1000;
tsFileConfig.pageSizeInByte = 1024 * 150;
tsFileConfig.groupSizeInByte = 1024 * 1000;
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(1024 * 1000);
daemon = IoTDB.getInstance();
daemon.active();
......@@ -81,6 +83,7 @@ public class IoTDBLargeDataIT {
tsFileConfig.maxNumberOfPointsInPage = maxNumberOfPointsInPage;
tsFileConfig.pageSizeInByte = pageSizeInByte;
tsFileConfig.groupSizeInByte = groupSizeInByte;
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(groupSizeInByte);
EnvironmentUtils.cleanEnv();
}
......
......@@ -27,6 +27,7 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
......@@ -66,6 +67,7 @@ public class IoTDBMultiSeriesIT {
tsFileConfig.maxNumberOfPointsInPage = 1000;
tsFileConfig.pageSizeInByte = 1024 * 150;
tsFileConfig.groupSizeInByte = 1024 * 1000;
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(1024 * 1000);
daemon = IoTDB.getInstance();
daemon.active();
......@@ -82,6 +84,7 @@ public class IoTDBMultiSeriesIT {
tsFileConfig.maxNumberOfPointsInPage = maxNumberOfPointsInPage;
tsFileConfig.pageSizeInByte = pageSizeInByte;
tsFileConfig.groupSizeInByte = groupSizeInByte;
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(groupSizeInByte);
EnvironmentUtils.cleanEnv();
}
......
......@@ -28,6 +28,7 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
......@@ -78,6 +79,7 @@ public class IoTDBSequenceDataQueryIT {
tsFileConfig.maxNumberOfPointsInPage = 100;
tsFileConfig.pageSizeInByte = 1024 * 1024 * 150;
tsFileConfig.groupSizeInByte = 1024 * 1024 * 100;
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(1024 * 1024 * 100);
daemon = IoTDB.getInstance();
daemon.active();
......@@ -93,6 +95,7 @@ public class IoTDBSequenceDataQueryIT {
tsFileConfig.maxNumberOfPointsInPage = maxNumberOfPointsInPage;
tsFileConfig.pageSizeInByte = pageSizeInByte;
tsFileConfig.groupSizeInByte = groupSizeInByte;
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(groupSizeInByte);
EnvironmentUtils.cleanEnv();
}
......
......@@ -30,6 +30,7 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
......@@ -79,6 +80,7 @@ public class IoTDBSeriesReaderIT {
tsFileConfig.maxNumberOfPointsInPage = 1000;
tsFileConfig.pageSizeInByte = 1024 * 1024 * 150;
tsFileConfig.groupSizeInByte = 1024 * 1024 * 1000;
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(1024 * 1024 * 1000);
daemon = IoTDB.getInstance();
daemon.active();
......@@ -99,6 +101,7 @@ public class IoTDBSeriesReaderIT {
tsFileConfig.maxNumberOfPointsInPage = maxNumberOfPointsInPage;
tsFileConfig.pageSizeInByte = pageSizeInByte;
tsFileConfig.groupSizeInByte = groupSizeInByte;
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(groupSizeInByte);
EnvironmentUtils.cleanEnv();
}
......
......@@ -30,6 +30,7 @@ public class MGraphTest {
@Before
public void setUp() throws Exception {
EnvironmentUtils.envSetUp();
}
@After
......
......@@ -41,6 +41,7 @@ public class MManagerAdvancedTest {
@Before
public void setUp() throws Exception {
EnvironmentUtils.envSetUp();
mmanager = MManager.getInstance();
mmanager.setStorageLevelToMTree("root.vehicle.d0");
......@@ -60,6 +61,7 @@ public class MManagerAdvancedTest {
mmanager.addPathToMTree("root.vehicle.d1.s3", "DOUBLE", "RLE");
mmanager.addPathToMTree("root.vehicle.d1.s4", "BOOLEAN", "PLAIN");
mmanager.addPathToMTree("root.vehicle.d1.s5", "TEXT", "PLAIN");
}
@After
......
......@@ -46,6 +46,7 @@ public class MManagerBasicTest {
@Before
public void setUp() throws Exception {
compressionType = CompressionType.valueOf(TSFileConfig.compressor);
EnvironmentUtils.envSetUp();
}
@After
......
......@@ -41,6 +41,7 @@ public class MManagerImproveTest {
@Before
public void setUp() throws Exception {
EnvironmentUtils.envSetUp();
mManager = MManager.getInstance();
mManager.setStorageLevelToMTree("root.t1.v2");
......
......@@ -40,6 +40,7 @@ public class MTreeTest {
@Before
public void setUp() throws Exception {
EnvironmentUtils.envSetUp();
}
@After
......
......@@ -32,6 +32,7 @@ public class MetadataTest {
@Before
public void setUp() throws Exception {
EnvironmentUtils.envSetUp();
}
@After
......
......@@ -40,6 +40,10 @@ public class QueryProcessorTest {
private MManager mManager = MManager.getInstance();
private QueryProcessor processor = new QueryProcessor(new QueryProcessExecutor());
static {
MManager.getInstance().init();
}
@Before
public void setUp() throws Exception {
EnvironmentUtils.envSetUp();
......
......@@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.QueryProcessor;
import org.apache.iotdb.db.qp.executor.AbstractQueryProcessExecutor;
import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
......@@ -78,6 +79,10 @@ public class EngineDataSetWithValueFilterTest {
"insert into root.test.d0(timestamp,s0,s1) values(700,1307,'1038')",
"insert into root.test.d0(timestamp,s1) values(3000,'1309')"};
static {
MManager.getInstance().init();
}
@Before
public void setUp() throws Exception {
EnvironmentUtils.envSetUp();
......
......@@ -21,6 +21,7 @@ package org.apache.iotdb.db.query.reader;
import java.io.IOException;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
......@@ -37,10 +38,14 @@ public abstract class ReaderTestHelper {
protected StorageGroupProcessor storageGroupProcessor;
private String systemDir = "data/info";
static {
MManager.getInstance().init();
}
@Before
public void setUp() throws Exception {
MetadataManagerHelper.initMetadata();
EnvironmentUtils.envSetUp();
MetadataManagerHelper.initMetadata();
storageGroupProcessor = new StorageGroupProcessor(systemDir, storageGroup);
insertData();
}
......
......@@ -26,6 +26,8 @@ import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
import org.apache.iotdb.db.auth.authorizer.LocalFileAuthorizer;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.adapter.CompressionRatio;
import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.cache.DeviceMetaDataCache;
......@@ -38,6 +40,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -60,6 +63,12 @@ public class EnvironmentUtils {
public static long TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId();
public static QueryContext TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
private static long oldTsFileThreshold = config.getTsFileSizeThreshold();
private static int oldMaxMemTableNumber = config.getMaxMemtableNumber();
private static long oldGroupSizeInByte = config.getMemtableSizeThreshold();
public static void cleanEnv() throws IOException, StorageEngineException {
QueryResourceManager.getInstance().endQueryForGivenJob(TEST_QUERY_JOB_ID);
......@@ -85,6 +94,11 @@ public class EnvironmentUtils {
MManager.getInstance().clear();
// delete all directory
cleanAllDir();
config.setMaxMemtableNumber(oldMaxMemTableNumber);
config.setTsFileSizeThreshold(oldTsFileThreshold);
config.setMemtableSizeThreshold(oldGroupSizeInByte);
IoTDBConfigDynamicAdapter.getInstance().reset();
}
private static void cleanAllDir() throws IOException {
......@@ -126,6 +140,10 @@ public class EnvironmentUtils {
* this function should be called before all code in the setup
*/
public static void envSetUp() throws StartupException, IOException {
IoTDBDescriptor.getInstance().getConfig().setEnableParameterAdapter(false);
MManager.getInstance().init();
IoTDBConfigDynamicAdapter.getInstance().setInitialized(true);
createAllDir();
// disable the system monitor
config.setEnableStatMonitor(false);
......
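EnvironmentUtils now snapshots the three adapter-controlled values before each test and restores them (and resets the adapter) in cleanEnv, so one test's dynamic adjustments cannot leak into the next. A minimal sketch of that save/restore discipline; ConfigSnapshot is a hypothetical helper, while the getters and setters are the ones used in this diff:

```java
// Capture the adapter-controlled settings, restore them when the test is done.
class ConfigSnapshot {
  private final IoTDBConfig config;
  private final long tsFileThreshold;
  private final int maxMemtableNumber;
  private final long memtableSizeThreshold;

  ConfigSnapshot(IoTDBConfig config) {
    this.config = config;
    this.tsFileThreshold = config.getTsFileSizeThreshold();
    this.maxMemtableNumber = config.getMaxMemtableNumber();
    this.memtableSizeThreshold = config.getMemtableSizeThreshold();
  }

  void restore() {
    config.setTsFileSizeThreshold(tsFileThreshold);
    config.setMaxMemtableNumber(maxMemtableNumber);
    config.setMemtableSizeThreshold(memtableSizeThreshold);
    IoTDBConfigDynamicAdapter.getInstance().reset();
  }
}
```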
......@@ -25,6 +25,7 @@ import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.MemUtils;
......@@ -62,6 +63,7 @@ public class IoTDBLogFileSizeTest {
}
groupSize = TSFileConfig.groupSizeInByte;
TSFileConfig.groupSizeInByte = 8 * 1024 * 1024;
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(8 * 1024 * 1024);
EnvironmentUtils.closeStatMonitor();
daemon = IoTDB.getInstance();
daemon.active();
......@@ -75,6 +77,7 @@ public class IoTDBLogFileSizeTest {
return;
}
TSFileConfig.groupSizeInByte = groupSize;
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(groupSize);
executeSQL(tearDownSqls);
daemon.stop();
Thread.sleep(5000);
......
......@@ -41,7 +41,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* a restorable tsfile which do not depend on a restore file.
* a restorable tsfile.
*/
public class RestorableTsFileIOWriter extends TsFileIOWriter {
......@@ -183,7 +183,6 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
return append;
}
/**
* Given a TsFile, generate a writable RestorableTsFileIOWriter. That is, for a complete TsFile,
* the function erases all FileMetadata and supports writing new data; For a incomplete TsFile,
......