提交 6185cef9 编写于 作者: A Alex Duan

[TS-238]<feature>(tsdb): single table del data first ok

上级 5a5b4bf5
......@@ -1004,6 +1004,7 @@ typedef struct {
} STLV;
#define CMD_DELETE_DATA 0x00000001
#define CMD_TRUNCATE 0x00000002
typedef struct SControlData{
uint32_t command; // see define CMD_???
STimeWindow win;
......
......@@ -18,6 +18,8 @@
#ifdef __cplusplus
extern "C" {
#endif
// SControlData addition information
typedef struct {
SControlData ctlData;
// addition info
......@@ -25,11 +27,15 @@ typedef struct {
int32_t tid; // table id
tsem_t* pSem;
bool memNull; // pRepo->mem is NULL, this is true
uint64_t* uids; // delete table
int32_t uidCount;
SShellSubmitRspMsg *pRsp;
} SControlDataInfo;
void *tsdbTruncateImpl(STsdbRepo *pRepo, void *param);
void *tsdbDeleteImpl(STsdbRepo *pRepo, void *param);
// -------- interface ---------
// delete
int tsdbControlDelete(STsdbRepo* pRepo, SControlDataInfo* pCtlDataInfo);
#ifdef __cplusplus
}
......
......@@ -1780,17 +1780,6 @@ int tsdbApplyRtn(STsdbRepo *pRepo) {
return 0;
}
int tsdbControlDelete(STsdbRepo* pRepo, SControlDataInfo* pCtlDataInfo) {
int ret = TSDB_CODE_SUCCESS;
if(pCtlDataInfo->pRsp) {
pCtlDataInfo->pRsp->affectedRows = htonl(23);
pCtlDataInfo->pRsp->code = ret;
}
return ret;
}
// do control task
int tsdbCommitControl(STsdbRepo* pRepo, SControlDataInfo* pCtlDataInfo) {
int ret = TSDB_CODE_SUCCESS;
......
......@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdbint.h"
#include "tsdbTruncate.h"
typedef struct {
STable * pTable;
......@@ -30,43 +31,43 @@ typedef struct {
SArray * aBlkIdx;
SArray * aSupBlk;
SDataCols *pDCols;
void * param; // STruncateTblMsg or SDeleteTblMsg
TSDB_REQ_T type; // truncate or delete
SControlDataInfo* pCtlInfo;
} STruncateH;
#define TSDB_TRUNCATE_WSET(pTruncateH) (&((pTruncateH)->wSet))
#define TSDB_TRUNCATE_REPO(pTruncateH) TSDB_READ_REPO(&((pTruncateH)->readh))
#define TSDB_TRUNCATE_HEAD_FILE(pTruncateH) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(pTruncateH), TSDB_FILE_HEAD)
#define TSDB_TRUNCATE_DATA_FILE(pTruncateH) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(pTruncateH), TSDB_FILE_DATA)
#define TSDB_TRUNCATE_LAST_FILE(pTruncateH) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(pTruncateH), TSDB_FILE_LAST)
#define TSDB_TRUNCATE_SMAD_FILE(pTruncateH) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(pTruncateH), TSDB_FILE_SMAD)
#define TSDB_TRUNCATE_SMAL_FILE(pTruncateH) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(pTruncateH), TSDB_FILE_SMAL)
#define TSDB_TRUNCATE_BUF(pTruncateH) TSDB_READ_BUF(&((pTruncateH)->readh))
#define TSDB_TRUNCATE_COMP_BUF(pTruncateH) TSDB_READ_COMP_BUF(&((pTruncateH)->readh))
#define TSDB_TRUNCATE_EXBUF(pTruncateH) TSDB_READ_EXBUF(&((pTruncateH)->readh))
#define TSDB_TRUNCATE_WSET(prh) (&((prh)->wSet))
#define TSDB_TRUNCATE_REPO(prh) TSDB_READ_REPO(&((prh)->readh))
#define TSDB_TRUNCATE_HEAD_FILE(prh) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(prh), TSDB_FILE_HEAD)
#define TSDB_TRUNCATE_DATA_FILE(prh) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(prh), TSDB_FILE_DATA)
#define TSDB_TRUNCATE_LAST_FILE(prh) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(prh), TSDB_FILE_LAST)
#define TSDB_TRUNCATE_SMAD_FILE(prh) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(prh), TSDB_FILE_SMAD)
#define TSDB_TRUNCATE_SMAL_FILE(prh) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(prh), TSDB_FILE_SMAL)
#define TSDB_TRUNCATE_BUF(prh) TSDB_READ_BUF(&((prh)->readh))
#define TSDB_TRUNCATE_COMP_BUF(prh) TSDB_READ_COMP_BUF(&((prh)->readh))
#define TSDB_TRUNCATE_EXBUF(prh) TSDB_READ_EXBUF(&((prh)->readh))
/*
static int tsdbAsyncTruncate(STsdbRepo *pRepo, void *param, TSDB_REQ_T type);
static void tsdbStartTruncate(STsdbRepo *pRepo);
static void tsdbEndTruncate(STsdbRepo *pRepo, int eno);
static int tsdbTruncateMeta(STsdbRepo *pRepo);
static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param, TSDB_REQ_T type);
//static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet);
//static int tsdbDeleteFSet(STruncateH *pTruncateH, SDFileSet *pSet);
static int tsdbInitTruncateH(STruncateH *pTruncateH, STsdbRepo *pRepo);
static void tsdbDestroyTruncateH(STruncateH *pTruncateH);
static int tsdbInitTruncateTblArray(STruncateH *pTruncateH);
static void tsdbDestroyTruncateTblArray(STruncateH *pTruncateH);
static int tsdbCacheFSetIndex(STruncateH *pTruncateH);
static int tsdbTruncateTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo);
static int tsdbTruncateFSet(STruncateH *prh, SDFileSet *pSet);
static int tsdbDeleteFSet(STruncateH *prh, SDFileSet *pSet);
static int tsdbInitTruncateH(STruncateH *prh, STsdbRepo *pRepo);
static void tsdbDestroyTruncateH(STruncateH *prh);
static int tsdbInitTruncateTblArray(STruncateH *prh);
static void tsdbDestroyTruncateTblArray(STruncateH *prh);
static int tsdbCacheFSetIndex(STruncateH *prh);
static int tsdbTruncateCache(STsdbRepo *pRepo, void *param);
static int tsdbTruncateFSetInit(STruncateH *pTruncateH, SDFileSet *pSet);
static void tsdbTruncateFSetEnd(STruncateH *pTruncateH);
static int tsdbTruncateFSetImpl(STruncateH *pTruncateH);
static int tsdbDeleteFSetImpl(STruncateH *pTruncateH);
static bool tsdbBlockInterleaved(STruncateH *pTruncateH, SBlock *pBlock);
static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDataCols *pDCols, void **ppBuf,
static int tsdbTruncateFSetInit(STruncateH *prh, SDFileSet *pSet);
static void tsdbTruncateFSetEnd(STruncateH *prh);
static int tsdbTruncateFSetImpl(STruncateH *prh);
static int tsdbDeleteFSetImpl(STruncateH *prh);
static bool tsdbBlockInterleaved(STruncateH *prh, SBlock *pBlock);
static int tsdbWriteBlockToRightFile(STruncateH *prh, STable *pTable, SDataCols *pDCols, void **ppBuf,
void **ppCBuf, void **ppExBuf);
//static void *tsdbTruncateImplCommon(STsdbRepo *pRepo, void *param, TSDB_REQ_T type);
static int tsdbTruncateImplCommon(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo);
enum {
TSDB_NO_TRUNCATE,
......@@ -74,28 +75,26 @@ enum {
TSDB_WAITING_TRUNCATE,
};
int tsdbTruncateTbl(STsdbRepo *pRepo, void *param) { return tsdbAsyncTruncate(pRepo, param, CONTROL_REQ); }
int tsdbDeleteData(STsdbRepo *pRepo, void *param) { return tsdbAsyncTruncate(pRepo, param, CONTROL_REQ); }
// delete
int tsdbControlDelete(STsdbRepo* pRepo, SControlDataInfo* pCtlInfo) {
int ret = TSDB_CODE_SUCCESS;
void *tsdbTruncateImpl(STsdbRepo *pRepo, void *param) {
//tsdbTruncateImplCommon(pRepo, param, TRUNCATE_TBL_REQ);
return NULL;
}
void *tsdbDeleteImpl(STsdbRepo *pRepo, void *param) {
//tsdbTruncateImplCommon(pRepo, param, DELETE_TBL_REQ);
return NULL;
}
if(pCtlInfo->pRsp) {
pCtlInfo->pRsp->affectedRows = htonl(23);
pCtlInfo->pRsp->code = ret;
}
return tsdbTruncateImplCommon(pRepo, pCtlInfo);
}
static void *tsdbTruncateImplCommon(STsdbRepo *pRepo, void *param, TSDB_REQ_T type) {
ASSERT(param != NULL);
static int tsdbTruncateImplCommon(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) {
int32_t code = 0;
// Step 1: check and clear cache
if ((code = tsdbTruncateCache(pRepo, param)) != 0) {
if ((code = tsdbTruncateCache(pRepo, pCtlInfo)) != 0) {
pRepo->code = terrno;
tsem_post(&(pRepo->readyToCommit));
tsdbInfo("vgId:%d failed to truncate since %s", REPO_ID(pRepo), tstrerror(terrno));
return NULL;
return -1;
}
// Step 2: truncate and rebuild DFileSets
......@@ -104,7 +103,7 @@ static void *tsdbTruncateImplCommon(STsdbRepo *pRepo, void *param, TSDB_REQ_T ty
pRepo->truncateState = TSDB_NO_TRUNCATE;
tsem_post(&(pRepo->readyToCommit));
tsdbInfo("vgId:%d truncate over, no meta or data file", REPO_ID(pRepo));
return NULL;
return -1;
}
tsdbStartTruncate(pRepo);
......@@ -114,18 +113,18 @@ static void *tsdbTruncateImplCommon(STsdbRepo *pRepo, void *param, TSDB_REQ_T ty
goto _err;
}
if (tsdbTruncateTSData(pRepo, param, type) < 0) {
if (tsdbTruncateTSData(pRepo, pCtlInfo) < 0) {
tsdbError("vgId:%d failed to truncate TS data since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
tsdbEndTruncate(pRepo, TSDB_CODE_SUCCESS);
return NULL;
return TSDB_CODE_SUCCESS;
_err:
pRepo->code = terrno;
tsdbEndTruncate(pRepo, terrno);
return NULL;
return -1;
}
static int tsdbTruncateCache(STsdbRepo *pRepo, void *param) {
......@@ -146,28 +145,6 @@ static int tsdbTruncateCache(STsdbRepo *pRepo, void *param) {
return 0;
}
static int tsdbAsyncTruncate(STsdbRepo *pRepo, void *param, TSDB_REQ_T type) {
// avoid repeated input of commands by end users in a short period of time
if (pRepo->truncateState != TSDB_NO_TRUNCATE) {
tsdbInfo("vgId:%d retry later as tsdb in truncating state", REPO_ID(pRepo));
return -1;
}
pRepo->truncateState = TSDB_WAITING_TRUNCATE;
// flush the mem data to disk synchronously(have impact on the compression rate)
if (tsdbSyncCommit(pRepo) < 0) {
return -1;
}
// truncate
tsem_wait(&(pRepo->readyToCommit));
int code = tsdbScheduleCommit(pRepo, param, type);
if (code < 0) {
tsem_post(&(pRepo->readyToCommit));
}
return code;
}
static void tsdbStartTruncate(STsdbRepo *pRepo) {
assert(pRepo->truncateState != TSDB_IN_TRUNCATE);
tsdbInfo("vgId:%d start to truncate!", REPO_ID(pRepo));
......@@ -193,23 +170,22 @@ static int tsdbTruncateMeta(STsdbRepo *pRepo) {
return 0;
}
static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param, TSDB_REQ_T type) {
static int tsdbTruncateTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) {
STsdbCfg * pCfg = REPO_CFG(pRepo);
STruncateH truncateH = {0};
SDFileSet * pSet = NULL;
STruncateTblMsg *pMsg = (STruncateTblMsg *)param;
tsdbDebug("vgId:%d start to truncate TS data for %" PRIu64, REPO_ID(pRepo), pMsg->uid);
tsdbDebug("vgId:%d start to truncate TS data for %" PRIu64, REPO_ID(pRepo), pCtlInfo->uid);
if (tsdbInitTruncateH(&truncateH, pRepo) < 0) {
return -1;
}
truncateH.param = pMsg;
truncateH.type = type;
truncateH.pCtlInfo = pCtlInfo;
STimeWindow win = pCtlInfo->ctlData.win;
int sFid = TSDB_KEY_FID(pMsg->span[0].skey, pCfg->daysPerFile, pCfg->precision);
int eFid = TSDB_KEY_FID(pMsg->span[0].ekey, pCfg->daysPerFile, pCfg->precision);
int sFid = TSDB_KEY_FID(win.skey, pCfg->daysPerFile, pCfg->precision);
int eFid = TSDB_KEY_FID(win.ekey, pCfg->daysPerFile, pCfg->precision);
ASSERT(sFid <= eFid);
while ((pSet = tsdbFSIterNext(&(truncateH.fsIter)))) {
......@@ -236,13 +212,13 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param, TSDB_REQ_T type) {
}
#endif
if (truncateH.type == TRUNCATE_TBL_REQ) {
if (pCtlInfo->ctlData.command == CMD_TRUNCATE) {
if (tsdbTruncateFSet(&truncateH, pSet) < 0) {
tsdbDestroyTruncateH(&truncateH);
tsdbError("vgId:%d failed to truncate table in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
return -1;
}
} else if (truncateH.type == DELETE_TBL_REQ) {
} else if (pCtlInfo->ctlData.command == CMD_DELETE_DATA) {
if (tsdbDeleteFSet(&truncateH, pSet) < 0) {
tsdbDestroyTruncateH(&truncateH);
tsdbError("vgId:%d failed to truncate data in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
......@@ -259,68 +235,68 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param, TSDB_REQ_T type) {
return 0;
}
static int tsdbDeleteFSet(STruncateH *pTruncateH, SDFileSet *pSet) {
STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(pTruncateH);
static int tsdbDeleteFSet(STruncateH *prh, SDFileSet *pSet) {
STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(prh);
SDiskID did = {0};
tsdbDebug("vgId:%d start to truncate data in FSET %d on level %d id %d", REPO_ID(pRepo), pSet->fid,
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
if (tsdbTruncateFSetInit(pTruncateH, pSet) < 0) {
if (tsdbTruncateFSetInit(prh, pSet) < 0) {
return -1;
}
// Create new fset as deleted fset
tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(pTruncateH->rtn)), &(did.level), &(did.id));
tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(prh->rtn)), &(did.level), &(did.id));
if (did.level == TFS_UNDECIDED_LEVEL) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
tsdbError("vgId:%d failed to truncate data in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbTruncateFSetEnd(pTruncateH);
tsdbTruncateFSetEnd(prh);
return -1;
}
tsdbInitDFileSet(TSDB_TRUNCATE_WSET(pTruncateH), did, REPO_ID(pRepo), TSDB_FSET_FID(pSet),
tsdbInitDFileSet(TSDB_TRUNCATE_WSET(prh), did, REPO_ID(pRepo), TSDB_FSET_FID(pSet),
FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_LATEST_FSET_VER);
if (tsdbCreateDFileSet(TSDB_TRUNCATE_WSET(pTruncateH), true) < 0) {
if (tsdbCreateDFileSet(TSDB_TRUNCATE_WSET(prh), true) < 0) {
tsdbError("vgId:%d failed to truncate data in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbTruncateFSetEnd(pTruncateH);
tsdbTruncateFSetEnd(prh);
return -1;
}
if (tsdbDeleteFSetImpl(pTruncateH) < 0) {
tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(pTruncateH));
tsdbRemoveDFileSet(TSDB_TRUNCATE_WSET(pTruncateH));
tsdbTruncateFSetEnd(pTruncateH);
if (tsdbDeleteFSetImpl(prh) < 0) {
tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(prh));
tsdbRemoveDFileSet(TSDB_TRUNCATE_WSET(prh));
tsdbTruncateFSetEnd(prh);
return -1;
}
tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(pTruncateH));
tsdbUpdateDFileSet(REPO_FS(pRepo), TSDB_TRUNCATE_WSET(pTruncateH));
tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(prh));
tsdbUpdateDFileSet(REPO_FS(pRepo), TSDB_TRUNCATE_WSET(prh));
tsdbDebug("vgId:%d FSET %d truncate data over", REPO_ID(pRepo), pSet->fid);
tsdbTruncateFSetEnd(pTruncateH);
tsdbTruncateFSetEnd(prh);
return 0;
}
static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet) {
STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(pTruncateH);
static int tsdbTruncateFSet(STruncateH *prh, SDFileSet *pSet) {
STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(prh);
SDiskID did = {0};
SDFileSet *pWSet = TSDB_TRUNCATE_WSET(pTruncateH);
SDFileSet *pWSet = TSDB_TRUNCATE_WSET(prh);
tsdbDebug("vgId:%d start to truncate table in FSET %d on level %d id %d", REPO_ID(pRepo), pSet->fid,
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
if (tsdbTruncateFSetInit(pTruncateH, pSet) < 0) {
if (tsdbTruncateFSetInit(prh, pSet) < 0) {
return -1;
}
// Create new fset as truncated fset
tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(pTruncateH->rtn)), &(did.level), &(did.id));
tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(prh->rtn)), &(did.level), &(did.id));
if (did.level == TFS_UNDECIDED_LEVEL) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
tsdbError("vgId:%d failed to truncate table in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbTruncateFSetEnd(pTruncateH);
tsdbTruncateFSetEnd(prh);
return -1;
}
......@@ -344,81 +320,81 @@ static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet) {
return -1;
}
if (tsdbTruncateFSetImpl(pTruncateH) < 0) {
tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(pTruncateH));
tsdbRemoveDFileSet(TSDB_TRUNCATE_WSET(pTruncateH));
tsdbTruncateFSetEnd(pTruncateH);
if (tsdbTruncateFSetImpl(prh) < 0) {
tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(prh));
tsdbRemoveDFileSet(TSDB_TRUNCATE_WSET(prh));
tsdbTruncateFSetEnd(prh);
return -1;
}
tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(pTruncateH));
tsdbUpdateDFileSet(REPO_FS(pRepo), TSDB_TRUNCATE_WSET(pTruncateH));
tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(prh));
tsdbUpdateDFileSet(REPO_FS(pRepo), TSDB_TRUNCATE_WSET(prh));
tsdbDebug("vgId:%d FSET %d truncate table over", REPO_ID(pRepo), pSet->fid);
tsdbTruncateFSetEnd(pTruncateH);
tsdbTruncateFSetEnd(prh);
return 0;
}
static int tsdbInitTruncateH(STruncateH *pTruncateH, STsdbRepo *pRepo) {
static int tsdbInitTruncateH(STruncateH *prh, STsdbRepo *pRepo) {
STsdbCfg *pCfg = REPO_CFG(pRepo);
memset(pTruncateH, 0, sizeof(*pTruncateH));
memset(prh, 0, sizeof(*prh));
TSDB_FSET_SET_CLOSED(TSDB_TRUNCATE_WSET(pTruncateH));
TSDB_FSET_SET_CLOSED(TSDB_TRUNCATE_WSET(prh));
tsdbGetRtnSnap(pRepo, &(pTruncateH->rtn));
tsdbFSIterInit(&(pTruncateH->fsIter), REPO_FS(pRepo), TSDB_FS_ITER_FORWARD);
tsdbGetRtnSnap(pRepo, &(prh->rtn));
tsdbFSIterInit(&(prh->fsIter), REPO_FS(pRepo), TSDB_FS_ITER_FORWARD);
if (tsdbInitReadH(&(pTruncateH->readh), pRepo) < 0) {
if (tsdbInitReadH(&(prh->readh), pRepo) < 0) {
return -1;
}
if (tsdbInitTruncateTblArray(pTruncateH) < 0) {
tsdbDestroyTruncateH(pTruncateH);
if (tsdbInitTruncateTblArray(prh) < 0) {
tsdbDestroyTruncateH(prh);
return -1;
}
pTruncateH->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx));
if (pTruncateH->aBlkIdx == NULL) {
prh->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx));
if (prh->aBlkIdx == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyTruncateH(pTruncateH);
tsdbDestroyTruncateH(prh);
return -1;
}
pTruncateH->aSupBlk = taosArrayInit(1024, sizeof(SBlock));
if (pTruncateH->aSupBlk == NULL) {
prh->aSupBlk = taosArrayInit(1024, sizeof(SBlock));
if (prh->aSupBlk == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyTruncateH(pTruncateH);
tsdbDestroyTruncateH(prh);
return -1;
}
pTruncateH->pDCols = tdNewDataCols(0, pCfg->maxRowsPerFileBlock);
if (pTruncateH->pDCols == NULL) {
prh->pDCols = tdNewDataCols(0, pCfg->maxRowsPerFileBlock);
if (prh->pDCols == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyTruncateH(pTruncateH);
tsdbDestroyTruncateH(prh);
return -1;
}
return 0;
}
static void tsdbDestroyTruncateH(STruncateH *pTruncateH) {
pTruncateH->pDCols = tdFreeDataCols(pTruncateH->pDCols);
pTruncateH->aSupBlk = taosArrayDestroy(&pTruncateH->aSupBlk);
pTruncateH->aBlkIdx = taosArrayDestroy(&pTruncateH->aBlkIdx);
tsdbDestroyTruncateTblArray(pTruncateH);
tsdbDestroyReadH(&(pTruncateH->readh));
tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(pTruncateH));
static void tsdbDestroyTruncateH(STruncateH *prh) {
prh->pDCols = tdFreeDataCols(prh->pDCols);
prh->aSupBlk = taosArrayDestroy(&prh->aSupBlk);
prh->aBlkIdx = taosArrayDestroy(&prh->aBlkIdx);
tsdbDestroyTruncateTblArray(prh);
tsdbDestroyReadH(&(prh->readh));
tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(prh));
}
static int tsdbInitTruncateTblArray(STruncateH *pTruncateH) {
STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(pTruncateH);
static int tsdbInitTruncateTblArray(STruncateH *prh) {
STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(prh);
STsdbMeta *pMeta = pRepo->tsdbMeta;
if (tsdbRLockRepoMeta(pRepo) < 0) return -1;
pTruncateH->tblArray = taosArrayInit(pMeta->maxTables, sizeof(STableTruncateH));
if (pTruncateH->tblArray == NULL) {
prh->tblArray = taosArrayInit(pMeta->maxTables, sizeof(STableTruncateH));
if (prh->tblArray == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbUnlockRepoMeta(pRepo);
return -1;
......@@ -432,7 +408,7 @@ static int tsdbInitTruncateTblArray(STruncateH *pTruncateH) {
ch.pTable = pMeta->tables[i];
}
if (taosArrayPush(pTruncateH->tblArray, &ch) == NULL) {
if (taosArrayPush(prh->tblArray, &ch) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbUnlockRepoMeta(pRepo);
return -1;
......@@ -443,46 +419,46 @@ static int tsdbInitTruncateTblArray(STruncateH *pTruncateH) {
return 0;
}
static void tsdbDestroyTruncateTblArray(STruncateH *pTruncateH) {
STableTruncateH *pTblHandle = NULL;
static void tsdbDestroyTruncateTblArray(STruncateH *prh) {
STableTruncateH *pHandle = NULL;
if (pTruncateH->tblArray == NULL) return;
if (prh->tblArray == NULL) return;
for (size_t i = 0; i < taosArrayGetSize(pTruncateH->tblArray); ++i) {
pTblHandle = (STableTruncateH *)taosArrayGet(pTruncateH->tblArray, i);
if (pTblHandle->pTable) {
tsdbUnRefTable(pTblHandle->pTable);
for (size_t i = 0; i < taosArrayGetSize(prh->tblArray); ++i) {
pHandle = (STableTruncateH *)taosArrayGet(prh->tblArray, i);
if (pHandle->pTable) {
tsdbUnRefTable(pHandle->pTable);
}
tfree(pTblHandle->pInfo);
tfree(pHandle->pInfo);
}
pTruncateH->tblArray = taosArrayDestroy(&pTruncateH->tblArray);
prh->tblArray = taosArrayDestroy(&prh->tblArray);
}
static int tsdbCacheFSetIndex(STruncateH *pTruncateH) {
SReadH *pReadH = &(pTruncateH->readh);
static int tsdbCacheFSetIndex(STruncateH *prh) {
SReadH *pReadH = &(prh->readh);
if (tsdbLoadBlockIdx(pReadH) < 0) {
return -1;
}
size_t tblArraySize = taosArrayGetSize(pTruncateH->tblArray);
size_t tblArraySize = taosArrayGetSize(prh->tblArray);
for (size_t tid = 1; tid < tblArraySize; ++tid) {
STableTruncateH *pTblHandle = (STableTruncateH *)taosArrayGet(pTruncateH->tblArray, tid);
pTblHandle->pBlkIdx = NULL;
STableTruncateH *pHandle = (STableTruncateH *)taosArrayGet(prh->tblArray, tid);
pHandle->pBlkIdx = NULL;
if (pTblHandle->pTable == NULL) continue;
if (tsdbSetReadTable(pReadH, pTblHandle->pTable) < 0) {
if (pHandle->pTable == NULL) continue;
if (tsdbSetReadTable(pReadH, pHandle->pTable) < 0) {
return -1;
}
if (pReadH->pBlkIdx == NULL) continue;
pTblHandle->bIndex = *(pReadH->pBlkIdx);
pTblHandle->pBlkIdx = &(pTblHandle->bIndex);
pHandle->bIndex = *(pReadH->pBlkIdx);
pHandle->pBlkIdx = &(pHandle->bIndex);
uint32_t originLen = 0;
if (tsdbLoadBlockInfo(pReadH, (void **)(&(pTblHandle->pInfo)), &originLen) < 0) {
if (tsdbLoadBlockInfo(pReadH, (void **)(&(pHandle->pInfo)), &originLen) < 0) {
return -1;
}
}
......@@ -490,26 +466,26 @@ static int tsdbCacheFSetIndex(STruncateH *pTruncateH) {
return 0;
}
static int tsdbTruncateFSetInit(STruncateH *pTruncateH, SDFileSet *pSet) {
taosArrayClear(pTruncateH->aBlkIdx);
taosArrayClear(pTruncateH->aSupBlk);
static int tsdbTruncateFSetInit(STruncateH *prh, SDFileSet *pSet) {
taosArrayClear(prh->aBlkIdx);
taosArrayClear(prh->aSupBlk);
if (tsdbSetAndOpenReadFSet(&(pTruncateH->readh), pSet) < 0) {
if (tsdbSetAndOpenReadFSet(&(prh->readh), pSet) < 0) {
return -1;
}
if (tsdbCacheFSetIndex(pTruncateH) < 0) {
tsdbCloseAndUnsetFSet(&(pTruncateH->readh));
if (tsdbCacheFSetIndex(prh) < 0) {
tsdbCloseAndUnsetFSet(&(prh->readh));
return -1;
}
return 0;
}
static void tsdbTruncateFSetEnd(STruncateH *pTruncateH) { tsdbCloseAndUnsetFSet(&(pTruncateH->readh)); }
static void tsdbTruncateFSetEnd(STruncateH *prh) { tsdbCloseAndUnsetFSet(&(prh->readh)); }
static bool tsdbBlockInterleaved(STruncateH *pTruncateH, SBlock *pBlock) {
// STruncateTblMsg *pMsg = (STruncateTblMsg *)pTruncateH->param;
static bool tsdbBlockInterleaved(STruncateH *prh, SBlock *pBlock) {
// STruncateTblMsg *pMsg = (STruncateTblMsg *)prh->param;
// for (uint16_t i = 0; i < pMsg->nSpan; ++i) {
// STimeWindow tw = pMsg->span[i];
// if (!(pBlock->keyFirst > tw.ekey || pBlock->keyLast < tw.skey)) {
......@@ -520,9 +496,9 @@ static bool tsdbBlockInterleaved(STruncateH *pTruncateH, SBlock *pBlock) {
return true;
}
static int32_t tsdbFilterDataCols(STruncateH *pTruncateH, SDataCols *pSrcDCols) {
STruncateTblMsg *pMsg = (STruncateTblMsg *)pTruncateH->param;
SDataCols * pDstDCols = pTruncateH->pDCols;
static int32_t tsdbFilterDataCols(STruncateH *prh, SDataCols *pSrcDCols) {
SDataCols * pDstDCols = prh->pDCols;
SControlData* pCtlData = &prh->pCtlInfo->ctlData;
tdResetDataCols(pDstDCols);
pDstDCols->maxCols = pSrcDCols->maxCols;
......@@ -532,7 +508,7 @@ static int32_t tsdbFilterDataCols(STruncateH *pTruncateH, SDataCols *pSrcDCols)
for (int i = 0; i < pSrcDCols->numOfRows; ++i) {
int64_t tsKey = *(int64_t *)tdGetColDataOfRow(pSrcDCols->cols, i);
if ((tsKey >= pMsg->span[0].skey) && (tsKey <= pMsg->span[0].ekey)) {
if ((tsKey >= pCtlData->win.skey) && (tsKey <= pCtlData->win.ekey)) {
printf("tsKey %" PRId64 " is filtered\n", tsKey);
continue;
}
......@@ -548,39 +524,38 @@ static int32_t tsdbFilterDataCols(STruncateH *pTruncateH, SDataCols *pSrcDCols)
return 0;
}
static int tsdbTruncateFSetImpl(STruncateH *pTruncateH) {
STsdbRepo * pRepo = TSDB_TRUNCATE_REPO(pTruncateH);
STruncateTblMsg *pMsg = (STruncateTblMsg *)pTruncateH->param;
// SReadH * pReadh = &(pTruncateH->readh);
static int tsdbTruncateFSetImpl(STruncateH *prh) {
STsdbRepo * pRepo = TSDB_TRUNCATE_REPO(prh);
// SReadH * pReadh = &(prh->readh);
SBlockIdx * pBlkIdx = NULL;
void ** ppBuf = &(TSDB_TRUNCATE_BUF(pTruncateH));
// void ** ppCBuf = &(TSDB_TRUNCATE_COMP_BUF(pTruncateH));
// void ** ppExBuf = &(TSDB_TRUNCATE_EXBUF(pTruncateH));
void ** ppBuf = &(TSDB_TRUNCATE_BUF(prh));
// void ** ppCBuf = &(TSDB_TRUNCATE_COMP_BUF(prh));
// void ** ppExBuf = &(TSDB_TRUNCATE_EXBUF(prh));
taosArrayClear(pTruncateH->aBlkIdx);
taosArrayClear(prh->aBlkIdx);
for (size_t tid = 1; tid < taosArrayGetSize(pTruncateH->tblArray); ++tid) {
STableTruncateH *pTblHandle = (STableTruncateH *)taosArrayGet(pTruncateH->tblArray, tid);
pBlkIdx = pTblHandle->pBlkIdx;
for (size_t tid = 1; tid < taosArrayGetSize(prh->tblArray); ++tid) {
STableTruncateH *pHandle = (STableTruncateH *)taosArrayGet(prh->tblArray, tid);
pBlkIdx = pHandle->pBlkIdx;
if (pTblHandle->pTable == NULL || pTblHandle->pBlkIdx == NULL) continue;
if (pHandle->pTable == NULL || pHandle->pBlkIdx == NULL) continue;
taosArrayClear(pTruncateH->aSupBlk);
taosArrayClear(prh->aSupBlk);
uint64_t uid = pTblHandle->pTable->tableId.uid;
uint64_t uid = pHandle->pTable->tableId.uid;
if (uid != pMsg->uid) {
if ((pBlkIdx->numOfBlocks > 0) && (taosArrayPush(pTruncateH->aBlkIdx, (const void *)(pBlkIdx)) == NULL)) {
if (uid != prh->pCtlInfo->uid) {
if ((pBlkIdx->numOfBlocks > 0) && (taosArrayPush(prh->aBlkIdx, (const void *)(pBlkIdx)) == NULL)) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
} else {
// Loop to mark delete flag for each block data
tsdbDebug("vgId:%d uid %" PRIu64 " matched to truncate", REPO_ID(pRepo), uid);
// for (int i = 0; i < pTblHandle->pBlkIdx->numOfBlocks; ++i) {
// SBlock *pBlock = pTblHandle->pInfo->blocks + i;
// for (int i = 0; i < pHandle->pBlkIdx->numOfBlocks; ++i) {
// SBlock *pBlock = pHandle->pInfo->blocks + i;
// if (tsdbWriteBlockToRightFile(pTruncateH, pTblHandle->pTable, pTruncateH->pDCols, ppBuf, ppCBuf, ppExBuf) <
// if (tsdbWriteBlockToRightFile(prh, pHandle->pTable, prh->pDCols, ppBuf, ppCBuf, ppExBuf) <
// 0) {
// return -1;
// }
......@@ -588,44 +563,43 @@ static int tsdbTruncateFSetImpl(STruncateH *pTruncateH) {
}
}
if (tsdbWriteBlockIdx(TSDB_TRUNCATE_HEAD_FILE(pTruncateH), pTruncateH->aBlkIdx, ppBuf) < 0) {
if (tsdbWriteBlockIdx(TSDB_TRUNCATE_HEAD_FILE(prh), prh->aBlkIdx, ppBuf) < 0) {
return -1;
}
return 0;
}
static int tsdbDeleteFSetImpl(STruncateH *pTruncateH) {
STsdbRepo * pRepo = TSDB_TRUNCATE_REPO(pTruncateH);
STruncateTblMsg *pMsg = (STruncateTblMsg *)pTruncateH->param;
static int tsdbDeleteFSetImpl(STruncateH *prh) {
STsdbRepo * pRepo = TSDB_TRUNCATE_REPO(prh);
// STsdbCfg * pCfg = REPO_CFG(pRepo);
SReadH * pReadh = &(pTruncateH->readh);
SReadH * pReadh = &(prh->readh);
SBlockIdx blkIdx = {0};
void ** ppBuf = &(TSDB_TRUNCATE_BUF(pTruncateH));
void ** ppCBuf = &(TSDB_TRUNCATE_COMP_BUF(pTruncateH));
void ** ppExBuf = &(TSDB_TRUNCATE_EXBUF(pTruncateH));
void ** ppBuf = &(TSDB_TRUNCATE_BUF(prh));
void ** ppCBuf = &(TSDB_TRUNCATE_COMP_BUF(prh));
void ** ppExBuf = &(TSDB_TRUNCATE_EXBUF(prh));
// int defaultRows = TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock);
taosArrayClear(pTruncateH->aBlkIdx);
taosArrayClear(prh->aBlkIdx);
for (size_t tid = 1; tid < taosArrayGetSize(pTruncateH->tblArray); ++tid) {
STableTruncateH *pTblHandle = (STableTruncateH *)taosArrayGet(pTruncateH->tblArray, tid);
for (size_t tid = 1; tid < taosArrayGetSize(prh->tblArray); ++tid) {
STableTruncateH *pHandle = (STableTruncateH *)taosArrayGet(prh->tblArray, tid);
STSchema * pSchema = NULL;
if (pTblHandle->pTable == NULL || pTblHandle->pBlkIdx == NULL) continue;
if (pHandle->pTable == NULL || pHandle->pBlkIdx == NULL) continue;
if ((pSchema = tsdbGetTableSchemaImpl(pTblHandle->pTable, true, true, -1, -1)) == NULL) {
if ((pSchema = tsdbGetTableSchemaImpl(pHandle->pTable, true, true, -1, -1)) == NULL) {
return -1;
}
taosArrayClear(pTruncateH->aSupBlk);
taosArrayClear(prh->aSupBlk);
uint64_t uid = pTblHandle->pTable->tableId.uid;
uint64_t uid = pHandle->pTable->tableId.uid;
// if(uid != pMsg->uid) {
// TODO: copy the block data directly
// }
if ((tdInitDataCols(pTruncateH->pDCols, pSchema) < 0) || (tdInitDataCols(pReadh->pDCols[0], pSchema) < 0) ||
if ((tdInitDataCols(prh->pDCols, pSchema) < 0) || (tdInitDataCols(pReadh->pDCols[0], pSchema) < 0) ||
(tdInitDataCols(pReadh->pDCols[1], pSchema) < 0)) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tdFreeSchema(pSchema);
......@@ -635,57 +609,57 @@ static int tsdbDeleteFSetImpl(STruncateH *pTruncateH) {
tdFreeSchema(pSchema);
// Loop to truncate each block data
for (int i = 0; i < pTblHandle->pBlkIdx->numOfBlocks; ++i) {
SBlock *pBlock = pTblHandle->pInfo->blocks + i;
for (int i = 0; i < pHandle->pBlkIdx->numOfBlocks; ++i) {
SBlock *pBlock = pHandle->pInfo->blocks + i;
// Copy the Blocks directly if TS is not interleaved.
if (!tsdbBlockInterleaved(pTruncateH, pBlock)) {
if (!tsdbBlockInterleaved(prh, pBlock)) {
// tsdbWriteBlockAndDataToFile();
continue;
}
// Otherwise load the block data and copy the specific rows.
if (tsdbLoadBlockData(pReadh, pBlock, pTblHandle->pInfo) < 0) {
if (tsdbLoadBlockData(pReadh, pBlock, pHandle->pInfo) < 0) {
return -1;
}
if (uid == pMsg->uid) {
tsdbFilterDataCols(pTruncateH, pReadh->pDCols[0]);
if (uid == prh->pCtlInfo->uid) {
tsdbFilterDataCols(prh, pReadh->pDCols[0]);
tsdbDebug("vgId:%d uid %" PRIu64 " matched, filter block data from rows %d to %d rows", REPO_ID(pRepo), uid,
pReadh->pDCols[0]->numOfRows, pTruncateH->pDCols->numOfRows);
if (pTruncateH->pDCols->numOfRows <= 0) continue;
pReadh->pDCols[0]->numOfRows, prh->pDCols->numOfRows);
if (prh->pDCols->numOfRows <= 0) continue;
if (tsdbWriteBlockToRightFile(pTruncateH, pTblHandle->pTable, pTruncateH->pDCols, ppBuf, ppCBuf, ppExBuf) < 0) {
if (tsdbWriteBlockToRightFile(prh, pHandle->pTable, prh->pDCols, ppBuf, ppCBuf, ppExBuf) < 0) {
return -1;
}
} else {
tsdbDebug("vgId:%d uid %" PRIu64 " not matched, copy block data directly\n", REPO_ID(pRepo), uid);
if (tsdbWriteBlockToRightFile(pTruncateH, pTblHandle->pTable, pReadh->pDCols[0], ppBuf, ppCBuf, ppExBuf) < 0) {
if (tsdbWriteBlockToRightFile(prh, pHandle->pTable, pReadh->pDCols[0], ppBuf, ppCBuf, ppExBuf) < 0) {
return -1;
}
}
}
if (tsdbWriteBlockInfoImpl(TSDB_TRUNCATE_HEAD_FILE(pTruncateH), pTblHandle->pTable, pTruncateH->aSupBlk, NULL,
if (tsdbWriteBlockInfoImpl(TSDB_TRUNCATE_HEAD_FILE(prh), pHandle->pTable, prh->aSupBlk, NULL,
ppBuf, &blkIdx) < 0) {
return -1;
}
if ((blkIdx.numOfBlocks > 0) && (taosArrayPush(pTruncateH->aBlkIdx, (const void *)(&blkIdx)) == NULL)) {
if ((blkIdx.numOfBlocks > 0) && (taosArrayPush(prh->aBlkIdx, (const void *)(&blkIdx)) == NULL)) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
}
if (tsdbWriteBlockIdx(TSDB_TRUNCATE_HEAD_FILE(pTruncateH), pTruncateH->aBlkIdx, ppBuf) < 0) {
if (tsdbWriteBlockIdx(TSDB_TRUNCATE_HEAD_FILE(prh), prh->aBlkIdx, ppBuf) < 0) {
return -1;
}
return 0;
}
static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDataCols *pDCols, void **ppBuf,
static int tsdbWriteBlockToRightFile(STruncateH *prh, STable *pTable, SDataCols *pDCols, void **ppBuf,
void **ppCBuf, void **ppExBuf) {
STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(pTruncateH);
STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(prh);
STsdbCfg * pCfg = REPO_CFG(pRepo);
SDFile * pDFile = NULL;
bool isLast = false;
......@@ -694,54 +668,23 @@ static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDa
ASSERT(pDCols->numOfRows > 0);
if (pDCols->numOfRows < pCfg->minRowsPerFileBlock) {
pDFile = TSDB_TRUNCATE_LAST_FILE(pTruncateH);
pDFile = TSDB_TRUNCATE_LAST_FILE(prh);
isLast = true;
} else {
pDFile = TSDB_TRUNCATE_DATA_FILE(pTruncateH);
pDFile = TSDB_TRUNCATE_DATA_FILE(prh);
isLast = false;
}
if (tsdbWriteBlockImpl(pRepo, pTable, pDFile,
isLast ? TSDB_TRUNCATE_SMAL_FILE(pTruncateH) : TSDB_TRUNCATE_SMAD_FILE(pTruncateH), pDCols,
isLast ? TSDB_TRUNCATE_SMAL_FILE(prh) : TSDB_TRUNCATE_SMAD_FILE(prh), pDCols,
&block, isLast, true, ppBuf, ppCBuf, ppExBuf) < 0) {
return -1;
}
if (taosArrayPush(pTruncateH->aSupBlk, (void *)(&block)) == NULL) {
if (taosArrayPush(prh->aSupBlk, (void *)(&block)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
return 0;
}
// static int tsdbWriteBlockAndDataToFile(STruncateH *pTruncateH, STable *pTable, SBlock *pSupBlock, void **ppBuf,
// void **ppCBuf, void **ppExBuf) {
// STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(pTruncateH);
// SDFile * pDFile = NULL;
// bool isLast = false;
// ASSERT(pSupBlock->numOfRows > 0);
// if (pSupBlock->last) {
// pDFile = TSDB_TRUNCATE_LAST_FILE(pTruncateH);
// isLast = true;
// } else {
// pDFile = TSDB_TRUNCATE_DATA_FILE(pTruncateH);
// isLast = false;
// }
// if (tsdbWriteBlockImpl(pRepo, pTable, pDFile,
// isLast ? TSDB_TRUNCATE_SMAL_FILE(pTruncateH) : TSDB_TRUNCATE_SMAD_FILE(pTruncateH),
// pDCols, &block, isLast, true, ppBuf, ppCBuf, ppExBuf) < 0) {
// return -1;
// }
// if (taosArrayPush(pTruncateH->aSupBlk, (void *)(&block)) == NULL) {
// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
// return -1;
// }
// return 0;
// }
*/
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册