提交 5832b677 编写于 作者: S Shengliang Guan

shm

上级 148af69b
...@@ -2271,10 +2271,11 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p ...@@ -2271,10 +2271,11 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p
struct SRpcMsg; struct SRpcMsg;
struct SEpSet; struct SEpSet;
typedef int32_t (*PutToQueueFp)(void *pMgmt, struct SRpcMsg *pReq); struct SMgmtWrapper;
typedef int32_t (*SendReqFp)(void *pMgmt, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); typedef int32_t (*PutToQueueFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq);
typedef int32_t (*SendMnodeReqFp)(void *pMgmt, struct SRpcMsg *rpcMsg); typedef int32_t (*SendReqFp)(struct SMgmtWrapper* pWrapper, struct SEpSet* epSet, struct SRpcMsg* rpcMsg);
typedef int32_t (*SendRspFp)(void *pMgmt, struct SRpcMsg *rpcMsg); typedef int32_t (*SendMnodeReqFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* rpcMsg);
typedef int32_t (*SendRspFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* rpcMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -21,24 +21,21 @@ extern "C" { ...@@ -21,24 +21,21 @@ extern "C" {
#endif #endif
/* ------------------------ TYPES EXPOSED ------------------------ */ /* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SDnode SDnode; typedef struct SMgmtWrapper SMgmtWrapper;
typedef struct SBnode SBnode; typedef struct SBnode SBnode;
typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *pMsg);
typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *pMsg);
typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *pMsg);
typedef struct { typedef struct {
int64_t numOfErrors; int64_t numOfErrors;
} SBnodeLoad; } SBnodeLoad;
typedef struct { typedef struct {
int32_t sver; int32_t sver;
int32_t dnodeId; int32_t dnodeId;
int64_t clusterId; int64_t clusterId;
SDnode *pDnode; SMgmtWrapper *pWrapper;
SendReqToDnodeFp sendReqFp; SendReqFp sendReqFp;
SendReqToMnodeFp sendReqToMnodeFp; SendMnodeReqFp sendMnodeReqFp;
SendRedirectRspFp sendRedirectRspFp; SendRspFp sendRspFp;
} SBnodeOpt; } SBnodeOpt;
/* ------------------------ SBnode ------------------------ */ /* ------------------------ SBnode ------------------------ */
......
...@@ -23,13 +23,8 @@ extern "C" { ...@@ -23,13 +23,8 @@ extern "C" {
#endif #endif
/* ------------------------ TYPES EXPOSED ------------------------ */ /* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SDnode SDnode; typedef struct SMgmtWrapper SMgmtWrapper;
typedef struct SMnode SMnode; typedef struct SMnode SMnode;
typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef int32_t (*PutReqToMWriteQFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef int32_t (*PutReqToMReadQFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
...@@ -37,11 +32,12 @@ typedef struct { ...@@ -37,11 +32,12 @@ typedef struct {
int8_t replica; int8_t replica;
int8_t selfIndex; int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];
SDnode *pDnode; SMgmtWrapper *pWrapper;
PutToQueueFp putReqToMWriteQFp; PutToQueueFp putToWriteQFp;
PutToQueueFp putReqToMReadQFp; PutToQueueFp putToReadQFp;
SendReqFp sendReqFp; SendReqFp sendReqFp;
SendMnodeReqFp sendReqToMnodeFp; SendMnodeReqFp sendMnodeReqFp;
SendRspFp sendRspFp;
} SMnodeOpt; } SMnodeOpt;
/* ------------------------ SMnode ------------------------ */ /* ------------------------ SMnode ------------------------ */
......
...@@ -21,11 +21,8 @@ extern "C" { ...@@ -21,11 +21,8 @@ extern "C" {
#endif #endif
/* ------------------------ TYPES EXPOSED ------------------------ */ /* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SDnode SDnode; typedef struct SMgmtWrapper SMgmtWrapper;
typedef struct SQnode SQnode; typedef struct SQnode SQnode;
typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *pMsg);
typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *pMsg);
typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *pMsg);
typedef struct { typedef struct {
int64_t numOfStartTask; int64_t numOfStartTask;
...@@ -39,13 +36,13 @@ typedef struct { ...@@ -39,13 +36,13 @@ typedef struct {
} SQnodeLoad; } SQnodeLoad;
typedef struct { typedef struct {
int32_t sver; int32_t sver;
int32_t dnodeId; int32_t dnodeId;
int64_t clusterId; int64_t clusterId;
SDnode *pDnode; SMgmtWrapper *pWrapper;
SendReqToDnodeFp sendReqFp; SendReqFp sendReqFp;
SendReqToMnodeFp sendReqToMnodeFp; SendMnodeReqFp sendMnodeReqFp;
SendRedirectRspFp sendRedirectRspFp; SendRspFp sendRspFp;
} SQnodeOpt; } SQnodeOpt;
/* ------------------------ SQnode ------------------------ */ /* ------------------------ SQnode ------------------------ */
......
...@@ -24,24 +24,21 @@ extern "C" { ...@@ -24,24 +24,21 @@ extern "C" {
#endif #endif
/* ------------------------ TYPES EXPOSED ------------------------ */ /* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SDnode SDnode; typedef struct SMgmtWrapper SMgmtWrapper;
typedef struct SSnode SSnode; typedef struct SSnode SSnode;
typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *pMsg);
typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *pMsg);
typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *pMsg);
typedef struct { typedef struct {
int64_t numOfErrors; int64_t numOfErrors;
} SSnodeLoad; } SSnodeLoad;
typedef struct { typedef struct {
int32_t sver; int32_t sver;
int32_t dnodeId; int32_t dnodeId;
int64_t clusterId; int64_t clusterId;
SDnode *pDnode; SMgmtWrapper *pWrapper;
SendReqToDnodeFp sendReqFp; SendReqFp sendReqFp;
SendReqToMnodeFp sendReqToMnodeFp; SendMnodeReqFp sendMnodeReqFp;
SendRedirectRspFp sendRedirectRspFp; SendRspFp sendRspFp;
} SSnodeOpt; } SSnodeOpt;
/* ------------------------ SSnode ------------------------ */ /* ------------------------ SSnode ------------------------ */
......
...@@ -177,7 +177,7 @@ static void dndStopBnodeWorker(SDnode *pDnode) { ...@@ -177,7 +177,7 @@ static void dndStopBnodeWorker(SDnode *pDnode) {
static void dndBuildBnodeOption(SDnode *pDnode, SBnodeOpt *pOption) { static void dndBuildBnodeOption(SDnode *pDnode, SBnodeOpt *pOption) {
pOption->pDnode = pDnode; pOption->pDnode = pDnode;
pOption->sendReqFp = dndSendReqToDnode; pOption->sendReqFp = dndSendReqToDnode;
pOption->sendReqToMnodeFp = dndSendReqToMnode; pOption->sendMnodeReqFp = dndSendReqToMnode;
pOption->sendRedirectRspFp = dmSendRedirectRsp; pOption->sendRedirectRspFp = dmSendRedirectRsp;
pOption->dnodeId = pDnode->dnodeId; pOption->dnodeId = pDnode->dnodeId;
pOption->clusterId = pDnode->clusterId; pOption->clusterId = pDnode->clusterId;
......
...@@ -149,9 +149,9 @@ void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp ...@@ -149,9 +149,9 @@ void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp
void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc); void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc);
void dndSendMonitorReport(SDnode *pDnode); void dndSendMonitorReport(SDnode *pDnode);
int32_t dndSendReqToMnode(void *wrapper, SRpcMsg *pMsg); int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t dndSendReqToDnode(void *wrapper, SEpSet *pEpSet, SRpcMsg *pMsg); int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, SEpSet *pEpSet, SRpcMsg *pMsg);
void dndSendRsp(void *pWrapper, SRpcMsg *pRsp); void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp);
int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum, int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum,
int32_t maxNum, void *queueFp); int32_t maxNum, void *queueFp);
......
...@@ -279,9 +279,7 @@ static int32_t dndSendRpcReq(STransMgmt *pMgmt, SEpSet *pEpSet, SRpcMsg *pReq) { ...@@ -279,9 +279,7 @@ static int32_t dndSendRpcReq(STransMgmt *pMgmt, SEpSet *pEpSet, SRpcMsg *pReq) {
return 0; return 0;
} }
int32_t dndSendReqToDnode(void *wrapper, SEpSet *pEpSet, SRpcMsg *pReq) { int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, SEpSet *pEpSet, SRpcMsg *pReq) {
SMgmtWrapper *pWrapper = wrapper;
if (pWrapper->procType == PROC_CHILD) { if (pWrapper->procType == PROC_CHILD) {
} else { } else {
STransMgmt *pTrans = &pWrapper->pDnode->trans; STransMgmt *pTrans = &pWrapper->pDnode->trans;
...@@ -289,9 +287,7 @@ int32_t dndSendReqToDnode(void *wrapper, SEpSet *pEpSet, SRpcMsg *pReq) { ...@@ -289,9 +287,7 @@ int32_t dndSendReqToDnode(void *wrapper, SEpSet *pEpSet, SRpcMsg *pReq) {
} }
} }
int32_t dndSendReqToMnode(void *wrapper, SRpcMsg *pReq) { int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) {
SMgmtWrapper *pWrapper = wrapper;
if (pWrapper->procType == PROC_CHILD) { if (pWrapper->procType == PROC_CHILD) {
} else { } else {
SDnode *pDnode = pWrapper->pDnode; SDnode *pDnode = pWrapper->pDnode;
...@@ -311,9 +307,7 @@ void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { ...@@ -311,9 +307,7 @@ void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
} }
} }
void dndSendRsp(void *wrapper, SRpcMsg *pRsp) { void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
SMgmtWrapper *pWrapper = wrapper;
if (pWrapper->procType == PROC_CHILD) { if (pWrapper->procType == PROC_CHILD) {
int32_t code = -1; int32_t code = -1;
do { do {
......
...@@ -30,7 +30,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { ...@@ -30,7 +30,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
req.updateTime = pMgmt->updateTime; req.updateTime = pMgmt->updateTime;
req.numOfCores = tsNumOfCores; req.numOfCores = tsNumOfCores;
req.numOfSupportVnodes = pDnode->numOfSupportVnodes; req.numOfSupportVnodes = pDnode->numOfSupportVnodes;
memcpy(req.dnodeEp, pDnode->localEp, TSDB_EP_LEN); tstrncpy(req.dnodeEp, pDnode->localEp, TSDB_EP_LEN);
req.clusterCfg.statusInterval = tsStatusInterval; req.clusterCfg.statusInterval = tsStatusInterval;
req.clusterCfg.checkTime = 0; req.clusterCfg.checkTime = 0;
......
...@@ -62,8 +62,8 @@ int32_t mmProcessWriteMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); ...@@ -62,8 +62,8 @@ int32_t mmProcessWriteMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmProcessSyncMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t mmProcessSyncMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmPutMsgToWriteQueue(void *wrapper, SRpcMsg *pRpcMsg); int32_t mmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpcMsg);
int32_t mmPutMsgToReadQueue(void *wrapper, SRpcMsg *pRpcMsg); int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpcMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -135,11 +135,11 @@ int32_t mmDrop(SMnodeMgmt *pMgmt) { ...@@ -135,11 +135,11 @@ int32_t mmDrop(SMnodeMgmt *pMgmt) {
static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
SDnode *pDnode = pMgmt->pDnode; SDnode *pDnode = pMgmt->pDnode;
pOption->pDnode = pDnode; pOption->pWrapper = pMgmt->pWrapper;
pOption->sendReqFp = dndSendReqToDnode; pOption->sendReqFp = dndSendReqToDnode;
pOption->sendReqToMnodeFp = dndSendReqToMnode; pOption->sendMnodeReqFp = dndSendReqToMnode;
pOption->putReqToMWriteQFp = mmPutMsgToWriteQueue; pOption->putToWriteQFp = mmPutMsgToWriteQueue;
pOption->putReqToMReadQFp = mmPutMsgToReadQueue; pOption->putToReadQFp = mmPutMsgToReadQueue;
pOption->dnodeId = pDnode->dnodeId; pOption->dnodeId = pDnode->dnodeId;
pOption->clusterId = pDnode->clusterId; pOption->clusterId = pDnode->clusterId;
} }
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
#include "mmInt.h" #include "mmInt.h"
static void mmProcessQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { static void mmProcessQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
dTrace("msg:%p, will be processed", pMsg); dTrace("msg:%p, will be processed in mnode queue", pMsg);
SMnode *pMnode = mmAcquire(pMgmt); SMnode *pMnode = mmAcquire(pMgmt);
SRpcMsg *pRpc = &pMsg->rpcMsg; SRpcMsg *pRpc = &pMsg->rpcMsg;
bool isReq = (pRpc->msgType & 1U); bool isReq = (pRpc->msgType & 1U);
...@@ -102,7 +102,7 @@ int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { ...@@ -102,7 +102,7 @@ int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg); return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg);
} }
static int32_t mmPutRpcMsgToWorker(SMgmtWrapper *pWrapper, SDnodeWorker *pWorker, SRpcMsg *pRpc) { static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SRpcMsg *pRpc) {
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
if (pMsg == NULL) { if (pMsg == NULL) {
return -1; return -1;
...@@ -111,7 +111,7 @@ static int32_t mmPutRpcMsgToWorker(SMgmtWrapper *pWrapper, SDnodeWorker *pWorker ...@@ -111,7 +111,7 @@ static int32_t mmPutRpcMsgToWorker(SMgmtWrapper *pWrapper, SDnodeWorker *pWorker
dTrace("msg:%p, is created", pMsg); dTrace("msg:%p, is created", pMsg);
pMsg->rpcMsg = *pRpc; pMsg->rpcMsg = *pRpc;
int32_t code = mmPutMsgToWorker(pWrapper->pMgmt, pWorker, pMsg); int32_t code = mmPutMsgToWorker(pMgmt, pWorker, pMsg);
if (code != 0) { if (code != 0) {
dTrace("msg:%p, is freed", pMsg); dTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
...@@ -121,16 +121,12 @@ static int32_t mmPutRpcMsgToWorker(SMgmtWrapper *pWrapper, SDnodeWorker *pWorker ...@@ -121,16 +121,12 @@ static int32_t mmPutRpcMsgToWorker(SMgmtWrapper *pWrapper, SDnodeWorker *pWorker
return code; return code;
} }
int32_t mmPutMsgToWriteQueue(void *wrapper, SRpcMsg *pRpc) { int32_t mmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
// SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, MNODE); SMnodeMgmt *pMgmt = pWrapper->pMgmt;
// SMnodeMgmt *pMgmt = pWrapper->pMgmt; return mmPutRpcMsgToWorker(pMgmt, &pMgmt->writeWorker, pRpc);
// return mmPutRpcMsgToWorker(pWrapper, &pMgmt->writeWorker, pRpc);
return 0;
} }
int32_t mmPutMsgToReadQueue(void *wrapper, SRpcMsg *pRpc) { int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
// SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, MNODE); SMnodeMgmt *pMgmt = pWrapper->pMgmt;
// SMnodeMgmt *pMgmt = pWrapper->pMgmt; return mmPutRpcMsgToWorker(pMgmt, &pMgmt->readWorker, pRpc);
// return mmPutRpcMsgToWorker(pWrapper, &pMgmt->readWorker, pRpc);
return 0;
} }
...@@ -184,7 +184,7 @@ static void dndStopQnodeWorker(SDnode *pDnode) { ...@@ -184,7 +184,7 @@ static void dndStopQnodeWorker(SDnode *pDnode) {
static void dndBuildQnodeOption(SDnode *pDnode, SQnodeOpt *pOption) { static void dndBuildQnodeOption(SDnode *pDnode, SQnodeOpt *pOption) {
pOption->pDnode = pDnode; pOption->pDnode = pDnode;
pOption->sendReqFp = dndSendReqToDnode; pOption->sendReqFp = dndSendReqToDnode;
pOption->sendReqToMnodeFp = dndSendReqToMnode; pOption->sendMnodeReqFp = dndSendReqToMnode;
pOption->sendRedirectRspFp = dndSendRedirectRsp; pOption->sendRedirectRspFp = dndSendRedirectRsp;
pOption->dnodeId = pDnode->dnodeId; pOption->dnodeId = pDnode->dnodeId;
pOption->clusterId = pDnode->clusterId; pOption->clusterId = pDnode->clusterId;
......
...@@ -209,7 +209,7 @@ static void dndStopSnodeWorker(SDnode *pDnode) { ...@@ -209,7 +209,7 @@ static void dndStopSnodeWorker(SDnode *pDnode) {
static void dndBuildSnodeOption(SDnode *pDnode, SSnodeOpt *pOption) { static void dndBuildSnodeOption(SDnode *pDnode, SSnodeOpt *pOption) {
pOption->pDnode = pDnode; pOption->pDnode = pDnode;
pOption->sendReqFp = dndSendReqToDnode; pOption->sendReqFp = dndSendReqToDnode;
pOption->sendReqToMnodeFp = dndSendReqToMnode; pOption->sendMnodeReqFp = dndSendReqToMnode;
pOption->sendRedirectRspFp = dndSendRedirectRsp; pOption->sendRedirectRspFp = dndSendRedirectRsp;
pOption->dnodeId = pDnode->dnodeId; pOption->dnodeId = pDnode->dnodeId;
pOption->clusterId = pDnode->clusterId; pOption->clusterId = pDnode->clusterId;
......
...@@ -34,7 +34,6 @@ typedef struct SVnodesMgmt { ...@@ -34,7 +34,6 @@ typedef struct SVnodesMgmt {
SWWorkerPool syncPool; SWWorkerPool syncPool;
SWWorkerPool writePool; SWWorkerPool writePool;
const char *path; const char *path;
SMnode *pMnode;
SDnode *pDnode; SDnode *pDnode;
SMgmtWrapper *pWrapper; SMgmtWrapper *pWrapper;
SDnodeWorker mgmtWorker; SDnodeWorker mgmtWorker;
...@@ -100,8 +99,8 @@ void vmStopWorker(SVnodesMgmt *pMgmt); ...@@ -100,8 +99,8 @@ void vmStopWorker(SVnodesMgmt *pMgmt);
int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
int32_t vmPutMsgToQueryQueue(SVnodesMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t vmPutMsgToApplyQueue(SVnodesMgmt *pMgmt, int32_t vgId, SRpcMsg *pMsg); int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, int32_t vgId, SRpcMsg *pMsg);
int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
......
...@@ -257,10 +257,6 @@ static void vmCleanup(SMgmtWrapper *pWrapper) { ...@@ -257,10 +257,6 @@ static void vmCleanup(SMgmtWrapper *pWrapper) {
dInfo("vnodes-mgmt is cleaned up"); dInfo("vnodes-mgmt is cleaned up");
} }
static int32_t vmSendReqToDnode(SVnodesMgmt *pMgmt, struct SEpSet *epSet, struct SRpcMsg *rpcMsg) {
return dndSendReqToDnode(pMgmt->pDnode, epSet, rpcMsg);
}
static int32_t vmInit(SMgmtWrapper *pWrapper) { static int32_t vmInit(SMgmtWrapper *pWrapper) {
SDnode *pDnode = pWrapper->pDnode; SDnode *pDnode = pWrapper->pDnode;
SVnodesMgmt *pMgmt = calloc(1, sizeof(SVnodesMgmt)); SVnodesMgmt *pMgmt = calloc(1, sizeof(SVnodesMgmt));
...@@ -298,8 +294,8 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) { ...@@ -298,8 +294,8 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) {
} }
vnodeOpt.nthreads = tsNumOfCommitThreads; vnodeOpt.nthreads = tsNumOfCommitThreads;
vnodeOpt.putToQueryQFp = (VndPutToQueryQFp)vmPutMsgToQueryQueue; vnodeOpt.putToQueryQFp = vmPutMsgToQueryQueue;
vnodeOpt.sendReqFp = (VndSendReqFp)vmSendReqToDnode; vnodeOpt.sendReqFp = dndSendReqToDnode;
if (vnodeInit(&vnodeOpt) != 0) { if (vnodeInit(&vnodeOpt) != 0) {
dError("failed to init vnode since %s", terrstr()); dError("failed to init vnode since %s", terrstr());
goto _OVER; goto _OVER;
......
...@@ -146,7 +146,9 @@ int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { ...@@ -146,7 +146,9 @@ int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
} }
} }
int32_t vmPutMsgToQueryQueue(SVnodesMgmt *pMgmt, SRpcMsg *pRpc) { int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
int32_t code = -1; int32_t code = -1;
SMsgHead *pHead = pRpc->pCont; SMsgHead *pHead = pRpc->pCont;
// pHead->vgId = htonl(pHead->vgId); // pHead->vgId = htonl(pHead->vgId);
...@@ -162,11 +164,20 @@ int32_t vmPutMsgToQueryQueue(SVnodesMgmt *pMgmt, SRpcMsg *pRpc) { ...@@ -162,11 +164,20 @@ int32_t vmPutMsgToQueryQueue(SVnodesMgmt *pMgmt, SRpcMsg *pRpc) {
return code; return code;
} }
int32_t vmPutMsgToApplyQueue(SVnodesMgmt *pMgmt, int32_t vgId, SRpcMsg *pMsg) { int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, int32_t vgId, SRpcMsg *pRpc) {
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId); SVnodesMgmt *pMgmt = pWrapper->pMgmt;
int32_t code = -1;
SMsgHead *pHead = pRpc->pCont;
// pHead->vgId = htonl(pHead->vgId);
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) return -1; if (pVnode == NULL) return -1;
int32_t code = taosWriteQitem(pVnode->pApplyQ, pMsg); SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
if (pMsg != NULL) {
code = vmWriteMsgToQueue(pVnode->pApplyQ, pMsg, false);
}
vmReleaseVnode(pMgmt, pVnode); vmReleaseVnode(pMgmt, pVnode);
return code; return code;
} }
......
...@@ -110,7 +110,7 @@ typedef struct SMnode { ...@@ -110,7 +110,7 @@ typedef struct SMnode {
char *path; char *path;
int64_t checkTime; int64_t checkTime;
SSdb *pSdb; SSdb *pSdb;
SDnode *pDnode; SMgmtWrapper *pWrapper;
SArray *pSteps; SArray *pSteps;
SShowMgmt showMgmt; SShowMgmt showMgmt;
SProfileMgmt profileMgmt; SProfileMgmt profileMgmt;
...@@ -120,9 +120,9 @@ typedef struct SMnode { ...@@ -120,9 +120,9 @@ typedef struct SMnode {
SGrantInfo grant; SGrantInfo grant;
MndMsgFp msgFp[TDMT_MAX]; MndMsgFp msgFp[TDMT_MAX];
SendReqFp sendReqFp; SendReqFp sendReqFp;
SendMnodeReqFp sendReqToMnodeFp; SendMnodeReqFp sendMnodeReqFp;
PutToQueueFp putReqToMWriteQFp; PutToQueueFp putToWriteQFp;
PutToQueueFp putReqToMReadQFp; PutToQueueFp putToReadQFp;
} SMnode; } SMnode;
int32_t mndSendReqToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg); int32_t mndSendReqToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg);
......
...@@ -420,7 +420,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) { ...@@ -420,7 +420,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
.pCont = pRebMsg, .pCont = pRebMsg,
.contLen = sizeof(SMqDoRebalanceMsg), .contLen = sizeof(SMqDoRebalanceMsg),
}; };
pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg); (*pMnode->putToWriteQFp)(pMnode->pWrapper, &rpcMsg);
} else { } else {
taosHashCleanup(pRebMsg->rebSubHash); taosHashCleanup(pRebMsg->rebSubHash);
rpcFreeCont(pRebMsg); rpcFreeCont(pRebMsg);
......
...@@ -49,7 +49,7 @@ int32_t mndSendReqToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *pMsg) { ...@@ -49,7 +49,7 @@ int32_t mndSendReqToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *pMsg) {
return -1; return -1;
} }
return (*pMnode->sendReqFp)(pMnode->pDnode, pEpSet, pMsg); return (*pMnode->sendReqFp)(pMnode->pWrapper, pEpSet, pMsg);
} }
int32_t mndSendReqToMnode(SMnode *pMnode, SRpcMsg *pMsg) { int32_t mndSendReqToMnode(SMnode *pMnode, SRpcMsg *pMsg) {
...@@ -58,7 +58,7 @@ int32_t mndSendReqToMnode(SMnode *pMnode, SRpcMsg *pMsg) { ...@@ -58,7 +58,7 @@ int32_t mndSendReqToMnode(SMnode *pMnode, SRpcMsg *pMsg) {
return -1; return -1;
} }
return (*pMnode->sendReqToMnodeFp)(pMnode->pDnode, pMsg); return (*pMnode->sendMnodeReqFp)(pMnode->pWrapper, pMsg);
} }
static void *mndBuildTimerMsg(int32_t *pContLen) { static void *mndBuildTimerMsg(int32_t *pContLen) {
...@@ -80,7 +80,7 @@ static void mndPullupTrans(void *param, void *tmrId) { ...@@ -80,7 +80,7 @@ static void mndPullupTrans(void *param, void *tmrId) {
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg); pMnode->putToWriteQFp(pMnode->pWrapper, &rpcMsg);
} }
taosTmrReset(mndPullupTrans, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer); taosTmrReset(mndPullupTrans, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer);
...@@ -96,7 +96,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) { ...@@ -96,7 +96,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) {
.pCont = pReq, .pCont = pReq,
.contLen = contLen, .contLen = contLen,
}; };
pMnode->putReqToMReadQFp(pMnode->pDnode, &rpcMsg); pMnode->putToReadQFp(pMnode->pWrapper, &rpcMsg);
} }
taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer); taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer);
...@@ -108,7 +108,7 @@ static void mndPullupTelem(void *param, void *tmrId) { ...@@ -108,7 +108,7 @@ static void mndPullupTelem(void *param, void *tmrId) {
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
pMnode->putReqToMReadQFp(pMnode->pDnode, &rpcMsg); pMnode->putToReadQFp(pMnode->pWrapper, &rpcMsg);
} }
taosTmrReset(mndPullupTelem, TELEM_TIMER_MS, pMnode, pMnode->timer, &pMnode->telemTimer); taosTmrReset(mndPullupTelem, TELEM_TIMER_MS, pMnode, pMnode->timer, &pMnode->telemTimer);
...@@ -286,14 +286,14 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { ...@@ -286,14 +286,14 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
pMnode->replica = pOption->replica; pMnode->replica = pOption->replica;
pMnode->selfIndex = pOption->selfIndex; pMnode->selfIndex = pOption->selfIndex;
memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
pMnode->pDnode = pOption->pDnode; pMnode->pWrapper = pOption->pWrapper;
pMnode->putReqToMWriteQFp = pOption->putReqToMWriteQFp; pMnode->putToWriteQFp = pOption->putToWriteQFp;
pMnode->putReqToMReadQFp = pOption->putReqToMReadQFp; pMnode->putToReadQFp = pOption->putToReadQFp;
pMnode->sendReqFp = pOption->sendReqFp; pMnode->sendReqFp = pOption->sendReqFp;
pMnode->sendReqToMnodeFp = pOption->sendReqToMnodeFp; pMnode->sendMnodeReqFp = pOption->sendMnodeReqFp;
if (pMnode->sendReqFp == NULL || pMnode->sendReqToMnodeFp == NULL || if (pMnode->sendReqFp == NULL || pMnode->sendMnodeReqFp == NULL ||
pMnode->putReqToMWriteQFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0) { pMnode->putToWriteQFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0) {
terrno = TSDB_CODE_MND_INVALID_OPTIONS; terrno = TSDB_CODE_MND_INVALID_OPTIONS;
return -1; return -1;
} }
......
...@@ -30,10 +30,8 @@ extern "C" { ...@@ -30,10 +30,8 @@ extern "C" {
#endif #endif
/* ------------------------ TYPES EXPOSED ------------------------ */ /* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SVnode SVnode; typedef struct SMgmtWrapper SMgmtWrapper;
typedef int32_t (*VndPutToQueryQFp)(void *pMgmt, struct SRpcMsg *pReq); typedef struct SVnode SVnode;
typedef int32_t (*VndSendReqFp)(void *pMgmt, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef struct { typedef struct {
// TODO // TODO
int32_t reserved; int32_t reserved;
...@@ -62,9 +60,11 @@ typedef struct { ...@@ -62,9 +60,11 @@ typedef struct {
} SVnodeCfg; } SVnodeCfg;
typedef struct { typedef struct {
uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO) uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO)
VndPutToQueryQFp putToQueryQFp; PutToQueueFp putToQueryQFp;
VndSendReqFp sendReqFp; SendReqFp sendReqFp;
SendMnodeReqFp sendMnodeReqFp;
SendRspFp sendRspFp;
} SVnodeOpt; } SVnodeOpt;
typedef struct { typedef struct {
......
...@@ -53,8 +53,8 @@ typedef struct SVnodeMgr { ...@@ -53,8 +53,8 @@ typedef struct SVnodeMgr {
pthread_cond_t hasTask; pthread_cond_t hasTask;
TD_DLIST(SVnodeTask) queue; TD_DLIST(SVnodeTask) queue;
// For vnode Mgmt // For vnode Mgmt
VndPutToQueryQFp putToQueryQFp; PutToQueueFp putToQueryQFp;
VndSendReqFp sendReqFp; SendReqFp sendReqFp;
} SVnodeMgr; } SVnodeMgr;
extern SVnodeMgr vnodeMgr; extern SVnodeMgr vnodeMgr;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册