未验证 提交 7571d9d9 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #14024 from taosdata/fix/mnode

refactor: get sdb snapshot in atomic way
...@@ -95,8 +95,8 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); ...@@ -95,8 +95,8 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
*/ */
int32_t mndProcessRpcMsg(SRpcMsg *pMsg); int32_t mndProcessRpcMsg(SRpcMsg *pMsg);
int32_t mndProcessSyncMsg(SRpcMsg *pMsg); int32_t mndProcessSyncMsg(SRpcMsg *pMsg);
int32_t mndPreProcessMsg(SRpcMsg *pMsg); int32_t mndPreProcessQueryMsg(SRpcMsg *pMsg);
void mndAbortPreprocessMsg(SRpcMsg *pMsg); void mndPostProcessQueryMsg(SRpcMsg *pMsg);
/** /**
* @brief Generate machine code * @brief Generate machine code
......
...@@ -68,6 +68,10 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -68,6 +68,10 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
mmSendRsp(pMsg, code); mmSendRsp(pMsg, code);
} }
if (code == TSDB_CODE_RPC_REDIRECT) {
mndPostProcessQueryMsg(pMsg);
}
dTrace("msg:%p, is freed, code:0x%x", pMsg, code); dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
...@@ -116,7 +120,7 @@ int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -116,7 +120,7 @@ int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
pMsg->info.node = pMgmt->pMnode; pMsg->info.node = pMgmt->pMnode;
if (mndPreProcessMsg(pMsg) != 0) { if (mndPreProcessQueryMsg(pMsg) != 0) {
dError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType)); dError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType));
return -1; return -1;
} }
......
...@@ -550,12 +550,10 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { ...@@ -550,12 +550,10 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
if (!IsReq(pMsg)) return 0; if (!IsReq(pMsg)) return 0;
if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0; if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0;
if (IsReq(pMsg) && pMsg->msgType != TDMT_MND_MQ_TIMER && pMsg->msgType != TDMT_MND_TELEM_TIMER && if (pMsg->msgType != TDMT_MND_MQ_TIMER && pMsg->msgType != TDMT_MND_TELEM_TIMER &&
pMsg->msgType != TDMT_MND_TRANS_TIMER) { pMsg->msgType != TDMT_MND_TRANS_TIMER) {
mError("msg:%p, failed to check mnode state since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType)); mError("msg:%p, failed to check mnode state since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType));
mndAbortPreprocessMsg(pMsg);
SEpSet epSet = {0}; SEpSet epSet = {0};
mndGetMnodeEpSet(pMsg->info.node, &epSet); mndGetMnodeEpSet(pMsg->info.node, &epSet);
......
...@@ -18,16 +18,14 @@ ...@@ -18,16 +18,14 @@
#include "mndMnode.h" #include "mndMnode.h"
#include "qworker.h" #include "qworker.h"
int32_t mndPreProcessMsg(SRpcMsg *pMsg) { int32_t mndPreProcessQueryMsg(SRpcMsg *pMsg) {
if (TDMT_VND_QUERY != pMsg->msgType) return 0; if (TDMT_VND_QUERY != pMsg->msgType) return 0;
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
return qWorkerPreprocessQueryMsg(pMnode->pQuery, pMsg); return qWorkerPreprocessQueryMsg(pMnode->pQuery, pMsg);
} }
void mndAbortPreprocessMsg(SRpcMsg *pMsg) { void mndPostProcessQueryMsg(SRpcMsg *pMsg) {
if (TDMT_VND_QUERY != pMsg->msgType) return; if (TDMT_VND_QUERY != pMsg->msgType) return;
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
qWorkerAbortPreprocessQueryMsg(pMnode->pQuery, pMsg); qWorkerAbortPreprocessQueryMsg(pMnode->pQuery, pMsg);
} }
......
...@@ -75,12 +75,10 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM ...@@ -75,12 +75,10 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
} }
int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) { int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) {
// TODO: mInfo("start to read snapshot from sdb in atomic way");
SMnode *pMnode = pFsm->data;
// atomic operation return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm,
// step1. sdbGetCommitInfo &pSnapshot->lastConfigIndex);
// step2. create ppReader with pReaderParam
return 0; return 0;
} }
...@@ -106,14 +104,6 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM ...@@ -106,14 +104,6 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
#if 0
// send response
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .conn.applyIndex = cbMeta.index};
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
syncGetAndDelRespRpc(pMnode->syncMgmt.sync, cbMeta.seqNum, &rpcMsg.info);
#endif
pMgmt->errCode = cbMeta.code; pMgmt->errCode = cbMeta.code;
mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64, pMgmt->transId, mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64, pMgmt->transId,
cbMeta.code, cbMeta.index, cbMeta.term); cbMeta.code, cbMeta.index, cbMeta.term);
...@@ -130,7 +120,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM ...@@ -130,7 +120,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM
int32_t mndSnapshotStartRead(struct SSyncFSM *pFsm, void **ppReader) { int32_t mndSnapshotStartRead(struct SSyncFSM *pFsm, void **ppReader) {
mInfo("start to read snapshot from sdb"); mInfo("start to read snapshot from sdb");
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader); return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, NULL, NULL, NULL);
} }
int32_t mndSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { int32_t mndSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) {
......
...@@ -910,7 +910,7 @@ TEST_F(MndTestSdb, 01_Read_Str) { ...@@ -910,7 +910,7 @@ TEST_F(MndTestSdb, 01_Read_Str) {
int32_t len = 0; int32_t len = 0;
int32_t code = 0; int32_t code = 0;
code = sdbStartRead(pSdb, &pReader); code = sdbStartRead(pSdb, &pReader, NULL, NULL, NULL);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
code = sdbStartWrite(pSdb, &pWritter); code = sdbStartWrite(pSdb, &pWritter);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
......
...@@ -388,7 +388,7 @@ SSdbRow *sdbAllocRow(int32_t objSize); ...@@ -388,7 +388,7 @@ SSdbRow *sdbAllocRow(int32_t objSize);
void *sdbGetRowObj(SSdbRow *pRow); void *sdbGetRowObj(SSdbRow *pRow);
void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow, bool callFunc); void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow, bool callFunc);
int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter); int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter, int64_t *index, int64_t *term, int64_t *config);
int32_t sdbStopRead(SSdb *pSdb, SSdbIter *pIter); int32_t sdbStopRead(SSdb *pSdb, SSdbIter *pIter);
int32_t sdbDoRead(SSdb *pSdb, SSdbIter *pIter, void **ppBuf, int32_t *len); int32_t sdbDoRead(SSdb *pSdb, SSdbIter *pIter, void **ppBuf, int32_t *len);
......
...@@ -523,7 +523,7 @@ static void sdbCloseIter(SSdbIter *pIter) { ...@@ -523,7 +523,7 @@ static void sdbCloseIter(SSdbIter *pIter) {
taosMemoryFree(pIter); taosMemoryFree(pIter);
} }
int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) { int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter, int64_t *index, int64_t *term, int64_t *config) {
SSdbIter *pIter = sdbCreateIter(pSdb); SSdbIter *pIter = sdbCreateIter(pSdb);
if (pIter == NULL) return -1; if (pIter == NULL) return -1;
...@@ -552,6 +552,10 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) { ...@@ -552,6 +552,10 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) {
} }
*ppIter = pIter; *ppIter = pIter;
if (index != NULL) *index = commitIndex;
if (term != NULL) *term = commitTerm;
if (config != NULL) *config = commitConfig;
mInfo("sdbiter:%p, is created to read snapshot, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s", mInfo("sdbiter:%p, is created to read snapshot, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s",
pIter, commitIndex, commitTerm, commitConfig, pIter->name); pIter, commitIndex, commitTerm, commitConfig, pIter->name);
return 0; return 0;
......
...@@ -148,7 +148,7 @@ endi ...@@ -148,7 +148,7 @@ endi
print =============== step6: stop mnode1 print =============== step6: stop mnode1
system sh/exec.sh -n dnode1 -s stop -x SIGKILL system sh/exec.sh -n dnode1 -s stop -x SIGKILL
sql_error drop mnode on dnode 1 # sql_error drop mnode on dnode 1
$x = 0 $x = 0
step61: step61:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册