diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 2544cedda744901ddc98bf9d7698fddf5907513f..2add3332ab28320f52c666141470f30e40b11fe8 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -177,6 +177,7 @@ typedef struct SSDataBlock { enum { FETCH_TYPE__DATA = 1, FETCH_TYPE__META, + FETCH_TYPE__SEP, FETCH_TYPE__NONE, }; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index e0f48a4534cf8085eb7d438b9d88b3e5301e4d6d..66f992f05f94f1e583118044cea2271ec8fcd7ee 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1603,6 +1603,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { return NULL; } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper; + tscDebug("consumer %ld actual process poll rsp", tmq->consumerId); /*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/ int32_t consumerEpoch = atomic_load_32(&tmq->epoch); if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) { @@ -1718,7 +1719,10 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { while (1) { tmqHandleAllDelayedTask(tmq); - if (tmqPollImpl(tmq, timeout) < 0) return NULL; + if (tmqPollImpl(tmq, timeout) < 0) { + tscDebug("return since poll err"); + /*return NULL;*/ + } rspObj = tmqHandleAllRsp(tmq, timeout, false); if (rspObj) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 490a313cd4372b29d7229213da8ca0fe766c4cf9..070abadee2d59dc8641feb5438966b28f05b381e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -420,6 +420,8 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su return -1; } + pRsp->withTbName = 0; +#if 0 pRsp->withTbName = pReq->withTbName; if (pRsp->withTbName) { pRsp->blockTbName = taosArrayInit(0, sizeof(void*)); @@ -428,6 +430,7 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su return -1; } } +#endif if (subType == TOPIC_SUB_TYPE__COLUMN) { pRsp->withSchema = false; diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 13ed36e2bc1dc609bff5cc542d5f303a02936b6d..e7d6c1eb47f215056af6410ac93172617949dc60 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -213,6 +213,8 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ #endif int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { + tqDebug("vgId:%d tq push msg ver %ld", pTq->pVnode->config.vgId, ver); + if (msgType == TDMT_VND_SUBMIT) { // lock push mgr to avoid potential msg lost taosWLockLatch(&pTq->pushLock); @@ -257,6 +259,8 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) pRsp->blockNum++; } + tqDebug("vgId:%d tq handle push, subkey: %s, block num: %d", pTq->pVnode->config.vgId, + pPushEntry->pHandle->subKey, pRsp->blockNum); if (pRsp->blockNum > 0) { // set offset tqOffsetResetToLog(&pRsp->rspOffset, ver); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index d93a8fe0301c78a85b40bc938e2fdb5a9e406a19..3bd31e66608a0dcda4aad43edb4e8a885e22b81a 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -314,14 +314,18 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { return -1; } void* body = pReader->pWalReader->pHead->head.body; +#if 0 if (pReader->pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) { // TODO do filter ret->fetchType = FETCH_TYPE__META; ret->meta = pReader->pWalReader->pHead->head.body; return 0; } else { - tqReaderSetDataMsg(pReader, body, pReader->pWalReader->pHead->head.version); +#endif + tqReaderSetDataMsg(pReader, body, pReader->pWalReader->pHead->head.version); +#if 0 } +#endif } while (tqNextDataBlock(pReader)) { @@ -333,6 +337,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { continue; } ret->fetchType = FETCH_TYPE__DATA; + tqDebug("return data rows %d", ret->data.info.rows); return 0; } @@ -340,7 +345,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { ret->offset.type = TMQ_OFFSET__LOG; ret->offset.version = pReader->ver; ASSERT(pReader->ver >= 0); - ret->fetchType = FETCH_TYPE__NONE; + ret->fetchType = FETCH_TYPE__SEP; tqDebug("return offset %" PRId64 ", processed finish", ret->offset.version); return 0; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 27760d7969d385786fb88215d117835f14b1baa4..56470f066801283eae6a5414b8f6d90d86ce8e86 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -193,6 +193,7 @@ enum { OP_OPENED = 0x1, OP_RES_TO_RETURN = 0x5, OP_EXEC_DONE = 0x9, + OP_EXEC_RECV = 0x11, }; typedef struct SOperatorFpSet { diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index b7c5b5634e9fa472beaf5d0ea88e16b8d67e62b5..8d40824cc4d38684f7df886ed7df2a925f5ec468 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -241,9 +241,20 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { // The downstream exec may change the value of the newgroup, so use a local variable instead. SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { + if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE && pOperator->status == OP_EXEC_RECV && + pFinalRes->info.rows == 0) { + pOperator->status = OP_OPENED; + continue; + } + qDebug("set op close, exec %d, status %d rows %d", pTaskInfo->execModel, pOperator->status, + pFinalRes->info.rows); doSetOperatorCompleted(pOperator); break; } + if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) { + qDebug("set status recv"); + pOperator->status = OP_EXEC_RECV; + } // for stream interval if (pBlock->info.type == STREAM_RETRIEVE || pBlock->info.type == STREAM_DELETE_RESULT || @@ -258,7 +269,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { } setInfoForNewGroup(pBlock, pLimitInfo, pOperator); - if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH && pOperator->status == OP_EXEC_DONE) { + if (pOperator->status == OP_EXEC_DONE) { break; } @@ -302,6 +313,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { // when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint. if (pFinalRes->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) { + qDebug("project return %d rows, status %d", pFinalRes->info.rows, pOperator->status); break; } } else { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 46280295b9b57bad5225fdea9ba70f82e49c583f..927ef2c64d03c050d859a2878765d4cfcddc7f41 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1504,8 +1504,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (setBlockIntoRes(pInfo, &ret.data) < 0) { ASSERT(0); } - // TODO clean data block if (pInfo->pRes->info.rows > 0) { + pOperator->status = OP_EXEC_RECV; qDebug("queue scan log return %d rows", pInfo->pRes->info.rows); return pInfo->pRes; } @@ -1514,18 +1514,19 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { // pTaskInfo->streamInfo.lastStatus = ret.offset; // pTaskInfo->streamInfo.metaBlk = ret.meta; // return NULL; - } else if (ret.fetchType == FETCH_TYPE__NONE) { + } else if (ret.fetchType == FETCH_TYPE__NONE || + (ret.fetchType == FETCH_TYPE__SEP && pOperator->status == OP_EXEC_RECV)) { pTaskInfo->streamInfo.lastStatus = ret.offset; ASSERT(pTaskInfo->streamInfo.lastStatus.version >= pTaskInfo->streamInfo.prepareStatus.version); ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 == pInfo->tqReader->pWalReader->curVersion); char formatBuf[80]; tFormatOffset(formatBuf, 80, &ret.offset); qDebug("queue scan log return null, offset %s", formatBuf); + pOperator->status = OP_OPENED; return NULL; - } else { - ASSERT(0); } } +#if 0 } else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); if (pResult && pResult->info.rows > 0) { @@ -1534,6 +1535,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { } qDebug("stream scan tsdb return null"); return NULL; +#endif } else { ASSERT(0); return NULL;