提交 98638799 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 1da67a56
...@@ -265,6 +265,7 @@ typedef struct SExchangeInfo { ...@@ -265,6 +265,7 @@ typedef struct SExchangeInfo {
SLoadRemoteDataInfo loadInfo; SLoadRemoteDataInfo loadInfo;
uint64_t self; uint64_t self;
SLimitInfo limitInfo; SLimitInfo limitInfo;
int64_t openedTs; // start exec time stamp
} SExchangeInfo; } SExchangeInfo;
typedef struct SScanInfo { typedef struct SScanInfo {
......
...@@ -1847,40 +1847,41 @@ static void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) { ...@@ -1847,40 +1847,41 @@ static void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) {
return NULL; return NULL;
} }
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
SExecTaskInfo* pTaskInfo) { static int32_t getCompletedSources(const SArray* pArray) {
int32_t code = 0; size_t total = taosArrayGetSize(pArray);
int64_t startTs = taosGetTimestampUs();
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
int32_t completed = 0; int32_t completed = 0;
for (int32_t k = 0; k < totalSources; ++k) { for (int32_t k = 0; k < total; ++k) {
SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k); SSourceDataInfo* p = taosArrayGet(pArray, k);
if (p->status == EX_SOURCE_DATA_EXHAUSTED) { if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
completed += 1; completed += 1;
} }
} }
return completed;
}
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
SExecTaskInfo* pTaskInfo) {
int32_t code = 0;
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
int32_t completed = getCompletedSources(pExchangeInfo->pSourceDataInfo);
if (completed == totalSources) { if (completed == totalSources) {
setAllSourcesCompleted(pOperator, startTs); setAllSourcesCompleted(pOperator, pExchangeInfo->openedTs);
return; return;
} }
while (1) { while (1) {
// printf("1\n");
tsem_wait(&pExchangeInfo->ready); tsem_wait(&pExchangeInfo->ready);
// printf("2\n");
for (int32_t i = 0; i < totalSources; ++i) { for (int32_t i = 0; i < totalSources; ++i) {
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i); SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) { if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
// printf("========:%d is completed\n", i);
continue; continue;
} }
// printf("index:%d - status:%d\n", i, pDataInfo->status);
if (pDataInfo->status != EX_SOURCE_DATA_READY) { if (pDataInfo->status != EX_SOURCE_DATA_READY) {
// printf("-----------%d, status:%d, continue\n", i, pDataInfo->status);
continue; continue;
} }
...@@ -1896,27 +1897,18 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn ...@@ -1896,27 +1897,18 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
if (pRsp->numOfRows == 0) { if (pRsp->numOfRows == 0) {
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
// printf("%d completed, try next\n", i);
qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64
", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu, ", totalRows:%" PRIu64 ", try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows,
pExchangeInfo->loadInfo.totalRows, completed, i + 1, totalSources); pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
taosMemoryFreeClear(pDataInfo->pRsp); taosMemoryFreeClear(pDataInfo->pRsp);
break;
// if (completed == totalSources) {
// return;
// } else {
// break;
// }
break;
} }
SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
int32_t index = 0; int32_t index = 0;
char* pStart = pRetrieveRsp->data; char* pStart = pRetrieveRsp->data;
while (index++ < pRetrieveRsp->numOfBlocks) { while (index++ < pRetrieveRsp->numOfBlocks) {
printf("results, numOfBLock: %d\n", pRetrieveRsp->numOfBlocks);
SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false); SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart); code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
if (code != 0) { if (code != 0) {
...@@ -1927,25 +1919,16 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn ...@@ -1927,25 +1919,16 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
taosArrayPush(pExchangeInfo->pResultBlockList, &pb); taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
} }
updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator); updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pExchangeInfo->openedTs, pOperator);
// int32_t completed = 0;
if (pRsp->completed == 1) { if (pRsp->completed == 1) {
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
// for (int32_t k = 0; k < totalSources; ++k) {
// SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k);
// if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
// completed += 1;
// }
// }
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
" execId:%d index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " execId:%d index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
", total:%.2f Kb, completed:%d try next %d/%" PRIzu, ", total:%.2f Kb, try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks,
pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
completed, i + 1, totalSources); i + 1, totalSources);
} else { } else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
" execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb", " execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb",
...@@ -1963,23 +1946,12 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn ...@@ -1963,23 +1946,12 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
goto _error; goto _error;
} }
} }
// if (completed == totalSources) {
// setAllSourcesCompleted(pOperator, startTs);
// }
return; return;
} } // end loop
int32_t completed = 0;
for (int32_t k = 0; k < totalSources; ++k) {
SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k);
if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
completed += 1;
}
}
if (completed == totalSources) { int32_t complete1 = getCompletedSources(pExchangeInfo->pSourceDataInfo);
if (complete1 == totalSources) {
qDebug("all sources are completed, %s", GET_TASKID(pTaskInfo));
return; return;
} }
} }
...@@ -2098,6 +2070,7 @@ static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) { ...@@ -2098,6 +2070,7 @@ static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
pExchangeInfo->openedTs = taosGetTimestampUs();
} }
OPTR_SET_OPENED(pOperator); OPTR_SET_OPENED(pOperator);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册