提交 9cb40b16 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/data_format

...@@ -146,6 +146,7 @@ int32_t syncGetVgId(int64_t rid); ...@@ -146,6 +146,7 @@ int32_t syncGetVgId(int64_t rid);
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
bool syncEnvIsStart(); bool syncEnvIsStart();
const char* syncStr(ESyncState state); const char* syncStr(ESyncState state);
bool syncIsRestoreFinish(int64_t rid);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -111,9 +111,16 @@ int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -111,9 +111,16 @@ int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
} }
int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
if (mmAcquire(pMgmt) != 0) return -1; int32_t code = -1;
int32_t code = mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); if (mmAcquire(pMgmt) == 0) {
code = mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg);
mmRelease(pMgmt); mmRelease(pMgmt);
}
if (code != 0) {
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
}
return code; return code;
} }
......
...@@ -17,13 +17,7 @@ ...@@ -17,13 +17,7 @@
#include "mndSync.h" #include "mndSync.h"
#include "mndTrans.h" #include "mndTrans.h"
int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); }
int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
if (code != 0) {
rpcFreeCont(pMsg->pCont);
}
return code;
}
int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); } int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
...@@ -33,7 +27,7 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM ...@@ -33,7 +27,7 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
SSdbRaw *pRaw = pMsg->pCont; SSdbRaw *pRaw = pMsg->pCont;
mTrace("ver:%" PRId64 ", apply raw:%p to sdb, role:%s", cbMeta.index, pRaw, syncStr(cbMeta.state)); mTrace("raw:%p, apply to sdb, ver:%" PRId64 " role:%s", pRaw, cbMeta.index, syncStr(cbMeta.state));
sdbWriteWithoutFree(pSdb, pRaw); sdbWriteWithoutFree(pSdb, pRaw);
sdbSetApplyIndex(pSdb, cbMeta.index); sdbSetApplyIndex(pSdb, cbMeta.index);
sdbSetApplyTerm(pSdb, cbMeta.term); sdbSetApplyTerm(pSdb, cbMeta.term);
......
...@@ -157,6 +157,18 @@ ESyncState syncGetMyRole(int64_t rid) { ...@@ -157,6 +157,18 @@ ESyncState syncGetMyRole(int64_t rid) {
return state; return state;
} }
bool syncIsRestoreFinish(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return false;
}
assert(rid == pSyncNode->rid);
bool b = pSyncNode->restoreFinish;
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return b;
}
const char* syncGetMyRoleStr(int64_t rid) { const char* syncGetMyRoleStr(int64_t rid) {
const char* s = syncUtilState2String(syncGetMyRole(rid)); const char* s = syncUtilState2String(syncGetMyRole(rid));
return s; return s;
...@@ -308,8 +320,10 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { ...@@ -308,8 +320,10 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS; int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS;
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
rpcFreeCont(pMsg->pCont);
return TAOS_SYNC_PROPOSE_OTHER_ERROR; return TAOS_SYNC_PROPOSE_OTHER_ERROR;
} }
assert(rid == pSyncNode->rid); assert(rid == pSyncNode->rid);
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册