diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 25fd1a5e79d5e48dc740e6b33d372bffc283d367..dc6deb162cf7051596ccc3bc8e01da79e688d135 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -65,7 +65,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers); * @return */ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, - SSchemaWrapper** pSchemaWrapper, int64_t* ntbUid); + SSchemaWrapper** pSchemaWrapper); /** * Set the input data block for the stream scan. diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 744a5d7eace05858a6ad532e6f7eda1bf821b1a9..b063e552f683536736546d7a5e2f7df04ec303ec 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -110,9 +110,6 @@ typedef struct { // exec STqExecHandle execHandle; - // prevent drop - int64_t ntbUid; - SArray* colIdList; // SArray } STqHandle; struct STQ { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8c1f13d2a8283d138bae10c0c3c277b4b73eb492..9f80bc50a4e4e6bf9191741aa65c0338ab4011b4 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -272,6 +272,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { int32_t code = 0; STqOffsetVal reqOffset = pReq->reqOffset; STqOffsetVal fetchOffsetNew; + SWalCkHead* pCkHead = NULL; // 1.find handle STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey)); @@ -461,6 +462,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } OVER: + if (pCkHead) taosMemoryFree(pCkHead); // TODO wrap in destroy func taosArrayDestroy(dataRsp.blockDataLen); taosArrayDestroyP(dataRsp.blockData, (FDelete)taosMemoryFree); @@ -542,7 +544,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { }; pHandle->execHandle.execCol.task = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, - &pHandle->execHandle.pSchemaWrapper, &pHandle->ntbUid); + &pHandle->execHandle.pSchemaWrapper); ASSERT(pHandle->execHandle.execCol.task); void* scanner = NULL; qExtractStreamScanner(pHandle->execHandle.execCol.task, &scanner); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index 4e2750d9f087087b1223f088e508b63d385c2c42..d8851c37756cea6fe61ba330b29bd9efe89aac37 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -201,10 +201,12 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR if (pRsp->withTbName) { int64_t uid = pExec->pExecReader->msgIter.uid; if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { + blockDataFreeRes(&block); continue; } } tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock)); + blockDataFreeRes(&block); tqAddBlockSchemaToRsp(pExec, pRsp); pRsp->blockNum++; } @@ -220,10 +222,12 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR if (pRsp->withTbName) { int64_t uid = pExec->pExecReader->msgIter.uid; if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { + blockDataFreeRes(&block); continue; } } tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock)); + blockDataFreeRes(&block); tqAddBlockSchemaToRsp(pExec, pRsp); pRsp->blockNum++; } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 4ee87e32031a0e975c77d73bc6757c9f8f1b6eda..86b26339bc939ce752729b71a7585599dd2794f9 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -89,9 +89,8 @@ int32_t tqMetaOpen(STQ* pTq) { .version = handle.snapshotVer, }; - handle.execHandle.execCol.task = - qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, - &handle.execHandle.pSchemaWrapper, &handle.ntbUid); + handle.execHandle.execCol.task = qCreateQueueExecTaskInfo( + handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper); ASSERT(handle.execHandle.execCol.task); void* scanner = NULL; qExtractStreamScanner(handle.execHandle.execCol.task, &scanner); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 17842615c497b5b23d3ec82dda80fc47f0c36cf7..821b48275ac79701059dd1b815e619892f56708b 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -340,29 +340,30 @@ FAIL: void tqReaderSetColIdList(STqReader* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; } -int tqReaderSetTbUidList(STqReader* pHandle, const SArray* tbUidList) { - if (pHandle->tbIdHash) { - taosHashClear(pHandle->tbIdHash); +int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList) { + if (pReader->tbIdHash) { + taosHashClear(pReader->tbIdHash); + } else { + pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); } - pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); - if (pHandle->tbIdHash == NULL) { + if (pReader->tbIdHash == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } for (int i = 0; i < taosArrayGetSize(tbUidList); i++) { int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i); - taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0); + taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0); } return 0; } -int tqReaderAddTbUidList(STqReader* pHandle, const SArray* tbUidList) { - if (pHandle->tbIdHash == NULL) { - pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); - if (pHandle->tbIdHash == NULL) { +int tqReaderAddTbUidList(STqReader* pReader, const SArray* tbUidList) { + if (pReader->tbIdHash == NULL) { + pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + if (pReader->tbIdHash == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -370,18 +371,18 @@ int tqReaderAddTbUidList(STqReader* pHandle, const SArray* tbUidList) { for (int i = 0; i < taosArrayGetSize(tbUidList); i++) { int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i); - taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0); + taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0); } return 0; } -int tqReaderRemoveTbUidList(STqReader* pHandle, const SArray* tbUidList) { - ASSERT(pHandle->tbIdHash != NULL); +int tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) { + ASSERT(pReader->tbIdHash != NULL); for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) { int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i); - taosHashRemove(pHandle->tbIdHash, pKey, sizeof(int64_t)); + taosHashRemove(pReader->tbIdHash, pKey, sizeof(int64_t)); } return 0; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 4c78dfeda4c67e8aceeecc44a81e068fd295783a..21068c68a4882c8e7f5591d9c2ec8d4477946480 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -157,7 +157,6 @@ typedef struct { SQueryTableDataCond tableCond; int64_t recoverStartVer; int64_t recoverEndVer; - int64_t ntbUid; } SStreamTaskInfo; typedef struct { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index e33e115ec92aa6360a7c5ee65aa733cd7436f889..c5aa90e0eb1947eb26cc4f3ac18340577ec5841d 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -121,7 +121,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO } qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, - SSchemaWrapper** pSchemaWrapper, int64_t* ntbUid) { + SSchemaWrapper** pSchemaWrapper) { if (msg == NULL) { // TODO create raw scan return NULL; @@ -156,7 +156,6 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n } *pSchemaWrapper = tCloneSSchemaWrapper(((SExecTaskInfo*)pTaskInfo)->schemaInfo.qsw); - *ntbUid = ((SExecTaskInfo*)pTaskInfo)->streamInfo.ntbUid; return pTaskInfo; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 1e10691eb97e43f3d233acedd1ccef764b70c1a7..e444a6ec164d5b85ddc42e8195d5eac020b169d5 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4165,9 +4165,6 @@ int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version; } else { pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow); - if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) { - pTaskInfo->streamInfo.ntbUid = mr.me.uid; - } } metaReaderClear(&mr); @@ -4443,10 +4440,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; - STimeWindowAggSupp aggSup = (STimeWindowAggSupp){ - .waterMark = pTableScanNode->watermark, - .calTrigger = pTableScanNode->triggerType, - .maxTs = INT64_MIN, + STimeWindowAggSupp aggSup = (STimeWindowAggSupp){ + .waterMark = pTableScanNode->watermark, + .calTrigger = pTableScanNode->triggerType, + .maxTs = INT64_MIN, }; if (pHandle->vnode) {