From 15be8ab1a129ff871248d51ec807c45fc56014d2 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Tue, 23 Nov 2021 19:26:25 +0800 Subject: [PATCH] support delete --- src/dnode/src/dnodeVMgmt.c | 6 +++- src/inc/taosmsg.h | 8 +++++ src/inc/vnode.h | 3 +- src/tsdb/inc/tsdbCommitQueue.h | 8 ++++- src/tsdb/inc/tsdbReadImpl.h | 7 ++-- src/tsdb/inc/tsdbTruncate.h | 1 + src/tsdb/src/tsdbCommitQueue.c | 2 ++ src/tsdb/src/tsdbReadImpl.c | 2 ++ src/tsdb/src/tsdbTruncate.c | 62 +++++++++++++++++++++------------- src/vnode/src/vnodeMain.c | 31 ++++++++++++++++- 10 files changed, 99 insertions(+), 31 deletions(-) diff --git a/src/dnode/src/dnodeVMgmt.c b/src/dnode/src/dnodeVMgmt.c index 4fae882d3a..5ff96e9adf 100644 --- a/src/dnode/src/dnodeVMgmt.c +++ b/src/dnode/src/dnodeVMgmt.c @@ -198,9 +198,13 @@ static int32_t dnodeProcessCompactVnodeMsg(SRpcMsg *rpcMsg) { return vnodeCompact(pCompactVnode->vgId); } #else +// static int32_t dnodeProcessCompactVnodeMsg(SRpcMsg *rpcMsg) { +// STruncateTblMsg *pTruncateMsg = rpcMsg->pCont; +// return vnodeTruncateTbl(pTruncateMsg); +// } static int32_t dnodeProcessCompactVnodeMsg(SRpcMsg *rpcMsg) { STruncateTblMsg *pTruncateMsg = rpcMsg->pCont; - return vnodeTruncate(pTruncateMsg); + return vnodeTruncateTbl(pTruncateMsg); } #endif diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 13956b1920..82b270ee83 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -410,6 +410,14 @@ typedef struct { char tableFname[TSDB_TABLE_FNAME_LEN]; STimeWindow span[]; } STruncateTblMsg; +typedef struct { + int32_t contLen; + int32_t vgId; + uint64_t uid; + uint16_t nSpan; + char tableFname[TSDB_TABLE_FNAME_LEN]; + STimeWindow span[]; +} SDeleteTblMsg; // N.B. JUST Utility for DEMO Implementation(not formal definition) typedef struct SColIndex { int16_t colId; // column id diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 2f74dd8e54..bb45f4cc1d 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -63,7 +63,8 @@ int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg); int32_t vnodeSync(int32_t vgId); int32_t vnodeClose(int32_t vgId); int32_t vnodeCompact(int32_t vgId); -int32_t vnodeTruncate(STruncateTblMsg *pMsg); +int32_t vnodeTruncateTbl(STruncateTblMsg *pMsg); +int32_t vnodeDeleteTbl(SDeleteTblMsg *pMsg); // vnodeMgmt int32_t vnodeInitMgmt(); diff --git a/src/tsdb/inc/tsdbCommitQueue.h b/src/tsdb/inc/tsdbCommitQueue.h index 579499ef88..898a8f3f5f 100644 --- a/src/tsdb/inc/tsdbCommitQueue.h +++ b/src/tsdb/inc/tsdbCommitQueue.h @@ -16,7 +16,13 @@ #ifndef _TD_TSDB_COMMIT_QUEUE_H_ #define _TD_TSDB_COMMIT_QUEUE_H_ -typedef enum { COMMIT_REQ, COMPACT_REQ, TRUNCATE_REQ, COMMIT_CONFIG_REQ } TSDB_REQ_T; +typedef enum { + COMMIT_REQ, + COMPACT_REQ, + TRUNCATE_REQ, + DELETE_REQ, + COMMIT_CONFIG_REQ, +} TSDB_REQ_T; int tsdbScheduleCommit(STsdbRepo *pRepo, void* param, TSDB_REQ_T req); diff --git a/src/tsdb/inc/tsdbReadImpl.h b/src/tsdb/inc/tsdbReadImpl.h index 20d8b88c83..ac3576cc8e 100644 --- a/src/tsdb/inc/tsdbReadImpl.h +++ b/src/tsdb/inc/tsdbReadImpl.h @@ -153,9 +153,10 @@ static FORCE_INLINE uint32_t tsdbGetBlockColOffset(SBlockCol *pBlockCol) { } typedef struct { - int32_t delimiter; // For recovery usage - int32_t numOfCols; // For recovery usage - uint64_t uid; // For recovery usage + int32_t delimiter; // For recovery usage + int32_t delete : 1; // For recovery usage(not included when calculating checksum) + int32_t numOfCols : 31; // For recovery usage + uint64_t uid; // For recovery usage SBlockCol cols[]; } SBlockData; diff --git a/src/tsdb/inc/tsdbTruncate.h b/src/tsdb/inc/tsdbTruncate.h index 91d82a5264..1c4a08e774 100644 --- a/src/tsdb/inc/tsdbTruncate.h +++ b/src/tsdb/inc/tsdbTruncate.h @@ -20,6 +20,7 @@ extern "C" { #endif void *tsdbTruncateImpl(STsdbRepo *pRepo, void *param); +void *tsdbDeleteImpl(STsdbRepo *pRepo, void *param); #ifdef __cplusplus } diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c index b64c5c44cd..7d6e5f3038 100644 --- a/src/tsdb/src/tsdbCommitQueue.c +++ b/src/tsdb/src/tsdbCommitQueue.c @@ -194,6 +194,8 @@ static void *tsdbLoopCommit(void *arg) { tsdbCompactImpl(pRepo); } else if (req == TRUNCATE_REQ) { tsdbTruncateImpl(pRepo, param); + } else if (req == DELETE_REQ) { + tsdbDeleteImpl(pRepo, param); } else if (req == COMMIT_CONFIG_REQ) { ASSERT(pRepo->config_changed); tsdbApplyRepoConfig(pRepo); diff --git a/src/tsdb/src/tsdbReadImpl.c b/src/tsdb/src/tsdbReadImpl.c index 98fac3df77..bfe0a7460f 100644 --- a/src/tsdb/src/tsdbReadImpl.c +++ b/src/tsdb/src/tsdbReadImpl.c @@ -651,6 +651,8 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat return -1; } + pBlockData->delete = 0; // ignore delete flag + int32_t tsize = (int32_t)tsdbBlockStatisSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer); if (!taosCheckChecksumWhole((uint8_t *)TSDB_READ_BUF(pReadh), tsize)) { terrno = TSDB_CODE_TDB_FILE_CORRUPTED; diff --git a/src/tsdb/src/tsdbTruncate.c b/src/tsdb/src/tsdbTruncate.c index 82ab2a20c0..c07c6e3af3 100644 --- a/src/tsdb/src/tsdbTruncate.c +++ b/src/tsdb/src/tsdbTruncate.c @@ -30,7 +30,8 @@ typedef struct { SArray * aBlkIdx; SArray * aSupBlk; SDataCols *pDCols; - void * param; // STruncateTblMsg *pMsg + void * param; // STruncateTblMsg or SDeleteTblMsg + TSDB_REQ_T type; // truncate or delete } STruncateH; #define TSDB_TRUNCATE_WSET(pTruncateH) (&((pTruncateH)->wSet)) @@ -44,24 +45,25 @@ typedef struct { #define TSDB_TRUNCATE_COMP_BUF(pTruncateH) TSDB_READ_COMP_BUF(&((pTruncateH)->readh)) #define TSDB_TRUNCATE_EXBUF(pTruncateH) TSDB_READ_EXBUF(&((pTruncateH)->readh)) -static int tsdbAsyncTruncate(STsdbRepo *pRepo, void *param); -static void tsdbStartTruncate(STsdbRepo *pRepo); -static void tsdbEndTruncate(STsdbRepo *pRepo, int eno); -static int tsdbTruncateMeta(STsdbRepo *pRepo); -static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param); -static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet); -static int tsdbInitTruncateH(STruncateH *pTruncateH, STsdbRepo *pRepo); -static void tsdbDestroyTruncateH(STruncateH *pTruncateH); -static int tsdbInitTruncateTblArray(STruncateH *pTruncateH); -static void tsdbDestroyTruncateTblArray(STruncateH *pTruncateH); -static int tsdbCacheFSetIndex(STruncateH *pTruncateH); -static int tsdbTruncateCache(STsdbRepo *pRepo, void *param); -static int tsdbTruncateFSetInit(STruncateH *pTruncateH, SDFileSet *pSet); -static void tsdbTruncateFSetEnd(STruncateH *pTruncateH); -static int tsdbTruncateFSetImpl(STruncateH *pTruncateH); -static bool tsdbBlockInterleaved(STruncateH *pTruncateH, SBlock *pBlock); -static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDataCols *pDCols, void **ppBuf, - void **ppCBuf, void **ppExBuf); +static int tsdbAsyncTruncate(STsdbRepo *pRepo, void *param, TSDB_REQ_T type); +static void tsdbStartTruncate(STsdbRepo *pRepo); +static void tsdbEndTruncate(STsdbRepo *pRepo, int eno); +static int tsdbTruncateMeta(STsdbRepo *pRepo); +static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param, TSDB_REQ_T type); +static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet); +static int tsdbInitTruncateH(STruncateH *pTruncateH, STsdbRepo *pRepo); +static void tsdbDestroyTruncateH(STruncateH *pTruncateH); +static int tsdbInitTruncateTblArray(STruncateH *pTruncateH); +static void tsdbDestroyTruncateTblArray(STruncateH *pTruncateH); +static int tsdbCacheFSetIndex(STruncateH *pTruncateH); +static int tsdbTruncateCache(STsdbRepo *pRepo, void *param); +static int tsdbTruncateFSetInit(STruncateH *pTruncateH, SDFileSet *pSet); +static void tsdbTruncateFSetEnd(STruncateH *pTruncateH); +static int tsdbTruncateFSetImpl(STruncateH *pTruncateH); +static bool tsdbBlockInterleaved(STruncateH *pTruncateH, SBlock *pBlock); +static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDataCols *pDCols, void **ppBuf, + void **ppCBuf, void **ppExBuf); +static void *tsdbTruncateImplCommon(STsdbRepo *pRepo, void *param, TSDB_REQ_T type); enum { TSDB_NO_TRUNCATE, @@ -69,9 +71,19 @@ enum { TSDB_WAITING_TRUNCATE, }; -int tsdbTruncate(STsdbRepo *pRepo, void *param) { return tsdbAsyncTruncate(pRepo, param); } +int tsdbTruncate(STsdbRepo *pRepo, void *param) { return tsdbAsyncTruncate(pRepo, param, TRUNCATE_REQ); } +int tsdbDelete(STsdbRepo *pRepo, void *param) { return tsdbAsyncTruncate(pRepo, param, DELETE_REQ); } void *tsdbTruncateImpl(STsdbRepo *pRepo, void *param) { + tsdbTruncateImplCommon(pRepo, param, TRUNCATE_REQ); + return NULL; +} +void *tsdbDeleteImpl(STsdbRepo *pRepo, void *param) { + tsdbTruncateImplCommon(pRepo, param, DELETE_REQ); + return NULL; +} + +static void *tsdbTruncateImplCommon(STsdbRepo *pRepo, void *param, TSDB_REQ_T type) { ASSERT(param != NULL); int32_t code = 0; // Step 1: check and clear cache @@ -98,7 +110,7 @@ void *tsdbTruncateImpl(STsdbRepo *pRepo, void *param) { goto _err; } - if (tsdbTruncateTSData(pRepo, param) < 0) { + if (tsdbTruncateTSData(pRepo, param, type) < 0) { tsdbError("vgId:%d failed to truncate TS data since %s", REPO_ID(pRepo), tstrerror(terrno)); goto _err; } @@ -130,7 +142,7 @@ static int tsdbTruncateCache(STsdbRepo *pRepo, void *param) { return 0; } -static int tsdbAsyncTruncate(STsdbRepo *pRepo, void *param) { +static int tsdbAsyncTruncate(STsdbRepo *pRepo, void *param, TSDB_REQ_T type) { // avoid repeated input of commands by end users in a short period of time if (pRepo->truncateState != TSDB_NO_TRUNCATE) { tsdbInfo("vgId:%d retry later as tsdb in truncating state", REPO_ID(pRepo)); @@ -145,7 +157,7 @@ static int tsdbAsyncTruncate(STsdbRepo *pRepo, void *param) { // truncate tsem_wait(&(pRepo->readyToCommit)); - int code = tsdbScheduleCommit(pRepo, param, TRUNCATE_REQ); + int code = tsdbScheduleCommit(pRepo, param, type); if (code < 0) { tsem_post(&(pRepo->readyToCommit)); } @@ -177,7 +189,7 @@ static int tsdbTruncateMeta(STsdbRepo *pRepo) { return 0; } -static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param) { +static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param, TSDB_REQ_T type) { STsdbCfg * pCfg = REPO_CFG(pRepo); STruncateH truncateH = {0}; SDFileSet * pSet = NULL; @@ -188,7 +200,9 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param) { if (tsdbInitTruncateH(&truncateH, pRepo) < 0) { return -1; } + truncateH.param = pMsg; + truncateH.type = type; int sFid = TSDB_KEY_FID(pMsg->span[0].skey, pCfg->daysPerFile, pCfg->precision); int eFid = TSDB_KEY_FID(pMsg->span[0].ekey, pCfg->daysPerFile, pCfg->precision); diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index d5d4cc4c2e..9e38add752 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -129,7 +129,36 @@ int32_t vnodeCompact(int32_t vgId) { return TSDB_CODE_SUCCESS; } -int32_t vnodeTruncate(STruncateTblMsg *pMsg) { +int32_t vnodeTruncateTbl(STruncateTblMsg *pMsg) { + // build test data + // pMsg->vgId = 2; + // pMsg->uid = 562949986978701; + // pMsg->nSpan = 1; + // pMsg->span = malloc(pMsg->nSpan * sizeof(STimeWindow)); + + int32_t vgId = 2; + void * pVnode = vnodeAcquire(vgId); + if (pVnode != NULL) { + vDebug("vgId:%d, truncate table %s msg is received", vgId, pMsg->tableFname); + // not care success or not + STruncateTblMsg *param = (STruncateTblMsg *)calloc(1, sizeof(STruncateTblMsg) + pMsg->nSpan * sizeof(STimeWindow)); + param->vgId = 2; + param->uid = 562949986979009; + param->nSpan = 1; + param->span[0].skey = 1634701320001; + param->span[0].ekey = 1634701320001; + if (tsdbTruncate(((SVnodeObj *)pVnode)->tsdb, param) < 0) { + tfree(param); + } + vnodeRelease(pVnode); + } else { + vInfo("vgId:%d, vnode not exist, can't truncate table %s in it", vgId, pMsg->tableFname); + return TSDB_CODE_VND_INVALID_VGROUP_ID; + } + return TSDB_CODE_SUCCESS; +} + +int32_t vnodeDeleteTbl(SDeleteTblMsg *pMsg) { // build test data // pMsg->vgId = 2; // pMsg->uid = 562949986978701; -- GitLab