未验证 提交 168cbbd4 编写于 作者: zhanglingzhe0820's avatar zhanglingzhe0820 提交者: GitHub

Add merge speed limiting (#1712)

上级 32abc110
......@@ -308,6 +308,9 @@ force_full_merge=false
# When less than 0, this mechanism is disabled.
chunk_merge_point_threshold=20480
# The upper limit, in MB/s, of the write throughput a merge task may reach per second
merge_throughput_mb_per_sec=16
####################
### Metadata Cache Configuration
####################
......
......@@ -516,6 +516,11 @@ public class IoTDBConfig {
*/
private int chunkMergePointThreshold = 20480;
/**
* The upper limit, in MB/s, of the write throughput a merge task may reach per second
*/
private int mergeThroughputMbPerSec = 16;
private MergeFileStrategy mergeFileStrategy = MergeFileStrategy.MAX_SERIES_NUM;
/**
......@@ -1221,6 +1226,14 @@ public class IoTDBConfig {
this.chunkMergePointThreshold = chunkMergePointThreshold;
}
/**
 * Returns the configured merge write-throughput cap in MB/s (per the field's
 * javadoc, the limit a merge may reach per second).
 */
public int getMergeThroughputMbPerSec() {
  return this.mergeThroughputMbPerSec;
}
/**
 * Sets the merge write-throughput cap in MB/s.
 *
 * @param mergeThroughputMbPerSec new throughput limit in MB/s
 */
public void setMergeThroughputMbPerSec(int mergeThroughputMbPerSec) {
  this.mergeThroughputMbPerSec = mergeThroughputMbPerSec;
}
/**
 * Returns {@code memtableSizeThreshold}. Units are not visible in this chunk —
 * presumably bytes; confirm against the field declaration.
 */
public long getMemtableSizeThreshold() {
return memtableSizeThreshold;
}
......
......@@ -19,6 +19,7 @@
package org.apache.iotdb.db.engine.merge.manage;
import com.google.common.util.concurrent.RateLimiter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
......@@ -59,6 +60,7 @@ public class MergeManager implements IService, MergeManagerMBean {
private final String mbeanName = String
.format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE,
getID().getJmxName());
private final RateLimiter mergeRateLimiter = RateLimiter.create(Double.MAX_VALUE);
private AtomicInteger threadCnt = new AtomicInteger();
private ThreadPoolExecutor mergeTaskPool;
......@@ -72,6 +74,35 @@ public class MergeManager implements IService, MergeManagerMBean {
private MergeManager() {
}
/**
 * Returns the shared merge {@link RateLimiter}. The rate is re-read from the
 * current config on every call, so a changed merge_throughput_mb_per_sec value
 * takes effect the next time a caller fetches the limiter.
 */
public RateLimiter getMergeRateLimiter() {
  double configuredMbPerSec =
      IoTDBDescriptor.getInstance().getConfig().getMergeThroughputMbPerSec();
  setMergeRate(configuredMbPerSec);
  return mergeRateLimiter;
}
/**
 * Blocks on {@code limiter} until {@code bytesLength} permits have been
 * acquired, throttling merge writes to the configured throughput.
 *
 * <p>{@link RateLimiter#acquire(int)} only takes an {@code int}, so a long
 * byte count is consumed in chunks of at most {@code Integer.MAX_VALUE}.
 * Non-positive lengths acquire nothing.
 */
public static void mergeRateLimiterAcquire(RateLimiter limiter, long bytesLength) {
  long remaining = bytesLength;
  while (remaining > 0) {
    int permits = (int) Math.min(remaining, Integer.MAX_VALUE);
    limiter.acquire(permits);
    remaining -= permits;
  }
}
/**
 * Applies the configured merge throughput limit to {@code mergeRateLimiter}.
 *
 * @param throughputMbPerSec target throughput in MB/s; any value {@code <= 0}
 *     disables rate limiting
 */
private void setMergeRate(final double throughputMbPerSec) {
  double throughputBytesPerSec = throughputMbPerSec * 1024.0 * 1024.0;
  // <= 0 (not just == 0) disables limiting: Guava's RateLimiter.setRate throws
  // IllegalArgumentException for non-positive rates, so a negative config value
  // would otherwise abort the merge instead of running unthrottled.
  if (throughputBytesPerSec <= 0) {
    throughputBytesPerSec = Double.MAX_VALUE;
  }
  // Avoid redundant setRate calls when the configured rate is unchanged.
  if (mergeRateLimiter.getRate() != throughputBytesPerSec) {
    mergeRateLimiter.setRate(throughputBytesPerSec);
  }
}
/** Returns the process-wide singleton {@code MergeManager}. */
public static MergeManager getINSTANCE() {
return INSTANCE;
}
......@@ -84,7 +115,9 @@ public class MergeManager implements IService, MergeManagerMBean {
public Future<Void> submitChunkSubTask(MergeChunkHeapTask task) {
MergeFuture future = (MergeFuture) mergeChunkSubTaskPool.submit(task);
storageGroupSubTasks.computeIfAbsent(task.getStorageGroupName(), k -> new ConcurrentSkipListSet<>()).add(future);
storageGroupSubTasks
.computeIfAbsent(task.getStorageGroupName(), k -> new ConcurrentSkipListSet<>())
.add(future);
return future;
}
......@@ -103,18 +136,18 @@ public class MergeManager implements IService, MergeManagerMBean {
}
mergeTaskPool = new MergeThreadPool(threadNum,
r -> new Thread(r, "MergeThread-" + threadCnt.getAndIncrement()));
r -> new Thread(r, "MergeThread-" + threadCnt.getAndIncrement()));
mergeChunkSubTaskPool = new MergeThreadPool(threadNum * chunkSubThreadNum,
r -> new Thread(r, "MergeChunkSubThread-" + threadCnt.getAndIncrement()));
r -> new Thread(r, "MergeChunkSubThread-" + threadCnt.getAndIncrement()));
long mergeInterval = IoTDBDescriptor.getInstance().getConfig().getMergeIntervalSec();
if (mergeInterval > 0) {
timedMergeThreadPool = Executors.newSingleThreadScheduledExecutor( r -> new Thread(r,
timedMergeThreadPool = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r,
"TimedMergeThread"));
timedMergeThreadPool.scheduleAtFixedRate(this::mergeAll, mergeInterval,
mergeInterval, TimeUnit.SECONDS);
}
taskCleanerThreadPool = Executors.newSingleThreadScheduledExecutor( r -> new Thread(r,
taskCleanerThreadPool = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r,
"MergeTaskCleaner"));
taskCleanerThreadPool.scheduleAtFixedRate(this::cleanFinishedTask, 30, 30, TimeUnit.MINUTES);
logger.info("MergeManager started");
......@@ -134,11 +167,11 @@ public class MergeManager implements IService, MergeManagerMBean {
mergeChunkSubTaskPool.shutdownNow();
logger.info("Waiting for task pool to shut down");
long startTime = System.currentTimeMillis();
while (!mergeTaskPool.isTerminated() || !mergeChunkSubTaskPool.isTerminated() ) {
while (!mergeTaskPool.isTerminated() || !mergeChunkSubTaskPool.isTerminated()) {
// wait
long time = System.currentTimeMillis() - startTime;
if (time % 60_000 == 0) {
logger.warn("MergeManager has wait for {} seconds to stop", time/1000);
logger.warn("MergeManager has wait for {} seconds to stop", time / 1000);
}
}
mergeTaskPool = null;
......@@ -163,11 +196,11 @@ public class MergeManager implements IService, MergeManagerMBean {
awaitTermination(mergeChunkSubTaskPool, millseconds);
logger.info("Waiting for task pool to shut down");
long startTime = System.currentTimeMillis();
while (!mergeTaskPool.isTerminated() || !mergeChunkSubTaskPool.isTerminated() ) {
while (!mergeTaskPool.isTerminated() || !mergeChunkSubTaskPool.isTerminated()) {
// wait
long time = System.currentTimeMillis() - startTime;
if (time % 60_000 == 0) {
logger.warn("MergeManager has wait for {} seconds to stop", time/1000);
logger.warn("MergeManager has wait for {} seconds to stop", time / 1000);
}
}
mergeTaskPool = null;
......@@ -195,7 +228,8 @@ public class MergeManager implements IService, MergeManagerMBean {
private void mergeAll() {
try {
StorageEngine.getInstance().mergeAll(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
StorageEngine.getInstance()
.mergeAll(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
} catch (StorageEngineException e) {
logger.error("Cannot perform a global merge because", e);
}
......@@ -204,7 +238,6 @@ public class MergeManager implements IService, MergeManagerMBean {
/**
* Abort all merges of a storage group. The caller must acquire the write lock of the
* corresponding storage group.
* @param storageGroup
*/
@Override
public void abortMerge(String storageGroup) {
......@@ -242,10 +275,9 @@ public class MergeManager implements IService, MergeManagerMBean {
}
/**
*
* @return 2 maps, the first map contains status of main merge tasks and the second map
* contains status of merge chunk heap tasks, both map use storage groups as keys and list of
* merge status as values.
* @return 2 maps, the first map contains status of main merge tasks and the second map contains
* status of merge chunk heap tasks, both map use storage groups as keys and list of merge status
* as values.
*/
public Map<String, List<TaskStatus>>[] collectTaskStatus() {
Map<String, List<TaskStatus>>[] result = new Map[2];
......@@ -275,7 +307,8 @@ public class MergeManager implements IService, MergeManagerMBean {
for (Entry<String, List<TaskStatus>> stringListEntry : statusMaps[0].entrySet()) {
String storageGroup = stringListEntry.getKey();
List<TaskStatus> statusList = stringListEntry.getValue();
builder.append("\t").append("Storage group: ").append(storageGroup).append(System.lineSeparator());
builder.append("\t").append("Storage group: ").append(storageGroup)
.append(System.lineSeparator());
for (TaskStatus status : statusList) {
builder.append("\t\t").append(status.toString()).append(System.lineSeparator());
}
......@@ -285,7 +318,8 @@ public class MergeManager implements IService, MergeManagerMBean {
for (Entry<String, List<TaskStatus>> stringListEntry : statusMaps[1].entrySet()) {
String storageGroup = stringListEntry.getKey();
List<TaskStatus> statusList = stringListEntry.getValue();
builder.append("\t").append("Storage group: ").append(storageGroup).append(System.lineSeparator());
builder.append("\t").append("Storage group: ").append(storageGroup)
.append(System.lineSeparator());
for (TaskStatus status : statusList) {
builder.append("\t\t").append(status.toString()).append(System.lineSeparator());
}
......@@ -301,6 +335,7 @@ public class MergeManager implements IService, MergeManagerMBean {
}
public static class TaskStatus {
private String taskName;
private String createdTime;
private String progress;
......
......@@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine.tsfilemanagement.utils;
import static org.apache.iotdb.db.utils.MergeUtils.writeTVPair;
import com.google.common.util.concurrent.RateLimiter;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
......@@ -29,6 +30,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
......@@ -89,7 +91,7 @@ public class HotCompactionUtils {
return new Pair<>(newChunkMetadata, newChunk);
}
private static long writeUnseqChunk(String storageGroup,
private static long readUnseqChunk(String storageGroup,
Map<String, TsFileSequenceReader> tsFileSequenceReaderMap, String deviceId, long maxVersion,
String measurementId,
Map<Long, TimeValuePair> timeValuePairMap, List<TsFileResource> levelResources)
......@@ -157,7 +159,7 @@ public class HotCompactionUtils {
RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile());
Map<String, TsFileSequenceReader> tsFileSequenceReaderMap = new HashMap<>();
Map<String, Map<String, MeasurementSchema>> deviceMeasurementMap = new HashMap<>();
RateLimiter compactionRateLimiter = MergeManager.getINSTANCE().getMergeRateLimiter();
fillDeviceMeasurementMap(devices, deviceMeasurementMap, tsFileResources,
tsFileSequenceReaderMap, storageGroup);
if (!sequence) {
......@@ -170,7 +172,7 @@ public class HotCompactionUtils {
.entrySet()) {
String measurementId = entry.getKey();
Map<Long, TimeValuePair> timeValuePairMap = new TreeMap<>();
maxVersion = writeUnseqChunk(storageGroup, tsFileSequenceReaderMap, deviceId,
maxVersion = readUnseqChunk(storageGroup, tsFileSequenceReaderMap, deviceId,
maxVersion, measurementId, timeValuePairMap, tsFileResources);
IChunkWriter chunkWriter = new ChunkWriterImpl(entry.getValue());
for (TimeValuePair timeValuePair : timeValuePairMap.values()) {
......@@ -178,6 +180,9 @@ public class HotCompactionUtils {
targetResource.updateStartTime(deviceId, timeValuePair.getTimestamp());
targetResource.updateEndTime(deviceId, timeValuePair.getTimestamp());
}
// wait for limit write
MergeManager
.mergeRateLimiterAcquire(compactionRateLimiter, chunkWriter.getCurrentChunkSize());
chunkWriter.writeToFileWriter(writer);
}
writer.writeVersion(maxVersion);
......@@ -199,6 +204,9 @@ public class HotCompactionUtils {
ChunkMetadata newChunkMetadata = chunkPair.left;
Chunk newChunk = chunkPair.right;
if (newChunkMetadata != null && newChunk != null) {
// wait for limit write
MergeManager.mergeRateLimiterAcquire(compactionRateLimiter,
newChunk.getHeader().getDataSize() + newChunk.getData().position());
writer.writeChunk(newChunk, newChunkMetadata);
targetResource.updateStartTime(deviceId, newChunkMetadata.getStartTime());
targetResource.updateEndTime(deviceId, newChunkMetadata.getEndTime());
......
......@@ -22,6 +22,7 @@ package org.apache.iotdb.db.engine.merge;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import com.google.common.util.concurrent.RateLimiter;
import java.util.PriorityQueue;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.merge.task.MergeMultiChunkTask;
......@@ -30,6 +31,21 @@ import org.junit.Test;
public class MergeManagerTest extends MergeTest {
// NOTE(review): this test asserts on wall-clock timings, so it depends on the
// default merge_throughput_mb_per_sec (16 MB/s) and on scheduler latency; it may
// be flaky on slow or loaded CI machines — consider widening the bounds.
@Test
public void testRateLimiter() {
RateLimiter compactionRateLimiter = MergeManager.getINSTANCE().getMergeRateLimiter();
long startTime = System.currentTimeMillis();
// First acquire: presumably returns immediately (RateLimiter charges the wait
// to subsequent acquirers) — hence the < 1s bound. TODO confirm burst behavior.
MergeManager.mergeRateLimiterAcquire(compactionRateLimiter, 160 * 1024 * 1024L);
assertTrue((System.currentTimeMillis() - startTime) < 1000);
// Second acquire pays for the 160 MB above: ~160 MB / 16 MB/s ≈ 10 s elapsed.
MergeManager.mergeRateLimiterAcquire(compactionRateLimiter, 16 * 1024 * 1024L);
assertTrue((System.currentTimeMillis() - startTime) > 9000
&& (System.currentTimeMillis() - startTime) < 10000);
// Remaining acquires are only timed and printed, not asserted.
MergeManager.mergeRateLimiterAcquire(compactionRateLimiter, 16 * 1024 * 1024L);
System.out.println("run time:" + (System.currentTimeMillis() - startTime));
MergeManager.mergeRateLimiterAcquire(compactionRateLimiter, 160 * 1024 * 1024L);
System.out.println("run time:" + (System.currentTimeMillis() - startTime));
}
@Test
public void testGenMergeReport() {
FakedMergeMultiChunkTask chunkTask = new FakedMergeMultiChunkTask();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册