diff --git a/include/common/tmsg.h b/include/common/tmsg.h index c5b0b89311a04fd69c35fff6973ad40edda9cd94..a39bd365fefa6538eddc2d3a8b6cf09f72ac3602 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2972,6 +2972,17 @@ typedef struct { int32_t tEncodeSVDeleteRsp(SEncoder* pCoder, const SVDeleteRsp* pReq); int32_t tDecodeSVDeleteRsp(SDecoder* pCoder, SVDeleteRsp* pReq); +typedef struct SDeleteRes { + uint64_t suid; + SArray* uidList; + int64_t skey; + int64_t ekey; + int64_t affectedRows; +} SDeleteRes; + +int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes); +int32_t tDecodeDeleteRes(SDecoder* pCoder, SDeleteRes* pRes); + #pragma pack(pop) #ifdef __cplusplus diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index 0c769e78114e1d5a25f37606c33692d8ba0112fb..71f7622f49f5c3ba376d24af03cc9d21604d1d35 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -20,9 +20,9 @@ extern "C" { #endif +#include "executor.h" #include "tmsgcb.h" #include "trpc.h" -#include "executor.h" enum { NODE_TYPE_VNODE = 1, @@ -31,14 +31,6 @@ enum { NODE_TYPE_MNODE, }; -typedef struct SDeleteRes { - uint64_t suid; - SArray* uidList; - int64_t skey; - int64_t ekey; - int64_t affectedRows; -} SDeleteRes; - typedef struct SQWorkerCfg { uint32_t maxSchedulerNum; uint32_t maxTaskNum; @@ -47,19 +39,19 @@ typedef struct SQWorkerCfg { typedef struct { uint64_t cacheDataSize; - + uint64_t queryProcessed; uint64_t cqueryProcessed; uint64_t fetchProcessed; uint64_t dropProcessed; uint64_t hbProcessed; uint64_t deleteProcessed; - + uint64_t numOfQueryInQueue; uint64_t numOfFetchInQueue; uint64_t timeInQueryQueue; uint64_t timeInFetchQueue; - + uint64_t numOfErrors; } SQWorkerStat; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index e9b5c67d7664137b073d079f28a44568aeb9e32e..971c083c010f4e64b80e5bbb843d6c3a475c9d20 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2300,7 +2300,6 @@ int32_t tDeserializeSServerVerRsp(void *buf, int32_t bufLen, SServerVerRsp *pRsp return 0; } - int32_t tSerializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -2387,7 +2386,6 @@ int32_t tDeserializeSDnodeListRsp(void *buf, int32_t bufLen, SDnodeListRsp *pRsp void tFreeSDnodeListRsp(SDnodeListRsp *pRsp) { taosArrayDestroy(pRsp->dnodeList); } - int32_t tSerializeSCompactDbReq(void *buf, int32_t bufLen, SCompactDbReq *pReq) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -2909,20 +2907,19 @@ int32_t tDeserializeSShowVariablesReq(void *buf, int32_t bufLen, SShowVariablesR return 0; } -int32_t tEncodeSVariablesInfo(SEncoder* pEncoder, SVariablesInfo* pInfo) { +int32_t tEncodeSVariablesInfo(SEncoder *pEncoder, SVariablesInfo *pInfo) { if (tEncodeCStr(pEncoder, pInfo->name) < 0) return -1; if (tEncodeCStr(pEncoder, pInfo->value) < 0) return -1; return 0; } -int32_t tDecodeSVariablesInfo(SDecoder* pDecoder, SVariablesInfo* pInfo) { +int32_t tDecodeSVariablesInfo(SDecoder *pDecoder, SVariablesInfo *pInfo) { if (tDecodeCStrTo(pDecoder, pInfo->name) < 0) return -1; if (tDecodeCStrTo(pDecoder, pInfo->value) < 0) return -1; return 0; } - -int32_t tSerializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesRsp* pRsp) { +int32_t tSerializeSShowVariablesRsp(void *buf, int32_t bufLen, SShowVariablesRsp *pRsp) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -2930,7 +2927,7 @@ int32_t tSerializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesRsp int32_t varNum = taosArrayGetSize(pRsp->variables); if (tEncodeI32(&encoder, varNum) < 0) return -1; for (int32_t i = 0; i < varNum; ++i) { - SVariablesInfo* pInfo = taosArrayGet(pRsp->variables, i); + SVariablesInfo *pInfo = taosArrayGet(pRsp->variables, i); if (tEncodeSVariablesInfo(&encoder, pInfo) < 0) return -1; } tEndEncode(&encoder); @@ -2940,7 +2937,7 @@ int32_t tSerializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesRsp return tlen; } -int32_t tDeserializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesRsp* pRsp) { +int32_t tDeserializeSShowVariablesRsp(void *buf, int32_t bufLen, SShowVariablesRsp *pRsp) { SDecoder decoder = {0}; tDecoderInit(&decoder, buf, bufLen); @@ -2962,11 +2959,11 @@ int32_t tDeserializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesR return 0; } -void tFreeSShowVariablesRsp(SShowVariablesRsp* pRsp) { +void tFreeSShowVariablesRsp(SShowVariablesRsp *pRsp) { if (NULL == pRsp) { return; } - + taosArrayDestroy(pRsp->variables); } @@ -5387,3 +5384,35 @@ int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) { if (tDecodeCStrTo(pDecoder, pOffset->subKey) < 0) return -1; return 0; } + +int32_t tEncodeDeleteRes(SEncoder *pCoder, const SDeleteRes *pRes) { + int32_t nUid = taosArrayGetSize(pRes->uidList); + + if (tEncodeU64(pCoder, pRes->suid) < 0) return -1; + if (tEncodeI32v(pCoder, nUid) < 0) return -1; + for (int32_t iUid = 0; iUid < nUid; iUid++) { + if (tEncodeU64(pCoder, *(uint64_t *)taosArrayGet(pRes->uidList, iUid)) < 0) return -1; + } + if (tEncodeI64(pCoder, pRes->skey) < 0) return -1; + if (tEncodeI64(pCoder, pRes->ekey) < 0) return -1; + if (tEncodeI64v(pCoder, pRes->affectedRows) < 0) return -1; + + return 0; +} + +int32_t tDecodeDeleteRes(SDecoder *pCoder, SDeleteRes *pRes) { + int32_t nUid; + uint64_t uid; + + if (tDecodeU64(pCoder, &pRes->suid) < 0) return -1; + if (tDecodeI32v(pCoder, &nUid) < 0) return -1; + for (int32_t iUid = 0; iUid < nUid; iUid++) { + if (tDecodeU64(pCoder, &uid) < 0) return -1; + taosArrayPush(pRes->uidList, &uid); + } + if (tDecodeI64(pCoder, &pRes->skey) < 0) return -1; + if (tDecodeI64(pCoder, &pRes->ekey) < 0) return -1; + if (tDecodeI64v(pCoder, &pRes->affectedRows) < 0) return -1; + + return 0; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 4ac10302691a6d59044a7dccbc55fcbae818a6fe..34b596e34a36d11378749770d6af3c84d4696c8f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -25,8 +25,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); -static int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRpcMsg *pRsp); static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) { int32_t code = 0; @@ -93,11 +93,44 @@ int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) { } } break; + case TDMT_VND_DELETE: { + int32_t size; + int32_t ret; + uint8_t *pCont; + SEncoder *pCoder = &(SEncoder){0}; + SDeleteRes res = {0}; + SReadHandle handle = { + .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; + + code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res); + if (code) goto _err; + + // malloc and encode + tEncodeSize(tEncodeDeleteRes, &res, size, ret); + pCont = rpcMallocCont(size + sizeof(SMsgHead)); + + ((SMsgHead *)pCont)->contLen = htonl(size + sizeof(SMsgHead)); + ((SMsgHead *)pCont)->vgId = htonl(TD_VID(pVnode)); + + tEncoderInit(pCoder, pCont + sizeof(SMsgHead), size); + tEncodeDeleteRes(pCoder, &res); + tEncoderClear(pCoder); + + rpcFreeCont(pMsg->pCont); + pMsg->pCont = pCont; + pMsg->contLen = size + sizeof(SMsgHead); + + taosArrayDestroy(res.uidList); + } break; default: break; } return code; + +_err: + vError("vgId%d, preprocess request failed since %s", TD_VID(pVnode), tstrerror(code)); + return code; } int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) { @@ -146,7 +179,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err; break; case TDMT_VND_DELETE: - if (vnodeProcessWriteMsg(pVnode, version, pMsg, pRsp) < 0) goto _err; + if (vnodeProcessDeleteReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; break; /* TQ */ case TDMT_VND_MQ_VG_CHANGE: @@ -279,22 +312,6 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { } } -int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRpcMsg *pRsp) { - vTrace("message in write queue is processing"); - char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); - SDeleteRes res = {0}; - SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; - - switch (pMsg->msgType) { - case TDMT_VND_DELETE: - return qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res); - default: - vError("unknown msg type:%d in write queue", pMsg->msgType); - return TSDB_CODE_VND_APP_ERROR; - } -} - // TODO: remove the function void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { // TODO @@ -854,3 +871,31 @@ static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, vo // 3. reload sync return 0; } + +static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { + int32_t code = 0; + SDecoder *pCoder = &(SDecoder){0}; + SDeleteRes *pRes = &(SDeleteRes){0}; + + pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t)); + if (pRes->uidList == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + tDecoderInit(pCoder, pReq, len); + tDecodeDeleteRes(pCoder, pRes); + + for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) { + code = tsdbDeleteTableData(pVnode->pTsdb, version, pRes->suid, *(uint64_t *)taosArrayGet(pRes->uidList, iUid), + pRes->skey, pRes->ekey); + if (code) goto _err; + } + + tDecoderClear(pCoder); + taosArrayDestroy(pRes->uidList); + return code; + +_err: + return code; +} \ No newline at end of file