From 9e7bbf622a84697b8ecf9ee29a3c8e134803342b Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 7 Dec 2022 17:54:09 +0800 Subject: [PATCH] refactor: support submitreq2 --- source/dnode/vnode/src/tq/tq.c | 4 ++-- source/dnode/vnode/src/tq/tqRead.c | 6 ++++-- source/libs/executor/inc/executorimpl.h | 13 ++++++------- source/libs/executor/src/projectoperator.c | 19 +++++++++++-------- source/libs/executor/src/scanoperator.c | 7 +++---- 5 files changed, 26 insertions(+), 23 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 330018d619..69a13c1fd2 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -674,8 +674,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { if (pHead->msgType == TDMT_VND_SUBMIT) { SPackedSubmit submit = { - .msgStr = pHead->body, - .msgLen = pHead->bodyLen, + .msgStr = POINTER_SHIFT(pHead->body, sizeof(SMsgHead)), + .msgLen = pHead->bodyLen - sizeof(SMsgHead), .ver = pHead->version, }; if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) { diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 747206d71e..d0f938563b 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -320,8 +320,8 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { ASSERT(ret->offset.version >= 0); return -1; } - void* body = pReader->pWalReader->pHead->head.body; - int32_t bodyLen = pReader->pWalReader->pHead->head.bodyLen; + void* body = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SMsgHead)); + int32_t bodyLen = pReader->pWalReader->pHead->head.bodyLen - sizeof(SMsgHead); int64_t ver = pReader->pWalReader->pHead->head.version; #if 0 if (pReader->pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) { @@ -383,9 +383,11 @@ int32_t tqReaderSetSubmitReq2(STqReader* pReader, void* msgStr, int32_t msgLen, ASSERT(pReader->msg2.msgStr == NULL); ASSERT(msgStr); ASSERT(msgLen); + ASSERT(ver >= 0); pReader->msg2.msgStr = msgStr; pReader->msg2.msgLen = msgLen; pReader->msg2.ver = ver; + pReader->ver = ver; tqDebug("tq reader set msg %p", msgStr); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 7417e4e657..fe78507a0b 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -126,13 +126,12 @@ enum { typedef struct { // TODO remove prepareStatus - STqOffsetVal prepareStatus; // for tmq - STqOffsetVal lastStatus; // for tmq - SMqMetaRsp metaRsp; // for tmq fetching meta - int8_t returned; - int64_t snapshotVer; - const SSubmitReq* pReq; - int64_t scanVer; + STqOffsetVal prepareStatus; // for tmq + STqOffsetVal lastStatus; // for tmq + SMqMetaRsp metaRsp; // for tmq fetching meta + int8_t returned; + int64_t snapshotVer; + // const SSubmitReq* pReq; SPackedSubmit submit; diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 65bb40b195..2be6508272 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#include "filter.h" #include "executorimpl.h" +#include "filter.h" #include "functionMgt.h" typedef struct SProjectOperatorInfo { @@ -90,7 +90,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys pInfo->binfo.pRes = pResBlock; pInfo->pFinalRes = createOneDataBlock(pResBlock, false); - pInfo->mergeDataBlocks = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM)? false:pProjPhyNode->mergeDataBlock; + pInfo->mergeDataBlocks = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? false : pProjPhyNode->mergeDataBlock; int32_t numOfRows = 4096; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; @@ -117,9 +117,10 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols); - setOperatorInfo(pOperator, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation, NULL, - destroyProjectOperatorInfo, optrDefaultBufFn, NULL); + setOperatorInfo(pOperator, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pInfo, + pTaskInfo); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation, NULL, destroyProjectOperatorInfo, + optrDefaultBufFn, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -220,7 +221,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { blockDataCleanup(pFinalRes); SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - if (pTaskInfo->streamInfo.pReq) { + if (pTaskInfo->streamInfo.submit.msgStr) { pOperator->status = OP_OPENED; } @@ -414,8 +415,10 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy pInfo->binfo.pRes = pResBlock; pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr); - setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo, optrDefaultBufFn, NULL); + setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo, + pTaskInfo); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo, + optrDefaultBufFn, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index e04fe0c30a..48a6c87f19 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1525,10 +1525,9 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { qDebug("queue scan called"); - if (pTaskInfo->streamInfo.pReq != NULL) { + if (pTaskInfo->streamInfo.submit.msgStr != NULL) { if (pInfo->tqReader->msg2.msgStr == NULL) { /*pInfo->tqReader->pMsg = pTaskInfo->streamInfo.pReq;*/ - pInfo->tqReader->ver = pTaskInfo->streamInfo.scanVer; /*const SSubmitReq* pSubmit = pInfo->tqReader->pMsg;*/ /*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/ @@ -1538,7 +1537,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { qError("submit msg messed up when initing stream submit block %p", submit.msgStr); pInfo->tqReader->msg2 = (SPackedSubmit){0}; pInfo->tqReader->setMsg = 0; - pTaskInfo->streamInfo.pReq = NULL; ASSERT(0); } } @@ -1562,7 +1560,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { } } - pTaskInfo->streamInfo.pReq = NULL; + pInfo->tqReader->msg2 = (SPackedSubmit){0}; + pInfo->tqReader->setMsg = 0; return NULL; } -- GitLab