提交 b1685398 编写于 作者: H Haojun Liao

fix(query): count the correct number of completed sources.

上级 d016fc9d
......@@ -1854,17 +1854,22 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
while (1) {
// printf("1\n");
tsem_wait(&pExchangeInfo->ready);
// printf("2\n");
int32_t completed = 0;
// int32_t completed = 0;
for (int32_t i = 0; i < totalSources; ++i) {
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
completed += 1;
// printf("========:%d is completed\n", i);
// completed += 1;
continue;
}
// printf("index:%d --------3\n", i);
if (pDataInfo->status != EX_SOURCE_DATA_READY) {
// printf("-----------%d, status:%d, continue\n", i, pDataInfo->status);
continue;
}
......@@ -1879,15 +1884,28 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
// todo
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
if (pRsp->numOfRows == 0) {
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
int32_t completed = 0;
for (int32_t k = 0; k < totalSources; ++k) {
SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k);
if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
completed += 1;
}
}
qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64
", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows,
pExchangeInfo->loadInfo.totalRows, completed + 1, i + 1, totalSources);
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
completed += 1;
taosMemoryFreeClear(pDataInfo->pRsp);
break;
// continue;
if (completed == totalSources) {
setAllSourcesCompleted(pOperator, startTs);
return;
} else {
break;
}
}
SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
......@@ -1906,15 +1924,23 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
int32_t completed = 0;
if (pRsp->completed == 1) {
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
for (int32_t k = 0; k < totalSources; ++k) {
SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k);
if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
completed += 1;
}
}
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
" execId:%d index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
", total:%.2f Kb, completed:%d try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks,
pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
completed + 1, i + 1, totalSources);
completed += 1;
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
completed, i + 1, totalSources);
} else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
" execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb",
......@@ -1940,12 +1966,18 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
return;
}
int32_t completed = 0;
for (int32_t k = 0; k < totalSources; ++k) {
SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k);
if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
completed += 1;
}
}
if (completed == totalSources) {
setAllSourcesCompleted(pOperator, startTs);
return;
}
// sched_yield();
}
_error:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册