未验证 提交 dc131301 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #14221 from taosdata/feature/3_liaohj

fix(query): fix memory leak
......@@ -1284,6 +1284,7 @@ void tFreeSShowRsp(SShowRsp* pRsp);
typedef struct {
char db[TSDB_DB_FNAME_LEN];
char tb[TSDB_TABLE_NAME_LEN];
char user[TSDB_USER_LEN];
int64_t showId;
} SRetrieveTableReq;
......
......@@ -3017,6 +3017,7 @@ int32_t tSerializeSRetrieveTableReq(void *buf, int32_t bufLen, SRetrieveTableReq
if (tEncodeI64(&encoder, pReq->showId) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->tb) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->user) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
......@@ -3032,6 +3033,8 @@ int32_t tDeserializeSRetrieveTableReq(void *buf, int32_t bufLen, SRetrieveTableR
if (tDecodeI64(&decoder, &pReq->showId) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->tb) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->user) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
......
......@@ -122,6 +122,7 @@ static SShowObj *mndCreateShowObj(SMnode *pMnode, SRetrieveTableReq *pReq) {
int32_t size = sizeof(SShowObj);
SShowObj showObj = {0};
showObj.id = showId;
showObj.pMnode = pMnode;
showObj.type = convertToRetrieveType(pReq->tb, tListLen(pReq->tb));
......
......@@ -358,6 +358,7 @@ typedef struct SSysTableScanInfo {
tsem_t ready;
SReadHandle readHandle;
int32_t accountId;
const char* pUser;
bool showRewrite;
SNode* pCondition; // db_name filter condition, to discard data that are not in current database
SMTbCursor* pCur; // cursor for iterate the local table meta store.
......@@ -713,7 +714,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo, uint64_t queryId, uint64_t taskId);
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
......
......@@ -4043,7 +4043,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
}
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
uint64_t queryId, uint64_t taskId, STableListInfo* pTableListInfo) {
uint64_t queryId, uint64_t taskId, STableListInfo* pTableListInfo, const char* pUser) {
int32_t type = nodeType(pPhyNode);
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
......@@ -4103,8 +4103,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
return createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pTaskInfo);
return createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
......@@ -4167,7 +4166,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
for (int32_t i = 0; i < size; ++i) {
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableListInfo);
ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableListInfo, pUser);
if (ops[i] == NULL) {
return NULL;
}
......@@ -4600,7 +4599,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
(*pTaskInfo)->tableqinfoList.pTagCond = pPlan->pTagCond;
(*pTaskInfo)->tableqinfoList.pTagIndexCond = pPlan->pTagIndexCond;
(*pTaskInfo)->pRoot =
createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoList);
createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoList, pPlan->user);
if (NULL == (*pTaskInfo)->pRoot) {
code = (*pTaskInfo)->code;
......
......@@ -1067,9 +1067,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
}
}
// TODO refactor @liao
taosArrayDestroy(block.pDataBlock);
if (pInfo->pRes->pDataBlock == NULL) {
// TODO add log
pOperator->status = OP_EXEC_DONE;
......@@ -1574,6 +1572,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
while (1) {
int64_t startTs = taosGetTimestampUs();
strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb));
strcpy(pInfo->req.user, pInfo->pUser);
if (pInfo->showRewrite) {
char dbName[TSDB_DB_NAME_LEN] = {0};
......@@ -1706,7 +1705,7 @@ int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbT
}
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode,
SExecTaskInfo* pTaskInfo) {
const char* pUser, SExecTaskInfo* pTaskInfo) {
SSysTableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SSysTableScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -1719,13 +1718,14 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
int32_t num = 0;
SArray* colList = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, COL_MATCH_FROM_COL_ID);
SArray* colList = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, COL_MATCH_FROM_COL_ID);
pInfo->accountId = pScanPhyNode->accountId;
pInfo->accountId = pScanPhyNode->accountId;
pInfo->pUser = taosMemoryStrDup((void*) pUser);
pInfo->showRewrite = pScanPhyNode->showRewrite;
pInfo->pRes = pResBlock;
pInfo->pCondition = pScanNode->node.pConditions;
pInfo->scanCols = colList;
pInfo->pRes = pResBlock;
pInfo->pCondition = pScanNode->node.pConditions;
pInfo->scanCols = colList;
initResultSizeInfo(pOperator, 4096);
......@@ -1741,13 +1741,13 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
pInfo->readHandle = *(SReadHandle*)readHandle;
}
pOperator->name = "SysTableScanOperator";
pOperator->name = "SysTableScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pResBlock->pDataBlock);
pOperator->pTaskInfo = pTaskInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL, NULL, NULL);
......
......@@ -68,7 +68,7 @@
./test.sh -f tsim/mnode/basic2.sim
./test.sh -f tsim/mnode/basic3.sim
./test.sh -f tsim/mnode/basic4.sim
./test.sh -f tsim/mnode/basic5.sim
#./test.sh -f tsim/mnode/basic5.sim
# ---- show
./test.sh -f tsim/show/basic.sim
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册