提交 447b2cb9 编写于 作者: H Haojun Liao

fix(query): update the load data log.

上级 ebe8192d
......@@ -858,8 +858,10 @@ void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLi
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order);
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
int32_t compLen, int32_t numOfOutput, uint64_t* total, SArray* pColList, char** pNextStart);
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, int32_t numOfOutput, SArray* pColList, char** pNextStart);
void updateLoadRemoteInfo(SLoadRemoteDataInfo *pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs,
SOperatorInfo* pOperator);
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order);
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag);
......
......@@ -2035,8 +2035,16 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
return TSDB_CODE_SUCCESS;
}
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
int32_t compLen, int32_t numOfOutput, uint64_t* total, SArray* pColList, char** pNextStart) {
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs,
SOperatorInfo* pOperator) {
pInfo->totalRows += numOfRows;
pInfo->totalSize += dataLen;
pInfo->totalElapsed += (taosGetTimestampUs() - startTs);
pOperator->resultInfo.totalRows += numOfRows;
}
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, int32_t numOfOutput, SArray* pColList,
char** pNextStart) {
if (pColList == NULL) { // data from other sources
blockDataCleanup(pRes);
*pNextStart = (char*) blockDecode(pRes, pData);
......@@ -2064,24 +2072,16 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLo
}
blockDecode(pBlock, pStart);
blockDataEnsureCapacity(pRes, numOfRows);
blockDataEnsureCapacity(pRes, pBlock->info.rows);
// data from mnode
pRes->info.rows = numOfRows;
pRes->info.rows = pBlock->info.rows;
relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
blockDataDestroy(pBlock);
}
// todo move this to time window aggregator, since the primary timestamp may not be known by exchange operator.
blockDataUpdateTsWindow(pRes, 0);
pLoadInfo->totalRows += numOfRows;
pLoadInfo->totalSize += compLen;
if (total != NULL) {
*total += numOfRows;
}
return TSDB_CODE_SUCCESS;
}
......@@ -2148,9 +2148,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
while(index++ < pRetrieveRsp->numOfBlocks) {
SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
blockDataEnsureCapacity(pb, pRetrieveRsp->numOfRows);
code =
extractDataBlockFromFetchRsp(pb, pLoadInfo, pRetrieveRsp->numOfRows, pStart,
pRetrieveRsp->compLen, pRetrieveRsp->numOfCols, &pDataInfo->totalRows, NULL, &pStart);
code = extractDataBlockFromFetchRsp(pb, pStart, pRetrieveRsp->numOfCols, NULL, &pStart);
if (code != 0) {
taosMemoryFreeClear(pDataInfo->pRsp);
goto _error;
......@@ -2159,7 +2157,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
}
pLoadInfo->totalElapsed += (taosGetTimestampUs() - startTs);
updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
if (pRsp->completed == 1) {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d"
......@@ -2272,8 +2270,7 @@ static int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
char* pStart = pRetrieveRsp->data;
int32_t code = extractDataBlockFromFetchRsp(NULL, pLoadInfo, pRetrieveRsp->numOfRows, pStart, pRetrieveRsp->compLen,
pRetrieveRsp->numOfCols, &pDataInfo->totalRows, NULL, &pStart);
int32_t code = extractDataBlockFromFetchRsp(NULL, pStart, pRetrieveRsp->numOfCols, NULL, &pStart);
if (pRsp->completed == 1) {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64
......@@ -2291,7 +2288,9 @@ static int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
pLoadInfo->totalRows, pLoadInfo->totalSize);
}
pOperator->resultInfo.totalRows += pRetrieveRsp->numOfRows;
updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
pDataInfo->totalRows += pRetrieveRsp->numOfRows;
taosMemoryFreeClear(pDataInfo->pRsp);
return TSDB_CODE_SUCCESS;
}
......
......@@ -2303,10 +2303,9 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
}
char* pStart = pRsp->data;
extractDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pRsp->numOfRows, pRsp->data, pRsp->compLen,
pOperator->exprSupp.numOfExprs, NULL, pInfo->scanCols, &pStart);
extractDataBlockFromFetchRsp(pInfo->pRes, pRsp->data, pOperator->exprSupp.numOfExprs, pInfo->scanCols, &pStart);
updateLoadRemoteInfo(&pInfo->loadInfo, pRsp->numOfRows, pRsp->compLen, startTs, pOperator);
//startTs,
// todo log the filter info
doFilterResult(pInfo);
taosMemoryFree(pRsp);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册