diff --git a/docs/zh/14-reference/05-taosbenchmark.md b/docs/zh/14-reference/05-taosbenchmark.md index 67d25e24aaf32ec3b833dbaba916f251c842d412..0d6aad62401daf76737caf803461c187189cb76f 100644 --- a/docs/zh/14-reference/05-taosbenchmark.md +++ b/docs/zh/14-reference/05-taosbenchmark.md @@ -87,7 +87,7 @@ taosBenchmark -f subscribe.json ```json -{{#include /taos-tools/example/tmq.json}} +{{#include /taos-tools/example/subscribe.json}} ``` diff --git a/include/common/tmsg.h b/include/common/tmsg.h index c8b90a9991ba9d048c39650f372d68c5142a90ec..8693061d12a82e6978aa70a0e663462792ebf47d 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2994,6 +2994,7 @@ typedef struct { int32_t tEncodeSTaosxRsp(SEncoder* pEncoder, const STaosxRsp* pRsp); int32_t tDecodeSTaosxRsp(SDecoder* pDecoder, STaosxRsp* pRsp); +void tDeleteSTaosxRsp(STaosxRsp* pRsp); typedef struct { SMqRspHead head; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index f08f54ef4bc06d2f2f137e0609e247da4448d46e..9f9a14952e75bdac29564c39dd4ef60da0d07ef0 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -1773,7 +1773,7 @@ tmq_res_t tmq_get_res_type(TAOS_RES* res) { } return TMQ_RES_TABLE_META; } else if (TD_RES_TMQ_TAOSX(res)) { - return TMQ_RES_TAOSX; + return TMQ_RES_DATA; } else { return TMQ_RES_INVALID; } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 1921415239cae804641820d8cda96443be655f14..b056677a0389f178a9d438d4bc452752d5d59d08 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5984,6 +5984,17 @@ int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) { } return 0; } + +void tDeleteSTaosxRsp(STaosxRsp *pRsp) { + taosArrayDestroy(pRsp->blockDataLen); + taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree); + taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSSchemaWrapper); + taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree); + + taosArrayDestroy(pRsp->createTableLen); + taosArrayDestroyP(pRsp->createTableReq, (FDelete)taosMemoryFree); +} + int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) { if (tEncodeI64(pEncoder, pReq->uid) < 0) return -1; if (tEncodeI64(pEncoder, pReq->ts) < 0) return -1; diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 19dd321814eee08fff0ad8e296b6e72d78bd4c6a..c3441a43f0e736881fc8bc491dd5717223645ed4 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -140,11 +140,12 @@ int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle); int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle); // tqRead -int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset); +int32_t tqScan(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset); +int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset); int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum); // tqExec -int32_t tqLogScanExec(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, SMqDataRsp* pRsp); +int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp); int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp); // tqMeta diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index eed997b486f39a25eb09abc35158c48c5216f5af..3f42d8360ee8648ec66068c31773caea6faea41f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -196,6 +196,66 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con return 0; } +int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const STaosxRsp* pRsp) { + ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); + ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); + + if (pRsp->withSchema) { + ASSERT(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum); + } else { + ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0); + } + + if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) { + if (pRsp->blockNum > 0) { + ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version); + } else { + ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version); + } + } + + int32_t len = 0; + int32_t code = 0; + tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code); + if (code < 0) { + return -1; + } + int32_t tlen = sizeof(SMqRspHead) + len; + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + return -1; + } + + ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__TAOSX_RSP; + ((SMqRspHead*)buf)->epoch = pReq->epoch; + ((SMqRspHead*)buf)->consumerId = pReq->consumerId; + + void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); + + SEncoder encoder = {0}; + tEncoderInit(&encoder, abuf, len); + tEncodeSTaosxRsp(&encoder, pRsp); + tEncoderClear(&encoder); + + SRpcMsg rsp = { + .info = pMsg->info, + .pCont = buf, + .contLen = tlen, + .code = 0, + }; + tmsgSendRsp(&rsp); + + char buf1[80] = {0}; + char buf2[80] = {0}; + tFormatOffset(buf1, 80, &pRsp->reqOffset); + tFormatOffset(buf2, 80, &pRsp->rspOffset); + tqDebug("taosx rsp, vgId:%d, from consumer:%" PRId64 + ", (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s", + TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2); + + return 0; +} + static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) { return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG && pLeft->val.version <= pRight->val.version; @@ -303,6 +363,22 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su return 0; } +static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) { + pRsp->reqOffset = pReq->reqOffset; + + pRsp->withTbName = 1; + pRsp->withSchema = 1; + pRsp->blockData = taosArrayInit(0, sizeof(void*)); + pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t)); + pRsp->blockTbName = taosArrayInit(0, sizeof(void*)); + pRsp->blockSchema = taosArrayInit(0, sizeof(void*)); + + if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL || pRsp->blockTbName == NULL || pRsp->blockSchema == NULL) { + return -1; + } + return 0; +} + int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { SMqPollReq* pReq = pMsg->pCont; int64_t consumerId = pReq->consumerId; @@ -341,9 +417,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqDebug("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId, pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf); - SMqDataRsp dataRsp = {0}; - tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType); - // 2.reset offset if needed if (reqOffset.type > 0) { fetchOffsetNew = reqOffset; @@ -367,6 +440,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqOffsetResetToLog(&fetchOffsetNew, walGetFirstVer(pTq->pVnode->pWal)); } } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { + SMqDataRsp dataRsp = {0}; + tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType); + tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal)); tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, offset reset to %" PRId64, consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version); @@ -380,16 +456,38 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { " in vg %d, subkey %s, reset none failed", pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pReq->subKey); terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; - code = -1; - tDeleteSMqDataRsp(&dataRsp); - return code; + return -1; } } } - if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN || fetchOffsetNew.type != TMQ_OFFSET__LOG) { - SMqMetaRsp metaRsp = {0}; - tqScan(pTq, pHandle, &dataRsp, &metaRsp, &fetchOffsetNew); + if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { + SMqDataRsp dataRsp = {0}; + tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType); + tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew); + + if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { + code = -1; + } + + tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, send data blockNum:%d, offset type:%d, uid:%ld, version:%ld", + consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type, + dataRsp.rspOffset.uid, dataRsp.rspOffset.version); + + tDeleteSMqDataRsp(&dataRsp); + return code; + } + + // for taosx + ASSERT(pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN); + + SMqMetaRsp metaRsp = {0}; + + STaosxRsp taosxRsp = {0}; + tqInitTaosxRsp(&taosxRsp, pReq); + + if (fetchOffsetNew.type != TMQ_OFFSET__LOG) { + tqScan(pTq, pHandle, &taosxRsp, &metaRsp, &fetchOffsetNew); if (metaRsp.metaRspLen > 0) { if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) { @@ -399,29 +497,30 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { pHandle->subKey, TD_VID(pTq->pVnode), metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.version); taosMemoryFree(metaRsp.metaRsp); - goto OVER; + tDeleteSTaosxRsp(&taosxRsp); + return code; } - if (dataRsp.blockNum > 0) { - if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { + if (taosxRsp.blockNum > 0) { + if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) { code = -1; } - goto OVER; + tDeleteSTaosxRsp(&taosxRsp); + return code; } else { - fetchOffsetNew = dataRsp.rspOffset; + fetchOffsetNew = taosxRsp.rspOffset; } - tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%ld,version:%ld", - consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type, - dataRsp.rspOffset.uid, dataRsp.rspOffset.version); + tqDebug("taosx poll: consumer %ld, subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%ld,version:%ld", + consumerId, pHandle->subKey, TD_VID(pTq->pVnode), taosxRsp.blockNum, taosxRsp.rspOffset.type, + taosxRsp.rspOffset.uid, taosxRsp.rspOffset.version); } - if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && fetchOffsetNew.type == TMQ_OFFSET__LOG) { + if (fetchOffsetNew.type == TMQ_OFFSET__LOG) { int64_t fetchVer = fetchOffsetNew.version + 1; pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); if (pCkHead == NULL) { - code = -1; - goto OVER; + return -1; } walSetReaderCapacity(pHandle->pWalReader, 2048); @@ -436,13 +535,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) { - // TODO add push mgr - - tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer); - if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { + tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); + if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) { code = -1; } - goto OVER; + tDeleteSTaosxRsp(&taosxRsp); + if (pCkHead) taosMemoryFree(pCkHead); + return code; } SWalCont* pHead = &pCkHead->head; @@ -453,17 +552,19 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { if (pHead->msgType == TDMT_VND_SUBMIT) { SSubmitReq* pCont = (SSubmitReq*)&pHead->body; - if (tqLogScanExec(pTq, pHandle, pCont, &dataRsp) < 0) { + if (tqTaosxScanLog(pTq, pHandle, pCont, &taosxRsp) < 0) { /*ASSERT(0);*/ } // TODO batch optimization: // TODO continue scan until meeting batch requirement - if (dataRsp.blockNum > 0 /* threshold */) { - tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer); - if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { + if (taosxRsp.blockNum > 0 /* threshold */) { + tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); + if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) { code = -1; } - goto OVER; + tDeleteSTaosxRsp(&taosxRsp); + if (pCkHead) taosMemoryFree(pCkHead); + return code; } else { fetchVer++; } @@ -472,30 +573,22 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ASSERT(pHandle->fetchMeta); ASSERT(IS_META_MSG(pHead->msgType)); tqDebug("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); - SMqMetaRsp metaRsp = {0}; tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer); metaRsp.resMsgType = pHead->msgType; metaRsp.metaRspLen = pHead->bodyLen; metaRsp.metaRsp = pHead->body; if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) { code = -1; - goto OVER; + taosMemoryFree(pCkHead); + return code; } code = 0; - goto OVER; + if (pCkHead) taosMemoryFree(pCkHead); + return code; } } } - - // send empty to client - if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { - code = -1; - } - -OVER: - if (pCkHead) taosMemoryFree(pCkHead); - tDeleteSMqDataRsp(&dataRsp); - return code; + return 0; } int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { @@ -602,10 +695,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode); pHandle->execHandle.execDb.pFilterOutTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); - buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta, (SSnapContext **)(&handle.sContext)); + buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta, + (SSnapContext**)(&handle.sContext)); - pHandle->execHandle.task = - qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL); + pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL); } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index da596d07f9c63797d2cfc8a628d6767aea8b468b..8c3fa254461e2f31a71787606f6378c2f50868b5 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -60,7 +60,7 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) { return 0; } -int64_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) { +int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) { const STqExecHandle* pExec = &pHandle->execHandle; ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN); @@ -89,18 +89,41 @@ int64_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs if (qExecTask(task, &pDataBlock, &ts) < 0) { ASSERT(0); } - tqDebug("tmq task execute end, get %p", pDataBlock); + tqDebug("tmq task executed, get %p", pDataBlock); - if (pDataBlock) { - tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols); - pRsp->blockNum++; + if (pDataBlock == NULL) { + break; + } + + tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols); + pRsp->blockNum++; + + if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { + rowCnt += pDataBlock->info.rows; + if (rowCnt >= 4096) break; + } + } + + if (qStreamExtractOffset(task, &pRsp->rspOffset) < 0) { + ASSERT(0); + return -1; + } + ASSERT(pRsp->rspOffset.type != 0); + + if (pRsp->withTbName) { + if (pRsp->rspOffset.type == TMQ_OFFSET__LOG) { + int64_t uid = pExec->pExecReader->msgIter.uid; + tqAddTbNameToRsp(pTq, uid, pRsp); + } else { + pRsp->withTbName = false; } } + ASSERT(pRsp->withSchema == false); return 0; } -int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* pOffset) { +int32_t tqScan(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* pOffset) { const STqExecHandle* pExec = &pHandle->execHandle; qTaskInfo_t task = pExec->task; @@ -134,7 +157,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* int64_t uid = 0; if (pOffset->type == TMQ_OFFSET__LOG) { uid = pExec->pExecReader->msgIter.uid; - if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { + if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp) < 0) { continue; } } else { @@ -144,18 +167,14 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* } if (pRsp->withSchema) { if (pOffset->type == TMQ_OFFSET__LOG) { - tqAddBlockSchemaToRsp(pExec, pRsp); + tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp); } else { SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task)); taosArrayPush(pRsp->blockSchema, &pSW); } } - if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols); - } else { - tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock)); - } + tqAddBlockDataToRsp(pDataBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pDataBlock->pDataBlock)); pRsp->blockNum++; if (pOffset->type == TMQ_OFFSET__LOG) { continue; @@ -165,34 +184,32 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* } } - if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) { - if (pDataBlock == NULL && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { - if (qStreamExtractPrepareUid(task) != 0) { - continue; - } - tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode), - pHandle->snapshotVer + 1); - break; - } - - if (pRsp->blockNum > 0) { - tqDebug("tmqsnap task exec exited, get data"); - break; - } - - SMqMetaRsp* tmp = qStreamExtractMetaMsg(task); - if (tmp->rspOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { - tqOffsetResetToData(pOffset, tmp->rspOffset.uid, tmp->rspOffset.ts); - qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType); - tmp->rspOffset.type = TMQ_OFFSET__SNAPSHOT_META; - tqDebug("tmqsnap task exec change to get data"); + if (pDataBlock == NULL && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { + if (qStreamExtractPrepareUid(task) != 0) { continue; } + tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode), + pHandle->snapshotVer + 1); + break; + } - *pMetaRsp = *tmp; - tqDebug("tmqsnap task exec exited, get meta"); + if (pRsp->blockNum > 0) { + tqDebug("tmqsnap task exec exited, get data"); + break; } + SMqMetaRsp* tmp = qStreamExtractMetaMsg(task); + if (tmp->rspOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { + tqOffsetResetToData(pOffset, tmp->rspOffset.uid, tmp->rspOffset.ts); + qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType); + tmp->rspOffset.type = TMQ_OFFSET__SNAPSHOT_META; + tqDebug("tmqsnap task exec change to get data"); + continue; + } + + *pMetaRsp = *tmp; + tqDebug("tmqsnap task exec exited, get meta"); + tqDebug("task exec exited"); break; } @@ -205,12 +222,11 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* return 0; } -int32_t tqLogScanExec(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, SMqDataRsp* pRsp) { +int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp) { STqExecHandle* pExec = &pHandle->execHandle; ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN); if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { - pRsp->withSchema = 1; STqReader* pReader = pExec->pExecReader; tqReaderSetDataMsg(pReader, pReq, 0); while (tqNextDataBlock(pReader)) { @@ -220,18 +236,17 @@ int32_t tqLogScanExec(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, SMqDataRsp } if (pRsp->withTbName) { int64_t uid = pExec->pExecReader->msgIter.uid; - if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { + if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp) < 0) { blockDataFreeRes(&block); continue; } } - tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock)); + tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock)); blockDataFreeRes(&block); - tqAddBlockSchemaToRsp(pExec, pRsp); + tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp); pRsp->blockNum++; } } else if (pExec->subType == TOPIC_SUB_TYPE__DB) { - pRsp->withSchema = 1; STqReader* pReader = pExec->pExecReader; tqReaderSetDataMsg(pReader, pReq, 0); while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) { @@ -241,24 +256,25 @@ int32_t tqLogScanExec(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, SMqDataRsp } if (pRsp->withTbName) { int64_t uid = pExec->pExecReader->msgIter.uid; - if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { + if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp) < 0) { blockDataFreeRes(&block); continue; } } - tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock)); + tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock)); blockDataFreeRes(&block); - tqAddBlockSchemaToRsp(pExec, pRsp); + tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp); pRsp->blockNum++; } -#if 0 +#if 1 if (pHandle->fetchMeta && pRsp->blockNum) { SSubmitMsgIter iter = {0}; tInitSubmitMsgIter(pReq, &iter); STaosxRsp* pXrsp = (STaosxRsp*)pRsp; while (1) { SSubmitBlk* pBlk = NULL; - if (tGetSubmitMsgNext(&iter, &pBlk) < 0) return -1; + if (tGetSubmitMsgNext(&iter, &pBlk) < 0) break; + if (pBlk == NULL) break; if (pBlk->schemaLen > 0) { if (pXrsp->createTableNum == 0) { pXrsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index b4e28403300249cc71e32f427505a8520e6f1297..89e5b2da0a173d20bff06906a3cdd33ba0fc67ed 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -143,6 +143,7 @@ typedef struct { STqOffsetVal prepareStatus; // for tmq STqOffsetVal lastStatus; // for tmq SMqMetaRsp metaRsp; // for tmq fetching meta + int8_t returned; int64_t snapshotVer; SSchemaWrapper *schema; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index f1ac9ef8b18ecbd74ee200f069281c36a7582fcb..278f02b2283f0e9ccc037d01f77e5956179a3627 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -745,6 +745,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT SOperatorInfo* pOperator = pTaskInfo->pRoot; ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); pTaskInfo->streamInfo.prepareStatus = *pOffset; + pTaskInfo->streamInfo.returned = 0; if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) { return 0; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index de6768b83ab56b6c1302b1af6de4c7696463ba0c..9a5368e90e6b5c7f014314c7bc8fababff1c3370 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1285,17 +1285,22 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); if (pResult && pResult->info.rows > 0) { qDebug("queue scan tsdb return %d rows", pResult->info.rows); + pTaskInfo->streamInfo.returned = 1; return pResult; } else { - STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; - tsdbReaderClose(pTSInfo->dataReader); - pTSInfo->dataReader = NULL; - tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer); - qDebug("queue scan tsdb over, switch to wal ver %d", pTaskInfo->streamInfo.snapshotVer + 1); - if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1) < 0) { + if (!pTaskInfo->streamInfo.returned) { + STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; + tsdbReaderClose(pTSInfo->dataReader); + pTSInfo->dataReader = NULL; + tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer); + qDebug("queue scan tsdb over, switch to wal ver %d", pTaskInfo->streamInfo.snapshotVer + 1); + if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1) < 0) { + return NULL; + } + ASSERT(pInfo->tqReader->pWalReader->curVersion == pTaskInfo->streamInfo.snapshotVer + 1); + } else { return NULL; } - ASSERT(pInfo->tqReader->pWalReader->curVersion == pTaskInfo->streamInfo.snapshotVer + 1); } }