From f1ee1abd7b8566c685633619f455906c1438384b Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Sun, 5 Jun 2022 20:33:21 +0800 Subject: [PATCH] delete data --- include/common/tmsg.h | 21 ++--- include/libs/qworker/qworker.h | 1 + source/common/src/tmsg.c | 79 ++++++++++++------ source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 1 + source/dnode/qnode/src/qnode.c | 1 + source/dnode/vnode/src/vnd/vnodeSvr.c | 17 ++++ source/libs/monitor/src/monMsg.c | 2 + source/libs/qworker/inc/qwInt.h | 2 + source/libs/qworker/src/qwMsg.c | 41 ++++++++-- source/libs/qworker/src/qwUtil.c | 5 +- source/libs/qworker/src/qworker.c | 91 ++++++++++++++++++++- source/libs/scheduler/src/schRemote.c | 40 +++++++++ 12 files changed, 253 insertions(+), 48 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 39dc5361e6..5019dcc51c 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -933,6 +933,7 @@ typedef struct { int64_t numOfProcessedFetch; int64_t numOfProcessedDrop; int64_t numOfProcessedHb; + int64_t numOfProcessedDelete; int64_t cacheDataSize; int64_t numOfQueryInQueue; int64_t numOfFetchInQueue; @@ -2689,20 +2690,20 @@ int32_t tEncodeSVSubmitReq(SEncoder* pCoder, const SVSubmitReq* pReq); int32_t tDecodeSVSubmitReq(SDecoder* pCoder, SVSubmitReq* pReq); typedef struct { - int64_t delUid; - int64_t tbUid; // super/child/normal table - int8_t type; // table type - int16_t nWnds; - char* tbFullName; - char* subPlan; - STimeWindow wnds[]; + SMsgHead header; + uint64_t sId; + uint64_t queryId; + uint64_t taskId; + uint32_t sqlLen; + uint32_t phyLen; + char* sql; + char* msg; } SVDeleteReq; -int32_t tEncodeSVDeleteReq(SEncoder* pCoder, const SVDeleteReq* pReq); -int32_t tDecodeSVDeleteReq(SDecoder* pCoder, SVDeleteReq* pReq); +int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq); +int32_t tDeserializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq); typedef struct { - int32_t code; int64_t affectedRows; } SVDeleteRsp; diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index aa20082fe0..94ed3ace42 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -47,6 +47,7 @@ typedef struct { uint64_t fetchProcessed; uint64_t dropProcessed; uint64_t hbProcessed; + uint64_t deleteProcessed; uint64_t numOfQueryInQueue; uint64_t numOfFetchInQueue; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 9c9f33ac96..26d18e910c 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -933,6 +933,7 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { if (tEncodeI64(&encoder, pReq->qload.numOfProcessedFetch) < 0) return -1; if (tEncodeI64(&encoder, pReq->qload.numOfProcessedDrop) < 0) return -1; if (tEncodeI64(&encoder, pReq->qload.numOfProcessedHb) < 0) return -1; + if (tEncodeI64(&encoder, pReq->qload.numOfProcessedDelete) < 0) return -1; if (tEncodeI64(&encoder, pReq->qload.cacheDataSize) < 0) return -1; if (tEncodeI64(&encoder, pReq->qload.numOfQueryInQueue) < 0) return -1; if (tEncodeI64(&encoder, pReq->qload.numOfFetchInQueue) < 0) return -1; @@ -1002,6 +1003,7 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedFetch) < 0) return -1; if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedDrop) < 0) return -1; if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedHb) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedDelete) < 0) return -1; if (tDecodeI64(&decoder, &pReq->qload.cacheDataSize) < 0) return -1; if (tDecodeI64(&decoder, &pReq->qload.numOfQueryInQueue) < 0) return -1; if (tDecodeI64(&decoder, &pReq->qload.numOfFetchInQueue) < 0) return -1; @@ -3814,39 +3816,64 @@ int32_t tDecodeSVGetTsmaExpWndsRsp(SDecoder *pCoder, SVGetTsmaExpWndsRsp *pReq) return 0; } -int32_t tEncodeSVDeleteReq(SEncoder* pCoder, const SVDeleteReq* pReq) { - if (tStartEncode(pCoder) < 0) return -1; +int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) { + int32_t headLen = sizeof(SMsgHead); + if (buf != NULL) { + buf = (char *)buf + headLen; + bufLen -= headLen; + } - if (tEncodeI64(pCoder, pReq->delUid) < 0) return -1; - if (tEncodeI64(pCoder, pReq->tbUid) < 0) return -1; - if (tEncodeI8(pCoder, pReq->type) < 0) return -1; - if (tEncodeI16v(pCoder, pReq->nWnds) < 0) return -1; - if (tEncodeCStr(pCoder, pReq->tbFullName) < 0) return -1; - if (tEncodeCStr(pCoder, pReq->subPlan) < 0) return -1; - for (int16_t i = 0; i < pReq->nWnds; ++i) { - if (tEncodeI64(pCoder, pReq->wnds[i].skey) < 0) return -1; - if (tEncodeI64(pCoder, pReq->wnds[i].ekey) < 0) return -1; + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeU64(&encoder, pReq->sId) < 0) return -1; + if (tEncodeU64(&encoder, pReq->queryId) < 0) return -1; + if (tEncodeU64(&encoder, pReq->taskId) < 0) return -1; + if (tEncodeU32(&encoder, pReq->sqlLen) < 0) return -1; + if (tEncodeU32(&encoder, pReq->phyLen) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->msg) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + + if (buf != NULL) { + SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen); + pHead->vgId = htonl(pReq->header.vgId); + pHead->contLen = htonl(tlen + headLen); } - tEndEncode(pCoder); - return 0; + return tlen + headLen; } -int32_t tDecodeSVDeleteReq(SDecoder* pCoder, SVDeleteReq* pReq) { - if (tStartDecode(pCoder) < 0) return -1; +int32_t tDeserializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) { + int32_t headLen = sizeof(SMsgHead); - if (tDecodeI64(pCoder, &pReq->delUid) < 0) return -1; - if (tDecodeI64(pCoder, &pReq->tbUid) < 0) return -1; - if (tDecodeI8(pCoder, &pReq->type) < 0) return -1; - if (tDecodeI16v(pCoder, &pReq->nWnds) < 0) return -1; - if (tDecodeCStr(pCoder, &pReq->tbFullName) < 0) return -1; - if (tDecodeCStr(pCoder, &pReq->subPlan) < 0) return -1; - for (int16_t i = 0; i < pReq->nWnds; ++i) { - if (tDecodeI64(pCoder, &pReq->wnds[i].skey) < 0) return -1; - if (tDecodeI64(pCoder, &pReq->wnds[i].ekey) < 0) return -1; - } + SMsgHead *pHead = buf; + pHead->vgId = pReq->header.vgId; + pHead->contLen = pReq->header.contLen; - tEndDecode(pCoder); + SDecoder decoder = {0}; + tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeU64(&decoder, &pReq->sId) < 0) return -1; + if (tDecodeU64(&decoder, &pReq->queryId) < 0) return -1; + if (tDecodeU64(&decoder, &pReq->taskId) < 0) return -1; + if (tDecodeU32(&decoder, &pReq->sqlLen) < 0) return -1; + if (tDecodeU32(&decoder, &pReq->phyLen) < 0) return -1; + pReq->sql = taosMemoryCalloc(1, pReq->sqlLen + 1); + if (NULL == pReq->sql) return -1; + pReq->msg = taosMemoryCalloc(1, pReq->phyLen + 1); + if (NULL == pReq->msg) return -1; + if (tDecodeCStrTo(&decoder, pReq->sql) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->msg) < 0) return -1; + + tEndDecode(&decoder); + + tDecoderClear(&decoder); return 0; } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 0819f79cf9..a811e3b997 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -358,6 +358,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RUN, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DISPATCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RECOVER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c index 45b88318c4..ebaf73a952 100644 --- a/source/dnode/qnode/src/qnode.c +++ b/source/dnode/qnode/src/qnode.c @@ -59,6 +59,7 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { pLoad->numOfProcessedFetch = stat.fetchProcessed; pLoad->numOfProcessedDrop = stat.dropProcessed; pLoad->numOfProcessedHb = stat.hbProcessed; + pLoad->numOfProcessedDelete = stat.deleteProcessed; return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 537f2e0964..7f731cce50 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -23,6 +23,7 @@ static int vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, i static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); +static int vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg *pRsp); int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) { int32_t code = 0; @@ -141,6 +142,9 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg case TDMT_VND_SUBMIT: if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err; break; + case TDMT_VND_DELETE: + if (vnodeProcessFetchMsg(pVnode, pMsg, pRsp) < 0) goto _err; + break; /* TQ */ case TDMT_VND_MQ_VG_CHANGE: if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), @@ -252,6 +256,19 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { } } +int vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg *pRsp) { + vTrace("message in write queue is processing"); + char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + switch (pMsg->msgType) { + case TDMT_VND_DELETE: + return qWorkerProcessDeleteMsg(pVnode, pVnode->pQuery, pMsg, pRsp); + default: + vError("unknown msg type:%d in write queue", pMsg->msgType); + return TSDB_CODE_VND_APP_ERROR; + } +} + // TODO: remove the function void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { // TODO diff --git a/source/libs/monitor/src/monMsg.c b/source/libs/monitor/src/monMsg.c index 944a7b5475..a041b582a9 100644 --- a/source/libs/monitor/src/monMsg.c +++ b/source/libs/monitor/src/monMsg.c @@ -569,6 +569,7 @@ int32_t tSerializeSQnodeLoad(void *buf, int32_t bufLen, SQnodeLoad *pInfo) { if (tEncodeI64(&encoder, pInfo->numOfProcessedFetch) < 0) return -1; if (tEncodeI64(&encoder, pInfo->numOfProcessedDrop) < 0) return -1; if (tEncodeI64(&encoder, pInfo->numOfProcessedHb) < 0) return -1; + if (tEncodeI64(&encoder, pInfo->numOfProcessedDelete) < 0) return -1; if (tEncodeI64(&encoder, pInfo->cacheDataSize) < 0) return -1; if (tEncodeI64(&encoder, pInfo->numOfQueryInQueue) < 0) return -1; if (tEncodeI64(&encoder, pInfo->numOfFetchInQueue) < 0) return -1; @@ -591,6 +592,7 @@ int32_t tDeserializeSQnodeLoad(void *buf, int32_t bufLen, SQnodeLoad *pInfo) { if (tDecodeI64(&decoder, &pInfo->numOfProcessedFetch) < 0) return -1; if (tDecodeI64(&decoder, &pInfo->numOfProcessedDrop) < 0) return -1; if (tDecodeI64(&decoder, &pInfo->numOfProcessedHb) < 0) return -1; + if (tDecodeI64(&decoder, &pInfo->numOfProcessedDelete) < 0) return -1; if (tDecodeI64(&decoder, &pInfo->cacheDataSize) < 0) return -1; if (tDecodeI64(&decoder, &pInfo->numOfQueryInQueue) < 0) return -1; if (tDecodeI64(&decoder, &pInfo->numOfFetchInQueue) < 0) return -1; diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 082db6428f..1d31c86308 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -160,6 +160,7 @@ typedef struct SQWMsgStat { uint64_t cancelProcessed; uint64_t dropProcessed; uint64_t hbProcessed; + uint64_t deleteProcessed; } SQWMsgStat; typedef struct SQWRTStat { @@ -357,6 +358,7 @@ int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type); int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type); void qwClearExpiredSch(SArray* pExpiredSch); int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch); +void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx); void qwDbgDumpMgmtInfo(SQWorker *mgmt); int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore); diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 2f1ecc5172..d4bc7892e4 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -300,13 +300,6 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - msg->sId = msg->sId; - msg->queryId = msg->queryId; - msg->taskId = msg->taskId; - msg->refId = msg->refId; - msg->phyLen = msg->phyLen; - msg->sqlLen = msg->sqlLen; - uint64_t sId = msg->sId; uint64_t qId = msg->queryId; uint64_t tId = msg->taskId; @@ -523,3 +516,37 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_ return TSDB_CODE_SUCCESS; } + + +int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *pRsp) { + if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == pRsp) { + QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + int32_t code = 0; + SVDeleteReq req = {0}; + SQWorker * mgmt = (SQWorker *)qWorkerMgmt; + + QW_STAT_INC(mgmt->stat.msgStat.deleteProcessed, 1); + + tDeserializeSVDeleteReq(pMsg->pCont, pMsg->contLen, &req); + + uint64_t sId = req.sId; + uint64_t qId = req.queryId; + uint64_t tId = req.taskId; + int64_t rId = 0; + + SQWMsg qwMsg = {.node = node, .msg = req.msg, .msgLen = req.phyLen, .connInfo = pMsg->info}; + QW_SCH_TASK_DLOG("processDelete start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, req.sql); + taosMemoryFreeClear(req.sql); + + QW_ERR_JRET(qwProcessDelete(QW_FPARAMS(), &qwMsg, pRsp)); + + QW_SCH_TASK_DLOG("processDelete end, node:%p", node); + +_return: + + QW_RET(code); +} + + diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 8bfb80f061..4b881d2b4f 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -290,8 +290,9 @@ int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { QW_RET(code); } -void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { +void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { tmsgReleaseHandle(&ctx->ctrlConnInfo, TAOS_CONN_SERVER); + ctx->ctrlConnInfo.handle = NULL; ctx->ctrlConnInfo.refId = -1; @@ -333,7 +334,7 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); } - qwFreeTask(QW_FPARAMS(), &octx); + qwFreeTaskCtx(QW_FPARAMS(), &octx); QW_TASK_DLOG_E("task ctx dropped"); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 44a8fdf7f4..81a390f4c2 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -183,7 +183,7 @@ int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) return TSDB_CODE_SUCCESS; } -int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) { +int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) { int32_t len = 0; SRetrieveTableRsp *rsp = NULL; bool queryEnd = false; @@ -242,6 +242,49 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void return TSDB_CODE_SUCCESS; } +int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg) { + int32_t len = 0; + SVDeleteRsp rsp = {0}; + bool queryEnd = false; + int32_t code = 0; + SOutputData output = {0}; + + dsGetDataLength(ctx->sinkHandle, &len, &queryEnd); + + if (len <= 0 || len != sizeof(SVDeleteRsp)) { + QW_TASK_ELOG("invalid length from dsGetDataLength, length:%d", len); + QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + output.pData = taosMemoryCalloc(1, len); + if (NULL == output.pData) { + QW_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + code = dsGetDataBlock(ctx->sinkHandle, &output); + if (code) { + QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code)); + taosMemoryFree(output.pData); + QW_ERR_RET(code); + } + + rsp.affectedRows = *(int64_t*)output.pData; + + int32_t len; + int32_t ret = 0; + SEncoder coder = {0}; + tEncodeSize(tEncodeSVDeleteRsp, &rsp, len, ret); + void *msg = taosMemoryCalloc(1, len); + tEncoderInit(&coder, msg, len); + tEncodeSVDeleteRsp(&coder, &rsp); + tEncoderClear(&coder); + + *rspMsg = msg; + *dataLen = len; + + return TSDB_CODE_SUCCESS; +} + int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) { int32_t code = 0; @@ -547,7 +590,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { SOutputData sOutput = {0}; - QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); + QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) { QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus)); @@ -620,7 +663,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); SOutputData sOutput = {0}; - QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); + QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); if (NULL == rsp) { ctx->dataConnInfo = qwMsg->connInfo; @@ -875,6 +918,47 @@ _return: qwRelease(refId); } +int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SRpcMsg *pRsp) { + int32_t code = 0; + SSubplan *plan = NULL; + qTaskInfo_t pTaskInfo = NULL; + DataSinkHandle sinkHandle = NULL; + SQWTaskCtx ctx = {0}; + + code = qStringToSubplan(qwMsg->msg, &plan); + if (TSDB_CODE_SUCCESS != code) { + code = TSDB_CODE_INVALID_MSG; + QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code)); + QW_ERR_JRET(code); + } + + ctx->plan = plan; + + code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH); + if (code) { + QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); + QW_ERR_JRET(code); + } + + if (NULL == sinkHandle || NULL == pTaskInfo) { + QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle); + QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + } + + ctx->taskHandle = pTaskInfo; + ctx->sinkHandle = sinkHandle; + + QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL)); + + QW_ERR_JRET(qwGetDeleteResFromSink(QW_FPARAMS(), &ctx, &pRsp->contLen, &pRsp->pCont)); + +_return: + + qwFreeTaskCtx(QW_FPARAMS(), &ctx); + + QW_RET(TSDB_CODE_SUCCESS); +} + int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) { if (NULL == qWorkerMgmt || pMsgCb->mgmt == NULL) { @@ -1007,6 +1091,7 @@ int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pSt pStat->fetchProcessed = QW_STAT_GET(mgmt->stat.msgStat.fetchProcessed); pStat->dropProcessed = QW_STAT_GET(mgmt->stat.msgStat.dropProcessed); pStat->hbProcessed = QW_STAT_GET(mgmt->stat.msgStat.hbProcessed); + pStat->deleteProcessed = QW_STAT_GET(mgmt->stat.msgStat.deleteProcessed); pStat->numOfQueryInQueue = handle->pMsgCb->qsizeFp(handle->pMsgCb->mgmt, mgmt->nodeId, QUERY_QUEUE); pStat->numOfFetchInQueue = handle->pMsgCb->qsizeFp(handle->pMsgCb->mgmt, mgmt->nodeId, FETCH_QUEUE); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index bf51d8d631..0e9e714d7d 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -227,6 +227,25 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch break; } + case TDMT_VND_DELETE_RSP: { + SCH_ERR_JRET(rspCode); + + if (msg) { + SDecoder coder = {0}; + SVDeleteRsp rsp = {0}; + tDecoderInit(&coder, msg, msgSize); + tDecodeSVDeleteRsp(&coder, &rsp); + + atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows); + SCH_TASK_DLOG("delete succeed, affectedRows:%d", rsp->affectedRows); + } + + taosMemoryFreeClear(msg); + + SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); + + break; + } case TDMT_VND_QUERY_RSP: { SQueryTableRsp *rsp = (SQueryTableRsp *)msg; @@ -982,6 +1001,27 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, break; } + case TDMT_VND_DELETE: { + SVDeleteReq req = {0}; + req.sId = schMgmt.sId; + req.queryId = pJob->queryId; + req.taskId = pTask->taskId; + req.phyLen = pTask->msgLen; + req.sqlLen = strlen(pJob->sql); + req.sql = pJob->sql; + req.msg = pTask->msg; + int32_t len = tSerializeSVDeleteReq(NULL, 0, &req); + msg = taosMemoryCalloc(1, len); + if (NULL == msg) { + SCH_TASK_ELOG("calloc %d failed", len); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + tSerializeSVDeleteReq(msg, len, &req); + SVDeleteReq *pMsg = msg; + pMsg->header.vgId = htonl(addr->nodeId); + break; + } case TDMT_VND_QUERY: { SCH_ERR_RET(schMakeQueryRpcCtx(pJob, pTask, &rpcCtx)); -- GitLab