提交 564ebc1a 编写于 作者: C Cary Xu

truncate/delete

上级 15be8ab1
......@@ -22,6 +22,9 @@
extern "C" {
#endif
// global definition
extern bool isBigEndian;
// cluster
extern char tsFirst[];
extern char tsSecond[];
......
......@@ -23,10 +23,14 @@ static tsem_t exitSem;
static void siguser1Handler(int32_t signum, void *sigInfo, void *context);
static void siguser2Handler(int32_t signum, void *sigInfo, void *context);
static void sigintHandler(int32_t signum, void *sigInfo, void *context);
static const int __TEST_NUMBER__ = 1;
bool isBigEndian = false;
int32_t main(int32_t argc, char *argv[]) {
int dump_config = 0;
isBigEndian = (*(char *)&__TEST_NUMBER__) == 0;
// Set global configuration file
for (int32_t i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-c") == 0) {
......
......@@ -198,14 +198,14 @@ static int32_t dnodeProcessCompactVnodeMsg(SRpcMsg *rpcMsg) {
return vnodeCompact(pCompactVnode->vgId);
}
#else
// static int32_t dnodeProcessCompactVnodeMsg(SRpcMsg *rpcMsg) {
// STruncateTblMsg *pTruncateMsg = rpcMsg->pCont;
// return vnodeTruncateTbl(pTruncateMsg);
// }
static int32_t dnodeProcessCompactVnodeMsg(SRpcMsg *rpcMsg) {
STruncateTblMsg *pTruncateMsg = rpcMsg->pCont;
return vnodeTruncateTbl(pTruncateMsg);
}
// static int32_t dnodeProcessCompactVnodeMsg(SRpcMsg *rpcMsg) {
// SDeleteDataMsg *pDeleteDataMsg = rpcMsg->pCont;
// return vnodeDeleteData(pDeleteDataMsg);
// }
#endif
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
......
......@@ -417,7 +417,7 @@ typedef struct {
uint16_t nSpan;
char tableFname[TSDB_TABLE_FNAME_LEN];
STimeWindow span[];
} SDeleteTblMsg;
} SDeleteDataMsg;
// N.B. JUST Utility for DEMO Implementation(not formal definition)
typedef struct SColIndex {
int16_t colId; // column id
......
......@@ -415,8 +415,11 @@ int tsdbSyncRecv(void *pRepo, SOCKET socketFd);
// For TSDB Compact
int tsdbCompact(STsdbRepo *pRepo);
// For TSDB Truncate
int tsdbTruncate(STsdbRepo *pRepo, void *param);
// For TSDB truncate table
int tsdbTruncateTbl(STsdbRepo *pRepo, void *param);
// For TSDB delete data
int tsdbDeleteData(STsdbRepo *pRepo, void *param);
// For TSDB Health Monitor
......
......@@ -64,7 +64,7 @@ int32_t vnodeSync(int32_t vgId);
int32_t vnodeClose(int32_t vgId);
int32_t vnodeCompact(int32_t vgId);
int32_t vnodeTruncateTbl(STruncateTblMsg *pMsg);
int32_t vnodeDeleteTbl(SDeleteTblMsg *pMsg);
int32_t vnodeDeleteData(SDeleteDataMsg *pMsg);
// vnodeMgmt
int32_t vnodeInitMgmt();
......
......@@ -19,8 +19,8 @@
typedef enum {
COMMIT_REQ,
COMPACT_REQ,
TRUNCATE_REQ,
DELETE_REQ,
TRUNCATE_TBL_REQ,
DELETE_TBL_REQ,
COMMIT_CONFIG_REQ,
} TSDB_REQ_T;
......
......@@ -18,6 +18,7 @@
#define TSDB_FILE_HEAD_SIZE 512
#define TSDB_FILE_DELIMITER 0xF00AFA0F
#define TSDB_FILE_DELIMITER_DELETE 0xF00AFA1F
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
#define TSDB_IVLD_FID INT_MIN
#define TSDB_FILE_STATE_OK 0
......
......@@ -153,10 +153,9 @@ static FORCE_INLINE uint32_t tsdbGetBlockColOffset(SBlockCol *pBlockCol) {
}
typedef struct {
int32_t delimiter; // For recovery usage
int32_t delete : 1; // For recovery usage(not included when calculating checksum)
int32_t numOfCols : 31; // For recovery usage
uint64_t uid; // For recovery usage
int32_t delimiter; // For recovery usage(not included when calculating checksum)
int32_t numOfCols; // For recovery usage
uint64_t uid; // For recovery usage
SBlockCol cols[];
} SBlockData;
......
......@@ -192,9 +192,9 @@ static void *tsdbLoopCommit(void *arg) {
tsdbCommitData(pRepo);
} else if (req == COMPACT_REQ) {
tsdbCompactImpl(pRepo);
} else if (req == TRUNCATE_REQ) {
} else if (req == TRUNCATE_TBL_REQ) {
tsdbTruncateImpl(pRepo, param);
} else if (req == DELETE_REQ) {
} else if (req == DELETE_TBL_REQ) {
tsdbDeleteImpl(pRepo, param);
} else if (req == COMMIT_CONFIG_REQ) {
ASSERT(pRepo->config_changed);
......
......@@ -449,6 +449,8 @@ static int tsdbLoadBlockStatisFromDFile(SReadH *pReadh, SBlock *pBlock) {
return -1;
}
pReadh->pBlkData->delimiter = TSDB_FILE_DELIMITER; // reset in case of TSDB_FILE_DELIMITER_DELETE
if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pBlkData), (uint32_t)size)) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%" PRIzu,
......@@ -651,7 +653,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
return -1;
}
pBlockData->delete = 0; // ignore delete flag
pBlockData->delimiter = TSDB_FILE_DELIMITER; // reset in case of TSDB_FILE_DELIMITER_DELETE
int32_t tsize = (int32_t)tsdbBlockStatisSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer);
if (!taosCheckChecksumWhole((uint8_t *)TSDB_READ_BUF(pReadh), tsize)) {
......
......@@ -51,6 +51,7 @@ static void tsdbEndTruncate(STsdbRepo *pRepo, int eno);
static int tsdbTruncateMeta(STsdbRepo *pRepo);
static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param, TSDB_REQ_T type);
static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet);
static int tsdbDeleteFSet(STruncateH *pTruncateH, SDFileSet *pSet);
static int tsdbInitTruncateH(STruncateH *pTruncateH, STsdbRepo *pRepo);
static void tsdbDestroyTruncateH(STruncateH *pTruncateH);
static int tsdbInitTruncateTblArray(STruncateH *pTruncateH);
......@@ -60,6 +61,7 @@ static int tsdbTruncateCache(STsdbRepo *pRepo, void *param);
static int tsdbTruncateFSetInit(STruncateH *pTruncateH, SDFileSet *pSet);
static void tsdbTruncateFSetEnd(STruncateH *pTruncateH);
static int tsdbTruncateFSetImpl(STruncateH *pTruncateH);
static int tsdbDeleteFSetImpl(STruncateH *pTruncateH);
static bool tsdbBlockInterleaved(STruncateH *pTruncateH, SBlock *pBlock);
static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDataCols *pDCols, void **ppBuf,
void **ppCBuf, void **ppExBuf);
......@@ -71,15 +73,15 @@ enum {
TSDB_WAITING_TRUNCATE,
};
int tsdbTruncate(STsdbRepo *pRepo, void *param) { return tsdbAsyncTruncate(pRepo, param, TRUNCATE_REQ); }
int tsdbDelete(STsdbRepo *pRepo, void *param) { return tsdbAsyncTruncate(pRepo, param, DELETE_REQ); }
int tsdbTruncateTbl(STsdbRepo *pRepo, void *param) { return tsdbAsyncTruncate(pRepo, param, TRUNCATE_TBL_REQ); }
int tsdbDeleteData(STsdbRepo *pRepo, void *param) { return tsdbAsyncTruncate(pRepo, param, DELETE_TBL_REQ); }
void *tsdbTruncateImpl(STsdbRepo *pRepo, void *param) {
tsdbTruncateImplCommon(pRepo, param, TRUNCATE_REQ);
tsdbTruncateImplCommon(pRepo, param, TRUNCATE_TBL_REQ);
return NULL;
}
void *tsdbDeleteImpl(STsdbRepo *pRepo, void *param) {
tsdbTruncateImplCommon(pRepo, param, DELETE_REQ);
tsdbTruncateImplCommon(pRepo, param, DELETE_TBL_REQ);
return NULL;
}
......@@ -231,10 +233,20 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param, TSDB_REQ_T type) {
continue;
}
#endif
if (tsdbTruncateFSet(&truncateH, pSet) < 0) {
tsdbDestroyTruncateH(&truncateH);
tsdbError("vgId:%d failed to truncate FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
return -1;
if (truncateH.type == TRUNCATE_TBL_REQ) {
if (tsdbTruncateFSet(&truncateH, pSet) < 0) {
tsdbDestroyTruncateH(&truncateH);
tsdbError("vgId:%d failed to truncate table in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
return -1;
}
} else if (truncateH.type == DELETE_TBL_REQ) {
if (tsdbDeleteFSet(&truncateH, pSet) < 0) {
tsdbDestroyTruncateH(&truncateH);
tsdbError("vgId:%d failed to truncate data in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
return -1;
}
} else {
ASSERT(false);
}
}
......@@ -243,22 +255,22 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param, TSDB_REQ_T type) {
return 0;
}
static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet) {
static int tsdbDeleteFSet(STruncateH *pTruncateH, SDFileSet *pSet) {
STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(pTruncateH);
SDiskID did = {0};
tsdbDebug("vgId:%d start to truncate FSET %d on level %d id %d", REPO_ID(pRepo), pSet->fid, TSDB_FSET_LEVEL(pSet),
TSDB_FSET_ID(pSet));
tsdbDebug("vgId:%d start to truncate data in FSET %d on level %d id %d", REPO_ID(pRepo), pSet->fid,
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
if (tsdbTruncateFSetInit(pTruncateH, pSet) < 0) {
return -1;
}
// Create new fset as truncated fset
// Create new fset as deleted fset
tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(pTruncateH->rtn)), &(did.level), &(did.id));
if (did.level == TFS_UNDECIDED_LEVEL) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
tsdbError("vgId:%d failed to truncate FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbError("vgId:%d failed to truncate data in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbTruncateFSetEnd(pTruncateH);
return -1;
}
......@@ -267,11 +279,67 @@ static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet) {
FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_LATEST_FSET_VER);
if (tsdbCreateDFileSet(TSDB_TRUNCATE_WSET(pTruncateH), true) < 0) {
tsdbError("vgId:%d failed to truncate FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbError("vgId:%d failed to truncate data in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbTruncateFSetEnd(pTruncateH);
return -1;
}
if (tsdbDeleteFSetImpl(pTruncateH) < 0) {
tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(pTruncateH));
tsdbRemoveDFileSet(TSDB_TRUNCATE_WSET(pTruncateH));
tsdbTruncateFSetEnd(pTruncateH);
return -1;
}
tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(pTruncateH));
tsdbUpdateDFileSet(REPO_FS(pRepo), TSDB_TRUNCATE_WSET(pTruncateH));
tsdbDebug("vgId:%d FSET %d truncate data over", REPO_ID(pRepo), pSet->fid);
tsdbTruncateFSetEnd(pTruncateH);
return 0;
}
static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet) {
STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(pTruncateH);
SDiskID did = {0};
SDFileSet *pWSet = TSDB_TRUNCATE_WSET(pTruncateH);
tsdbDebug("vgId:%d start to truncate table in FSET %d on level %d id %d", REPO_ID(pRepo), pSet->fid,
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
if (tsdbTruncateFSetInit(pTruncateH, pSet) < 0) {
return -1;
}
// Create new fset as truncated fset
tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(pTruncateH->rtn)), &(did.level), &(did.id));
if (did.level == TFS_UNDECIDED_LEVEL) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
tsdbError("vgId:%d failed to truncate table in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbTruncateFSetEnd(pTruncateH);
return -1;
}
// Only .head is created, use original .data/.last/.smad/.smal
tsdbInitDFileSetEx(pWSet, pSet);
pWSet->state = 0;
SDFile *pHeadFile = TSDB_DFILE_IN_SET(pWSet, TSDB_FILE_HEAD);
tsdbInitDFile(pHeadFile, did, REPO_ID(pRepo), TSDB_FSET_FID(pSet), FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD);
if (tsdbCreateDFile(pHeadFile, true, TSDB_FILE_HEAD) < 0) {
tsdbError("vgId:%d failed to truncate table in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbCloseDFile(pHeadFile);
tsdbRemoveDFile(pHeadFile);
return -1;
}
tsdbCloseDFile(pHeadFile);
if (tsdbOpenDFileSet(pWSet, O_RDWR) < 0) {
tsdbError("vgId:%d failed to open file set %d since %s", REPO_ID(pRepo), TSDB_FSET_FID(pWSet), tstrerror(terrno));
return -1;
}
if (tsdbTruncateFSetImpl(pTruncateH) < 0) {
tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(pTruncateH));
tsdbRemoveDFileSet(TSDB_TRUNCATE_WSET(pTruncateH));
......@@ -281,7 +349,7 @@ static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet) {
tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(pTruncateH));
tsdbUpdateDFileSet(REPO_FS(pRepo), TSDB_TRUNCATE_WSET(pTruncateH));
tsdbDebug("vgId:%d FSET %d truncate over", REPO_ID(pRepo), pSet->fid);
tsdbDebug("vgId:%d FSET %d truncate table over", REPO_ID(pRepo), pSet->fid);
tsdbTruncateFSetEnd(pTruncateH);
return 0;
......@@ -477,6 +545,53 @@ static int32_t tsdbFilterDataCols(STruncateH *pTruncateH, SDataCols *pSrcDCols)
}
static int tsdbTruncateFSetImpl(STruncateH *pTruncateH) {
STsdbRepo * pRepo = TSDB_TRUNCATE_REPO(pTruncateH);
STruncateTblMsg *pMsg = (STruncateTblMsg *)pTruncateH->param;
// SReadH * pReadh = &(pTruncateH->readh);
SBlockIdx * pBlkIdx = NULL;
void ** ppBuf = &(TSDB_TRUNCATE_BUF(pTruncateH));
// void ** ppCBuf = &(TSDB_TRUNCATE_COMP_BUF(pTruncateH));
// void ** ppExBuf = &(TSDB_TRUNCATE_EXBUF(pTruncateH));
taosArrayClear(pTruncateH->aBlkIdx);
for (size_t tid = 1; tid < taosArrayGetSize(pTruncateH->tblArray); ++tid) {
STableTruncateH *pTblHandle = (STableTruncateH *)taosArrayGet(pTruncateH->tblArray, tid);
pBlkIdx = pTblHandle->pBlkIdx;
if (pTblHandle->pTable == NULL || pTblHandle->pBlkIdx == NULL) continue;
taosArrayClear(pTruncateH->aSupBlk);
uint64_t uid = pTblHandle->pTable->tableId.uid;
if (uid != pMsg->uid) {
if ((pBlkIdx->numOfBlocks > 0) && (taosArrayPush(pTruncateH->aBlkIdx, (const void *)(pBlkIdx)) == NULL)) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
} else {
// Loop to mark delete flag for each block data
tsdbDebug("vgId:%d uid %" PRIu64 " matched to truncate", REPO_ID(pRepo), uid);
// for (int i = 0; i < pTblHandle->pBlkIdx->numOfBlocks; ++i) {
// SBlock *pBlock = pTblHandle->pInfo->blocks + i;
// if (tsdbWriteBlockToRightFile(pTruncateH, pTblHandle->pTable, pTruncateH->pDCols, ppBuf, ppCBuf, ppExBuf) <
// 0) {
// return -1;
// }
// }
}
}
if (tsdbWriteBlockIdx(TSDB_TRUNCATE_HEAD_FILE(pTruncateH), pTruncateH->aBlkIdx, ppBuf) < 0) {
return -1;
}
return 0;
}
static int tsdbDeleteFSetImpl(STruncateH *pTruncateH) {
STsdbRepo * pRepo = TSDB_TRUNCATE_REPO(pTruncateH);
STruncateTblMsg *pMsg = (STruncateTblMsg *)pTruncateH->param;
// STsdbCfg * pCfg = REPO_CFG(pRepo);
......
......@@ -130,24 +130,18 @@ int32_t vnodeCompact(int32_t vgId) {
}
int32_t vnodeTruncateTbl(STruncateTblMsg *pMsg) {
// build test data
// pMsg->vgId = 2;
// pMsg->uid = 562949986978701;
// pMsg->nSpan = 1;
// pMsg->span = malloc(pMsg->nSpan * sizeof(STimeWindow));
int32_t vgId = 2;
int32_t vgId = 2;
void * pVnode = vnodeAcquire(vgId);
if (pVnode != NULL) {
vDebug("vgId:%d, truncate table %s msg is received", vgId, pMsg->tableFname);
// not care success or not
STruncateTblMsg *param = (STruncateTblMsg *)calloc(1, sizeof(STruncateTblMsg) + pMsg->nSpan * sizeof(STimeWindow));
param->vgId = 2;
param->uid = 562949986979009;
param->uid = 562949986978880;
param->nSpan = 1;
param->span[0].skey = 1634701320001;
param->span[0].ekey = 1634701320001;
if (tsdbTruncate(((SVnodeObj *)pVnode)->tsdb, param) < 0) {
if (tsdbTruncateTbl(((SVnodeObj *)pVnode)->tsdb, param) < 0) {
tfree(param);
}
vnodeRelease(pVnode);
......@@ -158,25 +152,19 @@ int32_t vnodeTruncateTbl(STruncateTblMsg *pMsg) {
return TSDB_CODE_SUCCESS;
}
int32_t vnodeDeleteTbl(SDeleteTblMsg *pMsg) {
// build test data
// pMsg->vgId = 2;
// pMsg->uid = 562949986978701;
// pMsg->nSpan = 1;
// pMsg->span = malloc(pMsg->nSpan * sizeof(STimeWindow));
int32_t vgId = 2;
int32_t vnodeDeleteData(SDeleteDataMsg *pMsg) {
int32_t vgId = 2;
void * pVnode = vnodeAcquire(vgId);
if (pVnode != NULL) {
vDebug("vgId:%d, truncate table %s msg is received", vgId, pMsg->tableFname);
// not care success or not
STruncateTblMsg *param = (STruncateTblMsg *)calloc(1, sizeof(STruncateTblMsg) + pMsg->nSpan * sizeof(STimeWindow));
SDeleteDataMsg *param = (SDeleteDataMsg *)calloc(1, sizeof(STruncateTblMsg) + pMsg->nSpan * sizeof(STimeWindow));
param->vgId = 2;
param->uid = 562949986979009;
param->uid = 562949986978880;
param->nSpan = 1;
param->span[0].skey = 1634701320001;
param->span[0].ekey = 1634701320001;
if (tsdbTruncate(((SVnodeObj *)pVnode)->tsdb, param) < 0) {
if (tsdbDeleteData(((SVnodeObj *)pVnode)->tsdb, param) < 0) {
tfree(param);
}
vnodeRelease(pVnode);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册