提交 83ee5536 编写于 作者: H Haojun Liao

refactor(query): do some internal refactor.

上级 93dfebe0
...@@ -268,6 +268,7 @@ typedef struct SSourceDataInfo { ...@@ -268,6 +268,7 @@ typedef struct SSourceDataInfo {
uint64_t totalRows; uint64_t totalRows;
int32_t code; int32_t code;
EX_SOURCE_STATUS status; EX_SOURCE_STATUS status;
const char* id;
} SSourceDataInfo; } SSourceDataInfo;
typedef struct SLoadRemoteDataInfo { typedef struct SLoadRemoteDataInfo {
......
...@@ -2432,7 +2432,7 @@ int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) ...@@ -2432,7 +2432,7 @@ int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code)
pRsp->useconds = htobe64(pRsp->useconds); pRsp->useconds = htobe64(pRsp->useconds);
ASSERT(pRsp != NULL); ASSERT(pRsp != NULL);
qDebug("fetch rsp received, index:%d, rows:%d", index, pRsp->numOfRows); qDebug("%s fetch rsp received, index:%d, rows:%d", pSourceDataInfo->id, index, pRsp->numOfRows);
} else { } else {
pSourceDataInfo->code = code; pSourceDataInfo->code = code;
} }
...@@ -2630,6 +2630,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx ...@@ -2630,6 +2630,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
while (1) { while (1) {
int32_t completed = 0; int32_t completed = 0;
qDebug("%s current ready sources:%ld", GET_TASKID(pTaskInfo), pExchangeInfo->ready.__align);
tsem_wait(&pExchangeInfo->ready); tsem_wait(&pExchangeInfo->ready);
for (int32_t i = 0; i < totalSources; ++i) { for (int32_t i = 0; i < totalSources; ++i) {
...@@ -2654,10 +2655,10 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx ...@@ -2654,10 +2655,10 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
SSDataBlock* pRes = pExchangeInfo->pResult; SSDataBlock* pRes = pExchangeInfo->pResult;
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
if (pRsp->numOfRows == 0) { if (pRsp->numOfRows == 0) {
qDebug("%s vgId:%d, taskID:0x%" PRIx64 " index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 qDebug("%s vgId:%d, taskId:0x%" PRIx64 " index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
" try next", ", completed:%d try next %d/%"PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i + 1, pDataInfo->totalRows, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i, pDataInfo->totalRows,
pExchangeInfo->loadInfo.totalRows); pExchangeInfo->loadInfo.totalRows, completed + 1, i + 1, totalSources);
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
completed += 1; completed += 1;
taosMemoryFreeClear(pDataInfo->pRsp); taosMemoryFreeClear(pDataInfo->pRsp);
...@@ -2673,10 +2674,11 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx ...@@ -2673,10 +2674,11 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
} }
if (pRsp->completed == 1) { if (pRsp->completed == 1) {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64 qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " index:%d completed, numOfRows:%d, rowsOfSource:%" PRIu64
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 ", completed:%d try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pDataInfo->totalRows, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i, pRes->info.rows, pDataInfo->totalRows,
pLoadInfo->totalRows, pLoadInfo->totalSize, i + 1, totalSources); pLoadInfo->totalRows, pLoadInfo->totalSize, completed + 1, i + 1, totalSources);
completed += 1;
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
} else { } else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64
...@@ -2696,10 +2698,30 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx ...@@ -2696,10 +2698,30 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
} }
} }
int32_t comp = 0;
for(int32_t j = 0; j < totalSources; ++j) {
SSourceDataInfo* pDataInfo1 = taosArrayGet(pExchangeInfo->pSourceDataInfo, j);
if (pDataInfo1->status == EX_SOURCE_DATA_EXHAUSTED) {
comp += 1;
}
}
if (comp == totalSources) {
setAllSourcesCompleted(pOperator, startTs);
}
return pExchangeInfo->pResult; return pExchangeInfo->pResult;
} }
if (completed == totalSources) { int32_t comp = 0;
for(int32_t j = 0; j < totalSources; ++j) {
SSourceDataInfo* pDataInfo1 = taosArrayGet(pExchangeInfo->pSourceDataInfo, j);
if (pDataInfo1->status == EX_SOURCE_DATA_EXHAUSTED) {
comp += 1;
}
}
if (comp == totalSources) {
return setAllSourcesCompleted(pOperator, startTs); return setAllSourcesCompleted(pOperator, startTs);
} }
} }
...@@ -2847,7 +2869,7 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) { ...@@ -2847,7 +2869,7 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
} }
} }
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) { static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo)); pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
if (pInfo->pSourceDataInfo == NULL) { if (pInfo->pSourceDataInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
...@@ -2856,9 +2878,9 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) { ...@@ -2856,9 +2878,9 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) {
for (int32_t i = 0; i < numOfSources; ++i) { for (int32_t i = 0; i < numOfSources; ++i) {
SSourceDataInfo dataInfo = {0}; SSourceDataInfo dataInfo = {0};
dataInfo.status = EX_SOURCE_DATA_NOT_READY; dataInfo.status = EX_SOURCE_DATA_NOT_READY;
dataInfo.pEx = pInfo; dataInfo.pEx = pInfo;
dataInfo.index = i; dataInfo.index = i;
dataInfo.id = id;
void* ret = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo); void* ret = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
if (ret == NULL) { if (ret == NULL) {
taosArrayDestroy(pInfo->pSourceDataInfo); taosArrayDestroy(pInfo->pSourceDataInfo);
...@@ -2888,7 +2910,7 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* ...@@ -2888,7 +2910,7 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo*
taosArrayPush(pInfo->pSources, pNode); taosArrayPush(pInfo->pSources, pNode);
} }
return initDataSource(numOfSources, pInfo); return initDataSource(numOfSources, pInfo, id);
} }
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo) {
...@@ -4710,11 +4732,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4710,11 +4732,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
pOptr = createMergeIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pTaskInfo); pOptr = createMergeIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
qDebug("[******]create Semi");
int32_t children = 0; int32_t children = 0;
pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) {
qDebug("[******]create Final");
int32_t children = 1; int32_t children = 1;
pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册