提交 5a601c83 编写于 作者: S Shengliang Guan

[TD-335] optimize mnode queue

上级 d1003fed
...@@ -145,7 +145,7 @@ static void *dnodeProcessMnodePeerQueue(void *param) { ...@@ -145,7 +145,7 @@ static void *dnodeProcessMnodePeerQueue(void *param) {
break; break;
} }
dTrace("%p, msg:%s will be processed", pPeerMsg->ahandle, taosMsg[pPeerMsg->msgType]); dTrace("%p, msg:%s will be processed in mpeer queue", pPeerMsg->ahandle, taosMsg[pPeerMsg->msgType]);
int32_t code = mnodeProcessPeerReq(pPeerMsg); int32_t code = mnodeProcessPeerReq(pPeerMsg);
dnodeSendRpcMnodePeerRsp(pPeerMsg, code); dnodeSendRpcMnodePeerRsp(pPeerMsg, code);
taosFreeQitem(pPeerMsg); taosFreeQitem(pPeerMsg);
......
...@@ -150,7 +150,7 @@ static void *dnodeProcessMnodeReadQueue(void *param) { ...@@ -150,7 +150,7 @@ static void *dnodeProcessMnodeReadQueue(void *param) {
break; break;
} }
dTrace("%p, msg:%s will be processed", pReadMsg->ahandle, taosMsg[pReadMsg->msgType]); dTrace("%p, msg:%s will be processed in mread queue", pReadMsg->ahandle, taosMsg[pReadMsg->msgType]);
int32_t code = mnodeProcessRead(pReadMsg); int32_t code = mnodeProcessRead(pReadMsg);
dnodeSendRpcMnodeReadRsp(pReadMsg, code); dnodeSendRpcMnodeReadRsp(pReadMsg, code);
taosFreeQitem(pReadMsg); taosFreeQitem(pReadMsg);
......
...@@ -148,7 +148,7 @@ static void *dnodeProcessMnodeWriteQueue(void *param) { ...@@ -148,7 +148,7 @@ static void *dnodeProcessMnodeWriteQueue(void *param) {
break; break;
} }
dTrace("%p, msg:%s will be processed", pWriteMsg->ahandle, taosMsg[pWriteMsg->msgType]); dTrace("%p, msg:%s will be processed in mwrite queue", pWriteMsg->ahandle, taosMsg[pWriteMsg->msgType]);
int32_t code = mnodeProcessWrite(pWriteMsg); int32_t code = mnodeProcessWrite(pWriteMsg);
dnodeSendRpcMnodeWriteRsp(pWriteMsg, code); dnodeSendRpcMnodeWriteRsp(pWriteMsg, code);
taosFreeQitem(pWriteMsg); taosFreeQitem(pWriteMsg);
......
...@@ -51,10 +51,16 @@ void * dnodeGetMnodeInfos(); ...@@ -51,10 +51,16 @@ void * dnodeGetMnodeInfos();
int32_t dnodeGetDnodeId(); int32_t dnodeGetDnodeId();
void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
void dnodeAddServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
void dnodeSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg); void dnodeSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg);
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp); void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp);
int32_t dnodeAllocateMnodeWqueue();
void dnodeFreeMnodeWqueue();
int32_t dnodeAllocateMnodeRqueue();
void dnodeFreeMnodeRqueue();
int32_t dnodeAllocateMnodePqueue();
void dnodeFreeMnodePqueue();
void dnodeSendRpcMnodeWriteRsp(void *pMsg, int32_t code); void dnodeSendRpcMnodeWriteRsp(void *pMsg, int32_t code);
void dnodeReprocessMnodeWriteMsg(void *pMsg); void dnodeReprocessMnodeWriteMsg(void *pMsg);
void dnodeDelayReprocessMnodeWriteMsg(void *pMsg); void dnodeDelayReprocessMnodeWriteMsg(void *pMsg);
......
...@@ -314,7 +314,7 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) { ...@@ -314,7 +314,7 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) {
if (pStatus->dnodeId == 0) { if (pStatus->dnodeId == 0) {
mTrace("dnode:%d %s, first access", pDnode->dnodeId, pDnode->dnodeEp); mTrace("dnode:%d %s, first access", pDnode->dnodeId, pDnode->dnodeEp);
} else { } else {
//mTrace("dnode:%d, status received, access times %d", pDnode->dnodeId, pDnode->lastAccess); mTrace("dnode:%d, status received, access times %d", pDnode->dnodeId, pDnode->lastAccess);
} }
int32_t openVnodes = htons(pStatus->openVnodes); int32_t openVnodes = htons(pStatus->openVnodes);
......
...@@ -53,6 +53,10 @@ int32_t mnodeStartSystem() { ...@@ -53,6 +53,10 @@ int32_t mnodeStartSystem() {
mkdir(tsMnodeDir, 0755); mkdir(tsMnodeDir, 0755);
} }
dnodeAllocateMnodeWqueue();
dnodeAllocateMnodeRqueue();
dnodeAllocateMnodePqueue();
if (mnodeInitAccts() < 0) { if (mnodeInitAccts() < 0) {
mError("failed to init accts"); mError("failed to init accts");
return -1; return -1;
...@@ -125,6 +129,9 @@ void mnodeCleanupSystem() { ...@@ -125,6 +129,9 @@ void mnodeCleanupSystem() {
mPrint("starting to clean up mgmt"); mPrint("starting to clean up mgmt");
tsMgmtIsRunning = false; tsMgmtIsRunning = false;
dnodeFreeMnodeWqueue();
dnodeFreeMnodeRqueue();
dnodeFreeMnodePqueue();
mnodeCleanupTimer(); mnodeCleanupTimer();
mnodeCleanUpShow(); mnodeCleanUpShow();
grantCleanUp(); grantCleanUp();
...@@ -152,7 +159,7 @@ void mgmtStopSystem() { ...@@ -152,7 +159,7 @@ void mgmtStopSystem() {
} }
static void mnodeInitTimer() { static void mnodeInitTimer() {
if (tsMnodeTmr != NULL) { if (tsMnodeTmr == NULL) {
tsMnodeTmr = taosTmrInit((tsMaxShellConns)*3, 200, 3600000, "MND"); tsMnodeTmr = taosTmrInit((tsMaxShellConns)*3, 200, 3600000, "MND");
} }
} }
......
...@@ -48,7 +48,7 @@ void mnodeAddPeerRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) { ...@@ -48,7 +48,7 @@ void mnodeAddPeerRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) {
int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) { int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) {
if (pMsg->pCont == NULL) { if (pMsg->pCont == NULL) {
mError("msg:%s content is null", taosMsg[pMsg->msgType]); mError("%p, msg:%s in mpeer queue, content is null", pMsg->ahandle, taosMsg[pMsg->msgType]);
return TSDB_CODE_INVALID_MSG_LEN; return TSDB_CODE_INVALID_MSG_LEN;
} }
...@@ -59,7 +59,7 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) { ...@@ -59,7 +59,7 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) {
rpcRsp->rsp = ipSet; rpcRsp->rsp = ipSet;
rpcRsp->len = sizeof(SRpcIpSet); rpcRsp->len = sizeof(SRpcIpSet);
mTrace("msg:%s will be redireced, inUse:%d", taosMsg[pMsg->msgType], ipSet->inUse); mTrace("%p, msg:%s in mpeer queue, will be redireced inUse:%d", pMsg->ahandle, taosMsg[pMsg->msgType], ipSet->inUse);
for (int32_t i = 0; i < ipSet->numOfIps; ++i) { for (int32_t i = 0; i < ipSet->numOfIps; ++i) {
mTrace("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i])); mTrace("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i]));
} }
...@@ -68,7 +68,7 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) { ...@@ -68,7 +68,7 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) {
} }
if (tsMnodeProcessPeerMsgFp[pMsg->msgType] == NULL) { if (tsMnodeProcessPeerMsgFp[pMsg->msgType] == NULL) {
mError("msg:%s not processed, no handle exist", taosMsg[pMsg->msgType]); mError("%p, msg:%s in mpeer queue, not processed", pMsg->ahandle, taosMsg[pMsg->msgType]);
return TSDB_CODE_MSG_NOT_PROCESSED; return TSDB_CODE_MSG_NOT_PROCESSED;
} }
......
...@@ -43,7 +43,7 @@ void mnodeAddReadMsgHandle(uint8_t msgType, int32_t (*fp)(SMnodeMsg *pMsg)) { ...@@ -43,7 +43,7 @@ void mnodeAddReadMsgHandle(uint8_t msgType, int32_t (*fp)(SMnodeMsg *pMsg)) {
int32_t mnodeProcessRead(SMnodeMsg *pMsg) { int32_t mnodeProcessRead(SMnodeMsg *pMsg) {
if (pMsg->pCont == NULL) { if (pMsg->pCont == NULL) {
mError("msg:%s content is null", taosMsg[pMsg->msgType]); mError("%p, msg:%s in mread queue, content is null", pMsg->ahandle, taosMsg[pMsg->msgType]);
return TSDB_CODE_INVALID_MSG_LEN; return TSDB_CODE_INVALID_MSG_LEN;
} }
...@@ -54,7 +54,7 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) { ...@@ -54,7 +54,7 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) {
rpcRsp->rsp = ipSet; rpcRsp->rsp = ipSet;
rpcRsp->len = sizeof(SRpcIpSet); rpcRsp->len = sizeof(SRpcIpSet);
mTrace("msg:%s will be redireced, inUse:%d", taosMsg[pMsg->msgType], ipSet->inUse); mTrace("%p, msg:%s in mread queue, will be redireced, inUse:%d", pMsg->ahandle, taosMsg[pMsg->msgType], ipSet->inUse);
for (int32_t i = 0; i < ipSet->numOfIps; ++i) { for (int32_t i = 0; i < ipSet->numOfIps; ++i) {
mTrace("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i])); mTrace("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i]));
} }
...@@ -63,13 +63,14 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) { ...@@ -63,13 +63,14 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) {
} }
if (tsMnodeProcessReadMsgFp[pMsg->msgType] == NULL) { if (tsMnodeProcessReadMsgFp[pMsg->msgType] == NULL) {
mError("msg:%s not processed, no handle exist", taosMsg[pMsg->msgType]); mError("%p, msg:%s in mread queue, not processed", pMsg->ahandle, taosMsg[pMsg->msgType]);
return TSDB_CODE_MSG_NOT_PROCESSED; return TSDB_CODE_MSG_NOT_PROCESSED;
} }
if (!mnodeInitMsg(pMsg)) { int32_t code = mnodeInitMsg(pMsg);
mError("msg:%s not processed, reason:%s", taosMsg[pMsg->msgType], tstrerror(terrno)); if (code != TSDB_CODE_SUCCESS) {
return terrno; mError("%p, msg:%s in mread queue, not processed reason:%s", pMsg->ahandle, taosMsg[pMsg->msgType], tstrerror(code));
return code;
} }
return (*tsMnodeProcessReadMsgFp[pMsg->msgType])(pMsg); return (*tsMnodeProcessReadMsgFp[pMsg->msgType])(pMsg);
......
...@@ -131,6 +131,9 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) { ...@@ -131,6 +131,9 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) {
memcpy(pShow->payload, pShowMsg->payload, pShow->payloadLen); memcpy(pShow->payload, pShowMsg->payload, pShow->payloadLen);
pShow = mnodeSaveQhandle(pShow, showObjSize); pShow = mnodeSaveQhandle(pShow, showObjSize);
if (pShow == NULL) {
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
pShowRsp->qhandle = htobe64((uint64_t) pShow); pShowRsp->qhandle = htobe64((uint64_t) pShow);
mTrace("show:%p, type:%s, start to get meta", pShow, mnodeGetShowType(pShowMsg->type)); mTrace("show:%p, type:%s, start to get meta", pShow, mnodeGetShowType(pShowMsg->type));
......
...@@ -42,7 +42,7 @@ void mnodeAddWriteMsgHandle(uint8_t msgType, int32_t (*fp)(SMnodeMsg *mnodeMsg)) ...@@ -42,7 +42,7 @@ void mnodeAddWriteMsgHandle(uint8_t msgType, int32_t (*fp)(SMnodeMsg *mnodeMsg))
int32_t mnodeProcessWrite(SMnodeMsg *pMsg) { int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
if (pMsg->pCont == NULL) { if (pMsg->pCont == NULL) {
mError("msg:%s content is null", taosMsg[pMsg->msgType]); mError("%p, msg:%s in mwrite queue, content is null", pMsg->ahandle, taosMsg[pMsg->msgType]);
return TSDB_CODE_INVALID_MSG_LEN; return TSDB_CODE_INVALID_MSG_LEN;
} }
...@@ -53,7 +53,7 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) { ...@@ -53,7 +53,7 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
rpcRsp->rsp = ipSet; rpcRsp->rsp = ipSet;
rpcRsp->len = sizeof(SRpcIpSet); rpcRsp->len = sizeof(SRpcIpSet);
mTrace("msg:%s will be redireced, inUse:%d", taosMsg[pMsg->msgType], ipSet->inUse); mTrace("%p, msg:%s in mwrite queue, will be redireced inUse:%d", pMsg->ahandle, taosMsg[pMsg->msgType], ipSet->inUse);
for (int32_t i = 0; i < ipSet->numOfIps; ++i) { for (int32_t i = 0; i < ipSet->numOfIps; ++i) {
mTrace("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i])); mTrace("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i]));
} }
...@@ -62,17 +62,18 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) { ...@@ -62,17 +62,18 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
} }
if (tsMnodeProcessWriteMsgFp[pMsg->msgType] == NULL) { if (tsMnodeProcessWriteMsgFp[pMsg->msgType] == NULL) {
mError("msg:%s not processed, no handle exist", taosMsg[pMsg->msgType]); mError("%p, msg:%s in mwrite queue, not processed", pMsg->ahandle, taosMsg[pMsg->msgType]);
return TSDB_CODE_MSG_NOT_PROCESSED; return TSDB_CODE_MSG_NOT_PROCESSED;
} }
if (!mnodeInitMsg(pMsg)) { int32_t code = mnodeInitMsg(pMsg);
mError("msg:%s not processed, reason:%s", taosMsg[pMsg->msgType], tstrerror(terrno)); if (code != TSDB_CODE_SUCCESS) {
return terrno; mError("%p, msg:%s in mwrite queue, not processed reason:%s", pMsg->ahandle, taosMsg[pMsg->msgType], tstrerror(code));
return code;
} }
if (!pMsg->pUser->writeAuth) { if (!pMsg->pUser->writeAuth) {
mError("%p, msg:%s not processed, no rights", taosMsg[pMsg->msgType]); mError("%p, msg:%s in mwrite queue, not processed, no write auth", pMsg->ahandle, taosMsg[pMsg->msgType]);
return TSDB_CODE_NO_RIGHTS; return TSDB_CODE_NO_RIGHTS;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册