提交 e18d7de9 编写于 作者: D dapan1121

feature/qnode

上级 77898958
...@@ -25,7 +25,7 @@ extern "C" { ...@@ -25,7 +25,7 @@ extern "C" {
typedef struct SRpcMsg SRpcMsg; typedef struct SRpcMsg SRpcMsg;
typedef struct SEpSet SEpSet; typedef struct SEpSet SEpSet;
typedef struct SMgmtWrapper SMgmtWrapper; typedef struct SMgmtWrapper SMgmtWrapper;
typedef enum { QUERY_QUEUE, FETCH_QUEUE, WRITE_QUEUE, APPLY_QUEUE, SYNC_QUEUE, QUEUE_MAX } EQueueType; typedef enum { QUERY_QUEUE, FETCH_QUEUE, READ_QUEUE, WRITE_QUEUE, APPLY_QUEUE, SYNC_QUEUE, QUEUE_MAX } EQueueType;
typedef int32_t (*PutToQueueFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); typedef int32_t (*PutToQueueFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq);
typedef int32_t (*GetQueueSizeFp)(SMgmtWrapper* pWrapper, int32_t vgId, EQueueType qtype); typedef int32_t (*GetQueueSizeFp)(SMgmtWrapper* pWrapper, int32_t vgId, EQueueType qtype);
......
...@@ -28,6 +28,7 @@ typedef struct SMnodeMgmt { ...@@ -28,6 +28,7 @@ typedef struct SMnodeMgmt {
SDnode *pDnode; SDnode *pDnode;
SMgmtWrapper *pWrapper; SMgmtWrapper *pWrapper;
const char *path; const char *path;
SSingleWorker queryWorker;
SSingleWorker readWorker; SSingleWorker readWorker;
SSingleWorker writeWorker; SSingleWorker writeWorker;
SSingleWorker syncWorker; SSingleWorker syncWorker;
...@@ -57,11 +58,13 @@ void mmStopWorker(SMnodeMgmt *pMgmt); ...@@ -57,11 +58,13 @@ void mmStopWorker(SMnodeMgmt *pMgmt);
int32_t mmProcessWriteMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t mmProcessWriteMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmProcessSyncMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t mmProcessSyncMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmProcessQueryMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpcMsg); int32_t mmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpcMsg);
int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpcMsg); int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpcMsg);
int32_t mmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif /*_TD_DND_MNODE_INT_H_*/ #endif /*_TD_DND_MNODE_INT_H_*/
\ No newline at end of file
...@@ -45,7 +45,8 @@ static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { ...@@ -45,7 +45,8 @@ static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
SMsgCb msgCb = {0}; SMsgCb msgCb = {0};
msgCb.pWrapper = pMgmt->pWrapper; msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[QUERY_QUEUE] = mmPutMsgToReadQueue; msgCb.queueFps[QUERY_QUEUE] = mmPutMsgToQueryQueue;
msgCb.queueFps[READ_QUEUE] = mmPutMsgToReadQueue;
msgCb.queueFps[WRITE_QUEUE] = mmPutMsgToWriteQueue; msgCb.queueFps[WRITE_QUEUE] = mmPutMsgToWriteQueue;
msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendMnodeReqFp = dndSendReqToMnode;
...@@ -258,4 +259,4 @@ int32_t mmMonitorMnodeInfo(SMgmtWrapper *pWrapper, SMonClusterInfo *pClusterInfo ...@@ -258,4 +259,4 @@ int32_t mmMonitorMnodeInfo(SMgmtWrapper *pWrapper, SMonClusterInfo *pClusterInfo
SMonGrantInfo *pGrantInfo) { SMonGrantInfo *pGrantInfo) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt; SMnodeMgmt *pMgmt = pWrapper->pMgmt;
return mndGetMonitorInfo(pMgmt->pMnode, pClusterInfo, pVgroupInfo, pGrantInfo); return mndGetMonitorInfo(pMgmt->pMnode, pClusterInfo, pVgroupInfo, pGrantInfo);
} }
\ No newline at end of file
...@@ -156,10 +156,10 @@ void mmInitMsgHandles(SMgmtWrapper *pWrapper) { ...@@ -156,10 +156,10 @@ void mmInitMsgHandles(SMgmtWrapper *pWrapper) {
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)mmProcessReadMsg, MND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)mmProcessQueryMsg, MND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)mmProcessReadMsg, MND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)mmProcessQueryMsg, MND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)mmProcessReadMsg, MND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)mmProcessQueryMsg, MND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)mmProcessReadMsg, MND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)mmProcessQueryMsg, MND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)mmProcessReadMsg, MND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)mmProcessQueryMsg, MND_VGID);
} }
...@@ -61,6 +61,10 @@ int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { ...@@ -61,6 +61,10 @@ int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg); return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg);
} }
int32_t mmProcessQueryMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return mmPutMsgToWorker(pMgmt, &pMgmt->queryWorker, pMsg);
}
static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) { static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) {
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
if (pMsg == NULL) { if (pMsg == NULL) {
...@@ -90,11 +94,21 @@ int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { ...@@ -90,11 +94,21 @@ int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
return mmPutRpcMsgToWorker(pMgmt, &pMgmt->readWorker, pRpc); return mmPutRpcMsgToWorker(pMgmt, &pMgmt->readWorker, pRpc);
} }
int32_t mmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
return mmPutRpcMsgToWorker(pMgmt, &pMgmt->queryWorker, pRpc);
}
int32_t mmStartWorker(SMnodeMgmt *pMgmt) { int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
SSingleWorkerCfg cfg = {.minNum = 0, .maxNum = 1, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt}; SSingleWorkerCfg cfg = {.minNum = 0, .maxNum = 1, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt};
SSingleWorkerCfg readCfg = {.minNum = 2, .maxNum = 2, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt};
if (tSingleWorkerInit(&pMgmt->readWorker, &readCfg) != 0) { if (tSingleWorkerInit(&pMgmt->queryWorker, &cfg) != 0) {
dError("failed to start mnode-query worker since %s", terrstr());
return -1;
}
if (tSingleWorkerInit(&pMgmt->readWorker, &cfg) != 0) {
dError("failed to start mnode-read worker since %s", terrstr()); dError("failed to start mnode-read worker since %s", terrstr());
return -1; return -1;
} }
...@@ -115,6 +129,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { ...@@ -115,6 +129,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
void mmStopWorker(SMnodeMgmt *pMgmt) { void mmStopWorker(SMnodeMgmt *pMgmt) {
tSingleWorkerCleanup(&pMgmt->readWorker); tSingleWorkerCleanup(&pMgmt->readWorker);
tSingleWorkerCleanup(&pMgmt->queryWorker);
tSingleWorkerCleanup(&pMgmt->writeWorker); tSingleWorkerCleanup(&pMgmt->writeWorker);
tSingleWorkerCleanup(&pMgmt->syncWorker); tSingleWorkerCleanup(&pMgmt->syncWorker);
dDebug("mnode workers are closed"); dDebug("mnode workers are closed");
......
...@@ -80,7 +80,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) { ...@@ -80,7 +80,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) {
.pCont = pReq, .pCont = pReq,
.contLen = contLen, .contLen = contLen,
}; };
tmsgPutToQueue(&pMnode->msgCb, QUERY_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
} }
taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer); taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer);
...@@ -92,7 +92,7 @@ static void mndPullupTelem(void *param, void *tmrId) { ...@@ -92,7 +92,7 @@ static void mndPullupTelem(void *param, void *tmrId) {
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, QUERY_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
} }
taosTmrReset(mndPullupTelem, TELEM_TIMER_MS, pMnode, pMnode->timer, &pMnode->telemTimer); taosTmrReset(mndPullupTelem, TELEM_TIMER_MS, pMnode, pMnode->timer, &pMnode->telemTimer);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册