diff --git a/src/tsdb/inc/tsdbBuffer.h b/src/tsdb/inc/tsdbBuffer.h index 414ace00097d95742080a8f173177d5e44497237..c6aabeaab9e1ee725bd6326dfc9da4a45edbe898 100644 --- a/src/tsdb/inc/tsdbBuffer.h +++ b/src/tsdb/inc/tsdbBuffer.h @@ -28,8 +28,9 @@ typedef struct { int bufBlockSize; int tBufBlocks; int nBufBlocks; + int nRecycleBlocks; int64_t index; - SList* bufBlockList; + SList* bufBlockList; } STsdbBufPool; #define TSDB_BUFFER_RESERVE 1024 // Reseve 1K as commit threshold @@ -39,5 +40,7 @@ void tsdbFreeBufPool(STsdbBufPool* pBufPool); int tsdbOpenBufPool(STsdbRepo* pRepo); void tsdbCloseBufPool(STsdbRepo* pRepo); SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo); +void tsdbExpendPool(STsdbRepo* pRepo, int32_t oldTotalBlocks); +void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode); #endif /* _TD_TSDB_BUFFER_H_ */ \ No newline at end of file diff --git a/src/tsdb/src/tsdbBuffer.c b/src/tsdb/src/tsdbBuffer.c index 1798a21b9963c7641dd99dc7fa11a5dd977e0e3c..266f2a45d87ec03969b8acd9bb204cc69213e32e 100644 --- a/src/tsdb/src/tsdbBuffer.c +++ b/src/tsdb/src/tsdbBuffer.c @@ -70,6 +70,7 @@ int tsdbOpenBufPool(STsdbRepo *pRepo) { pPool->tBufBlocks = pCfg->totalBlocks; pPool->nBufBlocks = 0; pPool->index = 0; + pPool->nRecycleBlocks = 0; for (int i = 0; i < pCfg->totalBlocks; i++) { STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize); @@ -156,4 +157,42 @@ _err: return NULL; } -static void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); } \ No newline at end of file +static void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); } + +void tsdbExpendPool(STsdbRepo* pRepo, int32_t oldTotalBlocks) { + if (oldTotalBlocks == pRepo->config.totalBlocks) { + return; + } + + if (tsdbLockRepo(pRepo) < 0) return; + STsdbBufPool* pPool = pRepo->pPool; + + if (pRepo->config.totalBlocks > oldTotalBlocks) { + for (int i = 0; i < pRepo->config.totalBlocks - oldTotalBlocks; i++) { + STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize); + if (pBufBlock == NULL) goto err; + + if (tdListAppend(pPool->bufBlockList, (void *)(&pBufBlock)) < 0) { + tsdbFreeBufBlock(pBufBlock); + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto err; + } + + pPool->nBufBlocks++; + } + pthread_cond_signal(&pPool->poolNotEmpty); + } else { + pPool->nRecycleBlocks = oldTotalBlocks - pRepo->config.totalBlocks; + } + +err: + tsdbUnlockRepo(pRepo); +} + +void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode) { + STsdbBufBlock *pBufBlock = NULL; + tdListNodeGetData(pPool->bufBlockList, pNode, (void *)(&pBufBlock)); + tsdbFreeBufBlock(pBufBlock); + free(pNode); + pPool->nBufBlocks--; +} \ No newline at end of file diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c index 2e2cc74159eced73a0980c7700059a5ed7c505cb..86712db957c4049307821f08c2d37c84005fc663 100644 --- a/src/tsdb/src/tsdbCommitQueue.c +++ b/src/tsdb/src/tsdbCommitQueue.c @@ -116,17 +116,21 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) { pRepo->config_changed = false; STsdbCfg * pSaveCfg = &pRepo->save_config; + int32_t oldTotalBlocks = pRepo->config.totalBlocks; + pRepo->config.compression = pRepo->save_config.compression; pRepo->config.keep = pRepo->save_config.keep; pRepo->config.keep1 = pRepo->save_config.keep1; pRepo->config.keep2 = pRepo->save_config.keep2; pRepo->config.cacheLastRow = pRepo->save_config.cacheLastRow; - //pRepo->config.update = pRepo->save_config.update; + pRepo->config.totalBlocks = pRepo->save_config.totalBlocks; - tsdbInfo("vgId:%d apply new config: compression(%d), keep(%d,%d,%d), totalBlocks(%d), cacheLastRow(%d)", + tsdbInfo("vgId:%d apply new config: compression(%d), keep(%d,%d,%d), totalBlocks(%d), cacheLastRow(%d),totalBlocks(%d)", REPO_ID(pRepo), pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2, - pSaveCfg->totalBlocks, pSaveCfg->cacheLastRow); + pSaveCfg->totalBlocks, pSaveCfg->cacheLastRow, pSaveCfg->totalBlocks); + + tsdbExpendPool(pRepo, oldTotalBlocks); } diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index c5f1052b634307c073fde2c059434a4dedc6b2a7..fd02a3c8b97d7506209d92661143a158d8d94951 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -230,6 +230,9 @@ int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) { if (pRCfg->cacheLastRow != pCfg->cacheLastRow) { configChanged = true; } + if (pRCfg->totalBlocks != pCfg->totalBlocks) { + configChanged = true; + } if (!configChanged) { tsdbError("vgId:%d no config changed", REPO_ID(repo)); @@ -250,15 +253,16 @@ int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) { pSaveCfg->keep1 = pCfg->keep1; pSaveCfg->keep2 = pCfg->keep2; pSaveCfg->cacheLastRow = pCfg->cacheLastRow; + pSaveCfg->totalBlocks = pCfg->totalBlocks; - tsdbInfo("vgId:%d old config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d)", + tsdbInfo("vgId:%d old config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d),totalBlocks(%d)", REPO_ID(repo), pRCfg->compression, pRCfg->keep, pRCfg->keep1,pRCfg->keep2, - pRCfg->cacheLastRow); - tsdbInfo("vgId:%d new config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d)", + pRCfg->cacheLastRow, pRCfg->totalBlocks); + tsdbInfo("vgId:%d new config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d),totalBlocks(%d)", REPO_ID(repo), pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2, - pSaveCfg->cacheLastRow); + pSaveCfg->cacheLastRow,pSaveCfg->totalBlocks); repo->config_changed = true; diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 20ec426018a39e554fb03e9bb11399dbce1f3fcc..776cc07d2f5572767de2210249acac6082e4b608 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -98,17 +98,26 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { STsdbBufPool *pBufPool = pRepo->pPool; SListNode *pNode = NULL; + bool recycleBlocks = pBufPool->nRecycleBlocks > 0; if (tsdbLockRepo(pRepo) < 0) return -1; while ((pNode = tdListPopHead(pMemTable->bufBlockList)) != NULL) { - tdListAppendNode(pBufPool->bufBlockList, pNode); - } - int code = pthread_cond_signal(&pBufPool->poolNotEmpty); - if (code != 0) { - if (tsdbUnlockRepo(pRepo) < 0) return -1; - tsdbError("vgId:%d failed to signal pool not empty since %s", REPO_ID(pRepo), strerror(code)); - terrno = TAOS_SYSTEM_ERROR(code); - return -1; + if (pBufPool->nRecycleBlocks > 0) { + tsdbRecycleBufferBlock(pBufPool, pNode); + pBufPool->nRecycleBlocks -= 1; + } else { + tdListAppendNode(pBufPool->bufBlockList, pNode); + } + } + if (!recycleBlocks) { + int code = pthread_cond_signal(&pBufPool->poolNotEmpty); + if (code != 0) { + if (tsdbUnlockRepo(pRepo) < 0) return -1; + tsdbError("vgId:%d failed to signal pool not empty since %s", REPO_ID(pRepo), strerror(code)); + terrno = TAOS_SYSTEM_ERROR(code); + return -1; + } } + if (tsdbUnlockRepo(pRepo) < 0) return -1; for (int i = 0; i < pMemTable->maxTables; i++) {