提交 877cb761 编写于 作者: S Shengliang Guan

refactor: mnode worker

上级 65686df0
......@@ -83,6 +83,7 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
*/
int32_t mndProcessRpcMsg(SRpcMsg *pMsg);
int32_t mndProcessSyncMsg(SRpcMsg *pMsg);
int32_t mndPreprocessQueryMsg(SMnode * pMnode, SRpcMsg * pMsg);
/**
* @brief Generate machine code
......
......@@ -53,20 +53,18 @@ void mmRelease(SMnodeMgmt *pMgmt);
SArray *mmGetMsgHandles();
int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mndPreprocessQueryMsg(SMnode * pMnode, SRpcMsg * pMsg);
// mmWorker.c
int32_t mmStartWorker(SMnodeMgmt *pMgmt);
void mmStopWorker(SMnodeMgmt *pMgmt);
int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutRpcMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc);
int32_t mmPutMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc);
#ifdef __cplusplus
}
......
......@@ -43,7 +43,6 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInpu
pOption->deploy = true;
pOption->msgCb = pMgmt->msgCb;
pOption->dnodeId = pMgmt->pData->dnodeId;
pOption->replica = 1;
pOption->selfIndex = 0;
......@@ -54,8 +53,8 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInpu
}
static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
pOption->deploy = false;
pOption->standby = false;
pOption->deploy = false;
pOption->msgCb = pMgmt->msgCb;
pOption->dnodeId = pMgmt->pData->dnodeId;
......@@ -105,7 +104,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->path = pInput->path;
pMgmt->name = pInput->name;
pMgmt->msgCb = pInput->msgCb;
pMgmt->msgCb.putToQueueFp = (PutToQueueFp)mmPutRpcMsgToQueue;
pMgmt->msgCb.putToQueueFp = (PutToQueueFp)mmPutMsgToQueue;
pMgmt->msgCb.mgmt = pMgmt;
taosThreadRwlockInit(&pMgmt->lock, NULL);
......
......@@ -26,7 +26,7 @@ static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) {
tmsgSendRsp(&rsp);
}
static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SMnodeMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1;
dTrace("msg:%p, get from mnode queue", pMsg);
......@@ -53,11 +53,10 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
taosFreeQitem(pMsg);
}
static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
static void mmProcessSyncMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SMnodeMgmt *pMgmt = pInfo->ahandle;
dTrace("msg:%p, get from mnode-sync queue", pMsg);
pMsg->info.node = pMgmt->pMnode;
dTrace("msg:%p, get from mnode-sync queue", pMsg);
SMsgHead *pHead = pMsg->pCont;
pHead->contLen = ntohl(pHead->contLen);
......@@ -70,64 +69,63 @@ static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
taosFreeQitem(pMsg);
}
static int32_t mmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) {
dTrace("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
taosWriteQitem(pWorker->queue, pMsg);
return 0;
static inline int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pMsg) {
if (mmAcquire(pMgmt) == 0) {
dTrace("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
taosWriteQitem(pWorker->queue, pMsg);
mmRelease(pMgmt);
return 0;
} else {
dTrace("msg:%p, failed to put into %s queue since %s, type:%s", pMsg, pWorker->name, terrstr(),
TMSG_INFO(pMsg->msgType));
return -1;
}
}
int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutNodeMsgToWorker(&pMgmt->writeWorker, pMsg);
inline int32_t mmPutMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutMsgToWorker(pMgmt, &pMgmt->writeWorker, pMsg);
}
int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutNodeMsgToWorker(&pMgmt->syncWorker, pMsg);
inline int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutMsgToWorker(pMgmt, &pMgmt->syncWorker, pMsg);
}
int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutNodeMsgToWorker(&pMgmt->readWorker, pMsg);
inline int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg);
}
int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
mndPreprocessQueryMsg(pMgmt->pMnode, pMsg);
return mmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg);
inline int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
if (mndPreprocessQueryMsg(pMgmt->pMnode, pMsg) != 0) {
dError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType));
return -1;
}
return mmPutMsgToWorker(pMgmt, &pMgmt->queryWorker, pMsg);
}
int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutNodeMsgToWorker(&pMgmt->monitorWorker, pMsg);
inline int32_t mmPutMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutMsgToWorker(pMgmt, &pMgmt->monitorWorker, pMsg);
}
int32_t mmPutRpcMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
inline int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
if (pMsg == NULL) return -1;
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
dTrace("msg:%p, is created", pMsg);
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
switch (qtype) {
case WRITE_QUEUE:
dTrace("msg:%p, is created and will put into vnode-write queue", pMsg);
taosWriteQitem(pMgmt->writeWorker.queue, pMsg);
return 0;
return mmPutMsgToWorker(pMgmt, &pMgmt->writeWorker, pMsg);
case QUERY_QUEUE:
dTrace("msg:%p, is created and will put into vnode-query queue", pMsg);
taosWriteQitem(pMgmt->queryWorker.queue, pMsg);
return 0;
return mmPutMsgToWorker(pMgmt, &pMgmt->queryWorker, pMsg);
case READ_QUEUE:
dTrace("msg:%p, is created and will put into vnode-read queue", pMsg);
taosWriteQitem(pMgmt->readWorker.queue, pMsg);
return 0;
return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg);
case SYNC_QUEUE:
if (mmAcquire(pMgmt) == 0) {
dTrace("msg:%p, is created and will put into vnode-sync queue", pMsg);
taosWriteQitem(pMgmt->syncWorker.queue, pMsg);
mmRelease(pMgmt);
return 0;
} else {
return -1;
}
return mmPutMsgToWorker(pMgmt, &pMgmt->syncWorker, pMsg);
default:
terrno = TSDB_CODE_INVALID_PARA;
dTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg);
rpcFreeCont(pMsg->pCont);
return -1;
}
}
......@@ -137,7 +135,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
.min = tsNumOfMnodeQueryThreads,
.max = tsNumOfMnodeQueryThreads,
.name = "mnode-query",
.fp = (FItem)mmProcessQueue,
.fp = (FItem)mmProcessRpcMsg,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->queryWorker, &qCfg) != 0) {
......@@ -149,7 +147,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
.min = tsNumOfMnodeReadThreads,
.max = tsNumOfMnodeReadThreads,
.name = "mnode-read",
.fp = (FItem)mmProcessQueue,
.fp = (FItem)mmProcessRpcMsg,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->readWorker, &rCfg) != 0) {
......@@ -161,7 +159,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
.min = 1,
.max = 1,
.name = "mnode-write",
.fp = (FItem)mmProcessQueue,
.fp = (FItem)mmProcessRpcMsg,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->writeWorker, &wCfg) != 0) {
......@@ -173,7 +171,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
.min = 1,
.max = 1,
.name = "mnode-sync",
.fp = (FItem)mmProcessSyncQueue,
.fp = (FItem)mmProcessSyncMsg,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->syncWorker, &sCfg) != 0) {
......@@ -185,7 +183,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
.min = 1,
.max = 1,
.name = "mnode-monitor",
.fp = (FItem)mmProcessQueue,
.fp = (FItem)mmProcessRpcMsg,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册