diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 2539465890c7608263da517211bafe735fb8d586..2833a77142f72ffb8c25d058f200b7cb401da341 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -3331,7 +3331,7 @@ int buildTableDelDataMsg(SSqlObj* pSql, SSqlCmd* pCmd, SQueryInfo* pQueryInfo, S tscDebug("0x%"PRIx64" table deldata submit msg built, numberOfEP:%d", pSql->self, pSql->epSet.numOfEps); // set payload - size_t payloadLen = sizeof(SMsgDesc) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + sizeof(SControlData); + size_t payloadLen = sizeof(SMsgDesc) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + sizeof(SControlData) + sizeof(int32_t); int32_t ret = tscAllocPayload(pCmd, payloadLen); if (ret != TSDB_CODE_SUCCESS) { return ret; @@ -3351,22 +3351,24 @@ int buildTableDelDataMsg(SSqlObj* pSql, SSqlCmd* pCmd, SQueryInfo* pQueryInfo, S pMsgDesc->numOfVnodes = htonl(1); // SSubmitMsg int32_t size = pCmd->payloadLen - sizeof(SMsgDesc); - pSubmitMsg->header.vgId = htonl(pTableMeta->vgId); + pSubmitMsg->header.vgId = htonl(pTableMeta->vgId); pSubmitMsg->header.contLen = htonl(size); - pSubmitMsg->length = pSubmitMsg->header.contLen; - pSubmitMsg->numOfBlocks = htonl(1); + pSubmitMsg->length = pSubmitMsg->header.contLen; + pSubmitMsg->numOfBlocks = htonl(1); // SSubmitBlk - pSubmitBlk->flag = FLAG_BLK_CONTROL; // this is control block - pSubmitBlk->tid = htonl(pTableMeta->id.tid); - pSubmitBlk->uid = htobe64(pTableMeta->id.uid); + pSubmitBlk->flag = FLAG_BLK_CONTROL; // this is control block + pSubmitBlk->tid = htonl(pTableMeta->id.tid); + pSubmitBlk->uid = htobe64(pTableMeta->id.uid); pSubmitBlk->numOfRows = htons(1); pSubmitBlk->schemaLen = 0; // only server return TSDB_CODE_TDB_TABLE_RECONFIGURE need schema attached - pSubmitBlk->sversion = htonl(pTableMeta->sversion); - pSubmitBlk->dataLen = htonl(sizeof(SControlData)); + pSubmitBlk->sversion = htonl(pTableMeta->sversion); + pSubmitBlk->dataLen = htonl(sizeof(SControlData) + sizeof(int32_t)); // SControlData pControlData->command = htonl(CMD_DELETE_DATA); pControlData->win.skey = htobe64(pQueryInfo->window.skey); pControlData->win.ekey = htobe64(pQueryInfo->window.ekey); + pControlData->tnum = htonl(1); + pControlData->tids[0] = htonl(pTableMeta->id.tid); return TSDB_CODE_SUCCESS; } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 8b289f7a8d3063aa7ba210320ea97db49b6fc330..80efba21bfc5e3de7fd7613b29e161743486e9b5 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -1005,9 +1005,13 @@ typedef struct { #define CMD_DELETE_DATA 0x00000001 #define CMD_TRUNCATE 0x00000002 + +#define GET_CTLDATA_SIZE(p) (sizeof(SControlData) + p->tnum * sizeof(int32_t)) typedef struct SControlData{ uint32_t command; // see define CMD_??? STimeWindow win; + int32_t tnum; // tids nums + int32_t tids[]; // delete table tid } SControlData; enum { diff --git a/src/tsdb/inc/tsdbTruncate.h b/src/tsdb/inc/tsdbTruncate.h index 9c0db697f5229adb2143bc4911a5b4d6bf1b98e7..4570f1e733c238e836aadcc1c5ad5aa66b7064ce 100644 --- a/src/tsdb/inc/tsdbTruncate.h +++ b/src/tsdb/inc/tsdbTruncate.h @@ -20,16 +20,13 @@ extern "C" { #endif // SControlData addition information +#define GET_CTLINFO_SIZE(p) (sizeof(SControlDataInfo) + p.ctlData.tnum * sizeof(int32_t)) typedef struct { - SControlData ctlData; // addition info - uint64_t uid; // table unique id - int32_t tid; // table id tsem_t* pSem; bool memNull; // pRepo->mem is NULL, this is true - uint64_t* uids; // delete table - int32_t uidCount; SShellSubmitRspMsg *pRsp; + SControlData ctlData; } SControlDataInfo; // -------- interface --------- diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index dee5bc86ffc6cf22dd051eb39a86ea23bf76bb74..6073aa2a117ef64543a77b417b67e73ee92afecd 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -240,7 +240,8 @@ int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) { pBlkIdx = (SBlockIdx *)taosArrayGet(pIdxA, i); size = tsdbEncodeSBlockIdx(NULL, pBlkIdx); - if (tsdbMakeRoom(ppBuf, tlen + size) < 0) return -1; + if (tsdbMakeRoom(ppBuf, tlen + size) < 0) + return -1; void *ptr = POINTER_SHIFT(*ppBuf, tlen); tsdbEncodeSBlockIdx(&ptr, pBlkIdx); @@ -249,7 +250,8 @@ int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) { } tlen += sizeof(TSCKSUM); - if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1; + if (tsdbMakeRoom(ppBuf, tlen) < 0) + return -1; taosCalcChecksumAppend(0, (uint8_t *)(*ppBuf), tlen); if (tsdbAppendDFile(pHeadf, *ppBuf, tlen, &offset) < tlen) { diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 59f09375da7f49947c9ea7a0ef6e36abd2af7c7c..b159867ddde41b4e4422c7ab1d63ac5215c293f5 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -1122,7 +1122,6 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow r // Control Data int32_t tsdbInsertControlData(STsdbRepo* pRepo, SSubmitBlk* pBlock, SShellSubmitRspMsg *pRsp, tsem_t** ppSem) { int32_t ret = TSDB_CODE_SUCCESS; - assert(pBlock->dataLen == sizeof(SControlData)); SControlData* pCtlData = (SControlData* )pBlock->data; // INIT SEM FOR ASYNC WAIT COMMIT RESULT @@ -1136,15 +1135,18 @@ int32_t tsdbInsertControlData(STsdbRepo* pRepo, SSubmitBlk* pBlock, SShellSubmit // anti-serialize pCtlData->command = htonl(pCtlData->command); + pCtlData->tnum = htonl(pCtlData->tnum); pCtlData->win.skey = htobe64(pCtlData->win.skey); pCtlData->win.ekey = htobe64(pCtlData->win.ekey); - + for (int32_t i=0; i < pCtlData->tnum; i++) { + pCtlData->tids[i] = htonl(pCtlData->tids[i]); + } + // server data set - SControlDataInfo* pNew = (SControlDataInfo* )tmalloc(sizeof(SControlDataInfo)); - memset(pNew, 0, sizeof(SControlDataInfo)); - pNew->ctlData = *pCtlData; - pNew->uid = pBlock->uid; - pNew->tid = pBlock->tid; + size_t nsize = sizeof(SControlDataInfo) + pCtlData->tnum * sizeof(int32_t); + SControlDataInfo* pNew = (SControlDataInfo* )tmalloc(nsize); + memset(pNew, 0, nsize); + memcpy(&pNew->ctlData, pCtlData, GET_CTLDATA_SIZE(pCtlData)); pNew->pRsp = pRsp; if (ppSem) pNew->pSem = *ppSem; @@ -1155,9 +1157,12 @@ int32_t tsdbInsertControlData(STsdbRepo* pRepo, SSubmitBlk* pBlock, SShellSubmit } // if async post failed , must set wait event ppSem NULL - if(ret != TSDB_CODE_SUCCESS && ppSem) { - tsem_destroy(*ppSem); - *ppSem = NULL; + if(ret != TSDB_CODE_SUCCESS) { + if(*ppSem) { + tsem_destroy(*ppSem); + *ppSem = NULL; + } + tfree(pNew); } return ret; diff --git a/src/tsdb/src/tsdbTruncate.c b/src/tsdb/src/tsdbTruncate.c index 30bd71e0c9d8a4c0bed5f1c1b8df25e33c0f2830..127429a52e4d51f047ee5aa7c61a60c281092b45 100644 --- a/src/tsdb/src/tsdbTruncate.c +++ b/src/tsdb/src/tsdbTruncate.c @@ -15,6 +15,18 @@ #include "tsdbint.h" #include "tsdbTruncate.h" +enum { + TSDB_NO_TRUNCATE, + TSDB_IN_TRUNCATE, + TSDB_WAITING_TRUNCATE, +}; + +enum BlockSolve { + BLOCK_RETAIN = 0, + BLOCK_MODIFY, + BLOCK_DELETE +}; + typedef struct { STable * pTable; SBlockIdx * pBlkIdx; @@ -30,51 +42,46 @@ typedef struct { SDFileSet wSet; SArray * aBlkIdx; SArray * aSupBlk; + SArray * aSubBlk; SDataCols *pDCols; SControlDataInfo* pCtlInfo; } STruncateH; -#define TSDB_TRUNCATE_WSET(prh) (&((prh)->wSet)) -#define TSDB_TRUNCATE_REPO(prh) TSDB_READ_REPO(&((prh)->readh)) -#define TSDB_TRUNCATE_HEAD_FILE(prh) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(prh), TSDB_FILE_HEAD) -#define TSDB_TRUNCATE_DATA_FILE(prh) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(prh), TSDB_FILE_DATA) -#define TSDB_TRUNCATE_LAST_FILE(prh) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(prh), TSDB_FILE_LAST) -#define TSDB_TRUNCATE_SMAD_FILE(prh) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(prh), TSDB_FILE_SMAD) -#define TSDB_TRUNCATE_SMAL_FILE(prh) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(prh), TSDB_FILE_SMAL) -#define TSDB_TRUNCATE_BUF(prh) TSDB_READ_BUF(&((prh)->readh)) -#define TSDB_TRUNCATE_COMP_BUF(prh) TSDB_READ_COMP_BUF(&((prh)->readh)) -#define TSDB_TRUNCATE_EXBUF(prh) TSDB_READ_EXBUF(&((prh)->readh)) + +#define TSDB_TRUNCATE_WSET(ptru) (&((ptru)->wSet)) +#define TSDB_TRUNCATE_REPO(ptru) TSDB_READ_REPO(&((ptru)->readh)) +#define TSDB_TRUNCATE_HEAD_FILE(ptru) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(ptru), TSDB_FILE_HEAD) +#define TSDB_TRUNCATE_DATA_FILE(ptru) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(ptru), TSDB_FILE_DATA) +#define TSDB_TRUNCATE_LAST_FILE(ptru) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(ptru), TSDB_FILE_LAST) +#define TSDB_TRUNCATE_SMAD_FILE(ptru) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(ptru), TSDB_FILE_SMAD) +#define TSDB_TRUNCATE_SMAL_FILE(ptru) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(ptru), TSDB_FILE_SMAL) +#define TSDB_TRUNCATE_BUF(ptru) TSDB_READ_BUF(&((ptru)->readh)) +#define TSDB_TRUNCATE_COMP_BUF(ptru) TSDB_READ_COMP_BUF(&((ptru)->readh)) +#define TSDB_TRUNCATE_EXBUF(ptru) TSDB_READ_EXBUF(&((ptru)->readh)) static void tsdbStartTruncate(STsdbRepo *pRepo); static void tsdbEndTruncate(STsdbRepo *pRepo, int eno); static int tsdbTruncateMeta(STsdbRepo *pRepo); static int tsdbTruncateTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo); -static int tsdbTruncateFSet(STruncateH *prh, SDFileSet *pSet); -static int tsdbDeleteFSet(STruncateH *prh, SDFileSet *pSet); -static int tsdbInitTruncateH(STruncateH *prh, STsdbRepo *pRepo); -static void tsdbDestroyTruncateH(STruncateH *prh); -static int tsdbInitTruncateTblArray(STruncateH *prh); -static void tsdbDestroyTruncateTblArray(STruncateH *prh); -static int tsdbCacheFSetIndex(STruncateH *prh); +static int tsdbFSetTruncate(STruncateH *ptru, SDFileSet *pSet); +static int tsdbFSetDelete(STruncateH *ptru, SDFileSet *pSet); +static int tsdbInitTruncateH(STruncateH *ptru, STsdbRepo *pRepo); +static void tsdbDestroyTruncateH(STruncateH *ptru); +static int tsdbInitTruncateTblArray(STruncateH *ptru); +static void tsdbDestroyTruncateTblArray(STruncateH *ptru); +static int tsdbCacheFSetIndex(STruncateH *ptru); static int tsdbTruncateCache(STsdbRepo *pRepo, void *param); -static int tsdbTruncateFSetInit(STruncateH *prh, SDFileSet *pSet); -static void tsdbTruncateFSetEnd(STruncateH *prh); -static int tsdbTruncateFSetImpl(STruncateH *prh); -static int tsdbDeleteFSetImpl(STruncateH *prh); -static bool tsdbBlockInterleaved(STruncateH *prh, SBlock *pBlock); -static int tsdbWriteBlockToRightFile(STruncateH *prh, STable *pTable, SDataCols *pDCols, void **ppBuf, - void **ppCBuf, void **ppExBuf); -static int tsdbTruncateImplCommon(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo); +static int tsdbFSetInit(STruncateH *ptru, SDFileSet *pSet); +static void tsdbTruncateFSetEnd(STruncateH *ptru); +static int tsdbTruncateFSetImpl(STruncateH *ptru); +static int tsdbFSetDeleteImpl(STruncateH *ptru); +static int tsdbBlockSolve(STruncateH *ptru, SBlock *pBlock); +static int tsdbWriteBlockToFile(STruncateH *ptru, STable *pTable, SDataCols *pDCols, void **ppBuf, + void **ppCBuf, void **ppExBuf, SBlock * pBlock); +static int tsdbTruncateImplCommon(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo); - -enum { - TSDB_NO_TRUNCATE, - TSDB_IN_TRUNCATE, - TSDB_WAITING_TRUNCATE, -}; - // delete int tsdbControlDelete(STsdbRepo* pRepo, SControlDataInfo* pCtlInfo) { int ret = TSDB_CODE_SUCCESS; @@ -175,7 +182,7 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) { STruncateH truncateH = {0}; SDFileSet * pSet = NULL; - tsdbDebug("vgId:%d start to truncate TS data for %" PRIu64, REPO_ID(pRepo), pCtlInfo->uid); + tsdbDebug("vgId:%d start to truncate TS data for %d", REPO_ID(pRepo), pCtlInfo->ctlData.tids[0]); if (tsdbInitTruncateH(&truncateH, pRepo) < 0) { return -1; @@ -213,13 +220,13 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) { #endif if (pCtlInfo->ctlData.command == CMD_TRUNCATE) { - if (tsdbTruncateFSet(&truncateH, pSet) < 0) { + if (tsdbFSetTruncate(&truncateH, pSet) < 0) { tsdbDestroyTruncateH(&truncateH); tsdbError("vgId:%d failed to truncate table in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno)); return -1; } } else if (pCtlInfo->ctlData.command == CMD_DELETE_DATA) { - if (tsdbDeleteFSet(&truncateH, pSet) < 0) { + if (tsdbFSetDelete(&truncateH, pSet) < 0) { tsdbDestroyTruncateH(&truncateH); tsdbError("vgId:%d failed to truncate data in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno)); return -1; @@ -235,68 +242,68 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) { return 0; } -static int tsdbDeleteFSet(STruncateH *prh, SDFileSet *pSet) { - STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(prh); +static int tsdbFSetDelete(STruncateH *ptru, SDFileSet *pSet) { + STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(ptru); SDiskID did = {0}; tsdbDebug("vgId:%d start to truncate data in FSET %d on level %d id %d", REPO_ID(pRepo), pSet->fid, TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet)); - if (tsdbTruncateFSetInit(prh, pSet) < 0) { + if (tsdbFSetInit(ptru, pSet) < 0) { return -1; } // Create new fset as deleted fset - tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(prh->rtn)), &(did.level), &(did.id)); + tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(ptru->rtn)), &(did.level), &(did.id)); if (did.level == TFS_UNDECIDED_LEVEL) { terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; tsdbError("vgId:%d failed to truncate data in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno)); - tsdbTruncateFSetEnd(prh); + tsdbTruncateFSetEnd(ptru); return -1; } - tsdbInitDFileSet(TSDB_TRUNCATE_WSET(prh), did, REPO_ID(pRepo), TSDB_FSET_FID(pSet), + tsdbInitDFileSet(TSDB_TRUNCATE_WSET(ptru), did, REPO_ID(pRepo), TSDB_FSET_FID(pSet), FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_LATEST_FSET_VER); - if (tsdbCreateDFileSet(TSDB_TRUNCATE_WSET(prh), true) < 0) { + if (tsdbCreateDFileSet(TSDB_TRUNCATE_WSET(ptru), true) < 0) { tsdbError("vgId:%d failed to truncate data in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno)); - tsdbTruncateFSetEnd(prh); + tsdbTruncateFSetEnd(ptru); return -1; } - if (tsdbDeleteFSetImpl(prh) < 0) { - tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(prh)); - tsdbRemoveDFileSet(TSDB_TRUNCATE_WSET(prh)); - tsdbTruncateFSetEnd(prh); + if (tsdbFSetDeleteImpl(ptru) < 0) { + tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(ptru)); + tsdbRemoveDFileSet(TSDB_TRUNCATE_WSET(ptru)); + tsdbTruncateFSetEnd(ptru); return -1; } - tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(prh)); - tsdbUpdateDFileSet(REPO_FS(pRepo), TSDB_TRUNCATE_WSET(prh)); + tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(ptru)); + tsdbUpdateDFileSet(REPO_FS(pRepo), TSDB_TRUNCATE_WSET(ptru)); tsdbDebug("vgId:%d FSET %d truncate data over", REPO_ID(pRepo), pSet->fid); - tsdbTruncateFSetEnd(prh); + tsdbTruncateFSetEnd(ptru); return 0; } -static int tsdbTruncateFSet(STruncateH *prh, SDFileSet *pSet) { - STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(prh); +static int tsdbFSetTruncate(STruncateH *ptru, SDFileSet *pSet) { + STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(ptru); SDiskID did = {0}; - SDFileSet *pWSet = TSDB_TRUNCATE_WSET(prh); + SDFileSet *pWSet = TSDB_TRUNCATE_WSET(ptru); tsdbDebug("vgId:%d start to truncate table in FSET %d on level %d id %d", REPO_ID(pRepo), pSet->fid, TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet)); - if (tsdbTruncateFSetInit(prh, pSet) < 0) { + if (tsdbFSetInit(ptru, pSet) < 0) { return -1; } // Create new fset as truncated fset - tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(prh->rtn)), &(did.level), &(did.id)); + tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(ptru->rtn)), &(did.level), &(did.id)); if (did.level == TFS_UNDECIDED_LEVEL) { terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; tsdbError("vgId:%d failed to truncate table in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno)); - tsdbTruncateFSetEnd(prh); + tsdbTruncateFSetEnd(ptru); return -1; } @@ -320,81 +327,90 @@ static int tsdbTruncateFSet(STruncateH *prh, SDFileSet *pSet) { return -1; } - if (tsdbTruncateFSetImpl(prh) < 0) { - tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(prh)); - tsdbRemoveDFileSet(TSDB_TRUNCATE_WSET(prh)); - tsdbTruncateFSetEnd(prh); + if (tsdbTruncateFSetImpl(ptru) < 0) { + tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(ptru)); + tsdbRemoveDFileSet(TSDB_TRUNCATE_WSET(ptru)); + tsdbTruncateFSetEnd(ptru); return -1; } - tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(prh)); - tsdbUpdateDFileSet(REPO_FS(pRepo), TSDB_TRUNCATE_WSET(prh)); + tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(ptru)); + tsdbUpdateDFileSet(REPO_FS(pRepo), TSDB_TRUNCATE_WSET(ptru)); tsdbDebug("vgId:%d FSET %d truncate table over", REPO_ID(pRepo), pSet->fid); - tsdbTruncateFSetEnd(prh); + tsdbTruncateFSetEnd(ptru); return 0; } -static int tsdbInitTruncateH(STruncateH *prh, STsdbRepo *pRepo) { +static int tsdbInitTruncateH(STruncateH *ptru, STsdbRepo *pRepo) { STsdbCfg *pCfg = REPO_CFG(pRepo); - memset(prh, 0, sizeof(*prh)); + memset(ptru, 0, sizeof(*ptru)); - TSDB_FSET_SET_CLOSED(TSDB_TRUNCATE_WSET(prh)); + TSDB_FSET_SET_CLOSED(TSDB_TRUNCATE_WSET(ptru)); - tsdbGetRtnSnap(pRepo, &(prh->rtn)); - tsdbFSIterInit(&(prh->fsIter), REPO_FS(pRepo), TSDB_FS_ITER_FORWARD); + tsdbGetRtnSnap(pRepo, &(ptru->rtn)); + tsdbFSIterInit(&(ptru->fsIter), REPO_FS(pRepo), TSDB_FS_ITER_FORWARD); - if (tsdbInitReadH(&(prh->readh), pRepo) < 0) { + if (tsdbInitReadH(&(ptru->readh), pRepo) < 0) { return -1; } - if (tsdbInitTruncateTblArray(prh) < 0) { - tsdbDestroyTruncateH(prh); + if (tsdbInitTruncateTblArray(ptru) < 0) { + tsdbDestroyTruncateH(ptru); return -1; } - prh->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx)); - if (prh->aBlkIdx == NULL) { + ptru->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx)); + if (ptru->aBlkIdx == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tsdbDestroyTruncateH(prh); + tsdbDestroyTruncateH(ptru); return -1; } - prh->aSupBlk = taosArrayInit(1024, sizeof(SBlock)); - if (prh->aSupBlk == NULL) { + ptru->aSupBlk = taosArrayInit(1024, sizeof(SBlock)); + if (ptru->aSupBlk == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tsdbDestroyTruncateH(prh); + tsdbDestroyTruncateH(ptru); return -1; } - prh->pDCols = tdNewDataCols(0, pCfg->maxRowsPerFileBlock); - if (prh->pDCols == NULL) { + ptru->aSubBlk = taosArrayInit(20, sizeof(SBlock)); + if (ptru->aSubBlk == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tsdbDestroyTruncateH(prh); + tsdbDestroyTruncateH(ptru); + return -1; + } + + ptru->pDCols = tdNewDataCols(0, pCfg->maxRowsPerFileBlock); + if (ptru->pDCols == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbDestroyTruncateH(ptru); return -1; } return 0; } -static void tsdbDestroyTruncateH(STruncateH *prh) { - prh->pDCols = tdFreeDataCols(prh->pDCols); - prh->aSupBlk = taosArrayDestroy(&prh->aSupBlk); - prh->aBlkIdx = taosArrayDestroy(&prh->aBlkIdx); - tsdbDestroyTruncateTblArray(prh); - tsdbDestroyReadH(&(prh->readh)); - tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(prh)); +static void tsdbDestroyTruncateH(STruncateH *ptru) { + ptru->pDCols = tdFreeDataCols(ptru->pDCols); + ptru->aSupBlk = taosArrayDestroy(&ptru->aSupBlk); + ptru->aSubBlk = taosArrayDestroy(&ptru->aSubBlk); + ptru->aBlkIdx = taosArrayDestroy(&ptru->aBlkIdx); + tsdbDestroyTruncateTblArray(ptru); + tsdbDestroyReadH(&(ptru->readh)); + tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(ptru)); } -static int tsdbInitTruncateTblArray(STruncateH *prh) { - STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(prh); +// init tbl array with pRepo->meta +static int tsdbInitTruncateTblArray(STruncateH *ptru) { + STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(ptru); STsdbMeta *pMeta = pRepo->tsdbMeta; if (tsdbRLockRepoMeta(pRepo) < 0) return -1; - prh->tblArray = taosArrayInit(pMeta->maxTables, sizeof(STableTruncateH)); - if (prh->tblArray == NULL) { + ptru->tblArray = taosArrayInit(pMeta->maxTables, sizeof(STableTruncateH)); + if (ptru->tblArray == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; tsdbUnlockRepoMeta(pRepo); return -1; @@ -402,13 +418,13 @@ static int tsdbInitTruncateTblArray(STruncateH *prh) { // Note here must start from 0 for (int i = 0; i < pMeta->maxTables; ++i) { - STableTruncateH ch = {0}; + STableTruncateH tbl = {0}; if (pMeta->tables[i] != NULL) { tsdbRefTable(pMeta->tables[i]); - ch.pTable = pMeta->tables[i]; + tbl.pTable = pMeta->tables[i]; } - if (taosArrayPush(prh->tblArray, &ch) == NULL) { + if (taosArrayPush(ptru->tblArray, &tbl) == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; tsdbUnlockRepoMeta(pRepo); return -1; @@ -419,46 +435,46 @@ static int tsdbInitTruncateTblArray(STruncateH *prh) { return 0; } -static void tsdbDestroyTruncateTblArray(STruncateH *prh) { - STableTruncateH *pHandle = NULL; +static void tsdbDestroyTruncateTblArray(STruncateH *ptru) { + STableTruncateH *pItem = NULL; - if (prh->tblArray == NULL) return; + if (ptru->tblArray == NULL) return; - for (size_t i = 0; i < taosArrayGetSize(prh->tblArray); ++i) { - pHandle = (STableTruncateH *)taosArrayGet(prh->tblArray, i); - if (pHandle->pTable) { - tsdbUnRefTable(pHandle->pTable); + for (size_t i = 0; i < taosArrayGetSize(ptru->tblArray); ++i) { + pItem = (STableTruncateH *)taosArrayGet(ptru->tblArray, i); + if (pItem->pTable) { + tsdbUnRefTable(pItem->pTable); } - tfree(pHandle->pInfo); + tfree(pItem->pInfo); } - prh->tblArray = taosArrayDestroy(&prh->tblArray); + ptru->tblArray = taosArrayDestroy(&ptru->tblArray); } -static int tsdbCacheFSetIndex(STruncateH *prh) { - SReadH *pReadH = &(prh->readh); +static int tsdbCacheFSetIndex(STruncateH *ptru) { + SReadH *pReadH = &(ptru->readh); if (tsdbLoadBlockIdx(pReadH) < 0) { return -1; } - size_t tblArraySize = taosArrayGetSize(prh->tblArray); - for (size_t tid = 1; tid < tblArraySize; ++tid) { - STableTruncateH *pHandle = (STableTruncateH *)taosArrayGet(prh->tblArray, tid); - pHandle->pBlkIdx = NULL; + size_t cnt = taosArrayGetSize(ptru->tblArray); + for (size_t tid = 1; tid < cnt; ++tid) { + STableTruncateH *pItem = (STableTruncateH *)taosArrayGet(ptru->tblArray, tid); + pItem->pBlkIdx = NULL; - if (pHandle->pTable == NULL) continue; - if (tsdbSetReadTable(pReadH, pHandle->pTable) < 0) { + if (pItem->pTable == NULL) + continue; + if (tsdbSetReadTable(pReadH, pItem->pTable) < 0) return -1; - } - - if (pReadH->pBlkIdx == NULL) continue; - pHandle->bIndex = *(pReadH->pBlkIdx); - pHandle->pBlkIdx = &(pHandle->bIndex); + if (pReadH->pBlkIdx == NULL) + continue; + pItem->bIndex = *(pReadH->pBlkIdx); + pItem->pBlkIdx = &(pItem->bIndex); uint32_t originLen = 0; - if (tsdbLoadBlockInfo(pReadH, (void **)(&(pHandle->pInfo)), &originLen) < 0) { + if (tsdbLoadBlockInfo(pReadH, (void **)(&(pItem->pInfo)), &originLen) < 0) { return -1; } } @@ -466,39 +482,30 @@ static int tsdbCacheFSetIndex(STruncateH *prh) { return 0; } -static int tsdbTruncateFSetInit(STruncateH *prh, SDFileSet *pSet) { - taosArrayClear(prh->aBlkIdx); - taosArrayClear(prh->aSupBlk); +static int tsdbFSetInit(STruncateH *ptru, SDFileSet *pSet) { + taosArrayClear(ptru->aBlkIdx); + taosArrayClear(ptru->aSupBlk); - if (tsdbSetAndOpenReadFSet(&(prh->readh), pSet) < 0) { + // open + if (tsdbSetAndOpenReadFSet(&(ptru->readh), pSet) < 0) { return -1; } - if (tsdbCacheFSetIndex(prh) < 0) { - tsdbCloseAndUnsetFSet(&(prh->readh)); + // load index to cache + if (tsdbCacheFSetIndex(ptru) < 0) { + tsdbCloseAndUnsetFSet(&(ptru->readh)); return -1; } return 0; } -static void tsdbTruncateFSetEnd(STruncateH *prh) { tsdbCloseAndUnsetFSet(&(prh->readh)); } +static void tsdbTruncateFSetEnd(STruncateH *ptru) { tsdbCloseAndUnsetFSet(&(ptru->readh)); } -static bool tsdbBlockInterleaved(STruncateH *prh, SBlock *pBlock) { - // STruncateTblMsg *pMsg = (STruncateTblMsg *)prh->param; - // for (uint16_t i = 0; i < pMsg->nSpan; ++i) { - // STimeWindow tw = pMsg->span[i]; - // if (!(pBlock->keyFirst > tw.ekey || pBlock->keyLast < tw.skey)) { - // return true; - // } - // } - // return false; - return true; -} -static int32_t tsdbFilterDataCols(STruncateH *prh, SDataCols *pSrcDCols) { - SDataCols * pDstDCols = prh->pDCols; - SControlData* pCtlData = &prh->pCtlInfo->ctlData; +static int32_t tsdbFilterDataCols(STruncateH *ptru, SDataCols *pSrcDCols) { + SDataCols * pDstDCols = ptru->pDCols; + SControlData* pCtlData = &ptru->pCtlInfo->ctlData; tdResetDataCols(pDstDCols); pDstDCols->maxCols = pSrcDCols->maxCols; @@ -518,44 +525,52 @@ static int32_t tsdbFilterDataCols(STruncateH *prh, SDataCols *pSrcDCols) { pDstDCols->maxPoints, 0); } } - ++pDstDCols->numOfRows; + ++ pDstDCols->numOfRows; } return 0; } -static int tsdbTruncateFSetImpl(STruncateH *prh) { - STsdbRepo * pRepo = TSDB_TRUNCATE_REPO(prh); - // SReadH * pReadh = &(prh->readh); - SBlockIdx * pBlkIdx = NULL; - void ** ppBuf = &(TSDB_TRUNCATE_BUF(prh)); - // void ** ppCBuf = &(TSDB_TRUNCATE_COMP_BUF(prh)); - // void ** ppExBuf = &(TSDB_TRUNCATE_EXBUF(prh)); +// table in delete list +bool tableInDel(STruncateH* ptru, int32_t tid) { + for (int32_t i = 0; i < ptru->pCtlInfo->ctlData.tnum; i++) { + if (tid == ptru->pCtlInfo->ctlData.tids[i]) + return true; + } + + return false; +} - taosArrayClear(prh->aBlkIdx); +static int tsdbTruncateFSetImpl(STruncateH *ptru) { + STsdbRepo * pRepo = TSDB_TRUNCATE_REPO(ptru); + // SReadH * pReadh = &(ptru->readh); + SBlockIdx * pBlkIdx = NULL; + void ** ppBuf = &(TSDB_TRUNCATE_BUF(ptru)); + // void ** ppCBuf = &(TSDB_TRUNCATE_COMP_BUF(ptru)); + // void ** ppExBuf = &(TSDB_TRUNCATE_EXBUF(ptru)); - for (size_t tid = 1; tid < taosArrayGetSize(prh->tblArray); ++tid) { - STableTruncateH *pHandle = (STableTruncateH *)taosArrayGet(prh->tblArray, tid); - pBlkIdx = pHandle->pBlkIdx; + taosArrayClear(ptru->aBlkIdx); - if (pHandle->pTable == NULL || pHandle->pBlkIdx == NULL) continue; + for (size_t tid = 1; tid < taosArrayGetSize(ptru->tblArray); ++tid) { + STableTruncateH *pItem = (STableTruncateH *)taosArrayGet(ptru->tblArray, tid); + pBlkIdx = pItem->pBlkIdx; - taosArrayClear(prh->aSupBlk); + if (pItem->pTable == NULL || pItem->pBlkIdx == NULL) continue; - uint64_t uid = pHandle->pTable->tableId.uid; + taosArrayClear(ptru->aSupBlk); - if (uid != prh->pCtlInfo->uid) { - if ((pBlkIdx->numOfBlocks > 0) && (taosArrayPush(prh->aBlkIdx, (const void *)(pBlkIdx)) == NULL)) { + if (!tableInDel(ptru, tid)) { + if ((pBlkIdx->numOfBlocks > 0) && (taosArrayPush(ptru->aBlkIdx, (const void *)(pBlkIdx)) == NULL)) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } } else { // Loop to mark delete flag for each block data - tsdbDebug("vgId:%d uid %" PRIu64 " matched to truncate", REPO_ID(pRepo), uid); - // for (int i = 0; i < pHandle->pBlkIdx->numOfBlocks; ++i) { - // SBlock *pBlock = pHandle->pInfo->blocks + i; + tsdbDebug("vgId:%d tid %ld matched to truncate", REPO_ID(pRepo), tid); + // for (int i = 0; i < pItem->pBlkIdx->numOfBlocks; ++i) { + // SBlock *pBlock = pItem->pInfo->blocks + i; - // if (tsdbWriteBlockToRightFile(prh, pHandle->pTable, prh->pDCols, ppBuf, ppCBuf, ppExBuf) < + // if (tsdbWriteBlockToFile(ptru, pItem->pTable, ptru->pDCols, ppBuf, ppCBuf, ppExBuf) < // 0) { // return -1; // } @@ -563,126 +578,234 @@ static int tsdbTruncateFSetImpl(STruncateH *prh) { } } - if (tsdbWriteBlockIdx(TSDB_TRUNCATE_HEAD_FILE(prh), prh->aBlkIdx, ppBuf) < 0) { + if (tsdbWriteBlockIdx(TSDB_TRUNCATE_HEAD_FILE(ptru), ptru->aBlkIdx, ppBuf) < 0) { return -1; } return 0; } -static int tsdbDeleteFSetImpl(STruncateH *prh) { - STsdbRepo * pRepo = TSDB_TRUNCATE_REPO(prh); - // STsdbCfg * pCfg = REPO_CFG(pRepo); - SReadH * pReadh = &(prh->readh); - SBlockIdx blkIdx = {0}; - void ** ppBuf = &(TSDB_TRUNCATE_BUF(prh)); - void ** ppCBuf = &(TSDB_TRUNCATE_COMP_BUF(prh)); - void ** ppExBuf = &(TSDB_TRUNCATE_EXBUF(prh)); - // int defaultRows = TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock); +// if pBlock is border block return true else return false +static int tsdbBlockSolve(STruncateH *ptru, SBlock *pBlock) { + // delete window + STimeWindow* pdel = &ptru->pCtlInfo->ctlData.win; - taosArrayClear(prh->aBlkIdx); + // do nothing for no delete + if(pBlock->keyFirst > pdel->ekey || pBlock->keyLast < pdel->skey) + return BLOCK_RETAIN; - for (size_t tid = 1; tid < taosArrayGetSize(prh->tblArray); ++tid) { - STableTruncateH *pHandle = (STableTruncateH *)taosArrayGet(prh->tblArray, tid); - STSchema * pSchema = NULL; + // border block + if(pBlock->keyFirst <= pdel->skey || pBlock->keyLast >= pdel->ekey) + return BLOCK_MODIFY; - if (pHandle->pTable == NULL || pHandle->pBlkIdx == NULL) continue; + // need del + return BLOCK_DELETE; +} - if ((pSchema = tsdbGetTableSchemaImpl(pHandle->pTable, true, true, -1, -1)) == NULL) { - return -1; +// remove del block from pBlockInfo +int tsdbRemoveDelBlocks(STruncateH *ptru, STableTruncateH * pItem) { + // loop + int numOfBlocks = pItem->pBlkIdx->numOfBlocks; + int from = -1; + int delAll = 0; + + for (int i = numOfBlocks - 1; i >= 0; --i) { + SBlock *pBlock = pItem->pInfo->blocks + i; + int32_t solve = tsdbBlockSolve(ptru, pBlock); + bool doDel = false; + if (solve == BLOCK_DELETE) { + if (from == -1) + from = i; + if (i == 0) + doDel = true; + } else { + if(from != -1) + doDel = true; } - taosArrayClear(prh->aSupBlk); + // do del + if (doDel) { + int delCnt = from - i + 1; + memmove(pBlock, pItem->pInfo->blocks + i + delCnt, sizeof(SBlock) * delCnt); + delAll += delCnt; + } + } - uint64_t uid = pHandle->pTable->tableId.uid; - // if(uid != pMsg->uid) { - // TODO: copy the block data directly - // } + // set value + pItem->pBlkIdx->numOfBlocks -= delAll; - if ((tdInitDataCols(prh->pDCols, pSchema) < 0) || (tdInitDataCols(pReadh->pDCols[0], pSchema) < 0) || - (tdInitDataCols(pReadh->pDCols[1], pSchema) < 0)) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tdFreeSchema(pSchema); - return -1; + return delAll; +} + +static void tsdbAddBlock(STruncateH *ptru, STableTruncateH *pItem, SBlock *pBlock) { + taosArrayPush(ptru->aSubBlk, (const void *)pBlock); + // have sub block + if (pBlock->numOfSubBlocks > 1) { + SBlock *jBlock = POINTER_SHIFT(pItem->pInfo, pBlock->offset);; + for (int j = 0; j < pBlock->numOfSubBlocks; j++) { + taosArrayPush(ptru->aSubBlk, (const void *)jBlock++); } + } +} +// need modify blocks +static int tsdbModifyBlocks(STruncateH *ptru, STableTruncateH *pItem) { + SReadH * pReadh = &(ptru->readh); + void ** ppBuf = &(TSDB_TRUNCATE_BUF(ptru)); + void ** ppCBuf = &(TSDB_TRUNCATE_COMP_BUF(ptru)); + void ** ppExBuf = &(TSDB_TRUNCATE_EXBUF(ptru)); + STSchema *pSchema = NULL; + SBlockIdx blkIdx = {0}; + + // get pSchema for del table + if ((pSchema = tsdbGetTableSchemaImpl(pItem->pTable, true, true, -1, -1)) == NULL) { + return -1; + } + + if ((tdInitDataCols(ptru->pDCols, pSchema) < 0) || (tdInitDataCols(pReadh->pDCols[0], pSchema) < 0) || + (tdInitDataCols(pReadh->pDCols[1], pSchema) < 0)) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; tdFreeSchema(pSchema); + return -1; + } + tdFreeSchema(pSchema); - // Loop to truncate each block data - for (int i = 0; i < pHandle->pBlkIdx->numOfBlocks; ++i) { - SBlock *pBlock = pHandle->pInfo->blocks + i; + // delete block + tsdbRemoveDelBlocks(ptru, pItem); + if(pItem->pBlkIdx->numOfBlocks == 0) { + // all blocks were deleted + return TSDB_CODE_SUCCESS; + } - // Copy the Blocks directly if TS is not interleaved. - if (!tsdbBlockInterleaved(prh, pBlock)) { - // tsdbWriteBlockAndDataToFile(); - continue; - } + taosArrayClear(ptru->aSupBlk); + taosArrayClear(ptru->aSubBlk); - // Otherwise load the block data and copy the specific rows. - if (tsdbLoadBlockData(pReadh, pBlock, pHandle->pInfo) < 0) { - return -1; - } - if (uid == prh->pCtlInfo->uid) { - tsdbFilterDataCols(prh, pReadh->pDCols[0]); - tsdbDebug("vgId:%d uid %" PRIu64 " matched, filter block data from rows %d to %d rows", REPO_ID(pRepo), uid, - pReadh->pDCols[0]->numOfRows, prh->pDCols->numOfRows); - if (prh->pDCols->numOfRows <= 0) continue; - - if (tsdbWriteBlockToRightFile(prh, pHandle->pTable, prh->pDCols, ppBuf, ppCBuf, ppExBuf) < 0) { - return -1; - } - } else { - tsdbDebug("vgId:%d uid %" PRIu64 " not matched, copy block data directly\n", REPO_ID(pRepo), uid); - if (tsdbWriteBlockToRightFile(prh, pHandle->pTable, pReadh->pDCols[0], ppBuf, ppCBuf, ppExBuf) < 0) { - return -1; - } - } - } + // Loop to truncate each block data + for (int i = 0; i < pItem->pBlkIdx->numOfBlocks; ++i) { + SBlock *pBlock = pItem->pInfo->blocks + i; + int32_t solve = tsdbBlockSolve(ptru, pBlock); + if (solve == BLOCK_RETAIN) { + tsdbAddBlock(ptru, pItem, pBlock); + continue; + } - if (tsdbWriteBlockInfoImpl(TSDB_TRUNCATE_HEAD_FILE(prh), pHandle->pTable, prh->aSupBlk, NULL, - ppBuf, &blkIdx) < 0) { + // border block need load to delete no-use data + if (tsdbLoadBlockData(pReadh, pBlock, pItem->pInfo) < 0) { return -1; } - if ((blkIdx.numOfBlocks > 0) && (taosArrayPush(prh->aBlkIdx, (const void *)(&blkIdx)) == NULL)) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbFilterDataCols(ptru, pReadh->pDCols[0]); + if (ptru->pDCols->numOfRows <= 0) { + continue; + } + + SBlock newBlock = {0}; + if (tsdbWriteBlockToFile(ptru, pItem->pTable, ptru->pDCols, ppBuf, ppCBuf, ppExBuf, &newBlock) < 0) { return -1; } + + // add new block to info + tsdbAddBlock(ptru, pItem, &newBlock); + } + + // write block info for each table + if (tsdbWriteBlockInfoImpl(TSDB_TRUNCATE_HEAD_FILE(ptru), pItem->pTable, ptru->aSupBlk, ptru->aSubBlk, + ppBuf, &blkIdx) < 0) { + return -1; } - if (tsdbWriteBlockIdx(TSDB_TRUNCATE_HEAD_FILE(prh), prh->aBlkIdx, ppBuf) < 0) { + // each table's blkIdx + if (blkIdx.numOfBlocks > 0 && taosArrayPush(ptru->aBlkIdx, (const void *)(&blkIdx)) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } return 0; } -static int tsdbWriteBlockToRightFile(STruncateH *prh, STable *pTable, SDataCols *pDCols, void **ppBuf, - void **ppCBuf, void **ppExBuf) { - STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(prh); +// keep intact blocks info and write to head file then save offset to blkIdx +static int tsdbKeepIntactBlocks(STruncateH *ptru, STableTruncateH * pItem) { + // init + SBlockIdx blkIdx = {0}; + taosArrayClear(ptru->aSupBlk); + taosArrayClear(ptru->aSubBlk); + + for (int32_t i = 0; i < pItem->pBlkIdx->numOfBlocks; i++) { + SBlock *pBlock = pItem->pInfo->blocks + i; + tsdbAddBlock(ptru, pItem, pBlock); + } + + // write block info for one table + void **ppBuf = &(TSDB_TRUNCATE_BUF(ptru)); + int32_t ret = tsdbWriteBlockInfoImpl(TSDB_TRUNCATE_HEAD_FILE(ptru), pItem->pTable, ptru->aSupBlk, + ptru->aSubBlk, ppBuf, &blkIdx); + if (ret != TSDB_CODE_SUCCESS) { + return ret; + } + + // each table's blkIdx + if (blkIdx.numOfBlocks > 0 && taosArrayPush(ptru->aBlkIdx, (const void *)&blkIdx) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + return ret; +} + +static int tsdbFSetDeleteImpl(STruncateH *ptru) { + void ** ppBuf = &(TSDB_TRUNCATE_BUF(ptru)); + int32_t ret = TSDB_CODE_SUCCESS; + + // 1.INIT + taosArrayClear(ptru->aBlkIdx); + + for (size_t tid = 1; tid < taosArrayGetSize(ptru->tblArray); ++tid) { + STableTruncateH *pItem = (STableTruncateH *)taosArrayGet(ptru->tblArray, tid); + + // no table in this tid position + if (pItem->pTable == NULL || pItem->pBlkIdx == NULL) + continue; + + // 2.WRITE INFO OF EACH TABLE BLOCK INFO TO HEAD FILE + if (tableInDel(ptru, tid)) { + // modify blocks info and write to head file then save offset to blkIdx + ret = tsdbModifyBlocks(ptru, pItem); + } else { + // keep intact blocks info and write to head file then save offset to blkIdx + ret = tsdbKeepIntactBlocks(ptru, pItem); + } + if (ret != TSDB_CODE_SUCCESS) + return ret; + } // tid for + + // 3.WRITE INDEX OF ALL TABLE'S BLOCK TO HEAD FILE + if (tsdbWriteBlockIdx(TSDB_TRUNCATE_HEAD_FILE(ptru), ptru->aBlkIdx, ppBuf) < 0) { + return -1; + } + + return ret; +} + +static int tsdbWriteBlockToFile(STruncateH *ptru, STable *pTable, SDataCols *pDCols, void **ppBuf, + void **ppCBuf, void **ppExBuf, SBlock *pBlock) { + STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(ptru); STsdbCfg * pCfg = REPO_CFG(pRepo); SDFile * pDFile = NULL; bool isLast = false; - SBlock block = {0}; ASSERT(pDCols->numOfRows > 0); if (pDCols->numOfRows < pCfg->minRowsPerFileBlock) { - pDFile = TSDB_TRUNCATE_LAST_FILE(prh); + pDFile = TSDB_TRUNCATE_LAST_FILE(ptru); isLast = true; } else { - pDFile = TSDB_TRUNCATE_DATA_FILE(prh); + pDFile = TSDB_TRUNCATE_DATA_FILE(ptru); isLast = false; } if (tsdbWriteBlockImpl(pRepo, pTable, pDFile, - isLast ? TSDB_TRUNCATE_SMAL_FILE(prh) : TSDB_TRUNCATE_SMAD_FILE(prh), pDCols, - &block, isLast, true, ppBuf, ppCBuf, ppExBuf) < 0) { - return -1; - } - - if (taosArrayPush(prh->aSupBlk, (void *)(&block)) == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + isLast ? TSDB_TRUNCATE_SMAL_FILE(ptru) : TSDB_TRUNCATE_SMAD_FILE(ptru), pDCols, + pBlock, isLast, true, ppBuf, ppCBuf, ppExBuf) < 0) { return -1; }