提交 bcad6bb5 编写于 作者: S Shengliang Guan

restore from wal

上级 5f70d7b2
...@@ -23,9 +23,9 @@ extern "C" { ...@@ -23,9 +23,9 @@ extern "C" {
/* ------------------------ TYPES EXPOSED ------------------------ */ /* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SDnode SDnode; typedef struct SDnode SDnode;
typedef struct SBnode SBnode; typedef struct SBnode SBnode;
typedef void (*SendMsgToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef void (*SendMsgToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef void (*SendRedirectMsgFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef struct { typedef struct {
int64_t numOfErrors; int64_t numOfErrors;
...@@ -40,9 +40,9 @@ typedef struct { ...@@ -40,9 +40,9 @@ typedef struct {
int64_t clusterId; int64_t clusterId;
SBnodeCfg cfg; SBnodeCfg cfg;
SDnode *pDnode; SDnode *pDnode;
SendMsgToDnodeFp sendMsgToDnodeFp; SendReqToDnodeFp sendReqToDnodeFp;
SendMsgToMnodeFp sendMsgToMnodeFp; SendReqToMnodeFp sendReqToMnodeFp;
SendRedirectMsgFp sendRedirectMsgFp; SendRedirectRspFp sendRedirectRspFp;
} SBnodeOpt; } SBnodeOpt;
/* ------------------------ SBnode ------------------------ */ /* ------------------------ SBnode ------------------------ */
......
...@@ -24,9 +24,10 @@ extern "C" { ...@@ -24,9 +24,10 @@ extern "C" {
typedef struct SDnode SDnode; typedef struct SDnode SDnode;
typedef struct SMnode SMnode; typedef struct SMnode SMnode;
typedef struct SMnodeMsg SMnodeMsg; typedef struct SMnodeMsg SMnodeMsg;
typedef void (*SendMsgToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef void (*SendMsgToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef void (*SendRedirectMsgFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef int32_t (*PutReqToMWriteQFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef struct SMnodeLoad { typedef struct SMnodeLoad {
int64_t numOfDnode; int64_t numOfDnode;
...@@ -62,9 +63,10 @@ typedef struct { ...@@ -62,9 +63,10 @@ typedef struct {
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];
SMnodeCfg cfg; SMnodeCfg cfg;
SDnode *pDnode; SDnode *pDnode;
SendMsgToDnodeFp sendMsgToDnodeFp; PutReqToMWriteQFp putReqToMWriteQFp;
SendMsgToMnodeFp sendMsgToMnodeFp; SendReqToDnodeFp sendReqToDnodeFp;
SendRedirectMsgFp sendRedirectMsgFp; SendReqToMnodeFp sendReqToMnodeFp;
SendRedirectRspFp sendRedirectRspFp;
} SMnodeOpt; } SMnodeOpt;
/* ------------------------ SMnode ------------------------ */ /* ------------------------ SMnode ------------------------ */
......
...@@ -23,9 +23,9 @@ extern "C" { ...@@ -23,9 +23,9 @@ extern "C" {
/* ------------------------ TYPES EXPOSED ------------------------ */ /* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SDnode SDnode; typedef struct SDnode SDnode;
typedef struct SQnode SQnode; typedef struct SQnode SQnode;
typedef void (*SendMsgToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef void (*SendMsgToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef void (*SendRedirectMsgFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef struct { typedef struct {
int64_t numOfStartTask; int64_t numOfStartTask;
...@@ -47,9 +47,9 @@ typedef struct { ...@@ -47,9 +47,9 @@ typedef struct {
int64_t clusterId; int64_t clusterId;
SQnodeCfg cfg; SQnodeCfg cfg;
SDnode *pDnode; SDnode *pDnode;
SendMsgToDnodeFp sendMsgToDnodeFp; SendReqToDnodeFp sendReqToDnodeFp;
SendMsgToMnodeFp sendMsgToMnodeFp; SendReqToMnodeFp sendReqToMnodeFp;
SendRedirectMsgFp sendRedirectMsgFp; SendRedirectRspFp sendRedirectRspFp;
} SQnodeOpt; } SQnodeOpt;
/* ------------------------ SQnode ------------------------ */ /* ------------------------ SQnode ------------------------ */
......
...@@ -23,9 +23,9 @@ extern "C" { ...@@ -23,9 +23,9 @@ extern "C" {
/* ------------------------ TYPES EXPOSED ------------------------ */ /* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SDnode SDnode; typedef struct SDnode SDnode;
typedef struct SSnode SSnode; typedef struct SSnode SSnode;
typedef void (*SendMsgToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef void (*SendMsgToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef void (*SendRedirectMsgFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef struct { typedef struct {
int64_t numOfErrors; int64_t numOfErrors;
...@@ -40,9 +40,9 @@ typedef struct { ...@@ -40,9 +40,9 @@ typedef struct {
int64_t clusterId; int64_t clusterId;
SSnodeCfg cfg; SSnodeCfg cfg;
SDnode *pDnode; SDnode *pDnode;
SendMsgToDnodeFp sendMsgToDnodeFp; SendReqToDnodeFp sendReqToDnodeFp;
SendMsgToMnodeFp sendMsgToMnodeFp; SendReqToMnodeFp sendReqToMnodeFp;
SendRedirectMsgFp sendRedirectMsgFp; SendRedirectRspFp sendRedirectRspFp;
} SSnodeOpt; } SSnodeOpt;
/* ------------------------ SSnode ------------------------ */ /* ------------------------ SSnode ------------------------ */
......
...@@ -253,7 +253,7 @@ int32_t* taosGetErrno(); ...@@ -253,7 +253,7 @@ int32_t* taosGetErrno();
// dnode // dnode
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400) #define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400)
#define TSDB_CODE_DND_EXITING TAOS_DEF_ERROR_CODE(0, 0x0401) #define TSDB_CODE_DND_OFFLINE TAOS_DEF_ERROR_CODE(0, 0x0401)
#define TSDB_CODE_DND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0402) #define TSDB_CODE_DND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0402)
#define TSDB_CODE_DND_DNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0410) #define TSDB_CODE_DND_DNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0410)
#define TSDB_CODE_DND_DNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0411) #define TSDB_CODE_DND_DNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0411)
......
...@@ -33,9 +33,9 @@ typedef struct SBnode { ...@@ -33,9 +33,9 @@ typedef struct SBnode {
int32_t dnodeId; int32_t dnodeId;
int64_t clusterId; int64_t clusterId;
SBnodeCfg cfg; SBnodeCfg cfg;
SendMsgToDnodeFp sendMsgToDnodeFp; SendReqToDnodeFp sendReqToDnodeFp;
SendMsgToMnodeFp sendMsgToMnodeFp; SendReqToMnodeFp sendReqToMnodeFp;
SendRedirectMsgFp sendRedirectMsgFp; SendRedirectRspFp sendRedirectRspFp;
} SBnode; } SBnode;
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -29,8 +29,8 @@ int64_t dndGetClusterId(SDnode *pDnode); ...@@ -29,8 +29,8 @@ int64_t dndGetClusterId(SDnode *pDnode);
void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort); void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort);
void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet); void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet);
void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg); void dndSendRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg);
void dndSendStatusMsg(SDnode *pDnode); void dndSendStatusReq(SDnode *pDnode);
void dndProcessMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet); void dndProcessMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet);
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg); void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
......
...@@ -23,8 +23,8 @@ extern "C" { ...@@ -23,8 +23,8 @@ extern "C" {
int32_t dndInitTrans(SDnode *pDnode); int32_t dndInitTrans(SDnode *pDnode);
void dndCleanupTrans(SDnode *pDnode); void dndCleanupTrans(SDnode *pDnode);
void dndSendMsgToMnode(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pRpcMsg);
void dndSendMsgToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pRpcMsg); int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pRpcMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -179,9 +179,9 @@ static void dndStopBnodeWorker(SDnode *pDnode) { ...@@ -179,9 +179,9 @@ static void dndStopBnodeWorker(SDnode *pDnode) {
static void dndBuildBnodeOption(SDnode *pDnode, SBnodeOpt *pOption) { static void dndBuildBnodeOption(SDnode *pDnode, SBnodeOpt *pOption) {
pOption->pDnode = pDnode; pOption->pDnode = pDnode;
pOption->sendMsgToDnodeFp = dndSendMsgToDnode; pOption->sendReqToDnodeFp = dndSendReqToDnode;
pOption->sendMsgToMnodeFp = dndSendMsgToMnode; pOption->sendReqToMnodeFp = dndSendReqToMnode;
pOption->sendRedirectMsgFp = dndSendRedirectMsg; pOption->sendRedirectRspFp = dndSendRedirectRsp;
pOption->dnodeId = dndGetDnodeId(pDnode); pOption->dnodeId = dndGetDnodeId(pDnode);
pOption->clusterId = dndGetClusterId(pDnode); pOption->clusterId = dndGetClusterId(pDnode);
pOption->cfg.sver = pDnode->opt.sver; pOption->cfg.sver = pDnode->opt.sver;
......
...@@ -80,7 +80,7 @@ void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { ...@@ -80,7 +80,7 @@ void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
taosRUnLockLatch(&pMgmt->latch); taosRUnLockLatch(&pMgmt->latch);
} }
void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg) { void dndSendRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
tmsg_t msgType = pMsg->msgType; tmsg_t msgType = pMsg->msgType;
SEpSet epSet = {0}; SEpSet epSet = {0};
...@@ -354,7 +354,7 @@ static int32_t dndWriteDnodes(SDnode *pDnode) { ...@@ -354,7 +354,7 @@ static int32_t dndWriteDnodes(SDnode *pDnode) {
return 0; return 0;
} }
void dndSendStatusMsg(SDnode *pDnode) { void dndSendStatusReq(SDnode *pDnode) {
int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad); int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad);
SStatusMsg *pStatus = rpcMallocCont(contLen); SStatusMsg *pStatus = rpcMallocCont(contLen);
...@@ -391,7 +391,7 @@ void dndSendStatusMsg(SDnode *pDnode) { ...@@ -391,7 +391,7 @@ void dndSendStatusMsg(SDnode *pDnode) {
pMgmt->statusSent = 1; pMgmt->statusSent = 1;
dTrace("pDnode:%p, send status msg to mnode", pDnode); dTrace("pDnode:%p, send status msg to mnode", pDnode);
dndSendMsgToMnode(pDnode, &rpcMsg); dndSendReqToMnode(pDnode, &rpcMsg);
} }
static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) { static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) {
...@@ -491,7 +491,7 @@ static void *dnodeThreadRoutine(void *param) { ...@@ -491,7 +491,7 @@ static void *dnodeThreadRoutine(void *param) {
taosMsleep(ms); taosMsleep(ms);
if (dndGetStat(pDnode) == DND_STAT_RUNNING && !pMgmt->statusSent && !pMgmt->dropped) { if (dndGetStat(pDnode) == DND_STAT_RUNNING && !pMgmt->statusSent && !pMgmt->dropped) {
dndSendStatusMsg(pDnode); dndSendStatusReq(pDnode);
} }
} }
} }
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "dndTransport.h" #include "dndTransport.h"
#include "dndWorker.h" #include "dndWorker.h"
static void dndWriteMnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpcMsg);
static void dndProcessMnodeQueue(SDnode *pDnode, SMnodeMsg *pMsg); static void dndProcessMnodeQueue(SDnode *pDnode, SMnodeMsg *pMsg);
static SMnode *dndAcquireMnode(SDnode *pDnode) { static SMnode *dndAcquireMnode(SDnode *pDnode) {
...@@ -258,11 +259,16 @@ static bool dndNeedDeployMnode(SDnode *pDnode) { ...@@ -258,11 +259,16 @@ static bool dndNeedDeployMnode(SDnode *pDnode) {
return true; return true;
} }
static int32_t dndPutMsgToMWriteQ(SDnode *pDnode, SRpcMsg *pRpcMsg) {
dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pRpcMsg);
}
static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) { static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) {
pOption->pDnode = pDnode; pOption->pDnode = pDnode;
pOption->sendMsgToDnodeFp = dndSendMsgToDnode; pOption->sendReqToDnodeFp = dndSendReqToDnode;
pOption->sendMsgToMnodeFp = dndSendMsgToMnode; pOption->sendReqToMnodeFp = dndSendReqToMnode;
pOption->sendRedirectMsgFp = dndSendRedirectMsg; pOption->sendRedirectRspFp = dndSendRedirectRsp;
pOption->putReqToMWriteQFp = dndPutMsgToMWriteQ;
pOption->dnodeId = dndGetDnodeId(pDnode); pOption->dnodeId = dndGetDnodeId(pDnode);
pOption->clusterId = dndGetClusterId(pDnode); pOption->clusterId = dndGetClusterId(pDnode);
pOption->cfg.sver = pDnode->opt.sver; pOption->cfg.sver = pDnode->opt.sver;
......
...@@ -185,9 +185,9 @@ static void dndStopQnodeWorker(SDnode *pDnode) { ...@@ -185,9 +185,9 @@ static void dndStopQnodeWorker(SDnode *pDnode) {
static void dndBuildQnodeOption(SDnode *pDnode, SQnodeOpt *pOption) { static void dndBuildQnodeOption(SDnode *pDnode, SQnodeOpt *pOption) {
pOption->pDnode = pDnode; pOption->pDnode = pDnode;
pOption->sendMsgToDnodeFp = dndSendMsgToDnode; pOption->sendReqToDnodeFp = dndSendReqToDnode;
pOption->sendMsgToMnodeFp = dndSendMsgToMnode; pOption->sendReqToMnodeFp = dndSendReqToMnode;
pOption->sendRedirectMsgFp = dndSendRedirectMsg; pOption->sendRedirectRspFp = dndSendRedirectRsp;
pOption->dnodeId = dndGetDnodeId(pDnode); pOption->dnodeId = dndGetDnodeId(pDnode);
pOption->clusterId = dndGetClusterId(pDnode); pOption->clusterId = dndGetClusterId(pDnode);
pOption->cfg.sver = pDnode->opt.sver; pOption->cfg.sver = pDnode->opt.sver;
......
...@@ -179,9 +179,9 @@ static void dndStopSnodeWorker(SDnode *pDnode) { ...@@ -179,9 +179,9 @@ static void dndStopSnodeWorker(SDnode *pDnode) {
static void dndBuildSnodeOption(SDnode *pDnode, SSnodeOpt *pOption) { static void dndBuildSnodeOption(SDnode *pDnode, SSnodeOpt *pOption) {
pOption->pDnode = pDnode; pOption->pDnode = pDnode;
pOption->sendMsgToDnodeFp = dndSendMsgToDnode; pOption->sendReqToDnodeFp = dndSendReqToDnode;
pOption->sendMsgToMnodeFp = dndSendMsgToMnode; pOption->sendReqToMnodeFp = dndSendReqToMnode;
pOption->sendRedirectMsgFp = dndSendRedirectMsg; pOption->sendRedirectRspFp = dndSendRedirectRsp;
pOption->dnodeId = dndGetDnodeId(pDnode); pOption->dnodeId = dndGetDnodeId(pDnode);
pOption->clusterId = dndGetClusterId(pDnode); pOption->clusterId = dndGetClusterId(pDnode);
pOption->cfg.sver = pDnode->opt.sver; pOption->cfg.sver = pDnode->opt.sver;
......
...@@ -105,8 +105,6 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { ...@@ -105,8 +105,6 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT_RSP)] = dndProcessMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT_RSP)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH)] = dndProcessMnodeReadMsg;
...@@ -216,7 +214,7 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -216,7 +214,7 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) {
if (dndGetStat(pDnode) == DND_STAT_STOPPED) { if (dndGetStat(pDnode) == DND_STAT_STOPPED) {
dError("RPC %p, req:%s app:%p is ignored since dnode exiting", pMsg->handle, TMSG_INFO(msgType), pMsg->ahandle); dError("RPC %p, req:%s app:%p is ignored since dnode exiting", pMsg->handle, TMSG_INFO(msgType), pMsg->ahandle);
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_EXITING}; SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_OFFLINE};
rpcSendResponse(&rspMsg); rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
return; return;
...@@ -383,14 +381,19 @@ void dndCleanupTrans(SDnode *pDnode) { ...@@ -383,14 +381,19 @@ void dndCleanupTrans(SDnode *pDnode) {
dInfo("dnode-transport is cleaned up"); dInfo("dnode-transport is cleaned up");
} }
void dndSendMsgToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pMsg) { int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pMsg) {
STransMgmt *pMgmt = &pDnode->tmgmt; STransMgmt *pMgmt = &pDnode->tmgmt;
if (pMgmt->clientRpc == NULL) return; if (pMgmt->clientRpc == NULL) {
terrno = TSDB_CODE_DND_OFFLINE;
return -1;
}
rpcSendRequest(pMgmt->clientRpc, pEpSet, pMsg, NULL); rpcSendRequest(pMgmt->clientRpc, pEpSet, pMsg, NULL);
return 0;
} }
void dndSendMsgToMnode(SDnode *pDnode, SRpcMsg *pMsg) { int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pMsg) {
SEpSet epSet = {0}; SEpSet epSet = {0};
dndGetMnodeEpSet(pDnode, &epSet); dndGetMnodeEpSet(pDnode, &epSet);
dndSendMsgToDnode(pDnode, &epSet, pMsg); return dndSendReqToDnode(pDnode, &epSet, pMsg);
} }
...@@ -213,7 +213,7 @@ SDnode *dndInit(SDnodeOpt *pOption) { ...@@ -213,7 +213,7 @@ SDnode *dndInit(SDnodeOpt *pOption) {
} }
dndSetStat(pDnode, DND_STAT_RUNNING); dndSetStat(pDnode, DND_STAT_RUNNING);
dndSendStatusMsg(pDnode); dndSendStatusReq(pDnode);
dndReportStartup(pDnode, "TDengine", "initialized successfully"); dndReportStartup(pDnode, "TDengine", "initialized successfully");
dInfo("TDengine is initialized successfully, pDnode:%p", pDnode); dInfo("TDengine is initialized successfully, pDnode:%p", pDnode);
......
...@@ -91,15 +91,16 @@ typedef struct SMnode { ...@@ -91,15 +91,16 @@ typedef struct SMnode {
STelemMgmt telemMgmt; STelemMgmt telemMgmt;
SSyncMgmt syncMgmt; SSyncMgmt syncMgmt;
MndMsgFp msgFp[TDMT_MAX]; MndMsgFp msgFp[TDMT_MAX];
SendMsgToDnodeFp sendMsgToDnodeFp; SendReqToDnodeFp sendReqToDnodeFp;
SendMsgToMnodeFp sendMsgToMnodeFp; SendReqToMnodeFp sendReqToMnodeFp;
SendRedirectMsgFp sendRedirectMsgFp; SendRedirectRspFp sendRedirectRspFp;
PutReqToMWriteQFp putReqToMWriteQFp;
} SMnode; } SMnode;
void mndSendMsgToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg); int32_t mndSendReqToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg);
void mndSendMsgToMnode(SMnode *pMnode, SRpcMsg *pMsg); int32_t mndSendReqToMnode(SMnode *pMnode, SRpcMsg *pMsg);
void mndSendRedirectMsg(SMnode *pMnode, SRpcMsg *pMsg); void mndSendRedirectRsp(SMnode *pMnode, SRpcMsg *pMsg);
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp); void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp);
uint64_t mndGenerateUid(char *name, int32_t len) ; uint64_t mndGenerateUid(char *name, int32_t len) ;
......
...@@ -45,6 +45,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); ...@@ -45,6 +45,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
void mndTransProcessRsp(SMnodeMsg *pMsg); void mndTransProcessRsp(SMnodeMsg *pMsg);
void mndTransPullup(SMnode *pMnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -557,7 +557,7 @@ static int32_t mndProcessConfigDnodeMsg(SMnodeMsg *pMsg) { ...@@ -557,7 +557,7 @@ static int32_t mndProcessConfigDnodeMsg(SMnodeMsg *pMsg) {
.ahandle = pMsg->rpcMsg.ahandle}; .ahandle = pMsg->rpcMsg.ahandle};
mInfo("dnode:%d, app:%p config:%s req send to dnode", pCfg->dnodeId, rpcMsg.ahandle, pCfg->config); mInfo("dnode:%d, app:%p config:%s req send to dnode", pCfg->dnodeId, rpcMsg.ahandle, pCfg->config);
mndSendMsgToDnode(pMnode, &epSet, &rpcMsg); mndSendReqToDnode(pMnode, &epSet, &rpcMsg);
return 0; return 0;
} }
......
...@@ -83,6 +83,8 @@ static int32_t mndRestoreWal(SMnode *pMnode) { ...@@ -83,6 +83,8 @@ static int32_t mndRestoreWal(SMnode *pMnode) {
int64_t sdbVer = sdbUpdateVer(pSdb, 0); int64_t sdbVer = sdbUpdateVer(pSdb, 0);
mDebug("restore sdb wal finished, sdb ver:%" PRId64, sdbVer); mDebug("restore sdb wal finished, sdb ver:%" PRId64, sdbVer);
mndTransPullup(pMnode);
if (walBeginSnapshot(pWal, sdbVer) < 0) { if (walBeginSnapshot(pWal, sdbVer) < 0) {
goto WAL_RESTORE_OVER; goto WAL_RESTORE_OVER;
} }
......
...@@ -52,7 +52,6 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans); ...@@ -52,7 +52,6 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans);
static void mndTransExecute(SMnode *pMnode, STrans *pTrans); static void mndTransExecute(SMnode *pMnode, STrans *pTrans);
static void mndTransSendRpcRsp(STrans *pTrans); static void mndTransSendRpcRsp(STrans *pTrans);
static int32_t mndProcessTransMsg(SMnodeMsg *pMsg); static int32_t mndProcessTransMsg(SMnodeMsg *pMsg);
static int32_t mndProcessTransRsp(SMnodeMsg *pMsg);
int32_t mndInitTrans(SMnode *pMnode) { int32_t mndInitTrans(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_TRANS, SSdbTable table = {.sdbType = SDB_TRANS,
...@@ -64,7 +63,6 @@ int32_t mndInitTrans(SMnode *pMnode) { ...@@ -64,7 +63,6 @@ int32_t mndInitTrans(SMnode *pMnode) {
.deleteFp = (SdbDeleteFp)mndTransActionDelete}; .deleteFp = (SdbDeleteFp)mndTransActionDelete};
mndSetMsgHandle(pMnode, TDMT_MND_TRANS, mndProcessTransMsg); mndSetMsgHandle(pMnode, TDMT_MND_TRANS, mndProcessTransMsg);
mndSetMsgHandle(pMnode, TDMT_MND_TRANS_RSP, mndProcessTransRsp);
return sdbSetTable(pMnode->pSdb, table); return sdbSetTable(pMnode->pSdb, table);
} }
...@@ -615,12 +613,15 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr ...@@ -615,12 +613,15 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr
} }
memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen); memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
pAction->msgSent = 1; if (mndSendReqToDnode(pMnode, &pAction->epSet, &rpcMsg) == 0) {
pAction->msgReceived = 0; mDebug("trans:%d, action:%d is sent", pTrans->id, action);
pAction->errCode = 0; pAction->msgSent = 1;
pAction->msgReceived = 0;
mDebug("trans:%d, action:%d is sent", pTrans->id, action); pAction->errCode = 0;
mndSendMsgToDnode(pMnode, &pAction->epSet, &rpcMsg); } else {
mDebug("trans:%d, action:%d not sent since %s", pTrans->id, action, terrstr());
return -1;
}
} }
return 0; return 0;
...@@ -885,7 +886,11 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) { ...@@ -885,7 +886,11 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
} }
static int32_t mndProcessTransMsg(SMnodeMsg *pMsg) { static int32_t mndProcessTransMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; mndTransPullup(pMsg->pMnode);
return 0;
}
void mndTransPullup(SMnode *pMnode) {
STrans *pTrans = NULL; STrans *pTrans = NULL;
void *pIter = NULL; void *pIter = NULL;
...@@ -897,5 +902,3 @@ static int32_t mndProcessTransMsg(SMnodeMsg *pMsg) { ...@@ -897,5 +902,3 @@ static int32_t mndProcessTransMsg(SMnodeMsg *pMsg) {
sdbRelease(pMnode->pSdb, pTrans); sdbRelease(pMnode->pSdb, pTrans);
} }
} }
static int32_t mndProcessTransRsp(SMnodeMsg *pMsg) { return 0; }
\ No newline at end of file
...@@ -34,21 +34,27 @@ ...@@ -34,21 +34,27 @@
#include "mndUser.h" #include "mndUser.h"
#include "mndVgroup.h" #include "mndVgroup.h"
void mndSendMsgToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *pMsg) { int32_t mndSendReqToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *pMsg) {
if (pMnode != NULL && pMnode->sendMsgToDnodeFp != NULL) { if (pMnode == NULL || pMnode->sendReqToDnodeFp == NULL) {
(*pMnode->sendMsgToDnodeFp)(pMnode->pDnode, pEpSet, pMsg); terrno = TSDB_CODE_MND_NOT_READY;
return -1;
} }
return (*pMnode->sendReqToDnodeFp)(pMnode->pDnode, pEpSet, pMsg);
} }
void mndSendMsgToMnode(SMnode *pMnode, SRpcMsg *pMsg) { int32_t mndSendReqToMnode(SMnode *pMnode, SRpcMsg *pMsg) {
if (pMnode != NULL && pMnode->sendMsgToMnodeFp != NULL) { if (pMnode == NULL || pMnode->sendReqToDnodeFp == NULL) {
(*pMnode->sendMsgToMnodeFp)(pMnode->pDnode, pMsg); terrno = TSDB_CODE_MND_NOT_READY;
return -1;
} }
return (*pMnode->sendReqToMnodeFp)(pMnode->pDnode, pMsg);
} }
void mndSendRedirectMsg(SMnode *pMnode, SRpcMsg *pMsg) { void mndSendRedirectRsp(SMnode *pMnode, SRpcMsg *pMsg) {
if (pMnode != NULL && pMnode->sendRedirectMsgFp != NULL) { if (pMnode != NULL && pMnode->sendRedirectRspFp != NULL) {
(*pMnode->sendRedirectMsgFp)(pMnode->pDnode, pMsg); (*pMnode->sendRedirectRspFp)(pMnode->pDnode, pMsg);
} }
} }
...@@ -56,11 +62,8 @@ static void mndTransReExecute(void *param, void *tmrId) { ...@@ -56,11 +62,8 @@ static void mndTransReExecute(void *param, void *tmrId) {
SMnode *pMnode = param; SMnode *pMnode = param;
if (mndIsMaster(pMnode)) { if (mndIsMaster(pMnode)) {
STransMsg *pMsg = rpcMallocCont(sizeof(STransMsg)); STransMsg *pMsg = rpcMallocCont(sizeof(STransMsg));
SEpSet epSet = {.inUse = 0, .numOfEps = 1}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS, .pCont = pMsg, .contLen = sizeof(STransMsg)};
epSet.port[0] = pMnode->replicas[pMnode->selfIndex].port; pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg);
memcpy(epSet.fqdn[0], pMnode->replicas[pMnode->selfIndex].fqdn, TSDB_FQDN_LEN);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS, .pCont = pMsg, .contLen = sizeof(STransMsg)};
mndSendMsgToDnode(pMnode, &epSet, &rpcMsg);
} }
taosTmrReset(mndTransReExecute, 3000, pMnode, pMnode->timer, &pMnode->transTimer); taosTmrReset(mndTransReExecute, 3000, pMnode, pMnode->timer, &pMnode->transTimer);
...@@ -76,7 +79,7 @@ static int32_t mndInitTimer(SMnode *pMnode) { ...@@ -76,7 +79,7 @@ static int32_t mndInitTimer(SMnode *pMnode) {
return -1; return -1;
} }
if (taosTmrReset(mndTransReExecute, 1000, pMnode, pMnode->timer, &pMnode->transTimer)) { if (taosTmrReset(mndTransReExecute, 6000, pMnode, pMnode->timer, &pMnode->transTimer)) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
...@@ -223,9 +226,10 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { ...@@ -223,9 +226,10 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
pMnode->selfIndex = pOption->selfIndex; pMnode->selfIndex = pOption->selfIndex;
memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
pMnode->pDnode = pOption->pDnode; pMnode->pDnode = pOption->pDnode;
pMnode->sendMsgToDnodeFp = pOption->sendMsgToDnodeFp; pMnode->putReqToMWriteQFp = pOption->putReqToMWriteQFp;
pMnode->sendMsgToMnodeFp = pOption->sendMsgToMnodeFp; pMnode->sendReqToDnodeFp = pOption->sendReqToDnodeFp;
pMnode->sendRedirectMsgFp = pOption->sendRedirectMsgFp; pMnode->sendReqToMnodeFp = pOption->sendReqToMnodeFp;
pMnode->sendRedirectRspFp = pOption->sendRedirectRspFp;
pMnode->cfg.sver = pOption->cfg.sver; pMnode->cfg.sver = pOption->cfg.sver;
pMnode->cfg.enableTelem = pOption->cfg.enableTelem; pMnode->cfg.enableTelem = pOption->cfg.enableTelem;
pMnode->cfg.statusInterval = pOption->cfg.statusInterval; pMnode->cfg.statusInterval = pOption->cfg.statusInterval;
...@@ -236,8 +240,9 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { ...@@ -236,8 +240,9 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
pMnode->cfg.gitinfo = strdup(pOption->cfg.gitinfo); pMnode->cfg.gitinfo = strdup(pOption->cfg.gitinfo);
pMnode->cfg.buildinfo = strdup(pOption->cfg.buildinfo); pMnode->cfg.buildinfo = strdup(pOption->cfg.buildinfo);
if (pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL || pMnode->sendRedirectMsgFp == NULL || if (pMnode->sendReqToDnodeFp == NULL || pMnode->sendReqToMnodeFp == NULL || pMnode->sendRedirectRspFp == NULL ||
pMnode->dnodeId < 0 || pMnode->clusterId < 0 || pMnode->cfg.statusInterval < 1) { pMnode->putReqToMWriteQFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0 ||
pMnode->cfg.statusInterval < 1) {
terrno = TSDB_CODE_MND_INVALID_OPTIONS; terrno = TSDB_CODE_MND_INVALID_OPTIONS;
return -1; return -1;
} }
...@@ -433,7 +438,7 @@ void mndProcessMsg(SMnodeMsg *pMsg) { ...@@ -433,7 +438,7 @@ void mndProcessMsg(SMnodeMsg *pMsg) {
PROCESS_RPC_END: PROCESS_RPC_END:
if (isReq) { if (isReq) {
if (code == TSDB_CODE_APP_NOT_READY) { if (code == TSDB_CODE_APP_NOT_READY) {
mndSendRedirectMsg(pMnode, &pMsg->rpcMsg); mndSendRedirectRsp(pMnode, &pMsg->rpcMsg);
} else if (code != 0) { } else if (code != 0) {
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .code = code}; SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .code = code};
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
* @file trans.cpp * @file trans.cpp
* @author slguan (slguan@taosdata.com) * @author slguan (slguan@taosdata.com)
* @brief MNODE module trans tests * @brief MNODE module trans tests
* @version 1.0 * @version 0.1
* @date 2022-01-04 * @date 2022-01-04
* *
* @copyright Copyright (c) 2022 * @copyright Copyright (c) 2022
...@@ -75,10 +75,10 @@ TEST_F(DndTestTrans, 01_CreateUser_Crash) { ...@@ -75,10 +75,10 @@ TEST_F(DndTestTrans, 01_CreateUser_Crash) {
test.SendShowRetrieveMsg(); test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 2); EXPECT_EQ(test.GetShowRows(), 2);
CheckBinary("u1", TSDB_USER_LEN);
CheckBinary("root", TSDB_USER_LEN); CheckBinary("root", TSDB_USER_LEN);
CheckBinary("u2", TSDB_USER_LEN);
CheckBinary("super", 10);
CheckBinary("normal", 10); CheckBinary("normal", 10);
CheckBinary("super", 10);
CheckTimestamp(); CheckTimestamp();
CheckTimestamp(); CheckTimestamp();
CheckBinary("root", TSDB_USER_LEN); CheckBinary("root", TSDB_USER_LEN);
......
...@@ -32,9 +32,9 @@ typedef struct SQnode { ...@@ -32,9 +32,9 @@ typedef struct SQnode {
int32_t dnodeId; int32_t dnodeId;
int64_t clusterId; int64_t clusterId;
SQnodeCfg cfg; SQnodeCfg cfg;
SendMsgToDnodeFp sendMsgToDnodeFp; SendReqToDnodeFp sendReqToDnodeFp;
SendMsgToMnodeFp sendMsgToMnodeFp; SendReqToMnodeFp sendReqToMnodeFp;
SendRedirectMsgFp sendRedirectMsgFp; SendRedirectRspFp sendRedirectRspFp;
} SQnode; } SQnode;
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -32,9 +32,9 @@ typedef struct SSnode { ...@@ -32,9 +32,9 @@ typedef struct SSnode {
int32_t dnodeId; int32_t dnodeId;
int64_t clusterId; int64_t clusterId;
SSnodeCfg cfg; SSnodeCfg cfg;
SendMsgToDnodeFp sendMsgToDnodeFp; SendReqToDnodeFp sendReqToDnodeFp;
SendMsgToMnodeFp sendMsgToMnodeFp; SendReqToMnodeFp sendReqToMnodeFp;
SendRedirectMsgFp sendRedirectMsgFp; SendRedirectRspFp sendRedirectRspFp;
} SSnode; } SSnode;
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -1195,7 +1195,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte ...@@ -1195,7 +1195,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte
} }
rpcSendReqToServer(pRpc, pContext); rpcSendReqToServer(pRpc, pContext);
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
} else if (pHead->code == TSDB_CODE_RPC_NOT_READY || pHead->code == TSDB_CODE_APP_NOT_READY || pHead->code == TSDB_CODE_DND_EXITING) { } else if (pHead->code == TSDB_CODE_RPC_NOT_READY || pHead->code == TSDB_CODE_APP_NOT_READY || pHead->code == TSDB_CODE_DND_OFFLINE) {
pContext->code = pHead->code; pContext->code = pHead->code;
rpcProcessConnError(pContext, NULL); rpcProcessConnError(pContext, NULL);
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
......
...@@ -253,7 +253,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exis ...@@ -253,7 +253,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exis
// dnode // dnode
TAOS_DEFINE_ERROR(TSDB_CODE_DND_ACTION_IN_PROGRESS, "Action in progress") TAOS_DEFINE_ERROR(TSDB_CODE_DND_ACTION_IN_PROGRESS, "Action in progress")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_EXITING, "Dnode is exiting") TAOS_DEFINE_ERROR(TSDB_CODE_DND_OFFLINE, "Dnode is offline")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_INVALID_MSG_LEN, "Invalid message length") TAOS_DEFINE_ERROR(TSDB_CODE_DND_INVALID_MSG_LEN, "Invalid message length")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_DNODE_READ_FILE_ERROR, "Read dnode.json error") TAOS_DEFINE_ERROR(TSDB_CODE_DND_DNODE_READ_FILE_ERROR, "Read dnode.json error")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_DNODE_WRITE_FILE_ERROR, "Write dnode.json error") TAOS_DEFINE_ERROR(TSDB_CODE_DND_DNODE_WRITE_FILE_ERROR, "Write dnode.json error")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册