未验证 提交 118d6cb9 编写于 作者: S Steve Yurong Su (宇荣) 提交者: GitHub

Primitive Array Manager v3 (#3513)

上级 f1bc4e32
......@@ -69,7 +69,6 @@ import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
import org.apache.iotdb.db.rescon.MemTableManager;
import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
import org.apache.iotdb.db.utils.RandomDeleteCache;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.iotdb.db.utils.TestOnly;
......@@ -154,8 +153,6 @@ public class MManager {
// tag key -> tag value -> LeafMNode
private Map<String, Map<String, Set<MeasurementMNode>>> tagIndex = new ConcurrentHashMap<>();
// data type -> number
private Map<TSDataType, Integer> schemaDataTypeNumMap = new ConcurrentHashMap<>();
private AtomicLong totalSeriesNumber = new AtomicLong();
private boolean initialized;
protected static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
......@@ -332,7 +329,6 @@ public class MManager {
tagLogFile.close();
tagLogFile = null;
}
this.schemaDataTypeNumMap.clear();
initialized = false;
if (config.isEnableMTreeSnapshot() && timedCreateMTreeSnapshotThread != null) {
timedCreateMTreeSnapshotThread.shutdownNow();
......@@ -473,7 +469,6 @@ public class MManager {
logger.warn("Current series number {} is too large...", totalSeriesNumber);
allowToCreateNewSeries = false;
}
updateSchemaDataTypeNumMap(type, 1);
// write log
if (!isRecovering) {
......@@ -565,9 +560,6 @@ public class MManager {
logger.warn("Current series number {} is too large...", totalSeriesNumber);
allowToCreateNewSeries = false;
}
for (TSDataType type : dataTypes) {
updateSchemaDataTypeNumMap(type, 1);
}
// write log
if (!isRecovering) {
logWriter.createAlignedTimeseries(plan);
......@@ -695,19 +687,12 @@ public class MManager {
int timeseriesNum = 0;
if (schema instanceof MeasurementSchema) {
removeFromTagInvertedIndex(pair.right);
updateSchemaDataTypeNumMap(schema.getType(), -1);
timeseriesNum = 1;
} else if (schema instanceof VectorMeasurementSchema) {
for (TSDataType dataType : schema.getValueTSDataTypeList()) {
updateSchemaDataTypeNumMap(dataType, -1);
timeseriesNum++;
}
timeseriesNum += schema.getValueTSDataTypeList().size();
}
PartialPath storageGroupPath = pair.left;
// update statistics in schemaDataTypeNumMap
updateSchemaDataTypeNumMap(pair.right.getSchema().getType(), -1);
// drop trigger with no exceptions
TriggerEngine.drop(pair.right);
......@@ -762,8 +747,6 @@ public class MManager {
List<MeasurementMNode> leafMNodes = mtree.deleteStorageGroup(storageGroup);
for (MeasurementMNode leafMNode : leafMNodes) {
removeFromTagInvertedIndex(leafMNode);
// update statistics in schemaDataTypeNumMap
updateSchemaDataTypeNumMap(leafMNode.getSchema().getType(), -1);
}
// drop triggers with no exceptions
......@@ -783,22 +766,6 @@ public class MManager {
}
}
/**
 * Adjusts the per-type series counters and forwards the refreshed statistics to the
 * PrimitiveArrayManager.
 *
 * @param type data type of the series being created or deleted
 * @param num +1 when a timeseries is created, -1 when one is deleted
 */
private synchronized void updateSchemaDataTypeNumMap(TSDataType type, int num) {
  // bump the counter for the series' value column type
  schemaDataTypeNumMap.merge(type, num, Integer::sum);
  // every series also carries a time column, accounted as INT64
  schemaDataTypeNumMap.merge(TSDataType.INT64, num, Integer::sum);
  PrimitiveArrayManager.updateSchemaDataTypeNum(schemaDataTypeNumMap, totalSeriesNumber.get());
}
/**
* Check if the given path is storage group or not.
*
......
......@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.rescon;
import org.apache.iotdb.db.conf.IoTDBConfig;
......@@ -30,79 +29,168 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicLong;
/** Manage all primitive data list in memory, including get and release operation. */
/** Manage all primitive data lists in memory, including get and release operations. */
public class PrimitiveArrayManager {
/** data type -> ArrayDeque of primitive arrays. */
private static final Map<TSDataType, ArrayDeque<Object>> bufferedArraysMap =
new EnumMap<>(TSDataType.class);
private static final Logger LOGGER = LoggerFactory.getLogger(PrimitiveArrayManager.class);
/** data type -> ratio of data type in schema, which could be seen as recommended ratio */
private static final Map<TSDataType, Double> bufferedArraysNumRatio =
new EnumMap<>(TSDataType.class);
private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
private static final Logger logger = LoggerFactory.getLogger(PrimitiveArrayManager.class);
public static final int ARRAY_SIZE = CONFIG.getPrimitiveArraySize();
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
/** threshold total size of arrays for all data types */
private static final double POOLED_ARRAYS_MEMORY_THRESHOLD =
CONFIG.getAllocateMemoryForWrite() * CONFIG.getBufferedArraysMemoryProportion();
public static final int ARRAY_SIZE = config.getPrimitiveArraySize();
/** TSDataType#serialize() -> ArrayDeque<Array>, VECTOR is ignored */
private static final ArrayDeque[] POOLED_ARRAYS = new ArrayDeque[TSDataType.values().length - 1];
/** threshold total size of arrays for all data types */
private static final double BUFFERED_ARRAY_SIZE_THRESHOLD =
config.getAllocateMemoryForWrite() * config.getBufferedArraysMemoryProportion();
/** TSDataType#serialize() -> max size of ArrayDeque<Array>, VECTOR is ignored */
private static final int[] LIMITS = new int[TSDataType.values().length - 1];
/** total size of buffered arrays */
private static final AtomicLong bufferedArraysRamSize = new AtomicLong();
/** LIMITS should be updated if (TOTAL_ALLOCATION_REQUEST_COUNT.get() > limitUpdateThreshold) */
private static long limitUpdateThreshold;
/** total size of out of buffer arrays */
private static final AtomicLong outOfBufferArraysRamSize = new AtomicLong();
/** TSDataType#serialize() -> count of allocation requests, VECTOR is ignored */
private static final AtomicLong[] ALLOCATION_REQUEST_COUNTS =
new AtomicLong[] {
new AtomicLong(0),
new AtomicLong(0),
new AtomicLong(0),
new AtomicLong(0),
new AtomicLong(0),
new AtomicLong(0)
};
private static final AtomicLong TOTAL_ALLOCATION_REQUEST_COUNT = new AtomicLong(0);
static {
bufferedArraysMap.put(TSDataType.BOOLEAN, new ArrayDeque<>());
bufferedArraysMap.put(TSDataType.INT32, new ArrayDeque<>());
bufferedArraysMap.put(TSDataType.INT64, new ArrayDeque<>());
bufferedArraysMap.put(TSDataType.FLOAT, new ArrayDeque<>());
bufferedArraysMap.put(TSDataType.DOUBLE, new ArrayDeque<>());
bufferedArraysMap.put(TSDataType.TEXT, new ArrayDeque<>());
bufferedArraysMap.put(TSDataType.VECTOR, new ArrayDeque<>());
init();
}
private PrimitiveArrayManager() {
logger.info("BufferedArraySizeThreshold is {}", BUFFERED_ARRAY_SIZE_THRESHOLD);
/** Computes the initial, uniform per-type pool limits and creates the empty pools. */
private static void init() {
  LOGGER.info("BufferedArraySizeThreshold is {}", POOLED_ARRAYS_MEMORY_THRESHOLD);

  // POOLED_ARRAYS_MEMORY_THRESHOLD = ∑(datatype[i].getDataTypeSize() * ARRAY_SIZE * LIMITS[i]);
  // starting with the same LIMITS[i] for every type gives
  // LIMITS[i] = POOLED_ARRAYS_MEMORY_THRESHOLD / ARRAY_SIZE / ∑(datatype[i].getDataTypeSize())
  int sizeSumOverTypes = 0;
  for (TSDataType type : TSDataType.values()) {
    if (TSDataType.VECTOR.equals(type)) {
      // VECTOR has no pool of its own
      continue;
    }
    sizeSumOverTypes += type.getDataTypeSize();
  }
  @SuppressWarnings("squid:S3518") // sizeSumOverTypes can not be zero
  double initialLimit = POOLED_ARRAYS_MEMORY_THRESHOLD / ARRAY_SIZE / sizeSumOverTypes;
  Arrays.fill(LIMITS, (int) initialLimit);

  // limitUpdateThreshold = ∑(LIMITS[i])
  limitUpdateThreshold = (long) ((TSDataType.values().length - 1) * initialLimit);

  for (int i = 0; i < POOLED_ARRAYS.length; ++i) {
    POOLED_ARRAYS[i] = new ArrayDeque<>((int) initialLimit);
  }
  for (AtomicLong requestCount : ALLOCATION_REQUEST_COUNTS) {
    requestCount.set(0);
  }
  TOTAL_ALLOCATION_REQUEST_COUNT.set(0);
}
private PrimitiveArrayManager() {}
/**
* Get primitive data lists according to type
* Get or allocate primitive data lists according to type.
*
* @param dataType data type
* @return an array
*/
public static Object getPrimitiveArraysByType(TSDataType dataType) {
long delta = (long) ARRAY_SIZE * dataType.getDataTypeSize();
// check memory of buffered array, if already full, generate OOB
if (bufferedArraysRamSize.get() + delta > BUFFERED_ARRAY_SIZE_THRESHOLD) {
// return an out of buffer array
outOfBufferArraysRamSize.addAndGet(delta);
return createPrimitiveArray(dataType);
/**
 * Takes a pooled primitive array of the given type, creating a fresh one when the pool for
 * that type is currently empty.
 *
 * @param dataType requested data type; must not be VECTOR
 * @return a primitive array of length ARRAY_SIZE matching dataType
 * @throws UnSupportedDataTypeException if dataType is VECTOR
 */
public static Object allocate(TSDataType dataType) {
  if (dataType.equals(TSDataType.VECTOR)) {
    throw new UnSupportedDataTypeException(TSDataType.VECTOR.name());
  }
  // Double-checked locking: once enough allocation requests have accumulated, recompute the
  // per-type pool limits from the observed request distribution.
  if (TOTAL_ALLOCATION_REQUEST_COUNT.get() > limitUpdateThreshold) {
    synchronized (TOTAL_ALLOCATION_REQUEST_COUNT) {
      if (TOTAL_ALLOCATION_REQUEST_COUNT.get() > limitUpdateThreshold) {
        updateLimits();
      }
    }
  }
  // record this request so the next limit update reflects it
  int order = dataType.serialize();
  ALLOCATION_REQUEST_COUNTS[order].incrementAndGet();
  TOTAL_ALLOCATION_REQUEST_COUNT.incrementAndGet();
  Object array;
  // each per-type deque is guarded by its own monitor
  synchronized (POOLED_ARRAYS[order]) {
    array = POOLED_ARRAYS[order].poll();
  }
  if (array == null) {
    // pool miss: allocate a brand-new array outside any lock
    array = createPrimitiveArray(dataType);
  }
  return array;
}
/**
 * Recomputes LIMITS[i] proportionally to the share of allocation requests observed per data
 * type since the last update, then resets all request counters.
 */
private static void updateLimits() {
  // we want to update LIMITS[i] according to ratios[i]
  double[] ratios = new double[ALLOCATION_REQUEST_COUNTS.length];
  for (int i = 0; i < ALLOCATION_REQUEST_COUNTS.length; ++i) {
    ratios[i] =
        ALLOCATION_REQUEST_COUNTS[i].get() / (double) TOTAL_ALLOCATION_REQUEST_COUNT.get();
  }
  // NOTE(review): the next five lines appear to be leftovers of the removed
  // getPrimitiveArraysByType implementation interleaved by the diff view — they reference
  // bufferedArraysMap/dataType which do not exist in this scope, and 'return dataArray' is
  // illegal in a void method. Confirm against the committed file; they should not be here.
  synchronized (bufferedArraysMap.get(dataType)) {
    // try to get a buffered array
    Object dataArray = bufferedArraysMap.get(dataType).poll();
    if (dataArray != null) {
      return dataArray;
  // initially we have:
  // POOLED_ARRAYS_MEMORY_THRESHOLD = ∑(datatype[i].getDataTypeSize() * LIMITS[i]) * ARRAY_SIZE
  // we can find a number called limitBase which satisfies:
  // LIMITS[i] = limitBase * ratios[i]
  // => POOLED_ARRAYS_MEMORY_THRESHOLD =
  // limitBase * ∑(datatype[i].getDataTypeSize() * ratios[i]) * ARRAY_SIZE
  // => limitBase = POOLED_ARRAYS_MEMORY_THRESHOLD / ARRAY_SIZE
  // / ∑(datatype[i].getDataTypeSize() * ratios[i])
  double weightedSumOfRatios = 0;
  for (TSDataType dataType : TSDataType.values()) {
    // VECTOR is ignored
    if (dataType.equals(TSDataType.VECTOR)) {
      continue;
    }
    weightedSumOfRatios += dataType.getDataTypeSize() * ratios[dataType.serialize()];
  }
  @SuppressWarnings("squid:S3518") // weightedSumOfRatios can not be zero
  double limitBase = POOLED_ARRAYS_MEMORY_THRESHOLD / ARRAY_SIZE / weightedSumOfRatios;
  // LIMITS[i] = limitBase * ratios[i]
  for (int i = 0; i < LIMITS.length; ++i) {
    int oldLimit = LIMITS[i];
    int newLimit = (int) (limitBase * ratios[i]);
    LIMITS[i] = newLimit;
    if (LOGGER.isInfoEnabled()) {
      LOGGER.info(
          "limit of {} array deque size updated: {} -> {}",
          TSDataType.deserialize((byte) i).name(),
          oldLimit,
          newLimit);
    }
  }
  // limitUpdateThreshold = ∑(LIMITS[i])
  limitUpdateThreshold = 0;
  for (int limit : LIMITS) {
    limitUpdateThreshold += limit;
  }
  // restart the observation window for the next update
  for (AtomicLong allocationRequestCount : ALLOCATION_REQUEST_COUNTS) {
    allocationRequestCount.set(0);
  }
  TOTAL_ALLOCATION_REQUEST_COUNT.set(0);
}
private static Object createPrimitiveArray(TSDataType dataType) {
......@@ -126,16 +214,49 @@ public class PrimitiveArrayManager {
case TEXT:
dataArray = new Binary[ARRAY_SIZE];
break;
case VECTOR:
dataArray = new byte[ARRAY_SIZE][];
break;
default:
throw new UnSupportedDataTypeException(dataType.toString());
throw new UnSupportedDataTypeException(dataType.name());
}
return dataArray;
}
/**
 * Returns a previously allocated primitive array to the pool of its data type; the array is
 * dropped when that pool is already at its limit.
 *
 * @param array data array to be released
 * @throws UnSupportedDataTypeException if the array is not one of the pooled primitive kinds
 */
public static void release(Object array) {
  int typeOrder;
  if (array instanceof boolean[]) {
    typeOrder = TSDataType.BOOLEAN.serialize();
  } else if (array instanceof int[]) {
    typeOrder = TSDataType.INT32.serialize();
  } else if (array instanceof long[]) {
    typeOrder = TSDataType.INT64.serialize();
  } else if (array instanceof float[]) {
    typeOrder = TSDataType.FLOAT.serialize();
  } else if (array instanceof double[]) {
    typeOrder = TSDataType.DOUBLE.serialize();
  } else if (array instanceof Binary[]) {
    // clear references so pooled slots do not pin the old Binary values
    Arrays.fill((Binary[]) array, null);
    typeOrder = TSDataType.TEXT.serialize();
  } else {
    throw new UnSupportedDataTypeException(array.getClass().toString());
  }
  ArrayDeque<Object> pool = POOLED_ARRAYS[typeOrder];
  synchronized (pool) {
    if (pool.size() < LIMITS[typeOrder]) {
      pool.add(array);
    }
  }
}
/** Resets the manager: re-creates the pools and zeroes limits and allocation counters. */
public static void close() {
  init();
}
/**
* Get primitive data lists according to data type and size, only for TVList's sorting
*
......@@ -183,140 +304,7 @@ public class PrimitiveArrayManager {
}
return binaries;
default:
return null;
throw new UnSupportedDataTypeException(dataType.name());
}
}
/**
 * Brings a data array back to the manager, either re-buffering it or accounting it as a
 * released out-of-buffer array.
 *
 * @param releasingArray data array to be released
 * @throws UnSupportedDataTypeException if the array is not one of the supported primitive kinds
 */
public static void release(Object releasingArray) {
  // infer the data type from the array's runtime class
  TSDataType releasingType;
  if (releasingArray instanceof boolean[]) {
    releasingType = TSDataType.BOOLEAN;
  } else if (releasingArray instanceof int[]) {
    releasingType = TSDataType.INT32;
  } else if (releasingArray instanceof long[]) {
    releasingType = TSDataType.INT64;
  } else if (releasingArray instanceof float[]) {
    releasingType = TSDataType.FLOAT;
  } else if (releasingArray instanceof double[]) {
    releasingType = TSDataType.DOUBLE;
  } else if (releasingArray instanceof Binary[]) {
    // clear references so buffered slots do not pin the old Binary values
    Arrays.fill((Binary[]) releasingArray, null);
    releasingType = TSDataType.TEXT;
  } else {
    throw new UnSupportedDataTypeException("Unknown data array type");
  }
  if (outOfBufferArraysRamSize.get() <= 0) {
    // if there is no out of buffer array, bring back as buffered array directly
    putBackBufferedArray(releasingType, releasingArray);
  } else {
    // if the system has out of buffer array, we need to release some memory
    if (!isCurrentDataTypeExceeded(releasingType)) {
      // if the buffered array of the releasingType is less than expected
      // choose an array of redundantDataType to release and try to buffer the array of
      // releasingType
      for (Entry<TSDataType, ArrayDeque<Object>> entry : bufferedArraysMap.entrySet()) {
        TSDataType dataType = entry.getKey();
        if (isCurrentDataTypeExceeded(dataType)) {
          // if we find a replaced array, bring back the original array as a buffered array
          if (logger.isDebugEnabled()) {
            logger.debug(
                "The ratio of {} in buffered array has not reached the schema ratio. discard a redundant array of {}",
                releasingType,
                dataType);
          }
          // bring back the replaced array as OOB array
          replaceBufferedArray(releasingType, releasingArray, dataType);
          break;
        }
      }
    }
    // account one array of releasingType as released from the out-of-buffer pool
    releaseOutOfBuffer(releasingType);
  }
}
/**
* Bring back a buffered array
*
* @param dataType data type
* @param dataArray data array
*/
/**
 * Appends a released array to the buffered deque of its data type.
 *
 * @param dataType data type of the array
 * @param dataArray array to buffer
 */
private static void putBackBufferedArray(TSDataType dataType, Object dataArray) {
  ArrayDeque<Object> deque = bufferedArraysMap.get(dataType);
  synchronized (deque) {
    deque.add(dataArray);
  }
}
/**
 * Evicts one buffered array of redundantType (when present) and, if the freed budget leaves
 * room under BUFFERED_ARRAY_SIZE_THRESHOLD, buffers releasingArray in its place.
 */
private static void replaceBufferedArray(
    TSDataType releasingType, Object releasingArray, TSDataType redundantType) {
  synchronized (bufferedArraysMap.get(redundantType)) {
    // only shrink the accounted RAM size if an array was actually evicted
    if (bufferedArraysMap.get(redundantType).poll() != null) {
      bufferedArraysRamSize.addAndGet((long) -ARRAY_SIZE * redundantType.getDataTypeSize());
    }
  }
  // NOTE(review): check-then-act across two locks — this threshold check is not atomic with
  // the addAndGet below, so concurrent releases may transiently overshoot the threshold.
  if (bufferedArraysRamSize.get() + (long) ARRAY_SIZE * releasingType.getDataTypeSize()
      < BUFFERED_ARRAY_SIZE_THRESHOLD) {
    ArrayDeque<Object> releasingArrays = bufferedArraysMap.get(releasingType);
    synchronized (releasingArrays) {
      releasingArrays.add(releasingArray);
    }
    bufferedArraysRamSize.addAndGet((long) ARRAY_SIZE * releasingType.getDataTypeSize());
  }
}
/**
 * Decreases the accounted RAM size of out-of-buffer arrays by one array of the given type,
 * never dropping below zero.
 *
 * @param dataType data type of the released array
 */
private static void releaseOutOfBuffer(TSDataType dataType) {
  long releasedBytes = (long) ARRAY_SIZE * dataType.getDataTypeSize();
  outOfBufferArraysRamSize.getAndUpdate(current -> Math.max(0, current - releasedBytes));
}
/**
 * Recomputes the recommended ratio of buffered arrays for each data type from the current
 * schema statistics.
 *
 * <p>Each time series occupies two columns (a time column and a value column), hence the
 * {@code totalSeries * 2} denominator.
 *
 * @param schemaDataTypeNumMap data type -> number of columns of that type (the caller also
 *     accounts every series' time column as an INT64 entry)
 * @param totalSeries total number of time series
 */
public static void updateSchemaDataTypeNum(
    Map<TSDataType, Integer> schemaDataTypeNumMap, long totalSeries) {
  if (totalSeries <= 0) {
    // no series registered: a zero denominator would yield Infinity/NaN ratios
    return;
  }
  for (Map.Entry<TSDataType, Integer> entry : schemaDataTypeNumMap.entrySet()) {
    // one time series has 2 columns (time column + value column)
    bufferedArraysNumRatio.put(entry.getKey(), (double) entry.getValue() / (totalSeries * 2));
  }
}
/**
 * Tells whether the buffered arrays of the given type already take a larger share of the
 * buffer than the schema-derived recommended ratio.
 *
 * @param dataType data type to check
 * @return true if this type's buffered share exceeds its recommended ratio
 */
private static boolean isCurrentDataTypeExceeded(TSDataType dataType) {
  long totalBuffered = 0;
  for (ArrayDeque<Object> deque : bufferedArraysMap.values()) {
    totalBuffered += deque.size();
  }
  if (totalBuffered == 0) {
    // empty buffer: nothing can be in excess
    return false;
  }
  ArrayDeque<Object> dequeOfType = bufferedArraysMap.get(dataType);
  long bufferedOfType = dequeOfType == null ? 0 : dequeOfType.size();
  double recommendedRatio = bufferedArraysNumRatio.getOrDefault(dataType, 0.0);
  return (double) bufferedOfType / totalBuffered > recommendedRatio;
}
/** Clears every buffered array and resets all statistics kept by this manager. */
public static void close() {
  bufferedArraysMap.values().forEach(ArrayDeque::clear);
  bufferedArraysNumRatio.clear();
  bufferedArraysRamSize.set(0);
  outOfBufferArraysRamSize.set(0);
}
}
......@@ -346,7 +346,7 @@ public abstract class TVList {
}
/**
 * Allocates a primitive array of the given type from the PrimitiveArrayManager pool.
 *
 * <p>Fix: the block contained two consecutive return statements (the pre-rename call followed
 * by the new one), making the second unreachable; only the new {@code allocate} call is kept.
 *
 * @param dataType data type of the requested array
 * @return a pooled (or freshly created) primitive array
 */
protected Object getPrimitiveArraysByType(TSDataType dataType) {
  return PrimitiveArrayManager.allocate(dataType);
}
protected long[] cloneTime(long[] array) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册