diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index a743e79ce729cd040fe3dbbf87661791f26944a4..3ed879ed1b49c6da83956ec464681c131d02e0fe 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2420,17 +2420,19 @@ static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQuer int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) { SSourceDataInfo* pSourceDataInfo = (SSourceDataInfo*)param; + int32_t index = pSourceDataInfo->index; + if (code == TSDB_CODE_SUCCESS) { pSourceDataInfo->pRsp = pMsg->pData; SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp; pRsp->numOfRows = htonl(pRsp->numOfRows); - pRsp->compLen = htonl(pRsp->compLen); + pRsp->compLen = htonl(pRsp->compLen); pRsp->numOfCols = htonl(pRsp->numOfCols); - pRsp->useconds = htobe64(pRsp->useconds); + pRsp->useconds = htobe64(pRsp->useconds); - ASSERT(pSourceDataInfo->pRsp != NULL); - qDebug("fetch rsp received, index:%d, rows:%d", pSourceDataInfo->index, pRsp->numOfRows); + ASSERT(pRsp != NULL); + qDebug("fetch rsp received, index:%d, rows:%d", index, pRsp->numOfRows); } else { pSourceDataInfo->code = code; } @@ -2628,6 +2630,8 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx while (1) { int32_t completed = 0; + tsem_wait(&pExchangeInfo->ready); + for (int32_t i = 0; i < totalSources; ++i) { SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i); if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) { @@ -2725,7 +2729,6 @@ static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) { qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%" PRId64, GET_TASKID(pTaskInfo), totalSources, endTs - startTs); - tsem_wait(&pExchangeInfo->ready); pOperator->status = OP_RES_TO_RETURN; pOperator->cost.openCost = taosGetTimestampUs() - startTs;