提交 f6063182 编写于 作者: H Hongze Cheng

TD-987

上级 b0f7c114
......@@ -53,14 +53,12 @@ typedef struct {
int32_t tsdbId;
int32_t cacheBlockSize;
int32_t totalBlocks;
int32_t maxTables; // maximum number of tables this repository can have
int32_t daysPerFile; // day per file sharding policy
int32_t keep; // day of data to keep
int32_t keep1;
int32_t keep2;
int32_t minRowsPerFileBlock; // minimum rows per file block
int32_t maxRowsPerFileBlock; // maximum rows per file block
int32_t commitTime;
int8_t precision;
int8_t compression;
} STsdbCfg;
......
......@@ -37,14 +37,12 @@ static int32_t saveVnodeCfg(SVnodeObj *pVnode, char* cfgFile)
len += snprintf(content + len, maxLen - len, " \"cfgVersion\": %d,\n", pVnode->cfgVersion);
len += snprintf(content + len, maxLen - len, " \"cacheBlockSize\": %d,\n", pVnode->tsdbCfg.cacheBlockSize);
len += snprintf(content + len, maxLen - len, " \"totalBlocks\": %d,\n", pVnode->tsdbCfg.totalBlocks);
len += snprintf(content + len, maxLen - len, " \"maxTables\": %d,\n", pVnode->tsdbCfg.maxTables);
len += snprintf(content + len, maxLen - len, " \"daysPerFile\": %d,\n", pVnode->tsdbCfg.daysPerFile);
len += snprintf(content + len, maxLen - len, " \"daysToKeep\": %d,\n", pVnode->tsdbCfg.keep);
len += snprintf(content + len, maxLen - len, " \"daysToKeep1\": %d,\n", pVnode->tsdbCfg.keep1);
len += snprintf(content + len, maxLen - len, " \"daysToKeep2\": %d,\n", pVnode->tsdbCfg.keep2);
len += snprintf(content + len, maxLen - len, " \"minRowsPerFileBlock\": %d,\n", pVnode->tsdbCfg.minRowsPerFileBlock);
len += snprintf(content + len, maxLen - len, " \"maxRowsPerFileBlock\": %d,\n", pVnode->tsdbCfg.maxRowsPerFileBlock);
len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnode->tsdbCfg.commitTime);
len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnode->tsdbCfg.precision);
len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnode->tsdbCfg.compression);
len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pVnode->walCfg.walLevel);
......@@ -136,12 +134,12 @@ static int32_t readVnodeCfg(SVnodeObj *pVnode, char* cfgFile)
}
pVnode->tsdbCfg.totalBlocks = totalBlocks->valueint;
cJSON *maxTables = cJSON_GetObjectItem(root, "maxTables");
if (!maxTables || maxTables->type != cJSON_Number) {
printf("vgId:%d, failed to read vnode cfg, maxTables not found\n", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->tsdbCfg.maxTables = maxTables->valueint;
// cJSON *maxTables = cJSON_GetObjectItem(root, "maxTables");
// if (!maxTables || maxTables->type != cJSON_Number) {
// printf("vgId:%d, failed to read vnode cfg, maxTables not found\n", pVnode->vgId);
// goto PARSE_OVER;
// }
// pVnode->tsdbCfg.maxTables = maxTables->valueint;
cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile");
if (!daysPerFile || daysPerFile->type != cJSON_Number) {
......@@ -185,12 +183,12 @@ static int32_t readVnodeCfg(SVnodeObj *pVnode, char* cfgFile)
}
pVnode->tsdbCfg.maxRowsPerFileBlock = maxRowsPerFileBlock->valueint;
cJSON *commitTime = cJSON_GetObjectItem(root, "commitTime");
if (!commitTime || commitTime->type != cJSON_Number) {
printf("vgId:%d, failed to read vnode cfg, commitTime not found\n", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->tsdbCfg.commitTime = (int8_t)commitTime->valueint;
// cJSON *commitTime = cJSON_GetObjectItem(root, "commitTime");
// if (!commitTime || commitTime->type != cJSON_Number) {
// printf("vgId:%d, failed to read vnode cfg, commitTime not found\n", pVnode->vgId);
// goto PARSE_OVER;
// }
// pVnode->tsdbCfg.commitTime = (int8_t)commitTime->valueint;
cJSON *precision = cJSON_GetObjectItem(root, "precision");
if (!precision || precision->type != cJSON_Number) {
......
......@@ -70,6 +70,7 @@ typedef struct {
pthread_rwlock_t rwLock;
int32_t nTables;
int32_t maxTables;
STable** tables;
SList* superList;
SHashObj* uidMap;
......@@ -111,9 +112,11 @@ typedef struct {
typedef struct {
T_REF_DECLARE();
SRWLatch latch;
TSKEY keyFirst;
TSKEY keyLast;
int64_t numOfRows;
int32_t maxTables;
STableData** tData;
SList* actList;
SList* bufBlockList;
......@@ -304,6 +307,7 @@ typedef struct {
// Operations
// ------------------ tsdbMeta.c
#define TSDB_INIT_NTABLES 1024
#define TABLE_TYPE(t) (t)->type
#define TABLE_NAME(t) (t)->name
#define TABLE_CHAR_NAME(t) TABLE_NAME(t)->data
......@@ -395,6 +399,7 @@ int tsdbInsertRowToMem(STsdbRepo* pRepo, SDataRow row, STable* pTable);
int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem);
void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemTable* pMem, SMemTable* pIMem);
void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes);
int tsdbAsyncCommit(STsdbRepo* pRepo);
int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols,
......@@ -429,7 +434,7 @@ STsdbFileH* tsdbNewFileH(STsdbCfg* pCfg);
void tsdbFreeFileH(STsdbFileH* pFileH);
int tsdbOpenFileH(STsdbRepo* pRepo);
void tsdbCloseFileH(STsdbRepo* pRepo);
SFileGroup* tsdbCreateFGroupIfNeed(STsdbRepo* pRepo, char* dataDir, int fid, int maxTables);
SFileGroup* tsdbCreateFGroupIfNeed(STsdbRepo* pRepo, char* dataDir, int fid);
void tsdbInitFileGroupIter(STsdbFileH* pFileH, SFileGroupIter* pIter, int direction);
void tsdbSeekFileGroupIter(SFileGroupIter* pIter, int fid);
SFileGroup* tsdbGetFileGroupNext(SFileGroupIter* pIter);
......@@ -511,6 +516,7 @@ void tsdbGetDataFileName(STsdbRepo* pRepo, int fid, int type, char* fname
int tsdbLockRepo(STsdbRepo* pRepo);
int tsdbUnlockRepo(STsdbRepo* pRepo);
char* tsdbGetDataDirName(char* rootDir);
int tsdbGetNextMaxTables(int tid);
STsdbMeta* tsdbGetMeta(TSDB_REPO_T* pRepo);
STsdbFileH* tsdbGetFile(TSDB_REPO_T* pRepo);
......
......@@ -149,7 +149,7 @@ void tsdbCloseFileH(STsdbRepo *pRepo) {
}
}
SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid, int maxTables) {
SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid) {
STsdbFileH *pFileH = pRepo->tsdbFileH;
if (pFileH->nFGroups >= pFileH->maxFGroups) return NULL;
......
......@@ -62,7 +62,6 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo);
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter);
static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression);
static int tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep);
static int tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables);
static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks);
static int keyFGroupCompFunc(const void *key, const void *fgroup);
static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg);
......@@ -85,10 +84,10 @@ int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) {
if (tsdbSetRepoEnv(rootDir, pCfg) < 0) return -1;
tsdbDebug(
"vgId:%d tsdb env create succeed! cacheBlockSize %d totalBlocks %d maxTables %d daysPerFile %d keep "
"vgId:%d tsdb env create succeed! cacheBlockSize %d totalBlocks %d daysPerFile %d keep "
"%d minRowsPerFileBlock %d maxRowsPerFileBlock %d precision %d compression %d",
pCfg->tsdbId, pCfg->cacheBlockSize, pCfg->totalBlocks, pCfg->maxTables, pCfg->daysPerFile, pCfg->keep,
pCfg->minRowsPerFileBlock, pCfg->maxRowsPerFileBlock, pCfg->precision, pCfg->compression);
pCfg->tsdbId, pCfg->cacheBlockSize, pCfg->totalBlocks, pCfg->daysPerFile, pCfg->keep, pCfg->minRowsPerFileBlock,
pCfg->maxRowsPerFileBlock, pCfg->precision, pCfg->compression);
return 0;
}
......@@ -307,13 +306,6 @@ int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg) {
tsdbAlterCacheTotalBlocks(pRepo, pCfg->totalBlocks);
configChanged = true;
}
if (pRCfg->maxTables != pCfg->maxTables) {
if (tsdbAlterMaxTables(pRepo, pCfg->maxTables) < 0) {
tsdbError("vgId:%d failed to configure repo when alter maxTables since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
configChanged = true;
}
if (configChanged) {
if (tsdbSaveConfig(pRepo->rootDir, &pRepo->config) < 0) {
......@@ -385,6 +377,18 @@ char *tsdbGetDataDirName(char *rootDir) {
return fname;
}
int tsdbGetNextMaxTables(int tid) {
ASSERT(tid >= 1 && tid <= TSDB_MAX_TABLES);
int maxTables = TSDB_INIT_NTABLES;
while (true) {
maxTables = MIN(maxTables, TSDB_MAX_TABLES);
if (tid <= maxTables + 1) break;
maxTables *= 2;
}
return maxTables + 1;
}
STsdbMeta * tsdbGetMeta(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbMeta; }
STsdbFileH * tsdbGetFile(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbFileH; }
STsdbRepoInfo *tsdbGetStatus(TSDB_REPO_T *pRepo) { return NULL; }
......@@ -417,17 +421,6 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
goto _err;
}
// Check maxTables
if (pCfg->maxTables == -1) {
pCfg->maxTables = TSDB_DEFAULT_TABLES+1;
} else {
if (pCfg->maxTables - 1 < TSDB_MIN_TABLES || pCfg->maxTables - 1 > TSDB_MAX_TABLES) {
tsdbError("vgId:%d invalid maxTables configuration! maxTables %d TSDB_MIN_TABLES %d TSDB_MAX_TABLES %d",
pCfg->tsdbId, pCfg->maxTables - 1, TSDB_MIN_TABLES, TSDB_MAX_TABLES);
goto _err;
}
}
// Check daysPerFile
if (pCfg->daysPerFile == -1) {
pCfg->daysPerFile = TSDB_DEFAULT_DAYS_PER_FILE;
......@@ -713,6 +706,7 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY
STsdbMeta *pMeta = pRepo->tsdbMeta;
int64_t points = 0;
ASSERT(pBlock->tid < pMeta->maxTables);
STable *pTable = pMeta->tables[pBlock->tid];
ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid);
......@@ -779,7 +773,6 @@ static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) {
}
static int tsdbRestoreInfo(STsdbRepo *pRepo) {
// TODO
STsdbMeta * pMeta = pRepo->tsdbMeta;
STsdbFileH *pFileH = pRepo->tsdbFileH;
SFileGroup *pFGroup = NULL;
......@@ -792,7 +785,7 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo) {
tsdbInitFileGroupIter(pFileH, &iter, TSDB_ORDER_DESC);
while ((pFGroup = tsdbGetFileGroupNext(&iter)) != NULL) {
if (tsdbSetAndOpenHelperFile(&rhelper, pFGroup) < 0) goto _err;
for (int i = 1; i < pRepo->config.maxTables; i++) {
for (int i = 1; i < pMeta->maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable == NULL) continue;
tsdbSetHelperTable(&rhelper, pTable, pRepo);
......@@ -868,36 +861,6 @@ static int tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep) {
return 0;
}
static int tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables) {
// TODO
int oldMaxTables = pRepo->config.maxTables;
if (oldMaxTables < pRepo->config.maxTables) {
terrno = TSDB_CODE_TDB_INVALID_ACTION;
return -1;
}
STsdbMeta *pMeta = pRepo->tsdbMeta;
pMeta->tables = realloc(pMeta->tables, maxTables * sizeof(STable *));
memset(&pMeta->tables[oldMaxTables], 0, sizeof(STable *) * (maxTables - oldMaxTables));
pRepo->config.maxTables = maxTables;
if (pRepo->mem) {
pRepo->mem->tData = realloc(pRepo->mem->tData, maxTables * sizeof(STableData *));
memset(POINTER_SHIFT(pRepo->mem->tData, sizeof(STableData *) * oldMaxTables), 0,
sizeof(STableData *) * (maxTables - oldMaxTables));
}
if (pRepo->imem) {
pRepo->imem->tData = realloc(pRepo->imem->tData, maxTables * sizeof(STableData *));
memset(POINTER_SHIFT(pRepo->imem->tData, sizeof(STableData *) * oldMaxTables), 0,
sizeof(STableData *) * (maxTables - oldMaxTables));
}
tsdbDebug("vgId:%d, tsdb maxTables is changed from %d to %d!", pRepo->config.tsdbId, oldMaxTables, maxTables);
return 0;
}
static int keyFGroupCompFunc(const void *key, const void *fgroup) {
int fid = *(int *)key;
SFileGroup *pFGroup = (SFileGroup *)fgroup;
......@@ -914,7 +877,6 @@ static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg) {
tlen += taosEncodeVariantI32(buf, pCfg->tsdbId);
tlen += taosEncodeFixedI32(buf, pCfg->cacheBlockSize);
tlen += taosEncodeVariantI32(buf, pCfg->totalBlocks);
tlen += taosEncodeVariantI32(buf, pCfg->maxTables);
tlen += taosEncodeVariantI32(buf, pCfg->daysPerFile);
tlen += taosEncodeVariantI32(buf, pCfg->keep);
tlen += taosEncodeVariantI32(buf, pCfg->keep1);
......@@ -931,7 +893,6 @@ static void *tsdbDecodeCfg(void *buf, STsdbCfg *pCfg) {
buf = taosDecodeVariantI32(buf, &(pCfg->tsdbId));
buf = taosDecodeFixedI32(buf, &(pCfg->cacheBlockSize));
buf = taosDecodeVariantI32(buf, &(pCfg->totalBlocks));
buf = taosDecodeVariantI32(buf, &(pCfg->maxTables));
buf = taosDecodeVariantI32(buf, &(pCfg->daysPerFile));
buf = taosDecodeVariantI32(buf, &(pCfg->keep));
buf = taosDecodeVariantI32(buf, &(pCfg->keep1));
......@@ -1037,7 +998,7 @@ static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) {
pBlock->schemaLen = htonl(pBlock->schemaLen);
pBlock->numOfRows = htons(pBlock->numOfRows);
if (pBlock->tid <= 0 || pBlock->tid >= pRepo->config.maxTables) {
if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) {
tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid,
pBlock->tid);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
......@@ -1120,7 +1081,7 @@ TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid) {
static void tsdbStartStream(STsdbRepo *pRepo) {
STsdbMeta *pMeta = pRepo->tsdbMeta;
for (int i = 0; i < pRepo->config.maxTables; i++) {
for (int i = 0; i < pMeta->maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql,
......@@ -1133,7 +1094,7 @@ static void tsdbStartStream(STsdbRepo *pRepo) {
static void tsdbStopStream(STsdbRepo *pRepo) {
STsdbMeta *pMeta = pRepo->tsdbMeta;
for (int i = 0; i < pRepo->config.maxTables; i++) {
for (int i = 0; i < pMeta->maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
(*pRepo->appH.cqDropFunc)(pTable->cqhandle);
......
......@@ -21,7 +21,7 @@
static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo);
static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes);
static SMemTable * tsdbNewMemTable(STsdbCfg *pCfg);
static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo);
static void tsdbFreeMemTable(SMemTable *pMemTable);
static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable);
static void tsdbFreeTableData(STableData *pTableData);
......@@ -30,13 +30,15 @@ static void * tsdbCommitData(void *arg);
static int tsdbCommitMeta(STsdbRepo *pRepo);
static void tsdbEndCommit(STsdbRepo *pRepo);
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey);
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols);
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols);
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo);
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables);
static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables);
// ---------------- INTERNAL FUNCTIONS ----------------
int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
STsdbCfg * pCfg = &pRepo->config;
STsdbMeta * pMeta = pRepo->tsdbMeta;
int32_t level = 0;
int32_t headSize = 0;
TSKEY key = dataRowKey(row);
......@@ -45,7 +47,7 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
SSkipList * pSList = NULL;
int bytes = 0;
if (pMemTable != NULL && pMemTable->tData[TABLE_TID(pTable)] != NULL &&
if (pMemTable != NULL && TABLE_TID(pTable) < pMemTable->maxTables && pMemTable->tData[TABLE_TID(pTable)] != NULL &&
pMemTable->tData[TABLE_TID(pTable)]->uid == TABLE_UID(pTable)) {
pTableData = pMemTable->tData[TABLE_TID(pTable)];
pSList = pTableData->pData;
......@@ -66,13 +68,20 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
// Operations above may change pRepo->mem, retake those values
ASSERT(pRepo->mem != NULL);
pMemTable = pRepo->mem;
if (TABLE_TID(pTable) >= pMemTable->maxTables) {
if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) return -1;;
}
pTableData = pMemTable->tData[TABLE_TID(pTable)];
if (pTableData == NULL || pTableData->uid != TABLE_UID(pTable)) {
if (pTableData != NULL) { // destroy the table skiplist (may have race condition problem)
taosWLockLatch(&(pMemTable->latch));
pMemTable->tData[TABLE_TID(pTable)] = NULL;
tsdbFreeTableData(pTableData);
taosWUnLockLatch(&(pMemTable->latch));
}
pTableData = tsdbNewTableData(pCfg, pTable);
if (pTableData == NULL) {
tsdbError("vgId:%d failed to insert row with key %" PRId64
......@@ -122,7 +131,6 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
int ref = T_REF_DEC(pMemTable);
tsdbDebug("vgId:%d unref memtable %p ref %d", REPO_ID(pRepo), pMemTable, ref);
if (ref == 0) {
STsdbCfg * pCfg = &pRepo->config;
STsdbBufPool *pBufPool = pRepo->pPool;
SListNode *pNode = NULL;
......@@ -139,7 +147,7 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
}
if (tsdbUnlockRepo(pRepo) < 0) return -1;
for (int i = 0; i < pCfg->maxTables; i++) {
for (int i = 0; i < pMemTable->maxTables; i++) {
if (pMemTable->tData[i] != NULL) {
tsdbFreeTableData(pMemTable->tData[i]);
}
......@@ -161,11 +169,24 @@ int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem) {
tsdbRefMemTable(pRepo, *pIMem);
if (tsdbUnlockRepo(pRepo) < 0) return -1;
tsdbDebug("vgId:%d take memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), *pMem, *pIMem);
if (*pMem != NULL) taosRLockLatch(&((*pMem)->latch));
tsdbDebug("vgId:%d take memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), *pMem, *pIMem);
return 0;
}
void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem) {
if (pMem != NULL) {
taosRUnLockLatch(&(pMem->latch));
tsdbUnRefMemTable(pRepo, pMem);
}
if (pIMem != NULL) {
tsdbUnRefMemTable(pRepo, pIMem);
}
}
void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
STsdbCfg * pCfg = &pRepo->config;
STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
......@@ -182,7 +203,7 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
}
if (pRepo->mem == NULL) {
SMemTable *pMemTable = tsdbNewMemTable(&pRepo->config);
SMemTable *pMemTable = tsdbNewMemTable(pRepo);
if (pMemTable == NULL) return NULL;
if (tsdbLockRepo(pRepo) < 0) {
......@@ -329,7 +350,9 @@ static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes) {
listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain);
}
static SMemTable* tsdbNewMemTable(STsdbCfg* pCfg) {
static SMemTable* tsdbNewMemTable(STsdbRepo *pRepo) {
STsdbMeta *pMeta = pRepo->tsdbMeta;
SMemTable *pMemTable = (SMemTable *)calloc(1, sizeof(*pMemTable));
if (pMemTable == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
......@@ -340,7 +363,8 @@ static SMemTable* tsdbNewMemTable(STsdbCfg* pCfg) {
pMemTable->keyLast = 0;
pMemTable->numOfRows = 0;
pMemTable->tData = (STableData**)calloc(pCfg->maxTables, sizeof(STableData*));
pMemTable->maxTables = pMeta->maxTables;
pMemTable->tData = (STableData **)calloc(pMemTable->maxTables, sizeof(STableData *));
if (pMemTable->tData == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
......@@ -398,9 +422,6 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) {
goto _err;
}
// TODO: operation here should not be here, remove it
pTableData->pData->level = 1;
return pTableData;
_err:
......@@ -473,7 +494,7 @@ static void *tsdbCommitData(void *arg) {
_exit:
tdFreeDataCols(pDataCols);
tsdbDestroyCommitIters(iters, pCfg->maxTables);
tsdbDestroyCommitIters(iters, pMem->maxTables);
tsdbDestroyHelper(&whelper);
tsdbEndCommit(pRepo);
tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId);
......@@ -552,12 +573,13 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
STsdbCfg * pCfg = &pRepo->config;
STsdbFileH *pFileH = pRepo->tsdbFileH;
SFileGroup *pGroup = NULL;
SMemTable * pMem = pRepo->imem;
TSKEY minKey = 0, maxKey = 0;
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
// Check if there are data to commit to this file
int hasDataToCommit = tsdbHasDataToCommit(iters, pCfg->maxTables, minKey, maxKey);
int hasDataToCommit = tsdbHasDataToCommit(iters, pMem->maxTables, minKey, maxKey);
if (!hasDataToCommit) {
tsdbDebug("vgId:%d no data to commit to file %d", REPO_ID(pRepo), fid);
return 0;
......@@ -570,7 +592,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
return -1;
}
if ((pGroup = tsdbCreateFGroupIfNeed(pRepo, dataDir, fid, pCfg->maxTables)) == NULL) {
if ((pGroup = tsdbCreateFGroupIfNeed(pRepo, dataDir, fid)) == NULL) {
tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
goto _err;
}
......@@ -582,7 +604,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
}
// Loop to commit data in each table
for (int tid = 1; tid < pCfg->maxTables; tid++) {
for (int tid = 1; tid < pMem->maxTables; tid++) {
SCommitIter *pIter = iters + tid;
if (pIter->pTable == NULL) continue;
......@@ -643,11 +665,10 @@ _err:
}
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
STsdbCfg * pCfg = &(pRepo->config);
SMemTable *pMem = pRepo->imem;
STsdbMeta *pMeta = pRepo->tsdbMeta;
SCommitIter *iters = (SCommitIter *)calloc(pCfg->maxTables, sizeof(SCommitIter));
SCommitIter *iters = (SCommitIter *)calloc(pMem->maxTables, sizeof(SCommitIter));
if (iters == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
......@@ -656,7 +677,7 @@ static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
if (tsdbRLockRepoMeta(pRepo) < 0) goto _err;
// reference all tables
for (int i = 0; i < pCfg->maxTables; i++) {
for (int i = 0; i < pMem->maxTables; i++) {
if (pMeta->tables[i] != NULL) {
tsdbRefTable(pMeta->tables[i]);
iters[i].pTable = pMeta->tables[i];
......@@ -665,7 +686,7 @@ static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
if (tsdbUnlockRepoMeta(pRepo) < 0) goto _err;
for (int i = 0; i < pCfg->maxTables; i++) {
for (int i = 0; i < pMem->maxTables; i++) {
if ((iters[i].pTable != NULL) && (pMem->tData[i] != NULL) && (TABLE_UID(iters[i].pTable) == pMem->tData[i]->uid)) {
if ((iters[i].pIter = tSkipListCreateIter(pMem->tData[i]->pData)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
......@@ -679,7 +700,7 @@ static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
return iters;
_err:
tsdbDestroyCommitIters(iters, pCfg->maxTables);
tsdbDestroyCommitIters(iters, pMem->maxTables);
return NULL;
}
......@@ -694,4 +715,25 @@ static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) {
}
free(iters);
}
static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) {
ASSERT(pMemTable->maxTables < maxTables);
STableData **pTableData = (STableData **)calloc(maxTables, sizeof(STableData *));
if (pTableData == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
STableData **tData = pMemTable->tData;
taosWLockLatch(&(pMemTable->latch));
pMemTable->maxTables = maxTables;
pMemTable->tData = pTableData;
taosWUnLockLatch(&(pMemTable->latch));
tfree(tData);
return 0;
}
\ No newline at end of file
......@@ -49,6 +49,7 @@ static int tsdbGetTableEncodeSize(int8_t act, STable *pTable);
static void * tsdbInsertTableAct(STsdbRepo *pRepo, int8_t act, void *buf, STable *pTable);
static int tsdbRemoveTableFromStore(STsdbRepo *pRepo, STable *pTable);
static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable);
static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid);
// ------------------ OUTER FUNCTIONS ------------------
int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) {
......@@ -60,13 +61,13 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) {
int tid = pCfg->tableId.tid;
STable * pTable = NULL;
if (tid < 0 || tid >= pRepo->config.maxTables) {
if (tid < 1 || tid > TSDB_MAX_TABLES) {
tsdbError("vgId:%d failed to create table since invalid tid %d", REPO_ID(pRepo), tid);
terrno = TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO;
goto _err;
}
if (pMeta->tables[tid] != NULL) {
if (tid < pMeta->maxTables && pMeta->tables[tid] != NULL) {
if (TABLE_UID(pMeta->tables[tid]) == pCfg->tableId.uid) {
tsdbError("vgId:%d table %s already exists, tid %d uid %" PRId64, REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
TABLE_TID(pTable), TABLE_UID(pTable));
......@@ -422,7 +423,8 @@ STsdbMeta *tsdbNewMeta(STsdbCfg *pCfg) {
goto _err;
}
pMeta->tables = (STable **)calloc(pCfg->maxTables, sizeof(STable *));
pMeta->maxTables = TSDB_INIT_NTABLES + 1;
pMeta->tables = (STable **)calloc(pMeta->maxTables, sizeof(STable *));
if (pMeta->tables == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
......@@ -434,7 +436,7 @@ STsdbMeta *tsdbNewMeta(STsdbCfg *pCfg) {
goto _err;
}
pMeta->uidMap = taosHashInit(pCfg->maxTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
pMeta->uidMap = taosHashInit(TSDB_INIT_NTABLES * 1.1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
if (pMeta->uidMap == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
......@@ -484,14 +486,13 @@ _err:
}
int tsdbCloseMeta(STsdbRepo *pRepo) {
STsdbCfg * pCfg = &pRepo->config;
STsdbMeta *pMeta = pRepo->tsdbMeta;
SListNode *pNode = NULL;
STable * pTable = NULL;
if (pMeta == NULL) return 0;
tdCloseKVStore(pMeta->pStore);
for (int i = 1; i < pCfg->maxTables; i++) {
for (int i = 1; i < pMeta->maxTables; i++) {
tsdbFreeTable(pMeta->tables[i]);
}
......@@ -624,9 +625,8 @@ static int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
static void tsdbOrgMeta(void *pHandle) {
STsdbRepo *pRepo = (STsdbRepo *)pHandle;
STsdbMeta *pMeta = pRepo->tsdbMeta;
STsdbCfg * pCfg = &pRepo->config;
for (int i = 1; i < pCfg->maxTables; i++) {
for (int i = 1; i < pMeta->maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable != NULL && pTable->type == TSDB_CHILD_TABLE) {
tsdbAddTableIntoIndex(pMeta, pTable, true);
......@@ -781,6 +781,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, boo
goto _err;
}
} else {
if (tsdbAdjustMetaTables(pRepo, TABLE_TID(pTable)) < 0) goto _err;
if (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE && addIdx) { // add STABLE to the index
if (tsdbAddTableIntoIndex(pMeta, pTable, true) < 0) {
tsdbDebug("vgId:%d failed to add table %s to meta while add table to index since %s", REPO_ID(pRepo),
......@@ -827,7 +828,6 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro
SListIter lIter = {0};
SListNode *pNode = NULL;
STable * tTable = NULL;
STsdbCfg * pCfg = &(pRepo->config);
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
int maxCols = schemaNCols(pSchema);
......@@ -860,7 +860,7 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro
if (maxCols == pMeta->maxCols || maxRowBytes == pMeta->maxRowBytes) {
maxCols = 0;
maxRowBytes = 0;
for (int i = 0; i < pCfg->maxTables; i++) {
for (int i = 0; i < pMeta->maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable != NULL) {
pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
......@@ -1266,5 +1266,28 @@ static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable) {
tsdbRemoveTableFromMeta(pRepo, pTable, true, true);
}
return 0;
}
static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid) {
STsdbMeta *pMeta = pRepo->tsdbMeta;
if (pMeta->maxTables >= tid) return 0;
int maxTables = tsdbGetNextMaxTables(tid);
STable **tables = (STable **)calloc(maxTables, sizeof(STable *));
if (tables == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
memcpy((void *)tables, (void *)pMeta->tables, sizeof(STable *) * pMeta->maxTables);
pMeta->maxTables = maxTables;
STable **tTables = pMeta->tables;
pMeta->tables = tables;
tfree(tTables);
tsdbDebug("vgId:%d tsdb meta maxTables is adjusted as %d", REPO_ID(pRepo), maxTables);
return 0;
}
\ No newline at end of file
......@@ -317,17 +317,20 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
}
assert(pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL);
if (pHandle->mem && pHandle->mem->tData[pCheckInfo->tableId.tid] != NULL) {
// TODO: add uid check
if (pHandle->mem && pCheckInfo->tableId.tid < pHandle->mem->maxTables &&
pHandle->mem->tData[pCheckInfo->tableId.tid] != NULL) {
pCheckInfo->iter = tSkipListCreateIterFromVal(pHandle->mem->tData[pCheckInfo->tableId.tid]->pData,
(const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
(const char*)&pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
}
if (pHandle->imem && pHandle->imem->tData[pCheckInfo->tableId.tid] != NULL) {
if (pHandle->imem && pCheckInfo->tableId.tid < pHandle->imem->maxTables &&
pHandle->imem->tData[pCheckInfo->tableId.tid] != NULL) {
pCheckInfo->iiter = tSkipListCreateIterFromVal(pHandle->imem->tData[pCheckInfo->tableId.tid]->pData,
(const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
(const char*)&pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
}
// both iterators are NULL, no data in buffer right now
if (pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL) {
return false;
......@@ -1529,7 +1532,6 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
static bool doHasDataInBuffer(STsdbQueryHandle* pQueryHandle) {
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
assert(numOfTables <= ((STsdbRepo*)pQueryHandle->pTsdb)->config.maxTables);
while (pQueryHandle->activeIndex < numOfTables) {
if (hasMoreDataInCache(pQueryHandle)) {
......@@ -2418,8 +2420,7 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
tfree(pQueryHandle->statis);
// todo check error
tsdbUnRefMemTable(pQueryHandle->pTsdb, pQueryHandle->mem);
tsdbUnRefMemTable(pQueryHandle->pTsdb, pQueryHandle->imem);
tsdbUnTakeMemSnapShot(pQueryHandle->pTsdb, pQueryHandle->mem, pQueryHandle->imem);
tsdbDestroyHelper(&pQueryHandle->rhelper);
......
......@@ -123,7 +123,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
tsdbCfg.tsdbId = pVnodeCfg->cfg.vgId;
tsdbCfg.cacheBlockSize = pVnodeCfg->cfg.cacheBlockSize;
tsdbCfg.totalBlocks = pVnodeCfg->cfg.totalBlocks;
tsdbCfg.maxTables = pVnodeCfg->cfg.maxTables;
// tsdbCfg.maxTables = pVnodeCfg->cfg.maxTables;
tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile;
tsdbCfg.keep = pVnodeCfg->cfg.daysToKeep;
tsdbCfg.minRowsPerFileBlock = pVnodeCfg->cfg.minRowsPerFileBlock;
......@@ -630,14 +630,14 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
len += snprintf(content + len, maxLen - len, " \"cfgVersion\": %d,\n", pVnodeCfg->cfg.cfgVersion);
len += snprintf(content + len, maxLen - len, " \"cacheBlockSize\": %d,\n", pVnodeCfg->cfg.cacheBlockSize);
len += snprintf(content + len, maxLen - len, " \"totalBlocks\": %d,\n", pVnodeCfg->cfg.totalBlocks);
len += snprintf(content + len, maxLen - len, " \"maxTables\": %d,\n", pVnodeCfg->cfg.maxTables);
// len += snprintf(content + len, maxLen - len, " \"maxTables\": %d,\n", pVnodeCfg->cfg.maxTables);
len += snprintf(content + len, maxLen - len, " \"daysPerFile\": %d,\n", pVnodeCfg->cfg.daysPerFile);
len += snprintf(content + len, maxLen - len, " \"daysToKeep\": %d,\n", pVnodeCfg->cfg.daysToKeep);
len += snprintf(content + len, maxLen - len, " \"daysToKeep1\": %d,\n", pVnodeCfg->cfg.daysToKeep1);
len += snprintf(content + len, maxLen - len, " \"daysToKeep2\": %d,\n", pVnodeCfg->cfg.daysToKeep2);
len += snprintf(content + len, maxLen - len, " \"minRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.minRowsPerFileBlock);
len += snprintf(content + len, maxLen - len, " \"maxRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.maxRowsPerFileBlock);
len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnodeCfg->cfg.commitTime);
// len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnodeCfg->cfg.commitTime);
len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnodeCfg->cfg.precision);
len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnodeCfg->cfg.compression);
len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pVnodeCfg->cfg.walLevel);
......@@ -729,12 +729,12 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
}
pVnode->tsdbCfg.totalBlocks = totalBlocks->valueint;
cJSON *maxTables = cJSON_GetObjectItem(root, "maxTables");
if (!maxTables || maxTables->type != cJSON_Number) {
vError("vgId:%d, failed to read vnode cfg, maxTables not found", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->tsdbCfg.maxTables = maxTables->valueint;
// cJSON *maxTables = cJSON_GetObjectItem(root, "maxTables");
// if (!maxTables || maxTables->type != cJSON_Number) {
// vError("vgId:%d, failed to read vnode cfg, maxTables not found", pVnode->vgId);
// goto PARSE_OVER;
// }
// pVnode->tsdbCfg.maxTables = maxTables->valueint;
cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile");
if (!daysPerFile || daysPerFile->type != cJSON_Number) {
......@@ -778,12 +778,12 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
}
pVnode->tsdbCfg.maxRowsPerFileBlock = maxRowsPerFileBlock->valueint;
cJSON *commitTime = cJSON_GetObjectItem(root, "commitTime");
if (!commitTime || commitTime->type != cJSON_Number) {
vError("vgId:%d, failed to read vnode cfg, commitTime not found", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->tsdbCfg.commitTime = (int8_t)commitTime->valueint;
// cJSON *commitTime = cJSON_GetObjectItem(root, "commitTime");
// if (!commitTime || commitTime->type != cJSON_Number) {
// vError("vgId:%d, failed to read vnode cfg, commitTime not found", pVnode->vgId);
// goto PARSE_OVER;
// }
// pVnode->tsdbCfg.commitTime = (int8_t)commitTime->valueint;
cJSON *precision = cJSON_GetObjectItem(root, "precision");
if (!precision || precision->type != cJSON_Number) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册