提交 e06f9fd0 编写于 作者: S Shengliang Guan

refactor: sync integrate into mnode

上级 c4428ba5
...@@ -158,7 +158,6 @@ enum { ...@@ -158,7 +158,6 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "mnode-drop-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "mnode-drop-index", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "mnode-get-db-cfg", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "mnode-get-db-cfg", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "mnode-get-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "mnode-get-index", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "mnode-apply-msg", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "mnode-apply-msg", NULL, NULL)
// Requests handled by VNODE // Requests handled by VNODE
......
...@@ -89,9 +89,7 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); ...@@ -89,9 +89,7 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
* @return int32_t 0 for success, -1 for failure. * @return int32_t 0 for success, -1 for failure.
*/ */
int32_t mndProcessMsg(SRpcMsg *pMsg); int32_t mndProcessMsg(SRpcMsg *pMsg);
int32_t mndProcessSyncMsg(SRpcMsg *pMsg); int32_t mndProcessSyncMsg(SRpcMsg *pMsg);
int32_t mndProcessApplyMsg(SRpcMsg *pMsg); int32_t mndProcessApplyMsg(SRpcMsg *pMsg);
/** /**
......
...@@ -59,32 +59,17 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -59,32 +59,17 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SMnodeMgmt *pMgmt = pInfo->ahandle; SMnodeMgmt *pMgmt = pInfo->ahandle;
pMsg->info.node = pMgmt->pMnode; pMsg->info.node = pMgmt->pMnode;
mndProcessSyncMsg(pMsg); mndProcessSyncMsg(pMsg);
return;
} }
static void mmProcessApplyQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { static void mmProcessApplyQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SMnodeMgmt *pMgmt = pInfo->ahandle; SMnodeMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1; dTrace("msg:%p, get from mnode-apply queue", pMsg);
tmsg_t msgType = pMsg->msgType;
bool isRequest = msgType & 1U;
dTrace("msg:%p, get from mnode-query queue", pMsg);
pMsg->info.node = pMgmt->pMnode; pMsg->info.node = pMgmt->pMnode;
mndProcessApplyMsg(pMsg); mndProcessApplyMsg(pMsg);
/* dTrace("msg:%p, is freed", pMsg);
if (isRequest) {
if (pMsg->info.handle != NULL && code != 0) {
if (code != 0 && terrno != 0) code = terrno;
mmSendRsp(pMsg, code);
}
}
*/
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "mndDef.h" #include "mndDef.h"
#include "sdb.h" #include "sdb.h"
#include "syncTools.h"
#include "tcache.h" #include "tcache.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tglobal.h" #include "tglobal.h"
...@@ -31,12 +32,14 @@ ...@@ -31,12 +32,14 @@
extern "C" { extern "C" {
#endif #endif
// clang-format off
#define mFatal(...) { if (mDebugFlag & DEBUG_FATAL) { taosPrintLog("MND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} #define mFatal(...) { if (mDebugFlag & DEBUG_FATAL) { taosPrintLog("MND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}
#define mError(...) { if (mDebugFlag & DEBUG_ERROR) { taosPrintLog("MND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} #define mError(...) { if (mDebugFlag & DEBUG_ERROR) { taosPrintLog("MND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}
#define mWarn(...) { if (mDebugFlag & DEBUG_WARN) { taosPrintLog("MND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} #define mWarn(...) { if (mDebugFlag & DEBUG_WARN) { taosPrintLog("MND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}
#define mInfo(...) { if (mDebugFlag & DEBUG_INFO) { taosPrintLog("MND ", DEBUG_INFO, 255, __VA_ARGS__); }} #define mInfo(...) { if (mDebugFlag & DEBUG_INFO) { taosPrintLog("MND ", DEBUG_INFO, 255, __VA_ARGS__); }}
#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }} #define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }}
#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }} #define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }}
// clang-format on
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE) #define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE) #define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
...@@ -75,9 +78,7 @@ typedef struct { ...@@ -75,9 +78,7 @@ typedef struct {
int32_t errCode; int32_t errCode;
sem_t syncSem; sem_t syncSem;
SWal *pWal; SWal *pWal;
//SSyncNode *pSyncNode; int64_t sync;
int64_t sync;
ESyncState state; ESyncState state;
} SSyncMgmt; } SSyncMgmt;
......
...@@ -125,24 +125,11 @@ _OVER: ...@@ -125,24 +125,11 @@ _OVER:
} }
int32_t mndSyncEqMsg(const SMsgCb* msgcb, SRpcMsg *pMsg) { int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); }
int32_t ret = 0;
if (msgcb->queueFps[SYNC_QUEUE] != NULL) {
tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
} else {
mError("mndSyncEqMsg queue is NULL, SYNC_QUEUE:%d", SYNC_QUEUE);
}
return ret;
}
int32_t mndSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
int32_t ret = 0;
pMsg->info.noResp = 1;
tmsgSendReq(pEpSet, pMsg);
return ret;
}
void mndSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
SyncIndex beginIndex = SYNC_INDEX_INVALID; SyncIndex beginIndex = SYNC_INDEX_INVALID;
if (pFsm->FpGetSnapshot != NULL) { if (pFsm->FpGetSnapshot != NULL) {
SSnapshot snapshot; SSnapshot snapshot;
...@@ -163,26 +150,26 @@ void mndSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMe ...@@ -163,26 +150,26 @@ void mndSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMe
} }
} }
void mndSyncPreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { void mndSyncPreCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
// strict consistent, do nothing // strict consistent, do nothing
} }
void mndSyncRollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { void mndSyncRollBackMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
// strict consistent, do nothing // strict consistent, do nothing
} }
int32_t mndSyncGetSnapshotCb(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
// snapshot // snapshot
return 0; return 0;
} }
SSyncFSM *syncMnodeMakeFsm(SMnode *pMnode) { SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
SSyncFSM *pFsm = (SSyncFSM *)taosMemoryMalloc(sizeof(SSyncFSM)); SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
pFsm->data = pMnode; pFsm->data = pMnode;
pFsm->FpCommitCb = mndSyncCommitCb; pFsm->FpCommitCb = mndSyncCommitMsg;
pFsm->FpPreCommitCb = mndSyncPreCommitCb; pFsm->FpPreCommitCb = mndSyncPreCommitMsg;
pFsm->FpRollBackCb = mndSyncRollBackCb; pFsm->FpRollBackCb = mndSyncRollBackMsg;
pFsm->FpGetSnapshot = mndSyncGetSnapshotCb; pFsm->FpGetSnapshot = mndSyncGetSnapshot;
return pFsm; return pFsm;
} }
...@@ -195,18 +182,11 @@ int32_t mndInitSync(SMnode *pMnode) { ...@@ -195,18 +182,11 @@ int32_t mndInitSync(SMnode *pMnode) {
return -1; return -1;
} }
if (mndRestoreWal(pMnode) < 0) {
mError("failed to restore wal since %s", terrstr());
return -1;
}
if (pMnode->selfId == 1) { if (pMnode->selfId == 1) {
pMgmt->state = TAOS_SYNC_STATE_LEADER; pMgmt->state = TAOS_SYNC_STATE_LEADER;
} }
// pMgmt->pSyncNode = NULL; SSyncInfo syncInfo = {.vgId = 1};
SSyncInfo syncInfo;
syncInfo.vgId = 1;
SSyncCfg *pCfg = &(syncInfo.syncCfg); SSyncCfg *pCfg = &(syncInfo.syncCfg);
pCfg->replicaNum = pMnode->replica; pCfg->replicaNum = pMnode->replica;
pCfg->myIndex = pMnode->selfIndex; pCfg->myIndex = pMnode->selfIndex;
...@@ -216,9 +196,8 @@ int32_t mndInitSync(SMnode *pMnode) { ...@@ -216,9 +196,8 @@ int32_t mndInitSync(SMnode *pMnode) {
} }
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s/sync", pMnode->path); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s/sync", pMnode->path);
syncInfo.pWal = pMnode->syncMgmt.pWal; syncInfo.pWal = pMnode->syncMgmt.pWal;
syncInfo.pFsm = mndSyncMakeFsm(pMnode);
syncInfo.pFsm = syncMnodeMakeFsm(pMnode); syncInfo.FpSendMsg = mndSyncSendMsg;
syncInfo.FpSendMsg = mndSendMsg;
syncInfo.FpEqMsg = mndSyncEqMsg; syncInfo.FpEqMsg = mndSyncEqMsg;
pMnode->syncMgmt.sync = syncOpen(&syncInfo); pMnode->syncMgmt.sync = syncOpen(&syncInfo);
......
...@@ -343,25 +343,18 @@ int32_t mndStart(SMnode *pMnode) { ...@@ -343,25 +343,18 @@ int32_t mndStart(SMnode *pMnode) {
void mndStop(SMnode *pMnode) { void mndStop(SMnode *pMnode) {
syncStop(pMnode->syncMgmt.sync); syncStop(pMnode->syncMgmt.sync);
return mndCleanupTimer(pMnode); return mndCleanupTimer(pMnode);
} }
int32_t mndProcessApplyMsg(SRpcMsg *pMsg) { int32_t mndProcessApplyMsg(SRpcMsg *pMsg) {
SSdbRaw *pRaw = pMsg->pCont; SSdbRaw *pRaw = pMsg->pCont;
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pRaw); return sdbWriteWithoutFree(pMnode->pSdb, pRaw);
rpcFreeCont(pMsg->pCont);
return code;
} }
#include "syncTools.h"
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
void *ahandle = pMsg->info.ahandle; void *ahandle = pMsg->info.ahandle;
int32_t ret = TAOS_SYNC_PROPOSE_OTHER_ERROR; int32_t ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
if (syncEnvIsStart()) { if (syncEnvIsStart()) {
......
...@@ -13,7 +13,6 @@ ...@@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <stdint.h>
#include "sync.h" #include "sync.h"
#include "syncAppendEntries.h" #include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h" #include "syncAppendEntriesReply.h"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册