diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 2c79b77e16262538a8d6078e2fa2281569d1b7ba..cf6263148a1947c76ce141f272da63d66247d741 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -63,6 +63,7 @@ static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator); static int32_t seqLoadRemoteData(SOperatorInfo* pOperator); static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator); static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf); +static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo); static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo) { @@ -112,20 +113,12 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn break; } - SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; - int32_t index = 0; - char* pStart = pRetrieveRsp->data; - while (index++ < pRetrieveRsp->numOfBlocks) { - SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false); - code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart); - if (code != 0) { - taosMemoryFreeClear(pDataInfo->pRsp); - goto _error; - } - - taosArrayPush(pExchangeInfo->pResultBlockList, &pb); + code = doExtractResultBlocks(pExchangeInfo, pDataInfo); + if (code != TSDB_CODE_SUCCESS) { + goto _error; } + SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pDataInfo->startTime, pOperator); pDataInfo->totalRows += pRetrieveRsp->numOfRows; @@ -571,10 +564,32 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } +int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo) { + SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; + + char* pStart = pRetrieveRsp->data; + int32_t index = 0; + int32_t code = 0; + while (index++ < pRetrieveRsp->numOfBlocks) { + SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false); + + code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart); + if (code != 0) { + taosMemoryFreeClear(pDataInfo->pRsp); + return code; + } + + taosArrayPush(pExchangeInfo->pResultBlockList, &pb); + } + + return code; +} + int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { SExchangeInfo* pExchangeInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + int32_t code = 0; size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); int64_t startTs = taosGetTimestampUs(); @@ -614,11 +629,12 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { continue; } - SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; - - char* pStart = pRetrieveRsp->data; - int32_t code = extractDataBlockFromFetchRsp(NULL, pStart, NULL, &pStart); + code = doExtractResultBlocks(pExchangeInfo, pDataInfo); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; if (pRsp->completed == 1) { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, @@ -641,6 +657,10 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { taosMemoryFreeClear(pDataInfo->pRsp); return TSDB_CODE_SUCCESS; } + + _error: + pTaskInfo->code = code; + return code; } int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {