提交 2a5e231f 编写于 作者: S Shengliang Guan

shm

上级 359482be
...@@ -77,6 +77,7 @@ SDnode *dndCreate(SDndCfg *pCfg) { ...@@ -77,6 +77,7 @@ SDnode *dndCreate(SDndCfg *pCfg) {
goto _OVER; goto _OVER;
} }
memcpy(&pDnode->cfg, pCfg, sizeof(SDndCfg));
dndSetStatus(pDnode, DND_STAT_INIT); dndSetStatus(pDnode, DND_STAT_INIT);
pDnode->rebootTime = taosGetTimestampMs(); pDnode->rebootTime = taosGetTimestampMs();
pDnode->pLockFile = dndCheckRunning(pCfg->dataDir); pDnode->pLockFile = dndCheckRunning(pCfg->dataDir);
...@@ -100,7 +101,6 @@ SDnode *dndCreate(SDndCfg *pCfg) { ...@@ -100,7 +101,6 @@ SDnode *dndCreate(SDndCfg *pCfg) {
qmGetMgmtFp(&pDnode->wrappers[QNODE]); qmGetMgmtFp(&pDnode->wrappers[QNODE]);
smGetMgmtFp(&pDnode->wrappers[SNODE]); smGetMgmtFp(&pDnode->wrappers[SNODE]);
bmGetMgmtFp(&pDnode->wrappers[BNODE]); bmGetMgmtFp(&pDnode->wrappers[BNODE]);
memcpy(&pDnode->cfg, pCfg, sizeof(SDndCfg));
if (dndInitMsgHandle(pDnode) != 0) { if (dndInitMsgHandle(pDnode) != 0) {
goto _OVER; goto _OVER;
...@@ -306,12 +306,13 @@ void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) { ...@@ -306,12 +306,13 @@ void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
goto _OVER; goto _OVER;
} }
dTrace("msg:%p, is created, user:%s", pMsg, pMsg->user); dTrace("msg:%p, is created, app:%p user:%s", pMsg, pRpc->ahandle, pMsg->user);
code = (*msgFp)(pWrapper, pMsg); code = (*msgFp)(pWrapper, pMsg);
_OVER: _OVER:
if (code != 0) { if (code != 0) {
dError("msg:%p, failed to process since %s", pMsg, terrstr());
bool isReq = (pRpc->msgType & 1U); bool isReq = (pRpc->msgType & 1U);
if (isReq) { if (isReq) {
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
......
...@@ -37,8 +37,8 @@ static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { ...@@ -37,8 +37,8 @@ static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)]; SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
if (pHandle->msgFp != NULL) { if (pHandle->msgFp != NULL) {
dTrace("rsp:%s will be processed by %s, code:0x%x app:%p", TMSG_INFO(msgType), pHandle->pWrapper->name, dTrace("rsp:%s will be processed by %s, app:%p code:0x%x:%s", TMSG_INFO(msgType), pHandle->pWrapper->name,
pRsp->code & 0XFFFF, pRsp->ahandle); pRsp->ahandle, pRsp->code & 0XFFFF, tstrerror(pRsp->code));
dndProcessRpcMsg(pHandle->pWrapper, pRsp, pEpSet); dndProcessRpcMsg(pHandle->pWrapper, pRsp, pEpSet);
} else { } else {
dError("rsp:%s not processed, app:%p", TMSG_INFO(msgType), pRsp->ahandle); dError("rsp:%s not processed, app:%p", TMSG_INFO(msgType), pRsp->ahandle);
...@@ -51,7 +51,7 @@ int32_t dndInitClient(SDnode *pDnode) { ...@@ -51,7 +51,7 @@ int32_t dndInitClient(SDnode *pDnode) {
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.label = "CLI"; rpcInit.label = "DND";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = dndProcessResponse; rpcInit.cfp = dndProcessResponse;
rpcInit.sessions = 1024; rpcInit.sessions = 1024;
...@@ -218,7 +218,7 @@ int32_t dndInitServer(SDnode *pDnode) { ...@@ -218,7 +218,7 @@ int32_t dndInitServer(SDnode *pDnode) {
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = pDnode->cfg.serverPort; rpcInit.localPort = pDnode->cfg.serverPort;
rpcInit.label = "SRV"; rpcInit.label = "DND";
rpcInit.numOfThreads = numOfThreads; rpcInit.numOfThreads = numOfThreads;
rpcInit.cfp = dndProcessRequest; rpcInit.cfp = dndProcessRequest;
rpcInit.sessions = tsMaxShellConns; rpcInit.sessions = tsMaxShellConns;
......
...@@ -54,7 +54,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { ...@@ -54,7 +54,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527}; SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527};
pMgmt->statusSent = 1; pMgmt->statusSent = 1;
dTrace("send status req to mnode, ahandle:%p", rpcMsg.ahandle); dTrace("send req:%s to mnode, app:%p", TMSG_INFO(rpcMsg.msgType), rpcMsg.ahandle);
dndSendReqToMnode(pMgmt->pDnode, &rpcMsg); dndSendReqToMnode(pMgmt->pDnode, &rpcMsg);
} }
......
...@@ -130,6 +130,7 @@ static void dmProcessMgmtQueue(SDnode *pDnode, SNodeMsg *pNodeMsg) { ...@@ -130,6 +130,7 @@ static void dmProcessMgmtQueue(SDnode *pDnode, SNodeMsg *pNodeMsg) {
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL; pMsg->pCont = NULL;
taosFreeQitem(pNodeMsg); taosFreeQitem(pNodeMsg);
dTrace("msg:%p, is freed", pNodeMsg);
} }
int32_t dmStartWorker(SDnodeMgmt *pMgmt) { int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
...@@ -174,5 +175,5 @@ int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -174,5 +175,5 @@ int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
} }
dTrace("msg:%p, will be written to worker %s", pMsg, pWorker->name); dTrace("msg:%p, will be written to worker %s", pMsg, pWorker->name);
return dndWriteMsgToWorker(pWorker, pMsg, sizeof(SNodeMsg)); return dndWriteMsgToWorker(pWorker, pMsg, 0);
} }
\ No newline at end of file
...@@ -14,8 +14,8 @@ ...@@ -14,8 +14,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _TD_DND_EXEC_H_ #ifndef _TD_DND_MAIN_H_
#define _TD_DND_EXEC_H_ #define _TD_DND_MAIN_H_
#include "dnode.h" #include "dnode.h"
...@@ -45,4 +45,4 @@ SDndCfg dndGetCfg(); ...@@ -45,4 +45,4 @@ SDndCfg dndGetCfg();
} }
#endif #endif
#endif /*_TD_DND_EXEC_H_*/ #endif /*_TD_DND_MAIN_H_*/
...@@ -270,4 +270,7 @@ static void mmConsumeMsgQueue(SDnode *pDnode, SMndMsg *pMsg) { ...@@ -270,4 +270,7 @@ static void mmConsumeMsgQueue(SDnode *pDnode, SMndMsg *pMsg) {
int32_t mmProcessWriteMsg( SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {return 0;} int32_t mmProcessWriteMsg( SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {return 0;}
int32_t mmProcessSyncMsg( SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {return 0;} int32_t mmProcessSyncMsg( SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {return 0;}
int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {return 0;} int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
\ No newline at end of file terrno = TSDB_CODE_MSG_NOT_PROCESSED;
return -1;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册