提交 6abfd9fe 编写于 作者: S Shengliang Guan

refact transport

上级 745fa09b
...@@ -37,7 +37,6 @@ void dndClose(SDnode *pDnode); ...@@ -37,7 +37,6 @@ void dndClose(SDnode *pDnode);
void dndHandleEvent(SDnode *pDnode, EDndEvent event); void dndHandleEvent(SDnode *pDnode, EDndEvent event);
// dndMsg.c // dndMsg.c
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg); void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
// dndFile.c // dndFile.c
......
...@@ -57,3 +57,63 @@ void dndCleanup() { ...@@ -57,3 +57,63 @@ void dndCleanup() {
taosStopCacheRefreshWorker(); taosStopCacheRefreshWorker();
dInfo("dnode env is cleaned up"); dInfo("dnode env is cleaned up");
} }
const char *dndStatStr(EDndStatus status) {
switch (status) {
case DND_STAT_INIT:
return "init";
case DND_STAT_RUNNING:
return "running";
case DND_STAT_STOPPED:
return "stopped";
default:
return "UNKNOWN";
}
}
const char *dndNodeLogStr(ENodeType ntype) {
switch (ntype) {
case VNODES:
return "vnode";
case QNODE:
return "qnode";
case SNODE:
return "snode";
case MNODE:
return "mnode";
case BNODE:
return "bnode";
default:
return "taosd";
}
}
const char *dndNodeProcStr(ENodeType ntype) {
switch (ntype) {
case VNODES:
return "taosv";
case QNODE:
return "taosq";
case SNODE:
return "taoss";
case MNODE:
return "taosm";
case BNODE:
return "taosb";
default:
return "taosd";
}
}
const char *dndEventStr(EDndEvent ev) {
switch (ev) {
case DND_EVENT_START:
return "start";
case DND_EVENT_STOP:
return "stop";
case DND_EVENT_CHILD:
return "child";
default:
return "UNKNOWN";
}
}
\ No newline at end of file
...@@ -16,86 +16,6 @@ ...@@ -16,86 +16,6 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dndInt.h" #include "dndInt.h"
static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE);
if (pWrapper != NULL) {
dmUpdateMnodeEpSet(pWrapper->pMgmt, pEpSet);
dndReleaseWrapper(pWrapper);
}
}
static inline NodeMsgFp dndGetMsgFp(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
if (msgFp == NULL) {
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
}
return msgFp;
}
static inline int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc) {
SRpcConnInfo connInfo = {0};
if ((pRpc->msgType & 1U) && rpcGetConnInfo(pRpc->handle, &connInfo) != 0) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
dError("failed to build msg since %s, app:%p RPC:%p", terrstr(), pRpc->ahandle, pRpc->handle);
return -1;
}
memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN);
pMsg->clientIp = connInfo.clientIp;
pMsg->clientPort = connInfo.clientPort;
memcpy(&pMsg->rpcMsg, pRpc, sizeof(SRpcMsg));
return 0;
}
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
int32_t code = -1;
SNodeMsg *pMsg = NULL;
NodeMsgFp msgFp = NULL;
if (pEpSet && pEpSet->numOfEps > 0 && pRpc->msgType == TDMT_MND_STATUS_RSP) {
dndUpdateMnodeEpSet(pWrapper->pDnode, pEpSet);
}
if (dndMarkWrapper(pWrapper) != 0) goto _OVER;
if ((msgFp = dndGetMsgFp(pWrapper, pRpc)) == NULL) goto _OVER;
if ((pMsg = taosAllocateQitem(sizeof(SNodeMsg))) == NULL) goto _OVER;
if (dndBuildMsg(pMsg, pRpc) != 0) goto _OVER;
if (pWrapper->procType == PROC_SINGLE) {
dTrace("msg:%p, is created, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle, pMsg->user);
code = (*msgFp)(pWrapper, pMsg);
} else if (pWrapper->procType == PROC_PARENT) {
dTrace("msg:%p, is created and put into child queue, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle,
pMsg->user);
code = taosProcPutToChildQ(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, PROC_REQ);
} else {
dTrace("msg:%p, should not processed in child process, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle,
pMsg->user);
ASSERT(1);
}
_OVER:
if (code == 0) {
if (pWrapper->procType == PROC_PARENT) {
dTrace("msg:%p, is freed in parent process", pMsg);
taosFreeQitem(pMsg);
rpcFreeCont(pRpc->pCont);
}
} else {
dError("msg:%p, failed to process since 0x%04x:%s", pMsg, code & 0XFFFF, terrstr());
if (pRpc->msgType & 1U) {
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
tmsgSendRsp(&rsp);
}
dTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg);
rpcFreeCont(pRpc->pCont);
}
dndReleaseWrapper(pWrapper);
}
static int32_t dndProcessCreateNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg *pMsg) { static int32_t dndProcessCreateNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg *pMsg) {
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype); SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype);
if (pWrapper != NULL) { if (pWrapper != NULL) {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dndInt.h"
const char *dndStatStr(EDndStatus status) {
switch (status) {
case DND_STAT_INIT:
return "init";
case DND_STAT_RUNNING:
return "running";
case DND_STAT_STOPPED:
return "stopped";
default:
return "UNKNOWN";
}
}
const char *dndNodeLogStr(ENodeType ntype) {
switch (ntype) {
case VNODES:
return "vnode";
case QNODE:
return "qnode";
case SNODE:
return "snode";
case MNODE:
return "mnode";
case BNODE:
return "bnode";
default:
return "taosd";
}
}
const char *dndNodeProcStr(ENodeType ntype) {
switch (ntype) {
case VNODES:
return "taosv";
case QNODE:
return "taosq";
case SNODE:
return "taoss";
case MNODE:
return "taosm";
case BNODE:
return "taosb";
default:
return "taosd";
}
}
const char *dndEventStr(EDndEvent ev) {
switch (ev) {
case DND_EVENT_START:
return "start";
case DND_EVENT_STOP:
return "stop";
case DND_EVENT_CHILD:
return "child";
default:
return "UNKNOWN";
}
}
\ No newline at end of file
...@@ -20,45 +20,133 @@ ...@@ -20,45 +20,133 @@
#define INTERNAL_CKEY "_key" #define INTERNAL_CKEY "_key"
#define INTERNAL_SECRET "_pwd" #define INTERNAL_SECRET "_pwd"
static inline void dndProcessQMVnodeRpcMsg(SMsgHandle *pHandle, SRpcMsg *pMsg, SEpSet *pEpSet) { static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
SMsgHead *pHead = pMsg->pCont; SMgmtWrapper *pWrapper = &pDnode->wrappers[DNODE];
int32_t vgId = htonl(pHead->vgId); dmUpdateMnodeEpSet(pWrapper->pMgmt, pEpSet);
}
SMgmtWrapper *pWrapper = pHandle->pWrapper; static inline NodeMsgFp dndGetMsgFp(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
if (vgId == QND_VGID) { NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
pWrapper = pHandle->pQndWrapper; if (msgFp == NULL) {
} else if (vgId == MND_VGID) { terrno = TSDB_CODE_MSG_NOT_PROCESSED;
pWrapper = pHandle->pMndWrapper;
} }
dTrace("msg:%s will be processed by %s, handle:%p app:%p vgId:%d", TMSG_INFO(pMsg->msgType), pWrapper->name, return msgFp;
pMsg->handle, pMsg->ahandle, vgId);
dndProcessRpcMsg(pWrapper, pMsg, pEpSet);
} }
static void dndProcessResponse(SDnode *pDnode, SRpcMsg *pRsp, SEpSet *pEpSet) { static inline int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc) {
STransMgmt *pMgmt = &pDnode->trans; SRpcConnInfo connInfo = {0};
tmsg_t msgType = pRsp->msgType; if ((pRpc->msgType & 1U) && rpcGetConnInfo(pRpc->handle, &connInfo) != 0) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
dError("failed to build msg since %s, app:%p handle:%p", terrstr(), pRpc->ahandle, pRpc->handle);
return -1;
}
memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN);
pMsg->clientIp = connInfo.clientIp;
pMsg->clientPort = connInfo.clientPort;
memcpy(&pMsg->rpcMsg, pRpc, sizeof(SRpcMsg));
return 0;
}
static void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
int32_t code = -1;
SNodeMsg *pMsg = NULL;
NodeMsgFp msgFp = NULL;
if (pEpSet && pEpSet->numOfEps > 0 && pRpc->msgType == TDMT_MND_STATUS_RSP) {
dndUpdateMnodeEpSet(pWrapper->pDnode, pEpSet);
}
if (dndMarkWrapper(pWrapper) != 0) goto _OVER;
if ((msgFp = dndGetMsgFp(pWrapper, pRpc)) == NULL) goto _OVER;
if ((pMsg = taosAllocateQitem(sizeof(SNodeMsg))) == NULL) goto _OVER;
if (dndBuildMsg(pMsg, pRpc) != 0) goto _OVER;
if (pWrapper->procType == PROC_SINGLE) {
dTrace("msg:%p, is created, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user);
code = (*msgFp)(pWrapper, pMsg);
} else if (pWrapper->procType == PROC_PARENT) {
dTrace("msg:%p, is created and put into child queue, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user);
code = taosProcPutToChildQ(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, PROC_REQ);
} else {
dTrace("msg:%p, should not processed in child process, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user);
ASSERT(1);
}
_OVER:
if (code == 0) {
if (pWrapper->procType == PROC_PARENT) {
dTrace("msg:%p, is freed in parent process", pMsg);
taosFreeQitem(pMsg);
rpcFreeCont(pRpc->pCont);
}
} else {
dError("msg:%p, failed to process since 0x%04x:%s", pMsg, code & 0XFFFF, terrstr());
if (pRpc->msgType & 1U) {
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
tmsgSendRsp(&rsp);
}
dTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg);
rpcFreeCont(pRpc->pCont);
}
dndReleaseWrapper(pWrapper);
}
static void dndProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
STransMgmt *pMgmt = &pDnode->trans;
tmsg_t msgType = pMsg->msgType;
bool isReq = msgType & 1u;
SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
SMgmtWrapper *pWrapper = pHandle->pWrapper;
if (msgType == TDMT_DND_NETWORK_TEST) {
dTrace("network test req will be processed, handle:%p, app:%p", pMsg->handle, pMsg->ahandle);
dndProcessStartupReq(pDnode, pMsg);
return;
}
if (dndGetStatus(pDnode) != DND_STAT_RUNNING) { if (dndGetStatus(pDnode) != DND_STAT_RUNNING) {
dTrace("rsp:%s ignored since dnode not running, handle:%p app:%p", TMSG_INFO(msgType), pRsp->handle, pRsp->ahandle); dError("msg:%s ignored since dnode not running, handle:%p app:%p", TMSG_INFO(msgType), pMsg->handle, pMsg->ahandle);
rpcFreeCont(pRsp->pCont); if (isReq) {
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_APP_NOT_READY, .ahandle = pMsg->ahandle};
rpcSendResponse(&rspMsg);
}
rpcFreeCont(pMsg->pCont);
return; return;
} }
SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)]; if (isReq && pMsg->pCont == NULL) {
if (pHandle->pWrapper != NULL) { dError("req:%s not processed since its empty, handle:%p app:%p", TMSG_INFO(msgType), pMsg->handle, pMsg->ahandle);
if (pHandle->pMndWrapper == NULL && pHandle->pQndWrapper == NULL) { SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_INVALID_MSG_LEN, .ahandle = pMsg->ahandle};
dTrace("rsp:%s will be processed by %s, handle:%p app:%p code:0x%04x:%s", TMSG_INFO(msgType), rpcSendResponse(&rspMsg);
pHandle->pWrapper->name, pRsp->handle, pRsp->ahandle, pRsp->code & 0XFFFF, tstrerror(pRsp->code)); return;
dndProcessRpcMsg(pHandle->pWrapper, pRsp, pEpSet); }
if (pWrapper == NULL) {
dError("msg:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pMsg->handle, pMsg->ahandle);
if (isReq) {
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED, .ahandle = pMsg->ahandle};
rpcSendResponse(&rspMsg);
}
rpcFreeCont(pMsg->pCont);
}
if (pHandle->pMndWrapper != NULL || pHandle->pQndWrapper != NULL) {
SMsgHead *pHead = pMsg->pCont;
int32_t vgId = ntohl(pHead->vgId);
if (vgId == QND_VGID) {
pWrapper = pHandle->pQndWrapper;
} else if (vgId == MND_VGID) {
pWrapper = pHandle->pMndWrapper;
} else { } else {
dndProcessQMVnodeRpcMsg(pHandle, pRsp, pEpSet);
} }
} else {
dError("rsp:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pRsp->handle, pRsp->ahandle);
rpcFreeCont(pRsp->pCont);
} }
dTrace("msg:%s will be processed by %s, app:%p", TMSG_INFO(msgType), pWrapper->name, pMsg->ahandle);
dndProcessRpcMsg(pWrapper, pMsg, pEpSet);
} }
static int32_t dndInitClient(SDnode *pDnode) { static int32_t dndInitClient(SDnode *pDnode) {
...@@ -68,7 +156,7 @@ static int32_t dndInitClient(SDnode *pDnode) { ...@@ -68,7 +156,7 @@ static int32_t dndInitClient(SDnode *pDnode) {
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.label = "DND"; rpcInit.label = "DND";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = (RpcCfp)dndProcessResponse; rpcInit.cfp = (RpcCfp)dndProcessMsg;
rpcInit.sessions = 1024; rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.idleTime = tsShellActivityTimer * 1000;
...@@ -100,48 +188,6 @@ static void dndCleanupClient(SDnode *pDnode) { ...@@ -100,48 +188,6 @@ static void dndCleanupClient(SDnode *pDnode) {
} }
} }
static void dndProcessRequest(SDnode *pDnode, SRpcMsg *pReq, SEpSet *pEpSet) {
STransMgmt *pMgmt = &pDnode->trans;
tmsg_t msgType = pReq->msgType;
if (msgType == TDMT_DND_NETWORK_TEST) {
dTrace("network test req will be processed, handle:%p, app:%p", pReq->handle, pReq->ahandle);
dndProcessStartupReq(pDnode, pReq);
return;
}
if (dndGetStatus(pDnode) != DND_STAT_RUNNING) {
dError("req:%s ignored since dnode not running, handle:%p app:%p", TMSG_INFO(msgType), pReq->handle, pReq->ahandle);
SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_APP_NOT_READY, .ahandle = pReq->ahandle};
rpcSendResponse(&rspMsg);
rpcFreeCont(pReq->pCont);
return;
}
if (pReq->pCont == NULL) {
dTrace("req:%s not processed since its empty, handle:%p app:%p", TMSG_INFO(msgType), pReq->handle, pReq->ahandle);
SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_DND_INVALID_MSG_LEN, .ahandle = pReq->ahandle};
rpcSendResponse(&rspMsg);
return;
}
SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
if (pHandle->pWrapper != NULL) {
if (pHandle->pMndWrapper == NULL && pHandle->pQndWrapper == NULL) {
dTrace("req:%s will be processed by %s, handle:%p app:%p", TMSG_INFO(msgType), pHandle->pWrapper->name,
pReq->handle, pReq->ahandle);
dndProcessRpcMsg(pHandle->pWrapper, pReq, pEpSet);
} else {
dndProcessQMVnodeRpcMsg(pHandle, pReq, pEpSet);
}
} else {
dError("req:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pReq->handle, pReq->ahandle);
SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED, .ahandle = pReq->ahandle};
rpcSendResponse(&rspMsg);
rpcFreeCont(pReq->pCont);
}
}
static inline void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pReq, SRpcMsg *pRsp) { static inline void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pReq, SRpcMsg *pRsp) {
SEpSet epSet = {0}; SEpSet epSet = {0};
SMgmtWrapper *pWrapper = &pDnode->wrappers[DNODE]; SMgmtWrapper *pWrapper = &pDnode->wrappers[DNODE];
...@@ -149,7 +195,8 @@ static inline void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pReq, SRpcMsg ...@@ -149,7 +195,8 @@ static inline void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pReq, SRpcMsg
rpcSendRecv(pDnode->trans.clientRpc, &epSet, pReq, pRsp); rpcSendRecv(pDnode->trans.clientRpc, &epSet, pReq, pRsp);
} }
static inline int32_t dndGetHideUserAuth(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) { static inline int32_t dndGetHideUserAuth(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret,
char *ckey) {
int32_t code = 0; int32_t code = 0;
char pass[TSDB_PASSWORD_LEN + 1] = {0}; char pass[TSDB_PASSWORD_LEN + 1] = {0};
...@@ -219,7 +266,7 @@ static int32_t dndInitServer(SDnode *pDnode) { ...@@ -219,7 +266,7 @@ static int32_t dndInitServer(SDnode *pDnode) {
rpcInit.localPort = pDnode->serverPort; rpcInit.localPort = pDnode->serverPort;
rpcInit.label = "DND"; rpcInit.label = "DND";
rpcInit.numOfThreads = numOfThreads; rpcInit.numOfThreads = numOfThreads;
rpcInit.cfp = (RpcCfp)dndProcessRequest; rpcInit.cfp = (RpcCfp)dndProcessMsg;
rpcInit.sessions = tsMaxShellConns; rpcInit.sessions = tsMaxShellConns;
rpcInit.connType = TAOS_CONN_SERVER; rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.idleTime = tsShellActivityTimer * 1000;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册