diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e074a2d8a680261131bd00430238e5f57f81f05c..41b43026eadd1055eec8f8e35309ed5fcf37eee8 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -358,9 +358,19 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { return -1; } + // 2. check re-balance status + if (pHandle->consumerId != consumerId) { + tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, + consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId); + terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; + taosWUnLockLatch(&pTq->lock); + return -1; + } + bool exec = tqIsHandleExec(pHandle); if(!exec) { tqSetHandleExec(pHandle); + tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId, req.subKey, pHandle); taosWUnLockLatch(&pTq->lock); break; } @@ -370,17 +380,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { taosMsleep(10); } - // 2. check re-balance status - if (pHandle->consumerId != consumerId) { - tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, - consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId); - terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; - code = -1; - goto end; - } - - tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId, req.subKey, pHandle); - // 3. update the epoch value int32_t savedEpoch = pHandle->epoch; if (savedEpoch < reqEpoch) { @@ -396,7 +395,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { code = tqExtractDataForMq(pTq, pHandle, &req, pMsg); -end: tqSetHandleIdle(pHandle); tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, , set handle idle, pHandle:%p", consumerId, vgId, req.subKey, pHandle); return code; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7d3165237eb56248bc76e6939f6b9639d5a6b419..638662aeb57aa8804f0ff15ed6d07890d651ab2e 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1665,37 +1665,9 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { qDebug("start to exec queue scan, %s", id); -#if 0 - if (pTaskInfo->streamInfo.submit.msgStr != NULL) { - if (pInfo->tqReader->msg.msgStr == NULL) { - SPackedData submit = pTaskInfo->streamInfo.submit; - if (tqReaderSetSubmitMsg(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) { - qError("submit msg messed up when initing stream submit block %p", submit.msgStr); - return NULL; - } - } - - blockDataCleanup(pInfo->pRes); - SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; - - while (tqNextBlockImpl(pInfo->tqReader)) { - int32_t code = tqRetrieveDataBlock(pInfo->tqReader, NULL); - if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) { - continue; - } - - setBlockIntoRes(pInfo, pInfo->tqReader->pResBlock, true); - - if (pBlockInfo->rows > 0) { - return pInfo->pRes; - } - } - - pInfo->tqReader->msg = (SPackedData){0}; - pTaskInfo->streamInfo.submit = (SPackedData){0}; + if (isTaskKilled(pTaskInfo)) { return NULL; } -#endif if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);