diff --git a/src/dnode/src/dnodeMPeer.c b/src/dnode/src/dnodeMPeer.c index 400215472d2738b256d037f0eb33562ba4c23a64..dec4f5ef593d8a5bbf962f37ca221562f25a07c9 100644 --- a/src/dnode/src/dnodeMPeer.c +++ b/src/dnode/src/dnodeMPeer.c @@ -145,7 +145,7 @@ static void *dnodeProcessMnodePeerQueue(void *param) { break; } - dTrace("%p, msg:%s will be processed", pPeerMsg->ahandle, taosMsg[pPeerMsg->msgType]); + dTrace("%p, msg:%s will be processed in mpeer queue", pPeerMsg->ahandle, taosMsg[pPeerMsg->msgType]); int32_t code = mnodeProcessPeerReq(pPeerMsg); dnodeSendRpcMnodePeerRsp(pPeerMsg, code); taosFreeQitem(pPeerMsg); diff --git a/src/dnode/src/dnodeMRead.c b/src/dnode/src/dnodeMRead.c index 64375a3d7b8ccb207337c78360eac4ee5986ab3e..2ab5f48a9ab50af13773cc72c3150a63c23c7630 100644 --- a/src/dnode/src/dnodeMRead.c +++ b/src/dnode/src/dnodeMRead.c @@ -150,7 +150,7 @@ static void *dnodeProcessMnodeReadQueue(void *param) { break; } - dTrace("%p, msg:%s will be processed", pReadMsg->ahandle, taosMsg[pReadMsg->msgType]); + dTrace("%p, msg:%s will be processed in mread queue", pReadMsg->ahandle, taosMsg[pReadMsg->msgType]); int32_t code = mnodeProcessRead(pReadMsg); dnodeSendRpcMnodeReadRsp(pReadMsg, code); taosFreeQitem(pReadMsg); diff --git a/src/dnode/src/dnodeMWrite.c b/src/dnode/src/dnodeMWrite.c index 56022b4bf62d9bb3b6f5b73ce7f32f60d3d55fab..89c44d829b46a756d66e920780d825906c288562 100644 --- a/src/dnode/src/dnodeMWrite.c +++ b/src/dnode/src/dnodeMWrite.c @@ -148,7 +148,7 @@ static void *dnodeProcessMnodeWriteQueue(void *param) { break; } - dTrace("%p, msg:%s will be processed", pWriteMsg->ahandle, taosMsg[pWriteMsg->msgType]); + dTrace("%p, msg:%s will be processed in mwrite queue", pWriteMsg->ahandle, taosMsg[pWriteMsg->msgType]); int32_t code = mnodeProcessWrite(pWriteMsg); dnodeSendRpcMnodeWriteRsp(pWriteMsg, code); taosFreeQitem(pWriteMsg); diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 54108af4b9392fb48563e1e5f0163f8fddcfcbaa..ff4cc7f81fd73881395c8cddec04e45f628a33f9 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -51,10 +51,16 @@ void * dnodeGetMnodeInfos(); int32_t dnodeGetDnodeId(); void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); -void dnodeAddServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); void dnodeSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg); void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp); +int32_t dnodeAllocateMnodeWqueue(); +void dnodeFreeMnodeWqueue(); +int32_t dnodeAllocateMnodeRqueue(); +void dnodeFreeMnodeRqueue(); +int32_t dnodeAllocateMnodePqueue(); +void dnodeFreeMnodePqueue(); + void dnodeSendRpcMnodeWriteRsp(void *pMsg, int32_t code); void dnodeReprocessMnodeWriteMsg(void *pMsg); void dnodeDelayReprocessMnodeWriteMsg(void *pMsg); diff --git a/src/mnode/src/mnodeDnode.c b/src/mnode/src/mnodeDnode.c index 57ce07dbcffa04d7f7746c56dbd6d41a48ec23d6..5872081d67bbc2b57c068e7bbb2b8f1ff9a5cea1 100644 --- a/src/mnode/src/mnodeDnode.c +++ b/src/mnode/src/mnodeDnode.c @@ -314,7 +314,7 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) { if (pStatus->dnodeId == 0) { mTrace("dnode:%d %s, first access", pDnode->dnodeId, pDnode->dnodeEp); } else { - //mTrace("dnode:%d, status received, access times %d", pDnode->dnodeId, pDnode->lastAccess); + mTrace("dnode:%d, status received, access times %d", pDnode->dnodeId, pDnode->lastAccess); } int32_t openVnodes = htons(pStatus->openVnodes); diff --git a/src/mnode/src/mnodeMain.c b/src/mnode/src/mnodeMain.c index 6e3b3d24e97e72c6ea88c5a6eda8f5edf74edb4e..f1be6a8a87d297ea2a3433940867cf7a55245506 100644 --- a/src/mnode/src/mnodeMain.c +++ b/src/mnode/src/mnodeMain.c @@ -53,6 +53,10 @@ int32_t mnodeStartSystem() { mkdir(tsMnodeDir, 0755); } + dnodeAllocateMnodeWqueue(); + dnodeAllocateMnodeRqueue(); + dnodeAllocateMnodePqueue(); + if (mnodeInitAccts() < 0) { mError("failed to init accts"); return -1; @@ -125,6 +129,9 @@ void mnodeCleanupSystem() { mPrint("starting to clean up mgmt"); tsMgmtIsRunning = false; + dnodeFreeMnodeWqueue(); + dnodeFreeMnodeRqueue(); + dnodeFreeMnodePqueue(); mnodeCleanupTimer(); mnodeCleanUpShow(); grantCleanUp(); @@ -152,7 +159,7 @@ void mgmtStopSystem() { } static void mnodeInitTimer() { - if (tsMnodeTmr != NULL) { + if (tsMnodeTmr == NULL) { tsMnodeTmr = taosTmrInit((tsMaxShellConns)*3, 200, 3600000, "MND"); } } diff --git a/src/mnode/src/mnodePeer.c b/src/mnode/src/mnodePeer.c index e17c52a0b1ce13266a1d384ed885353e285044fc..8acd12dce305c10c28a108063ea8eff193edfdc8 100644 --- a/src/mnode/src/mnodePeer.c +++ b/src/mnode/src/mnodePeer.c @@ -48,7 +48,7 @@ void mnodeAddPeerRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) { int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) { if (pMsg->pCont == NULL) { - mError("msg:%s content is null", taosMsg[pMsg->msgType]); + mError("%p, msg:%s in mpeer queue, content is null", pMsg->ahandle, taosMsg[pMsg->msgType]); return TSDB_CODE_INVALID_MSG_LEN; } @@ -59,7 +59,7 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) { rpcRsp->rsp = ipSet; rpcRsp->len = sizeof(SRpcIpSet); - mTrace("msg:%s will be redireced, inUse:%d", taosMsg[pMsg->msgType], ipSet->inUse); + mTrace("%p, msg:%s in mpeer queue, will be redireced inUse:%d", pMsg->ahandle, taosMsg[pMsg->msgType], ipSet->inUse); for (int32_t i = 0; i < ipSet->numOfIps; ++i) { mTrace("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i])); } @@ -68,7 +68,7 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) { } if (tsMnodeProcessPeerMsgFp[pMsg->msgType] == NULL) { - mError("msg:%s not processed, no handle exist", taosMsg[pMsg->msgType]); + mError("%p, msg:%s in mpeer queue, not processed", pMsg->ahandle, taosMsg[pMsg->msgType]); return TSDB_CODE_MSG_NOT_PROCESSED; } diff --git a/src/mnode/src/mnodeRead.c b/src/mnode/src/mnodeRead.c index 32790af03f348cd1e93610173f0ae7c35acc377c..172a27a52f759c7469c216e28851fce3aaf0f08b 100644 --- a/src/mnode/src/mnodeRead.c +++ b/src/mnode/src/mnodeRead.c @@ -43,7 +43,7 @@ void mnodeAddReadMsgHandle(uint8_t msgType, int32_t (*fp)(SMnodeMsg *pMsg)) { int32_t mnodeProcessRead(SMnodeMsg *pMsg) { if (pMsg->pCont == NULL) { - mError("msg:%s content is null", taosMsg[pMsg->msgType]); + mError("%p, msg:%s in mread queue, content is null", pMsg->ahandle, taosMsg[pMsg->msgType]); return TSDB_CODE_INVALID_MSG_LEN; } @@ -54,7 +54,7 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) { rpcRsp->rsp = ipSet; rpcRsp->len = sizeof(SRpcIpSet); - mTrace("msg:%s will be redireced, inUse:%d", taosMsg[pMsg->msgType], ipSet->inUse); + mTrace("%p, msg:%s in mread queue, will be redireced, inUse:%d", pMsg->ahandle, taosMsg[pMsg->msgType], ipSet->inUse); for (int32_t i = 0; i < ipSet->numOfIps; ++i) { mTrace("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i])); } @@ -63,13 +63,14 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) { } if (tsMnodeProcessReadMsgFp[pMsg->msgType] == NULL) { - mError("msg:%s not processed, no handle exist", taosMsg[pMsg->msgType]); + mError("%p, msg:%s in mread queue, not processed", pMsg->ahandle, taosMsg[pMsg->msgType]); return TSDB_CODE_MSG_NOT_PROCESSED; } - if (!mnodeInitMsg(pMsg)) { - mError("msg:%s not processed, reason:%s", taosMsg[pMsg->msgType], tstrerror(terrno)); - return terrno; + int32_t code = mnodeInitMsg(pMsg); + if (code != TSDB_CODE_SUCCESS) { + mError("%p, msg:%s in mread queue, not processed reason:%s", pMsg->ahandle, taosMsg[pMsg->msgType], tstrerror(code)); + return code; } return (*tsMnodeProcessReadMsgFp[pMsg->msgType])(pMsg); diff --git a/src/mnode/src/mnodeShow.c b/src/mnode/src/mnodeShow.c index 37bcd075a52c98b8aaffbd3fab3dfd49dbfa6076..2a42ad869e3013c56d2922dc4a69421236d5276e 100644 --- a/src/mnode/src/mnodeShow.c +++ b/src/mnode/src/mnodeShow.c @@ -131,6 +131,9 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) { memcpy(pShow->payload, pShowMsg->payload, pShow->payloadLen); pShow = mnodeSaveQhandle(pShow, showObjSize); + if (pShow == NULL) { + return TSDB_CODE_SERV_OUT_OF_MEMORY; + } pShowRsp->qhandle = htobe64((uint64_t) pShow); mTrace("show:%p, type:%s, start to get meta", pShow, mnodeGetShowType(pShowMsg->type)); diff --git a/src/mnode/src/mnodeWrite.c b/src/mnode/src/mnodeWrite.c index 33a5399fe26d3bc4567f8b02242c867835f34b62..1741d04fc6703c4466f9fcb6440f2fd62fae41be 100644 --- a/src/mnode/src/mnodeWrite.c +++ b/src/mnode/src/mnodeWrite.c @@ -42,7 +42,7 @@ void mnodeAddWriteMsgHandle(uint8_t msgType, int32_t (*fp)(SMnodeMsg *mnodeMsg)) int32_t mnodeProcessWrite(SMnodeMsg *pMsg) { if (pMsg->pCont == NULL) { - mError("msg:%s content is null", taosMsg[pMsg->msgType]); + mError("%p, msg:%s in mwrite queue, content is null", pMsg->ahandle, taosMsg[pMsg->msgType]); return TSDB_CODE_INVALID_MSG_LEN; } @@ -53,7 +53,7 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) { rpcRsp->rsp = ipSet; rpcRsp->len = sizeof(SRpcIpSet); - mTrace("msg:%s will be redireced, inUse:%d", taosMsg[pMsg->msgType], ipSet->inUse); + mTrace("%p, msg:%s in mwrite queue, will be redireced inUse:%d", pMsg->ahandle, taosMsg[pMsg->msgType], ipSet->inUse); for (int32_t i = 0; i < ipSet->numOfIps; ++i) { mTrace("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i])); } @@ -62,17 +62,18 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) { } if (tsMnodeProcessWriteMsgFp[pMsg->msgType] == NULL) { - mError("msg:%s not processed, no handle exist", taosMsg[pMsg->msgType]); + mError("%p, msg:%s in mwrite queue, not processed", pMsg->ahandle, taosMsg[pMsg->msgType]); return TSDB_CODE_MSG_NOT_PROCESSED; } - if (!mnodeInitMsg(pMsg)) { - mError("msg:%s not processed, reason:%s", taosMsg[pMsg->msgType], tstrerror(terrno)); - return terrno; + int32_t code = mnodeInitMsg(pMsg); + if (code != TSDB_CODE_SUCCESS) { + mError("%p, msg:%s in mwrite queue, not processed reason:%s", pMsg->ahandle, taosMsg[pMsg->msgType], tstrerror(code)); + return code; } if (!pMsg->pUser->writeAuth) { - mError("%p, msg:%s not processed, no rights", taosMsg[pMsg->msgType]); + mError("%p, msg:%s in mwrite queue, not processed, no write auth", pMsg->ahandle, taosMsg[pMsg->msgType]); return TSDB_CODE_NO_RIGHTS; }