From 1062357eeee0cb3eb04064c20c98a3f356f51ad0 Mon Sep 17 00:00:00 2001 From: lichuang Date: Wed, 26 May 2021 17:36:51 +0800 Subject: [PATCH] [TD-4034]when cacheLast option changed,restore or free cache last data --- src/tsdb/inc/tsdbMeta.h | 6 +- src/tsdb/inc/tsdbint.h | 4 + src/tsdb/src/tsdbCommitQueue.c | 22 +++-- src/tsdb/src/tsdbMain.c | 172 ++++++++++++++++++++++++++++++++- src/tsdb/src/tsdbMeta.c | 23 ++--- 5 files changed, 203 insertions(+), 24 deletions(-) diff --git a/src/tsdb/inc/tsdbMeta.h b/src/tsdb/inc/tsdbMeta.h index 45868c002d..14d5a41768 100644 --- a/src/tsdb/inc/tsdbMeta.h +++ b/src/tsdb/inc/tsdbMeta.h @@ -38,8 +38,9 @@ typedef struct STable { SRWLatch latch; // TODO: implementa latch functions SDataCol *lastCols; - int16_t lastColNum; - int16_t maxColumnNum; + int16_t maxColNum; + int16_t restoreColumnNum; + bool hasRestoreLastColumn; int lastColSVersion; T_REF_DECLARE() } STable; @@ -90,6 +91,7 @@ int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema); int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId); int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema); STSchema* tsdbGetTableLatestSchema(STable *pTable); +void tsdbFreeLastColumns(STable* pTable); static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *key2) { if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) { diff --git a/src/tsdb/inc/tsdbint.h b/src/tsdb/inc/tsdbint.h index 4d62164df9..e74c3238e2 100644 --- a/src/tsdb/inc/tsdbint.h +++ b/src/tsdb/inc/tsdbint.h @@ -75,6 +75,9 @@ struct STsdbRepo { STsdbCfg save_config; // save apply config bool config_changed; // config changed flag pthread_mutex_t save_mutex; // protect save config + + uint8_t hasCachedLastRow; + uint8_t hasCachedLastColumn; STsdbAppH appH; STsdbStat stat; @@ -100,6 +103,7 @@ int tsdbUnlockRepo(STsdbRepo* pRepo); STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo); int tsdbCheckCommit(STsdbRepo* pRepo); int tsdbRestoreInfo(STsdbRepo* pRepo); +int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg); void tsdbGetRootDir(int repoid, char dirName[]); void tsdbGetDataDir(int repoid, char dirName[]); diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c index e753a3211e..abea79bc4f 100644 --- a/src/tsdb/src/tsdbCommitQueue.c +++ b/src/tsdb/src/tsdbCommitQueue.c @@ -113,11 +113,15 @@ int tsdbScheduleCommit(STsdbRepo *pRepo) { } static void tsdbApplyRepoConfig(STsdbRepo *pRepo) { + pthread_mutex_lock(&pRepo->save_mutex); + pRepo->config_changed = false; STsdbCfg * pSaveCfg = &pRepo->save_config; - + STsdbCfg oldCfg; int32_t oldTotalBlocks = pRepo->config.totalBlocks; + memcpy(&oldCfg, &(pRepo->config), sizeof(STsdbCfg)); + pRepo->config.compression = pRepo->save_config.compression; pRepo->config.keep = pRepo->save_config.keep; pRepo->config.keep1 = pRepo->save_config.keep1; @@ -125,10 +129,12 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) { pRepo->config.cacheLastRow = pRepo->save_config.cacheLastRow; pRepo->config.totalBlocks = pRepo->save_config.totalBlocks; - tsdbInfo("vgId:%d apply new config: compression(%d), keep(%d,%d,%d), totalBlocks(%d), cacheLastRow(%d),totalBlocks(%d)", + pthread_mutex_unlock(&pRepo->save_mutex); + + tsdbInfo("vgId:%d apply new config: compression(%d), keep(%d,%d,%d), totalBlocks(%d), cacheLastRow(%d->%d),totalBlocks(%d->%d)", REPO_ID(pRepo), pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2, - pSaveCfg->totalBlocks, pSaveCfg->cacheLastRow, pSaveCfg->totalBlocks); + pSaveCfg->totalBlocks, oldCfg.cacheLastRow, pSaveCfg->cacheLastRow, oldTotalBlocks, pSaveCfg->totalBlocks); int err = tsdbExpendPool(pRepo, oldTotalBlocks); if (!TAOS_SUCCEEDED(err)) { @@ -136,6 +142,12 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) { REPO_ID(pRepo), oldTotalBlocks, pSaveCfg->totalBlocks, tstrerror(err)); } + if (oldCfg.cacheLastRow != pRepo->config.cacheLastRow) { + if (tsdbLockRepo(pRepo) < 0) return; + tsdbCacheLastData(pRepo, &oldCfg); + tsdbUnlockRepo(pRepo); + } + } static void *tsdbLoopCommit(void *arg) { @@ -165,10 +177,8 @@ static void *tsdbLoopCommit(void *arg) { pRepo = ((SCommitReq *)pNode->data)->pRepo; // check if need to apply new config - if (pRepo->config_changed) { - pthread_mutex_lock(&pRepo->save_mutex); + if (pRepo->config_changed) { tsdbApplyRepoConfig(pRepo); - pthread_mutex_unlock(&pRepo->save_mutex); } tsdbCommitData(pRepo); diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 880400dd9f..a8bbd0d69e 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -548,6 +548,8 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { return NULL; } pRepo->config_changed = false; + atomic_store_8(&pRepo->hasCachedLastRow, 0); + atomic_store_8(&pRepo->hasCachedLastColumn, 0); code = tsem_init(&(pRepo->readyToCommit), 0, 1); if (code != 0) { @@ -636,7 +638,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea int err = 0; numColumns = schemaNCols(pSchema); - if (numColumns <= pTable->maxColumnNum) { + if (numColumns <= pTable->restoreColumnNum) { return 0; } if (pTable->lastColSVersion != schemaVersion(pSchema)) { @@ -675,7 +677,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea SBlockIdx *pIdx = pReadh->pBlkIdx; blockIdx = (int32_t)(pIdx->numOfBlocks - 1); - while (numColumns > pTable->maxColumnNum && blockIdx >= 0) { + while (numColumns > pTable->restoreColumnNum && blockIdx >= 0) { bool loadStatisData = false; pBlock = pReadh->pBlkInfo->blocks + blockIdx; blockIdx -= 1; @@ -693,7 +695,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea loadStatisData = true; } - for (int16_t i = 0; i < numColumns && numColumns > pTable->maxColumnNum; ++i) { + for (int16_t i = 0; i < numColumns && numColumns > pTable->restoreColumnNum; ++i) { STColumn *pCol = schemaColAt(pSchema, i); // ignore loaded columns if (pTable->lastCols[i].bytes != 0) { @@ -733,7 +735,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset); pLastCol->ts = dataRowKey(row); - pTable->maxColumnNum += 1; + pTable->restoreColumnNum += 1; tsdbDebug("tsdbRestoreLastColumns restore vgId:%d,table:%s cache column %d, %" PRId64, REPO_ID(pRepo), pTable->name->data, pLastCol->colId, pLastCol->ts); break; @@ -766,7 +768,7 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { for (int i = 1; i < pMeta->maxTables; i++) { STable *pTable = pMeta->tables[i]; if (pTable == NULL) continue; - pTable->maxColumnNum = 0; + pTable->restoreColumnNum = 0; } } @@ -841,5 +843,165 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { } tsdbDestroyReadH(&readh); + if (CACHE_LAST_ROW(pCfg)) { + atomic_store_8(&pRepo->hasCachedLastRow, 1); + } + if (CACHE_LAST_NULL_COLUMN(pCfg)) { + atomic_store_8(&pRepo->hasCachedLastColumn, 1); + } + return 0; } + +int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) { + bool cacheLastRow = false, cacheLastCol = false; + SFSIter fsiter; + SReadH readh; + SDFileSet *pSet; + STsdbMeta *pMeta = pRepo->tsdbMeta; + //STsdbCfg * pCfg = REPO_CFG(pRepo); + SBlock * pBlock; + int tableNum = 0; + int maxTableIdx = 0; + int cacheLastRowTableNum = 0; + int cacheLastColTableNum = 0; + + bool need_free_last_row = CACHE_LAST_ROW(oldCfg) && !CACHE_LAST_ROW(&(pRepo->config)); + bool need_free_last_col = CACHE_LAST_NULL_COLUMN(oldCfg) && !CACHE_LAST_NULL_COLUMN(&(pRepo->config)); + + if (CACHE_LAST_ROW(&(pRepo->config)) || CACHE_LAST_NULL_COLUMN(&(pRepo->config))) { + tsdbInfo("tsdbCacheLastData cache last data since cacheLast option changed"); + cacheLastRow = !CACHE_LAST_ROW(oldCfg) && CACHE_LAST_ROW(&(pRepo->config)); + cacheLastCol = !CACHE_LAST_NULL_COLUMN(oldCfg) && CACHE_LAST_NULL_COLUMN(&(pRepo->config)); + } + + // calc max table idx and table num + for (int i = 1; i < pMeta->maxTables; i++) { + STable *pTable = pMeta->tables[i]; + if (pTable == NULL) continue; + tableNum += 1; + maxTableIdx = i; + if (cacheLastCol) { + pTable->restoreColumnNum = 0; + } + } + + // if close last option,need to free data + if (need_free_last_row || need_free_last_col) { + if (need_free_last_row) { + atomic_store_8(&pRepo->hasCachedLastRow, 0); + } + if (need_free_last_col) { + atomic_store_8(&pRepo->hasCachedLastColumn, 0); + } + tsdbInfo("free cache last data since cacheLast option changed"); + for (int i = 1; i < maxTableIdx; i++) { + STable *pTable = pMeta->tables[i]; + if (pTable == NULL) continue; + if (need_free_last_row) { + taosTZfree(pTable->lastRow); + pTable->lastRow = NULL; + pTable->lastKey = TSKEY_INITIAL_VAL; + } + if (need_free_last_col) { + tsdbFreeLastColumns(pTable); + } + } + } + + if (!cacheLastRow && !cacheLastCol) { + return 0; + } + + cacheLastRowTableNum = cacheLastRow ? tableNum : 0; + cacheLastColTableNum = cacheLastCol ? tableNum : 0; + + if (tsdbInitReadH(&readh, pRepo) < 0) { + return -1; + } + + tsdbFSIterInit(&fsiter, REPO_FS(pRepo), TSDB_FS_ITER_BACKWARD); + + while ((pSet = tsdbFSIterNext(&fsiter)) != NULL && (cacheLastRowTableNum > 0 || cacheLastColTableNum > 0)) { + if (tsdbSetAndOpenReadFSet(&readh, pSet) < 0) { + tsdbDestroyReadH(&readh); + return -1; + } + + if (tsdbLoadBlockIdx(&readh) < 0) { + tsdbDestroyReadH(&readh); + return -1; + } + + for (int i = 1; i <= maxTableIdx; i++) { + STable *pTable = pMeta->tables[i]; + if (pTable == NULL) continue; + + //tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s", REPO_ID(pRepo), pTable->name->data); + + if (tsdbSetReadTable(&readh, pTable) < 0) { + tsdbDestroyReadH(&readh); + return -1; + } + + SBlockIdx *pIdx = readh.pBlkIdx; + + if (pIdx && cacheLastRowTableNum > 0 && pTable->lastRow == NULL) { + pTable->lastKey = pIdx->maxKey; + + if (tsdbLoadBlockInfo(&readh, NULL) < 0) { + tsdbDestroyReadH(&readh); + return -1; + } + + pBlock = readh.pBlkInfo->blocks + pIdx->numOfBlocks - 1; + + if (tsdbLoadBlockData(&readh, pBlock, NULL) < 0) { + tsdbDestroyReadH(&readh); + return -1; + } + + // Get the data in row + ASSERT(pTable->lastRow == NULL); + STSchema *pSchema = tsdbGetTableSchema(pTable); + pTable->lastRow = taosTMalloc(dataRowMaxBytesFromSchema(pSchema)); + if (pTable->lastRow == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbDestroyReadH(&readh); + return -1; + } + + tdInitDataRow(pTable->lastRow, pSchema); + for (int icol = 0; icol < schemaNCols(pSchema); icol++) { + STColumn *pCol = schemaColAt(pSchema, icol); + SDataCol *pDataCol = readh.pDCols[0]->cols + icol; + tdAppendColVal(pTable->lastRow, tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, pCol->bytes, + pCol->offset); + } + cacheLastRowTableNum -= 1; + } + + // restore NULL columns + if (pIdx && cacheLastColTableNum > 0 && !pTable->hasRestoreLastColumn) { + if (tsdbRestoreLastColumns(pRepo, pTable, &readh) != 0) { + tsdbDestroyReadH(&readh); + return -1; + } + if (pTable->hasRestoreLastColumn) { + cacheLastColTableNum -= 1; + } + } + } + } + + tsdbDestroyReadH(&readh); + + if (cacheLastRow) { + atomic_store_8(&pRepo->hasCachedLastRow, 1); + } + if (cacheLastCol) { + atomic_store_8(&pRepo->hasCachedLastColumn, 1); + } + + return 0; +} \ No newline at end of file diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 5717d52eea..324a7c79c5 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -44,7 +44,6 @@ static int tsdbRemoveTableFromStore(STsdbRepo *pRepo, STable *pTable); static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable); static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid); static int tsdbCheckTableTagVal(SKVRow *pKVRow, STSchema *pSchema); -static void tsdbFreeLastColumns(STable* pTable); // ------------------ OUTER FUNCTIONS ------------------ int tsdbCreateTable(STsdbRepo *repo, STableCfg *pCfg) { @@ -590,12 +589,12 @@ void tsdbUnRefTable(STable *pTable) { } } -static void tsdbFreeLastColumns(STable* pTable) { +void tsdbFreeLastColumns(STable* pTable) { if (pTable->lastCols == NULL) { return; } - for (int i = 0; i < pTable->lastColNum; ++i) { + for (int i = 0; i < pTable->maxColNum; ++i) { if (pTable->lastCols[i].bytes == 0) { continue; } @@ -605,14 +604,16 @@ static void tsdbFreeLastColumns(STable* pTable) { } tfree(pTable->lastCols); pTable->lastCols = NULL; - pTable->lastColNum = 0; + pTable->maxColNum = 0; + pTable->lastColSVersion = -1; + pTable->restoreColumnNum = 0; } int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId) { if (pTable->lastCols == NULL) { return -1; } - for (int16_t i = 0; i < pTable->lastColNum; ++i) { + for (int16_t i = 0; i < pTable->maxColNum; ++i) { if (pTable->lastCols[i].colId == colId) { return i; } @@ -640,8 +641,8 @@ int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema) { } pTable->lastColSVersion = schemaVersion(pSchema); - pTable->lastColNum = numOfColumn; - pTable->maxColumnNum = 0; + pTable->maxColNum = numOfColumn; + pTable->restoreColumnNum = 0; return 0; } @@ -682,11 +683,11 @@ int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema) { } SDataCol *oldLastCols = pTable->lastCols; - int16_t oldLastColNum = pTable->lastColNum; + int16_t oldLastColNum = pTable->maxColNum; pTable->lastColSVersion = schemaVersion(pNewSchema); pTable->lastCols = lastCols; - pTable->lastColNum = numOfCols; + pTable->maxColNum = numOfCols; if (oldLastCols == NULL) { TSDB_WUNLOCK_TABLE(pTable); @@ -797,8 +798,8 @@ static STable *tsdbNewTable() { pTable->lastKey = TSKEY_INITIAL_VAL; pTable->lastCols = NULL; - pTable->maxColumnNum = 0; - pTable->lastColNum = 0; + pTable->restoreColumnNum = 0; + pTable->maxColNum = 0; pTable->lastColSVersion = -1; return pTable; } -- GitLab