未验证 提交 159d6c47 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #14911 from taosdata/feature/3_liaohj

fix(query): fix memory leak.
......@@ -34,7 +34,6 @@ typedef enum EFunctionType {
FUNCTION_TYPE_ELAPSED,
FUNCTION_TYPE_IRATE,
FUNCTION_TYPE_LAST_ROW,
FUNCTION_TYPE_LAST_ROWT, // TODO: removed
FUNCTION_TYPE_MAX,
FUNCTION_TYPE_MIN,
FUNCTION_TYPE_MODE,
......
......@@ -224,12 +224,12 @@ typedef struct SRequestObj {
SArray* tableList;
SQueryExecMetric metric;
SRequestSendRecvBody body;
bool stableQuery; // todo refactor
bool validateOnly; // todo refactor
bool killed;
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
uint32_t retry;
bool syncQuery; // todo refactor: async query object
bool stableQuery; // todo refactor
bool validateOnly; // todo refactor
bool killed;
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
uint32_t retry;
} SRequestObj;
typedef struct SSyncQueryParam {
......
......@@ -320,6 +320,10 @@ void doDestroyRequest(void *p) {
deregisterRequest(pRequest);
}
if (pRequest->syncQuery) {
taosMemoryFree(pRequest->body.param);
}
taosMemoryFree(pRequest);
tscTrace("end to destroy request %" PRIx64 " p:%p", reqId, pRequest);
}
......
......@@ -307,7 +307,7 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
taosThreadMutexUnlock(&appInfo.mutex);
tFreeClientHbBatchRsp(&pRsp);
taosMemoryFree(pMsg->pData);
return code;
}
......
......@@ -2047,6 +2047,7 @@ void syncCatalogFn(SMetaData* pResult, void* param, int32_t code) {
void syncQueryFn(void* param, void* res, int32_t code) {
SSyncQueryParam* pParam = param;
pParam->pRequest = res;
if (pParam->pRequest) {
pParam->pRequest->code = code;
}
......@@ -2093,6 +2094,8 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly);
tsem_wait(&param->sem);
param->pRequest->syncQuery = true;
return param->pRequest;
#else
size_t sqlLen = strlen(sql);
......
......@@ -3127,14 +3127,11 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
}
pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
hasNext = (pBlockIter->numOfBlocks > 0);
}
/*
hasNext = blockIteratorNext(&pStatus->blockIter);
*/
// tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables,
// pReader->pFileGroup->fid, pReader->idStr);
// tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables,
// pReader->pFileGroup->fid, pReader->idStr);
}
return code;
......
......@@ -857,8 +857,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle,
STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId, uint64_t taskId);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, bool multigroupResult,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId,
......
......@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <common/ttime.h>
#include "ttime.h"
#include "function.h"
#include "functionMgt.h"
#include "index.h"
......@@ -603,13 +603,15 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu
}
for (int32_t i = 0; i < numOfOutput; ++i) {
if (strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "_select_value") == 0 ||
strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "_group_key") == 0) {
const char* pName = pCtx[i].pExpr->pExpr->_function.functionName;
if ((strcmp(pName, "_select_value") == 0) ||
(strcmp(pName, "_group_key") == 0)) {
pValCtx[num++] = &pCtx[i];
} else if (fmIsSelectFunc(pCtx[i].functionId)) {
p = &pCtx[i];
}
}
#ifdef BUF_PAGE_DEBUG
qDebug("page_setSelect num:%d", num);
#endif
......
......@@ -3330,7 +3330,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
return (rows > 0) ? pInfo->pRes : NULL;
}
static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, bool* newgroup,
static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo,
SExecTaskInfo* pTaskInfo) {
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
......@@ -3341,25 +3341,26 @@ static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResult
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pResultInfo->capacity);
int32_t numOfResultRows = pResultInfo->capacity - pInfo->pRes->info.rows;
taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pRes, numOfResultRows);
pInfo->curGroupId = pInfo->existNewGroupBlock->info.groupId;
pInfo->existNewGroupBlock = NULL;
// *newgroup = true;
}
static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, bool* newgroup,
static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo,
SExecTaskInfo* pTaskInfo) {
if (taosFillHasMoreResults(pInfo->pFillInfo)) {
// *newgroup = false;
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pResultInfo->capacity);
if (pInfo->pRes->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult)) {
int32_t numOfResultRows = pResultInfo->capacity - pInfo->pRes->info.rows;
taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pRes, numOfResultRows);
if (pInfo->pRes->info.rows > pResultInfo->threshold) {
return;
}
}
// handle the cached new group data block
if (pInfo->existNewGroupBlock) {
doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, newgroup, pTaskInfo);
doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, pTaskInfo);
}
}
......@@ -3372,8 +3373,8 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
blockDataCleanup(pResBlock);
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, NULL, pTaskInfo);
if (pResBlock->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult && pResBlock->info.rows > 0)) {
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, pTaskInfo);
if (pResBlock->info.rows > pResultInfo->threshold || pResBlock->info.rows > 0) {
return pResBlock;
}
......@@ -3407,7 +3408,9 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
}
blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pResBlock, pOperator->resultInfo.capacity);
int32_t numOfResultRows = pOperator->resultInfo.capacity - pResBlock->info.rows;
taosFillResultDataBlock(pInfo->pFillInfo, pResBlock, numOfResultRows);
// current group has no more result to return
if (pResBlock->info.rows > 0) {
......@@ -3417,13 +3420,13 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
return pResBlock;
}
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, NULL, pTaskInfo);
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, pTaskInfo);
if (pResBlock->info.rows >= pOperator->resultInfo.threshold || pBlock == NULL) {
return pResBlock;
}
} else if (pInfo->existNewGroupBlock) { // try next group
assert(pBlock != NULL);
doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, NULL, pTaskInfo);
doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, pTaskInfo);
if (pResBlock->info.rows > pResultInfo->threshold) {
return pResBlock;
}
......@@ -4032,8 +4035,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
}
}
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, bool multigroupResult,
SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo) {
SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -4065,7 +4067,6 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
}
pInfo->pRes = pResBlock;
pInfo->multigroupResult = multigroupResult;
pInfo->pCondition = pPhyFillNode->node.pConditions;
pInfo->pColMatchColInfo = pColMatchColInfo;
pOperator->name = "FillOperator";
......@@ -4422,6 +4423,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
cond.twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
cond.suid = pBlockNode->suid;
cond.type = BLOCK_LOAD_OFFSET_ORDER;
cond.startVersion = -1;
cond.endVersion = -1;
}
STsdbReader* pReader = NULL;
......@@ -4605,7 +4608,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
pOptr = createMergeJoinOperatorInfo(ops, size, (SJoinPhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, false, pTaskInfo);
pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
......
......@@ -106,7 +106,7 @@ bool irateFuncSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo);
int32_t irateFunction(SqlFunctionCtx *pCtx);
int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t lastrowFunction(SqlFunctionCtx* pCtx);
int32_t cacheLastRowFunction(SqlFunctionCtx* pCtx);
bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t firstFunction(SqlFunctionCtx *pCtx);
......@@ -120,7 +120,6 @@ int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t getFirstLastInfoSize(int32_t resBytes);
int32_t lastRowFunction(SqlFunctionCtx *pCtx);
int32_t lastRowFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
bool getTopBotMergeFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
......
......@@ -1370,11 +1370,6 @@ static int32_t translateTail(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return TSDB_CODE_SUCCESS;
}
static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
// todo
return TSDB_CODE_SUCCESS;
}
static int32_t translateDerivative(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
if (3 != LIST_LENGTH(pFunc->pParameterList)) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
......@@ -2226,7 +2221,17 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.translateFunc = translateFirstLast,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
.processFunc = lastrowFunction,
.processFunc = lastRowFunction,
.finalizeFunc = firstLastFinalize
},
{
.name = "_cache_last_row",
.type = FUNCTION_TYPE_CACHE_LAST_ROW,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.translateFunc = translateFirstLast,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
.processFunc = cacheLastRowFunction,
.finalizeFunc = firstLastFinalize
},
{
......
......@@ -5993,7 +5993,7 @@ int32_t interpFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS;
}
int32_t lastrowFunction(SqlFunctionCtx* pCtx) {
int32_t cacheLastRowFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElems = 0;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
......
......@@ -444,14 +444,12 @@ int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) {
trans.pHandle = pMsg->handle;
SCH_ERR_JRET(schUpdateHbConnection(&rsp.epId, &trans));
SCH_ERR_JRET(schProcessOnTaskStatusRsp(&rsp.epId, rsp.taskStatus));
_return:
tFreeSSchedulerHbRsp(&rsp);
taosMemoryFree(param);
taosMemoryFree(pMsg->pData);
SCH_RET(code);
}
......
......@@ -103,7 +103,8 @@ $ms2 = 1601481600000 - $cc
sql select count(tbcol), avg(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms and ts > $ms2 interval(1m) fill(value,0)
print ===> $rows
if $rows < 30 then
if $rows < 30 then
print expect greater than 30, actual: $rows
return -1
endi
if $rows > 50 then
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册