From 5a601c83f137c982f0a265d6558acfa4f6180729 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 28 May 2020 01:35:26 +0000 Subject: [PATCH] [TD-335] optimize mnode queue --- src/dnode/src/dnodeMPeer.c | 2 +- src/dnode/src/dnodeMRead.c | 2 +- src/dnode/src/dnodeMWrite.c | 2 +- src/inc/dnode.h | 8 +++++++- src/mnode/src/mnodeDnode.c | 2 +- src/mnode/src/mnodeMain.c | 9 ++++++++- src/mnode/src/mnodePeer.c | 6 +++--- src/mnode/src/mnodeRead.c | 13 +++++++------ src/mnode/src/mnodeShow.c | 3 +++ src/mnode/src/mnodeWrite.c | 15 ++++++++------- 10 files changed, 40 insertions(+), 22 deletions(-) diff --git a/src/dnode/src/dnodeMPeer.c b/src/dnode/src/dnodeMPeer.c index 400215472d..dec4f5ef59 100644 --- a/src/dnode/src/dnodeMPeer.c +++ b/src/dnode/src/dnodeMPeer.c @@ -145,7 +145,7 @@ static void *dnodeProcessMnodePeerQueue(void *param) { break; } - dTrace("%p, msg:%s will be processed", pPeerMsg->ahandle, taosMsg[pPeerMsg->msgType]); + dTrace("%p, msg:%s will be processed in mpeer queue", pPeerMsg->ahandle, taosMsg[pPeerMsg->msgType]); int32_t code = mnodeProcessPeerReq(pPeerMsg); dnodeSendRpcMnodePeerRsp(pPeerMsg, code); taosFreeQitem(pPeerMsg); diff --git a/src/dnode/src/dnodeMRead.c b/src/dnode/src/dnodeMRead.c index 64375a3d7b..2ab5f48a9a 100644 --- a/src/dnode/src/dnodeMRead.c +++ b/src/dnode/src/dnodeMRead.c @@ -150,7 +150,7 @@ static void *dnodeProcessMnodeReadQueue(void *param) { break; } - dTrace("%p, msg:%s will be processed", pReadMsg->ahandle, taosMsg[pReadMsg->msgType]); + dTrace("%p, msg:%s will be processed in mread queue", pReadMsg->ahandle, taosMsg[pReadMsg->msgType]); int32_t code = mnodeProcessRead(pReadMsg); dnodeSendRpcMnodeReadRsp(pReadMsg, code); taosFreeQitem(pReadMsg); diff --git a/src/dnode/src/dnodeMWrite.c b/src/dnode/src/dnodeMWrite.c index 56022b4bf6..89c44d829b 100644 --- a/src/dnode/src/dnodeMWrite.c +++ b/src/dnode/src/dnodeMWrite.c @@ -148,7 +148,7 @@ static void *dnodeProcessMnodeWriteQueue(void *param) { break; } - dTrace("%p, msg:%s will be processed", pWriteMsg->ahandle, taosMsg[pWriteMsg->msgType]); + dTrace("%p, msg:%s will be processed in mwrite queue", pWriteMsg->ahandle, taosMsg[pWriteMsg->msgType]); int32_t code = mnodeProcessWrite(pWriteMsg); dnodeSendRpcMnodeWriteRsp(pWriteMsg, code); taosFreeQitem(pWriteMsg); diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 54108af4b9..ff4cc7f81f 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -51,10 +51,16 @@ void * dnodeGetMnodeInfos(); int32_t dnodeGetDnodeId(); void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); -void dnodeAddServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); void dnodeSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg); void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp); +int32_t dnodeAllocateMnodeWqueue(); +void dnodeFreeMnodeWqueue(); +int32_t dnodeAllocateMnodeRqueue(); +void dnodeFreeMnodeRqueue(); +int32_t dnodeAllocateMnodePqueue(); +void dnodeFreeMnodePqueue(); + void dnodeSendRpcMnodeWriteRsp(void *pMsg, int32_t code); void dnodeReprocessMnodeWriteMsg(void *pMsg); void dnodeDelayReprocessMnodeWriteMsg(void *pMsg); diff --git a/src/mnode/src/mnodeDnode.c b/src/mnode/src/mnodeDnode.c index 57ce07dbcf..5872081d67 100644 --- a/src/mnode/src/mnodeDnode.c +++ b/src/mnode/src/mnodeDnode.c @@ -314,7 +314,7 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) { if (pStatus->dnodeId == 0) { mTrace("dnode:%d %s, first access", pDnode->dnodeId, pDnode->dnodeEp); } else { - //mTrace("dnode:%d, status received, access times %d", pDnode->dnodeId, pDnode->lastAccess); + mTrace("dnode:%d, status received, access times %d", pDnode->dnodeId, pDnode->lastAccess); } int32_t openVnodes = htons(pStatus->openVnodes); diff --git a/src/mnode/src/mnodeMain.c b/src/mnode/src/mnodeMain.c index 6e3b3d24e9..f1be6a8a87 100644 --- a/src/mnode/src/mnodeMain.c +++ b/src/mnode/src/mnodeMain.c @@ -53,6 +53,10 @@ int32_t mnodeStartSystem() { mkdir(tsMnodeDir, 0755); } + dnodeAllocateMnodeWqueue(); + dnodeAllocateMnodeRqueue(); + dnodeAllocateMnodePqueue(); + if (mnodeInitAccts() < 0) { mError("failed to init accts"); return -1; @@ -125,6 +129,9 @@ void mnodeCleanupSystem() { mPrint("starting to clean up mgmt"); tsMgmtIsRunning = false; + dnodeFreeMnodeWqueue(); + dnodeFreeMnodeRqueue(); + dnodeFreeMnodePqueue(); mnodeCleanupTimer(); mnodeCleanUpShow(); grantCleanUp(); @@ -152,7 +159,7 @@ void mgmtStopSystem() { } static void mnodeInitTimer() { - if (tsMnodeTmr != NULL) { + if (tsMnodeTmr == NULL) { tsMnodeTmr = taosTmrInit((tsMaxShellConns)*3, 200, 3600000, "MND"); } } diff --git a/src/mnode/src/mnodePeer.c b/src/mnode/src/mnodePeer.c index e17c52a0b1..8acd12dce3 100644 --- a/src/mnode/src/mnodePeer.c +++ b/src/mnode/src/mnodePeer.c @@ -48,7 +48,7 @@ void mnodeAddPeerRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) { int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) { if (pMsg->pCont == NULL) { - mError("msg:%s content is null", taosMsg[pMsg->msgType]); + mError("%p, msg:%s in mpeer queue, content is null", pMsg->ahandle, taosMsg[pMsg->msgType]); return TSDB_CODE_INVALID_MSG_LEN; } @@ -59,7 +59,7 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) { rpcRsp->rsp = ipSet; rpcRsp->len = sizeof(SRpcIpSet); - mTrace("msg:%s will be redireced, inUse:%d", taosMsg[pMsg->msgType], ipSet->inUse); + mTrace("%p, msg:%s in mpeer queue, will be redireced inUse:%d", pMsg->ahandle, taosMsg[pMsg->msgType], ipSet->inUse); for (int32_t i = 0; i < ipSet->numOfIps; ++i) { mTrace("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i])); } @@ -68,7 +68,7 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) { } if (tsMnodeProcessPeerMsgFp[pMsg->msgType] == NULL) { - mError("msg:%s not processed, no handle exist", taosMsg[pMsg->msgType]); + mError("%p, msg:%s in mpeer queue, not processed", pMsg->ahandle, taosMsg[pMsg->msgType]); return TSDB_CODE_MSG_NOT_PROCESSED; } diff --git a/src/mnode/src/mnodeRead.c b/src/mnode/src/mnodeRead.c index 32790af03f..172a27a52f 100644 --- a/src/mnode/src/mnodeRead.c +++ b/src/mnode/src/mnodeRead.c @@ -43,7 +43,7 @@ void mnodeAddReadMsgHandle(uint8_t msgType, int32_t (*fp)(SMnodeMsg *pMsg)) { int32_t mnodeProcessRead(SMnodeMsg *pMsg) { if (pMsg->pCont == NULL) { - mError("msg:%s content is null", taosMsg[pMsg->msgType]); + mError("%p, msg:%s in mread queue, content is null", pMsg->ahandle, taosMsg[pMsg->msgType]); return TSDB_CODE_INVALID_MSG_LEN; } @@ -54,7 +54,7 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) { rpcRsp->rsp = ipSet; rpcRsp->len = sizeof(SRpcIpSet); - mTrace("msg:%s will be redireced, inUse:%d", taosMsg[pMsg->msgType], ipSet->inUse); + mTrace("%p, msg:%s in mread queue, will be redireced, inUse:%d", pMsg->ahandle, taosMsg[pMsg->msgType], ipSet->inUse); for (int32_t i = 0; i < ipSet->numOfIps; ++i) { mTrace("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i])); } @@ -63,13 +63,14 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) { } if (tsMnodeProcessReadMsgFp[pMsg->msgType] == NULL) { - mError("msg:%s not processed, no handle exist", taosMsg[pMsg->msgType]); + mError("%p, msg:%s in mread queue, not processed", pMsg->ahandle, taosMsg[pMsg->msgType]); return TSDB_CODE_MSG_NOT_PROCESSED; } - if (!mnodeInitMsg(pMsg)) { - mError("msg:%s not processed, reason:%s", taosMsg[pMsg->msgType], tstrerror(terrno)); - return terrno; + int32_t code = mnodeInitMsg(pMsg); + if (code != TSDB_CODE_SUCCESS) { + mError("%p, msg:%s in mread queue, not processed reason:%s", pMsg->ahandle, taosMsg[pMsg->msgType], tstrerror(code)); + return code; } return (*tsMnodeProcessReadMsgFp[pMsg->msgType])(pMsg); diff --git a/src/mnode/src/mnodeShow.c b/src/mnode/src/mnodeShow.c index 37bcd075a5..2a42ad869e 100644 --- a/src/mnode/src/mnodeShow.c +++ b/src/mnode/src/mnodeShow.c @@ -131,6 +131,9 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) { memcpy(pShow->payload, pShowMsg->payload, pShow->payloadLen); pShow = mnodeSaveQhandle(pShow, showObjSize); + if (pShow == NULL) { + return TSDB_CODE_SERV_OUT_OF_MEMORY; + } pShowRsp->qhandle = htobe64((uint64_t) pShow); mTrace("show:%p, type:%s, start to get meta", pShow, mnodeGetShowType(pShowMsg->type)); diff --git a/src/mnode/src/mnodeWrite.c b/src/mnode/src/mnodeWrite.c index 33a5399fe2..1741d04fc6 100644 --- a/src/mnode/src/mnodeWrite.c +++ b/src/mnode/src/mnodeWrite.c @@ -42,7 +42,7 @@ void mnodeAddWriteMsgHandle(uint8_t msgType, int32_t (*fp)(SMnodeMsg *mnodeMsg)) int32_t mnodeProcessWrite(SMnodeMsg *pMsg) { if (pMsg->pCont == NULL) { - mError("msg:%s content is null", taosMsg[pMsg->msgType]); + mError("%p, msg:%s in mwrite queue, content is null", pMsg->ahandle, taosMsg[pMsg->msgType]); return TSDB_CODE_INVALID_MSG_LEN; } @@ -53,7 +53,7 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) { rpcRsp->rsp = ipSet; rpcRsp->len = sizeof(SRpcIpSet); - mTrace("msg:%s will be redireced, inUse:%d", taosMsg[pMsg->msgType], ipSet->inUse); + mTrace("%p, msg:%s in mwrite queue, will be redireced inUse:%d", pMsg->ahandle, taosMsg[pMsg->msgType], ipSet->inUse); for (int32_t i = 0; i < ipSet->numOfIps; ++i) { mTrace("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i])); } @@ -62,17 +62,18 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) { } if (tsMnodeProcessWriteMsgFp[pMsg->msgType] == NULL) { - mError("msg:%s not processed, no handle exist", taosMsg[pMsg->msgType]); + mError("%p, msg:%s in mwrite queue, not processed", pMsg->ahandle, taosMsg[pMsg->msgType]); return TSDB_CODE_MSG_NOT_PROCESSED; } - if (!mnodeInitMsg(pMsg)) { - mError("msg:%s not processed, reason:%s", taosMsg[pMsg->msgType], tstrerror(terrno)); - return terrno; + int32_t code = mnodeInitMsg(pMsg); + if (code != TSDB_CODE_SUCCESS) { + mError("%p, msg:%s in mwrite queue, not processed reason:%s", pMsg->ahandle, taosMsg[pMsg->msgType], tstrerror(code)); + return code; } if (!pMsg->pUser->writeAuth) { - mError("%p, msg:%s not processed, no rights", taosMsg[pMsg->msgType]); + mError("%p, msg:%s in mwrite queue, not processed, no write auth", pMsg->ahandle, taosMsg[pMsg->msgType]); return TSDB_CODE_NO_RIGHTS; } -- GitLab