From 447b2cb9f1b8cd84e6e1c75890488c0ee9367e3d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 8 Aug 2022 12:56:03 +0800 Subject: [PATCH] fix(query): update the load data log. --- source/libs/executor/inc/executorimpl.h | 6 ++-- source/libs/executor/src/executorimpl.c | 37 ++++++++++++------------- source/libs/executor/src/scanoperator.c | 5 ++-- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 62b2cc0fa6..1b8be3b53c 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -858,8 +858,10 @@ void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLi void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order); -int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, - int32_t compLen, int32_t numOfOutput, uint64_t* total, SArray* pColList, char** pNextStart); +int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, int32_t numOfOutput, SArray* pColList, char** pNextStart); +void updateLoadRemoteInfo(SLoadRemoteDataInfo *pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs, + SOperatorInfo* pOperator); + STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order); int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index b0929a9acd..752b93bcea 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2035,8 +2035,16 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf return TSDB_CODE_SUCCESS; } -int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, - int32_t compLen, int32_t numOfOutput, uint64_t* total, SArray* pColList, char** pNextStart) { +void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs, + SOperatorInfo* pOperator) { + pInfo->totalRows += numOfRows; + pInfo->totalSize += dataLen; + pInfo->totalElapsed += (taosGetTimestampUs() - startTs); + pOperator->resultInfo.totalRows += numOfRows; +} + +int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, int32_t numOfOutput, SArray* pColList, + char** pNextStart) { if (pColList == NULL) { // data from other sources blockDataCleanup(pRes); *pNextStart = (char*) blockDecode(pRes, pData); @@ -2064,24 +2072,16 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLo } blockDecode(pBlock, pStart); - blockDataEnsureCapacity(pRes, numOfRows); + blockDataEnsureCapacity(pRes, pBlock->info.rows); // data from mnode - pRes->info.rows = numOfRows; + pRes->info.rows = pBlock->info.rows; relocateColumnData(pRes, pColList, pBlock->pDataBlock, false); blockDataDestroy(pBlock); } // todo move this to time window aggregator, since the primary timestamp may not be known by exchange operator. blockDataUpdateTsWindow(pRes, 0); - - pLoadInfo->totalRows += numOfRows; - pLoadInfo->totalSize += compLen; - - if (total != NULL) { - *total += numOfRows; - } - return TSDB_CODE_SUCCESS; } @@ -2148,9 +2148,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn while(index++ < pRetrieveRsp->numOfBlocks) { SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false); blockDataEnsureCapacity(pb, pRetrieveRsp->numOfRows); - code = - extractDataBlockFromFetchRsp(pb, pLoadInfo, pRetrieveRsp->numOfRows, pStart, - pRetrieveRsp->compLen, pRetrieveRsp->numOfCols, &pDataInfo->totalRows, NULL, &pStart); + code = extractDataBlockFromFetchRsp(pb, pStart, pRetrieveRsp->numOfCols, NULL, &pStart); if (code != 0) { taosMemoryFreeClear(pDataInfo->pRsp); goto _error; @@ -2159,7 +2157,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn taosArrayPush(pExchangeInfo->pResultBlockList, &pb); } - pLoadInfo->totalElapsed += (taosGetTimestampUs() - startTs); + updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator); if (pRsp->completed == 1) { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d" @@ -2272,8 +2270,7 @@ static int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; char* pStart = pRetrieveRsp->data; - int32_t code = extractDataBlockFromFetchRsp(NULL, pLoadInfo, pRetrieveRsp->numOfRows, pStart, pRetrieveRsp->compLen, - pRetrieveRsp->numOfCols, &pDataInfo->totalRows, NULL, &pStart); + int32_t code = extractDataBlockFromFetchRsp(NULL, pStart, pRetrieveRsp->numOfCols, NULL, &pStart); if (pRsp->completed == 1) { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64 @@ -2291,7 +2288,9 @@ static int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { pLoadInfo->totalRows, pLoadInfo->totalSize); } - pOperator->resultInfo.totalRows += pRetrieveRsp->numOfRows; + updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator); + pDataInfo->totalRows += pRetrieveRsp->numOfRows; + taosMemoryFreeClear(pDataInfo->pRsp); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a2e0b68623..0bd0d68cae 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2303,10 +2303,9 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { } char* pStart = pRsp->data; - extractDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pRsp->numOfRows, pRsp->data, pRsp->compLen, - pOperator->exprSupp.numOfExprs, NULL, pInfo->scanCols, &pStart); + extractDataBlockFromFetchRsp(pInfo->pRes, pRsp->data, pOperator->exprSupp.numOfExprs, pInfo->scanCols, &pStart); + updateLoadRemoteInfo(&pInfo->loadInfo, pRsp->numOfRows, pRsp->compLen, startTs, pOperator); - //startTs, // todo log the filter info doFilterResult(pInfo); taosMemoryFree(pRsp); -- GitLab