提交 15be8ab1 编写于 作者: C Cary Xu

support delete

上级 4d6c9c1d
...@@ -198,9 +198,13 @@ static int32_t dnodeProcessCompactVnodeMsg(SRpcMsg *rpcMsg) { ...@@ -198,9 +198,13 @@ static int32_t dnodeProcessCompactVnodeMsg(SRpcMsg *rpcMsg) {
return vnodeCompact(pCompactVnode->vgId); return vnodeCompact(pCompactVnode->vgId);
} }
#else #else
// static int32_t dnodeProcessCompactVnodeMsg(SRpcMsg *rpcMsg) {
// STruncateTblMsg *pTruncateMsg = rpcMsg->pCont;
// return vnodeTruncateTbl(pTruncateMsg);
// }
static int32_t dnodeProcessCompactVnodeMsg(SRpcMsg *rpcMsg) { static int32_t dnodeProcessCompactVnodeMsg(SRpcMsg *rpcMsg) {
STruncateTblMsg *pTruncateMsg = rpcMsg->pCont; STruncateTblMsg *pTruncateMsg = rpcMsg->pCont;
return vnodeTruncate(pTruncateMsg); return vnodeTruncateTbl(pTruncateMsg);
} }
#endif #endif
......
...@@ -410,6 +410,14 @@ typedef struct { ...@@ -410,6 +410,14 @@ typedef struct {
char tableFname[TSDB_TABLE_FNAME_LEN]; char tableFname[TSDB_TABLE_FNAME_LEN];
STimeWindow span[]; STimeWindow span[];
} STruncateTblMsg; } STruncateTblMsg;
typedef struct {
int32_t contLen;
int32_t vgId;
uint64_t uid;
uint16_t nSpan;
char tableFname[TSDB_TABLE_FNAME_LEN];
STimeWindow span[];
} SDeleteTblMsg;
// N.B. JUST Utility for DEMO Implementation(not formal definition) // N.B. JUST Utility for DEMO Implementation(not formal definition)
typedef struct SColIndex { typedef struct SColIndex {
int16_t colId; // column id int16_t colId; // column id
......
...@@ -63,7 +63,8 @@ int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg); ...@@ -63,7 +63,8 @@ int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeSync(int32_t vgId); int32_t vnodeSync(int32_t vgId);
int32_t vnodeClose(int32_t vgId); int32_t vnodeClose(int32_t vgId);
int32_t vnodeCompact(int32_t vgId); int32_t vnodeCompact(int32_t vgId);
int32_t vnodeTruncate(STruncateTblMsg *pMsg); int32_t vnodeTruncateTbl(STruncateTblMsg *pMsg);
int32_t vnodeDeleteTbl(SDeleteTblMsg *pMsg);
// vnodeMgmt // vnodeMgmt
int32_t vnodeInitMgmt(); int32_t vnodeInitMgmt();
......
...@@ -16,7 +16,13 @@ ...@@ -16,7 +16,13 @@
#ifndef _TD_TSDB_COMMIT_QUEUE_H_ #ifndef _TD_TSDB_COMMIT_QUEUE_H_
#define _TD_TSDB_COMMIT_QUEUE_H_ #define _TD_TSDB_COMMIT_QUEUE_H_
typedef enum { COMMIT_REQ, COMPACT_REQ, TRUNCATE_REQ, COMMIT_CONFIG_REQ } TSDB_REQ_T; typedef enum {
COMMIT_REQ,
COMPACT_REQ,
TRUNCATE_REQ,
DELETE_REQ,
COMMIT_CONFIG_REQ,
} TSDB_REQ_T;
int tsdbScheduleCommit(STsdbRepo *pRepo, void* param, TSDB_REQ_T req); int tsdbScheduleCommit(STsdbRepo *pRepo, void* param, TSDB_REQ_T req);
......
...@@ -153,9 +153,10 @@ static FORCE_INLINE uint32_t tsdbGetBlockColOffset(SBlockCol *pBlockCol) { ...@@ -153,9 +153,10 @@ static FORCE_INLINE uint32_t tsdbGetBlockColOffset(SBlockCol *pBlockCol) {
} }
typedef struct { typedef struct {
int32_t delimiter; // For recovery usage int32_t delimiter; // For recovery usage
int32_t numOfCols; // For recovery usage int32_t delete : 1; // For recovery usage(not included when calculating checksum)
uint64_t uid; // For recovery usage int32_t numOfCols : 31; // For recovery usage
uint64_t uid; // For recovery usage
SBlockCol cols[]; SBlockCol cols[];
} SBlockData; } SBlockData;
......
...@@ -20,6 +20,7 @@ extern "C" { ...@@ -20,6 +20,7 @@ extern "C" {
#endif #endif
void *tsdbTruncateImpl(STsdbRepo *pRepo, void *param); void *tsdbTruncateImpl(STsdbRepo *pRepo, void *param);
void *tsdbDeleteImpl(STsdbRepo *pRepo, void *param);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -194,6 +194,8 @@ static void *tsdbLoopCommit(void *arg) { ...@@ -194,6 +194,8 @@ static void *tsdbLoopCommit(void *arg) {
tsdbCompactImpl(pRepo); tsdbCompactImpl(pRepo);
} else if (req == TRUNCATE_REQ) { } else if (req == TRUNCATE_REQ) {
tsdbTruncateImpl(pRepo, param); tsdbTruncateImpl(pRepo, param);
} else if (req == DELETE_REQ) {
tsdbDeleteImpl(pRepo, param);
} else if (req == COMMIT_CONFIG_REQ) { } else if (req == COMMIT_CONFIG_REQ) {
ASSERT(pRepo->config_changed); ASSERT(pRepo->config_changed);
tsdbApplyRepoConfig(pRepo); tsdbApplyRepoConfig(pRepo);
......
...@@ -651,6 +651,8 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat ...@@ -651,6 +651,8 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
return -1; return -1;
} }
pBlockData->delete = 0; // ignore delete flag
int32_t tsize = (int32_t)tsdbBlockStatisSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer); int32_t tsize = (int32_t)tsdbBlockStatisSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer);
if (!taosCheckChecksumWhole((uint8_t *)TSDB_READ_BUF(pReadh), tsize)) { if (!taosCheckChecksumWhole((uint8_t *)TSDB_READ_BUF(pReadh), tsize)) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED; terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
......
...@@ -30,7 +30,8 @@ typedef struct { ...@@ -30,7 +30,8 @@ typedef struct {
SArray * aBlkIdx; SArray * aBlkIdx;
SArray * aSupBlk; SArray * aSupBlk;
SDataCols *pDCols; SDataCols *pDCols;
void * param; // STruncateTblMsg *pMsg void * param; // STruncateTblMsg or SDeleteTblMsg
TSDB_REQ_T type; // truncate or delete
} STruncateH; } STruncateH;
#define TSDB_TRUNCATE_WSET(pTruncateH) (&((pTruncateH)->wSet)) #define TSDB_TRUNCATE_WSET(pTruncateH) (&((pTruncateH)->wSet))
...@@ -44,24 +45,25 @@ typedef struct { ...@@ -44,24 +45,25 @@ typedef struct {
#define TSDB_TRUNCATE_COMP_BUF(pTruncateH) TSDB_READ_COMP_BUF(&((pTruncateH)->readh)) #define TSDB_TRUNCATE_COMP_BUF(pTruncateH) TSDB_READ_COMP_BUF(&((pTruncateH)->readh))
#define TSDB_TRUNCATE_EXBUF(pTruncateH) TSDB_READ_EXBUF(&((pTruncateH)->readh)) #define TSDB_TRUNCATE_EXBUF(pTruncateH) TSDB_READ_EXBUF(&((pTruncateH)->readh))
static int tsdbAsyncTruncate(STsdbRepo *pRepo, void *param); static int tsdbAsyncTruncate(STsdbRepo *pRepo, void *param, TSDB_REQ_T type);
static void tsdbStartTruncate(STsdbRepo *pRepo); static void tsdbStartTruncate(STsdbRepo *pRepo);
static void tsdbEndTruncate(STsdbRepo *pRepo, int eno); static void tsdbEndTruncate(STsdbRepo *pRepo, int eno);
static int tsdbTruncateMeta(STsdbRepo *pRepo); static int tsdbTruncateMeta(STsdbRepo *pRepo);
static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param); static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param, TSDB_REQ_T type);
static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet); static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet);
static int tsdbInitTruncateH(STruncateH *pTruncateH, STsdbRepo *pRepo); static int tsdbInitTruncateH(STruncateH *pTruncateH, STsdbRepo *pRepo);
static void tsdbDestroyTruncateH(STruncateH *pTruncateH); static void tsdbDestroyTruncateH(STruncateH *pTruncateH);
static int tsdbInitTruncateTblArray(STruncateH *pTruncateH); static int tsdbInitTruncateTblArray(STruncateH *pTruncateH);
static void tsdbDestroyTruncateTblArray(STruncateH *pTruncateH); static void tsdbDestroyTruncateTblArray(STruncateH *pTruncateH);
static int tsdbCacheFSetIndex(STruncateH *pTruncateH); static int tsdbCacheFSetIndex(STruncateH *pTruncateH);
static int tsdbTruncateCache(STsdbRepo *pRepo, void *param); static int tsdbTruncateCache(STsdbRepo *pRepo, void *param);
static int tsdbTruncateFSetInit(STruncateH *pTruncateH, SDFileSet *pSet); static int tsdbTruncateFSetInit(STruncateH *pTruncateH, SDFileSet *pSet);
static void tsdbTruncateFSetEnd(STruncateH *pTruncateH); static void tsdbTruncateFSetEnd(STruncateH *pTruncateH);
static int tsdbTruncateFSetImpl(STruncateH *pTruncateH); static int tsdbTruncateFSetImpl(STruncateH *pTruncateH);
static bool tsdbBlockInterleaved(STruncateH *pTruncateH, SBlock *pBlock); static bool tsdbBlockInterleaved(STruncateH *pTruncateH, SBlock *pBlock);
static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDataCols *pDCols, void **ppBuf, static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDataCols *pDCols, void **ppBuf,
void **ppCBuf, void **ppExBuf); void **ppCBuf, void **ppExBuf);
static void *tsdbTruncateImplCommon(STsdbRepo *pRepo, void *param, TSDB_REQ_T type);
enum { enum {
TSDB_NO_TRUNCATE, TSDB_NO_TRUNCATE,
...@@ -69,9 +71,19 @@ enum { ...@@ -69,9 +71,19 @@ enum {
TSDB_WAITING_TRUNCATE, TSDB_WAITING_TRUNCATE,
}; };
int tsdbTruncate(STsdbRepo *pRepo, void *param) { return tsdbAsyncTruncate(pRepo, param); } int tsdbTruncate(STsdbRepo *pRepo, void *param) { return tsdbAsyncTruncate(pRepo, param, TRUNCATE_REQ); }
int tsdbDelete(STsdbRepo *pRepo, void *param) { return tsdbAsyncTruncate(pRepo, param, DELETE_REQ); }
void *tsdbTruncateImpl(STsdbRepo *pRepo, void *param) { void *tsdbTruncateImpl(STsdbRepo *pRepo, void *param) {
tsdbTruncateImplCommon(pRepo, param, TRUNCATE_REQ);
return NULL;
}
void *tsdbDeleteImpl(STsdbRepo *pRepo, void *param) {
tsdbTruncateImplCommon(pRepo, param, DELETE_REQ);
return NULL;
}
static void *tsdbTruncateImplCommon(STsdbRepo *pRepo, void *param, TSDB_REQ_T type) {
ASSERT(param != NULL); ASSERT(param != NULL);
int32_t code = 0; int32_t code = 0;
// Step 1: check and clear cache // Step 1: check and clear cache
...@@ -98,7 +110,7 @@ void *tsdbTruncateImpl(STsdbRepo *pRepo, void *param) { ...@@ -98,7 +110,7 @@ void *tsdbTruncateImpl(STsdbRepo *pRepo, void *param) {
goto _err; goto _err;
} }
if (tsdbTruncateTSData(pRepo, param) < 0) { if (tsdbTruncateTSData(pRepo, param, type) < 0) {
tsdbError("vgId:%d failed to truncate TS data since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to truncate TS data since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err; goto _err;
} }
...@@ -130,7 +142,7 @@ static int tsdbTruncateCache(STsdbRepo *pRepo, void *param) { ...@@ -130,7 +142,7 @@ static int tsdbTruncateCache(STsdbRepo *pRepo, void *param) {
return 0; return 0;
} }
static int tsdbAsyncTruncate(STsdbRepo *pRepo, void *param) { static int tsdbAsyncTruncate(STsdbRepo *pRepo, void *param, TSDB_REQ_T type) {
// avoid repeated input of commands by end users in a short period of time // avoid repeated input of commands by end users in a short period of time
if (pRepo->truncateState != TSDB_NO_TRUNCATE) { if (pRepo->truncateState != TSDB_NO_TRUNCATE) {
tsdbInfo("vgId:%d retry later as tsdb in truncating state", REPO_ID(pRepo)); tsdbInfo("vgId:%d retry later as tsdb in truncating state", REPO_ID(pRepo));
...@@ -145,7 +157,7 @@ static int tsdbAsyncTruncate(STsdbRepo *pRepo, void *param) { ...@@ -145,7 +157,7 @@ static int tsdbAsyncTruncate(STsdbRepo *pRepo, void *param) {
// truncate // truncate
tsem_wait(&(pRepo->readyToCommit)); tsem_wait(&(pRepo->readyToCommit));
int code = tsdbScheduleCommit(pRepo, param, TRUNCATE_REQ); int code = tsdbScheduleCommit(pRepo, param, type);
if (code < 0) { if (code < 0) {
tsem_post(&(pRepo->readyToCommit)); tsem_post(&(pRepo->readyToCommit));
} }
...@@ -177,7 +189,7 @@ static int tsdbTruncateMeta(STsdbRepo *pRepo) { ...@@ -177,7 +189,7 @@ static int tsdbTruncateMeta(STsdbRepo *pRepo) {
return 0; return 0;
} }
static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param) { static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param, TSDB_REQ_T type) {
STsdbCfg * pCfg = REPO_CFG(pRepo); STsdbCfg * pCfg = REPO_CFG(pRepo);
STruncateH truncateH = {0}; STruncateH truncateH = {0};
SDFileSet * pSet = NULL; SDFileSet * pSet = NULL;
...@@ -188,7 +200,9 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param) { ...@@ -188,7 +200,9 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param) {
if (tsdbInitTruncateH(&truncateH, pRepo) < 0) { if (tsdbInitTruncateH(&truncateH, pRepo) < 0) {
return -1; return -1;
} }
truncateH.param = pMsg; truncateH.param = pMsg;
truncateH.type = type;
int sFid = TSDB_KEY_FID(pMsg->span[0].skey, pCfg->daysPerFile, pCfg->precision); int sFid = TSDB_KEY_FID(pMsg->span[0].skey, pCfg->daysPerFile, pCfg->precision);
int eFid = TSDB_KEY_FID(pMsg->span[0].ekey, pCfg->daysPerFile, pCfg->precision); int eFid = TSDB_KEY_FID(pMsg->span[0].ekey, pCfg->daysPerFile, pCfg->precision);
......
...@@ -129,7 +129,36 @@ int32_t vnodeCompact(int32_t vgId) { ...@@ -129,7 +129,36 @@ int32_t vnodeCompact(int32_t vgId) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t vnodeTruncate(STruncateTblMsg *pMsg) { int32_t vnodeTruncateTbl(STruncateTblMsg *pMsg) {
// build test data
// pMsg->vgId = 2;
// pMsg->uid = 562949986978701;
// pMsg->nSpan = 1;
// pMsg->span = malloc(pMsg->nSpan * sizeof(STimeWindow));
int32_t vgId = 2;
void * pVnode = vnodeAcquire(vgId);
if (pVnode != NULL) {
vDebug("vgId:%d, truncate table %s msg is received", vgId, pMsg->tableFname);
// not care success or not
STruncateTblMsg *param = (STruncateTblMsg *)calloc(1, sizeof(STruncateTblMsg) + pMsg->nSpan * sizeof(STimeWindow));
param->vgId = 2;
param->uid = 562949986979009;
param->nSpan = 1;
param->span[0].skey = 1634701320001;
param->span[0].ekey = 1634701320001;
if (tsdbTruncate(((SVnodeObj *)pVnode)->tsdb, param) < 0) {
tfree(param);
}
vnodeRelease(pVnode);
} else {
vInfo("vgId:%d, vnode not exist, can't truncate table %s in it", vgId, pMsg->tableFname);
return TSDB_CODE_VND_INVALID_VGROUP_ID;
}
return TSDB_CODE_SUCCESS;
}
int32_t vnodeDeleteTbl(SDeleteTblMsg *pMsg) {
// build test data // build test data
// pMsg->vgId = 2; // pMsg->vgId = 2;
// pMsg->uid = 562949986978701; // pMsg->uid = 562949986978701;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册