提交 1da67a56 编写于 作者: H Haojun Liao

fix(query): set correct status.

上级 b1685398
......@@ -1853,21 +1853,32 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
int64_t startTs = taosGetTimestampUs();
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
int32_t completed = 0;
for (int32_t k = 0; k < totalSources; ++k) {
SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k);
if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
completed += 1;
}
}
if (completed == totalSources) {
setAllSourcesCompleted(pOperator, startTs);
return;
}
while (1) {
// printf("1\n");
tsem_wait(&pExchangeInfo->ready);
// printf("2\n");
// int32_t completed = 0;
for (int32_t i = 0; i < totalSources; ++i) {
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
// printf("========:%d is completed\n", i);
// completed += 1;
continue;
}
// printf("index:%d --------3\n", i);
// printf("index:%d - status:%d\n", i, pDataInfo->status);
if (pDataInfo->status != EX_SOURCE_DATA_READY) {
// printf("-----------%d, status:%d, continue\n", i, pDataInfo->status);
continue;
......@@ -1885,33 +1896,27 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
if (pRsp->numOfRows == 0) {
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
int32_t completed = 0;
for (int32_t k = 0; k < totalSources; ++k) {
SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k);
if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
completed += 1;
}
}
// printf("%d completed, try next\n", i);
qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64
", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows,
pExchangeInfo->loadInfo.totalRows, completed + 1, i + 1, totalSources);
pExchangeInfo->loadInfo.totalRows, completed, i + 1, totalSources);
taosMemoryFreeClear(pDataInfo->pRsp);
if (completed == totalSources) {
setAllSourcesCompleted(pOperator, startTs);
return;
} else {
break;
}
// if (completed == totalSources) {
// return;
// } else {
// break;
// }
break;
}
SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
int32_t index = 0;
char* pStart = pRetrieveRsp->data;
while (index++ < pRetrieveRsp->numOfBlocks) {
printf("results, numOfBLock: %d\n", pRetrieveRsp->numOfBlocks);
SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
if (code != 0) {
......@@ -1924,16 +1929,16 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
int32_t completed = 0;
// int32_t completed = 0;
if (pRsp->completed == 1) {
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
for (int32_t k = 0; k < totalSources; ++k) {
SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k);
if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
completed += 1;
}
}
// for (int32_t k = 0; k < totalSources; ++k) {
// SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k);
// if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
// completed += 1;
// }
// }
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
" execId:%d index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
......@@ -1959,9 +1964,9 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
}
}
if (completed == totalSources) {
setAllSourcesCompleted(pOperator, startTs);
}
// if (completed == totalSources) {
// setAllSourcesCompleted(pOperator, startTs);
// }
return;
}
......@@ -1975,7 +1980,6 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
}
if (completed == totalSources) {
setAllSourcesCompleted(pOperator, startTs);
return;
}
}
......
......@@ -186,6 +186,7 @@ endi
sql select t1, count(*), first(c9) from $stb partition by t1 order by t1 asc slimit 3
if $rows != 3 then
print expect 3, actual: $rows
return -1
endi
if $data(1)[1] != 1 then
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册