From 6185cef9fe52a2c0fef06e58cb0bea85c6a44f49 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Thu, 24 Mar 2022 18:36:49 +0800 Subject: [PATCH] [TS-238](tsdb): single table del data first ok --- src/inc/taosmsg.h | 1 + src/tsdb/inc/tsdbTruncate.h | 10 +- src/tsdb/src/tsdbCommit.c | 11 - src/tsdb/src/tsdbTruncate.c | 447 ++++++++++++++++-------------------- 4 files changed, 204 insertions(+), 265 deletions(-) diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 9736760ce1..8b289f7a8d 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -1004,6 +1004,7 @@ typedef struct { } STLV; #define CMD_DELETE_DATA 0x00000001 +#define CMD_TRUNCATE 0x00000002 typedef struct SControlData{ uint32_t command; // see define CMD_??? STimeWindow win; diff --git a/src/tsdb/inc/tsdbTruncate.h b/src/tsdb/inc/tsdbTruncate.h index 1b5226930a..9c0db697f5 100644 --- a/src/tsdb/inc/tsdbTruncate.h +++ b/src/tsdb/inc/tsdbTruncate.h @@ -18,6 +18,8 @@ #ifdef __cplusplus extern "C" { #endif + +// SControlData addition information typedef struct { SControlData ctlData; // addition info @@ -25,11 +27,15 @@ typedef struct { int32_t tid; // table id tsem_t* pSem; bool memNull; // pRepo->mem is NULL, this is true + uint64_t* uids; // delete table + int32_t uidCount; SShellSubmitRspMsg *pRsp; } SControlDataInfo; -void *tsdbTruncateImpl(STsdbRepo *pRepo, void *param); -void *tsdbDeleteImpl(STsdbRepo *pRepo, void *param); +// -------- interface --------- + +// delete +int tsdbControlDelete(STsdbRepo* pRepo, SControlDataInfo* pCtlDataInfo); #ifdef __cplusplus } diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 2f27604b39..dee5bc86ff 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -1780,17 +1780,6 @@ int tsdbApplyRtn(STsdbRepo *pRepo) { return 0; } -int tsdbControlDelete(STsdbRepo* pRepo, SControlDataInfo* pCtlDataInfo) { - int ret = TSDB_CODE_SUCCESS; - - if(pCtlDataInfo->pRsp) { - pCtlDataInfo->pRsp->affectedRows = htonl(23); - pCtlDataInfo->pRsp->code = ret; - } - - return ret; -} - // do control task int tsdbCommitControl(STsdbRepo* pRepo, SControlDataInfo* pCtlDataInfo) { int ret = TSDB_CODE_SUCCESS; diff --git a/src/tsdb/src/tsdbTruncate.c b/src/tsdb/src/tsdbTruncate.c index 5e669d9bf6..30bd71e0c9 100644 --- a/src/tsdb/src/tsdbTruncate.c +++ b/src/tsdb/src/tsdbTruncate.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ #include "tsdbint.h" +#include "tsdbTruncate.h" typedef struct { STable * pTable; @@ -30,43 +31,43 @@ typedef struct { SArray * aBlkIdx; SArray * aSupBlk; SDataCols *pDCols; - void * param; // STruncateTblMsg or SDeleteTblMsg - TSDB_REQ_T type; // truncate or delete + SControlDataInfo* pCtlInfo; } STruncateH; -#define TSDB_TRUNCATE_WSET(pTruncateH) (&((pTruncateH)->wSet)) -#define TSDB_TRUNCATE_REPO(pTruncateH) TSDB_READ_REPO(&((pTruncateH)->readh)) -#define TSDB_TRUNCATE_HEAD_FILE(pTruncateH) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(pTruncateH), TSDB_FILE_HEAD) -#define TSDB_TRUNCATE_DATA_FILE(pTruncateH) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(pTruncateH), TSDB_FILE_DATA) -#define TSDB_TRUNCATE_LAST_FILE(pTruncateH) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(pTruncateH), TSDB_FILE_LAST) -#define TSDB_TRUNCATE_SMAD_FILE(pTruncateH) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(pTruncateH), TSDB_FILE_SMAD) -#define TSDB_TRUNCATE_SMAL_FILE(pTruncateH) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(pTruncateH), TSDB_FILE_SMAL) -#define TSDB_TRUNCATE_BUF(pTruncateH) TSDB_READ_BUF(&((pTruncateH)->readh)) -#define TSDB_TRUNCATE_COMP_BUF(pTruncateH) TSDB_READ_COMP_BUF(&((pTruncateH)->readh)) -#define TSDB_TRUNCATE_EXBUF(pTruncateH) TSDB_READ_EXBUF(&((pTruncateH)->readh)) +#define TSDB_TRUNCATE_WSET(prh) (&((prh)->wSet)) +#define TSDB_TRUNCATE_REPO(prh) TSDB_READ_REPO(&((prh)->readh)) +#define TSDB_TRUNCATE_HEAD_FILE(prh) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(prh), TSDB_FILE_HEAD) +#define TSDB_TRUNCATE_DATA_FILE(prh) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(prh), TSDB_FILE_DATA) +#define TSDB_TRUNCATE_LAST_FILE(prh) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(prh), TSDB_FILE_LAST) +#define TSDB_TRUNCATE_SMAD_FILE(prh) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(prh), TSDB_FILE_SMAD) +#define TSDB_TRUNCATE_SMAL_FILE(prh) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(prh), TSDB_FILE_SMAL) +#define TSDB_TRUNCATE_BUF(prh) TSDB_READ_BUF(&((prh)->readh)) +#define TSDB_TRUNCATE_COMP_BUF(prh) TSDB_READ_COMP_BUF(&((prh)->readh)) +#define TSDB_TRUNCATE_EXBUF(prh) TSDB_READ_EXBUF(&((prh)->readh)) + -/* -static int tsdbAsyncTruncate(STsdbRepo *pRepo, void *param, TSDB_REQ_T type); static void tsdbStartTruncate(STsdbRepo *pRepo); static void tsdbEndTruncate(STsdbRepo *pRepo, int eno); static int tsdbTruncateMeta(STsdbRepo *pRepo); -static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param, TSDB_REQ_T type); -//static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet); -//static int tsdbDeleteFSet(STruncateH *pTruncateH, SDFileSet *pSet); -static int tsdbInitTruncateH(STruncateH *pTruncateH, STsdbRepo *pRepo); -static void tsdbDestroyTruncateH(STruncateH *pTruncateH); -static int tsdbInitTruncateTblArray(STruncateH *pTruncateH); -static void tsdbDestroyTruncateTblArray(STruncateH *pTruncateH); -static int tsdbCacheFSetIndex(STruncateH *pTruncateH); +static int tsdbTruncateTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo); +static int tsdbTruncateFSet(STruncateH *prh, SDFileSet *pSet); +static int tsdbDeleteFSet(STruncateH *prh, SDFileSet *pSet); +static int tsdbInitTruncateH(STruncateH *prh, STsdbRepo *pRepo); +static void tsdbDestroyTruncateH(STruncateH *prh); +static int tsdbInitTruncateTblArray(STruncateH *prh); +static void tsdbDestroyTruncateTblArray(STruncateH *prh); +static int tsdbCacheFSetIndex(STruncateH *prh); static int tsdbTruncateCache(STsdbRepo *pRepo, void *param); -static int tsdbTruncateFSetInit(STruncateH *pTruncateH, SDFileSet *pSet); -static void tsdbTruncateFSetEnd(STruncateH *pTruncateH); -static int tsdbTruncateFSetImpl(STruncateH *pTruncateH); -static int tsdbDeleteFSetImpl(STruncateH *pTruncateH); -static bool tsdbBlockInterleaved(STruncateH *pTruncateH, SBlock *pBlock); -static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDataCols *pDCols, void **ppBuf, +static int tsdbTruncateFSetInit(STruncateH *prh, SDFileSet *pSet); +static void tsdbTruncateFSetEnd(STruncateH *prh); +static int tsdbTruncateFSetImpl(STruncateH *prh); +static int tsdbDeleteFSetImpl(STruncateH *prh); +static bool tsdbBlockInterleaved(STruncateH *prh, SBlock *pBlock); +static int tsdbWriteBlockToRightFile(STruncateH *prh, STable *pTable, SDataCols *pDCols, void **ppBuf, void **ppCBuf, void **ppExBuf); -//static void *tsdbTruncateImplCommon(STsdbRepo *pRepo, void *param, TSDB_REQ_T type); +static int tsdbTruncateImplCommon(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo); + + enum { TSDB_NO_TRUNCATE, @@ -74,28 +75,26 @@ enum { TSDB_WAITING_TRUNCATE, }; -int tsdbTruncateTbl(STsdbRepo *pRepo, void *param) { return tsdbAsyncTruncate(pRepo, param, CONTROL_REQ); } -int tsdbDeleteData(STsdbRepo *pRepo, void *param) { return tsdbAsyncTruncate(pRepo, param, CONTROL_REQ); } +// delete +int tsdbControlDelete(STsdbRepo* pRepo, SControlDataInfo* pCtlInfo) { + int ret = TSDB_CODE_SUCCESS; -void *tsdbTruncateImpl(STsdbRepo *pRepo, void *param) { - //tsdbTruncateImplCommon(pRepo, param, TRUNCATE_TBL_REQ); - return NULL; -} -void *tsdbDeleteImpl(STsdbRepo *pRepo, void *param) { - //tsdbTruncateImplCommon(pRepo, param, DELETE_TBL_REQ); - return NULL; -} + if(pCtlInfo->pRsp) { + pCtlInfo->pRsp->affectedRows = htonl(23); + pCtlInfo->pRsp->code = ret; + } + return tsdbTruncateImplCommon(pRepo, pCtlInfo); +} -static void *tsdbTruncateImplCommon(STsdbRepo *pRepo, void *param, TSDB_REQ_T type) { - ASSERT(param != NULL); +static int tsdbTruncateImplCommon(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) { int32_t code = 0; // Step 1: check and clear cache - if ((code = tsdbTruncateCache(pRepo, param)) != 0) { + if ((code = tsdbTruncateCache(pRepo, pCtlInfo)) != 0) { pRepo->code = terrno; tsem_post(&(pRepo->readyToCommit)); tsdbInfo("vgId:%d failed to truncate since %s", REPO_ID(pRepo), tstrerror(terrno)); - return NULL; + return -1; } // Step 2: truncate and rebuild DFileSets @@ -104,7 +103,7 @@ static void *tsdbTruncateImplCommon(STsdbRepo *pRepo, void *param, TSDB_REQ_T ty pRepo->truncateState = TSDB_NO_TRUNCATE; tsem_post(&(pRepo->readyToCommit)); tsdbInfo("vgId:%d truncate over, no meta or data file", REPO_ID(pRepo)); - return NULL; + return -1; } tsdbStartTruncate(pRepo); @@ -114,18 +113,18 @@ static void *tsdbTruncateImplCommon(STsdbRepo *pRepo, void *param, TSDB_REQ_T ty goto _err; } - if (tsdbTruncateTSData(pRepo, param, type) < 0) { + if (tsdbTruncateTSData(pRepo, pCtlInfo) < 0) { tsdbError("vgId:%d failed to truncate TS data since %s", REPO_ID(pRepo), tstrerror(terrno)); goto _err; } tsdbEndTruncate(pRepo, TSDB_CODE_SUCCESS); - return NULL; + return TSDB_CODE_SUCCESS; _err: pRepo->code = terrno; tsdbEndTruncate(pRepo, terrno); - return NULL; + return -1; } static int tsdbTruncateCache(STsdbRepo *pRepo, void *param) { @@ -146,28 +145,6 @@ static int tsdbTruncateCache(STsdbRepo *pRepo, void *param) { return 0; } -static int tsdbAsyncTruncate(STsdbRepo *pRepo, void *param, TSDB_REQ_T type) { - // avoid repeated input of commands by end users in a short period of time - if (pRepo->truncateState != TSDB_NO_TRUNCATE) { - tsdbInfo("vgId:%d retry later as tsdb in truncating state", REPO_ID(pRepo)); - return -1; - } - pRepo->truncateState = TSDB_WAITING_TRUNCATE; - - // flush the mem data to disk synchronously(have impact on the compression rate) - if (tsdbSyncCommit(pRepo) < 0) { - return -1; - } - - // truncate - tsem_wait(&(pRepo->readyToCommit)); - int code = tsdbScheduleCommit(pRepo, param, type); - if (code < 0) { - tsem_post(&(pRepo->readyToCommit)); - } - return code; -} - static void tsdbStartTruncate(STsdbRepo *pRepo) { assert(pRepo->truncateState != TSDB_IN_TRUNCATE); tsdbInfo("vgId:%d start to truncate!", REPO_ID(pRepo)); @@ -193,23 +170,22 @@ static int tsdbTruncateMeta(STsdbRepo *pRepo) { return 0; } -static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param, TSDB_REQ_T type) { +static int tsdbTruncateTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) { STsdbCfg * pCfg = REPO_CFG(pRepo); STruncateH truncateH = {0}; SDFileSet * pSet = NULL; - STruncateTblMsg *pMsg = (STruncateTblMsg *)param; - tsdbDebug("vgId:%d start to truncate TS data for %" PRIu64, REPO_ID(pRepo), pMsg->uid); + tsdbDebug("vgId:%d start to truncate TS data for %" PRIu64, REPO_ID(pRepo), pCtlInfo->uid); if (tsdbInitTruncateH(&truncateH, pRepo) < 0) { return -1; } - truncateH.param = pMsg; - truncateH.type = type; + truncateH.pCtlInfo = pCtlInfo; + STimeWindow win = pCtlInfo->ctlData.win; - int sFid = TSDB_KEY_FID(pMsg->span[0].skey, pCfg->daysPerFile, pCfg->precision); - int eFid = TSDB_KEY_FID(pMsg->span[0].ekey, pCfg->daysPerFile, pCfg->precision); + int sFid = TSDB_KEY_FID(win.skey, pCfg->daysPerFile, pCfg->precision); + int eFid = TSDB_KEY_FID(win.ekey, pCfg->daysPerFile, pCfg->precision); ASSERT(sFid <= eFid); while ((pSet = tsdbFSIterNext(&(truncateH.fsIter)))) { @@ -236,13 +212,13 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param, TSDB_REQ_T type) { } #endif - if (truncateH.type == TRUNCATE_TBL_REQ) { + if (pCtlInfo->ctlData.command == CMD_TRUNCATE) { if (tsdbTruncateFSet(&truncateH, pSet) < 0) { tsdbDestroyTruncateH(&truncateH); tsdbError("vgId:%d failed to truncate table in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno)); return -1; } - } else if (truncateH.type == DELETE_TBL_REQ) { + } else if (pCtlInfo->ctlData.command == CMD_DELETE_DATA) { if (tsdbDeleteFSet(&truncateH, pSet) < 0) { tsdbDestroyTruncateH(&truncateH); tsdbError("vgId:%d failed to truncate data in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno)); @@ -259,68 +235,68 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param, TSDB_REQ_T type) { return 0; } -static int tsdbDeleteFSet(STruncateH *pTruncateH, SDFileSet *pSet) { - STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(pTruncateH); +static int tsdbDeleteFSet(STruncateH *prh, SDFileSet *pSet) { + STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(prh); SDiskID did = {0}; tsdbDebug("vgId:%d start to truncate data in FSET %d on level %d id %d", REPO_ID(pRepo), pSet->fid, TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet)); - if (tsdbTruncateFSetInit(pTruncateH, pSet) < 0) { + if (tsdbTruncateFSetInit(prh, pSet) < 0) { return -1; } // Create new fset as deleted fset - tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(pTruncateH->rtn)), &(did.level), &(did.id)); + tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(prh->rtn)), &(did.level), &(did.id)); if (did.level == TFS_UNDECIDED_LEVEL) { terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; tsdbError("vgId:%d failed to truncate data in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno)); - tsdbTruncateFSetEnd(pTruncateH); + tsdbTruncateFSetEnd(prh); return -1; } - tsdbInitDFileSet(TSDB_TRUNCATE_WSET(pTruncateH), did, REPO_ID(pRepo), TSDB_FSET_FID(pSet), + tsdbInitDFileSet(TSDB_TRUNCATE_WSET(prh), did, REPO_ID(pRepo), TSDB_FSET_FID(pSet), FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_LATEST_FSET_VER); - if (tsdbCreateDFileSet(TSDB_TRUNCATE_WSET(pTruncateH), true) < 0) { + if (tsdbCreateDFileSet(TSDB_TRUNCATE_WSET(prh), true) < 0) { tsdbError("vgId:%d failed to truncate data in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno)); - tsdbTruncateFSetEnd(pTruncateH); + tsdbTruncateFSetEnd(prh); return -1; } - if (tsdbDeleteFSetImpl(pTruncateH) < 0) { - tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(pTruncateH)); - tsdbRemoveDFileSet(TSDB_TRUNCATE_WSET(pTruncateH)); - tsdbTruncateFSetEnd(pTruncateH); + if (tsdbDeleteFSetImpl(prh) < 0) { + tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(prh)); + tsdbRemoveDFileSet(TSDB_TRUNCATE_WSET(prh)); + tsdbTruncateFSetEnd(prh); return -1; } - tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(pTruncateH)); - tsdbUpdateDFileSet(REPO_FS(pRepo), TSDB_TRUNCATE_WSET(pTruncateH)); + tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(prh)); + tsdbUpdateDFileSet(REPO_FS(pRepo), TSDB_TRUNCATE_WSET(prh)); tsdbDebug("vgId:%d FSET %d truncate data over", REPO_ID(pRepo), pSet->fid); - tsdbTruncateFSetEnd(pTruncateH); + tsdbTruncateFSetEnd(prh); return 0; } -static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet) { - STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(pTruncateH); +static int tsdbTruncateFSet(STruncateH *prh, SDFileSet *pSet) { + STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(prh); SDiskID did = {0}; - SDFileSet *pWSet = TSDB_TRUNCATE_WSET(pTruncateH); + SDFileSet *pWSet = TSDB_TRUNCATE_WSET(prh); tsdbDebug("vgId:%d start to truncate table in FSET %d on level %d id %d", REPO_ID(pRepo), pSet->fid, TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet)); - if (tsdbTruncateFSetInit(pTruncateH, pSet) < 0) { + if (tsdbTruncateFSetInit(prh, pSet) < 0) { return -1; } // Create new fset as truncated fset - tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(pTruncateH->rtn)), &(did.level), &(did.id)); + tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(prh->rtn)), &(did.level), &(did.id)); if (did.level == TFS_UNDECIDED_LEVEL) { terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; tsdbError("vgId:%d failed to truncate table in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno)); - tsdbTruncateFSetEnd(pTruncateH); + tsdbTruncateFSetEnd(prh); return -1; } @@ -344,81 +320,81 @@ static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet) { return -1; } - if (tsdbTruncateFSetImpl(pTruncateH) < 0) { - tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(pTruncateH)); - tsdbRemoveDFileSet(TSDB_TRUNCATE_WSET(pTruncateH)); - tsdbTruncateFSetEnd(pTruncateH); + if (tsdbTruncateFSetImpl(prh) < 0) { + tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(prh)); + tsdbRemoveDFileSet(TSDB_TRUNCATE_WSET(prh)); + tsdbTruncateFSetEnd(prh); return -1; } - tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(pTruncateH)); - tsdbUpdateDFileSet(REPO_FS(pRepo), TSDB_TRUNCATE_WSET(pTruncateH)); + tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(prh)); + tsdbUpdateDFileSet(REPO_FS(pRepo), TSDB_TRUNCATE_WSET(prh)); tsdbDebug("vgId:%d FSET %d truncate table over", REPO_ID(pRepo), pSet->fid); - tsdbTruncateFSetEnd(pTruncateH); + tsdbTruncateFSetEnd(prh); return 0; } -static int tsdbInitTruncateH(STruncateH *pTruncateH, STsdbRepo *pRepo) { +static int tsdbInitTruncateH(STruncateH *prh, STsdbRepo *pRepo) { STsdbCfg *pCfg = REPO_CFG(pRepo); - memset(pTruncateH, 0, sizeof(*pTruncateH)); + memset(prh, 0, sizeof(*prh)); - TSDB_FSET_SET_CLOSED(TSDB_TRUNCATE_WSET(pTruncateH)); + TSDB_FSET_SET_CLOSED(TSDB_TRUNCATE_WSET(prh)); - tsdbGetRtnSnap(pRepo, &(pTruncateH->rtn)); - tsdbFSIterInit(&(pTruncateH->fsIter), REPO_FS(pRepo), TSDB_FS_ITER_FORWARD); + tsdbGetRtnSnap(pRepo, &(prh->rtn)); + tsdbFSIterInit(&(prh->fsIter), REPO_FS(pRepo), TSDB_FS_ITER_FORWARD); - if (tsdbInitReadH(&(pTruncateH->readh), pRepo) < 0) { + if (tsdbInitReadH(&(prh->readh), pRepo) < 0) { return -1; } - if (tsdbInitTruncateTblArray(pTruncateH) < 0) { - tsdbDestroyTruncateH(pTruncateH); + if (tsdbInitTruncateTblArray(prh) < 0) { + tsdbDestroyTruncateH(prh); return -1; } - pTruncateH->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx)); - if (pTruncateH->aBlkIdx == NULL) { + prh->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx)); + if (prh->aBlkIdx == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tsdbDestroyTruncateH(pTruncateH); + tsdbDestroyTruncateH(prh); return -1; } - pTruncateH->aSupBlk = taosArrayInit(1024, sizeof(SBlock)); - if (pTruncateH->aSupBlk == NULL) { + prh->aSupBlk = taosArrayInit(1024, sizeof(SBlock)); + if (prh->aSupBlk == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tsdbDestroyTruncateH(pTruncateH); + tsdbDestroyTruncateH(prh); return -1; } - pTruncateH->pDCols = tdNewDataCols(0, pCfg->maxRowsPerFileBlock); - if (pTruncateH->pDCols == NULL) { + prh->pDCols = tdNewDataCols(0, pCfg->maxRowsPerFileBlock); + if (prh->pDCols == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tsdbDestroyTruncateH(pTruncateH); + tsdbDestroyTruncateH(prh); return -1; } return 0; } -static void tsdbDestroyTruncateH(STruncateH *pTruncateH) { - pTruncateH->pDCols = tdFreeDataCols(pTruncateH->pDCols); - pTruncateH->aSupBlk = taosArrayDestroy(&pTruncateH->aSupBlk); - pTruncateH->aBlkIdx = taosArrayDestroy(&pTruncateH->aBlkIdx); - tsdbDestroyTruncateTblArray(pTruncateH); - tsdbDestroyReadH(&(pTruncateH->readh)); - tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(pTruncateH)); +static void tsdbDestroyTruncateH(STruncateH *prh) { + prh->pDCols = tdFreeDataCols(prh->pDCols); + prh->aSupBlk = taosArrayDestroy(&prh->aSupBlk); + prh->aBlkIdx = taosArrayDestroy(&prh->aBlkIdx); + tsdbDestroyTruncateTblArray(prh); + tsdbDestroyReadH(&(prh->readh)); + tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(prh)); } -static int tsdbInitTruncateTblArray(STruncateH *pTruncateH) { - STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(pTruncateH); +static int tsdbInitTruncateTblArray(STruncateH *prh) { + STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(prh); STsdbMeta *pMeta = pRepo->tsdbMeta; if (tsdbRLockRepoMeta(pRepo) < 0) return -1; - pTruncateH->tblArray = taosArrayInit(pMeta->maxTables, sizeof(STableTruncateH)); - if (pTruncateH->tblArray == NULL) { + prh->tblArray = taosArrayInit(pMeta->maxTables, sizeof(STableTruncateH)); + if (prh->tblArray == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; tsdbUnlockRepoMeta(pRepo); return -1; @@ -432,7 +408,7 @@ static int tsdbInitTruncateTblArray(STruncateH *pTruncateH) { ch.pTable = pMeta->tables[i]; } - if (taosArrayPush(pTruncateH->tblArray, &ch) == NULL) { + if (taosArrayPush(prh->tblArray, &ch) == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; tsdbUnlockRepoMeta(pRepo); return -1; @@ -443,46 +419,46 @@ static int tsdbInitTruncateTblArray(STruncateH *pTruncateH) { return 0; } -static void tsdbDestroyTruncateTblArray(STruncateH *pTruncateH) { - STableTruncateH *pTblHandle = NULL; +static void tsdbDestroyTruncateTblArray(STruncateH *prh) { + STableTruncateH *pHandle = NULL; - if (pTruncateH->tblArray == NULL) return; + if (prh->tblArray == NULL) return; - for (size_t i = 0; i < taosArrayGetSize(pTruncateH->tblArray); ++i) { - pTblHandle = (STableTruncateH *)taosArrayGet(pTruncateH->tblArray, i); - if (pTblHandle->pTable) { - tsdbUnRefTable(pTblHandle->pTable); + for (size_t i = 0; i < taosArrayGetSize(prh->tblArray); ++i) { + pHandle = (STableTruncateH *)taosArrayGet(prh->tblArray, i); + if (pHandle->pTable) { + tsdbUnRefTable(pHandle->pTable); } - tfree(pTblHandle->pInfo); + tfree(pHandle->pInfo); } - pTruncateH->tblArray = taosArrayDestroy(&pTruncateH->tblArray); + prh->tblArray = taosArrayDestroy(&prh->tblArray); } -static int tsdbCacheFSetIndex(STruncateH *pTruncateH) { - SReadH *pReadH = &(pTruncateH->readh); +static int tsdbCacheFSetIndex(STruncateH *prh) { + SReadH *pReadH = &(prh->readh); if (tsdbLoadBlockIdx(pReadH) < 0) { return -1; } - size_t tblArraySize = taosArrayGetSize(pTruncateH->tblArray); + size_t tblArraySize = taosArrayGetSize(prh->tblArray); for (size_t tid = 1; tid < tblArraySize; ++tid) { - STableTruncateH *pTblHandle = (STableTruncateH *)taosArrayGet(pTruncateH->tblArray, tid); - pTblHandle->pBlkIdx = NULL; + STableTruncateH *pHandle = (STableTruncateH *)taosArrayGet(prh->tblArray, tid); + pHandle->pBlkIdx = NULL; - if (pTblHandle->pTable == NULL) continue; - if (tsdbSetReadTable(pReadH, pTblHandle->pTable) < 0) { + if (pHandle->pTable == NULL) continue; + if (tsdbSetReadTable(pReadH, pHandle->pTable) < 0) { return -1; } if (pReadH->pBlkIdx == NULL) continue; - pTblHandle->bIndex = *(pReadH->pBlkIdx); - pTblHandle->pBlkIdx = &(pTblHandle->bIndex); + pHandle->bIndex = *(pReadH->pBlkIdx); + pHandle->pBlkIdx = &(pHandle->bIndex); uint32_t originLen = 0; - if (tsdbLoadBlockInfo(pReadH, (void **)(&(pTblHandle->pInfo)), &originLen) < 0) { + if (tsdbLoadBlockInfo(pReadH, (void **)(&(pHandle->pInfo)), &originLen) < 0) { return -1; } } @@ -490,26 +466,26 @@ static int tsdbCacheFSetIndex(STruncateH *pTruncateH) { return 0; } -static int tsdbTruncateFSetInit(STruncateH *pTruncateH, SDFileSet *pSet) { - taosArrayClear(pTruncateH->aBlkIdx); - taosArrayClear(pTruncateH->aSupBlk); +static int tsdbTruncateFSetInit(STruncateH *prh, SDFileSet *pSet) { + taosArrayClear(prh->aBlkIdx); + taosArrayClear(prh->aSupBlk); - if (tsdbSetAndOpenReadFSet(&(pTruncateH->readh), pSet) < 0) { + if (tsdbSetAndOpenReadFSet(&(prh->readh), pSet) < 0) { return -1; } - if (tsdbCacheFSetIndex(pTruncateH) < 0) { - tsdbCloseAndUnsetFSet(&(pTruncateH->readh)); + if (tsdbCacheFSetIndex(prh) < 0) { + tsdbCloseAndUnsetFSet(&(prh->readh)); return -1; } return 0; } -static void tsdbTruncateFSetEnd(STruncateH *pTruncateH) { tsdbCloseAndUnsetFSet(&(pTruncateH->readh)); } +static void tsdbTruncateFSetEnd(STruncateH *prh) { tsdbCloseAndUnsetFSet(&(prh->readh)); } -static bool tsdbBlockInterleaved(STruncateH *pTruncateH, SBlock *pBlock) { - // STruncateTblMsg *pMsg = (STruncateTblMsg *)pTruncateH->param; +static bool tsdbBlockInterleaved(STruncateH *prh, SBlock *pBlock) { + // STruncateTblMsg *pMsg = (STruncateTblMsg *)prh->param; // for (uint16_t i = 0; i < pMsg->nSpan; ++i) { // STimeWindow tw = pMsg->span[i]; // if (!(pBlock->keyFirst > tw.ekey || pBlock->keyLast < tw.skey)) { @@ -520,9 +496,9 @@ static bool tsdbBlockInterleaved(STruncateH *pTruncateH, SBlock *pBlock) { return true; } -static int32_t tsdbFilterDataCols(STruncateH *pTruncateH, SDataCols *pSrcDCols) { - STruncateTblMsg *pMsg = (STruncateTblMsg *)pTruncateH->param; - SDataCols * pDstDCols = pTruncateH->pDCols; +static int32_t tsdbFilterDataCols(STruncateH *prh, SDataCols *pSrcDCols) { + SDataCols * pDstDCols = prh->pDCols; + SControlData* pCtlData = &prh->pCtlInfo->ctlData; tdResetDataCols(pDstDCols); pDstDCols->maxCols = pSrcDCols->maxCols; @@ -532,7 +508,7 @@ static int32_t tsdbFilterDataCols(STruncateH *pTruncateH, SDataCols *pSrcDCols) for (int i = 0; i < pSrcDCols->numOfRows; ++i) { int64_t tsKey = *(int64_t *)tdGetColDataOfRow(pSrcDCols->cols, i); - if ((tsKey >= pMsg->span[0].skey) && (tsKey <= pMsg->span[0].ekey)) { + if ((tsKey >= pCtlData->win.skey) && (tsKey <= pCtlData->win.ekey)) { printf("tsKey %" PRId64 " is filtered\n", tsKey); continue; } @@ -548,39 +524,38 @@ static int32_t tsdbFilterDataCols(STruncateH *pTruncateH, SDataCols *pSrcDCols) return 0; } -static int tsdbTruncateFSetImpl(STruncateH *pTruncateH) { - STsdbRepo * pRepo = TSDB_TRUNCATE_REPO(pTruncateH); - STruncateTblMsg *pMsg = (STruncateTblMsg *)pTruncateH->param; - // SReadH * pReadh = &(pTruncateH->readh); +static int tsdbTruncateFSetImpl(STruncateH *prh) { + STsdbRepo * pRepo = TSDB_TRUNCATE_REPO(prh); + // SReadH * pReadh = &(prh->readh); SBlockIdx * pBlkIdx = NULL; - void ** ppBuf = &(TSDB_TRUNCATE_BUF(pTruncateH)); - // void ** ppCBuf = &(TSDB_TRUNCATE_COMP_BUF(pTruncateH)); - // void ** ppExBuf = &(TSDB_TRUNCATE_EXBUF(pTruncateH)); + void ** ppBuf = &(TSDB_TRUNCATE_BUF(prh)); + // void ** ppCBuf = &(TSDB_TRUNCATE_COMP_BUF(prh)); + // void ** ppExBuf = &(TSDB_TRUNCATE_EXBUF(prh)); - taosArrayClear(pTruncateH->aBlkIdx); + taosArrayClear(prh->aBlkIdx); - for (size_t tid = 1; tid < taosArrayGetSize(pTruncateH->tblArray); ++tid) { - STableTruncateH *pTblHandle = (STableTruncateH *)taosArrayGet(pTruncateH->tblArray, tid); - pBlkIdx = pTblHandle->pBlkIdx; + for (size_t tid = 1; tid < taosArrayGetSize(prh->tblArray); ++tid) { + STableTruncateH *pHandle = (STableTruncateH *)taosArrayGet(prh->tblArray, tid); + pBlkIdx = pHandle->pBlkIdx; - if (pTblHandle->pTable == NULL || pTblHandle->pBlkIdx == NULL) continue; + if (pHandle->pTable == NULL || pHandle->pBlkIdx == NULL) continue; - taosArrayClear(pTruncateH->aSupBlk); + taosArrayClear(prh->aSupBlk); - uint64_t uid = pTblHandle->pTable->tableId.uid; + uint64_t uid = pHandle->pTable->tableId.uid; - if (uid != pMsg->uid) { - if ((pBlkIdx->numOfBlocks > 0) && (taosArrayPush(pTruncateH->aBlkIdx, (const void *)(pBlkIdx)) == NULL)) { + if (uid != prh->pCtlInfo->uid) { + if ((pBlkIdx->numOfBlocks > 0) && (taosArrayPush(prh->aBlkIdx, (const void *)(pBlkIdx)) == NULL)) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } } else { // Loop to mark delete flag for each block data tsdbDebug("vgId:%d uid %" PRIu64 " matched to truncate", REPO_ID(pRepo), uid); - // for (int i = 0; i < pTblHandle->pBlkIdx->numOfBlocks; ++i) { - // SBlock *pBlock = pTblHandle->pInfo->blocks + i; + // for (int i = 0; i < pHandle->pBlkIdx->numOfBlocks; ++i) { + // SBlock *pBlock = pHandle->pInfo->blocks + i; - // if (tsdbWriteBlockToRightFile(pTruncateH, pTblHandle->pTable, pTruncateH->pDCols, ppBuf, ppCBuf, ppExBuf) < + // if (tsdbWriteBlockToRightFile(prh, pHandle->pTable, prh->pDCols, ppBuf, ppCBuf, ppExBuf) < // 0) { // return -1; // } @@ -588,44 +563,43 @@ static int tsdbTruncateFSetImpl(STruncateH *pTruncateH) { } } - if (tsdbWriteBlockIdx(TSDB_TRUNCATE_HEAD_FILE(pTruncateH), pTruncateH->aBlkIdx, ppBuf) < 0) { + if (tsdbWriteBlockIdx(TSDB_TRUNCATE_HEAD_FILE(prh), prh->aBlkIdx, ppBuf) < 0) { return -1; } return 0; } -static int tsdbDeleteFSetImpl(STruncateH *pTruncateH) { - STsdbRepo * pRepo = TSDB_TRUNCATE_REPO(pTruncateH); - STruncateTblMsg *pMsg = (STruncateTblMsg *)pTruncateH->param; +static int tsdbDeleteFSetImpl(STruncateH *prh) { + STsdbRepo * pRepo = TSDB_TRUNCATE_REPO(prh); // STsdbCfg * pCfg = REPO_CFG(pRepo); - SReadH * pReadh = &(pTruncateH->readh); + SReadH * pReadh = &(prh->readh); SBlockIdx blkIdx = {0}; - void ** ppBuf = &(TSDB_TRUNCATE_BUF(pTruncateH)); - void ** ppCBuf = &(TSDB_TRUNCATE_COMP_BUF(pTruncateH)); - void ** ppExBuf = &(TSDB_TRUNCATE_EXBUF(pTruncateH)); + void ** ppBuf = &(TSDB_TRUNCATE_BUF(prh)); + void ** ppCBuf = &(TSDB_TRUNCATE_COMP_BUF(prh)); + void ** ppExBuf = &(TSDB_TRUNCATE_EXBUF(prh)); // int defaultRows = TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock); - taosArrayClear(pTruncateH->aBlkIdx); + taosArrayClear(prh->aBlkIdx); - for (size_t tid = 1; tid < taosArrayGetSize(pTruncateH->tblArray); ++tid) { - STableTruncateH *pTblHandle = (STableTruncateH *)taosArrayGet(pTruncateH->tblArray, tid); + for (size_t tid = 1; tid < taosArrayGetSize(prh->tblArray); ++tid) { + STableTruncateH *pHandle = (STableTruncateH *)taosArrayGet(prh->tblArray, tid); STSchema * pSchema = NULL; - if (pTblHandle->pTable == NULL || pTblHandle->pBlkIdx == NULL) continue; + if (pHandle->pTable == NULL || pHandle->pBlkIdx == NULL) continue; - if ((pSchema = tsdbGetTableSchemaImpl(pTblHandle->pTable, true, true, -1, -1)) == NULL) { + if ((pSchema = tsdbGetTableSchemaImpl(pHandle->pTable, true, true, -1, -1)) == NULL) { return -1; } - taosArrayClear(pTruncateH->aSupBlk); + taosArrayClear(prh->aSupBlk); - uint64_t uid = pTblHandle->pTable->tableId.uid; + uint64_t uid = pHandle->pTable->tableId.uid; // if(uid != pMsg->uid) { // TODO: copy the block data directly // } - if ((tdInitDataCols(pTruncateH->pDCols, pSchema) < 0) || (tdInitDataCols(pReadh->pDCols[0], pSchema) < 0) || + if ((tdInitDataCols(prh->pDCols, pSchema) < 0) || (tdInitDataCols(pReadh->pDCols[0], pSchema) < 0) || (tdInitDataCols(pReadh->pDCols[1], pSchema) < 0)) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; tdFreeSchema(pSchema); @@ -635,57 +609,57 @@ static int tsdbDeleteFSetImpl(STruncateH *pTruncateH) { tdFreeSchema(pSchema); // Loop to truncate each block data - for (int i = 0; i < pTblHandle->pBlkIdx->numOfBlocks; ++i) { - SBlock *pBlock = pTblHandle->pInfo->blocks + i; + for (int i = 0; i < pHandle->pBlkIdx->numOfBlocks; ++i) { + SBlock *pBlock = pHandle->pInfo->blocks + i; // Copy the Blocks directly if TS is not interleaved. - if (!tsdbBlockInterleaved(pTruncateH, pBlock)) { + if (!tsdbBlockInterleaved(prh, pBlock)) { // tsdbWriteBlockAndDataToFile(); continue; } // Otherwise load the block data and copy the specific rows. - if (tsdbLoadBlockData(pReadh, pBlock, pTblHandle->pInfo) < 0) { + if (tsdbLoadBlockData(pReadh, pBlock, pHandle->pInfo) < 0) { return -1; } - if (uid == pMsg->uid) { - tsdbFilterDataCols(pTruncateH, pReadh->pDCols[0]); + if (uid == prh->pCtlInfo->uid) { + tsdbFilterDataCols(prh, pReadh->pDCols[0]); tsdbDebug("vgId:%d uid %" PRIu64 " matched, filter block data from rows %d to %d rows", REPO_ID(pRepo), uid, - pReadh->pDCols[0]->numOfRows, pTruncateH->pDCols->numOfRows); - if (pTruncateH->pDCols->numOfRows <= 0) continue; + pReadh->pDCols[0]->numOfRows, prh->pDCols->numOfRows); + if (prh->pDCols->numOfRows <= 0) continue; - if (tsdbWriteBlockToRightFile(pTruncateH, pTblHandle->pTable, pTruncateH->pDCols, ppBuf, ppCBuf, ppExBuf) < 0) { + if (tsdbWriteBlockToRightFile(prh, pHandle->pTable, prh->pDCols, ppBuf, ppCBuf, ppExBuf) < 0) { return -1; } } else { tsdbDebug("vgId:%d uid %" PRIu64 " not matched, copy block data directly\n", REPO_ID(pRepo), uid); - if (tsdbWriteBlockToRightFile(pTruncateH, pTblHandle->pTable, pReadh->pDCols[0], ppBuf, ppCBuf, ppExBuf) < 0) { + if (tsdbWriteBlockToRightFile(prh, pHandle->pTable, pReadh->pDCols[0], ppBuf, ppCBuf, ppExBuf) < 0) { return -1; } } } - if (tsdbWriteBlockInfoImpl(TSDB_TRUNCATE_HEAD_FILE(pTruncateH), pTblHandle->pTable, pTruncateH->aSupBlk, NULL, + if (tsdbWriteBlockInfoImpl(TSDB_TRUNCATE_HEAD_FILE(prh), pHandle->pTable, prh->aSupBlk, NULL, ppBuf, &blkIdx) < 0) { return -1; } - if ((blkIdx.numOfBlocks > 0) && (taosArrayPush(pTruncateH->aBlkIdx, (const void *)(&blkIdx)) == NULL)) { + if ((blkIdx.numOfBlocks > 0) && (taosArrayPush(prh->aBlkIdx, (const void *)(&blkIdx)) == NULL)) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } } - if (tsdbWriteBlockIdx(TSDB_TRUNCATE_HEAD_FILE(pTruncateH), pTruncateH->aBlkIdx, ppBuf) < 0) { + if (tsdbWriteBlockIdx(TSDB_TRUNCATE_HEAD_FILE(prh), prh->aBlkIdx, ppBuf) < 0) { return -1; } return 0; } -static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDataCols *pDCols, void **ppBuf, +static int tsdbWriteBlockToRightFile(STruncateH *prh, STable *pTable, SDataCols *pDCols, void **ppBuf, void **ppCBuf, void **ppExBuf) { - STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(pTruncateH); + STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(prh); STsdbCfg * pCfg = REPO_CFG(pRepo); SDFile * pDFile = NULL; bool isLast = false; @@ -694,54 +668,23 @@ static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDa ASSERT(pDCols->numOfRows > 0); if (pDCols->numOfRows < pCfg->minRowsPerFileBlock) { - pDFile = TSDB_TRUNCATE_LAST_FILE(pTruncateH); + pDFile = TSDB_TRUNCATE_LAST_FILE(prh); isLast = true; } else { - pDFile = TSDB_TRUNCATE_DATA_FILE(pTruncateH); + pDFile = TSDB_TRUNCATE_DATA_FILE(prh); isLast = false; } if (tsdbWriteBlockImpl(pRepo, pTable, pDFile, - isLast ? TSDB_TRUNCATE_SMAL_FILE(pTruncateH) : TSDB_TRUNCATE_SMAD_FILE(pTruncateH), pDCols, + isLast ? TSDB_TRUNCATE_SMAL_FILE(prh) : TSDB_TRUNCATE_SMAD_FILE(prh), pDCols, &block, isLast, true, ppBuf, ppCBuf, ppExBuf) < 0) { return -1; } - if (taosArrayPush(pTruncateH->aSupBlk, (void *)(&block)) == NULL) { + if (taosArrayPush(prh->aSupBlk, (void *)(&block)) == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } return 0; } - -// static int tsdbWriteBlockAndDataToFile(STruncateH *pTruncateH, STable *pTable, SBlock *pSupBlock, void **ppBuf, -// void **ppCBuf, void **ppExBuf) { -// STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(pTruncateH); -// SDFile * pDFile = NULL; -// bool isLast = false; - -// ASSERT(pSupBlock->numOfRows > 0); - -// if (pSupBlock->last) { -// pDFile = TSDB_TRUNCATE_LAST_FILE(pTruncateH); -// isLast = true; -// } else { -// pDFile = TSDB_TRUNCATE_DATA_FILE(pTruncateH); -// isLast = false; -// } - -// if (tsdbWriteBlockImpl(pRepo, pTable, pDFile, -// isLast ? TSDB_TRUNCATE_SMAL_FILE(pTruncateH) : TSDB_TRUNCATE_SMAD_FILE(pTruncateH), -// pDCols, &block, isLast, true, ppBuf, ppCBuf, ppExBuf) < 0) { -// return -1; -// } - -// if (taosArrayPush(pTruncateH->aSupBlk, (void *)(&block)) == NULL) { -// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; -// return -1; -// } - -// return 0; -// } -*/ \ No newline at end of file -- GitLab