未验证 提交 8d79a5e4 编写于 作者: S shuwenwei 提交者: GitHub

Add hot load compaction configs (#10758)

上级 144e5428
......@@ -35,6 +35,7 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.constant.Compacti
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.CrossCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.InnerSeqCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.InnerUnseqCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionPriority;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.constant.CrossCompactionSelector;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.constant.InnerSequenceCompactionSelector;
......@@ -1107,6 +1108,94 @@ public class IoTDBDescriptor {
loadWALHotModifiedProps(properties);
}
private void loadCompactionHotModifiedProps(Properties properties) throws InterruptedException {
conf.setCompactionValidationLevel(
CompactionValidationLevel.valueOf(
properties.getProperty(
"compaction_validation_level", conf.getCompactionValidationLevel().toString())));
loadCompactionIsEnabledHotModifiedProps(properties);
boolean restartCompactionTaskManager = loadCompactionThreadCountHotModifiedProps(properties);
restartCompactionTaskManager |= loadCompactionSubTaskCountHotModifiedProps(properties);
if (restartCompactionTaskManager) {
CompactionTaskManager.getInstance().restart();
}
}
private boolean loadCompactionThreadCountHotModifiedProps(Properties properties) {
int newConfigCompactionThreadCount =
Integer.parseInt(
properties.getProperty(
"compaction_thread_count", Integer.toString(conf.getCompactionThreadCount())));
if (newConfigCompactionThreadCount <= 0) {
logger.error("compaction_thread_count must greater than 0");
return false;
}
if (newConfigCompactionThreadCount == conf.getCompactionThreadCount()) {
return false;
}
conf.setCompactionThreadCount(
Integer.parseInt(
properties.getProperty(
"compaction_thread_count", Integer.toString(conf.getCompactionThreadCount()))));
return true;
}
private boolean loadCompactionSubTaskCountHotModifiedProps(Properties properties) {
int newConfigSubtaskNum =
Integer.parseInt(
properties.getProperty(
"sub_compaction_thread_count", Integer.toString(conf.getSubCompactionTaskNum())));
if (newConfigSubtaskNum <= 0) {
logger.error("sub_compaction_thread_count must greater than 0");
return false;
}
if (newConfigSubtaskNum == conf.getSubCompactionTaskNum()) {
return false;
}
conf.setSubCompactionTaskNum(newConfigSubtaskNum);
return true;
}
private void loadCompactionIsEnabledHotModifiedProps(Properties properties) {
boolean isCompactionEnabled =
conf.isEnableSeqSpaceCompaction()
|| conf.isEnableUnseqSpaceCompaction()
|| conf.isEnableCrossSpaceCompaction();
boolean newConfigEnableCrossSpaceCompaction =
Boolean.parseBoolean(
properties.getProperty(
"enable_cross_space_compaction",
Boolean.toString(conf.isEnableCrossSpaceCompaction())));
boolean newConfigEnableSeqSpaceCompaction =
Boolean.parseBoolean(
properties.getProperty(
"enable_seq_space_compaction",
Boolean.toString(conf.isEnableSeqSpaceCompaction())));
boolean newConfigEnableUnseqSpaceCompaction =
Boolean.parseBoolean(
properties.getProperty(
"enable_unseq_space_compaction",
Boolean.toString(conf.isEnableUnseqSpaceCompaction())));
boolean compactionEnabledInNewConfig =
newConfigEnableCrossSpaceCompaction
|| newConfigEnableSeqSpaceCompaction
|| newConfigEnableUnseqSpaceCompaction;
if (!isCompactionEnabled && compactionEnabledInNewConfig) {
logger.error("Compaction cannot start in current status.");
return;
}
conf.setEnableCrossSpaceCompaction(newConfigEnableCrossSpaceCompaction);
conf.setEnableSeqSpaceCompaction(newConfigEnableSeqSpaceCompaction);
conf.setEnableUnseqSpaceCompaction(newConfigEnableUnseqSpaceCompaction);
}
private void loadWALHotModifiedProps(Properties properties) {
long walAsyncModeFsyncDelayInMs =
Long.parseLong(
......@@ -1531,6 +1620,9 @@ public class IoTDBDescriptor {
WALManager.getInstance().rebootWALDeleteThread();
}
// update compaction config
loadCompactionHotModifiedProps(properties);
// update schema quota configuration
conf.setClusterSchemaLimitLevel(
properties
......
......@@ -398,7 +398,6 @@ public class CompactionTaskManager implements IService {
return storageGroupName + "-" + dataRegionId;
}
@TestOnly
public void restart() throws InterruptedException {
if (IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount() > 0) {
if (subCompactionTaskExecutionPool != null) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册