From 4315758ef31e58fbc9bd082025f9d398014ed5ae Mon Sep 17 00:00:00 2001 From: slguan Date: Thu, 30 Apr 2020 10:32:10 +0800 Subject: [PATCH] fix possible memroy link while process show message --- src/mnode/inc/mgmtProfile.h | 8 --- src/mnode/inc/mgmtShell.h | 8 +++ src/mnode/src/mgmtMain.c | 8 +-- src/mnode/src/mgmtProfile.c | 60 ---------------- src/mnode/src/mgmtShell.c | 135 +++++++++++++++++++++++++++++------- 5 files changed, 120 insertions(+), 99 deletions(-) diff --git a/src/mnode/inc/mgmtProfile.h b/src/mnode/inc/mgmtProfile.h index 07ed3f0f13..f33ff9c3fa 100644 --- a/src/mnode/inc/mgmtProfile.h +++ b/src/mnode/inc/mgmtProfile.h @@ -24,14 +24,6 @@ extern "C" { int32_t mgmtInitProfile(); void mgmtCleanUpProfile(); -bool mgmtCheckQhandle(uint64_t qhandle); -void mgmtSaveQhandle(void *qhandle); -void mgmtFreeQhandle(void *qhandle); - -void * mgmtMallocQueuedMsg(SRpcMsg *rpcMsg); -void * mgmtCloneQueuedMsg(SQueuedMsg *pSrcMsg); -void mgmtFreeQueuedMsg(SQueuedMsg *pMsg); - #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtShell.h b/src/mnode/inc/mgmtShell.h index 3941a0a87d..c3ae3e96e8 100644 --- a/src/mnode/inc/mgmtShell.h +++ b/src/mnode/inc/mgmtShell.h @@ -34,6 +34,14 @@ void mgmtAddToShellQueue(SQueuedMsg *queuedMsg); void mgmtDealyedAddToShellQueue(SQueuedMsg *queuedMsg); void mgmtSendSimpleResp(void *thandle, int32_t code); +bool mgmtCheckQhandle(uint64_t qhandle); +void *mgmtSaveQhandle(void *qhandle, int32_t size); +void mgmtFreeQhandle(void *qhandle, bool forceRemove); + +void *mgmtMallocQueuedMsg(SRpcMsg *rpcMsg); +void *mgmtCloneQueuedMsg(SQueuedMsg *pSrcMsg); +void mgmtFreeQueuedMsg(SQueuedMsg *pMsg); + #ifdef __cplusplus } #endif diff --git a/src/mnode/src/mgmtMain.c b/src/mnode/src/mgmtMain.c index dd57a650c8..0f18c95539 100644 --- a/src/mnode/src/mgmtMain.c +++ b/src/mnode/src/mgmtMain.c @@ -36,7 +36,7 @@ #include "mgmtTable.h" #include "mgmtShell.h" -void *tsMgmtTmr = NULL; +extern void *tsMgmtTmr; static bool tsMgmtIsRunning = false; int32_t mgmtStartSystem() { @@ -51,12 +51,6 @@ int32_t mgmtStartSystem() { mkdir(tsMnodeDir, 0755); } - tsMgmtTmr = taosTmrInit((tsMaxShellConns) * 3, 200, 3600000, "MND"); - if (tsMgmtTmr == NULL) { - mError("failed to init timer"); - return -1; - } - if (mgmtInitAccts() < 0) { mError("failed to init accts"); return -1; diff --git a/src/mnode/src/mgmtProfile.c b/src/mnode/src/mgmtProfile.c index b52a43569a..754be9a1d0 100644 --- a/src/mnode/src/mgmtProfile.c +++ b/src/mnode/src/mgmtProfile.c @@ -561,17 +561,6 @@ int32_t mgmtKillConnection(char *qidstr, void *pConn) { return TSDB_CODE_INVALID_CONNECTION; } -bool mgmtCheckQhandle(uint64_t qhandle) { - return true; -} - -void mgmtSaveQhandle(void *qhandle) { - mTrace("qhandle:%p is allocated", qhandle); -} - -void mgmtFreeQhandle(void *qhandle) { - mTrace("qhandle:%p is freed", qhandle); -} int mgmtGetConns(SShowObj *pShow, void *pConn) { // SAcctObj * pAcct = pConn->pAcct; @@ -771,52 +760,3 @@ int32_t mgmtInitProfile() { void mgmtCleanUpProfile() { } - -void *mgmtMallocQueuedMsg(SRpcMsg *rpcMsg) { - bool usePublicIp = false; - SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle, &usePublicIp); - if (pUser == NULL) { - return NULL; - } - - SQueuedMsg *pMsg = calloc(1, sizeof(SQueuedMsg)); - pMsg->thandle = rpcMsg->handle; - pMsg->msgType = rpcMsg->msgType; - pMsg->contLen = rpcMsg->contLen; - pMsg->pCont = rpcMsg->pCont; - pMsg->pUser = pUser; - pMsg->usePublicIp = usePublicIp; - - return pMsg; -} - -void mgmtFreeQueuedMsg(SQueuedMsg *pMsg) { - if (pMsg != NULL) { - rpcFreeCont(pMsg->pCont); - if (pMsg->pUser) mgmtDecUserRef(pMsg->pUser); - if (pMsg->pDb) mgmtDecDbRef(pMsg->pDb); - if (pMsg->pVgroup) mgmtDecVgroupRef(pMsg->pVgroup); - if (pMsg->pTable) mgmtDecTableRef(pMsg->pTable); - if (pMsg->pAcct) mgmtDecAcctRef(pMsg->pAcct); - if (pMsg->pDnode) mgmtDecDnodeRef(pMsg->pDnode); - free(pMsg); - } -} - -void* mgmtCloneQueuedMsg(SQueuedMsg *pSrcMsg) { - SQueuedMsg *pDestMsg = calloc(1, sizeof(SQueuedMsg)); - - pDestMsg->thandle = pSrcMsg->thandle; - pDestMsg->msgType = pSrcMsg->msgType; - pDestMsg->pCont = pSrcMsg->pCont; - pDestMsg->contLen = pSrcMsg->contLen; - pDestMsg->retry = pSrcMsg->retry; - pDestMsg->maxRetry= pSrcMsg->maxRetry; - pDestMsg->pUser = pSrcMsg->pUser; - pDestMsg->usePublicIp = pSrcMsg->usePublicIp; - - pSrcMsg->pCont = NULL; - pSrcMsg->pUser = NULL; - - return pDestMsg; -} \ No newline at end of file diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 48066ea8a5..fdf2ea6953 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -23,6 +23,7 @@ #include "ttimer.h" #include "tgrant.h" #include "tglobal.h" +#include "tcache.h" #include "dnode.h" #include "mgmtDef.h" #include "mgmtLog.h" @@ -50,10 +51,11 @@ static void mgmtProcessHeartBeatMsg(SQueuedMsg *queuedMsg); static void mgmtProcessConnectMsg(SQueuedMsg *queuedMsg); static void mgmtProcessUseMsg(SQueuedMsg *queuedMsg); -extern void *tsMgmtTmr; +void *tsMgmtTmr; static void *tsMgmtShellRpc = NULL; static void *tsMgmtTranQhandle = NULL; static void (*tsMgmtProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SQueuedMsg *) = {0}; +static void *tsQhandleCache = NULL; static SShowMetaFp tsMgmtShowMetaFp[TSDB_MGMT_TABLE_MAX] = {0}; static SShowRetrieveFp tsMgmtShowRetrieveFp[TSDB_MGMT_TABLE_MAX] = {0}; @@ -64,7 +66,9 @@ int32_t mgmtInitShell() { mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mgmtProcessConnectMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mgmtProcessUseMsg); + tsMgmtTmr = taosTmrInit((tsMaxShellConns) * 3, 200, 3600000, "MND"); tsMgmtTranQhandle = taosInitScheduler(tsMaxShellConns, 1, "mnodeT"); + tsQhandleCache = taosCacheInit(tsMgmtTmr, 2); int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore / 4.0; if (numOfThreads < 1) { @@ -102,6 +106,12 @@ void mgmtCleanUpShell() { tsMgmtShellRpc = NULL; mPrint("server connection to shell is closed"); } + + if (tsQhandleCache) { + taosCacheEmpty(tsQhandleCache); + taosCacheCleanup(tsQhandleCache); + tsQhandleCache = NULL; + } } void mgmtAddShellMsgHandle(uint8_t showType, void (*fp)(SQueuedMsg *queuedMsg)) { @@ -233,14 +243,15 @@ static void mgmtProcessShowMsg(SQueuedMsg *pMsg) { return; } - SShowObj *pShow = (SShowObj *) calloc(1, sizeof(SShowObj) + htons(pShowMsg->payloadLen)); + int32_t showObjSize = sizeof(SShowObj) + htons(pShowMsg->payloadLen); + SShowObj *pShow = (SShowObj *) calloc(1, showObjSize); pShow->signature = pShow; pShow->type = pShowMsg->type; pShow->payloadLen = htons(pShowMsg->payloadLen); strcpy(pShow->db, pShowMsg->db); memcpy(pShow->payload, pShowMsg->payload, pShow->payloadLen); - mgmtSaveQhandle(pShow); + pShow = mgmtSaveQhandle(pShow, showObjSize); pShowRsp->qhandle = htobe64((uint64_t) pShow); mTrace("show:%p, type:%s, start to get meta", pShow, mgmtGetShowTypeStr(pShowMsg->type)); @@ -255,10 +266,10 @@ static void mgmtProcessShowMsg(SQueuedMsg *pMsg) { rpcSendResponse(&rpcRsp); } else { mError("show:%p, type:%s, failed to get meta, reason:%s", pShow, mgmtGetShowTypeStr(pShowMsg->type), tstrerror(code)); - mgmtFreeQhandle(pShow); + mgmtFreeQhandle(pShow, false); SRpcMsg rpcRsp = { - .handle = pMsg->thandle, - .code = code + .handle = pMsg->thandle, + .code = code }; rpcSendResponse(&rpcRsp); } @@ -284,25 +295,19 @@ static void mgmtProcessRetrieveMsg(SQueuedMsg *pMsg) { SShowObj *pShow = (SShowObj *)pRetrieve->qhandle; mTrace("show:%p, type:%s, retrieve data", pShow, mgmtGetShowTypeStr(pShow->type)); - if (!mgmtCheckQhandle(pRetrieve->qhandle)) { - mError("pShow:%p, query memory is corrupted", pShow); - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MEMORY_CORRUPTED); - return; - } else { - if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) { - rowsToRead = pShow->numOfRows - pShow->numOfReads; - } + if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) { + rowsToRead = pShow->numOfRows - pShow->numOfReads; + } - /* return no more than 100 meters in one round trip */ - if (rowsToRead > 100) rowsToRead = 100; + /* return no more than 100 meters in one round trip */ + if (rowsToRead > 100) rowsToRead = 100; - /* - * the actual number of table may be larger than the value of pShow->numOfRows, if a query is - * issued during a continuous create table operation. Therefore, rowToRead may be less than 0. - */ - if (rowsToRead < 0) rowsToRead = 0; - size = pShow->rowSize * rowsToRead; - } + /* + * the actual number of table may be larger than the value of pShow->numOfRows, if a query is + * issued during a continuous create table operation. Therefore, rowToRead may be less than 0. + */ + if (rowsToRead < 0) rowsToRead = 0; + size = pShow->rowSize * rowsToRead; size += 100; SRetrieveTableRsp *pRsp = rpcMallocCont(size); @@ -313,6 +318,7 @@ static void mgmtProcessRetrieveMsg(SQueuedMsg *pMsg) { if (rowsRead < 0) { // TSDB_CODE_ACTION_IN_PROGRESS; rpcFreeCont(pRsp); + mgmtFreeQhandle(pShow, false); return; } @@ -329,7 +335,9 @@ static void mgmtProcessRetrieveMsg(SQueuedMsg *pMsg) { rpcSendResponse(&rpcRsp); if (rowsToRead == 0) { - mgmtFreeQhandle(pShow); + mgmtFreeQhandle(pShow, true); + } else { + mgmtFreeQhandle(pShow, false); } } @@ -511,3 +519,82 @@ void mgmtSendSimpleResp(void *thandle, int32_t code) { }; rpcSendResponse(&rpcRsp); } + +bool mgmtCheckQhandle(uint64_t qhandle) { + void *pSaved = taosCacheAcquireByData(tsQhandleCache, (void *)qhandle); + if (pSaved == (void *)qhandle) { + mTrace("qhandle:%p is retrived", qhandle); + return true; + } else { + mTrace("qhandle:%p is already freed", qhandle); + return false; + } +} + +void* mgmtSaveQhandle(void *qhandle, int32_t size) { + if (tsQhandleCache != NULL) { + char key[24]; + sprintf(key, "show:%p", qhandle); + void *newQhandle = taosCachePut(tsQhandleCache, key, qhandle, size, 60); + free(qhandle); + + mTrace("qhandle:%p is saved", newQhandle); + return newQhandle; + } + + return NULL; +} + +void mgmtFreeQhandle(void *qhandle, bool forceRemove) { + mTrace("qhandle:%p is freed", qhandle); + taosCacheRelease(tsQhandleCache, &qhandle, forceRemove); +} + +void *mgmtMallocQueuedMsg(SRpcMsg *rpcMsg) { + bool usePublicIp = false; + SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle, &usePublicIp); + if (pUser == NULL) { + return NULL; + } + + SQueuedMsg *pMsg = calloc(1, sizeof(SQueuedMsg)); + pMsg->thandle = rpcMsg->handle; + pMsg->msgType = rpcMsg->msgType; + pMsg->contLen = rpcMsg->contLen; + pMsg->pCont = rpcMsg->pCont; + pMsg->pUser = pUser; + pMsg->usePublicIp = usePublicIp; + + return pMsg; +} + +void mgmtFreeQueuedMsg(SQueuedMsg *pMsg) { + if (pMsg != NULL) { + rpcFreeCont(pMsg->pCont); + if (pMsg->pUser) mgmtDecUserRef(pMsg->pUser); + if (pMsg->pDb) mgmtDecDbRef(pMsg->pDb); + if (pMsg->pVgroup) mgmtDecVgroupRef(pMsg->pVgroup); + if (pMsg->pTable) mgmtDecTableRef(pMsg->pTable); + if (pMsg->pAcct) mgmtDecAcctRef(pMsg->pAcct); + if (pMsg->pDnode) mgmtDecDnodeRef(pMsg->pDnode); + free(pMsg); + } +} + +void* mgmtCloneQueuedMsg(SQueuedMsg *pSrcMsg) { + SQueuedMsg *pDestMsg = calloc(1, sizeof(SQueuedMsg)); + + pDestMsg->thandle = pSrcMsg->thandle; + pDestMsg->msgType = pSrcMsg->msgType; + pDestMsg->pCont = pSrcMsg->pCont; + pDestMsg->contLen = pSrcMsg->contLen; + pDestMsg->retry = pSrcMsg->retry; + pDestMsg->maxRetry= pSrcMsg->maxRetry; + pDestMsg->pUser = pSrcMsg->pUser; + pDestMsg->usePublicIp = pSrcMsg->usePublicIp; + + pSrcMsg->pCont = NULL; + pSrcMsg->pUser = NULL; + + return pDestMsg; +} \ No newline at end of file -- GitLab