Commit 21a9367d authored by Shengliang Guan

refactor: transport

Parent 6e523a47
@@ -398,8 +398,7 @@ static void *dmConsumParentQueue(void *param) {
rpcFreeCont(pBody);
} else if (ftype == PROC_FUNC_RELEASE) {
pRsp = pHead;
dTrace("node:%s, release msg:%p from parent queue, code:0x%04x handle:%p", proc->name, pRsp, code,
pRsp->handle);
dTrace("node:%s, release msg:%p from parent queue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->handle);
dmRemoveProcRpcHandle(proc, pRsp->handle);
rpcReleaseHandle(pRsp->handle, (int8_t)pRsp->code);
rpcFreeCont(pBody);
@@ -41,15 +41,6 @@ static void dmSetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
taosWUnLockLatch(&pData->latch);
}
static inline NodeMsgFp dmGetMsgFp(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
if (msgFp == NULL) {
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
}
return msgFp;
}
static inline int32_t dmBuildNodeMsg(SNodeMsg *pMsg, SRpcMsg *pRpc) {
SRpcConnInfo connInfo = {0};
if ((pRpc->msgType & 1U) && rpcGetConnInfo(pRpc->handle, &connInfo) != 0) {
@@ -62,66 +53,124 @@ static inline int32_t dmBuildNodeMsg(SNodeMsg *pMsg, SRpcMsg *pRpc) {
pMsg->clientIp = connInfo.clientIp;
pMsg->clientPort = connInfo.clientPort;
memcpy(&pMsg->rpcMsg, pRpc, sizeof(SRpcMsg));
if ((pRpc->msgType & 1u)) {
assert(pRpc->refId != 0);
}
return 0;
}
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SRpcMsg *pRpc = &pMsg->rpcMsg;
NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->rpcMsg.msgType)];
if (msgFp == NULL) {
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
return -1;
}
if (InParentProc(pWrapper->proc.ptype)) {
dTrace("msg:%p, created and put into child queue, type:%s handle:%p user:%s code:0x%04x contLen:%d", pMsg,
TMSG_INFO(pRpc->msgType), pRpc->handle, pMsg->user, pRpc->code & 0XFFFF, pRpc->contLen);
return dmPutToProcCQueue(&pWrapper->proc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen,
((pRpc->msgType & 1U) && (pRpc->code == 0)) ? pRpc->handle : NULL, pRpc->refId,
PROC_FUNC_REQ);
dTrace("msg:%p, will be processed, handle:%p", pMsg, pMsg->rpcMsg.handle);
return (*msgFp)(pWrapper->pMgmt, pMsg);
}
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
SDnodeTrans *pTrans = &pDnode->trans;
int32_t code = -1;
SNodeMsg *pMsg = NULL;
tmsg_t msgType = pRpc->msgType;
bool isReq = msgType & 1u;
bool needRelease = false;
SMsgHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)];
SMgmtWrapper *pWrapper = NULL;
if (msgType == TDMT_DND_NET_TEST) {
dmProcessNetTestReq(pDnode, pRpc);
code = 0;
goto _OVER;
} else if (msgType == TDMT_MND_SYSTABLE_RETRIEVE_RSP || msgType == TDMT_VND_FETCH_RSP) {
code = qWorkerProcessFetchRsp(NULL, NULL, pRpc);
pRpc->pCont = NULL; // will be freed in qworker
code = 0;
goto _OVER;
} else {
dTrace("msg:%p, created, type:%s handle:%p user:%s", pMsg, TMSG_INFO(pRpc->msgType), pRpc->handle, pMsg->user);
NodeMsgFp msgFp = dmGetMsgFp(pWrapper, &pMsg->rpcMsg);
if (msgFp == NULL) return -1;
return (*msgFp)(pWrapper->pMgmt, pMsg);
}
}
static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
int32_t code = -1;
SNodeMsg *pMsg = NULL;
uint16_t msgType = pRpc->msgType;
bool needRelease = false;
bool isReq = msgType & 1U;
if (pDnode->status != DND_STAT_RUNNING) {
if (msgType == TDMT_DND_SERVER_STATUS) {
dmProcessServerStartupStatus(pDnode, pRpc);
code = 0;
} else {
terrno = TSDB_CODE_APP_NOT_READY;
}
goto _OVER;
}
if (isReq && pRpc->pCont == NULL) {
terrno = TSDB_CODE_INVALID_MSG_LEN;
goto _OVER;
}
if (pHandle->defaultNtype == NODE_END) {
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
goto _OVER;
} else {
pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
if (pHandle->needCheckVgId) {
SMsgHead *pHead = pRpc->pCont;
int32_t vgId = ntohl(pHead->vgId);
if (vgId == QNODE_HANDLE) {
pWrapper = &pDnode->wrappers[QNODE];
} else if (vgId == MNODE_HANDLE) {
pWrapper = &pDnode->wrappers[MNODE];
} else {
}
}
}
if (dmMarkWrapper(pWrapper) != 0) {
goto _OVER;
} else {
needRelease = true;
}
if (dmMarkWrapper(pWrapper) != 0) goto _OVER;
needRelease = true;
dTrace("msg:%s is received, handle:%p app:%p", TMSG_INFO(msgType), pRpc->handle, pRpc->ahandle);
pMsg = taosAllocateQitem(sizeof(SNodeMsg), RPC_QITEM);
if (pMsg == NULL) {
goto _OVER;
}
if ((pMsg = taosAllocateQitem(sizeof(SNodeMsg), RPC_QITEM)) == NULL) goto _OVER;
if (dmBuildNodeMsg(pMsg, pRpc) != 0) goto _OVER;
if (dmBuildNodeMsg(pMsg, pRpc) != 0) {
goto _OVER;
}
code = dmProcessNodeMsg(pWrapper, pMsg);
if (InParentProc(pWrapper->proc.ptype)) {
dTrace("msg:%p, put into child queue, handle:%p", pMsg, pRpc->handle);
code = dmPutToProcCQueue(&pWrapper->proc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen,
(isReq && (pRpc->code == 0)) ? pRpc->handle : NULL, pRpc->refId, PROC_FUNC_REQ);
} else {
code = dmProcessNodeMsg(pWrapper, pMsg);
}
_OVER:
if (code == 0) {
if (InParentProc(pWrapper->proc.ptype)) {
if (pWrapper != NULL && InParentProc(pWrapper->proc.ptype)) {
dTrace("msg:%p, freed in parent process", pMsg);
taosFreeQitem(pMsg);
rpcFreeCont(pRpc->pCont);
}
} else {
dError("msg:%p, type:%s handle:%p failed to process since 0x%04x:%s", pMsg, TMSG_INFO(msgType), pRpc->handle,
code & 0XFFFF, terrstr());
dError("msg:%p, failed to process since %s", pMsg, terrstr());
if (terrno != 0) code = terrno;
if (isReq) {
if (terrno != 0) code = terrno;
if (code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_NODE_OFFLINE) {
if (msgType > TDMT_MND_MSG && msgType < TDMT_VND_MSG) {
code = TSDB_CODE_NODE_REDIRECT;
}
}
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code, .refId = pRpc->refId};
tmsgSendRsp(&rsp);
SRpcMsg rspMsg = {
.handle = pRpc->handle,
.code = code,
.ahandle = pRpc->ahandle,
.refId = pRpc->refId,
};
tmsgSendRsp(&rspMsg);
}
dTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg);
rpcFreeCont(pRpc->pCont);
@@ -132,83 +181,6 @@ _OVER:
}
}
static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SDnodeTrans *pTrans = &pDnode->trans;
tmsg_t msgType = pMsg->msgType;
bool isReq = msgType & 1u;
SMsgHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)];
SMgmtWrapper *pWrapper = NULL;
switch (msgType) {
case TDMT_DND_SERVER_STATUS:
if (pDnode->status != DND_STAT_RUNNING) {
dTrace("server status req will be processed, handle:%p, app:%p", pMsg->handle, pMsg->ahandle);
dmProcessServerStartupStatus(pDnode, pMsg);
return;
} else {
break;
}
case TDMT_DND_NET_TEST:
dTrace("net test req will be processed, handle:%p, app:%p", pMsg->handle, pMsg->ahandle);
dmProcessNetTestReq(pDnode, pMsg);
return;
case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
case TDMT_VND_FETCH_RSP:
dTrace("retrieve rsp is received");
qWorkerProcessFetchRsp(NULL, NULL, pMsg);
pMsg->pCont = NULL; // already freed in qworker
return;
}
if (pDnode->status != DND_STAT_RUNNING) {
dError("msg:%s ignored since dnode not running, handle:%p app:%p", TMSG_INFO(msgType), pMsg->handle, pMsg->ahandle);
if (isReq) {
SRpcMsg rspMsg = {
.handle = pMsg->handle, .code = TSDB_CODE_APP_NOT_READY, .ahandle = pMsg->ahandle, .refId = pMsg->refId};
rpcSendResponse(&rspMsg);
}
rpcFreeCont(pMsg->pCont);
return;
}
if (isReq && pMsg->pCont == NULL) {
dError("req:%s not processed since its empty, handle:%p app:%p", TMSG_INFO(msgType), pMsg->handle, pMsg->ahandle);
SRpcMsg rspMsg = {
.handle = pMsg->handle, .code = TSDB_CODE_INVALID_MSG_LEN, .ahandle = pMsg->ahandle, .refId = pMsg->refId};
rpcSendResponse(&rspMsg);
return;
}
if (pHandle->defaultNtype == NODE_END) {
dError("msg:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pMsg->handle, pMsg->ahandle);
if (isReq) {
SRpcMsg rspMsg = {
.handle = pMsg->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED, .ahandle = pMsg->ahandle, .refId = pMsg->refId};
rpcSendResponse(&rspMsg);
}
rpcFreeCont(pMsg->pCont);
return;
}
pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
if (pHandle->needCheckVgId) {
SMsgHead *pHead = pMsg->pCont;
int32_t vgId = ntohl(pHead->vgId);
if (vgId == QNODE_HANDLE) {
pWrapper = &pDnode->wrappers[QNODE];
} else if (vgId == MNODE_HANDLE) {
pWrapper = &pDnode->wrappers[MNODE];
} else {
}
}
dTrace("msg:%s will be processed by %s, app:%p", TMSG_INFO(msgType), pWrapper->name, pMsg->ahandle);
if (isReq) {
assert(pMsg->refId != 0);
}
dmProcessRpcMsg(pWrapper, pMsg, pEpSet);
}
int32_t dmInitMsgHandle(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
@@ -248,17 +220,19 @@ static void dmSendRpcRedirectRsp(SDnode *pDnode, const SRpcMsg *pReq) {
epSet.eps[i].port = htons(epSet.eps[i].port);
}
SRpcMsg resp;
SMEpSet msg = {.epSet = epSet};
int32_t len = tSerializeSMEpSet(NULL, 0, &msg);
resp.pCont = rpcMallocCont(len);
resp.contLen = len;
tSerializeSMEpSet(resp.pCont, len, &msg);
resp.code = TSDB_CODE_RPC_REDIRECT;
resp.handle = pReq->handle;
resp.refId = pReq->refId;
rpcSendResponse(&resp);
SRpcMsg rsp = {
.code = TSDB_CODE_RPC_REDIRECT,
.handle = pReq->handle,
.refId = pReq->refId,
.contLen = len,
};
rsp.pCont = rpcMallocCont(len);
tSerializeSMEpSet(rsp.pCont, len, &msg);
rpcSendResponse(&rsp);
}
static inline void dmSendRpcRsp(SDnode *pDnode, const SRpcMsg *pRsp) {
@@ -309,17 +283,17 @@ static inline void dmSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
static inline void dmSendRedirectRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp, const SEpSet *pNewEpSet) {
if (!InChildProc(pWrapper->proc.ptype)) {
SRpcMsg resp = {0};
SRpcMsg rsp = {0};
SMEpSet msg = {.epSet = *pNewEpSet};
int32_t len = tSerializeSMEpSet(NULL, 0, &msg);
resp.pCont = rpcMallocCont(len);
resp.contLen = len;
tSerializeSMEpSet(resp.pCont, len, &msg);
resp.code = TSDB_CODE_RPC_REDIRECT;
resp.handle = pRsp->handle;
resp.refId = pRsp->refId;
rpcSendResponse(&resp);
rsp.pCont = rpcMallocCont(len);
rsp.contLen = len;
tSerializeSMEpSet(rsp.pCont, len, &msg);
rsp.code = TSDB_CODE_RPC_REDIRECT;
rsp.handle = pRsp->handle;
rsp.refId = pRsp->refId;
rpcSendResponse(&rsp);
} else {
dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_FUNC_RSP);
}
@@ -356,7 +330,7 @@ int32_t dmInitClient(SDnode *pDnode) {
SRpcInit rpcInit = {0};
rpcInit.label = "DND";
rpcInit.numOfThreads = 1;
rpcInit.cfp = (RpcCfp)dmProcessMsg;
rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = tsShellActivityTimer * 1000;
@@ -459,7 +433,7 @@ int32_t dmInitServer(SDnode *pDnode) {
rpcInit.localPort = pDnode->data.serverPort;
rpcInit.label = "DND";
rpcInit.numOfThreads = tsNumOfRpcThreads;
rpcInit.cfp = (RpcCfp)dmProcessMsg;
rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
rpcInit.sessions = tsMaxShellConns;
rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = tsShellActivityTimer * 1000;
@@ -3,7 +3,7 @@ enable_testing()
add_subdirectory(acct)
add_subdirectory(bnode)
add_subdirectory(db)
add_subdirectory(dnode)
#add_subdirectory(dnode)
add_subdirectory(func)
#add_subdirectory(mnode)
add_subdirectory(profile)