From c7bf08d5af4991b57ddec1304ba2d1b42e4e81e7 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 24 Aug 2022 17:42:33 +0800 Subject: [PATCH] feat: get snapshot data for taosX --- include/libs/executor/executor.h | 1 - source/dnode/vnode/inc/vnode.h | 2 - source/dnode/vnode/src/inc/tq.h | 4 + source/dnode/vnode/src/tq/tq.c | 119 ++++++++++++++++++++---- source/dnode/vnode/src/tq/tqExec.c | 58 +++++++++--- source/dnode/vnode/src/tq/tqMeta.c | 5 +- source/dnode/vnode/src/tq/tqRead.c | 21 +++-- source/libs/executor/inc/executorimpl.h | 5 - source/libs/executor/src/scanoperator.c | 118 ++++++++--------------- 9 files changed, 203 insertions(+), 130 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 327324d86d..25a6221fcb 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -43,7 +43,6 @@ typedef struct { int32_t numOfVgroups; void* sContext; // SSnapContext* - SHashObj *pFilterOutTbUid; void* pStateBackend; } SReadHandle; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index ac0d9ebbe0..627dbf0ac5 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -215,8 +215,6 @@ int32_t tqReaderSetDataMsg(STqReader *pReader, SSubmitReq *pMsg, int64_t ver); bool tqNextDataBlock(STqReader *pReader); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader); -int64_t tqFetchLog(SWalReader *pWalReader, bool fetchMeta, int64_t* fetchOffset, SWalCkHead** ppCkHead); -SSDataBlock* tqLogScanExec(int8_t subType, STqReader* pReader, SHashObj* pFilterOutTbUid, SSDataBlock* block); void vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index dd010e72aa..7c394c4baf 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -101,6 +101,7 @@ typedef struct { int64_t snapshotVer; + SWalReader* pWalReader; SWalRef* pRef; @@ -140,7 +141,10 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle); // tqRead int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset); +int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum); +// tqExec +int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp); int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp); // tqMeta diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index aa1835efd2..89c33d76d3 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -310,6 +310,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { int32_t code = 0; STqOffsetVal reqOffset = pReq->reqOffset; STqOffsetVal fetchOffsetNew; + SWalCkHead* pCkHead = NULL; // 1.find handle STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); @@ -340,7 +341,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqDebug("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId, pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf); - SMqMetaRsp metaRsp = {0}; SMqDataRsp dataRsp = {0}; tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType); @@ -385,32 +385,113 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } } - tqScan(pTq, pHandle, &dataRsp, &metaRsp, &fetchOffsetNew); + if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN || fetchOffsetNew.type != TMQ_OFFSET__LOG){ + SMqMetaRsp metaRsp = {0}; + tqScan(pTq, pHandle, &dataRsp, &metaRsp, &fetchOffsetNew); - if(dataRsp.blockNum != 0){ - if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { - code = -1; + if(metaRsp.metaRspLen > 0){ + if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) { + code = -1; + } + tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, send meta offset type:%d,uid:%ld,version:%ld", consumerId, pHandle->subKey, + TD_VID(pTq->pVnode), metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.version); + taosMemoryFree(metaRsp.metaRsp); + goto OVER; + } + + if (dataRsp.blockNum > 0){ + if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { + code = -1; + } + }else{ + fetchOffsetNew = dataRsp.rspOffset; } - goto OVER; + + tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%ld,version:%ld", consumerId, pHandle->subKey, + TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid, dataRsp.rspOffset.version); } - if(metaRsp.metaRspLen > 0){ - if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) { + if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && fetchOffsetNew.type == TMQ_OFFSET__LOG) { + int64_t fetchVer = fetchOffsetNew.version + 1; + pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); + if (pCkHead == NULL) { code = -1; + goto OVER; } - taosMemoryFree(metaRsp.metaRsp); - goto OVER; - } - tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, no data", consumerId, pHandle->subKey, - TD_VID(pTq->pVnode)); + walSetReaderCapacity(pHandle->pWalReader, 2048); + + while (1) { + consumerEpoch = atomic_load_32(&pHandle->epoch); + if (consumerEpoch > reqEpoch) { + tqWarn("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, vg %d offset %" PRId64 + ", found new consumer epoch %d, discard req epoch %d", + consumerId, pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch); + break; + } + + if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) { + // TODO add push mgr + + tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer); + ASSERT(dataRsp.rspOffset.version >= dataRsp.reqOffset.version); + if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { + code = -1; + } + goto OVER; + } + + SWalCont* pHead = &pCkHead->head; - tqOffsetResetToLog(&dataRsp.rspOffset, fetchOffsetNew.version); + tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId, + pReq->epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType); + + if (pHead->msgType == TDMT_VND_SUBMIT) { + SSubmitReq* pCont = (SSubmitReq*)&pHead->body; + + if (tqLogScanExec(pTq, &pHandle->execHandle, pCont, &dataRsp) < 0) { + /*ASSERT(0);*/ + } + // TODO batch optimization: + // TODO continue scan until meeting batch requirement + if (dataRsp.blockNum > 0 /* threshold */) { + tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer); + ASSERT(dataRsp.rspOffset.version >= dataRsp.reqOffset.version); + + if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { + code = -1; + } + goto OVER; + } else { + fetchVer++; + } + + } else { + ASSERT(pHandle->fetchMeta); + ASSERT(IS_META_MSG(pHead->msgType)); + tqDebug("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); + SMqMetaRsp metaRsp = {0}; + tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer); + metaRsp.resMsgType = pHead->msgType; + metaRsp.metaRspLen = pHead->bodyLen; + metaRsp.metaRsp = pHead->body; + if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) { + code = -1; + goto OVER; + } + code = 0; + goto OVER; + } + } + } + + // send empty to client if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { code = -1; } -OVER: +OVER: + if (pCkHead) taosMemoryFree(pCkHead); // TODO wrap in destroy func taosArrayDestroy(dataRsp.blockDataLen); taosArrayDestroyP(dataRsp.blockData, (FDelete)taosMemoryFree); @@ -526,16 +607,19 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); ASSERT(pHandle->execHandle.pExecReader); } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { + pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode); pHandle->execHandle.execDb.pFilterOutTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta, (SSnapContext **)(&handle.sContext)); - handle.tqReader = pHandle->execHandle.pExecReader; - handle.pFilterOutTbUid = pHandle->execHandle.execDb.pFilterOutTbUid; pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL); } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { + pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); + + pHandle->execHandle.execTb.suid = req.suid; + SArray* tbUidList = taosArrayInit(0, sizeof(int64_t)); vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList); tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, req.suid); @@ -548,7 +632,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe taosArrayDestroy(tbUidList); buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta, (SSnapContext **)(&handle.sContext)); - handle.tqReader = pHandle->execHandle.pExecReader; pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL); } diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index 845b75610c..e7cb7a3af4 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -120,18 +120,18 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* } } + qStreamExtractOffset(task, &pRsp->rspOffset); + if (pDataBlock == NULL && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA){ if(pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && qStreamExtractPrepareUid(task) != 0){ continue; } tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode), pHandle->snapshotVer + 1); -// tqOffsetResetToLog(pOffset, pHandle->snapshotVer); -// qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType); + break; } if (pRsp->blockNum > 0){ - qStreamExtractOffset(task, &pRsp->rspOffset); tqDebug("tmqsnap task exec exited, get data"); break; } @@ -203,22 +203,56 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S } #endif -SSDataBlock* tqLogScanExec(int8_t subType, STqReader* pReader, SHashObj* pFilterOutTbUid, SSDataBlock* block) { - if (subType == TOPIC_SUB_TYPE__TABLE) { +int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp) { + ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN); + + if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { + pRsp->withSchema = 1; + STqReader* pReader = pExec->pExecReader; + tqReaderSetDataMsg(pReader, pReq, 0); while (tqNextDataBlock(pReader)) { - if (tqRetrieveDataBlock(block, pReader) < 0) { + SSDataBlock block = {0}; + if (tqRetrieveDataBlock(&block, pReader) < 0) { if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; } - return block; + if (pRsp->withTbName) { + int64_t uid = pExec->pExecReader->msgIter.uid; + if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { + blockDataFreeRes(&block); + continue; + } + } + tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock)); + blockDataFreeRes(&block); + tqAddBlockSchemaToRsp(pExec, pRsp); + pRsp->blockNum++; } - } else if (subType == TOPIC_SUB_TYPE__DB) { - while (tqNextDataBlockFilterOut(pReader, pFilterOutTbUid)) { - if (tqRetrieveDataBlock(block, pReader) < 0) { + } else if (pExec->subType == TOPIC_SUB_TYPE__DB) { + pRsp->withSchema = 1; + STqReader* pReader = pExec->pExecReader; + tqReaderSetDataMsg(pReader, pReq, 0); + while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) { + SSDataBlock block = {0}; + if (tqRetrieveDataBlock(&block, pReader) < 0) { if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; } - return block; + if (pRsp->withTbName) { + int64_t uid = pExec->pExecReader->msgIter.uid; + if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { + blockDataFreeRes(&block); + continue; + } + } + tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock)); + blockDataFreeRes(&block); + tqAddBlockSchemaToRsp(pExec, pRsp); + pRsp->blockNum++; } } - return NULL; + if (pRsp->blockNum == 0) { + return -1; + } + + return 0; } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 58c3b7909b..6b6717ff57 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -269,12 +269,11 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { ASSERT(handle.execHandle.pExecReader); } else { + handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); handle.execHandle.execDb.pFilterOutTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); - handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode); +// handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode); buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta, (SSnapContext **)(&reader.sContext)); - reader.tqReader = handle.execHandle.pExecReader; - reader.pFilterOutTbUid = handle.execHandle.execDb.pFilterOutTbUid; handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 540759fc77..6e2a6fdb71 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -15,21 +15,22 @@ #include "tq.h" -int64_t tqFetchLog(SWalReader *pWalReader, bool fetchMeta, int64_t* fetchOffset, SWalCkHead** ppCkHead) { +int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) { int32_t code = 0; - taosThreadMutexLock(&pWalReader->mutex); + taosThreadMutexLock(&pHandle->pWalReader->mutex); int64_t offset = *fetchOffset; while (1) { - if (walFetchHead(pWalReader, offset, *ppCkHead) < 0) { - tqDebug("tmq poll no more log to return"); + if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) { + tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return", + pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode), offset); *fetchOffset = offset - 1; code = -1; goto END; } if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) { - code = walFetchBody(pWalReader, ppCkHead); + code = walFetchBody(pHandle->pWalReader, ppCkHead); if (code < 0) { ASSERT(0); @@ -41,10 +42,10 @@ int64_t tqFetchLog(SWalReader *pWalReader, bool fetchMeta, int64_t* fetchOffset, code = 0; goto END; } else { - if (fetchMeta) { + if (pHandle->fetchMeta) { SWalCont* pHead = &((*ppCkHead)->head); if (IS_META_MSG(pHead->msgType)) { - code = walFetchBody(pWalReader, ppCkHead); + code = walFetchBody(pHandle->pWalReader, ppCkHead); if (code < 0) { ASSERT(0); @@ -57,7 +58,7 @@ int64_t tqFetchLog(SWalReader *pWalReader, bool fetchMeta, int64_t* fetchOffset, goto END; } } - code = walSkipFetchBody(pWalReader, *ppCkHead); + code = walSkipFetchBody(pHandle->pWalReader, *ppCkHead); if (code < 0) { ASSERT(0); *fetchOffset = offset; @@ -67,8 +68,8 @@ int64_t tqFetchLog(SWalReader *pWalReader, bool fetchMeta, int64_t* fetchOffset, offset++; } } -END: - taosThreadMutexUnlock(&pWalReader->mutex); + END: + taosThreadMutexUnlock(&pHandle->pWalReader->mutex); return code; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index a06990bb81..d03c06790c 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -499,14 +499,9 @@ typedef struct SStreamRawScanInfo{ // void *metaInfo; // void *dataInfo; SVnode* vnode; - SWalCkHead* pCkHead; - bool needFetchLog; - bool hasDataInOneFetchVer; SSDataBlock pRes; // result SSDataBlock STsdbReader* dataReader; SSnapContext* sContext; - STqReader* tqReader; - SHashObj* pFilterOutTbUid; }SStreamRawScanInfo; typedef struct SSysTableScanInfo { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 807b70b9e0..ffcc79e9c9 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1533,10 +1533,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); } - SDataBlockInfo binfo = pBlock->info; - tsdbRetrieveDataBlockInfo(pInfo->dataReader, &binfo); - - pBlock->info = binfo; + tsdbRetrieveDataBlockInfo(pInfo->dataReader, &pBlock->info); SArray* pCols = tsdbRetrieveDataBlock(pInfo->dataReader, NULL); pBlock->pDataBlock = pCols; @@ -1597,78 +1594,52 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { } return NULL; - }else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) { - int64_t fetchVer = pTaskInfo->streamInfo.prepareStatus.version; - - while(1){ - if(pInfo->needFetchLog){ - fetchVer++; - if (tqFetchLog(pInfo->tqReader->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) { - qDebug("tmqsnap tmq poll: consumer log end. offset %" PRId64, fetchVer); - pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer; - pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__LOG; - return NULL; - } - SWalCont* pHead = &pInfo->pCkHead->head; - qDebug("tmqsnap tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType); - - if (pHead->msgType == TDMT_VND_SUBMIT) { - SSubmitReq* pCont = (SSubmitReq*)&pHead->body; - tqReaderSetDataMsg(pInfo->tqReader, pCont, 0); - pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG; - pTaskInfo->streamInfo.lastStatus.version = fetchVer; - pInfo->hasDataInOneFetchVer = false; - pInfo->pRes.pDataBlock = NULL; - } - } - - SWalCont* pHead = &pInfo->pCkHead->head; - - if (pHead->msgType == TDMT_VND_SUBMIT) { - blockDataFreeRes(&pInfo->pRes); - SSDataBlock* block = tqLogScanExec(pInfo->sContext->subType, pInfo->tqReader, pInfo->pFilterOutTbUid, &pInfo->pRes); - if(block){ - qDebug("tmqsnap fetch data msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); - pInfo->needFetchLog = false; - pInfo->hasDataInOneFetchVer = true; - return block; - }else{ - pInfo->needFetchLog = true; - - if(pInfo->hasDataInOneFetchVer){ - return block; - }else{ - continue; - } - } - } else if(pInfo->sContext->withMeta){ - ASSERT(IS_META_MSG(pHead->msgType)); - qDebug("tmqsnap fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); - pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer; - pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__LOG; - pTaskInfo->streamInfo.metaRsp.resMsgType = pHead->msgType; - pTaskInfo->streamInfo.metaRsp.metaRspLen = pHead->bodyLen; - pTaskInfo->streamInfo.metaRsp.metaRsp = taosMemoryMalloc(pHead->bodyLen); - memcpy(pTaskInfo->streamInfo.metaRsp.metaRsp, pHead->body, pHead->bodyLen); - return NULL; - } - - pInfo->needFetchLog = true; - } } +// else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) { +// int64_t fetchVer = pTaskInfo->streamInfo.prepareStatus.version + 1; +// +// while(1){ +// if (tqFetchLog(pInfo->tqReader->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) { +// qDebug("tmqsnap tmq poll: consumer log end. offset %" PRId64, fetchVer); +// pTaskInfo->streamInfo.lastStatus.version = fetchVer; +// pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG; +// return NULL; +// } +// SWalCont* pHead = &pInfo->pCkHead->head; +// qDebug("tmqsnap tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType); +// +// if (pHead->msgType == TDMT_VND_SUBMIT) { +// SSubmitReq* pCont = (SSubmitReq*)&pHead->body; +// tqReaderSetDataMsg(pInfo->tqReader, pCont, 0); +// SSDataBlock* block = tqLogScanExec(pInfo->sContext->subType, pInfo->tqReader, pInfo->pFilterOutTbUid, &pInfo->pRes); +// if(block){ +// pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG; +// pTaskInfo->streamInfo.lastStatus.version = fetchVer; +// qDebug("tmqsnap fetch data msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); +// return block; +// }else{ +// fetchVer++; +// } +// } else{ +// ASSERT(pInfo->sContext->withMeta); +// ASSERT(IS_META_MSG(pHead->msgType)); +// qDebug("tmqsnap fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); +// pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer; +// pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__LOG; +// pTaskInfo->streamInfo.metaRsp.resMsgType = pHead->msgType; +// pTaskInfo->streamInfo.metaRsp.metaRspLen = pHead->bodyLen; +// pTaskInfo->streamInfo.metaRsp.metaRsp = taosMemoryMalloc(pHead->bodyLen); +// memcpy(pTaskInfo->streamInfo.metaRsp.metaRsp, pHead->body, pHead->bodyLen); +// return NULL; +// } +// } return NULL; } static void destroyRawScanOperatorInfo(void* param, int32_t numOfOutput) { SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param; - taosMemoryFreeClear(pRawScan->pCkHead); - if (pRawScan->tqReader) { - tqCloseReader(pRawScan->tqReader); - } - blockDataFreeRes(&pRawScan->pRes); tsdbReaderClose(pRawScan->dataReader); destroySnapContext(pRawScan->sContext); - taosHashCleanup(pRawScan->pFilterOutTbUid); taosMemoryFree(pRawScan); } @@ -1688,18 +1659,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT return NULL; } - pInfo->pCkHead = taosMemoryCalloc(1, sizeof(SWalCkHead) + 2048); - if (pInfo->pCkHead == NULL) { - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; - } - pInfo->needFetchLog = true; - pInfo->hasDataInOneFetchVer = false; - pInfo->vnode = pHandle->vnode; - pInfo->pFilterOutTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); - pInfo->tqReader = pHandle->tqReader; - walSetReaderCapacity(pInfo->tqReader->pWalReader, 2048); pInfo->sContext = pHandle->sContext; pOperator->name = "RawStreamScanOperator"; -- GitLab