diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 2df27a432c4fca8191d753705895f712a715ccca..14d58ef58c89c4f45a7b3fb178532606c93f2cf2 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3274,10 +3274,15 @@ void tDestroySSubmitRsp2(SSubmitRsp2* pRsp, int32_t flag); #define TSDB_MSG_FLG_DECODE 0x2 typedef struct { - void* msgStr; - int32_t msgLen; - int64_t ver; -} SPackedSubmit; + union { + struct { + void* msgStr; + int32_t msgLen; + int64_t ver; + }; + void* pDataBlock; + }; +} SPackedData; #pragma pack(pop) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 063c5bd4edfa61cdadfc89eea79c1bac3e95b5c2..4d9c0866c31c7ee5c11c58754c399d273796e0af 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -192,7 +192,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT // int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t ver); // -int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedSubmit submit); +int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit); int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 8721f1ba4127c0cf14caffc2635e1857271998f9..a2a512fb0713287edae2c52eb2ff7250e15c6f03 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -120,10 +120,10 @@ typedef struct { #endif typedef struct { - int8_t type; - int64_t ver; - int32_t* dataRef; - SPackedSubmit submit; + int8_t type; + int64_t ver; + int32_t* dataRef; + SPackedData submit; } SStreamDataSubmit2; typedef struct { @@ -235,7 +235,7 @@ static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) { } } -SStreamDataSubmit2* streamDataSubmitNew(SPackedSubmit submit); +SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit); void streamDataSubmitRefDec(SStreamDataSubmit2* pDataSubmit); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 130cf9999a7442c108e40c7dfc9da417a88e5eae..83392049b8544f27d4033e8cb4f1972e5364d19c 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -226,8 +226,8 @@ typedef struct STqReader { // SSubmitMsgIter msgIter; // SSubmitBlkIter blkIter; - int64_t ver; - SPackedSubmit msg2; + int64_t ver; + SPackedData msg2; int8_t setMsg; SSubmitReq2 submit; diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 817c6dc596ba777cfa052892856fbe6a38dfaa0c..0ef8e8bac592490dd1f8e48276b32b6effe52095 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -153,7 +153,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum); // tqExec -int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedSubmit submit, STaosxRsp* pRsp); +int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp); // int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp); int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision); int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index e1e3366c031ef694c704b0add58fe533c6baef02..bf028b0e7630bca77bac6d7e74fe143d12e8f282 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -180,7 +180,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msg int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen); -int32_t tqProcessSubmitReq(STQ* pTq, SPackedSubmit submit); +int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit); int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 69a13c1fd20a64a9370502d0b19222a40bb4acee..cb1069d25681814b32ab975c879357e147456fe3 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -673,7 +673,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { req.epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType); if (pHead->msgType == TDMT_VND_SUBMIT) { - SPackedSubmit submit = { + SPackedData submit = { .msgStr = POINTER_SHIFT(pHead->body, sizeof(SMsgHead)), .msgLen = pHead->bodyLen - sizeof(SMsgHead), .ver = pHead->version, @@ -1332,7 +1332,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { return 0; } -int32_t tqProcessSubmitReq(STQ* pTq, SPackedSubmit submit) { +int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { void* pIter = NULL; bool failed = false; SStreamDataSubmit2* pSubmit = NULL; diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index c0a13f47d0b596dba92c7e8023df49e0dc5f9bd5..1074e2f6d5abaac724bfeba6e7d9f33ffe248a35 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -215,7 +215,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta return 0; } -int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedSubmit submit, STaosxRsp* pRsp) { +int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp) { STqExecHandle* pExec = &pHandle->execHandle; ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN); diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 3d4419a930fed826e9992654f55bb496cecbdbc9..948e037c97427f5c35cf956604a91eb47f1aac44 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -258,7 +258,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) SMqDataRsp* pRsp = &pPushEntry->dataRsp; // prepare scan mem data - SPackedSubmit submit = { + SPackedData submit = { .msgStr = data, .msgLen = len, .ver = ver, @@ -324,7 +324,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) return -1; } memcpy(data, pReq, len); - SPackedSubmit submit = { + SPackedData submit = { .msgStr = data, .msgLen = len, .ver = ver, diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index fe78507a0ba749b44e28b2101a14cc75d06304a1..3d3a08c8d04b6f36c4dccdafcc36619e8d6058f9 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -133,7 +133,7 @@ typedef struct { int64_t snapshotVer; // const SSubmitReq* pReq; - SPackedSubmit submit; + SPackedData submit; SSchemaWrapper* schema; char tbName[TSDB_TABLE_NAME_LEN]; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 64bb07bc6de079bcd294df6bcba25559d2ff2408..0c6ecaf34339f95570773b67ab4c602dfb8d12ff 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -115,7 +115,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu if (type == STREAM_INPUT__MERGED_SUBMIT) { // ASSERT(numOfBlocks > 1); for (int32_t i = 0; i < numOfBlocks; i++) { - SPackedSubmit* pReq = POINTER_SHIFT(input, i * sizeof(SPackedSubmit)); + SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData)); taosArrayPush(pInfo->pBlockLists, pReq); } pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; @@ -125,9 +125,9 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; } else if (type == STREAM_INPUT__DATA_BLOCK) { for (int32_t i = 0; i < numOfBlocks; ++i) { - SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; - SPackedSubmit tmp = { - .msgStr = pDataBlock, + SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; + SPackedData tmp = { + .pDataBlock = pDataBlock, }; taosArrayPush(pInfo->pBlockLists, &tmp); } @@ -1016,7 +1016,7 @@ int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t sc } #endif -int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedSubmit submit) { +int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); ASSERT(pTaskInfo->streamInfo.submit.msgStr == NULL); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 40ba8572a8348f1e42a31b4bd2532d154182eec2..0b198afb0d4c8b0fd654521d38d2686ec602752d 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1532,10 +1532,10 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { /*const SSubmitReq* pSubmit = pInfo->tqReader->pMsg;*/ /*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/ /*void* msgStr = pTaskInfo->streamInfo.*/ - SPackedSubmit submit = pTaskInfo->streamInfo.submit; + SPackedData submit = pTaskInfo->streamInfo.submit; if (tqReaderSetSubmitReq2(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) { qError("submit msg messed up when initing stream submit block %p", submit.msgStr); - pInfo->tqReader->msg2 = (SPackedSubmit){0}; + pInfo->tqReader->msg2 = (SPackedData){0}; pInfo->tqReader->setMsg = 0; ASSERT(0); } @@ -1560,9 +1560,9 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { } } - pInfo->tqReader->msg2 = (SPackedSubmit){0}; + pInfo->tqReader->msg2 = (SPackedData){0}; pInfo->tqReader->setMsg = 0; - pTaskInfo->streamInfo.submit = (SPackedSubmit){0}; + pTaskInfo->streamInfo.submit = (SPackedData){0}; return NULL; } @@ -1791,7 +1791,8 @@ FETCH_NEXT_BLOCK: } int32_t current = pInfo->validBlockIndex++; - SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current); + SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current); + SSDataBlock* pBlock = pPacked->pDataBlock; if (pBlock->info.id.groupId && pBlock->info.parTbName[0]) { streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, pBlock->info.parTbName); } @@ -1928,8 +1929,8 @@ FETCH_NEXT_BLOCK: return NULL; } - int32_t current = pInfo->validBlockIndex++; - SPackedSubmit* pSubmit = taosArrayGet(pInfo->pBlockLists, current); + int32_t current = pInfo->validBlockIndex++; + SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current); /*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/ if (tqReaderSetSubmitReq2(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) { qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current, @@ -2264,7 +2265,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys } } - pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedSubmit)); + pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedData)); if (pInfo->pBlockLists == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _error; diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 68e8473c6d1da7a13a487896697d28b3fd7d30cb..8321235605eaf60c3e978d8b0cfbe1e506406368 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -66,7 +66,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock return 0; } -SStreamDataSubmit2* streamDataSubmitNew(SPackedSubmit submit) { +SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit) { SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM); if (pDataSubmit == NULL) return NULL; pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t)); @@ -83,7 +83,7 @@ FAIL: SStreamMergedSubmit2* streamMergedSubmitNew() { SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)taosAllocateQitem(sizeof(SStreamMergedSubmit2), DEF_QITEM); if (pMerged == NULL) return NULL; - pMerged->submits = taosArrayInit(0, sizeof(SPackedSubmit)); + pMerged->submits = taosArrayInit(0, sizeof(SPackedData)); pMerged->dataRefs = taosArrayInit(0, sizeof(void*)); if (pMerged->dataRefs == NULL || pMerged->submits == NULL) goto FAIL; pMerged->type = STREAM_INPUT__MERGED_SUBMIT; @@ -172,7 +172,7 @@ void streamFreeQitem(SStreamQueueItem* data) { int32_t ref = atomic_sub_fetch_32(pRef, 1); ASSERT(ref >= 0); if (ref == 0) { - SPackedSubmit* pSubmit = (SPackedSubmit*)taosArrayGet(pMerge->submits, i); + SPackedData* pSubmit = (SPackedData*)taosArrayGet(pMerge->submits, i); taosMemoryFree(pSubmit->msgStr); taosMemoryFree(pRef); }