diff --git a/include/dnode/bnode/bnode.h b/include/dnode/bnode/bnode.h index 23cc3ca6176eb9bd5ff88b48bc62befb56687f2d..3cc26861aba988856a2e951116765222adb4b8c8 100644 --- a/include/dnode/bnode/bnode.h +++ b/include/dnode/bnode/bnode.h @@ -23,9 +23,9 @@ extern "C" { /* ------------------------ TYPES EXPOSED ------------------------ */ typedef struct SDnode SDnode; typedef struct SBnode SBnode; -typedef void (*SendMsgToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); -typedef void (*SendMsgToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); -typedef void (*SendRedirectMsgFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); +typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); +typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); +typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef struct { int64_t numOfErrors; @@ -40,9 +40,9 @@ typedef struct { int64_t clusterId; SBnodeCfg cfg; SDnode *pDnode; - SendMsgToDnodeFp sendMsgToDnodeFp; - SendMsgToMnodeFp sendMsgToMnodeFp; - SendRedirectMsgFp sendRedirectMsgFp; + SendReqToDnodeFp sendReqToDnodeFp; + SendReqToMnodeFp sendReqToMnodeFp; + SendRedirectRspFp sendRedirectRspFp; } SBnodeOpt; /* ------------------------ SBnode ------------------------ */ diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index a288e3e6309ee64cd64579c5d72b7a43e447c72d..de4b4e4e89d1418391aa6daf519c77bea8013c72 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -24,9 +24,10 @@ extern "C" { typedef struct SDnode SDnode; typedef struct SMnode SMnode; typedef struct SMnodeMsg SMnodeMsg; -typedef void (*SendMsgToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); -typedef void (*SendMsgToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); -typedef void (*SendRedirectMsgFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); +typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); +typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); +typedef int32_t (*PutReqToMWriteQFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); +typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef struct SMnodeLoad { int64_t numOfDnode; @@ -62,9 +63,10 @@ typedef struct { SReplica replicas[TSDB_MAX_REPLICA]; SMnodeCfg cfg; SDnode *pDnode; - SendMsgToDnodeFp sendMsgToDnodeFp; - SendMsgToMnodeFp sendMsgToMnodeFp; - SendRedirectMsgFp sendRedirectMsgFp; + PutReqToMWriteQFp putReqToMWriteQFp; + SendReqToDnodeFp sendReqToDnodeFp; + SendReqToMnodeFp sendReqToMnodeFp; + SendRedirectRspFp sendRedirectRspFp; } SMnodeOpt; /* ------------------------ SMnode ------------------------ */ diff --git a/include/dnode/qnode/qnode.h b/include/dnode/qnode/qnode.h index 8084175a907ff4c98b52a5f343956b8fbcc834fd..554c57a045436f9df54338a79bbb8d69ef4e3e7a 100644 --- a/include/dnode/qnode/qnode.h +++ b/include/dnode/qnode/qnode.h @@ -23,9 +23,9 @@ extern "C" { /* ------------------------ TYPES EXPOSED ------------------------ */ typedef struct SDnode SDnode; typedef struct SQnode SQnode; -typedef void (*SendMsgToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); -typedef void (*SendMsgToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); -typedef void (*SendRedirectMsgFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); +typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); +typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); +typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef struct { int64_t numOfStartTask; @@ -47,9 +47,9 @@ typedef struct { int64_t clusterId; SQnodeCfg cfg; SDnode *pDnode; - SendMsgToDnodeFp sendMsgToDnodeFp; - SendMsgToMnodeFp sendMsgToMnodeFp; - SendRedirectMsgFp sendRedirectMsgFp; + SendReqToDnodeFp sendReqToDnodeFp; + SendReqToMnodeFp sendReqToMnodeFp; + SendRedirectRspFp sendRedirectRspFp; } SQnodeOpt; /* ------------------------ SQnode ------------------------ */ diff --git a/include/dnode/snode/snode.h b/include/dnode/snode/snode.h index 4913d2572fa201b23df12195e0004f55b8c4b866..43d3dd9b4b377fbee57f9cf9fd02c9f14699f77d 100644 --- a/include/dnode/snode/snode.h +++ b/include/dnode/snode/snode.h @@ -23,9 +23,9 @@ extern "C" { /* ------------------------ TYPES EXPOSED ------------------------ */ typedef struct SDnode SDnode; typedef struct SSnode SSnode; -typedef void (*SendMsgToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); -typedef void (*SendMsgToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); -typedef void (*SendRedirectMsgFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); +typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); +typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); +typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef struct { int64_t numOfErrors; @@ -40,9 +40,9 @@ typedef struct { int64_t clusterId; SSnodeCfg cfg; SDnode *pDnode; - SendMsgToDnodeFp sendMsgToDnodeFp; - SendMsgToMnodeFp sendMsgToMnodeFp; - SendRedirectMsgFp sendRedirectMsgFp; + SendReqToDnodeFp sendReqToDnodeFp; + SendReqToMnodeFp sendReqToMnodeFp; + SendRedirectRspFp sendRedirectRspFp; } SSnodeOpt; /* ------------------------ SSnode ------------------------ */ diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 2dcc74213c1db6e6917e9a205d50488c3a88773b..03054049377e80e87fdc2dc1e0f47848b236762d 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -253,7 +253,7 @@ int32_t* taosGetErrno(); // dnode #define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400) -#define TSDB_CODE_DND_EXITING TAOS_DEF_ERROR_CODE(0, 0x0401) +#define TSDB_CODE_DND_OFFLINE TAOS_DEF_ERROR_CODE(0, 0x0401) #define TSDB_CODE_DND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0402) #define TSDB_CODE_DND_DNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0410) #define TSDB_CODE_DND_DNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0411) diff --git a/source/dnode/bnode/inc/bndInt.h b/source/dnode/bnode/inc/bndInt.h index d44c520a26e426bc5ea5f39441576e687bf50946..cddb1e50f14526df43e7e2234950a71ac74be864 100644 --- a/source/dnode/bnode/inc/bndInt.h +++ b/source/dnode/bnode/inc/bndInt.h @@ -33,9 +33,9 @@ typedef struct SBnode { int32_t dnodeId; int64_t clusterId; SBnodeCfg cfg; - SendMsgToDnodeFp sendMsgToDnodeFp; - SendMsgToMnodeFp sendMsgToMnodeFp; - SendRedirectMsgFp sendRedirectMsgFp; + SendReqToDnodeFp sendReqToDnodeFp; + SendReqToMnodeFp sendReqToMnodeFp; + SendRedirectRspFp sendRedirectRspFp; } SBnode; #ifdef __cplusplus diff --git a/source/dnode/mgmt/impl/inc/dndDnode.h b/source/dnode/mgmt/impl/inc/dndDnode.h index a2015913a7680789e81df4ee0e598547af755252..69c2aa1fbf39be66d066732c401192f7bd9eb016 100644 --- a/source/dnode/mgmt/impl/inc/dndDnode.h +++ b/source/dnode/mgmt/impl/inc/dndDnode.h @@ -29,8 +29,8 @@ int64_t dndGetClusterId(SDnode *pDnode); void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort); void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet); -void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg); -void dndSendStatusMsg(SDnode *pDnode); +void dndSendRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg); +void dndSendStatusReq(SDnode *pDnode); void dndProcessMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet); void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/impl/inc/dndTransport.h b/source/dnode/mgmt/impl/inc/dndTransport.h index 312da69fa23d6accc7057965ad3e97e86a737e21..42fb379fc1bf201ec617ce6b7b3236a5eff23782 100644 --- a/source/dnode/mgmt/impl/inc/dndTransport.h +++ b/source/dnode/mgmt/impl/inc/dndTransport.h @@ -23,8 +23,8 @@ extern "C" { int32_t dndInitTrans(SDnode *pDnode); void dndCleanupTrans(SDnode *pDnode); -void dndSendMsgToMnode(SDnode *pDnode, SRpcMsg *pRpcMsg); -void dndSendMsgToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pRpcMsg); +int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pRpcMsg); +int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pRpcMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/impl/src/dndBnode.c b/source/dnode/mgmt/impl/src/dndBnode.c index c12d449517adcf5fbeafe1a5f727177000ab10b3..dfe4eda1451eb2da8af54556393c80f08b0b7819 100644 --- a/source/dnode/mgmt/impl/src/dndBnode.c +++ b/source/dnode/mgmt/impl/src/dndBnode.c @@ -179,9 +179,9 @@ static void dndStopBnodeWorker(SDnode *pDnode) { static void dndBuildBnodeOption(SDnode *pDnode, SBnodeOpt *pOption) { pOption->pDnode = pDnode; - pOption->sendMsgToDnodeFp = dndSendMsgToDnode; - pOption->sendMsgToMnodeFp = dndSendMsgToMnode; - pOption->sendRedirectMsgFp = dndSendRedirectMsg; + pOption->sendReqToDnodeFp = dndSendReqToDnode; + pOption->sendReqToMnodeFp = dndSendReqToMnode; + pOption->sendRedirectRspFp = dndSendRedirectRsp; pOption->dnodeId = dndGetDnodeId(pDnode); pOption->clusterId = dndGetClusterId(pDnode); pOption->cfg.sver = pDnode->opt.sver; diff --git a/source/dnode/mgmt/impl/src/dndDnode.c b/source/dnode/mgmt/impl/src/dndDnode.c index 30b069f349c4e3cb3f33e31bd0d93df42df3fce2..f5f9bbf1b84367100b07e4c3216cbb3361a941c9 100644 --- a/source/dnode/mgmt/impl/src/dndDnode.c +++ b/source/dnode/mgmt/impl/src/dndDnode.c @@ -80,7 +80,7 @@ void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { taosRUnLockLatch(&pMgmt->latch); } -void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg) { +void dndSendRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) { tmsg_t msgType = pMsg->msgType; SEpSet epSet = {0}; @@ -354,7 +354,7 @@ static int32_t dndWriteDnodes(SDnode *pDnode) { return 0; } -void dndSendStatusMsg(SDnode *pDnode) { +void dndSendStatusReq(SDnode *pDnode) { int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad); SStatusMsg *pStatus = rpcMallocCont(contLen); @@ -391,7 +391,7 @@ void dndSendStatusMsg(SDnode *pDnode) { pMgmt->statusSent = 1; dTrace("pDnode:%p, send status msg to mnode", pDnode); - dndSendMsgToMnode(pDnode, &rpcMsg); + dndSendReqToMnode(pDnode, &rpcMsg); } static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) { @@ -491,7 +491,7 @@ static void *dnodeThreadRoutine(void *param) { taosMsleep(ms); if (dndGetStat(pDnode) == DND_STAT_RUNNING && !pMgmt->statusSent && !pMgmt->dropped) { - dndSendStatusMsg(pDnode); + dndSendStatusReq(pDnode); } } } diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index 577cb0c3b07115a1d4f3eb0932430a9671ebaa0d..a8bf26f133a3cd76a6e6b5d688b2598db04865b8 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -19,6 +19,7 @@ #include "dndTransport.h" #include "dndWorker.h" +static void dndWriteMnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpcMsg); static void dndProcessMnodeQueue(SDnode *pDnode, SMnodeMsg *pMsg); static SMnode *dndAcquireMnode(SDnode *pDnode) { @@ -258,11 +259,16 @@ static bool dndNeedDeployMnode(SDnode *pDnode) { return true; } +static int32_t dndPutMsgToMWriteQ(SDnode *pDnode, SRpcMsg *pRpcMsg) { + dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pRpcMsg); +} + static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) { pOption->pDnode = pDnode; - pOption->sendMsgToDnodeFp = dndSendMsgToDnode; - pOption->sendMsgToMnodeFp = dndSendMsgToMnode; - pOption->sendRedirectMsgFp = dndSendRedirectMsg; + pOption->sendReqToDnodeFp = dndSendReqToDnode; + pOption->sendReqToMnodeFp = dndSendReqToMnode; + pOption->sendRedirectRspFp = dndSendRedirectRsp; + pOption->putReqToMWriteQFp = dndPutMsgToMWriteQ; pOption->dnodeId = dndGetDnodeId(pDnode); pOption->clusterId = dndGetClusterId(pDnode); pOption->cfg.sver = pDnode->opt.sver; diff --git a/source/dnode/mgmt/impl/src/dndQnode.c b/source/dnode/mgmt/impl/src/dndQnode.c index 1f3d6ee371a41279b83bc52233ab30ce369e48a8..e73a0cb0d8f7412c42b28ed6e448e679530c4c0f 100644 --- a/source/dnode/mgmt/impl/src/dndQnode.c +++ b/source/dnode/mgmt/impl/src/dndQnode.c @@ -185,9 +185,9 @@ static void dndStopQnodeWorker(SDnode *pDnode) { static void dndBuildQnodeOption(SDnode *pDnode, SQnodeOpt *pOption) { pOption->pDnode = pDnode; - pOption->sendMsgToDnodeFp = dndSendMsgToDnode; - pOption->sendMsgToMnodeFp = dndSendMsgToMnode; - pOption->sendRedirectMsgFp = dndSendRedirectMsg; + pOption->sendReqToDnodeFp = dndSendReqToDnode; + pOption->sendReqToMnodeFp = dndSendReqToMnode; + pOption->sendRedirectRspFp = dndSendRedirectRsp; pOption->dnodeId = dndGetDnodeId(pDnode); pOption->clusterId = dndGetClusterId(pDnode); pOption->cfg.sver = pDnode->opt.sver; diff --git a/source/dnode/mgmt/impl/src/dndSnode.c b/source/dnode/mgmt/impl/src/dndSnode.c index ab4ca191a9e74d63ba7f8d441009f5c5e8f0187c..070fc8663ebd76e70b68e0dcb6006490549b38a5 100644 --- a/source/dnode/mgmt/impl/src/dndSnode.c +++ b/source/dnode/mgmt/impl/src/dndSnode.c @@ -179,9 +179,9 @@ static void dndStopSnodeWorker(SDnode *pDnode) { static void dndBuildSnodeOption(SDnode *pDnode, SSnodeOpt *pOption) { pOption->pDnode = pDnode; - pOption->sendMsgToDnodeFp = dndSendMsgToDnode; - pOption->sendMsgToMnodeFp = dndSendMsgToMnode; - pOption->sendRedirectMsgFp = dndSendRedirectMsg; + pOption->sendReqToDnodeFp = dndSendReqToDnode; + pOption->sendReqToMnodeFp = dndSendReqToMnode; + pOption->sendRedirectRspFp = dndSendRedirectRsp; pOption->dnodeId = dndGetDnodeId(pDnode); pOption->clusterId = dndGetClusterId(pDnode); pOption->cfg.sver = pDnode->opt.sver; diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index bb16ef0b77bc4021f7a060e19cd99e36a5ad46ae..cf0f5616096df39b020f401b3e76106bc4aac32e 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -105,8 +105,6 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS_RSP)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT_RSP)] = dndProcessMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH)] = dndProcessMnodeReadMsg; @@ -216,7 +214,7 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) { if (dndGetStat(pDnode) == DND_STAT_STOPPED) { dError("RPC %p, req:%s app:%p is ignored since dnode exiting", pMsg->handle, TMSG_INFO(msgType), pMsg->ahandle); - SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_EXITING}; + SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_OFFLINE}; rpcSendResponse(&rspMsg); rpcFreeCont(pMsg->pCont); return; @@ -383,14 +381,19 @@ void dndCleanupTrans(SDnode *pDnode) { dInfo("dnode-transport is cleaned up"); } -void dndSendMsgToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pMsg) { +int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pMsg) { STransMgmt *pMgmt = &pDnode->tmgmt; - if (pMgmt->clientRpc == NULL) return; + if (pMgmt->clientRpc == NULL) { + terrno = TSDB_CODE_DND_OFFLINE; + return -1; + } + rpcSendRequest(pMgmt->clientRpc, pEpSet, pMsg, NULL); + return 0; } -void dndSendMsgToMnode(SDnode *pDnode, SRpcMsg *pMsg) { +int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pMsg) { SEpSet epSet = {0}; dndGetMnodeEpSet(pDnode, &epSet); - dndSendMsgToDnode(pDnode, &epSet, pMsg); + return dndSendReqToDnode(pDnode, &epSet, pMsg); } diff --git a/source/dnode/mgmt/impl/src/dnode.c b/source/dnode/mgmt/impl/src/dnode.c index 88a96dadc4bd50f04c45c5960334b63bd211e422..33cec5ff4774a27c8118e8f7b938f9533e174f60 100644 --- a/source/dnode/mgmt/impl/src/dnode.c +++ b/source/dnode/mgmt/impl/src/dnode.c @@ -213,7 +213,7 @@ SDnode *dndInit(SDnodeOpt *pOption) { } dndSetStat(pDnode, DND_STAT_RUNNING); - dndSendStatusMsg(pDnode); + dndSendStatusReq(pDnode); dndReportStartup(pDnode, "TDengine", "initialized successfully"); dInfo("TDengine is initialized successfully, pDnode:%p", pDnode); diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 5c8d409d90cd7b43465fe555e272a2b9134174d1..89fb2e318929d050cd3c58346edd058211798e68 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -91,15 +91,16 @@ typedef struct SMnode { STelemMgmt telemMgmt; SSyncMgmt syncMgmt; MndMsgFp msgFp[TDMT_MAX]; - SendMsgToDnodeFp sendMsgToDnodeFp; - SendMsgToMnodeFp sendMsgToMnodeFp; - SendRedirectMsgFp sendRedirectMsgFp; + SendReqToDnodeFp sendReqToDnodeFp; + SendReqToMnodeFp sendReqToMnodeFp; + SendRedirectRspFp sendRedirectRspFp; + PutReqToMWriteQFp putReqToMWriteQFp; } SMnode; -void mndSendMsgToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg); -void mndSendMsgToMnode(SMnode *pMnode, SRpcMsg *pMsg); -void mndSendRedirectMsg(SMnode *pMnode, SRpcMsg *pMsg); -void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp); +int32_t mndSendReqToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg); +int32_t mndSendReqToMnode(SMnode *pMnode, SRpcMsg *pMsg); +void mndSendRedirectRsp(SMnode *pMnode, SRpcMsg *pMsg); +void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp); uint64_t mndGenerateUid(char *name, int32_t len) ; diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 201fcde1a932b2ff6da777ceb15a733b4f32cb0a..bd053d91b6743310fc7075d44a9f12be5efa69e3 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -45,6 +45,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); void mndTransProcessRsp(SMnodeMsg *pMsg); +void mndTransPullup(SMnode *pMnode); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 1d78359015d8ec7fd4a9e59582695b5f74f0e671..2cca70b04ecb10ec4dc3cf43f2044fbd9dc2b41c 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -557,7 +557,7 @@ static int32_t mndProcessConfigDnodeMsg(SMnodeMsg *pMsg) { .ahandle = pMsg->rpcMsg.ahandle}; mInfo("dnode:%d, app:%p config:%s req send to dnode", pCfg->dnodeId, rpcMsg.ahandle, pCfg->config); - mndSendMsgToDnode(pMnode, &epSet, &rpcMsg); + mndSendReqToDnode(pMnode, &epSet, &rpcMsg); return 0; } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 19fb89454e136528639553f0005b95960f83e71f..bf1697fb19016803096f87aae367240f167f6eb2 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -83,6 +83,8 @@ static int32_t mndRestoreWal(SMnode *pMnode) { int64_t sdbVer = sdbUpdateVer(pSdb, 0); mDebug("restore sdb wal finished, sdb ver:%" PRId64, sdbVer); + mndTransPullup(pMnode); + if (walBeginSnapshot(pWal, sdbVer) < 0) { goto WAL_RESTORE_OVER; } diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index bf472a504c01efec09dc5d71a4134b1914b0d9ad..ee4a49ffdc981b4c4e6194ccd0a6c06806d9637d 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -52,7 +52,6 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans); static void mndTransExecute(SMnode *pMnode, STrans *pTrans); static void mndTransSendRpcRsp(STrans *pTrans); static int32_t mndProcessTransMsg(SMnodeMsg *pMsg); -static int32_t mndProcessTransRsp(SMnodeMsg *pMsg); int32_t mndInitTrans(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_TRANS, @@ -64,7 +63,6 @@ int32_t mndInitTrans(SMnode *pMnode) { .deleteFp = (SdbDeleteFp)mndTransActionDelete}; mndSetMsgHandle(pMnode, TDMT_MND_TRANS, mndProcessTransMsg); - mndSetMsgHandle(pMnode, TDMT_MND_TRANS_RSP, mndProcessTransRsp); return sdbSetTable(pMnode->pSdb, table); } @@ -615,12 +613,15 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr } memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen); - pAction->msgSent = 1; - pAction->msgReceived = 0; - pAction->errCode = 0; - - mDebug("trans:%d, action:%d is sent", pTrans->id, action); - mndSendMsgToDnode(pMnode, &pAction->epSet, &rpcMsg); + if (mndSendReqToDnode(pMnode, &pAction->epSet, &rpcMsg) == 0) { + mDebug("trans:%d, action:%d is sent", pTrans->id, action); + pAction->msgSent = 1; + pAction->msgReceived = 0; + pAction->errCode = 0; + } else { + mDebug("trans:%d, action:%d not sent since %s", pTrans->id, action, terrstr()); + return -1; + } } return 0; @@ -885,7 +886,11 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) { } static int32_t mndProcessTransMsg(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; + mndTransPullup(pMsg->pMnode); + return 0; +} + +void mndTransPullup(SMnode *pMnode) { STrans *pTrans = NULL; void *pIter = NULL; @@ -897,5 +902,3 @@ static int32_t mndProcessTransMsg(SMnodeMsg *pMsg) { sdbRelease(pMnode->pSdb, pTrans); } } - -static int32_t mndProcessTransRsp(SMnodeMsg *pMsg) { return 0; } \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index abc86a7d358a01f7a72e1f52cf895728949422d2..0653a0f5d4500662cf88db675a3fe780c7fe72b3 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -34,21 +34,27 @@ #include "mndUser.h" #include "mndVgroup.h" -void mndSendMsgToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *pMsg) { - if (pMnode != NULL && pMnode->sendMsgToDnodeFp != NULL) { - (*pMnode->sendMsgToDnodeFp)(pMnode->pDnode, pEpSet, pMsg); +int32_t mndSendReqToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *pMsg) { + if (pMnode == NULL || pMnode->sendReqToDnodeFp == NULL) { + terrno = TSDB_CODE_MND_NOT_READY; + return -1; } + + return (*pMnode->sendReqToDnodeFp)(pMnode->pDnode, pEpSet, pMsg); } -void mndSendMsgToMnode(SMnode *pMnode, SRpcMsg *pMsg) { - if (pMnode != NULL && pMnode->sendMsgToMnodeFp != NULL) { - (*pMnode->sendMsgToMnodeFp)(pMnode->pDnode, pMsg); +int32_t mndSendReqToMnode(SMnode *pMnode, SRpcMsg *pMsg) { + if (pMnode == NULL || pMnode->sendReqToDnodeFp == NULL) { + terrno = TSDB_CODE_MND_NOT_READY; + return -1; } + + return (*pMnode->sendReqToMnodeFp)(pMnode->pDnode, pMsg); } -void mndSendRedirectMsg(SMnode *pMnode, SRpcMsg *pMsg) { - if (pMnode != NULL && pMnode->sendRedirectMsgFp != NULL) { - (*pMnode->sendRedirectMsgFp)(pMnode->pDnode, pMsg); +void mndSendRedirectRsp(SMnode *pMnode, SRpcMsg *pMsg) { + if (pMnode != NULL && pMnode->sendRedirectRspFp != NULL) { + (*pMnode->sendRedirectRspFp)(pMnode->pDnode, pMsg); } } @@ -56,11 +62,8 @@ static void mndTransReExecute(void *param, void *tmrId) { SMnode *pMnode = param; if (mndIsMaster(pMnode)) { STransMsg *pMsg = rpcMallocCont(sizeof(STransMsg)); - SEpSet epSet = {.inUse = 0, .numOfEps = 1}; - epSet.port[0] = pMnode->replicas[pMnode->selfIndex].port; - memcpy(epSet.fqdn[0], pMnode->replicas[pMnode->selfIndex].fqdn, TSDB_FQDN_LEN); - SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS, .pCont = pMsg, .contLen = sizeof(STransMsg)}; - mndSendMsgToDnode(pMnode, &epSet, &rpcMsg); + SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS, .pCont = pMsg, .contLen = sizeof(STransMsg)}; + pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg); } taosTmrReset(mndTransReExecute, 3000, pMnode, pMnode->timer, &pMnode->transTimer); @@ -76,7 +79,7 @@ static int32_t mndInitTimer(SMnode *pMnode) { return -1; } - if (taosTmrReset(mndTransReExecute, 1000, pMnode, pMnode->timer, &pMnode->transTimer)) { + if (taosTmrReset(mndTransReExecute, 6000, pMnode, pMnode->timer, &pMnode->transTimer)) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -223,9 +226,10 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { pMnode->selfIndex = pOption->selfIndex; memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); pMnode->pDnode = pOption->pDnode; - pMnode->sendMsgToDnodeFp = pOption->sendMsgToDnodeFp; - pMnode->sendMsgToMnodeFp = pOption->sendMsgToMnodeFp; - pMnode->sendRedirectMsgFp = pOption->sendRedirectMsgFp; + pMnode->putReqToMWriteQFp = pOption->putReqToMWriteQFp; + pMnode->sendReqToDnodeFp = pOption->sendReqToDnodeFp; + pMnode->sendReqToMnodeFp = pOption->sendReqToMnodeFp; + pMnode->sendRedirectRspFp = pOption->sendRedirectRspFp; pMnode->cfg.sver = pOption->cfg.sver; pMnode->cfg.enableTelem = pOption->cfg.enableTelem; pMnode->cfg.statusInterval = pOption->cfg.statusInterval; @@ -236,8 +240,9 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { pMnode->cfg.gitinfo = strdup(pOption->cfg.gitinfo); pMnode->cfg.buildinfo = strdup(pOption->cfg.buildinfo); - if (pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL || pMnode->sendRedirectMsgFp == NULL || - pMnode->dnodeId < 0 || pMnode->clusterId < 0 || pMnode->cfg.statusInterval < 1) { + if (pMnode->sendReqToDnodeFp == NULL || pMnode->sendReqToMnodeFp == NULL || pMnode->sendRedirectRspFp == NULL || + pMnode->putReqToMWriteQFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0 || + pMnode->cfg.statusInterval < 1) { terrno = TSDB_CODE_MND_INVALID_OPTIONS; return -1; } @@ -433,7 +438,7 @@ void mndProcessMsg(SMnodeMsg *pMsg) { PROCESS_RPC_END: if (isReq) { if (code == TSDB_CODE_APP_NOT_READY) { - mndSendRedirectMsg(pMnode, &pMsg->rpcMsg); + mndSendRedirectRsp(pMnode, &pMsg->rpcMsg); } else if (code != 0) { SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .code = code}; rpcSendResponse(&rpcRsp); diff --git a/source/dnode/mnode/impl/test/trans/trans.cpp b/source/dnode/mnode/impl/test/trans/trans.cpp index 1be38b3289d998a9fc3c2fd658325fc9bd8c79d3..d2bf92ad13934bb360856f348cfa684282270c57 100644 --- a/source/dnode/mnode/impl/test/trans/trans.cpp +++ b/source/dnode/mnode/impl/test/trans/trans.cpp @@ -2,7 +2,7 @@ * @file trans.cpp * @author slguan (slguan@taosdata.com) * @brief MNODE module trans tests - * @version 1.0 + * @version 0.1 * @date 2022-01-04 * * @copyright Copyright (c) 2022 @@ -75,10 +75,10 @@ TEST_F(DndTestTrans, 01_CreateUser_Crash) { test.SendShowRetrieveMsg(); EXPECT_EQ(test.GetShowRows(), 2); + CheckBinary("u1", TSDB_USER_LEN); CheckBinary("root", TSDB_USER_LEN); - CheckBinary("u2", TSDB_USER_LEN); - CheckBinary("super", 10); CheckBinary("normal", 10); + CheckBinary("super", 10); CheckTimestamp(); CheckTimestamp(); CheckBinary("root", TSDB_USER_LEN); diff --git a/source/dnode/qnode/inc/qndInt.h b/source/dnode/qnode/inc/qndInt.h index e9f1229a9da3336cf307d9b39942f6dd15e93cf0..529c407efa41c69cc0a9fc563f4a6464ea8c162c 100644 --- a/source/dnode/qnode/inc/qndInt.h +++ b/source/dnode/qnode/inc/qndInt.h @@ -32,9 +32,9 @@ typedef struct SQnode { int32_t dnodeId; int64_t clusterId; SQnodeCfg cfg; - SendMsgToDnodeFp sendMsgToDnodeFp; - SendMsgToMnodeFp sendMsgToMnodeFp; - SendRedirectMsgFp sendRedirectMsgFp; + SendReqToDnodeFp sendReqToDnodeFp; + SendReqToMnodeFp sendReqToMnodeFp; + SendRedirectRspFp sendRedirectRspFp; } SQnode; #ifdef __cplusplus diff --git a/source/dnode/snode/inc/sndInt.h b/source/dnode/snode/inc/sndInt.h index 8827c92eef3f10f555df1caaf725f5f015262885..3b41c7f4b170c6ae3ede30d9b034548073b8492f 100644 --- a/source/dnode/snode/inc/sndInt.h +++ b/source/dnode/snode/inc/sndInt.h @@ -32,9 +32,9 @@ typedef struct SSnode { int32_t dnodeId; int64_t clusterId; SSnodeCfg cfg; - SendMsgToDnodeFp sendMsgToDnodeFp; - SendMsgToMnodeFp sendMsgToMnodeFp; - SendRedirectMsgFp sendRedirectMsgFp; + SendReqToDnodeFp sendReqToDnodeFp; + SendReqToMnodeFp sendReqToMnodeFp; + SendRedirectRspFp sendRedirectRspFp; } SSnode; #ifdef __cplusplus diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index 310944e9b6cdb7c57d59a626b1a73aaa2b47bd12..242bdb15583b91a69c703f5024cd9bc8792eba6f 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -1195,7 +1195,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte } rpcSendReqToServer(pRpc, pContext); rpcFreeCont(rpcMsg.pCont); - } else if (pHead->code == TSDB_CODE_RPC_NOT_READY || pHead->code == TSDB_CODE_APP_NOT_READY || pHead->code == TSDB_CODE_DND_EXITING) { + } else if (pHead->code == TSDB_CODE_RPC_NOT_READY || pHead->code == TSDB_CODE_APP_NOT_READY || pHead->code == TSDB_CODE_DND_OFFLINE) { pContext->code = pHead->code; rpcProcessConnError(pContext, NULL); rpcFreeCont(rpcMsg.pCont); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index e821f1f803b830c9de06525d946d02b94f340f3e..0bf56dbaaf7a18f179c8167b36e9dca53e66ef10 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -253,7 +253,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exis // dnode TAOS_DEFINE_ERROR(TSDB_CODE_DND_ACTION_IN_PROGRESS, "Action in progress") -TAOS_DEFINE_ERROR(TSDB_CODE_DND_EXITING, "Dnode is exiting") +TAOS_DEFINE_ERROR(TSDB_CODE_DND_OFFLINE, "Dnode is offline") TAOS_DEFINE_ERROR(TSDB_CODE_DND_INVALID_MSG_LEN, "Invalid message length") TAOS_DEFINE_ERROR(TSDB_CODE_DND_DNODE_READ_FILE_ERROR, "Read dnode.json error") TAOS_DEFINE_ERROR(TSDB_CODE_DND_DNODE_WRITE_FILE_ERROR, "Write dnode.json error")