提交 cb2527f7 编写于 作者: S Shengliang Guan

refactor: sync integrate into mnode

上级 68b7f694
...@@ -90,7 +90,6 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); ...@@ -90,7 +90,6 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
*/ */
int32_t mndProcessMsg(SRpcMsg *pMsg); int32_t mndProcessMsg(SRpcMsg *pMsg);
int32_t mndProcessSyncMsg(SRpcMsg *pMsg); int32_t mndProcessSyncMsg(SRpcMsg *pMsg);
int32_t mndProcessApplyMsg(SRpcMsg *pMsg);
/** /**
* @brief Generate machine code * @brief Generate machine code
......
...@@ -33,7 +33,6 @@ typedef struct SMnodeMgmt { ...@@ -33,7 +33,6 @@ typedef struct SMnodeMgmt {
SSingleWorker readWorker; SSingleWorker readWorker;
SSingleWorker writeWorker; SSingleWorker writeWorker;
SSingleWorker syncWorker; SSingleWorker syncWorker;
SSingleWorker applyWorker;
SSingleWorker monitorWorker; SSingleWorker monitorWorker;
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];
int8_t replica; int8_t replica;
...@@ -60,7 +59,6 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt); ...@@ -60,7 +59,6 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt);
void mmStopWorker(SMnodeMgmt *pMgmt); void mmStopWorker(SMnodeMgmt *pMgmt);
int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToApplyQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
......
...@@ -62,18 +62,6 @@ static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -62,18 +62,6 @@ static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
mndProcessSyncMsg(pMsg); mndProcessSyncMsg(pMsg);
} }
static void mmProcessApplyQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SMnodeMgmt *pMgmt = pInfo->ahandle;
dTrace("msg:%p, get from mnode-apply queue", pMsg);
pMsg->info.node = pMgmt->pMnode;
mndProcessApplyMsg(pMsg);
dTrace("msg:%p, is freed", pMsg);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
static int32_t mmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) { static int32_t mmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) {
dTrace("msg:%p, put into worker %s, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType)); dTrace("msg:%p, put into worker %s, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
taosWriteQitem(pWorker->queue, pMsg); taosWriteQitem(pWorker->queue, pMsg);
...@@ -88,10 +76,6 @@ int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -88,10 +76,6 @@ int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutNodeMsgToWorker(&pMgmt->syncWorker, pMsg); return mmPutNodeMsgToWorker(&pMgmt->syncWorker, pMsg);
} }
int32_t mmPutNodeMsgToApplyQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutNodeMsgToWorker(&pMgmt->applyWorker, pMsg);
}
int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutNodeMsgToWorker(&pMgmt->readWorker, pMsg); return mmPutNodeMsgToWorker(&pMgmt->readWorker, pMsg);
} }
...@@ -179,18 +163,6 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { ...@@ -179,18 +163,6 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
return -1; return -1;
} }
SSingleWorkerCfg aCfg = {
.min = 1,
.max = 1,
.name = "mnode-apply",
.fp = (FItem)mmProcessApplyQueue,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->applyWorker, &aCfg) != 0) {
dError("failed to start mnode mnode-apply worker since %s", terrstr());
return -1;
}
SSingleWorkerCfg mCfg = { SSingleWorkerCfg mCfg = {
.min = 1, .min = 1,
.max = 1, .max = 1,
......
...@@ -346,12 +346,6 @@ void mndStop(SMnode *pMnode) { ...@@ -346,12 +346,6 @@ void mndStop(SMnode *pMnode) {
return mndCleanupTimer(pMnode); return mndCleanupTimer(pMnode);
} }
int32_t mndProcessApplyMsg(SRpcMsg *pMsg) {
SSdbRaw *pRaw = pMsg->pCont;
SMnode *pMnode = pMsg->info.node;
return sdbWriteWithoutFree(pMnode->pSdb, pRaw);
}
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
void *ahandle = pMsg->info.ahandle; void *ahandle = pMsg->info.ahandle;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册