From c191f44b1426b5bb81c139c4e75851a3d6c7bebd Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 11 Aug 2022 12:01:20 +0800 Subject: [PATCH] feat(stream): session window trigger delete --- include/common/tcommon.h | 21 +++++--- source/dnode/vnode/src/inc/vnodeInt.h | 2 +- source/dnode/vnode/src/sma/smaTimeRange.c | 5 +- source/dnode/vnode/src/tq/tqSink.c | 62 ++++++++++++++++++++--- source/dnode/vnode/src/vnd/vnodeSvr.c | 3 +- source/libs/executor/inc/executorimpl.h | 7 --- 6 files changed, 76 insertions(+), 24 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index be18ef1fc0..e04d9d5e86 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -103,12 +103,12 @@ typedef struct SDataBlockInfo { int16_t hasVarCol; uint32_t capacity; // TODO: optimize and remove following - int64_t version; // used for stream, and need serialization - int64_t ts; // used for stream, and need serialization - int32_t childId; // used for stream, do not serialize - EStreamType type; // used for stream, do not serialize - STimeWindow calWin; // used for stream, do not serialize - TSKEY watermark;// used for stream + int64_t version; // used for stream, and need serialization + int64_t ts; // used for stream, and need serialization + int32_t childId; // used for stream, do not serialize + EStreamType type; // used for stream, do not serialize + STimeWindow calWin; // used for stream, do not serialize + TSKEY watermark; // used for stream } SDataBlockInfo; typedef struct SSDataBlock { @@ -268,6 +268,15 @@ typedef struct SSortExecInfo { int32_t readBytes; // read io bytes } SSortExecInfo; +// stream special block column + +#define START_TS_COLUMN_INDEX 0 +#define END_TS_COLUMN_INDEX 1 +#define UID_COLUMN_INDEX 2 +#define GROUPID_COLUMN_INDEX 3 +#define CALCULATE_START_TS_COLUMN_INDEX 4 +#define CALCULATE_END_TS_COLUMN_INDEX 5 + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index ffc1966733..43bb92ec23 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -171,7 +171,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list); -SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid, +SSubmitReq* tdBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid, const char* stbFullName, int32_t vgId, SBatchDeleteReq* pDeleteReq); // sma diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index 8da397f0c3..f46d9dc29c 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -201,8 +201,9 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { } SBatchDeleteReq deleteReq; - SSubmitReq *pSubmitReq = tdBlockToSubmit((const SArray *)msg, pTsmaStat->pTSchema, true, pTsmaStat->pTSma->dstTbUid, - pTsmaStat->pTSma->dstTbName, pTsmaStat->pTSma->dstVgId, &deleteReq); + SSubmitReq *pSubmitReq = + tdBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, true, pTsmaStat->pTSma->dstTbUid, + pTsmaStat->pTSma->dstTbName, pTsmaStat->pTSma->dstVgId, &deleteReq); if (!pSubmitReq) { smaError("vgId:%d, failed to gen submit blk while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 2b3283a395..f37574a064 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -13,10 +13,44 @@ * along with this program. If not, see . */ +#include "tcommon.h" +#include "tmsg.h" #include "tq.h" -SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid, - const char* stbFullName, int32_t vgId, SBatchDeleteReq* deleteReq) { +int32_t tdBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBlock* pDataBlock, + SBatchDeleteReq* deleteReq) { + ASSERT(pDataBlock->info.type == STREAM_DELETE_RESULT); + int32_t totRow = pDataBlock->info.rows; + SColumnInfoData* pTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX); + SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX); + for (int32_t row = 0; row < totRow; row++) { + int64_t ts = *(int64_t*)colDataGetData(pTsCol, row); + /*int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);*/ + int64_t groupId = 0; + char* name = buildCtbNameByGroupId(stbFullName, groupId); + tqDebug("delete msg: groupId :%ld, name: %s", groupId, name); + SMetaReader mr = {0}; + metaReaderInit(&mr, pVnode->pMeta, 0); + if (metaGetTableEntryByName(&mr, name) < 0) { + metaReaderClear(&mr); + taosMemoryFree(name); + return -1; + } + + int64_t uid = mr.me.uid; + metaReaderClear(&mr); + taosMemoryFree(name); + SSingleDeleteReq req = { + .ts = ts, + .uid = uid, + }; + taosArrayPush(deleteReq->deleteReqs, &req); + } + return 0; +} + +SSubmitReq* tdBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, bool createTb, + int64_t suid, const char* stbFullName, int32_t vgId, SBatchDeleteReq* pDeleteReq) { SSubmitReq* ret = NULL; SArray* schemaReqs = NULL; SArray* schemaReqSz = NULL; @@ -33,9 +67,13 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo schemaReqSz = taosArrayInit(sz, sizeof(int32_t)); for (int32_t i = 0; i < sz; i++) { SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); - if (pDataBlock->info.type == STREAM_DELETE_DATA) { - // + if (pDataBlock->info.type == STREAM_DELETE_RESULT) { + int32_t padding1 = 0; + void* padding2 = taosMemoryMalloc(1); + taosArrayPush(schemaReqSz, &padding1); + taosArrayPush(schemaReqs, &padding2); } + STagVal tagVal = { .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1, .type = TSDB_DATA_TYPE_UBIGINT, @@ -97,7 +135,10 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo int32_t cap = sizeof(SSubmitReq); for (int32_t i = 0; i < sz; i++) { SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); - int32_t rows = pDataBlock->info.rows; + if (pDataBlock->info.type == STREAM_DELETE_RESULT) { + continue; + } + int32_t rows = pDataBlock->info.rows; // TODO min int32_t rowSize = pDataBlock->info.rowSize; int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema); @@ -119,6 +160,11 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq)); for (int32_t i = 0; i < sz; i++) { SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); + if (pDataBlock->info.type == STREAM_DELETE_RESULT) { + pDeleteReq->suid = suid; + tdBuildDeleteReq(pVnode, stbFullName, pDataBlock, pDeleteReq); + continue; + } blkHead->numOfRows = htonl(pDataBlock->info.rows); blkHead->sversion = htonl(pTSchema->version); @@ -187,7 +233,7 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { ASSERT(pTask->tbSink.pTSchema); deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); - SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, + SSubmitReq* pReq = tdBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, pTask->tbSink.stbFullName, pVnode->config.vgId, &deleteReq); tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId); @@ -200,12 +246,14 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { ASSERT(0); } SEncoder encoder; - void* buf = taosMemoryCalloc(1, len + sizeof(SMsgHead)); + void* buf = rpcMallocCont(len + sizeof(SMsgHead)); void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); tEncoderInit(&encoder, abuf, len); tEncodeSBatchDeleteReq(&encoder, &deleteReq); tEncoderClear(&encoder); + ((SMsgHead*)buf)->vgId = pVnode->config.vgId; + if (taosArrayGetSize(deleteReq.deleteReqs) != 0) { SRpcMsg msg = { .msgType = TDMT_VND_BATCH_DEL, diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 1f13bde0c1..ecff58f3b1 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -145,7 +145,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp int32_t len; int32_t ret; - vTrace("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), + vDebug("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version); pVnode->state.applied = version; @@ -1071,6 +1071,7 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void // TODO } } + taosArrayDestroy(deleteReq.deleteReqs); return 0; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 8439cf700d..9faa1cec3e 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -52,13 +52,6 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int #define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0) -#define START_TS_COLUMN_INDEX 0 -#define END_TS_COLUMN_INDEX 1 -#define UID_COLUMN_INDEX 2 -#define GROUPID_COLUMN_INDEX 3 -#define CALCULATE_START_TS_COLUMN_INDEX 4 -#define CALCULATE_END_TS_COLUMN_INDEX 5 - enum { // when this task starts to execute, this status will set TASK_NOT_COMPLETED = 0x1u, -- GitLab