diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index a2258dcfd8116d6dca59e2e156ac561dad7e3d8f..bab17322f0737cdd324dc1aef15de061fba9f536 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -53,14 +53,12 @@ typedef struct { int32_t tsdbId; int32_t cacheBlockSize; int32_t totalBlocks; - int32_t maxTables; // maximum number of tables this repository can have int32_t daysPerFile; // day per file sharding policy int32_t keep; // day of data to keep int32_t keep1; int32_t keep2; int32_t minRowsPerFileBlock; // minimum rows per file block int32_t maxRowsPerFileBlock; // maximum rows per file block - int32_t commitTime; int8_t precision; int8_t compression; } STsdbCfg; diff --git a/src/kit/taosmigrate/taosmigrateVnodeCfg.c b/src/kit/taosmigrate/taosmigrateVnodeCfg.c index e830ef78f554445d6455bff71e9457ea2645ef9e..e80e687f02d1aa1064d89a6d3d37f9dc215221e2 100644 --- a/src/kit/taosmigrate/taosmigrateVnodeCfg.c +++ b/src/kit/taosmigrate/taosmigrateVnodeCfg.c @@ -37,14 +37,12 @@ static int32_t saveVnodeCfg(SVnodeObj *pVnode, char* cfgFile) len += snprintf(content + len, maxLen - len, " \"cfgVersion\": %d,\n", pVnode->cfgVersion); len += snprintf(content + len, maxLen - len, " \"cacheBlockSize\": %d,\n", pVnode->tsdbCfg.cacheBlockSize); len += snprintf(content + len, maxLen - len, " \"totalBlocks\": %d,\n", pVnode->tsdbCfg.totalBlocks); - len += snprintf(content + len, maxLen - len, " \"maxTables\": %d,\n", pVnode->tsdbCfg.maxTables); len += snprintf(content + len, maxLen - len, " \"daysPerFile\": %d,\n", pVnode->tsdbCfg.daysPerFile); len += snprintf(content + len, maxLen - len, " \"daysToKeep\": %d,\n", pVnode->tsdbCfg.keep); len += snprintf(content + len, maxLen - len, " \"daysToKeep1\": %d,\n", pVnode->tsdbCfg.keep1); len += snprintf(content + len, maxLen - len, " \"daysToKeep2\": %d,\n", pVnode->tsdbCfg.keep2); len += snprintf(content + len, maxLen - len, " \"minRowsPerFileBlock\": %d,\n", pVnode->tsdbCfg.minRowsPerFileBlock); len += snprintf(content + len, maxLen - len, " \"maxRowsPerFileBlock\": %d,\n", pVnode->tsdbCfg.maxRowsPerFileBlock); - len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnode->tsdbCfg.commitTime); len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnode->tsdbCfg.precision); len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnode->tsdbCfg.compression); len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pVnode->walCfg.walLevel); @@ -136,12 +134,12 @@ static int32_t readVnodeCfg(SVnodeObj *pVnode, char* cfgFile) } pVnode->tsdbCfg.totalBlocks = totalBlocks->valueint; - cJSON *maxTables = cJSON_GetObjectItem(root, "maxTables"); - if (!maxTables || maxTables->type != cJSON_Number) { - printf("vgId:%d, failed to read vnode cfg, maxTables not found\n", pVnode->vgId); - goto PARSE_OVER; - } - pVnode->tsdbCfg.maxTables = maxTables->valueint; + // cJSON *maxTables = cJSON_GetObjectItem(root, "maxTables"); + // if (!maxTables || maxTables->type != cJSON_Number) { + // printf("vgId:%d, failed to read vnode cfg, maxTables not found\n", pVnode->vgId); + // goto PARSE_OVER; + // } + // pVnode->tsdbCfg.maxTables = maxTables->valueint; cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile"); if (!daysPerFile || daysPerFile->type != cJSON_Number) { @@ -185,12 +183,12 @@ static int32_t readVnodeCfg(SVnodeObj *pVnode, char* cfgFile) } pVnode->tsdbCfg.maxRowsPerFileBlock = maxRowsPerFileBlock->valueint; - cJSON *commitTime = cJSON_GetObjectItem(root, "commitTime"); - if (!commitTime || commitTime->type != cJSON_Number) { - printf("vgId:%d, failed to read vnode cfg, commitTime not found\n", pVnode->vgId); - goto PARSE_OVER; - } - pVnode->tsdbCfg.commitTime = (int8_t)commitTime->valueint; + // cJSON *commitTime = cJSON_GetObjectItem(root, "commitTime"); + // if (!commitTime || commitTime->type != cJSON_Number) { + // printf("vgId:%d, failed to read vnode cfg, commitTime not found\n", pVnode->vgId); + // goto PARSE_OVER; + // } + // pVnode->tsdbCfg.commitTime = (int8_t)commitTime->valueint; cJSON *precision = cJSON_GetObjectItem(root, "precision"); if (!precision || precision->type != cJSON_Number) { diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 762d2253e21aca6c95318cdbcda16a31da9b45cf..3e92c017651fb19e59eb6b144b052f721a296c65 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -70,6 +70,7 @@ typedef struct { pthread_rwlock_t rwLock; int32_t nTables; + int32_t maxTables; STable** tables; SList* superList; SHashObj* uidMap; @@ -111,9 +112,11 @@ typedef struct { typedef struct { T_REF_DECLARE(); + SRWLatch latch; TSKEY keyFirst; TSKEY keyLast; int64_t numOfRows; + int32_t maxTables; STableData** tData; SList* actList; SList* bufBlockList; @@ -304,6 +307,7 @@ typedef struct { // Operations // ------------------ tsdbMeta.c +#define TSDB_INIT_NTABLES 1024 #define TABLE_TYPE(t) (t)->type #define TABLE_NAME(t) (t)->name #define TABLE_CHAR_NAME(t) TABLE_NAME(t)->data @@ -395,6 +399,7 @@ int tsdbInsertRowToMem(STsdbRepo* pRepo, SDataRow row, STable* pTable); int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem); +void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemTable* pMem, SMemTable* pIMem); void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes); int tsdbAsyncCommit(STsdbRepo* pRepo); int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols, @@ -429,7 +434,7 @@ STsdbFileH* tsdbNewFileH(STsdbCfg* pCfg); void tsdbFreeFileH(STsdbFileH* pFileH); int tsdbOpenFileH(STsdbRepo* pRepo); void tsdbCloseFileH(STsdbRepo* pRepo); -SFileGroup* tsdbCreateFGroupIfNeed(STsdbRepo* pRepo, char* dataDir, int fid, int maxTables); +SFileGroup* tsdbCreateFGroupIfNeed(STsdbRepo* pRepo, char* dataDir, int fid); void tsdbInitFileGroupIter(STsdbFileH* pFileH, SFileGroupIter* pIter, int direction); void tsdbSeekFileGroupIter(SFileGroupIter* pIter, int fid); SFileGroup* tsdbGetFileGroupNext(SFileGroupIter* pIter); @@ -511,6 +516,7 @@ void tsdbGetDataFileName(STsdbRepo* pRepo, int fid, int type, char* fname int tsdbLockRepo(STsdbRepo* pRepo); int tsdbUnlockRepo(STsdbRepo* pRepo); char* tsdbGetDataDirName(char* rootDir); +int tsdbGetNextMaxTables(int tid); STsdbMeta* tsdbGetMeta(TSDB_REPO_T* pRepo); STsdbFileH* tsdbGetFile(TSDB_REPO_T* pRepo); diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 767fbc82529f401be1c25186e67af23c773cf5c7..f69887869862c913002c6d553454d3c43168e835 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -149,7 +149,7 @@ void tsdbCloseFileH(STsdbRepo *pRepo) { } } -SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid, int maxTables) { +SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid) { STsdbFileH *pFileH = pRepo->tsdbFileH; if (pFileH->nFGroups >= pFileH->maxFGroups) return NULL; diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 772bcf48d6023f9d0070ba8c23ce8eab128ddd4e..11a84cd5528ec48e51c10c217e339b562370c497 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -62,7 +62,6 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo); static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter); static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression); static int tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep); -static int tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables); static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks); static int keyFGroupCompFunc(const void *key, const void *fgroup); static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg); @@ -85,10 +84,10 @@ int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) { if (tsdbSetRepoEnv(rootDir, pCfg) < 0) return -1; tsdbDebug( - "vgId:%d tsdb env create succeed! cacheBlockSize %d totalBlocks %d maxTables %d daysPerFile %d keep " + "vgId:%d tsdb env create succeed! cacheBlockSize %d totalBlocks %d daysPerFile %d keep " "%d minRowsPerFileBlock %d maxRowsPerFileBlock %d precision %d compression %d", - pCfg->tsdbId, pCfg->cacheBlockSize, pCfg->totalBlocks, pCfg->maxTables, pCfg->daysPerFile, pCfg->keep, - pCfg->minRowsPerFileBlock, pCfg->maxRowsPerFileBlock, pCfg->precision, pCfg->compression); + pCfg->tsdbId, pCfg->cacheBlockSize, pCfg->totalBlocks, pCfg->daysPerFile, pCfg->keep, pCfg->minRowsPerFileBlock, + pCfg->maxRowsPerFileBlock, pCfg->precision, pCfg->compression); return 0; } @@ -307,13 +306,6 @@ int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg) { tsdbAlterCacheTotalBlocks(pRepo, pCfg->totalBlocks); configChanged = true; } - if (pRCfg->maxTables != pCfg->maxTables) { - if (tsdbAlterMaxTables(pRepo, pCfg->maxTables) < 0) { - tsdbError("vgId:%d failed to configure repo when alter maxTables since %s", REPO_ID(pRepo), tstrerror(terrno)); - return -1; - } - configChanged = true; - } if (configChanged) { if (tsdbSaveConfig(pRepo->rootDir, &pRepo->config) < 0) { @@ -385,6 +377,18 @@ char *tsdbGetDataDirName(char *rootDir) { return fname; } +int tsdbGetNextMaxTables(int tid) { + ASSERT(tid >= 1 && tid <= TSDB_MAX_TABLES); + int maxTables = TSDB_INIT_NTABLES; + while (true) { + maxTables = MIN(maxTables, TSDB_MAX_TABLES); + if (tid <= maxTables + 1) break; + maxTables *= 2; + } + + return maxTables + 1; +} + STsdbMeta * tsdbGetMeta(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbMeta; } STsdbFileH * tsdbGetFile(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbFileH; } STsdbRepoInfo *tsdbGetStatus(TSDB_REPO_T *pRepo) { return NULL; } @@ -417,17 +421,6 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { goto _err; } - // Check maxTables - if (pCfg->maxTables == -1) { - pCfg->maxTables = TSDB_DEFAULT_TABLES+1; - } else { - if (pCfg->maxTables - 1 < TSDB_MIN_TABLES || pCfg->maxTables - 1 > TSDB_MAX_TABLES) { - tsdbError("vgId:%d invalid maxTables configuration! maxTables %d TSDB_MIN_TABLES %d TSDB_MAX_TABLES %d", - pCfg->tsdbId, pCfg->maxTables - 1, TSDB_MIN_TABLES, TSDB_MAX_TABLES); - goto _err; - } - } - // Check daysPerFile if (pCfg->daysPerFile == -1) { pCfg->daysPerFile = TSDB_DEFAULT_DAYS_PER_FILE; @@ -713,6 +706,7 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY STsdbMeta *pMeta = pRepo->tsdbMeta; int64_t points = 0; + ASSERT(pBlock->tid < pMeta->maxTables); STable *pTable = pMeta->tables[pBlock->tid]; ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid); @@ -779,7 +773,6 @@ static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) { } static int tsdbRestoreInfo(STsdbRepo *pRepo) { - // TODO STsdbMeta * pMeta = pRepo->tsdbMeta; STsdbFileH *pFileH = pRepo->tsdbFileH; SFileGroup *pFGroup = NULL; @@ -792,7 +785,7 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo) { tsdbInitFileGroupIter(pFileH, &iter, TSDB_ORDER_DESC); while ((pFGroup = tsdbGetFileGroupNext(&iter)) != NULL) { if (tsdbSetAndOpenHelperFile(&rhelper, pFGroup) < 0) goto _err; - for (int i = 1; i < pRepo->config.maxTables; i++) { + for (int i = 1; i < pMeta->maxTables; i++) { STable *pTable = pMeta->tables[i]; if (pTable == NULL) continue; tsdbSetHelperTable(&rhelper, pTable, pRepo); @@ -868,36 +861,6 @@ static int tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep) { return 0; } -static int tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables) { - // TODO - int oldMaxTables = pRepo->config.maxTables; - if (oldMaxTables < pRepo->config.maxTables) { - terrno = TSDB_CODE_TDB_INVALID_ACTION; - return -1; - } - - STsdbMeta *pMeta = pRepo->tsdbMeta; - - pMeta->tables = realloc(pMeta->tables, maxTables * sizeof(STable *)); - memset(&pMeta->tables[oldMaxTables], 0, sizeof(STable *) * (maxTables - oldMaxTables)); - pRepo->config.maxTables = maxTables; - - if (pRepo->mem) { - pRepo->mem->tData = realloc(pRepo->mem->tData, maxTables * sizeof(STableData *)); - memset(POINTER_SHIFT(pRepo->mem->tData, sizeof(STableData *) * oldMaxTables), 0, - sizeof(STableData *) * (maxTables - oldMaxTables)); - } - - if (pRepo->imem) { - pRepo->imem->tData = realloc(pRepo->imem->tData, maxTables * sizeof(STableData *)); - memset(POINTER_SHIFT(pRepo->imem->tData, sizeof(STableData *) * oldMaxTables), 0, - sizeof(STableData *) * (maxTables - oldMaxTables)); - } - - tsdbDebug("vgId:%d, tsdb maxTables is changed from %d to %d!", pRepo->config.tsdbId, oldMaxTables, maxTables); - return 0; -} - static int keyFGroupCompFunc(const void *key, const void *fgroup) { int fid = *(int *)key; SFileGroup *pFGroup = (SFileGroup *)fgroup; @@ -914,7 +877,6 @@ static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg) { tlen += taosEncodeVariantI32(buf, pCfg->tsdbId); tlen += taosEncodeFixedI32(buf, pCfg->cacheBlockSize); tlen += taosEncodeVariantI32(buf, pCfg->totalBlocks); - tlen += taosEncodeVariantI32(buf, pCfg->maxTables); tlen += taosEncodeVariantI32(buf, pCfg->daysPerFile); tlen += taosEncodeVariantI32(buf, pCfg->keep); tlen += taosEncodeVariantI32(buf, pCfg->keep1); @@ -931,7 +893,6 @@ static void *tsdbDecodeCfg(void *buf, STsdbCfg *pCfg) { buf = taosDecodeVariantI32(buf, &(pCfg->tsdbId)); buf = taosDecodeFixedI32(buf, &(pCfg->cacheBlockSize)); buf = taosDecodeVariantI32(buf, &(pCfg->totalBlocks)); - buf = taosDecodeVariantI32(buf, &(pCfg->maxTables)); buf = taosDecodeVariantI32(buf, &(pCfg->daysPerFile)); buf = taosDecodeVariantI32(buf, &(pCfg->keep)); buf = taosDecodeVariantI32(buf, &(pCfg->keep1)); @@ -1037,7 +998,7 @@ static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) { pBlock->schemaLen = htonl(pBlock->schemaLen); pBlock->numOfRows = htons(pBlock->numOfRows); - if (pBlock->tid <= 0 || pBlock->tid >= pRepo->config.maxTables) { + if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) { tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid, pBlock->tid); terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; @@ -1120,7 +1081,7 @@ TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid) { static void tsdbStartStream(STsdbRepo *pRepo) { STsdbMeta *pMeta = pRepo->tsdbMeta; - for (int i = 0; i < pRepo->config.maxTables; i++) { + for (int i = 0; i < pMeta->maxTables; i++) { STable *pTable = pMeta->tables[i]; if (pTable && pTable->type == TSDB_STREAM_TABLE) { pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql, @@ -1133,7 +1094,7 @@ static void tsdbStartStream(STsdbRepo *pRepo) { static void tsdbStopStream(STsdbRepo *pRepo) { STsdbMeta *pMeta = pRepo->tsdbMeta; - for (int i = 0; i < pRepo->config.maxTables; i++) { + for (int i = 0; i < pMeta->maxTables; i++) { STable *pTable = pMeta->tables[i]; if (pTable && pTable->type == TSDB_STREAM_TABLE) { (*pRepo->appH.cqDropFunc)(pTable->cqhandle); diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 7708646a61c22a772fe0e9d0c4bfb46346fbcc95..9ca28d06cb8a0260ab79d6196afcdfc80a9dfd3f 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -21,7 +21,7 @@ static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo); static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes); -static SMemTable * tsdbNewMemTable(STsdbCfg *pCfg); +static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo); static void tsdbFreeMemTable(SMemTable *pMemTable); static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable); static void tsdbFreeTableData(STableData *pTableData); @@ -30,13 +30,15 @@ static void * tsdbCommitData(void *arg); static int tsdbCommitMeta(STsdbRepo *pRepo); static void tsdbEndCommit(STsdbRepo *pRepo); static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey); -static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols); +static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols); static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo); static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables); +static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables); // ---------------- INTERNAL FUNCTIONS ---------------- int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { STsdbCfg * pCfg = &pRepo->config; + STsdbMeta * pMeta = pRepo->tsdbMeta; int32_t level = 0; int32_t headSize = 0; TSKEY key = dataRowKey(row); @@ -45,7 +47,7 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { SSkipList * pSList = NULL; int bytes = 0; - if (pMemTable != NULL && pMemTable->tData[TABLE_TID(pTable)] != NULL && + if (pMemTable != NULL && TABLE_TID(pTable) < pMemTable->maxTables && pMemTable->tData[TABLE_TID(pTable)] != NULL && pMemTable->tData[TABLE_TID(pTable)]->uid == TABLE_UID(pTable)) { pTableData = pMemTable->tData[TABLE_TID(pTable)]; pSList = pTableData->pData; @@ -66,13 +68,20 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { // Operations above may change pRepo->mem, retake those values ASSERT(pRepo->mem != NULL); pMemTable = pRepo->mem; + + if (TABLE_TID(pTable) >= pMemTable->maxTables) { + if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) return -1;; + } pTableData = pMemTable->tData[TABLE_TID(pTable)]; if (pTableData == NULL || pTableData->uid != TABLE_UID(pTable)) { if (pTableData != NULL) { // destroy the table skiplist (may have race condition problem) + taosWLockLatch(&(pMemTable->latch)); pMemTable->tData[TABLE_TID(pTable)] = NULL; tsdbFreeTableData(pTableData); + taosWUnLockLatch(&(pMemTable->latch)); } + pTableData = tsdbNewTableData(pCfg, pTable); if (pTableData == NULL) { tsdbError("vgId:%d failed to insert row with key %" PRId64 @@ -122,7 +131,6 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { int ref = T_REF_DEC(pMemTable); tsdbDebug("vgId:%d unref memtable %p ref %d", REPO_ID(pRepo), pMemTable, ref); if (ref == 0) { - STsdbCfg * pCfg = &pRepo->config; STsdbBufPool *pBufPool = pRepo->pPool; SListNode *pNode = NULL; @@ -139,7 +147,7 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { } if (tsdbUnlockRepo(pRepo) < 0) return -1; - for (int i = 0; i < pCfg->maxTables; i++) { + for (int i = 0; i < pMemTable->maxTables; i++) { if (pMemTable->tData[i] != NULL) { tsdbFreeTableData(pMemTable->tData[i]); } @@ -161,11 +169,24 @@ int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem) { tsdbRefMemTable(pRepo, *pIMem); if (tsdbUnlockRepo(pRepo) < 0) return -1; - tsdbDebug("vgId:%d take memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), *pMem, *pIMem); + if (*pMem != NULL) taosRLockLatch(&((*pMem)->latch)); + + tsdbDebug("vgId:%d take memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), *pMem, *pIMem); return 0; } +void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem) { + if (pMem != NULL) { + taosRUnLockLatch(&(pMem->latch)); + tsdbUnRefMemTable(pRepo, pMem); + } + + if (pIMem != NULL) { + tsdbUnRefMemTable(pRepo, pIMem); + } +} + void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { STsdbCfg * pCfg = &pRepo->config; STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo); @@ -182,7 +203,7 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { } if (pRepo->mem == NULL) { - SMemTable *pMemTable = tsdbNewMemTable(&pRepo->config); + SMemTable *pMemTable = tsdbNewMemTable(pRepo); if (pMemTable == NULL) return NULL; if (tsdbLockRepo(pRepo) < 0) { @@ -329,7 +350,9 @@ static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes) { listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain); } -static SMemTable* tsdbNewMemTable(STsdbCfg* pCfg) { +static SMemTable* tsdbNewMemTable(STsdbRepo *pRepo) { + STsdbMeta *pMeta = pRepo->tsdbMeta; + SMemTable *pMemTable = (SMemTable *)calloc(1, sizeof(*pMemTable)); if (pMemTable == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; @@ -340,7 +363,8 @@ static SMemTable* tsdbNewMemTable(STsdbCfg* pCfg) { pMemTable->keyLast = 0; pMemTable->numOfRows = 0; - pMemTable->tData = (STableData**)calloc(pCfg->maxTables, sizeof(STableData*)); + pMemTable->maxTables = pMeta->maxTables; + pMemTable->tData = (STableData **)calloc(pMemTable->maxTables, sizeof(STableData *)); if (pMemTable->tData == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; goto _err; @@ -398,9 +422,6 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) { goto _err; } - // TODO: operation here should not be here, remove it - pTableData->pData->level = 1; - return pTableData; _err: @@ -473,7 +494,7 @@ static void *tsdbCommitData(void *arg) { _exit: tdFreeDataCols(pDataCols); - tsdbDestroyCommitIters(iters, pCfg->maxTables); + tsdbDestroyCommitIters(iters, pMem->maxTables); tsdbDestroyHelper(&whelper); tsdbEndCommit(pRepo); tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId); @@ -552,12 +573,13 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe STsdbCfg * pCfg = &pRepo->config; STsdbFileH *pFileH = pRepo->tsdbFileH; SFileGroup *pGroup = NULL; + SMemTable * pMem = pRepo->imem; TSKEY minKey = 0, maxKey = 0; tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); // Check if there are data to commit to this file - int hasDataToCommit = tsdbHasDataToCommit(iters, pCfg->maxTables, minKey, maxKey); + int hasDataToCommit = tsdbHasDataToCommit(iters, pMem->maxTables, minKey, maxKey); if (!hasDataToCommit) { tsdbDebug("vgId:%d no data to commit to file %d", REPO_ID(pRepo), fid); return 0; @@ -570,7 +592,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe return -1; } - if ((pGroup = tsdbCreateFGroupIfNeed(pRepo, dataDir, fid, pCfg->maxTables)) == NULL) { + if ((pGroup = tsdbCreateFGroupIfNeed(pRepo, dataDir, fid)) == NULL) { tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); goto _err; } @@ -582,7 +604,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe } // Loop to commit data in each table - for (int tid = 1; tid < pCfg->maxTables; tid++) { + for (int tid = 1; tid < pMem->maxTables; tid++) { SCommitIter *pIter = iters + tid; if (pIter->pTable == NULL) continue; @@ -643,11 +665,10 @@ _err: } static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) { - STsdbCfg * pCfg = &(pRepo->config); SMemTable *pMem = pRepo->imem; STsdbMeta *pMeta = pRepo->tsdbMeta; - SCommitIter *iters = (SCommitIter *)calloc(pCfg->maxTables, sizeof(SCommitIter)); + SCommitIter *iters = (SCommitIter *)calloc(pMem->maxTables, sizeof(SCommitIter)); if (iters == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return NULL; @@ -656,7 +677,7 @@ static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) { if (tsdbRLockRepoMeta(pRepo) < 0) goto _err; // reference all tables - for (int i = 0; i < pCfg->maxTables; i++) { + for (int i = 0; i < pMem->maxTables; i++) { if (pMeta->tables[i] != NULL) { tsdbRefTable(pMeta->tables[i]); iters[i].pTable = pMeta->tables[i]; @@ -665,7 +686,7 @@ static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) { if (tsdbUnlockRepoMeta(pRepo) < 0) goto _err; - for (int i = 0; i < pCfg->maxTables; i++) { + for (int i = 0; i < pMem->maxTables; i++) { if ((iters[i].pTable != NULL) && (pMem->tData[i] != NULL) && (TABLE_UID(iters[i].pTable) == pMem->tData[i]->uid)) { if ((iters[i].pIter = tSkipListCreateIter(pMem->tData[i]->pData)) == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; @@ -679,7 +700,7 @@ static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) { return iters; _err: - tsdbDestroyCommitIters(iters, pCfg->maxTables); + tsdbDestroyCommitIters(iters, pMem->maxTables); return NULL; } @@ -694,4 +715,25 @@ static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) { } free(iters); +} + +static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) { + ASSERT(pMemTable->maxTables < maxTables); + + STableData **pTableData = (STableData **)calloc(maxTables, sizeof(STableData *)); + if (pTableData == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + STableData **tData = pMemTable->tData; + + taosWLockLatch(&(pMemTable->latch)); + pMemTable->maxTables = maxTables; + pMemTable->tData = pTableData; + taosWUnLockLatch(&(pMemTable->latch)); + + tfree(tData); + + return 0; } \ No newline at end of file diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index b25e734694c6136c8ee74ed8174bae5a96605a5a..8d237ab6737635cc0b40ba14f691d45b4d47a58c 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -49,6 +49,7 @@ static int tsdbGetTableEncodeSize(int8_t act, STable *pTable); static void * tsdbInsertTableAct(STsdbRepo *pRepo, int8_t act, void *buf, STable *pTable); static int tsdbRemoveTableFromStore(STsdbRepo *pRepo, STable *pTable); static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable); +static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid); // ------------------ OUTER FUNCTIONS ------------------ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) { @@ -60,13 +61,13 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) { int tid = pCfg->tableId.tid; STable * pTable = NULL; - if (tid < 0 || tid >= pRepo->config.maxTables) { + if (tid < 1 || tid > TSDB_MAX_TABLES) { tsdbError("vgId:%d failed to create table since invalid tid %d", REPO_ID(pRepo), tid); terrno = TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO; goto _err; } - if (pMeta->tables[tid] != NULL) { + if (tid < pMeta->maxTables && pMeta->tables[tid] != NULL) { if (TABLE_UID(pMeta->tables[tid]) == pCfg->tableId.uid) { tsdbError("vgId:%d table %s already exists, tid %d uid %" PRId64, REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable)); @@ -422,7 +423,8 @@ STsdbMeta *tsdbNewMeta(STsdbCfg *pCfg) { goto _err; } - pMeta->tables = (STable **)calloc(pCfg->maxTables, sizeof(STable *)); + pMeta->maxTables = TSDB_INIT_NTABLES + 1; + pMeta->tables = (STable **)calloc(pMeta->maxTables, sizeof(STable *)); if (pMeta->tables == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; goto _err; @@ -434,7 +436,7 @@ STsdbMeta *tsdbNewMeta(STsdbCfg *pCfg) { goto _err; } - pMeta->uidMap = taosHashInit(pCfg->maxTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false); + pMeta->uidMap = taosHashInit(TSDB_INIT_NTABLES * 1.1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false); if (pMeta->uidMap == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; goto _err; @@ -484,14 +486,13 @@ _err: } int tsdbCloseMeta(STsdbRepo *pRepo) { - STsdbCfg * pCfg = &pRepo->config; STsdbMeta *pMeta = pRepo->tsdbMeta; SListNode *pNode = NULL; STable * pTable = NULL; if (pMeta == NULL) return 0; tdCloseKVStore(pMeta->pStore); - for (int i = 1; i < pCfg->maxTables; i++) { + for (int i = 1; i < pMeta->maxTables; i++) { tsdbFreeTable(pMeta->tables[i]); } @@ -624,9 +625,8 @@ static int tsdbRestoreTable(void *pHandle, void *cont, int contLen) { static void tsdbOrgMeta(void *pHandle) { STsdbRepo *pRepo = (STsdbRepo *)pHandle; STsdbMeta *pMeta = pRepo->tsdbMeta; - STsdbCfg * pCfg = &pRepo->config; - for (int i = 1; i < pCfg->maxTables; i++) { + for (int i = 1; i < pMeta->maxTables; i++) { STable *pTable = pMeta->tables[i]; if (pTable != NULL && pTable->type == TSDB_CHILD_TABLE) { tsdbAddTableIntoIndex(pMeta, pTable, true); @@ -781,6 +781,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, boo goto _err; } } else { + if (tsdbAdjustMetaTables(pRepo, TABLE_TID(pTable)) < 0) goto _err; if (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE && addIdx) { // add STABLE to the index if (tsdbAddTableIntoIndex(pMeta, pTable, true) < 0) { tsdbDebug("vgId:%d failed to add table %s to meta while add table to index since %s", REPO_ID(pRepo), @@ -827,7 +828,6 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro SListIter lIter = {0}; SListNode *pNode = NULL; STable * tTable = NULL; - STsdbCfg * pCfg = &(pRepo->config); STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); int maxCols = schemaNCols(pSchema); @@ -860,7 +860,7 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro if (maxCols == pMeta->maxCols || maxRowBytes == pMeta->maxRowBytes) { maxCols = 0; maxRowBytes = 0; - for (int i = 0; i < pCfg->maxTables; i++) { + for (int i = 0; i < pMeta->maxTables; i++) { STable *pTable = pMeta->tables[i]; if (pTable != NULL) { pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); @@ -1266,5 +1266,28 @@ static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable) { tsdbRemoveTableFromMeta(pRepo, pTable, true, true); } + return 0; +} + +static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid) { + STsdbMeta *pMeta = pRepo->tsdbMeta; + if (pMeta->maxTables >= tid) return 0; + + int maxTables = tsdbGetNextMaxTables(tid); + + STable **tables = (STable **)calloc(maxTables, sizeof(STable *)); + if (tables == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + memcpy((void *)tables, (void *)pMeta->tables, sizeof(STable *) * pMeta->maxTables); + pMeta->maxTables = maxTables; + + STable **tTables = pMeta->tables; + pMeta->tables = tables; + tfree(tTables); + tsdbDebug("vgId:%d tsdb meta maxTables is adjusted as %d", REPO_ID(pRepo), maxTables); + return 0; } \ No newline at end of file diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index c82d1f905a20fffe447111f95d66c7060f61da7a..577a0965d813dee89922653568966073fcaa2ce9 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -317,17 +317,20 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh } assert(pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL); - - if (pHandle->mem && pHandle->mem->tData[pCheckInfo->tableId.tid] != NULL) { + + // TODO: add uid check + if (pHandle->mem && pCheckInfo->tableId.tid < pHandle->mem->maxTables && + pHandle->mem->tData[pCheckInfo->tableId.tid] != NULL) { pCheckInfo->iter = tSkipListCreateIterFromVal(pHandle->mem->tData[pCheckInfo->tableId.tid]->pData, - (const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order); + (const char*)&pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order); } - - if (pHandle->imem && pHandle->imem->tData[pCheckInfo->tableId.tid] != NULL) { + + if (pHandle->imem && pCheckInfo->tableId.tid < pHandle->imem->maxTables && + pHandle->imem->tData[pCheckInfo->tableId.tid] != NULL) { pCheckInfo->iiter = tSkipListCreateIterFromVal(pHandle->imem->tData[pCheckInfo->tableId.tid]->pData, - (const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order); + (const char*)&pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order); } - + // both iterators are NULL, no data in buffer right now if (pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL) { return false; @@ -1529,7 +1532,6 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists static bool doHasDataInBuffer(STsdbQueryHandle* pQueryHandle) { size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); - assert(numOfTables <= ((STsdbRepo*)pQueryHandle->pTsdb)->config.maxTables); while (pQueryHandle->activeIndex < numOfTables) { if (hasMoreDataInCache(pQueryHandle)) { @@ -2418,8 +2420,7 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { tfree(pQueryHandle->statis); // todo check error - tsdbUnRefMemTable(pQueryHandle->pTsdb, pQueryHandle->mem); - tsdbUnRefMemTable(pQueryHandle->pTsdb, pQueryHandle->imem); + tsdbUnTakeMemSnapShot(pQueryHandle->pTsdb, pQueryHandle->mem, pQueryHandle->imem); tsdbDestroyHelper(&pQueryHandle->rhelper); diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 249fb428e7781c2c88551116602bb80e584cbc95..dd5ec1843941b8b691c53df9a4d3cb3f54f6521d 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -123,7 +123,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { tsdbCfg.tsdbId = pVnodeCfg->cfg.vgId; tsdbCfg.cacheBlockSize = pVnodeCfg->cfg.cacheBlockSize; tsdbCfg.totalBlocks = pVnodeCfg->cfg.totalBlocks; - tsdbCfg.maxTables = pVnodeCfg->cfg.maxTables; + // tsdbCfg.maxTables = pVnodeCfg->cfg.maxTables; tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile; tsdbCfg.keep = pVnodeCfg->cfg.daysToKeep; tsdbCfg.minRowsPerFileBlock = pVnodeCfg->cfg.minRowsPerFileBlock; @@ -630,14 +630,14 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { len += snprintf(content + len, maxLen - len, " \"cfgVersion\": %d,\n", pVnodeCfg->cfg.cfgVersion); len += snprintf(content + len, maxLen - len, " \"cacheBlockSize\": %d,\n", pVnodeCfg->cfg.cacheBlockSize); len += snprintf(content + len, maxLen - len, " \"totalBlocks\": %d,\n", pVnodeCfg->cfg.totalBlocks); - len += snprintf(content + len, maxLen - len, " \"maxTables\": %d,\n", pVnodeCfg->cfg.maxTables); + // len += snprintf(content + len, maxLen - len, " \"maxTables\": %d,\n", pVnodeCfg->cfg.maxTables); len += snprintf(content + len, maxLen - len, " \"daysPerFile\": %d,\n", pVnodeCfg->cfg.daysPerFile); len += snprintf(content + len, maxLen - len, " \"daysToKeep\": %d,\n", pVnodeCfg->cfg.daysToKeep); len += snprintf(content + len, maxLen - len, " \"daysToKeep1\": %d,\n", pVnodeCfg->cfg.daysToKeep1); len += snprintf(content + len, maxLen - len, " \"daysToKeep2\": %d,\n", pVnodeCfg->cfg.daysToKeep2); len += snprintf(content + len, maxLen - len, " \"minRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.minRowsPerFileBlock); len += snprintf(content + len, maxLen - len, " \"maxRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.maxRowsPerFileBlock); - len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnodeCfg->cfg.commitTime); + // len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnodeCfg->cfg.commitTime); len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnodeCfg->cfg.precision); len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnodeCfg->cfg.compression); len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pVnodeCfg->cfg.walLevel); @@ -729,12 +729,12 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) { } pVnode->tsdbCfg.totalBlocks = totalBlocks->valueint; - cJSON *maxTables = cJSON_GetObjectItem(root, "maxTables"); - if (!maxTables || maxTables->type != cJSON_Number) { - vError("vgId:%d, failed to read vnode cfg, maxTables not found", pVnode->vgId); - goto PARSE_OVER; - } - pVnode->tsdbCfg.maxTables = maxTables->valueint; + // cJSON *maxTables = cJSON_GetObjectItem(root, "maxTables"); + // if (!maxTables || maxTables->type != cJSON_Number) { + // vError("vgId:%d, failed to read vnode cfg, maxTables not found", pVnode->vgId); + // goto PARSE_OVER; + // } + // pVnode->tsdbCfg.maxTables = maxTables->valueint; cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile"); if (!daysPerFile || daysPerFile->type != cJSON_Number) { @@ -778,12 +778,12 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) { } pVnode->tsdbCfg.maxRowsPerFileBlock = maxRowsPerFileBlock->valueint; - cJSON *commitTime = cJSON_GetObjectItem(root, "commitTime"); - if (!commitTime || commitTime->type != cJSON_Number) { - vError("vgId:%d, failed to read vnode cfg, commitTime not found", pVnode->vgId); - goto PARSE_OVER; - } - pVnode->tsdbCfg.commitTime = (int8_t)commitTime->valueint; + // cJSON *commitTime = cJSON_GetObjectItem(root, "commitTime"); + // if (!commitTime || commitTime->type != cJSON_Number) { + // vError("vgId:%d, failed to read vnode cfg, commitTime not found", pVnode->vgId); + // goto PARSE_OVER; + // } + // pVnode->tsdbCfg.commitTime = (int8_t)commitTime->valueint; cJSON *precision = cJSON_GetObjectItem(root, "precision"); if (!precision || precision->type != cJSON_Number) {