From 876c006847a2f698b2445abf598463ab5a8e3f43 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Wed, 6 Apr 2022 23:01:45 +0800 Subject: [PATCH] [TS-238](tsdb): fixed last_row() return empty. --- src/client/src/tscDelete.c | 37 +++++------------- src/tsdb/inc/tsdbint.h | 3 +- src/tsdb/src/tsdbDelete.c | 79 +++++++++++++++++++++++++++++++------- src/tsdb/src/tsdbMain.c | 39 ++++++++++++------- src/tsdb/src/tsdbRead.c | 2 +- 5 files changed, 102 insertions(+), 58 deletions(-) diff --git a/src/client/src/tscDelete.c b/src/client/src/tscDelete.c index 5df03ebf26..e5727c65cc 100644 --- a/src/client/src/tscDelete.c +++ b/src/client/src/tscDelete.c @@ -58,13 +58,11 @@ void tscSubDeleteCallback(void *param, TAOS_RES *tres, int code) { trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; tscError("0x%"PRIx64" query cancelled or failed, sub:0x%"PRIx64", vgId:%d, orderOfSub:%d, code:%s, global code:%s", pParentSql->self, pSql->self, pVgroup->vgId, trsupport->subqueryIndex, tstrerror(code), tstrerror(pParentSql->res.code)); - - tscHandleSubDeleteError(param, tres, code); - if (subAndCheckDone(pSql, pParentSql, trsupport->subqueryIndex)) { // all sub done, call parentSQL callback to finish (*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.numOfRows); } + tfree(pSql->param); return; } @@ -76,42 +74,27 @@ void tscSubDeleteCallback(void *param, TAOS_RES *tres, int code) { * NOTE: thread safe is required. */ if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { - assert(code == taos_errno(pSql)); - int32_t sent = 0; - - if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && (code != TSDB_CODE_TDB_INVALID_TABLE_ID && code != TSDB_CODE_VND_INVALID_VGROUP_ID)) { - tscError("0x%"PRIx64" sub:0x%"PRIx64" failed code:%s, retry:%d", pParentSql->self, pSql->self, tstrerror(code), trsupport->numOfRetry); - - //tscReissueSubquery(trsupport, pSql, code, &sent); - if (sent) { - return; - } - } else { - tscError("0x%"PRIx64" sub:0x%"PRIx64" reach the max retry times or no need to retry, set global code:%s", pParentSql->self, pSql->self, tstrerror(code)); - atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, code); // set global code and abort - } - + tscError(":CDEL 0x%"PRIx64" sub:0x%"PRIx64" reach the max retry times or no need to retry, set global code:%s", pParentSql->self, pSql->self, tstrerror(code)); + atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, code); // set global code and abort tscHandleSubDeleteError(param, tres, pParentSql->res.code); - - if(!sent) { - if (subAndCheckDone(pSql, pParentSql, trsupport->subqueryIndex)) { - // all sub done, call parentSQL callback to finish - (*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.numOfRows); - } + if (subAndCheckDone(pSql, pParentSql, trsupport->subqueryIndex)) { + // all sub done, call parentSQL callback to finish + (*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.numOfRows); } - + tfree(pSql->param); return; } tscDebug("0x%"PRIx64":CDEL sub:0x%"PRIx64" query complete, ep:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSql->self, pSql->self, pVgroup->epAddr[pSql->epSet.inUse].fqdn, pVgroup->vgId, trsupport->subqueryIndex); - // do merge + // success do total count pParentSql->res.numOfRows += pSql->res.numOfRows; if (subAndCheckDone(pSql, pParentSql, trsupport->subqueryIndex)) { // all sub done, call parentSQL callback to finish (*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.numOfRows); } + tfree(pSql->param); return ; } @@ -206,7 +189,6 @@ int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) { return TSDB_CODE_VND_INVALID_VGROUP_ID; } - pRes->qId = 0x1; // hack the qhandle check SSubqueryState *pState = &pSql->subState; int32_t numOfSub = pTableMetaInfo->vgroupList->numOfVgroups; @@ -237,7 +219,6 @@ int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) { SSqlObj *pNew = tscCreateSTableSubDelete(pSql, pVgroupMsg, trs); if (pNew == NULL) { tscError("0x%"PRIx64"CDEL failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno)); - tfree(trs->localBuffer); tfree(trs); break; } diff --git a/src/tsdb/inc/tsdbint.h b/src/tsdb/inc/tsdbint.h index 086f02b159..aa2c41cb1f 100644 --- a/src/tsdb/inc/tsdbint.h +++ b/src/tsdb/inc/tsdbint.h @@ -116,9 +116,10 @@ STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo); int tsdbCheckCommit(STsdbRepo* pRepo); int tsdbRestoreInfo(STsdbRepo* pRepo); UNUSED_FUNC int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg); -int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable* pTable); +int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable* pTable, bool lastKey); void tsdbGetRootDir(int repoid, char dirName[]); void tsdbGetDataDir(int repoid, char dirName[]); +int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx, bool onlyKey); static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) { ASSERT(pRepo != NULL); diff --git a/src/tsdb/src/tsdbDelete.c b/src/tsdb/src/tsdbDelete.c index 4ddf644cab..4a26a2526d 100644 --- a/src/tsdb/src/tsdbDelete.c +++ b/src/tsdb/src/tsdbDelete.c @@ -32,9 +32,11 @@ typedef struct { SBlockIdx * pBlkIdx; SBlockIdx bIndex; SBlockInfo *pInfo; + bool update; // need update lastrow } STableDeleteH; typedef struct { + STsdbRepo *pRepo; SRtn rtn; SFSIter fsIter; SArray * tblArray; // STableDeleteH, table array to cache table obj and block indexes @@ -45,6 +47,7 @@ typedef struct { SArray * aSubBlk; SDataCols *pDCols; SControlDataInfo* pCtlInfo; + SArray * aUpdates; } SDeleteH; @@ -63,7 +66,7 @@ typedef struct { static void tsdbStartDelete(STsdbRepo *pRepo); static void tsdbEndDelete(STsdbRepo *pRepo, int eno); static int tsdbDeleteMeta(STsdbRepo *pRepo); -static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo); +static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo, SArray* pArray); static int tsdbFSetDelete(SDeleteH *pdh, SDFileSet *pSet); static int tsdbInitDeleteH(SDeleteH *pdh, STsdbRepo *pRepo); static void tsdbDestroyDeleteH(SDeleteH *pdh); @@ -91,6 +94,24 @@ int tsdbControlDelete(STsdbRepo* pRepo, SControlDataInfo* pCtlInfo) { return ret; } +static void tsdbUpdateLastRow(STsdbRepo* pRepo, SArray * pArray) { + size_t cnt = taosArrayGetSize(pArray); + for (size_t i = 0; i < cnt; ++i) { + STable* pTable = taosArrayGetP(pArray, i); + tsdbLoadLastCache(pRepo, pTable, true); + } +} + +static void tsdbClearUpdates(SArray * pArray) { + size_t cnt = taosArrayGetSize(pArray); + for (size_t i = 0; i < cnt; ++i) { + STable* pTable = taosArrayGetP(pArray, i); + tsdbUnRefTable(pTable); + } + // destory + taosArrayDestroy(&pArray); +} + static int tsdbDeleteImplCommon(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) { int32_t code = 0; // Step 1: check and clear cache @@ -117,17 +138,23 @@ static int tsdbDeleteImplCommon(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) { goto _err; } - if (tsdbDeleteTSData(pRepo, pCtlInfo) < 0) { + SArray* aUpdates = taosArrayInit(10, sizeof(STable *)); + if (tsdbDeleteTSData(pRepo, pCtlInfo, aUpdates) < 0) { tsdbError("vgId:%d failed to truncate TS data since %s", REPO_ID(pRepo), tstrerror(terrno)); goto _err; } tsdbEndDelete(pRepo, TSDB_CODE_SUCCESS); + + // update last row + tsdbUpdateLastRow(pRepo, aUpdates); + tsdbClearUpdates(aUpdates); return TSDB_CODE_SUCCESS; _err: pRepo->code = terrno; tsdbEndDelete(pRepo, terrno); + tsdbClearUpdates(aUpdates); return -1; } @@ -174,27 +201,28 @@ static int tsdbDeleteMeta(STsdbRepo *pRepo) { return 0; } -static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) { +static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo, SArray* pArray) { STsdbCfg * pCfg = REPO_CFG(pRepo); - SDeleteH truncateH = {0}; + SDeleteH deleteH = {0}; SDFileSet * pSet = NULL; tsdbDebug("vgId:%d start to truncate TS data for %d", REPO_ID(pRepo), pCtlInfo->tids[0]); - if (tsdbInitDeleteH(&truncateH, pRepo) < 0) { + if (tsdbInitDeleteH(&deleteH, pRepo) < 0) { return -1; } - truncateH.pCtlInfo = pCtlInfo; - STimeWindow win = pCtlInfo->win; + deleteH.aUpdates = pArray; + deleteH.pCtlInfo = pCtlInfo; + STimeWindow win = pCtlInfo->win; int sFid = TSDB_KEY_FID(win.skey, pCfg->daysPerFile, pCfg->precision); int eFid = TSDB_KEY_FID(win.ekey, pCfg->daysPerFile, pCfg->precision); ASSERT(sFid <= eFid); - while ((pSet = tsdbFSIterNext(&(truncateH.fsIter)))) { + while ((pSet = tsdbFSIterNext(&(deleteH.fsIter)))) { // remove expired files - if (pSet->fid < truncateH.rtn.minFid) { + if (pSet->fid < deleteH.rtn.minFid) { tsdbInfo("vgId:%d FSET %d on level %d disk id %d expires, remove it", REPO_ID(pRepo), pSet->fid, TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet)); continue; @@ -202,7 +230,7 @@ static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) { if ((pSet->fid < sFid) || (pSet->fid > eFid)) { tsdbDebug("vgId:%d no need to truncate FSET %d, sFid %d, eFid %d", REPO_ID(pRepo), pSet->fid, sFid, eFid); - if (tsdbApplyRtnOnFSet(pRepo, pSet, &(truncateH.rtn)) < 0) { + if (tsdbApplyRtnOnFSet(pRepo, pSet, &(deleteH.rtn)) < 0) { return -1; } continue; @@ -217,8 +245,8 @@ static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) { #endif if (pCtlInfo->command & CMD_DELETE_DATA) { - if (tsdbFSetDelete(&truncateH, pSet) < 0) { - tsdbDestroyDeleteH(&truncateH); + if (tsdbFSetDelete(&deleteH, pSet) < 0) { + tsdbDestroyDeleteH(&deleteH); tsdbError("vgId:%d failed to truncate data in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno)); return -1; } @@ -228,7 +256,7 @@ static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) { } - tsdbDestroyDeleteH(&truncateH); + tsdbDestroyDeleteH(&deleteH); tsdbDebug("vgId:%d truncate TS data over", REPO_ID(pRepo)); return 0; } @@ -295,6 +323,7 @@ static int tsdbInitDeleteH(SDeleteH *pdh, STsdbRepo *pRepo) { memset(pdh, 0, sizeof(*pdh)); TSDB_FSET_SET_CLOSED(TSDB_DELETE_WSET(pdh)); + pdh->pRepo = pRepo; tsdbGetRtnSnap(pRepo, &(pdh->rtn)); tsdbFSIterInit(&(pdh->fsIter), REPO_FS(pRepo), TSDB_FS_ITER_FORWARD); @@ -349,6 +378,21 @@ static void tsdbDestroyDeleteH(SDeleteH *pdh) { tsdbCloseDFileSet(TSDB_DELETE_WSET(pdh)); } +void tsdbAddUpdates(SArray* pArray, STable* pTable) { + size_t cnt = taosArrayGetSize(pArray); + for ( size_t i = 0; i < cnt; i++) { + STable* pt = taosArrayGetP(pArray, i); + if ( pt == pTable) { + // found + return ; + } + } + // ref count ++ + tsdbRefTable(pTable); + // append + taosArrayAddBatch(pArray, &pTable, 1); +} + // init tbl array with pRepo->meta static int tsdbInitDeleteTblArray(SDeleteH *pdh) { STsdbRepo *pRepo = TSDB_DELETE_REPO(pdh); @@ -640,7 +684,14 @@ static int tsdbModifyBlocks(SDeleteH *pdh, STableDeleteH *pItem) { return -1; } - return 0; + // update new last row in last row was deleted + TSKEY lastKey = pItem->pTable->lastKey; + if(lastKey >= pdh->pCtlInfo->win.skey && lastKey <= pdh->pCtlInfo->win.ekey) { + // update lastkey and lastrow + tsdbAddUpdates(pdh->aUpdates, pItem->pTable); + } + + return TSDB_CODE_SUCCESS; } // keep intact blocks info and write to head file then save offset to blkIdx diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 89731b6624..f62ae9e834 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -30,7 +30,6 @@ static void tsdbFreeRepo(STsdbRepo *pRepo); static void tsdbStartStream(STsdbRepo *pRepo); static void tsdbStopStream(STsdbRepo *pRepo); static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh); -static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx); // Function declaration int32_t tsdbCreateRepo(int repoid) { @@ -814,8 +813,7 @@ out: return err; } -static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx) { - ASSERT(pTable->lastRow == NULL); +int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx, bool onlyKey) { if (tsdbLoadBlockInfo(pReadh, NULL, NULL) < 0) { return -1; } @@ -844,15 +842,22 @@ static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, } TSKEY lastKey = memRowKey(lastRow); - // during the load data in file, new data would be inserted and last row has been updated TSDB_WLOCK_TABLE(pTable); - if (pTable->lastRow == NULL) { + + if (onlyKey) { pTable->lastKey = lastKey; - pTable->lastRow = lastRow; - TSDB_WUNLOCK_TABLE(pTable); } else { - TSDB_WUNLOCK_TABLE(pTable); + // set + if (pTable->lastRow == NULL) { + pTable->lastKey = lastKey; + pTable->lastRow = lastRow; + lastRow = NULL; + } + } + + TSDB_WUNLOCK_TABLE(pTable); + if (lastRow) { taosTZfree(lastRow); } @@ -908,7 +913,7 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { if (pIdx && lastKey < pIdx->maxKey) { pTable->lastKey = pIdx->maxKey; - if (CACHE_LAST_ROW(pCfg) && tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx) != 0) { + if (CACHE_LAST_ROW(pCfg) && tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx, false) != 0) { tsdbDestroyReadH(&readh); return -1; } @@ -933,10 +938,11 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { return 0; } -int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable *pTable) { +int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable *pTable, bool lastKey) { SFSIter fsiter; SReadH readh; SDFileSet *pSet; + bool onlyKey = false; int cacheLastRowTableNum = 0; int cacheLastColTableNum = 0; @@ -956,14 +962,19 @@ int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable *pTable) { } if (!cacheLastRow && !cacheLastCol) { - return 0; + if(!lastKey) + return 0; + onlyKey = true; } cacheLastRowTableNum = (cacheLastRow && pTable->lastRow == NULL) ? 1 : 0; cacheLastColTableNum = (cacheLastCol && pTable->lastCols == NULL) ? 1 : 0; if (cacheLastRowTableNum == 0 && cacheLastColTableNum == 0) { - return 0; + if (!lastKey) + return 0; + onlyKey = true; + cacheLastRowTableNum = 1; } if (tsdbInitReadH(&readh, pRepo) < 0) { @@ -997,7 +1008,7 @@ int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable *pTable) { SBlockIdx *pIdx = readh.pBlkIdx; if (pIdx && (cacheLastRowTableNum > 0) && (pTable->lastRow == NULL)) { - if (tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx) != 0) { + if (tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx, onlyKey) != 0) { tsdbUnLockFS(REPO_FS(pRepo)); tsdbDestroyReadH(&readh); return -1; @@ -1116,7 +1127,7 @@ UNUSED_FUNC int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) { if (pIdx && cacheLastRowTableNum > 0 && pTable->lastRow == NULL) { pTable->lastKey = pIdx->maxKey; - if (tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx) != 0) { + if (tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx, false) != 0) { tsdbDestroyReadH(&readh); return -1; } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index b038a0e9b3..5a9879e87c 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -635,7 +635,7 @@ static int32_t lazyLoadCacheLast(STsdbQueryHandle* pQueryHandle) { if (pTable->cacheLastConfigVersion == pRepo->cacheLastConfigVersion) { continue; } - code = tsdbLoadLastCache(pRepo, pTable); + code = tsdbLoadLastCache(pRepo, pTable, false); if (code != 0) { tsdbError("%p uid:%" PRId64 ", tid:%d, failed to load last cache since %s", pQueryHandle, pTable->tableId.uid, pTable->tableId.tid, tstrerror(terrno)); -- GitLab