未验证 提交 8519fb60 编写于 作者: J Jialin Qiao 提交者: GitHub

Make dynamic parameter controlable (#1042)

* make dynamic parameter adapter and  ActiveTimeSeriesCounter controlable
上级 b2336949
...@@ -127,6 +127,9 @@ public class IoTDBConfigDynamicAdapter implements IDynamicAdapter { ...@@ -127,6 +127,9 @@ public class IoTDBConfigDynamicAdapter implements IDynamicAdapter {
@Override @Override
public synchronized boolean tryToAdaptParameters() { public synchronized boolean tryToAdaptParameters() {
if(!CONFIG.isEnableParameterAdapter()){
return true;
}
boolean canAdjust = true; boolean canAdjust = true;
double ratio = CompressionRatio.getInstance().getRatio(); double ratio = CompressionRatio.getInstance().getRatio();
long memtableSizeInByte = calcMemTableSize(ratio); long memtableSizeInByte = calcMemTableSize(ratio);
...@@ -218,10 +221,13 @@ public class IoTDBConfigDynamicAdapter implements IDynamicAdapter { ...@@ -218,10 +221,13 @@ public class IoTDBConfigDynamicAdapter implements IDynamicAdapter {
public void addOrDeleteStorageGroup(int diff) throws ConfigAdjusterException { public void addOrDeleteStorageGroup(int diff) throws ConfigAdjusterException {
totalStorageGroup += diff; totalStorageGroup += diff;
maxMemTableNum += IoTDBDescriptor.getInstance().getConfig().getMemtableNumInEachStorageGroup() * diff; maxMemTableNum += IoTDBDescriptor.getInstance().getConfig().getMemtableNumInEachStorageGroup() * diff;
if(!CONFIG.isEnableParameterAdapter()){ if(!CONFIG.isEnableParameterAdapter()){
// the maxMemTableNum will also be set in tryToAdaptParameters, this should not move out
CONFIG.setMaxMemtableNumber(maxMemTableNum); CONFIG.setMaxMemtableNumber(maxMemTableNum);
return; return;
} }
if (!tryToAdaptParameters()) { if (!tryToAdaptParameters()) {
totalStorageGroup -= diff; totalStorageGroup -= diff;
maxMemTableNum -= IoTDBDescriptor.getInstance().getConfig().getMemtableNumInEachStorageGroup() * diff; maxMemTableNum -= IoTDBDescriptor.getInstance().getConfig().getMemtableNumInEachStorageGroup() * diff;
......
...@@ -22,6 +22,7 @@ import java.io.IOException; ...@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter; import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager; import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
import org.apache.iotdb.db.engine.memtable.IMemTable; import org.apache.iotdb.db.engine.memtable.IMemTable;
...@@ -82,11 +83,15 @@ public class MemTableFlushTask { ...@@ -82,11 +83,15 @@ public class MemTableFlushTask {
sortTime += System.currentTimeMillis() - startTime; sortTime += System.currentTimeMillis() - startTime;
encodingTaskQueue.add(new Pair<>(tvList, desc)); encodingTaskQueue.add(new Pair<>(tvList, desc));
// register active time series to the ActiveTimeSeriesCounter // register active time series to the ActiveTimeSeriesCounter
ActiveTimeSeriesCounter.getInstance().offer(storageGroup, deviceId, measurementId); if (IoTDBDescriptor.getInstance().getConfig().isEnableParameterAdapter()) {
ActiveTimeSeriesCounter.getInstance().offer(storageGroup, deviceId, measurementId);
}
} }
encodingTaskQueue.add(new EndChunkGroupIoTask()); encodingTaskQueue.add(new EndChunkGroupIoTask());
} }
ActiveTimeSeriesCounter.getInstance().updateActiveRatio(storageGroup); if (IoTDBDescriptor.getInstance().getConfig().isEnableParameterAdapter()) {
ActiveTimeSeriesCounter.getInstance().updateActiveRatio(storageGroup);
}
noMoreEncodingTask = true; noMoreEncodingTask = true;
logger.debug( logger.debug(
"Storage group {} memtable {}, flushing into disk: data sort time cost {} ms.", "Storage group {} memtable {}, flushing into disk: data sort time cost {} ms.",
......
...@@ -261,6 +261,9 @@ public class TsFileProcessor { ...@@ -261,6 +261,9 @@ public class TsFileProcessor {
*/ */
private long getMemtableSizeThresholdBasedOnSeriesNum() { private long getMemtableSizeThresholdBasedOnSeriesNum() {
IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
if(!config.isEnableParameterAdapter()){
return config.getMemtableSizeThreshold();
}
long memTableSize = (long) (config.getMemtableSizeThreshold() * config.getMaxMemtableNumber() long memTableSize = (long) (config.getMemtableSizeThreshold() * config.getMaxMemtableNumber()
/ IoTDBDescriptor.getInstance().getConfig().getMemtableNumInEachStorageGroup() / IoTDBDescriptor.getInstance().getConfig().getMemtableNumInEachStorageGroup()
* ActiveTimeSeriesCounter.getInstance() * ActiveTimeSeriesCounter.getInstance()
...@@ -268,7 +271,6 @@ public class TsFileProcessor { ...@@ -268,7 +271,6 @@ public class TsFileProcessor {
return Math.max(memTableSize, config.getMemtableSizeThreshold()); return Math.max(memTableSize, config.getMemtableSizeThreshold());
} }
public boolean shouldClose() { public boolean shouldClose() {
long fileSize = tsFileResource.getFileSize(); long fileSize = tsFileResource.getFileSize();
long fileSizeThreshold = IoTDBDescriptor.getInstance().getConfig() long fileSizeThreshold = IoTDBDescriptor.getInstance().getConfig()
......
...@@ -85,7 +85,7 @@ public class MManager { ...@@ -85,7 +85,7 @@ public class MManager {
// device -> DeviceMNode // device -> DeviceMNode
private RandomDeleteCache<String, MNode> mNodeCache; private RandomDeleteCache<String, MNode> mNodeCache;
private Map<String, Integer> seriesNumberInStorageGroups = new HashMap<>(); private Map<String, Integer> seriesNumberInStorageGroups;
private long maxSeriesNumberAmongStorageGroup; private long maxSeriesNumberAmongStorageGroup;
private boolean initialized; private boolean initialized;
private IoTDBConfig config; private IoTDBConfig config;
...@@ -136,20 +136,22 @@ public class MManager { ...@@ -136,20 +136,22 @@ public class MManager {
try { try {
initFromLog(logFile); initFromLog(logFile);
// storage group name -> the series number if (IoTDBDescriptor.getInstance().getConfig().isEnableParameterAdapter()) {
seriesNumberInStorageGroups = new HashMap<>(); // storage group name -> the series number
List<String> storageGroups = mtree.getAllStorageGroupNames(); seriesNumberInStorageGroups = new HashMap<>();
for (String sg : storageGroups) { List<String> storageGroups = mtree.getAllStorageGroupNames();
MNode node = mtree.getNodeByPath(sg); for (String sg : storageGroups) {
seriesNumberInStorageGroups.put(sg, node.getLeafCount()); MNode node = mtree.getNodeByPath(sg);
seriesNumberInStorageGroups.put(sg, node.getLeafCount());
}
if (seriesNumberInStorageGroups.isEmpty()) {
maxSeriesNumberAmongStorageGroup = 0;
} else {
maxSeriesNumberAmongStorageGroup = seriesNumberInStorageGroups.values().stream()
.max(Integer::compareTo).get();
}
} }
if (seriesNumberInStorageGroups.isEmpty()) {
maxSeriesNumberAmongStorageGroup = 0;
} else {
maxSeriesNumberAmongStorageGroup = seriesNumberInStorageGroups.values().stream()
.max(Integer::compareTo).get();
}
writeToLog = true; writeToLog = true;
} catch (IOException | MetadataException e) { } catch (IOException | MetadataException e) {
mtree = new MTree(); mtree = new MTree();
...@@ -184,7 +186,9 @@ public class MManager { ...@@ -184,7 +186,9 @@ public class MManager {
try { try {
this.mtree = new MTree(); this.mtree = new MTree();
this.mNodeCache.clear(); this.mNodeCache.clear();
this.seriesNumberInStorageGroups.clear(); if (seriesNumberInStorageGroups != null) {
this.seriesNumberInStorageGroups.clear();
}
this.maxSeriesNumberAmongStorageGroup = 0; this.maxSeriesNumberAmongStorageGroup = 0;
if (logWriter != null) { if (logWriter != null) {
logWriter.close(); logWriter.close();
...@@ -294,10 +298,12 @@ public class MManager { ...@@ -294,10 +298,12 @@ public class MManager {
createTimeseriesWithMemoryCheckAndLog(path, dataType, encoding, compressor, props); createTimeseriesWithMemoryCheckAndLog(path, dataType, encoding, compressor, props);
// update statistics // update statistics
int size = seriesNumberInStorageGroups.get(storageGroupName); if (IoTDBDescriptor.getInstance().getConfig().isEnableParameterAdapter()) {
seriesNumberInStorageGroups.put(storageGroupName, size + 1); int size = seriesNumberInStorageGroups.get(storageGroupName);
if (size + 1 > maxSeriesNumberAmongStorageGroup) { seriesNumberInStorageGroups.put(storageGroupName, size + 1);
maxSeriesNumberAmongStorageGroup = size + 1; if (size + 1 > maxSeriesNumberAmongStorageGroup) {
maxSeriesNumberAmongStorageGroup = size + 1;
}
} }
} finally { } finally {
lock.writeLock().unlock(); lock.writeLock().unlock();
...@@ -361,12 +367,16 @@ public class MManager { ...@@ -361,12 +367,16 @@ public class MManager {
public Set<String> deleteTimeseries(String prefixPath) throws MetadataException { public Set<String> deleteTimeseries(String prefixPath) throws MetadataException {
lock.writeLock().lock(); lock.writeLock().lock();
if (isStorageGroup(prefixPath)) { if (isStorageGroup(prefixPath)) {
int size = seriesNumberInStorageGroups.get(prefixPath);
seriesNumberInStorageGroups.put(prefixPath, 0); if (IoTDBDescriptor.getInstance().getConfig().isEnableParameterAdapter()) {
if (size == maxSeriesNumberAmongStorageGroup) { int size = seriesNumberInStorageGroups.get(prefixPath);
seriesNumberInStorageGroups.values().stream().max(Integer::compareTo) seriesNumberInStorageGroups.put(prefixPath, 0);
.ifPresent(val -> maxSeriesNumberAmongStorageGroup = val); if (size == maxSeriesNumberAmongStorageGroup) {
seriesNumberInStorageGroups.values().stream().max(Integer::compareTo)
.ifPresent(val -> maxSeriesNumberAmongStorageGroup = val);
}
} }
mNodeCache.clear(); mNodeCache.clear();
} }
try { try {
...@@ -412,12 +422,15 @@ public class MManager { ...@@ -412,12 +422,15 @@ public class MManager {
} catch (ConfigAdjusterException e) { } catch (ConfigAdjusterException e) {
throw new MetadataException(e); throw new MetadataException(e);
} }
String storageGroup = getStorageGroupName(path);
int size = seriesNumberInStorageGroups.get(storageGroup); if (IoTDBDescriptor.getInstance().getConfig().isEnableParameterAdapter()) {
seriesNumberInStorageGroups.put(storageGroup, size - 1); String storageGroup = getStorageGroupName(path);
if (size == maxSeriesNumberAmongStorageGroup) { int size = seriesNumberInStorageGroups.get(storageGroup);
seriesNumberInStorageGroups.values().stream().max(Integer::compareTo) seriesNumberInStorageGroups.put(storageGroup, size - 1);
.ifPresent(val -> maxSeriesNumberAmongStorageGroup = val); if (size == maxSeriesNumberAmongStorageGroup) {
seriesNumberInStorageGroups.values().stream().max(Integer::compareTo)
.ifPresent(val -> maxSeriesNumberAmongStorageGroup = val);
}
} }
return storageGroupName; return storageGroupName;
} finally { } finally {
...@@ -441,8 +454,10 @@ public class MManager { ...@@ -441,8 +454,10 @@ public class MManager {
writer.flush(); writer.flush();
} }
IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(1); IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(1);
ActiveTimeSeriesCounter.getInstance().init(storageGroup); if (IoTDBDescriptor.getInstance().getConfig().isEnableParameterAdapter()) {
seriesNumberInStorageGroups.put(storageGroup, 0); ActiveTimeSeriesCounter.getInstance().init(storageGroup);
seriesNumberInStorageGroups.put(storageGroup, 0);
}
} catch (IOException e) { } catch (IOException e) {
throw new MetadataException(e.getMessage()); throw new MetadataException(e.getMessage());
} catch (ConfigAdjusterException e) { } catch (ConfigAdjusterException e) {
...@@ -473,17 +488,20 @@ public class MManager { ...@@ -473,17 +488,20 @@ public class MManager {
writer.flush(); writer.flush();
} }
mNodeCache.clear(); mNodeCache.clear();
IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(-1);
int size = seriesNumberInStorageGroups.get(storageGroup); if (IoTDBDescriptor.getInstance().getConfig().isEnableParameterAdapter()) {
IoTDBConfigDynamicAdapter.getInstance().addOrDeleteTimeSeries(size * -1); IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(-1);
ActiveTimeSeriesCounter.getInstance().delete(storageGroup); int size = seriesNumberInStorageGroups.get(storageGroup);
seriesNumberInStorageGroups.remove(storageGroup); IoTDBConfigDynamicAdapter.getInstance().addOrDeleteTimeSeries(size * -1);
if (size == maxSeriesNumberAmongStorageGroup) { ActiveTimeSeriesCounter.getInstance().delete(storageGroup);
if (seriesNumberInStorageGroups.isEmpty()) { seriesNumberInStorageGroups.remove(storageGroup);
maxSeriesNumberAmongStorageGroup = 0; if (size == maxSeriesNumberAmongStorageGroup) {
} else { if (seriesNumberInStorageGroups.isEmpty()) {
maxSeriesNumberAmongStorageGroup = seriesNumberInStorageGroups.values().stream() maxSeriesNumberAmongStorageGroup = 0;
.max(Integer::compareTo).get(); } else {
maxSeriesNumberAmongStorageGroup = seriesNumberInStorageGroups.values().stream()
.max(Integer::compareTo).get();
}
} }
} }
} }
......
...@@ -44,15 +44,19 @@ import org.junit.Test; ...@@ -44,15 +44,19 @@ import org.junit.Test;
public class MManagerBasicTest { public class MManagerBasicTest {
private CompressionType compressionType; private CompressionType compressionType;
private boolean canAdjust = IoTDBDescriptor.getInstance().getConfig().isEnableParameterAdapter();
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
canAdjust = IoTDBDescriptor.getInstance().getConfig().isEnableParameterAdapter();
compressionType = TSFileDescriptor.getInstance().getConfig().getCompressor(); compressionType = TSFileDescriptor.getInstance().getConfig().getCompressor();
EnvironmentUtils.envSetUp(); EnvironmentUtils.envSetUp();
IoTDBDescriptor.getInstance().getConfig().setEnableParameterAdapter(true);
} }
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
IoTDBDescriptor.getInstance().getConfig().setEnableParameterAdapter(canAdjust);
EnvironmentUtils.cleanEnv(); EnvironmentUtils.cleanEnv();
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册