提交 52232cd2 编写于 作者: A Alex Duan

[TS-1132]<fix>(query): V24->windows client query slowest fixed

上级 7059180a
...@@ -902,7 +902,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { ...@@ -902,7 +902,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
// not belongs to the same group, return the result of current group; // not belongs to the same group, return the result of current group;
setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pAggInfo->pExistBlock, TSDB_ORDER_ASC); setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pAggInfo->pExistBlock, TSDB_ORDER_ASC);
updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pAggInfo->pExistBlock->info.rows, pOperator->pRuntimeEnv); updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pAggInfo->pExistBlock->info.rows, pOperator->pRuntimeEnv, true);
{ // reset output buffer { // reset output buffer
for(int32_t j = 0; j < pOperator->numOfOutput; ++j) { for(int32_t j = 0; j < pOperator->numOfOutput; ++j) {
...@@ -954,7 +954,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { ...@@ -954,7 +954,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
// not belongs to the same group, return the result of current group // not belongs to the same group, return the result of current group
setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC); setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC);
updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor, pOperator->pRuntimeEnv); updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor, pOperator->pRuntimeEnv, true);
doExecuteFinalMerge(pOperator, pOperator->numOfOutput, pBlock); doExecuteFinalMerge(pOperator, pOperator->numOfOutput, pBlock);
savePrevOrderColumns(pAggInfo->currentGroupColData, pAggInfo->groupColumnList, pBlock, 0, &pAggInfo->hasGroupColData); savePrevOrderColumns(pAggInfo->currentGroupColData, pAggInfo->groupColumnList, pBlock, 0, &pAggInfo->hasGroupColData);
...@@ -985,6 +985,8 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { ...@@ -985,6 +985,8 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
} }
} }
// shrink output memory on end
shrinkOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity);
return (pRes->info.rows != 0)? pRes:NULL; return (pRes->info.rows != 0)? pRes:NULL;
} }
......
...@@ -664,7 +664,8 @@ void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFil ...@@ -664,7 +664,8 @@ void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFil
void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput); int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput);
void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows, SQueryRuntimeEnv* runtimeEnv); void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows, SQueryRuntimeEnv* runtimeEnv, bool extendLarge);
void shrinkOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity);
void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity); void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity);
void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput); void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput);
......
...@@ -2170,6 +2170,10 @@ static void copyTopBotRes(SQLFunctionCtx *pCtx, int32_t type) { ...@@ -2170,6 +2170,10 @@ static void copyTopBotRes(SQLFunctionCtx *pCtx, int32_t type) {
// set the corresponding tag data for each record // set the corresponding tag data for each record
// todo check malloc failure // todo check malloc failure
if (pCtx->tagInfo.numOfTagCols == 0) {
return ;
}
char **pData = calloc(pCtx->tagInfo.numOfTagCols, POINTER_BYTES); char **pData = calloc(pCtx->tagInfo.numOfTagCols, POINTER_BYTES);
for (int32_t i = 0; i < pCtx->tagInfo.numOfTagCols; ++i) { for (int32_t i = 0; i < pCtx->tagInfo.numOfTagCols; ++i) {
pData[i] = pCtx->tagInfo.pTagCtxList[i]->pOutput; pData[i] = pCtx->tagInfo.pTagCtxList[i]->pOutput;
...@@ -4694,6 +4698,10 @@ static void copySampleFuncRes(SQLFunctionCtx *pCtx, int32_t type) { ...@@ -4694,6 +4698,10 @@ static void copySampleFuncRes(SQLFunctionCtx *pCtx, int32_t type) {
pOutput += pCtx->outputBytes; pOutput += pCtx->outputBytes;
pTimestamp++; pTimestamp++;
} }
if (pCtx->tagInfo.numOfTagCols == 0) {
return ;
}
char **tagOutputs = calloc(pCtx->tagInfo.numOfTagCols, POINTER_BYTES); char **tagOutputs = calloc(pCtx->tagInfo.numOfTagCols, POINTER_BYTES);
for (int32_t i = 0; i < pCtx->tagInfo.numOfTagCols; ++i) { for (int32_t i = 0; i < pCtx->tagInfo.numOfTagCols; ++i) {
......
...@@ -3682,31 +3682,24 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i ...@@ -3682,31 +3682,24 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i
initCtxOutputBuffer(pCtx, pDataBlock->info.numOfCols); initCtxOutputBuffer(pCtx, pDataBlock->info.numOfCols);
} }
void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows, SQueryRuntimeEnv* runtimeEnv) { void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows, SQueryRuntimeEnv* runtimeEnv, bool extendLarge) {
SSDataBlock* pDataBlock = pBInfo->pRes; SSDataBlock* pDataBlock = pBInfo->pRes;
int32_t newSize = pDataBlock->info.rows + numOfInputRows + 5; // extra output buffer int32_t newSize = pDataBlock->info.rows + numOfInputRows + 5; // extra output buffer
if ((*bufCapacity) < newSize) { if ((*bufCapacity) < newSize) {
for(int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { for(int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i); SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i);
if (!extendColCapacity(pColInfo, newSize, &pBInfo->pCtx[i], bufCapacity, extendLarge)) {
char* p = realloc(pColInfo->pData, ((size_t)newSize) * pColInfo->info.bytes); // error throw except
if (p != NULL) {
pColInfo->pData = p;
// it starts from the tail of the previously generated results.
pBInfo->pCtx[i].pOutput = pColInfo->pData;
(*bufCapacity) = newSize;
} else {
size_t allocateSize = ((size_t)(newSize)) * pColInfo->info.bytes; size_t allocateSize = ((size_t)(newSize)) * pColInfo->info.bytes;
qError("can not allocate %zu bytes for output. Rows: %d, colBytes %d", qError("can not allocate %zu bytes for output. Rows: %d, colBytes %d",
allocateSize, newSize, pColInfo->info.bytes); allocateSize, newSize, pColInfo->info.bytes);
longjmp(runtimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(runtimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
return ;
} }
} }
} }
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i); SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i);
pBInfo->pCtx[i].pOutput = pColInfo->pData + (size_t)pColInfo->info.bytes * pDataBlock->info.rows; pBInfo->pCtx[i].pOutput = pColInfo->pData + (size_t)pColInfo->info.bytes * pDataBlock->info.rows;
...@@ -3726,6 +3719,26 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf ...@@ -3726,6 +3719,26 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf
} }
} }
// shrink pBInfo->pRes memory
void shrinkOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity) {
SSDataBlock* pDataBlock = pBInfo->pRes;
int32_t rows = pDataBlock->info.rows + 5; // remain 5 buffer
// shrink if only too large blank space
if (*bufCapacity - rows <= 200) {
return ; // no need shrink
}
// bufCapcaity shrink to rows
for(int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i);
void* pNew = realloc(pColInfo->pData, rows * pColInfo->info.bytes);
if (pNew)
pColInfo->pData = pNew;
}
*bufCapacity = rows;
}
void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput) { void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput) {
bool interpQuery = false; bool interpQuery = false;
int32_t tsNum = 0; int32_t tsNum = 0;
...@@ -6019,7 +6032,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { ...@@ -6019,7 +6032,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
updateOutputBuf(&pProjectInfo->binfo, &pProjectInfo->bufCapacity, pBlock->info.rows, pOperator->pRuntimeEnv); updateOutputBuf(&pProjectInfo->binfo, &pProjectInfo->bufCapacity, pBlock->info.rows, pOperator->pRuntimeEnv, false);
projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
if (pTableQueryInfo != NULL) { if (pTableQueryInfo != NULL) {
...@@ -6088,7 +6101,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { ...@@ -6088,7 +6101,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
updateOutputBuf(&pProjectInfo->binfo, &pProjectInfo->bufCapacity, pBlock->info.rows, pOperator->pRuntimeEnv); updateOutputBuf(&pProjectInfo->binfo, &pProjectInfo->bufCapacity, pBlock->info.rows, pOperator->pRuntimeEnv, false);
projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
if (pTableQueryInfo != NULL) { if (pTableQueryInfo != NULL) {
...@@ -6603,7 +6616,7 @@ static void doTimeEveryImpl(SOperatorInfo* pOperator, SQLFunctionCtx *pCtx, SSDa ...@@ -6603,7 +6616,7 @@ static void doTimeEveryImpl(SOperatorInfo* pOperator, SQLFunctionCtx *pCtx, SSDa
break; break;
} }
updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0, pOperator->pRuntimeEnv); updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0, pOperator->pRuntimeEnv, false);
} }
} }
} }
...@@ -6623,7 +6636,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) { ...@@ -6623,7 +6636,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) {
pRes->info.rows = 0; pRes->info.rows = 0;
if (!pEveryInfo->groupDone) { if (!pEveryInfo->groupDone) {
updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0, pOperator->pRuntimeEnv); updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0, pOperator->pRuntimeEnv, false);
doTimeEveryImpl(pOperator, pInfo->pCtx, pEveryInfo->lastBlock, false); doTimeEveryImpl(pOperator, pInfo->pCtx, pEveryInfo->lastBlock, false);
if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) { if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) {
copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput); copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
...@@ -6659,7 +6672,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) { ...@@ -6659,7 +6672,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) {
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, pBlock->info.rows, pOperator->pRuntimeEnv); updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, pBlock->info.rows, pOperator->pRuntimeEnv, false);
doTimeEveryImpl(pOperator, pInfo->pCtx, pBlock, *newgroup); doTimeEveryImpl(pOperator, pInfo->pCtx, pBlock, *newgroup);
if (pEveryInfo->groupDone && pOperator->upstream[0]->notify) { if (pEveryInfo->groupDone && pOperator->upstream[0]->notify) {
...@@ -6685,7 +6698,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) { ...@@ -6685,7 +6698,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) {
if (!pEveryInfo->groupDone) { if (!pEveryInfo->groupDone) {
pEveryInfo->allDone = true; pEveryInfo->allDone = true;
updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0, pOperator->pRuntimeEnv); updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0, pOperator->pRuntimeEnv, false);
doTimeEveryImpl(pOperator, pInfo->pCtx, NULL, false); doTimeEveryImpl(pOperator, pInfo->pCtx, NULL, false);
if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) { if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) {
break; break;
...@@ -6706,7 +6719,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) { ...@@ -6706,7 +6719,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) {
// Return result of the previous group in the firstly. // Return result of the previous group in the firstly.
if (*newgroup) { if (*newgroup) {
if (!pEveryInfo->groupDone) { if (!pEveryInfo->groupDone) {
updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0, pOperator->pRuntimeEnv); updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0, pOperator->pRuntimeEnv, false);
doTimeEveryImpl(pOperator, pInfo->pCtx, NULL, false); doTimeEveryImpl(pOperator, pInfo->pCtx, NULL, false);
if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) { if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) {
pEveryInfo->existDataBlock = pBlock; pEveryInfo->existDataBlock = pBlock;
...@@ -6742,7 +6755,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) { ...@@ -6742,7 +6755,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) {
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, pBlock->info.rows, pOperator->pRuntimeEnv); updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, pBlock->info.rows, pOperator->pRuntimeEnv, false);
pEveryInfo->groupDone = false; pEveryInfo->groupDone = false;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册