提交 7cacd403 编写于 作者: L lichuang

add cache last null columns feature

上级 5b19e0d2
......@@ -129,7 +129,7 @@ taosd -C
- blocks:每个VNODE(TSDB)中有多少cache大小的内存块。因此一个VNODE的用的内存大小粗略为(cache * blocks)。单位为块,默认值:4。(可通过 alter database 修改)
- replica:副本个数,取值范围:1-3。单位为个,默认值:1。(可通过 alter database 修改)
- precision:时间戳精度标识,ms表示毫秒,us表示微秒。默认值:ms。
- cacheLast:是否在内存中缓存子表 last_row,0:关闭;1:开启。默认值:0。(可通过 alter database 修改)(从 2.0.11 版本开始支持此参数)
- cacheLast:是否在内存中缓存子表的最近数据,0:关闭;1:缓存子表最近一行数据;2:缓存子表每一列的最近的非NULL值,设置为3表示同时开启了1和2。默认值:0。(可通过 alter database 修改)(从 2.0.11 版本开始支持此参数)
对于一个应用场景,可能有多种数据特征的数据并存,最佳的设计是将具有相同数据特征的表放在一个库里,这样一个应用有多个库,而每个库可以配置不同的存储参数,从而保证系统有最优的性能。TDengine允许应用在创建库时指定上述存储参数,如果指定,该参数就将覆盖对应的系统配置参数。举例,有下述SQL:
......
......@@ -417,6 +417,16 @@ void setVardataNull(char* val, int32_t type) {
}
}
bool isVardataNull(char* val, int32_t type) {
if (type == TSDB_DATA_TYPE_BINARY) {
return *(uint8_t*) varDataVal(val) == TSDB_DATA_BINARY_NULL;
} else if (type == TSDB_DATA_TYPE_NCHAR) {
return *(uint32_t*) varDataVal(val) == TSDB_DATA_NCHAR_NULL;
} else {
assert(0);
}
}
void setNull(char *val, int32_t type, int32_t bytes) { setNullN(val, type, bytes, 1); }
void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems) {
......@@ -492,6 +502,54 @@ void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems) {
}
}
bool isNullN(char *val, int32_t type) {
switch (type) {
case TSDB_DATA_TYPE_BOOL:
return *(uint8_t *)(val) == TSDB_DATA_BOOL_NULL;
break;
case TSDB_DATA_TYPE_TINYINT:
return *(uint8_t *)(val) == TSDB_DATA_TINYINT_NULL;
break;
case TSDB_DATA_TYPE_SMALLINT:
return *(uint16_t *)(val) == TSDB_DATA_SMALLINT_NULL;
break;
case TSDB_DATA_TYPE_INT:
return *(uint32_t *)(val) == TSDB_DATA_INT_NULL;
break;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP:
return *(uint64_t *)(val) == TSDB_DATA_BIGINT_NULL;
break;
case TSDB_DATA_TYPE_UTINYINT:
return *(uint8_t *)(val) == TSDB_DATA_UTINYINT_NULL;
break;
case TSDB_DATA_TYPE_USMALLINT:
return *(uint16_t *)(val) == TSDB_DATA_USMALLINT_NULL;
break;
case TSDB_DATA_TYPE_UINT:
return *(uint32_t *)(val) == TSDB_DATA_UINT_NULL;
break;
case TSDB_DATA_TYPE_UBIGINT:
return *(uint64_t *)(val) == TSDB_DATA_UBIGINT_NULL;
break;
case TSDB_DATA_TYPE_FLOAT:
return *(uint32_t *)(val) == TSDB_DATA_FLOAT_NULL;
break;
case TSDB_DATA_TYPE_DOUBLE:
return *(uint64_t *)(val) == TSDB_DATA_DOUBLE_NULL;
break;
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_BINARY:
return isVardataNull(val, type);
break;
default: {
return *(uint32_t *)(val) == TSDB_DATA_INT_NULL;
break;
}
}
}
static uint8_t nullBool = TSDB_DATA_BOOL_NULL;
static uint8_t nullTinyInt = TSDB_DATA_TINYINT_NULL;
static uint16_t nullSmallInt = TSDB_DATA_SMALLINT_NULL;
......
......@@ -299,7 +299,7 @@ do { \
#define TSDB_DEFAULT_DB_UPDATE_OPTION 0
#define TSDB_MIN_DB_CACHE_LAST_ROW 0
#define TSDB_MAX_DB_CACHE_LAST_ROW 1
#define TSDB_MAX_DB_CACHE_LAST_ROW 2
#define TSDB_DEFAULT_CACHE_LAST_ROW 0
#define TSDB_MIN_FSYNC_PERIOD 0
......
......@@ -69,9 +69,13 @@ typedef struct {
int8_t precision;
int8_t compression;
int8_t update;
int8_t cacheLastRow;
int8_t cacheLastRow; // 0:no cache, 1: cache last row, 2: cache last NULL column
} STsdbCfg;
#define CACHE_NO_LAST(c) ((c)->cacheLastRow == 0)
#define CACHE_LAST_ROW(c) (((c)->cacheLastRow & 1) > 0)
#define CACHE_LAST_NULL_COLUMN(c) (((c)->cacheLastRow & 2) > 0)
// --------- TSDB REPOSITORY USAGE STATISTICS
typedef struct {
int64_t totalStorage; // total bytes occupie
......
......@@ -176,6 +176,9 @@ void setNull(char *val, int32_t type, int32_t bytes);
void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems);
void *getNullValue(int32_t type);
bool isVardataNull(char* val, int32_t type);
bool isNullN(char *val, int32_t type);
void assignVal(char *val, const char *src, int32_t len, int32_t type);
void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf);
......
......@@ -36,6 +36,9 @@ typedef struct STable {
char* sql;
void* cqhandle;
SRWLatch latch; // TODO: implementa latch functions
SDataCol *lastCols;
int32_t lastColNum;
T_REF_DECLARE()
} STable;
......
......@@ -447,8 +447,10 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
if (pCfg->update != 0) pCfg->update = 1;
// update cacheLastRow
if (pCfg->cacheLastRow != 0) pCfg->cacheLastRow = 1;
if (pCfg->cacheLastRow != 0) {
if (pCfg->cacheLastRow > 3)
pCfg->cacheLastRow = 1;
}
return 0;
}
......@@ -581,7 +583,7 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
if (pIdx && lastKey < pIdx->maxKey) {
pTable->lastKey = pIdx->maxKey;
if (pCfg->cacheLastRow) {
if (CACHE_LAST_ROW(pCfg)) {
if (tsdbLoadBlockInfo(&readh, NULL) < 0) {
tsdbDestroyReadH(&readh);
return -1;
......
......@@ -955,11 +955,61 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) {
}
}
static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow row) {
//tsdbDebug("vgId:%d updateTableLatestColumn, row version:%d", REPO_ID(pRepo), dataRowVersion(row));
if (pTable->numOfSchemas <= 0) {
return;
}
STSchema* pSchema = pTable->schema[pTable->numOfSchemas - 1];
int i = pTable->numOfSchemas - 1;
while ((pSchema == NULL || pSchema->version != dataRowVersion(row)) && i >= 0) {
i -= 1;
pSchema = pTable->schema[i];
}
if (pSchema == NULL || pSchema->version != dataRowVersion(row)) {
return;
}
SDataCol *pLatestCols = pTable->lastCols;
for (int j = 0; j < schemaNCols(pSchema); j++) {
if (j >= pTable->lastColNum) {
pTable->lastCols = realloc(pTable->lastCols, pTable->lastColNum + 10);
for (int i = 0; i < 10; ++i) {
pTable->lastCols[i + pTable->lastColNum].bytes = 0;
pTable->lastCols[i + pTable->lastColNum].pData = NULL;
}
pTable->lastColNum += 10;
}
STColumn *pTCol = schemaColAt(pSchema, j);
SDataCol *pDataCol = &(pLatestCols[j]);
void* value = tdGetRowDataOfCol(row, (int8_t)pTCol->type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset);
if (isNullN(value, pTCol->type)) {
continue;
}
if (pDataCol->pData == NULL) {
pDataCol->pData = malloc(pSchema->columns[j].bytes);
pDataCol->bytes = pSchema->columns[j].bytes;
} else if (pDataCol->bytes < pSchema->columns[j].bytes) {
pDataCol->pData = realloc(pDataCol->pData, pSchema->columns[j].bytes);
pDataCol->bytes = pSchema->columns[j].bytes;
}
//tsdbDebug("vgId:%d cache column %d for %d,%p", REPO_ID(pRepo), j, pDataCol->bytes, pDataCol->pData);
memcpy(pDataCol->pData, value, pDataCol->bytes);
}
}
static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow row) {
STsdbCfg *pCfg = &pRepo->config;
tsdbDebug("vgId:%d tsdbUpdateTableLatestInfo, %ld, %ld, %d", REPO_ID(pRepo), tsdbGetTableLastKeyImpl(pTable), dataRowKey(row), pCfg->cacheLastRow);
if (tsdbGetTableLastKeyImpl(pTable) < dataRowKey(row)) {
if (pCfg->cacheLastRow || pTable->lastRow != NULL) {
if (CACHE_LAST_ROW(pCfg) || pTable->lastRow != NULL) {
SDataRow nrow = pTable->lastRow;
if (taosTSizeof(nrow) < dataRowLen(row)) {
SDataRow orow = nrow;
......@@ -984,7 +1034,10 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow
} else {
pTable->lastKey = dataRowKey(row);
}
}
if (CACHE_LAST_NULL_COLUMN(pCfg)) {
updateTableLatestColumn(pRepo, pTable, row);
}
}
return 0;
}
......@@ -14,6 +14,7 @@
*/
#include "tsdbint.h"
#define TSDB_LATEST_COLUMN_ARRAY_SIZE 20
#define TSDB_SUPER_TABLE_SL_LEVEL 5
#define DEFAULT_TAG_INDEX_COLUMN 0
......@@ -671,6 +672,12 @@ static STable *tsdbNewTable() {
}
pTable->lastKey = TSKEY_INITIAL_VAL;
pTable->lastCols = (SDataCol*)malloc(TSDB_LATEST_COLUMN_ARRAY_SIZE * sizeof(SDataCol));
pTable->lastColNum = TSDB_LATEST_COLUMN_ARRAY_SIZE;
for (int i = 0; i < pTable->lastColNum; ++i) {
pTable->lastCols[i].bytes = 0;
pTable->lastCols[i].pData = NULL;
}
return pTable;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册