diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 871dd6074185569135cef7b36f8a957b693cb990..3489b359cbae646ae561352b48a85c86bdb1f947 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -216,6 +216,7 @@ int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver); int32_t qStreamRecoverFinish(qTaskInfo_t tinfo); int32_t qStreamRestoreParam(qTaskInfo_t tinfo); bool qStreamRecoverScanFinished(qTaskInfo_t tinfo); +void qStreamCloseTsdbReader(void* task); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index d0017e4ba227d197d72a88a487bac26e4868c3f5..d0c3233488396d5b997ec1c3b231453c6baed088 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -725,6 +725,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; + tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey); + taosWLockLatch(&pTq->pushLock); int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey)); if (code != 0) { @@ -791,6 +793,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL SMqRebVgReq req = {0}; tDecodeSMqRebVgReq(msg, &req); // todo lock + + tqDebug("vgId:%d, tq process sub req %s", pTq->pVnode->config.vgId, req.subKey); + STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); if (pHandle == NULL) { if (req.oldConsumerId != -1) { @@ -881,6 +886,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL atomic_store_64(&pHandle->consumerId, req.newConsumerId); atomic_add_fetch_32(&pHandle->epoch, 1); taosMemoryFree(req.qmsg); + if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { + qStreamCloseTsdbReader(pHandle->execHandle.task); + } if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { } // close handle diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 0a2802ba375cd3a3efa186ad3bc3ba852a83164b..a40e29ba090358eb260619bc2406ea8b75a42717 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1207,3 +1207,4 @@ void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { rpcFreeCont(pMsg->pCont); destroySendMsgInfo(pSendInfo); } + diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 149efef884b10149bf005407e150380f9f1f0979..680d08fa19b4b13717de940802615b68862e1a81 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -604,9 +604,7 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB } } -bool isTaskKilled(SExecTaskInfo* pTaskInfo) { - return (0 != pTaskInfo->code) ? true : false; -} +bool isTaskKilled(SExecTaskInfo* pTaskInfo) { return (0 != pTaskInfo->code) ? true : false; } void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) { pTaskInfo->code = rspCode; } @@ -1349,7 +1347,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan } } -static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock **ppBlock) { +static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock) { if (!tsCountAlwaysReturnValue) { return TSDB_CODE_SUCCESS; } @@ -1357,12 +1355,12 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION || (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN && - ((STableScanInfo *)downstream->info)->hasGroupByTag == true)) { + ((STableScanInfo*)downstream->info)->hasGroupByTag == true)) { return TSDB_CODE_SUCCESS; } SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx; - bool hasCountFunc = false; + bool hasCountFunc = false; for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { const char* pName = pCtx[i].pExpr->pExpr->_function.functionName; @@ -1411,7 +1409,7 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc return TSDB_CODE_SUCCESS; } -static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock **ppBlock) { +static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock) { if (!blockAllocated) { return; } @@ -1437,8 +1435,8 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { int32_t order = TSDB_ORDER_ASC; int32_t scanFlag = MAIN_SCAN; - bool hasValidBlock = false; - bool blockAllocated = false; + bool hasValidBlock = false; + bool blockAllocated = false; while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); @@ -1481,7 +1479,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { } destroyDataBlockForEmptyInput(blockAllocated, &pBlock); - } // the downstream operator may return with error code, so let's check the code before generating results. @@ -1636,7 +1633,7 @@ void cleanupAggSup(SAggSupporter* pAggSup) { } int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, - const char* pkey) { + const char* pkey) { int32_t code = initExprSupp(pSup, pExprInfo, numOfCols); if (code != TSDB_CODE_SUCCESS) { return code; @@ -1759,7 +1756,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo, optrDefaultBufFn, NULL); + pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo, + optrDefaultBufFn, NULL); if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pTableScanInfo = downstream->info; @@ -2608,3 +2606,22 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta blockDataUpdateTsWindow(pBlock, 0); return TSDB_CODE_SUCCESS; } + +void qStreamCloseTsdbReader(void* task) { + if (task == NULL) return; + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)task; + SOperatorInfo* pOp = pTaskInfo->pRoot; + pTaskInfo->streamInfo.lastStatus = (STqOffsetVal){0}; + while (pOp->numOfDownstream == 1 && pOp->pDownstream[0]) { + SOperatorInfo* pDownstreamOp = pOp->pDownstream[0]; + if (pDownstreamOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + SStreamScanInfo* pInfo = pDownstreamOp->info; + if (pInfo->pTableScanOp) { + STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; + tsdbReaderClose(pTSInfo->base.dataReader); + pTSInfo->base.dataReader = NULL; + return; + } + } + } +}