未验证 提交 bc487492 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #21764 from taosdata/enh/TD-24346-3.0e

enh: support delete tsma interval
...@@ -29,7 +29,7 @@ int32_t tdProcessTSmaInsert(SSma *pSma, int64_t indexUid, const char *msg) { ...@@ -29,7 +29,7 @@ int32_t tdProcessTSmaInsert(SSma *pSma, int64_t indexUid, const char *msg) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdProcessTSmaInsertImpl(pSma, indexUid, msg)) < 0) { if ((code = tdProcessTSmaInsertImpl(pSma, indexUid, msg)) < 0) {
smaWarn("vgId:%d, insert tsma data failed since %s", SMA_VID(pSma), tstrerror(terrno)); smaError("vgId:%d, insert tsma data failed since %s", SMA_VID(pSma), tstrerror(terrno));
} }
return code; return code;
...@@ -346,6 +346,43 @@ _end: ...@@ -346,6 +346,43 @@ _end:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tsmaProcessDelReq(SSma *pSma, int64_t indexUid, SBatchDeleteReq *pDelReq) {
int32_t code = 0;
int32_t lino = 0;
if (taosArrayGetSize(pDelReq->deleteReqs) > 0) {
int32_t len = 0;
tEncodeSize(tEncodeSBatchDeleteReq, pDelReq, len, code);
TSDB_CHECK_CODE(code, lino, _exit);
void *pBuf = rpcMallocCont(len + sizeof(SMsgHead));
if (!pBuf) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
SEncoder encoder;
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len);
tEncodeSBatchDeleteReq(&encoder, pDelReq);
tEncoderClear(&encoder);
((SMsgHead *)pBuf)->vgId = TD_VID(pSma->pVnode);
SRpcMsg delMsg = {.msgType = TDMT_VND_BATCH_DEL, .pCont = pBuf, .contLen = len + sizeof(SMsgHead)};
code = tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &delMsg);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
taosArrayDestroy(pDelReq->deleteReqs);
if (code) {
smaError("vgId:%d, failed at line %d to process delete req for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), lino,
indexUid, tstrerror(code));
}
return code;
}
/** /**
* @brief Insert/Update Time-range-wise SMA data. * @brief Insert/Update Time-range-wise SMA data.
* *
...@@ -355,7 +392,6 @@ _end: ...@@ -355,7 +392,6 @@ _end:
*/ */
static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
const SArray *pDataBlocks = (const SArray *)msg; const SArray *pDataBlocks = (const SArray *)msg;
// TODO: destroy SSDataBlocks(msg)
if (!pDataBlocks) { if (!pDataBlocks) {
terrno = TSDB_CODE_TSMA_INVALID_PTR; terrno = TSDB_CODE_TSMA_INVALID_PTR;
smaWarn("vgId:%d, insert tsma data failed since pDataBlocks is NULL", SMA_VID(pSma)); smaWarn("vgId:%d, insert tsma data failed since pDataBlocks is NULL", SMA_VID(pSma));
...@@ -419,8 +455,10 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char ...@@ -419,8 +455,10 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char
goto _err; goto _err;
} }
// TODO deleteReq if ((terrno = tsmaProcessDelReq(pSma, indexUid, &deleteReq)) != 0) {
taosArrayDestroy(deleteReq.deleteReqs); goto _err;
}
#if 0 #if 0
if (!strncasecmp("td.tsma.rst.tb", pTsmaStat->pTSma->dstTbName, 14)) { if (!strncasecmp("td.tsma.rst.tb", pTsmaStat->pTSma->dstTbName, 14)) {
terrno = TSDB_CODE_APP_ERROR; terrno = TSDB_CODE_APP_ERROR;
......
...@@ -593,9 +593,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { ...@@ -593,9 +593,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
} }
} }
// TODO: remove the function
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
// TODO
// blockDebugShowDataBlocks(data, __func__); // blockDebugShowDataBlocks(data, __func__);
tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data); tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
} }
......
...@@ -414,6 +414,21 @@ if $data05 != 30.000000000 then ...@@ -414,6 +414,21 @@ if $data05 != 30.000000000 then
return -1 return -1
endi endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT sql delete from stb;
print =============== query after delete in common vgroups
sql select _wstart, _wend, min(c1),max(c2),max(c1),max(c3) from stb interval(5m,10s) sliding(5m) order by _wstart;
if $rows != 0 then
print rows $rows != 0
return -1
endi
sleep 2000
print =============== query after delete in designated vgroups
sql select _wend, min(c1),max(c2),max(c1) from stb interval(5m,10s) sliding(5m) order by _wstart;
if $rows != 0 then
print rows $rows != 0
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册