diff --git a/include/common/tmsg.h b/include/common/tmsg.h index df76edffc974690057011fd5f32eb39a8e6d442f..dc997221e81d519eb5e0b292c925e546fa8d989f 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3424,7 +3424,7 @@ typedef struct { int32_t tEncodeSSubmitReq2(SEncoder* pCoder, const SSubmitReq2* pReq); int32_t tDecodeSSubmitReq2(SDecoder* pCoder, SSubmitReq2* pReq); void tDestroySSubmitTbData(SSubmitTbData* pTbData, int32_t flag); -void tDestroySSubmitReq2(SSubmitReq2* pReq, int32_t flag); +void tDestroySSubmitReq(SSubmitReq2* pReq, int32_t flag); typedef struct { int32_t affectedRows; diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index f4d2ed01b079b11ff00c7d021a918f5dbe543240..3558feaa662b18728e50a8755fe25b469d65e4f5 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2388,7 +2388,7 @@ _end: if (terrno != 0) { *ppReq = NULL; if (pReq) { - tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE); + tDestroySSubmitReq(pReq, TSDB_MSG_FLG_ENCODE); taosMemoryFreeClear(pReq); } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 189fa1326f1b42943300c176f2e7723147b4c7d4..155ef7a62a6c6c7c058a59526ff345d01e7fbf41 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -7437,7 +7437,7 @@ void tDestroySSubmitTbData(SSubmitTbData *pTbData, int32_t flag) { } } -void tDestroySSubmitReq2(SSubmitReq2 *pReq, int32_t flag) { +void tDestroySSubmitReq(SSubmitReq2 *pReq, int32_t flag) { if (pReq->aSubmitTbData == NULL) return; int32_t nSubmitTbData = TARRAY_SIZE(pReq->aSubmitTbData); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index aecfb9c3e53b09fac6509478b095babe949249fd..3bcb1c9d2e4f0156619ce064423ae499f007512c 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -259,17 +259,14 @@ int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList); int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id); -void tqNextBlock(STqReader *pReader, SFetchRet *ret); +int32_t tqNextBlock(STqReader *pReader, SSDataBlock* pBlock); int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData); int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); -// int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t ver); -bool tqNextDataBlock(STqReader *pReader); -bool tqNextDataBlockFilterOut2(STqReader *pReader, SHashObj *filterOutUids); -int32_t tqRetrieveDataBlock2(SSDataBlock *pBlock, STqReader *pReader, SSubmitTbData **pSubmitTbDataRet); +bool tqNextBlockImpl(STqReader *pReader); +bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); +int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader, SSubmitTbData **pSubmitTbDataRet); int32_t tqRetrieveTaosxBlock2(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet); -// int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader); -// int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas); int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index ce987ca88ec930db76fcb560ff79f758e2bed1a2..69b3f9c3e02f9daa2fe645f751f9986000919700 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -684,7 +684,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma } if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) { - tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE); + tDestroySSubmitReq(pReq, TSDB_MSG_FLG_ENCODE); taosMemoryFree(pReq); smaError("vgId:%d, process submit req for rsma suid:%" PRIu64 ", uid:%" PRIu64 " level %" PRIi8 " failed since %s", @@ -696,7 +696,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, output->info.version); if (pReq) { - tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE); + tDestroySSubmitReq(pReq, TSDB_MSG_FLG_ENCODE); taosMemoryFree(pReq); } } diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index 5058a7fc76e16c85881a2f7fad30ca8109888263..2a26f65bf99db548532a20c86a2739d3f14eb8e5 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -332,7 +332,7 @@ _end: taosArrayDestroy(tagArray); taosArrayDestroy(pVals); if (pReq) { - tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE); + tDestroySSubmitReq(pReq, TSDB_MSG_FLG_ENCODE); taosMemoryFree(pReq); } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 2cda12c0e10fb02bd5928b4f349f75b7d1fad12b..0e9caf24ae4ed6b61de84e6a1c82dbb00f79e152 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -288,7 +288,7 @@ void tqCloseReader(STqReader* pReader) { } // free hash taosHashCleanup(pReader->tbIdHash); - tDestroySSubmitReq2(&pReader->submit, TSDB_MSG_FLG_DECODE); + tDestroySSubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE); taosMemoryFree(pReader); } @@ -322,12 +322,11 @@ int32_t extractSubmitMsgFromWal(SWalReader* pReader, SPackedData* pPackedData) { return 0; } -void tqNextBlock(STqReader* pReader, SFetchRet* ret) { +int32_t tqNextBlock(STqReader* pReader, SSDataBlock* pBlock) { while (1) { if (pReader->msg2.msgStr == NULL) { if (walNextValidMsg(pReader->pWalReader) < 0) { - ret->fetchType = FETCH_TYPE__NONE; - return; + return FETCH_TYPE__NONE; } void* pBody = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg)); @@ -337,15 +336,14 @@ void tqNextBlock(STqReader* pReader, SFetchRet* ret) { tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver); } - while (tqNextDataBlock(pReader)) { - memset(&ret->data, 0, sizeof(SSDataBlock)); - int32_t code = tqRetrieveDataBlock2(&ret->data, pReader, NULL); - if (code != 0 || ret->data.info.rows == 0) { + while (tqNextBlockImpl(pReader)) { + memset(pBlock, 0, sizeof(SSDataBlock)); + int32_t code = tqRetrieveDataBlock(pBlock, pReader, NULL); + if (code != TSDB_CODE_SUCCESS || pBlock->info.rows == 0) { continue; } - ret->fetchType = FETCH_TYPE__DATA; - return; + return FETCH_TYPE__DATA; } } } @@ -367,7 +365,7 @@ int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, i return 0; } -bool tqNextDataBlock(STqReader* pReader) { +bool tqNextBlockImpl(STqReader* pReader) { if (pReader->msg2.msgStr == NULL) { return false; } @@ -387,20 +385,20 @@ bool tqNextDataBlock(STqReader* pReader) { tqDebug("tq reader block found, ver:%"PRId64", uid:%"PRId64, pReader->msg2.ver, pSubmitTbData->uid); return true; } else { - tqDebug("tq reader discard block, uid:%"PRId64", continue", pSubmitTbData->uid); + tqDebug("tq reader discard submit block, uid:%"PRId64", continue", pSubmitTbData->uid); } pReader->nextBlk++; } - tDestroySSubmitReq2(&pReader->submit, TSDB_MSG_FLG_DECODE); + tDestroySSubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE); pReader->nextBlk = 0; pReader->msg2.msgStr = NULL; return false; } -bool tqNextDataBlockFilterOut2(STqReader* pReader, SHashObj* filterOutUids) { +bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) { if (pReader->msg2.msgStr == NULL) return false; int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData); @@ -415,7 +413,7 @@ bool tqNextDataBlockFilterOut2(STqReader* pReader, SHashObj* filterOutUids) { pReader->nextBlk++; } - tDestroySSubmitReq2(&pReader->submit, TSDB_MSG_FLG_DECODE); + tDestroySSubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE); pReader->nextBlk = 0; pReader->msg2.msgStr = NULL; @@ -451,7 +449,7 @@ int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrap return 0; } -int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) { +int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) { tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg2.msgStr, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); pReader->nextBlk++; @@ -560,7 +558,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbD int32_t sourceIdx = 0; while (targetIdx < colActual) { if(sourceIdx >= numOfCols){ - tqError("tqRetrieveDataBlock2 sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols); + tqError("tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols); goto FAIL; } SColData* pCol = taosArrayGet(pCols, sourceIdx); @@ -568,7 +566,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbD SColVal colVal; if(pCol->nVal != numOfRows){ - tqError("tqRetrieveDataBlock2 pCol->nVal:%d != numOfRows:%d", pCol->nVal, numOfRows); + tqError("tqRetrieveDataBlock pCol->nVal:%d != numOfRows:%d", pCol->nVal, numOfRows); goto FAIL; } diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 1c916d9adb2c6ef7a89d5020d5bf899e1e33857c..e049e2d390c05d017ddbc44819be58f82f427fdd 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -203,7 +203,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { STqReader* pReader = pExec->pTqReader; tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver); - while (tqNextDataBlock(pReader)) { + while (tqNextBlockImpl(pReader)) { taosArrayClear(pBlocks); taosArrayClear(pSchemas); SSubmitTbData* pSubmitTbDataRet = NULL; @@ -262,7 +262,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR } else if (pExec->subType == TOPIC_SUB_TYPE__DB) { STqReader* pReader = pExec->pTqReader; tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver); - while (tqNextDataBlockFilterOut2(pReader, pExec->execDb.pFilterOutTbUid)) { + while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) { taosArrayClear(pBlocks); taosArrayClear(pSchemas); SSubmitTbData* pSubmitTbDataRet = NULL; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 62b81305b7233edd48e7dab796975b1ab69a7ad3..c2e6946b04ce9627f6c62beab3c833a57d001d44 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -695,7 +695,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* len += sizeof(SSubmitReq2Msg); pBuf = rpcMallocCont(len); if (NULL == pBuf) { - tDestroySSubmitReq2(&submitReq, TSDB_MSG_FLG_ENCODE); + tDestroySSubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); goto _end; } ((SSubmitReq2Msg*)pBuf)->header.vgId = TD_VID(pVnode); @@ -707,11 +707,11 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* tqError("failed to encode submit req since %s", terrstr()); tEncoderClear(&encoder); rpcFreeCont(pBuf); - tDestroySSubmitReq2(&submitReq, TSDB_MSG_FLG_ENCODE); + tDestroySSubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); continue; } tEncoderClear(&encoder); - tDestroySSubmitReq2(&submitReq, TSDB_MSG_FLG_ENCODE); + tDestroySSubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); SRpcMsg msg = { .msgType = TDMT_VND_SUBMIT, diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index fad2e883aa01e3d80d87b8c43a7eb7b7f3fdc382..a46f7a7924c446b1f21daf2d45a390c9ee63b43a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -246,8 +246,6 @@ static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order static STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, const char* id); -static STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid); - static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList, @@ -1354,16 +1352,40 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { return TSDB_CODE_SUCCESS; } +static FORCE_INLINE STSchema* getTableSchemaImpl(STsdbReader* pReader, uint64_t uid) { + ASSERT(pReader->pSchema == NULL); + + int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, -1, &pReader->pSchema); + if (code != TSDB_CODE_SUCCESS || pReader->pSchema == NULL) { + terrno = code; + tsdbError("failed to get table schema, uid:%" PRIu64 ", it may have been dropped, ver:-1, %s", uid, pReader->idStr); + return NULL; + } + + code = tsdbRowMergerInit(&pReader->status.merger, pReader->pSchema); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + tsdbError("failed to init merger, code:%s, %s", tstrerror(code), pReader->idStr); + return NULL; + } + + return pReader->pSchema; +} + static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData, uint64_t uid) { - int32_t code = 0; - int64_t st = taosGetTimestampUs(); + int32_t code = 0; + STSchema* pSchema = pReader->pSchema; + int64_t st = taosGetTimestampUs(); tBlockDataReset(pBlockData); - STSchema* pSchema = getLatestTableSchema(pReader, uid); - if (pSchema == NULL) { - tsdbDebug("%p table uid:%" PRIu64 " has been dropped, no data existed, %s", pReader, uid, pReader->idStr); - return code; + + if (pReader->pSchema == NULL) { + pSchema = getTableSchemaImpl(pReader, uid); + if (pSchema == NULL) { + tsdbDebug("%p table uid:%" PRIu64 " has been dropped, no data existed, %s", pReader, uid, pReader->idStr); + return code; + } } SBlockLoadSuppInfo* pSup = &pReader->suppInfo; @@ -1912,33 +1934,11 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas return code; } -STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid) { - if (pReader->pSchema != NULL) { - return pReader->pSchema; - } - - int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, -1, &pReader->pSchema); - if (code != TSDB_CODE_SUCCESS || pReader->pSchema == NULL) { - tsdbError("failed to get table schema, uid:%" PRIu64 ", it may have been dropped, ver:-1, %s", uid, pReader->idStr); - } - - return pReader->pSchema; -} - static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) { - int32_t code = 0; - // always set the newest schema version in pReader->pSchema if (pReader->pSchema == NULL) { - code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, -1, &pReader->pSchema); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return NULL; - } - - code = tsdbRowMergerInit(&pReader->status.merger, pReader->pSchema); - if (code != 0) { - terrno = code; + STSchema* ps = getTableSchemaImpl(pReader, uid); + if (ps == NULL) { return NULL; } } @@ -1953,7 +1953,7 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* } STSchema* ptr = NULL; - code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &ptr); + int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &ptr); if (code != TSDB_CODE_SUCCESS) { terrno = code; return NULL; @@ -1982,6 +1982,15 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* TSDBKEY k = TSDBROW_KEY(pRow); TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); + // merge is not initialized yet, due to the fact that the pReader->pSchema is not initialized + if (pMerger->pArray == NULL) { + ASSERT(pReader->pSchema == NULL); + STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid); + if (ps == NULL) { + return terrno; + } + } + int64_t minKey = 0; if (pReader->order == TSDB_ORDER_ASC) { minKey = INT64_MAX; // chosen the minimum value @@ -2011,13 +2020,14 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } } + // todo remove init bool init = false; // ASC: file block ---> last block -----> imem -----> mem // DESC: mem -----> imem -----> last block -----> file block if (pReader->order == TSDB_ORDER_ASC) { if (minKey == key) { - init = true; // todo check if pReader->pSchema is null or not + init = true; int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2203,6 +2213,16 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; + SRowMerger* pMerger = &pReader->status.merger; + + // merge is not initialized yet, due to the fact that the pReader->pSchema is not initialized + if (pMerger->pArray == NULL) { + ASSERT(pReader->pSchema == NULL); + STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid); + if (ps == NULL) { + return terrno; + } + } if (hasDataInFileBlock(pBlockData, pDumpInfo)) { // no last block available, only data block exists @@ -2220,8 +2240,6 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); } else if (key == ts) { SRow* pTSRow = NULL; - SRowMerger* pMerger = &pReader->status.merger; - int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2285,6 +2303,15 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* return code; } + // merge is not initialized yet, due to the fact that the pReader->pSchema is not initialized + if (pMerger->pArray == NULL) { + ASSERT(pReader->pSchema == NULL); + STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid); + if (ps == NULL) { + return terrno; + } + } + int64_t minKey = 0; if (ASCENDING_TRAVERSE(pReader->order)) { minKey = INT64_MAX; // let's find the minimum @@ -2596,6 +2623,7 @@ bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, STsdbReader* pReader) { + SRowMerger* pMerger = &pReader->status.merger; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; bool copied = false; int32_t code = tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo, &copied); @@ -2603,6 +2631,15 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc return code; } + // merge is not initialized yet, due to the fact that the pReader->pSchema is not initialized + if (pMerger->pArray == NULL) { + ASSERT(pReader->pSchema == NULL); + STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid); + if (ps == NULL) { + return terrno; + } + } + if (copied) { pBlockScanInfo->lastKey = key; return TSDB_CODE_SUCCESS; @@ -2610,13 +2647,13 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); SRow* pTSRow = NULL; - code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema); + code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); - code = tsdbRowMergerGetRow(&pReader->status.merger, &pTSRow); + code = tsdbRowMergerGetRow(pMerger, &pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2624,7 +2661,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); taosMemoryFree(pTSRow); - tsdbRowMergerClear(&pReader->status.merger); + tsdbRowMergerClear(pMerger); return code; } } @@ -4412,6 +4449,10 @@ static void setSharedPtr(STsdbReader* pDst, const STsdbReader* pSrc) { pDst->pSchema = pSrc->pSchema; pDst->pSchemaMap = pSrc->pSchemaMap; pDst->pReadSnap = pSrc->pReadSnap; + + if (pDst->pSchema) { + tsdbRowMergerInit(&pDst->status.merger, pDst->pSchema); + } } void tsdbReaderClose(STsdbReader* pReader) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 0dfde8f579cb97af21843eea13a9f0d18cd557a5..2e6d452e95335bd6e3cefb230fe22c03947ce153 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1388,7 +1388,7 @@ _exit: // clear taosArrayDestroy(newTbUids); - tDestroySSubmitReq2(pSubmitReq, 0 == pMsg->version ? TSDB_MSG_FLG_CMPT : TSDB_MSG_FLG_DECODE); + tDestroySSubmitReq(pSubmitReq, 0 == pMsg->version ? TSDB_MSG_FLG_CMPT : TSDB_MSG_FLG_DECODE); tDestroySSubmitRsp2(pSubmitRsp, TSDB_MSG_FLG_ENCODE); if (code) terrno = code; diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index 90d740bebd8658ced2346292f527e9ecf1ade2cf..22f388d40675243348a52d5d548d6d0ee2dc3ddd 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -301,7 +301,7 @@ _end: if (terrno != 0) { *ppReq = NULL; if (pReq) { - tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE); + tDestroySSubmitReq(pReq, TSDB_MSG_FLG_ENCODE); taosMemoryFree(pReq); } return terrno; @@ -326,7 +326,7 @@ int32_t dataBlocksToSubmitReq(SDataInserterHandle* pInserter, void** pMsg, int32 code = buildSubmitReqFromBlock(pInserter, &pReq, pDataBlock, pTSchema, uid, vgId, suid); if (code) { if (pReq) { - tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE); + tDestroySSubmitReq(pReq, TSDB_MSG_FLG_ENCODE); taosMemoryFree(pReq); } @@ -335,7 +335,7 @@ int32_t dataBlocksToSubmitReq(SDataInserterHandle* pInserter, void** pMsg, int32 } code = submitReqToMsg(vgId, pReq, pMsg, msgLen); - tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE); + tDestroySSubmitReq(pReq, TSDB_MSG_FLG_ENCODE); taosMemoryFree(pReq); return code; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 2389c7252ef56265f0e695a97fd025b4a4a09675..8542481419d5dcbb4e4c6ebd638947396f48e378 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1646,10 +1646,10 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { blockDataCleanup(pInfo->pRes); SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; - while (tqNextDataBlock(pInfo->tqReader)) { + while (tqNextBlockImpl(pInfo->tqReader)) { SSDataBlock block = {0}; - int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL); + int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader, NULL); if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) { continue; } @@ -1687,23 +1687,23 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) { while (1) { - SFetchRet ret = {0}; - tqNextBlock(pInfo->tqReader, &ret); - tqOffsetResetToLog( - &pTaskInfo->streamInfo.currentOffset, - pInfo->tqReader->pWalReader->curVersion - 1); // curVersion move to next, so currentOffset = curVersion - 1 - - if (ret.fetchType == FETCH_TYPE__DATA) { - qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, ret.data.info.rows, + SSDataBlock block = {0}; + int32_t type = tqNextBlock(pInfo->tqReader, &block); + + // curVersion move to next, so currentOffset = curVersion - 1 + tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pInfo->tqReader->pWalReader->curVersion - 1); + + if (type == FETCH_TYPE__DATA) { + qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, block.info.rows, pTaskInfo->streamInfo.currentOffset.version); blockDataCleanup(pInfo->pRes); - setBlockIntoRes(pInfo, &ret.data, true); + setBlockIntoRes(pInfo, &block, true); if (pInfo->pRes->info.rows > 0) { qDebug("doQueueScan get data from log %" PRId64 " rows, return, version:%" PRId64, pInfo->pRes->info.rows, pTaskInfo->streamInfo.currentOffset.version); return pInfo->pRes; } - } else if (ret.fetchType == FETCH_TYPE__NONE) { + } else if (type == FETCH_TYPE__NONE) { qDebug("doQueueScan get none from log, return, version:%" PRId64, pTaskInfo->streamInfo.currentOffset.version); return NULL; } @@ -2072,11 +2072,10 @@ FETCH_NEXT_BLOCK: blockDataCleanup(pInfo->pRes); - while (tqNextDataBlock(pInfo->tqReader)) { + while (tqNextBlockImpl(pInfo->tqReader)) { SSDataBlock block = {0}; - int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL); - + int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader, NULL); if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) { continue; } diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index ac504b9809ad495175d54025d881853d0f443ca9..a3b067b94dab042ef3707027b38135275071cf8b 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -324,7 +324,7 @@ void insDestroyVgroupDataCxt(SVgroupDataCxt* pVgCxt) { return; } - tDestroySSubmitReq2(pVgCxt->pData, TSDB_MSG_FLG_ENCODE); + tDestroySSubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE); taosMemoryFree(pVgCxt->pData); taosMemoryFree(pVgCxt); }