未验证 提交 2aefcb31 编写于 作者: Z ZhangHongYin 提交者: GitHub

[IOTDB-4000] Add the control of the size of wal folder in multiLeader consensus. (#6836)

上级 ea70aea2
......@@ -202,6 +202,9 @@ public class MultiLeaderConfig {
private final int maxWaitingTimeForAccumulatingBatchInMs;
private final long basicRetryWaitTimeMs;
private final long maxRetryWaitTimeMs;
private final long throttleDownThreshold;
private final long throttleUpThreshold;
private final long throttleTimeOutMs;
private Replication(
int maxPendingRequestNumPerNode,
......@@ -209,13 +212,19 @@ public class MultiLeaderConfig {
int maxPendingBatch,
int maxWaitingTimeForAccumulatingBatchInMs,
long basicRetryWaitTimeMs,
long maxRetryWaitTimeMs) {
long maxRetryWaitTimeMs,
long throttleDownThreshold,
long throttleUpThreshold,
long throttleTimeOutMs) {
this.maxPendingRequestNumPerNode = maxPendingRequestNumPerNode;
this.maxRequestPerBatch = maxRequestPerBatch;
this.maxPendingBatch = maxPendingBatch;
this.maxWaitingTimeForAccumulatingBatchInMs = maxWaitingTimeForAccumulatingBatchInMs;
this.basicRetryWaitTimeMs = basicRetryWaitTimeMs;
this.maxRetryWaitTimeMs = maxRetryWaitTimeMs;
this.throttleDownThreshold = throttleDownThreshold;
this.throttleUpThreshold = throttleUpThreshold;
this.throttleTimeOutMs = throttleTimeOutMs;
}
public int getMaxPendingRequestNumPerNode() {
......@@ -242,6 +251,18 @@ public class MultiLeaderConfig {
return maxRetryWaitTimeMs;
}
/**
 * Returns the throttle-down threshold held by this replication config.
 *
 * @return the {@code throttleDownThreshold} value passed to the constructor
 */
public long getThrottleDownThreshold() {
  return this.throttleDownThreshold;
}
/**
 * Returns the throttle-up threshold held by this replication config.
 *
 * @return the {@code throttleUpThreshold} value passed to the constructor
 */
public long getThrottleUpThreshold() {
  return this.throttleUpThreshold;
}
/**
 * Returns the throttle time-out held by this replication config.
 *
 * @return the {@code throttleTimeOutMs} value passed to the constructor, in milliseconds
 */
public long getThrottleTimeOutMs() {
  return this.throttleTimeOutMs;
}
/**
 * Creates a new builder for {@link Replication}.
 *
 * @return a freshly constructed {@link Replication.Builder}
 */
public static Replication.Builder newBuilder() {
  final Replication.Builder builder = new Replication.Builder();
  return builder;
}
......@@ -255,6 +276,9 @@ public class MultiLeaderConfig {
private int maxWaitingTimeForAccumulatingBatchInMs = 500;
private long basicRetryWaitTimeMs = TimeUnit.MILLISECONDS.toMillis(100);
private long maxRetryWaitTimeMs = TimeUnit.SECONDS.toMillis(20);
private long throttleDownThreshold = 50 * 1024 * 1024 * 1024L;
private long throttleUpThreshold = 1024 * 1024 * 1024L;
private long throttleTimeOutMs = TimeUnit.SECONDS.toMillis(30);
public Replication.Builder setMaxPendingRequestNumPerNode(int maxPendingRequestNumPerNode) {
this.maxPendingRequestNumPerNode = maxPendingRequestNumPerNode;
......@@ -287,6 +311,21 @@ public class MultiLeaderConfig {
return this;
}
/**
 * Sets the throttle-down threshold that {@code build()} will pass to {@link Replication}.
 *
 * @param throttleDownThreshold the threshold value to store in this builder
 * @return this builder, so calls can be chained
 */
public Replication.Builder setThrottleDownThreshold(long throttleDownThreshold) {
this.throttleDownThreshold = throttleDownThreshold;
return this;
}
/**
 * Sets the throttle-up threshold that {@code build()} will pass to {@link Replication}.
 *
 * @param throttleUpThreshold the threshold value to store in this builder
 * @return this builder, so calls can be chained
 */
public Replication.Builder setThrottleUpThreshold(long throttleUpThreshold) {
this.throttleUpThreshold = throttleUpThreshold;
return this;
}
/**
 * Sets the throttle time-out that {@code build()} will pass to {@link Replication}.
 *
 * @param throttleTimeOutMs the time-out to store in this builder, in milliseconds
 * @return this builder, so calls can be chained
 */
public Replication.Builder setThrottleTimeOutMs(long throttleTimeOutMs) {
this.throttleTimeOutMs = throttleTimeOutMs;
return this;
}
public Replication build() {
return new Replication(
maxPendingRequestNumPerNode,
......@@ -294,7 +333,10 @@ public class MultiLeaderConfig {
maxPendingBatch,
maxWaitingTimeForAccumulatingBatchInMs,
basicRetryWaitTimeMs,
maxRetryWaitTimeMs);
maxRetryWaitTimeMs,
throttleDownThreshold,
throttleUpThreshold,
throttleTimeOutMs);
}
}
}
......
......@@ -61,6 +61,7 @@ public class MultiLeaderServerImpl {
private final AtomicLong index;
private final LogDispatcher logDispatcher;
private final MultiLeaderConfig config;
private final ConsensusReqReader reader;
public MultiLeaderServerImpl(
String storageDir,
......@@ -80,9 +81,7 @@ public class MultiLeaderServerImpl {
}
this.config = config;
this.logDispatcher = new LogDispatcher(this, clientManager);
// restart
ConsensusReqReader reader =
(ConsensusReqReader) stateMachine.read(new GetConsensusReqReaderPlan());
reader = (ConsensusReqReader) stateMachine.read(new GetConsensusReqReaderPlan());
long currentSearchIndex = reader.getCurrentSearchIndex();
if (1 == configuration.size()) {
// only one configuration means single replica.
......@@ -110,6 +109,18 @@ public class MultiLeaderServerImpl {
*/
public TSStatus write(IConsensusRequest request) {
synchronized (stateMachine) {
if (needToThrottleDown()) {
logger.info(
"[Throttle Down] index:{}, safeIndex:{}",
getIndex(),
getCurrentSafelyDeletedSearchIndex());
try {
stateMachine.wait(config.getReplication().getThrottleTimeOutMs());
} catch (InterruptedException e) {
logger.error("Failed to throttle down because ", e);
Thread.currentThread().interrupt();
}
}
IndexedConsensusRequest indexedConsensusRequest =
buildIndexedConsensusRequestForLocalRequest(request);
if (indexedConsensusRequest.getSearchIndex() % 1000 == 0) {
......@@ -226,6 +237,16 @@ public class MultiLeaderServerImpl {
return config;
}
/**
 * Tells whether the total size reported by the consensus request reader has exceeded the
 * configured throttle-down threshold.
 *
 * @return {@code true} if {@code reader.getTotalSize()} is strictly greater than the threshold
 */
public boolean needToThrottleDown() {
  final long totalSize = reader.getTotalSize();
  final long downThreshold = config.getReplication().getThrottleDownThreshold();
  return totalSize > downThreshold;
}
/**
 * Tells whether the total size reported by the consensus request reader has dropped below the
 * resume level, computed as the throttle-down threshold minus the throttle-up threshold.
 *
 * @return {@code true} if {@code reader.getTotalSize()} is strictly less than that difference
 */
public boolean needToThrottleUp() {
  final long totalSize = reader.getTotalSize();
  final long downThreshold = config.getReplication().getThrottleDownThreshold();
  final long upThreshold = config.getReplication().getThrottleUpThreshold();
  // NOTE(review): if downThreshold is configured smaller than upThreshold the resume level is
  // negative, so this can only return true for a negative total size — confirm that is intended.
  final long resumeLevel = downThreshold - upThreshold;
  return totalSize < resumeLevel;
}
/**
 * Exposes the {@link AtomicLong} index holder itself rather than a snapshot of its value.
 *
 * @return the {@code index} field of this server
 */
public AtomicLong getIndexObject() {
  return this.index;
}
......
......@@ -222,6 +222,12 @@ public class LogDispatcher {
// indicating that insert nodes whose search index are before this value can be deleted
// safely
reader.setSafelyDeletedSearchIndex(impl.getCurrentSafelyDeletedSearchIndex());
// notify
if (impl.needToThrottleUp()) {
synchronized (impl.getStateMachine()) {
impl.getStateMachine().notifyAll();
}
}
}
public PendingBatch getBatch() {
......
......@@ -79,4 +79,7 @@ public interface ConsensusReqReader {
/** Get current search index */
long getCurrentSearchIndex();
/** Get total size of wal files */
long getTotalSize();
}
......@@ -47,6 +47,11 @@ public class FakeConsensusReqReader implements ConsensusReqReader, DataSet {
return requestSets.getLocalRequestNumber();
}
/**
 * Fake implementation: always reports a total size of zero.
 *
 * @return {@code 0}
 */
@Override
public long getTotalSize() {
  return 0L;
}
private class FakeConsensusReqIterator implements ConsensusReqReader.ReqIterator {
private long nextSearchIndex;
......
......@@ -178,6 +178,11 @@ target_config_nodes=127.0.0.1:22277
# Datatype: long
# delete_wal_files_period_in_ms=20000
# The total size of wal files above which write requests are throttled down in MultiLeader consensus
# If the value is not greater than 0, the default value 50 * 1024 * 1024 * 1024 bytes (50GB) is used.
# Datatype: long
# multi_leader_throttle_down_threshold_in_byte=53687091200
####################
### Directory Configuration
####################
......
......@@ -948,6 +948,9 @@ public class IoTDBConfig {
/** Maximum execution time of a DriverTask */
private int driverTaskExecutionTimeSliceInMs = 100;
/** Total size of wal files above which MultiLeader consensus throttles writes down. Unit: byte */
private long throttleDownThreshold = 50 * 1024 * 1024 * 1024L;
IoTDBConfig() {}
public float getUdfMemoryBudgetInMB() {
......@@ -3016,4 +3019,12 @@ public class IoTDBConfig {
public void setDriverTaskExecutionTimeSliceInMs(int driverTaskExecutionTimeSliceInMs) {
this.driverTaskExecutionTimeSliceInMs = driverTaskExecutionTimeSliceInMs;
}
/**
 * Returns the configured throttle-down threshold.
 *
 * @return the current value of the {@code throttleDownThreshold} field
 */
public long getThrottleDownThreshold() {
  return this.throttleDownThreshold;
}
/**
 * Overwrites the throttle-down threshold. No validation is performed here.
 *
 * @param throttleDownThreshold the new threshold value
 */
public void setThrottleDownThreshold(long throttleDownThreshold) {
this.throttleDownThreshold = throttleDownThreshold;
}
}
......@@ -1073,6 +1073,15 @@ public class IoTDBDescriptor {
if (deleteWalFilesPeriod > 0) {
conf.setDeleteWalFilesPeriodInMs(deleteWalFilesPeriod);
}
long throttleDownThresholdInByte =
Long.parseLong(
properties.getProperty(
"multi_leader_throttle_down_threshold_in_byte",
Long.toString(conf.getThrottleDownThreshold())));
if (throttleDownThresholdInByte > 0) {
conf.setThrottleDownThreshold(throttleDownThresholdInByte);
}
}
private void loadAutoCreateSchemaProps(Properties properties) {
......
......@@ -77,6 +77,10 @@ public class DataRegionConsensusImpl {
conf.getThriftServerAwaitTimeForStopService())
.setThriftMaxFrameSize(conf.getThriftMaxFrameSize())
.build())
.setReplication(
MultiLeaderConfig.Replication.newBuilder()
.setThrottleDownThreshold(conf.getThrottleDownThreshold())
.build())
.build())
.setRatisConfig(
RatisConfig.newBuilder()
......
......@@ -116,6 +116,11 @@ public class WALFakeNode implements IWALNode {
throw new UnsupportedOperationException();
}
/**
 * Fake node implementation: always reports a total size of zero.
 *
 * @return {@code 0}
 */
@Override
public long getTotalSize() {
  return 0L;
}
public static WALFakeNode getFailureInstance(Exception e) {
return new WALFakeNode(
WALFlushListener.Status.FAILURE,
......
......@@ -749,6 +749,11 @@ public class WALNode implements IWALNode {
return buffer.getCurrentSearchIndex();
}
/**
 * Reports the total disk usage as tracked by the {@code WALManager} singleton.
 *
 * <p>NOTE(review): the value is read from the global manager rather than from this node, so it
 * presumably covers all WAL nodes — confirm against {@code WALManager#getTotalDiskUsage()}.
 *
 * @return the value of {@code WALManager.getInstance().getTotalDiskUsage()}
 */
@Override
public long getTotalSize() {
  final WALManager walManager = WALManager.getInstance();
  return walManager.getTotalDiskUsage();
}
// endregion
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册