diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 557dfbab5e30f79384cb805900ecacb21166fa97..19467d35d8bdb8309974d5f50738b9fecac9c2f6 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -268,6 +268,7 @@ typedef struct SSourceDataInfo { uint64_t totalRows; int32_t code; EX_SOURCE_STATUS status; + const char* id; } SSourceDataInfo; typedef struct SLoadRemoteDataInfo { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 3ed879ed1b49c6da83956ec464681c131d02e0fe..5442cce625d7e085c5a2ade44cd0c0136d0dae10 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2432,7 +2432,7 @@ int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) pRsp->useconds = htobe64(pRsp->useconds); ASSERT(pRsp != NULL); - qDebug("fetch rsp received, index:%d, rows:%d", index, pRsp->numOfRows); + qDebug("%s fetch rsp received, index:%d, rows:%d", pSourceDataInfo->id, index, pRsp->numOfRows); } else { pSourceDataInfo->code = code; } @@ -2630,6 +2630,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx while (1) { int32_t completed = 0; + qDebug("%s current ready sources:%ld", GET_TASKID(pTaskInfo), pExchangeInfo->ready.__align); tsem_wait(&pExchangeInfo->ready); for (int32_t i = 0; i < totalSources; ++i) { @@ -2654,10 +2655,10 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx SSDataBlock* pRes = pExchangeInfo->pResult; SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; if (pRsp->numOfRows == 0) { - qDebug("%s vgId:%d, taskID:0x%" PRIx64 " index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 - " try next", - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i + 1, pDataInfo->totalRows, - pExchangeInfo->loadInfo.totalRows); + qDebug("%s vgId:%d, taskId:0x%" PRIx64 " index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 + ", completed:%d try next %d/%"PRIzu, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i, pDataInfo->totalRows, + pExchangeInfo->loadInfo.totalRows, completed + 1, i + 1, totalSources); pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; completed += 1; taosMemoryFreeClear(pDataInfo->pRsp); @@ -2673,10 +2674,11 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx } if (pRsp->completed == 1) { - qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64 - ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pDataInfo->totalRows, - pLoadInfo->totalRows, pLoadInfo->totalSize, i + 1, totalSources); + qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " index:%d completed, numOfRows:%d, rowsOfSource:%" PRIu64 + ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 ", completed:%d try next %d/%" PRIzu, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i, pRes->info.rows, pDataInfo->totalRows, + pLoadInfo->totalRows, pLoadInfo->totalSize, completed + 1, i + 1, totalSources); + completed += 1; pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 @@ -2696,10 +2698,30 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx } } + int32_t comp = 0; + for(int32_t j = 0; j < totalSources; ++j) { + SSourceDataInfo* pDataInfo1 = taosArrayGet(pExchangeInfo->pSourceDataInfo, j); + if (pDataInfo1->status == EX_SOURCE_DATA_EXHAUSTED) { + comp += 1; + } + } + + if (comp == totalSources) { + setAllSourcesCompleted(pOperator, startTs); + } + return pExchangeInfo->pResult; } - if (completed == totalSources) { + int32_t comp = 0; + for(int32_t j = 0; j < totalSources; ++j) { + SSourceDataInfo* pDataInfo1 = taosArrayGet(pExchangeInfo->pSourceDataInfo, j); + if (pDataInfo1->status == EX_SOURCE_DATA_EXHAUSTED) { + comp += 1; + } + } + + if (comp == totalSources) { return setAllSourcesCompleted(pOperator, startTs); } } @@ -2847,7 +2869,7 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) { } } -static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) { +static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) { pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo)); if (pInfo->pSourceDataInfo == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -2856,9 +2878,9 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) { for (int32_t i = 0; i < numOfSources; ++i) { SSourceDataInfo dataInfo = {0}; dataInfo.status = EX_SOURCE_DATA_NOT_READY; - dataInfo.pEx = pInfo; + dataInfo.pEx = pInfo; dataInfo.index = i; - + dataInfo.id = id; void* ret = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo); if (ret == NULL) { taosArrayDestroy(pInfo->pSourceDataInfo); @@ -2888,7 +2910,7 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* taosArrayPush(pInfo->pSources, pNode); } - return initDataSource(numOfSources, pInfo); + return initDataSource(numOfSources, pInfo, id); } SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo) { @@ -4710,11 +4732,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; pOptr = createMergeIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) { - qDebug("[******]create Semi"); int32_t children = 0; pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) { - qDebug("[******]create Final"); int32_t children = 1; pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {