From bf06466cf211878d9a0bc1f4ee8b086e62dee291 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Tue, 5 May 2020 22:01:56 +0800 Subject: [PATCH] add change repository compression options --- src/inc/taoserror.h | 3 ++ src/tsdb/inc/tsdbMain.h | 3 ++ src/tsdb/src/tsdbCache.c | 75 ++++++++++++++++++++++++++++++++++++---- src/tsdb/src/tsdbFile.c | 2 +- src/tsdb/src/tsdbMain.c | 48 ++++++++++++++++++++++--- 5 files changed, 118 insertions(+), 13 deletions(-) diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 98355a8672..b5ab4412a9 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -173,6 +173,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VALUE, 0, 462, "invalid value") // others TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_FILE_FORMAT, 0, 500, "invalid file format") +// TSDB +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CONFIG, 0, 550, "invalid TSDB configuration") + #ifdef TAOS_ERROR_C }; diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 89708a5553..e056a10bba 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -160,6 +160,7 @@ typedef struct { typedef struct { int64_t index; + int numOfCacheBlocks; SList * memPool; } STsdbCachePool; @@ -488,6 +489,8 @@ int tsdbWriteCompIdx(SRWHelper *pHelper); // --------- Other functions need to further organize void tsdbFitRetention(STsdbRepo *pRepo); +int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks); +void tsdbAdjustCacheBlocks(STsdbCache *pCache); #ifdef __cplusplus } diff --git a/src/tsdb/src/tsdbCache.c b/src/tsdb/src/tsdbCache.c index 9351bc602b..84f8a81eea 100644 --- a/src/tsdb/src/tsdbCache.c +++ b/src/tsdb/src/tsdbCache.c @@ -20,6 +20,7 @@ static int tsdbAllocBlockFromPool(STsdbCache *pCache); static void tsdbFreeBlockList(SList *list); static void tsdbFreeCacheMem(SCacheMem *mem); +static int tsdbAddCacheBlockToPool(STsdbCache *pCache); STsdbCache *tsdbInitCache(int cacheBlockSize, int totalBlocks, TsdbRepoT *pRepo) { STsdbCache *pCache = (STsdbCache *)calloc(1, sizeof(STsdbCache)); @@ -40,13 +41,7 @@ STsdbCache *tsdbInitCache(int cacheBlockSize, int totalBlocks, TsdbRepoT *pRepo) if (pPool->memPool == NULL) goto _err; for (int i = 0; i < totalBlocks; i++) { - STsdbCacheBlock *pBlock = (STsdbCacheBlock *)malloc(sizeof(STsdbCacheBlock) + cacheBlockSize); - if (pBlock == NULL) { - goto _err; - } - pBlock->offset = 0; - pBlock->remain = cacheBlockSize; - tdListAppend(pPool->memPool, (void *)(&pBlock)); + if (tsdbAddCacheBlockToPool(pCache) < 0) goto _err; } pCache->mem = NULL; @@ -142,4 +137,70 @@ static int tsdbAllocBlockFromPool(STsdbCache *pCache) { tsdbUnLockRepo(pCache->pRepo); return 0; +} + +int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks) { + STsdbCache *pCache = pRepo->tsdbCache; + int oldNumOfBlocks = pCache->totalCacheBlocks; + + tsdbLockRepo((TsdbRepoT *)pRepo); + + ASSERT(pCache->totalCacheBlocks != totalBlocks); + + if (pCache->totalCacheBlocks < totalBlocks) { + ASSERT(pCache->totalCacheBlocks == pCache->pool.numOfCacheBlocks); + int blocksToAdd = pCache->totalCacheBlocks - totalBlocks; + pCache->totalCacheBlocks = totalBlocks; + for (int i = 0; i < blocksToAdd; i++) { + if (tsdbAddCacheBlockToPool(pCache) < 0) { + tsdbUnLockRepo((TsdbRepoT *)pRepo); + tsdbError("tsdbId %d: failed to add cache block to cache pool", pRepo->config.tsdbId); + return -1; + } + } + } else { + pCache->totalCacheBlocks = totalBlocks; + tsdbAdjustCacheBlocks(pCache); + } + + tsdbUnLockRepo((TsdbRepoT *)pRepo); + tsdbTrace("tsdbId %d: tsdb total cache blocks changed from %d to %d", pRepo->config.tsdbId, oldNumOfBlocks, totalBlocks); + return 0; +} + +static int tsdbAddCacheBlockToPool(STsdbCache *pCache) { + STsdbCachePool *pPool = &pCache->pool; + + STsdbCacheBlock *pBlock = malloc(sizeof(STsdbCacheBlock) + pCache->cacheBlockSize); + if (pBlock == NULL) return -1; + + pBlock->offset = 0; + pBlock->remain = pCache->cacheBlockSize; + tdListAppend(pPool->memPool, (void *)(&pBlock)); + pPool->numOfCacheBlocks++; + + return 0; +} + +static int tsdbRemoveCacheBlockFromPool(STsdbCache *pCache) { + STsdbCachePool *pPool = &pCache->pool; + STsdbCacheBlock *pBlock = NULL; + + ASSERT(pCache->totalCacheBlocks >= 0); + + SListNode *node = tdListPopHead(pPool->memPool); + if (node == NULL) return -1; + + tdListNodeGetData(pPool->memPool, node, &pBlock); + free(pBlock); + listNodeFree(node); + pPool->numOfCacheBlocks--; + + return 0; +} + +void tsdbAdjustCacheBlocks(STsdbCache *pCache) { + while (pCache->totalCacheBlocks < pCache->pool.numOfCacheBlocks) { + if (tsdbRemoveCacheBlockFromPool(pCache) < 0) break; + } } \ No newline at end of file diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 51bdda79e6..0c1b9e314e 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -47,7 +47,7 @@ STsdbFileH *tsdbInitFileH(char *dataDir, STsdbCfg *pCfg) { return NULL; } - pFileH->maxFGroups = pCfg->keep / pCfg->daysPerFile + 2; + pFileH->maxFGroups = pCfg->keep / pCfg->daysPerFile + 3; pFileH->fGroup = (SFileGroup *)calloc(pFileH->maxFGroups, sizeof(SFileGroup)); if (pFileH->fGroup == NULL) { diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index dd5dbd3f8d..3f41a3c5fe 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -36,8 +36,9 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **i SDataCols *pDataCols); static TSKEY tsdbNextIterKey(SSkipListIterator *pIter); static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minKey, TSKEY maxKey); -// static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsToWrite, int64_t *offset, int32_t *len, -// int64_t uid); +static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression); +static void tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep); +static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables); #define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid] #define TSDB_GET_TABLE_BY_NAME(pRepo, name) @@ -298,10 +299,23 @@ int32_t tsdbCloseRepo(TsdbRepoT *repo) { */ int32_t tsdbConfigRepo(TsdbRepoT *repo, STsdbCfg *pCfg) { STsdbRepo *pRepo = (STsdbRepo *)repo; + STsdbCfg * pRCfg = &pRepo->config; - pRepo->config = *pCfg; - // TODO - return 0; + if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) return TSDB_CODE_INVALID_CONFIG; + + ASSERT(pRCfg->tsdbId == pCfg->tsdbId); + ASSERT(pRCfg->cacheBlockSize == pCfg->cacheBlockSize); + ASSERT(pRCfg->daysPerFile == pCfg->daysPerFile); + ASSERT(pRCfg->minRowsPerFileBlock == pCfg->minRowsPerFileBlock); + ASSERT(pRCfg->maxRowsPerFileBlock == pCfg->maxRowsPerFileBlock); + ASSERT(pRCfg->precision == pCfg->precision); + + if (pRCfg->compression != pCfg->compression) tsdbAlterCompression(pRepo, pCfg->compression); + if (pRCfg->keep != pCfg->keep) tsdbAlterKeep(pRepo, pCfg->keep); + if (pRCfg->totalBlocks != pCfg->totalBlocks) tsdbAlterCacheTotalBlocks(pRepo, pCfg->totalBlocks); + if (pRCfg->maxTables != pCfg->maxTables) tsdbAlterMaxTables(pRepo, pCfg->maxTables); + + return TSDB_CODE_SUCCESS; } int32_t tsdbTriggerCommit(TsdbRepoT *repo) { @@ -927,6 +941,7 @@ _exit: tsdbLockRepo(arg); tdListMove(pCache->imem->list, pCache->pool.memPool); + tsdbAdjustCacheBlocks(pCache); tdListFree(pCache->imem->list); free(pCache->imem); pCache->imem = NULL; @@ -1045,4 +1060,27 @@ static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minK if (nextKey > 0 && (nextKey >= minKey && nextKey <= maxKey)) return 1; } return 0; +} + +static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression) { + pRepo->config.compression = compression; +} + +static void tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep) { + STsdbCfg *pCfg = &pRepo->config; + + int maxFiles = keep / pCfg->maxTables + 3; + if (pRepo->config.keep > keep) { + pRepo->tsdbFileH->maxFGroups = maxFiles; + } else { + pRepo->tsdbFileH->fGroup = realloc(pRepo->tsdbFileH->fGroup, sizeof(SFileGroup)); + if (pRepo->tsdbFileH->fGroup == NULL) { + // TODO: deal with the error + } + pRepo->tsdbFileH->maxFGroups = maxFiles; + } +} + +static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables) { + // TODO } \ No newline at end of file -- GitLab