diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index cd9f29978d644755f39adf2c44fdf6c5a9da57ff..94361554d09ba4cc61f95234d9adc6ed91a5f44d 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -265,6 +265,7 @@ typedef struct SExchangeInfo { SLoadRemoteDataInfo loadInfo; uint64_t self; SLimitInfo limitInfo; + int64_t openedTs; // start exec time stamp } SExchangeInfo; typedef struct SScanInfo { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index b7c3eed069e3d1c339a01fcd55e7c20c43a8f97c..00733fa21f85bb0b9eeaef0b22145f8e5559b479 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1847,40 +1847,41 @@ static void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) { return NULL; } -static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo, - SExecTaskInfo* pTaskInfo) { - int32_t code = 0; - int64_t startTs = taosGetTimestampUs(); - size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); + +static int32_t getCompletedSources(const SArray* pArray) { + size_t total = taosArrayGetSize(pArray); int32_t completed = 0; - for (int32_t k = 0; k < totalSources; ++k) { - SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k); + for (int32_t k = 0; k < total; ++k) { + SSourceDataInfo* p = taosArrayGet(pArray, k); if (p->status == EX_SOURCE_DATA_EXHAUSTED) { completed += 1; } } + return completed; +} + +static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo, + SExecTaskInfo* pTaskInfo) { + int32_t code = 0; + size_t totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo); + int32_t completed = getCompletedSources(pExchangeInfo->pSourceDataInfo); if (completed == totalSources) { - setAllSourcesCompleted(pOperator, startTs); + setAllSourcesCompleted(pOperator, pExchangeInfo->openedTs); return; } while (1) { -// printf("1\n"); tsem_wait(&pExchangeInfo->ready); -// printf("2\n"); for (int32_t i = 0; i < totalSources; ++i) { SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i); if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) { -// printf("========:%d is completed\n", i); continue; } -// printf("index:%d - status:%d\n", i, pDataInfo->status); if (pDataInfo->status != EX_SOURCE_DATA_READY) { -// printf("-----------%d, status:%d, continue\n", i, pDataInfo->status); continue; } @@ -1896,27 +1897,18 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; if (pRsp->numOfRows == 0) { pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; -// printf("%d completed, try next\n", i); - qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 - ", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu, + ", totalRows:%" PRIu64 ", try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows, - pExchangeInfo->loadInfo.totalRows, completed, i + 1, totalSources); + pExchangeInfo->loadInfo.totalRows, i + 1, totalSources); taosMemoryFreeClear(pDataInfo->pRsp); - -// if (completed == totalSources) { -// return; -// } else { -// break; -// } - break; + break; } SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; int32_t index = 0; char* pStart = pRetrieveRsp->data; while (index++ < pRetrieveRsp->numOfBlocks) { - printf("results, numOfBLock: %d\n", pRetrieveRsp->numOfBlocks); SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false); code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart); if (code != 0) { @@ -1927,25 +1919,16 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn taosArrayPush(pExchangeInfo->pResultBlockList, &pb); } - updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator); + updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pExchangeInfo->openedTs, pOperator); -// int32_t completed = 0; if (pRsp->completed == 1) { pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; - -// for (int32_t k = 0; k < totalSources; ++k) { -// SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k); -// if (p->status == EX_SOURCE_DATA_EXHAUSTED) { -// completed += 1; -// } -// } - qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 - ", total:%.2f Kb, completed:%d try next %d/%" PRIzu, + ", total:%.2f Kb, try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, - completed, i + 1, totalSources); + i + 1, totalSources); } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb", @@ -1963,23 +1946,12 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn goto _error; } } - -// if (completed == totalSources) { -// setAllSourcesCompleted(pOperator, startTs); -// } - return; - } - - int32_t completed = 0; - for (int32_t k = 0; k < totalSources; ++k) { - SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k); - if (p->status == EX_SOURCE_DATA_EXHAUSTED) { - completed += 1; - } - } + } // end loop - if (completed == totalSources) { + int32_t complete1 = getCompletedSources(pExchangeInfo->pSourceDataInfo); + if (complete1 == totalSources) { + qDebug("all sources are completed, %s", GET_TASKID(pTaskInfo)); return; } } @@ -2098,6 +2070,7 @@ static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) { if (code != TSDB_CODE_SUCCESS) { return code; } + pExchangeInfo->openedTs = taosGetTimestampUs(); } OPTR_SET_OPENED(pOperator);