From c4306958c788d5b828336cfee34387e16e33ded2 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 27 Nov 2021 17:23:57 +0800 Subject: [PATCH] TD-11265 fix transport in dnode-mgmt --- include/dnode/mnode/mnode.h | 12 +++---- source/dnode/mgmt/impl/src/dndDnode.c | 3 ++ source/dnode/mgmt/impl/src/dndMnode.c | 38 ++++++++++++++--------- source/dnode/mgmt/impl/src/dndTransport.c | 6 ++-- source/dnode/mnode/impl/src/mnode.c | 8 ++--- 5 files changed, 37 insertions(+), 30 deletions(-) diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 824eb24191..2ffbd395e8 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -137,38 +137,34 @@ void mnodeSendRsp(SMnodeMsg *pMsg, int32_t code); /** * @brief Process the read request * - * @param pMnode The mnode object * @param pMsg The request msg * @return int32_t 0 for success, -1 for failure */ -void mnodeProcessReadMsg(SMnode *pMnode, SMnodeMsg *pMsg); +void mnodeProcessReadMsg(SMnodeMsg *pMsg); /** * @brief Process the write request * - * @param pMnode The mnode object * @param pMsg The request msg * @return int32_t 0 for success, -1 for failure */ -void mnodeProcessWriteMsg(SMnode *pMnode, SMnodeMsg *pMsg); +void mnodeProcessWriteMsg(SMnodeMsg *pMsg); /** * @brief Process the sync request * - * @param pMnode The mnode object * @param pMsg The request msg * @return int32_t 0 for success, -1 for failure */ -void mnodeProcessSyncMsg(SMnode *pMnode, SMnodeMsg *pMsg); +void mnodeProcessSyncMsg(SMnodeMsg *pMsg); /** * @brief Process the apply request * - * @param pMnode The mnode object * @param pMsg The request msg * @return int32_t 0 for success, -1 for failure */ -void mnodeProcessApplyMsg(SMnode *pMnode, SMnodeMsg *pMsg); +void mnodeProcessApplyMsg(SMnodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/impl/src/dndDnode.c b/source/dnode/mgmt/impl/src/dndDnode.c index 0f47c5b409..9e70bf1ae9 100644 --- a/source/dnode/mgmt/impl/src/dndDnode.c +++ b/source/dnode/mgmt/impl/src/dndDnode.c @@ -121,12 +121,14 @@ static void dndResetDnodes(SDnode *pDnode, SDnodeEps *pDnodeEps) { } pMgmt->mnodeEpSet.inUse = 0; + pMgmt->mnodeEpSet.numOfEps = 0; int32_t mIndex = 0; for (int32_t i = 0; i < pMgmt->dnodeEps->num; i++) { SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i]; if (!pDnodeEp->isMnode) continue; if (mIndex >= TSDB_MAX_REPLICA) continue; + pMgmt->mnodeEpSet.numOfEps++; strcpy(pMgmt->mnodeEpSet.fqdn[mIndex], pDnodeEp->fqdn); pMgmt->mnodeEpSet.port[mIndex] = pDnodeEp->port; mIndex++; @@ -279,6 +281,7 @@ PRASE_DNODE_OVER: if (pMgmt->dnodeEps == NULL) { pMgmt->dnodeEps = calloc(1, sizeof(SDnodeEps) + sizeof(SDnodeEp)); pMgmt->dnodeEps->num = 1; + pMgmt->dnodeEps->eps[0].isMnode = 1; pMgmt->dnodeEps->eps[0].port = pDnode->opt.serverPort; tstrncpy(pMgmt->dnodeEps->eps[0].fqdn, pDnode->opt.localFqdn, TSDB_FQDN_LEN); } diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index fe3accdd84..d42891f1e4 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -80,7 +80,9 @@ static SMnode *dndAcquireMnode(SDnode *pDnode) { } taosRUnLockLatch(&pMgmt->latch); - dTrace("acquire mnode, refCount:%d", refCount); + if (pMnode != NULL) { + dTrace("acquire mnode, refCount:%d", refCount); + } return pMnode; } @@ -94,7 +96,9 @@ static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode) { } taosRUnLockLatch(&pMgmt->latch); - dTrace("release mnode, refCount:%d", refCount); + if (pMnode != NULL) { + dTrace("release mnode, refCount:%d", refCount); + } } static int32_t dndReadMnodeFile(SDnode *pDnode) { @@ -550,7 +554,7 @@ static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg) { SMnode *pMnode = dndAcquireMnode(pDnode); if (pMnode != NULL) { - mnodeProcessReadMsg(pMnode, pMsg); + mnodeProcessReadMsg(pMsg); dndReleaseMnode(pDnode, pMnode); } else { mnodeSendRsp(pMsg, terrno); @@ -564,7 +568,7 @@ static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg) { SMnode *pMnode = dndAcquireMnode(pDnode); if (pMnode != NULL) { - mnodeProcessWriteMsg(pMnode, pMsg); + mnodeProcessWriteMsg(pMsg); dndReleaseMnode(pDnode, pMnode); } else { mnodeSendRsp(pMsg, terrno); @@ -578,7 +582,7 @@ static void dndProcessMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg) { SMnode *pMnode = dndAcquireMnode(pDnode); if (pMnode != NULL) { - mnodeProcessApplyMsg(pMnode, pMsg); + mnodeProcessApplyMsg(pMsg); dndReleaseMnode(pDnode, pMnode); } else { mnodeSendRsp(pMsg, terrno); @@ -592,7 +596,7 @@ static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg) { SMnode *pMnode = dndAcquireMnode(pDnode); if (pMnode != NULL) { - mnodeProcessSyncMsg(pMnode, pMsg); + mnodeProcessSyncMsg(pMsg); dndReleaseMnode(pDnode, pMnode); } else { mnodeSendRsp(pMsg, terrno); @@ -683,7 +687,7 @@ static int32_t dndPutMsgIntoMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg) { static int32_t dndAllocMnodeMgmtQueue(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; - pMgmt->pMgmtQ = tWorkerAllocQueue(&pMgmt->mgmtPool, NULL, (FProcessItem)dndProcessMnodeMgmtQueue); + pMgmt->pMgmtQ = tWorkerAllocQueue(&pMgmt->mgmtPool, pDnode, (FProcessItem)dndProcessMnodeMgmtQueue); if (pMgmt->pMgmtQ == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -708,18 +712,19 @@ static int32_t dndInitMnodeMgmtWorker(SDnode *pDnode) { return -1; } + dDebug("mnode mgmt worker is initialized"); return 0; } static void dndCleanupMnodeMgmtWorker(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; tWorkerCleanup(&pMgmt->mgmtPool); - dDebug("mnode mgmt worker is stopped"); + dDebug("mnode mgmt worker is closed"); } static int32_t dndAllocMnodeReadQueue(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; - pMgmt->pReadQ = tWorkerAllocQueue(&pMgmt->readPool, NULL, (FProcessItem)dndProcessMnodeReadQueue); + pMgmt->pReadQ = tWorkerAllocQueue(&pMgmt->readPool, pDnode, (FProcessItem)dndProcessMnodeReadQueue); if (pMgmt->pReadQ == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -745,18 +750,19 @@ static int32_t dndInitMnodeReadWorker(SDnode *pDnode) { return -1; } + dDebug("mnode read worker is initialized"); return 0; } static void dndCleanupMnodeReadWorker(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; tWorkerCleanup(&pMgmt->readPool); - dDebug("mnode read worker is stopped"); + dDebug("mnode read worker is closed"); } static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; - pMgmt->pWriteQ = tWorkerAllocQueue(&pMgmt->writePool, NULL, (FProcessItem)dndProcessMnodeWriteQueue); + pMgmt->pWriteQ = tWorkerAllocQueue(&pMgmt->writePool, pDnode, (FProcessItem)dndProcessMnodeWriteQueue); if (pMgmt->pWriteQ == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -773,7 +779,7 @@ static void dndFreeMnodeWriteQueue(SDnode *pDnode) { static int32_t dndAllocMnodeApplyQueue(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; - pMgmt->pApplyQ = tWorkerAllocQueue(&pMgmt->writePool, NULL, (FProcessItem)dndProcessMnodeApplyQueue); + pMgmt->pApplyQ = tWorkerAllocQueue(&pMgmt->writePool, pDnode, (FProcessItem)dndProcessMnodeApplyQueue); if (pMgmt->pApplyQ == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -799,18 +805,19 @@ static int32_t dndInitMnodeWriteWorker(SDnode *pDnode) { return -1; } + dDebug("mnode write worker is initialized"); return 0; } static void dndCleanupMnodeWriteWorker(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; tWorkerCleanup(&pMgmt->writePool); - dDebug("mnode write worker is stopped"); + dDebug("mnode write worker is closed"); } static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; - pMgmt->pSyncQ = tWorkerAllocQueue(&pMgmt->syncPool, NULL, (FProcessItem)dndProcessMnodeSyncQueue); + pMgmt->pSyncQ = tWorkerAllocQueue(&pMgmt->syncPool, pDnode, (FProcessItem)dndProcessMnodeSyncQueue); if (pMgmt->pSyncQ == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -836,13 +843,14 @@ static int32_t dndInitMnodeSyncWorker(SDnode *pDnode) { return -1; } + dDebug("mnode sync worker is initialized"); return 0; } static void dndCleanupMnodeSyncWorker(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; tWorkerCleanup(&pMgmt->syncPool); - dDebug("mnode sync worker is stopped"); + dDebug("mnode sync worker is closed"); } int32_t dndInitMnode(SDnode *pDnode) { diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 0364c4db38..eee2e36c0f 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -261,12 +261,12 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char SDnode *pDnode = parent; if (dndAuthInternalMsg(parent, user, spi, encrypt, secret, ckey) == 0) { - dTrace("get internal auth success"); + // dTrace("get internal auth success"); return 0; } if (dndGetUserAuthFromMnode(pDnode, user, spi, encrypt, secret, ckey) == 0) { - dTrace("get auth from internal mnode"); + // dTrace("get auth from internal mnode"); return 0; } @@ -275,7 +275,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char return -1; } - dDebug("user:%s, send auth msg to other mnodes", user); + // dDebug("user:%s, send auth msg to other mnodes", user); SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg)); tstrncpy(pMsg->user, user, TSDB_USER_LEN); diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 8fc5de588f..128a834729 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -234,13 +234,13 @@ void mnodeSetMsgFp(int32_t msgType, MnodeRpcFp fp) { } } -void mnodeProcessReadMsg(SMnode *pMnode, SMnodeMsg *pMsg) { mnodeProcessRpcMsg(pMsg); } +void mnodeProcessReadMsg(SMnodeMsg *pMsg) { mnodeProcessRpcMsg(pMsg); } -void mnodeProcessWriteMsg(SMnode *pMnode, SMnodeMsg *pMsg) { mnodeProcessRpcMsg(pMsg); } +void mnodeProcessWriteMsg(SMnodeMsg *pMsg) { mnodeProcessRpcMsg(pMsg); } -void mnodeProcessSyncMsg(SMnode *pMnode, SMnodeMsg *pMsg) { mnodeProcessRpcMsg(pMsg); } +void mnodeProcessSyncMsg(SMnodeMsg *pMsg) { mnodeProcessRpcMsg(pMsg); } -void mnodeProcessApplyMsg(SMnode *pMnode, SMnodeMsg *pMsg) {} +void mnodeProcessApplyMsg(SMnodeMsg *pMsg) {} #if 0 -- GitLab