diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 9b9a1ef259936881ec80e5a0a406053eed9ce34c..f179c7bd41e732d6b97a10b98109b71a8004b71d 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -239,6 +239,7 @@ typedef struct SSourceDataInfo { int32_t index; SRetrieveTableRsp* pRsp; uint64_t totalRows; + int64_t startTime; int32_t code; EX_SOURCE_STATUS status; const char* taskId; diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index c57a1b38eba63f08c3c44b4a2c87ff10f061e414..a28066003a2a60ca9f62326ff055f5193e366695 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -44,7 +44,7 @@ typedef struct SFetchRspHandleWrapper { static void destroyExchangeOperatorInfo(void* param); static void freeBlock(void* pParam); static void freeSourceDataInfo(void* param); -static void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs); +static void* setAllSourcesCompleted(SOperatorInfo* pOperator); static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code); static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex); @@ -59,7 +59,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn size_t totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo); int32_t completed = getCompletedSources(pExchangeInfo->pSourceDataInfo); if (completed == totalSources) { - setAllSourcesCompleted(pOperator, pExchangeInfo->openedTs); + setAllSourcesCompleted(pOperator); return; } @@ -113,7 +113,8 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn taosArrayPush(pExchangeInfo->pResultBlockList, &pb); } - updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pExchangeInfo->openedTs, pOperator); + updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pDataInfo->startTime, pOperator); + pDataInfo->totalRows += pRetrieveRsp->numOfRows; if (pRsp->completed == 1) { pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; @@ -388,6 +389,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex); SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex); + pDataInfo->startTime = taosGetTimestampUs(); ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY); @@ -493,18 +495,14 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo return TSDB_CODE_SUCCESS; } -void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) { +void* setAllSourcesCompleted(SOperatorInfo* pOperator) { SExchangeInfo* pExchangeInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - int64_t el = taosGetTimestampUs() - startTs; SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; - - pLoadInfo->totalElapsed += el; - size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); - qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms", - GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize, + qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms", + GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, pLoadInfo->totalElapsed / 1000.0); setOperatorCompleted(pOperator); @@ -566,7 +564,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { while (1) { if (pExchangeInfo->current >= totalSources) { - setAllSourcesCompleted(pOperator, startTs); + setAllSourcesCompleted(pOperator); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index add580ce7c89d03e9ca85f01a738ed80e6663722..14e316345576b665c015288797d886d30cd44136 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -690,6 +690,8 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, code); } + qDebug("start to merge final sorted rows, %s", GET_TASKID(pTaskInfo)); + SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->matchInfo.pList, pOperator); if (pBlock != NULL) { pOperator->resultInfo.totalRows += pBlock->info.rows; @@ -754,7 +756,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); SSDataBlock* pInputBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc); - initResultSizeInfo(&pOperator->resultInfo, 1024); + initResultSizeInfo(&pOperator->resultInfo, 4096); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); pInfo->groupSort = pMergePhyNode->groupSort;