diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d26e99f423e82c5b1ee19ae5cf68a52bd424befe..9fe01b0cf61b185f5952c7ea51d4043841935beb 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1368,8 +1368,8 @@ typedef struct { int32_t numOfCols; int64_t skey; int64_t ekey; - int64_t version; // for stream - TSKEY watermark;// for stream + int64_t version; // for stream + TSKEY watermark; // for stream char data[]; } SRetrieveTableRsp; @@ -3079,6 +3079,22 @@ typedef struct SDeleteRes { int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes); int32_t tDecodeDeleteRes(SDecoder* pCoder, SDeleteRes* pRes); +typedef struct { + int64_t uid; + int64_t ts; +} SSingleDeleteReq; + +int32_t tEncodeSSingleDeleteReq(SEncoder* pCoder, const SSingleDeleteReq* pReq); +int32_t tDecodeSSingleDeleteReq(SDecoder* pCoder, SSingleDeleteReq* pReq); + +typedef struct { + int64_t suid; + SArray* deleteReqs; // SArray +} SBatchDeleteReq; + +int32_t tEncodeSBatchDeleteReq(SEncoder* pCoder, const SBatchDeleteReq* pReq); +int32_t tDecodeSBatchDeleteReq(SDecoder* pCoder, SBatchDeleteReq* pReq); + typedef struct { int32_t msgIdx; int32_t msgType; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 9ddb8720078b5cf0fb0f486c25b1afc44b3937cc..6462c7afbfa80a48219d64cbed3797317f705c16 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -202,6 +202,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp) TD_DEF_MSG_TYPE(TDMT_VND_FETCH_RSMA, "vnode-fetch-rsma", SRSmaFetchMsg, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp) + TD_DEF_MSG_TYPE(TDMT_VND_BATCH_DEL, "batch-delete", SBatchDeleteReq, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "alter-replica", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIRM, "alter-confirm", NULL, NULL) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index b630419fc4f68197194c100aa6b67e1fd6db448e..7dd3ce34c3d52495217e27663dd2c7c044c77c4d 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -305,7 +305,7 @@ static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq) taosArrayPush(desc.subDesc, &sDesc); } } - + ASSERT(desc.subPlanNum == taosArrayGetSize(desc.subDesc)); taosArrayPush(pReq->query->queryDesc, &desc); @@ -5770,3 +5770,40 @@ int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { } return 0; } + +int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) { + if (tEncodeI64(pEncoder, pReq->uid) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->ts) < 0) return -1; + return 0; +} + +int32_t tDecodeSSingleDeleteReq(SDecoder *pDecoder, SSingleDeleteReq *pReq) { + if (tDecodeI64(pDecoder, &pReq->uid) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->ts) < 0) return -1; + return 0; +} + +int32_t tEncodeSBatchDeleteReq(SEncoder *pEncoder, const SBatchDeleteReq *pReq) { + if (tEncodeI64(pEncoder, pReq->suid) < 0) return -1; + int32_t sz = taosArrayGetSize(pReq->deleteReqs); + if (tEncodeI32(pEncoder, sz) < 0) return -1; + for (int32_t i = 0; i < sz; i++) { + SSingleDeleteReq *pOneReq = taosArrayGet(pReq->deleteReqs, i); + if (tEncodeSSingleDeleteReq(pEncoder, pOneReq) < 0) return -1; + } + return 0; +} + +int32_t tDecodeSBatchDeleteReq(SDecoder *pDecoder, SBatchDeleteReq *pReq) { + if (tDecodeI64(pDecoder, &pReq->suid) < 0) return -1; + int32_t sz; + if (tDecodeI32(pDecoder, &sz) < 0) return -1; + pReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); + if (pReq->deleteReqs == NULL) return -1; + for (int32_t i = 0; i < sz; i++) { + SSingleDeleteReq deleteReq; + if (tDecodeSSingleDeleteReq(pDecoder, &deleteReq) < 0) return -1; + taosArrayPush(pReq->deleteReqs, &deleteReq); + } + return 0; +} diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 1b799b1e5e39a5119b868ef273e5764c0418c892..d13daffa0879853db127dde916d672d357ee9ed4 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -364,6 +364,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_CHECK_ALTER_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index b172b00fab2dd40a3303a85a564e80eee6bb9f29..b4af39e46788f97594dc481f7895e180a8612812 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -943,12 +943,17 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)&targetDB, false); - char targetSTB[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; - tNameFromString(&n, pStream->targetSTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - strcpy(&targetSTB[VARSTR_HEADER_SIZE], tNameGetTableName(&n)); - varDataSetLen(targetSTB, strlen(varDataVal(targetSTB))); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)&targetSTB, false); + if (pStream->targetSTbName[0] == 0) { + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, NULL, true); + } else { + char targetSTB[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + tNameFromString(&n, pStream->targetSTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + strcpy(&targetSTB[VARSTR_HEADER_SIZE], tNameGetTableName(&n)); + varDataSetLen(targetSTB, strlen(varDataVal(targetSTB))); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&targetSTB, false); + } pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)&pStream->watermark, false); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 7b22e061f0c246ac9326a7d9da16d2a071022084..ffc19667331a3ba563c5eb0a415220b49646d10c 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -172,7 +172,7 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list); SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid, - const char* stbFullName, int32_t vgId); + const char* stbFullName, int32_t vgId, SBatchDeleteReq* pDeleteReq); // sma int32_t smaInit(); diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index 7f69acc8895df7c921126b368a0cbb092d58daec..8da397f0c3ed65d77ecd3b17c6d4e8582890e004 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -119,7 +119,7 @@ int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) { SName stbFullName = {0}; tNameFromString(&stbFullName, pCfg->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); SVCreateStbReq pReq = {0}; - pReq.name = (char*)tNameGetTableName(&stbFullName); + pReq.name = (char *)tNameGetTableName(&stbFullName); pReq.suid = pCfg->dstTbUid; pReq.schemaRow = pCfg->schemaRow; pReq.schemaTag = pCfg->schemaTag; @@ -200,8 +200,9 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { goto _err; } + SBatchDeleteReq deleteReq; SSubmitReq *pSubmitReq = tdBlockToSubmit((const SArray *)msg, pTsmaStat->pTSchema, true, pTsmaStat->pTSma->dstTbUid, - pTsmaStat->pTSma->dstTbName, pTsmaStat->pTSma->dstVgId); + pTsmaStat->pTSma->dstTbName, pTsmaStat->pTSma->dstVgId, &deleteReq); if (!pSubmitReq) { smaError("vgId:%d, failed to gen submit blk while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), @@ -230,4 +231,4 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { _err: tdUnRefSmaStat(pSma, pStat); return TSDB_CODE_FAILED; -} \ No newline at end of file +} diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 6b4d5a2f3ebb4f272180c3165ddd16c055b39364..2b3283a395e0f3ee6fe32b57e49ff95ae3e64260 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -16,7 +16,7 @@ #include "tq.h" SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid, - const char* stbFullName, int32_t vgId) { + const char* stbFullName, int32_t vgId, SBatchDeleteReq* deleteReq) { SSubmitReq* ret = NULL; SArray* schemaReqs = NULL; SArray* schemaReqSz = NULL; @@ -33,10 +33,13 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo schemaReqSz = taosArrayInit(sz, sizeof(int32_t)); for (int32_t i = 0; i < sz; i++) { SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); - STagVal tagVal = { - .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1, - .type = TSDB_DATA_TYPE_UBIGINT, - .i64 = (int64_t)pDataBlock->info.groupId, + if (pDataBlock->info.type == STREAM_DELETE_DATA) { + // + } + STagVal tagVal = { + .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1, + .type = TSDB_DATA_TYPE_UBIGINT, + .i64 = (int64_t)pDataBlock->info.groupId, }; STag* pTag = NULL; taosArrayClear(tagArray); @@ -176,17 +179,45 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo } void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { - const SArray* pRes = (const SArray*)data; - SVnode* pVnode = (SVnode*)vnode; + const SArray* pRes = (const SArray*)data; + SVnode* pVnode = (SVnode*)vnode; + SBatchDeleteReq deleteReq = {0}; tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, (int32_t)pRes->size); ASSERT(pTask->tbSink.pTSchema); + deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, - pTask->tbSink.stbFullName, pVnode->config.vgId); + pTask->tbSink.stbFullName, pVnode->config.vgId, &deleteReq); tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId); + int32_t code; + int32_t len; + tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code); + if (code < 0) { + // + ASSERT(0); + } + SEncoder encoder; + void* buf = taosMemoryCalloc(1, len + sizeof(SMsgHead)); + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + tEncoderInit(&encoder, abuf, len); + tEncodeSBatchDeleteReq(&encoder, &deleteReq); + tEncoderClear(&encoder); + + if (taosArrayGetSize(deleteReq.deleteReqs) != 0) { + SRpcMsg msg = { + .msgType = TDMT_VND_BATCH_DEL, + .pCont = buf, + .contLen = len + sizeof(SMsgHead), + }; + if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { + tqDebug("failed to put into write-queue since %s", terrstr()); + } + } + taosArrayDestroy(deleteReq.deleteReqs); + /*tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);*/ // build write msg SRpcMsg msg = { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index e4e1e608e5e1a3893f463435925a65dbf9c6bb33..1f13bde0c19970995f0afd21adc470fa4ac3ac27 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -29,6 +29,7 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t code = 0; @@ -190,6 +191,9 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp case TDMT_VND_DELETE: if (vnodeProcessDeleteReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; break; + case TDMT_VND_BATCH_DEL: + if (vnodeProcessBatchDeleteReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; + break; /* TQ */ case TDMT_VND_MQ_VG_CHANGE: if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), @@ -1053,6 +1057,23 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void return 0; } +static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { + SBatchDeleteReq deleteReq; + SDecoder decoder; + tDecoderInit(&decoder, pReq, len); + tDecodeSBatchDeleteReq(&decoder, &deleteReq); + + int32_t sz = taosArrayGetSize(deleteReq.deleteReqs); + for (int32_t i = 0; i < sz; i++) { + SSingleDeleteReq *pOneReq = taosArrayGet(deleteReq.deleteReqs, i); + int32_t code = tsdbDeleteTableData(pVnode->pTsdb, version, deleteReq.suid, pOneReq->uid, pOneReq->ts, pOneReq->ts); + if (code) { + // TODO + } + } + return 0; +} + static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { int32_t code = 0; SDecoder *pCoder = &(SDecoder){0};