From 59d8eda4c82659bea91bd262d15e29e97e8aa2d9 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 1 Dec 2021 18:59:38 +0800 Subject: [PATCH] TD-10431 fix invalid write errors --- source/dnode/mgmt/impl/src/dndMnode.c | 2 +- source/dnode/mgmt/impl/src/dndTransport.c | 19 +++++++------- source/dnode/mgmt/impl/src/dndVnodes.c | 10 +++++++- source/dnode/mnode/impl/src/mnode.c | 31 ++++++++++++++--------- source/dnode/mnode/sdb/src/sdb.c | 30 +++++++++++++++++----- source/dnode/mnode/sdb/src/sdbFile.c | 7 +++-- 6 files changed, 68 insertions(+), 31 deletions(-) diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index 1f02a3799b..c261d1a322 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -289,7 +289,6 @@ static void dndStopMnodeWorker(SDnode *pDnode) { taosWLockLatch(&pMgmt->latch); pMgmt->deployed = 0; - pMgmt->pMnode = NULL; taosWUnLockLatch(&pMgmt->latch); while (pMgmt->refCount > 1) taosMsleep(10); @@ -920,6 +919,7 @@ void dndCleanupMnode(SDnode *pDnode) { dndStopMnodeWorker(pDnode); dndCleanupMnodeMgmtWorker(pDnode); tfree(pMgmt->file); + mndClose(pMgmt->pMnode); dInfo("dnode-mnode is cleaned up"); } diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index eee2e36c0f..7e2bc69ea8 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -131,17 +131,18 @@ static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { if (dndGetStat(pDnode) == DND_STAT_STOPPED) { if (pMsg == NULL || pMsg->pCont == NULL) return; - dTrace("RPC %p, rsp:%s is ignored since dnode is stopping", pMsg->handle, taosMsg[msgType]); + dTrace("RPC %p, rsp:%s app:%p is ignored since dnode is stopping", pMsg->handle, taosMsg[msgType], pMsg->ahandle); rpcFreeCont(pMsg->pCont); return; } DndMsgFp fp = pMgmt->msgFp[msgType]; if (fp != NULL) { - dTrace("RPC %p, rsp:%s will be processed, code:%s", pMsg->handle, taosMsg[msgType], tstrerror(pMsg->code)); + dTrace("RPC %p, rsp:%s app:%p will be processed, result:%s", pMsg->handle, taosMsg[msgType], pMsg->ahandle, + tstrerror(pMsg->code)); (*fp)(pDnode, pMsg, pEpSet); } else { - dError("RPC %p, rsp:%s not processed", pMsg->handle, taosMsg[msgType]); + dError("RPC %p, rsp:%s app:%p not processed", pMsg->handle, taosMsg[msgType], pMsg->ahandle); rpcFreeCont(pMsg->pCont); } } @@ -187,19 +188,19 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) { int32_t msgType = pMsg->msgType; if (msgType == TSDB_MSG_TYPE_NETWORK_TEST) { - dTrace("RPC %p, network test req will be processed", pMsg->handle); + dTrace("RPC %p, network test req, app:%p will be processed", pMsg->handle, pMsg->ahandle); dndProcessDnodeReq(pDnode, pMsg, pEpSet); return; } if (dndGetStat(pDnode) == DND_STAT_STOPPED) { - dError("RPC %p, req:%s is ignored since dnode exiting", pMsg->handle, taosMsg[msgType]); + dError("RPC %p, req:%s app:%p is ignored since dnode exiting", pMsg->handle, taosMsg[msgType], pMsg->ahandle); SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_EXITING}; rpcSendResponse(&rspMsg); rpcFreeCont(pMsg->pCont); return; } else if (dndGetStat(pDnode) != DND_STAT_RUNNING) { - dError("RPC %p, req:%s is ignored since dnode not running", pMsg->handle, taosMsg[msgType]); + dError("RPC %p, req:%s app:%p is ignored since dnode not running", pMsg->handle, taosMsg[msgType], pMsg->ahandle); SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_APP_NOT_READY}; rpcSendResponse(&rspMsg); rpcFreeCont(pMsg->pCont); @@ -207,7 +208,7 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) { } if (pMsg->pCont == NULL) { - dTrace("RPC %p, req:%s not processed since content is null", pMsg->handle, taosMsg[msgType]); + dTrace("RPC %p, req:%s app:%p not processed since content is null", pMsg->handle, taosMsg[msgType], pMsg->ahandle); SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_INVALID_MSG_LEN}; rpcSendResponse(&rspMsg); return; @@ -215,10 +216,10 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) { DndMsgFp fp = pMgmt->msgFp[msgType]; if (fp != NULL) { - dTrace("RPC %p, req:%s will be processed", pMsg->handle, taosMsg[msgType]); + dTrace("RPC %p, req:%s app:%p will be processed", pMsg->handle, taosMsg[msgType], pMsg->ahandle); (*fp)(pDnode, pMsg, pEpSet); } else { - dError("RPC %p, req:%s is not processed", pMsg->handle, taosMsg[msgType]); + dError("RPC %p, req:%s app:%p is not processed since no handle", pMsg->handle, taosMsg[msgType], pMsg->ahandle); SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED}; rpcSendResponse(&rspMsg); rpcFreeCont(pMsg->pCont); diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index fd66695e32..34d37d2320 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -895,6 +895,7 @@ static int32_t dndInitVnodeMgmtWorker(SDnode *pDnode) { return -1; } + dDebug("vnode mgmt worker is initialized"); return 0; } @@ -903,6 +904,7 @@ static void dndCleanupVnodeMgmtWorker(SDnode *pDnode) { tWorkerFreeQueue(&pMgmt->mgmtPool, pMgmt->pMgmtQ); tWorkerCleanup(&pMgmt->mgmtPool); pMgmt->pMgmtQ = NULL; + dDebug("vnode mgmt worker is closed"); } static int32_t dndAllocVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode) { @@ -963,6 +965,7 @@ static int32_t dndInitVnodeReadWorker(SDnode *pDnode) { return -1; } + dDebug("vnode read worker is initialized"); return 0; } @@ -970,6 +973,7 @@ static void dndCleanupVnodeReadWorker(SDnode *pDnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; tWorkerCleanup(&pMgmt->fetchPool); tWorkerCleanup(&pMgmt->queryPool); + dDebug("vnode close worker is initialized"); } static int32_t dndAllocVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode) { @@ -1016,12 +1020,14 @@ static int32_t dndInitVnodeWriteWorker(SDnode *pDnode) { return -1; } + dDebug("vnode write worker is initialized"); return 0; } static void dndCleanupVnodeWriteWorker(SDnode *pDnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; tMWorkerCleanup(&pMgmt->writePool); + dDebug("vnode write worker is closed"); } static int32_t dndAllocVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) { @@ -1046,7 +1052,7 @@ static int32_t dndInitVnodeSyncWorker(SDnode *pDnode) { if (maxThreads < 1) maxThreads = 1; SVnodesMgmt *pMgmt = &pDnode->vmgmt; - SMWorkerPool *pPool = &pMgmt->writePool; + SMWorkerPool *pPool = &pMgmt->syncPool; pPool->name = "vnode-sync"; pPool->max = maxThreads; if (tMWorkerInit(pPool) != 0) { @@ -1054,12 +1060,14 @@ static int32_t dndInitVnodeSyncWorker(SDnode *pDnode) { return -1; } + dDebug("vnode sync worker is initialized"); return 0; } static void dndCleanupVnodeSyncWorker(SDnode *pDnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; tMWorkerCleanup(&pMgmt->syncPool); + dDebug("vnode sync worker is closed"); } int32_t dndInitVnodes(SDnode *pDnode) { diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index f1b46098d7..84acabccb5 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -178,11 +178,11 @@ static void mndCleanupSteps(SMnode *pMnode, int32_t pos) { if (pMnode->pSteps == NULL) return; if (pos == -1) { - pos = taosArrayGetSize(pMnode->pSteps); + pos = taosArrayGetSize(pMnode->pSteps) - 1; } for (int32_t s = pos; s >= 0; s--) { - SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, pos); + SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, s); mDebug("step:%s will cleanup", pStep->name); if (pStep->cleanupFp != NULL) { (*pStep->cleanupFp)(pMnode); @@ -267,7 +267,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { } int32_t code = mnodeCreateDir(pMnode, path); - if (mnodeCreateDir(pMnode, path) != 0) { + if (code != 0) { mError("failed to open mnode since %s", tstrerror(code)); mndClose(pMnode); terrno = code; @@ -306,6 +306,10 @@ void mndClose(SMnode *pMnode) { if (pMnode != NULL) { mDebug("start to close mnode"); mndCleanupSteps(pMnode, -1); + if (pMnode->pSteps != NULL) { + taosArrayDestroy(pMnode->pSteps); + pMnode->pSteps = NULL; + } tfree(pMnode->path); tfree(pMnode->charset); tfree(pMnode->locale); @@ -354,7 +358,7 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) { if (rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) { mndCleanupMsg(pMsg); terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; - mError("failed to create msg since %s",terrstr()); + mError("failed to create msg since %s", terrstr()); return NULL; } memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN); @@ -372,7 +376,10 @@ void mndCleanupMsg(SMnodeMsg *pMsg) { mTrace("msg:%p, is destroyed", pMsg); } -void mndSendRsp(SMnodeMsg *pMsg, int32_t code) {} +void mndSendRsp(SMnodeMsg *pMsg, int32_t code) { + SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .code = code}; + rpcSendResponse(&rpcRsp); +} static void mndProcessRpcMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; @@ -381,32 +388,34 @@ static void mndProcessRpcMsg(SMnodeMsg *pMsg) { void *ahandle = pMsg->rpcMsg.ahandle; bool isReq = (msgType % 2 == 1); - mTrace("msg:%p, type:%s will be processed", pMsg, taosMsg[msgType]); + mTrace("msg:%p, app:%p will be processed", pMsg, ahandle); if (isReq && !mndIsMaster(pMnode)) { code = TSDB_CODE_APP_NOT_READY; - mDebug("msg:%p, failed to process since %s", pMsg, terrstr()); + mDebug("msg:%p, app:%p failed to process since %s", pMsg, ahandle, terrstr()); goto PROCESS_RPC_END; } if (isReq && pMsg->rpcMsg.pCont == NULL) { code = TSDB_CODE_MND_INVALID_MSG_LEN; - mError("msg:%p, failed to process since %s", pMsg, terrstr()); + mError("msg:%p, app:%p failed to process since %s", pMsg, ahandle, terrstr()); goto PROCESS_RPC_END; } MndMsgFp fp = pMnode->msgFp[msgType]; if (fp == NULL) { code = TSDB_CODE_MSG_NOT_PROCESSED; - mError("msg:%p, failed to process since not handle", pMsg); + mError("msg:%p, app:%p failed to process since not handle", pMsg, ahandle); goto PROCESS_RPC_END; } code = (*fp)(pMnode, pMsg); if (code != 0) { code = terrno; - mError("msg:%p, failed to process since %s", pMsg, terrstr()); + mError("msg:%p, app:%p failed to process since %s", pMsg, ahandle, terrstr()); goto PROCESS_RPC_END; + } else { + mTrace("msg:%p, app:%p is processed", pMsg, ahandle); } PROCESS_RPC_END: @@ -421,8 +430,6 @@ PROCESS_RPC_END: rpcSendResponse(&rpcRsp); } } - - mndCleanupMsg(pMsg); } void mndSetMsgHandle(SMnode *pMnode, int32_t msgType, MndMsgFp fp) { diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 1ceb3862ee..48c911a93b 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -40,7 +40,7 @@ SSdb *sdbInit(SSdbOpt *pOption) { return NULL; } - for (int32_t i = 0; i < SDB_MAX; ++i) { + for (ESdbType i = 0; i < SDB_MAX; ++i) { taosInitRWLatch(&pSdb->locks[i]); } @@ -69,12 +69,30 @@ void sdbCleanup(SSdb *pSdb) { tfree(pSdb->tmpDir); } - for (int32_t i = 0; i < SDB_MAX; ++i) { + for (ESdbType i = 0; i < SDB_MAX; ++i) { SHashObj *hash = pSdb->hashObjs[i]; - if (hash != NULL) { - taosHashClear(hash); - taosHashCleanup(hash); + if (hash == NULL) continue; + + SdbDeleteFp deleteFp = pSdb->deleteFps[i]; + SSdbRow **ppRow = taosHashIterate(hash, ppRow); + while (ppRow != NULL) { + SSdbRow *pRow = *ppRow; + if (pRow == NULL) continue; + + if (deleteFp != NULL) { + (*deleteFp)(pSdb, pRow->pObj); + } + sdbFreeRow(pRow); + ppRow = taosHashIterate(hash, ppRow); } + } + + for (ESdbType i = 0; i < SDB_MAX; ++i) { + SHashObj *hash = pSdb->hashObjs[i]; + if (hash == NULL) continue; + + taosHashClear(hash); + taosHashCleanup(hash); pSdb->hashObjs[i] = NULL; } @@ -91,7 +109,7 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) { pSdb->encodeFps[sdb] = table.encodeFp; pSdb->decodeFps[sdb] = table.decodeFp; - for (int32_t i = 0; i < SDB_MAX; ++i) { + for (ESdbType i = 0; i < SDB_MAX; ++i) { int32_t type; if (pSdb->keyTypes[i] == SDB_KEY_INT32) { type = TSDB_DATA_TYPE_INT; diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index b285675b85..6f88f08b2c 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -42,7 +42,7 @@ static int32_t sdbCreateDir(SSdb *pSdb) { static int32_t sdbRunDeployFp(SSdb *pSdb) { mDebug("start to deploy sdb"); - for (int32_t i = SDB_MAX - 1; i > SDB_START; --i) { + for (ESdbType i = SDB_MAX - 1; i > SDB_START; --i) { SdbDeployFp fp = pSdb->deployFps[i]; if (fp == NULL) continue; @@ -150,7 +150,7 @@ int32_t sdbWriteFile(SSdb *pSdb) { return -1; } - for (int32_t i = SDB_MAX - 1; i > SDB_START; --i) { + for (ESdbType i = SDB_MAX - 1; i > SDB_START; --i) { SdbEncodeFp encodeFp = pSdb->encodeFps[i]; if (encodeFp == NULL) continue; @@ -173,6 +173,7 @@ int32_t sdbWriteFile(SSdb *pSdb) { if (taosWriteFile(fd, pRaw, writeLen) != writeLen) { code = TAOS_SYSTEM_ERROR(terrno); taosHashCancelIterate(hash, ppRow); + free(pRaw); break; } @@ -180,6 +181,7 @@ int32_t sdbWriteFile(SSdb *pSdb) { if (taosWriteFile(fd, &cksum, sizeof(int32_t)) != sizeof(int32_t)) { code = TAOS_SYSTEM_ERROR(terrno); taosHashCancelIterate(hash, ppRow); + free(pRaw); break; } } else { @@ -188,6 +190,7 @@ int32_t sdbWriteFile(SSdb *pSdb) { break; } + free(pRaw); ppRow = taosHashIterate(hash, ppRow); } taosWUnLockLatch(pLock); -- GitLab