diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index a369b81d2643f4a560ffb50a55d760aaa7bf2c53..2d21b3531acf1b5c115cc206b826cb087367ab27 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -24,7 +24,7 @@ extern "C" { #include "tdef.h" #include "tmsgcb.h" -#define SYNC_INDEX_BEGIN 0 +#define SYNC_INDEX_BEGIN 0 #define SYNC_INDEX_INVALID -1 typedef uint64_t SyncNodeId; @@ -44,14 +44,6 @@ typedef enum { TAOS_SYNC_STATE_ERROR = 103, } ESyncState; -typedef enum { - TAOS_SYNC_PROPOSE_SUCCESS = 0, - TAOS_SYNC_PROPOSE_NOT_LEADER = 1, - TAOS_SYNC_ONLY_ONE_REPLICA = 2, - TAOS_SYNC_NOT_IN_NEW_CONFIG = 3, - TAOS_SYNC_OTHER_ERROR = 100, -} ESyncProposeCode; - typedef enum { TAOS_SYNC_FSM_CB_SUCCESS = 0, TAOS_SYNC_FSM_CB_OTHER_ERROR = 1, diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 30cbbbeb116f294d07e0a266c8311ca11d5a6d46..6fc84e023daa12bf008a79c8825fde2ae6e23498 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -411,6 +411,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_SYN_INVALID_MSGTYPE TAOS_DEF_ERROR_CODE(0, 0x090A) #define TSDB_CODE_SYN_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x0910) +#define TSDB_CODE_SYN_ONE_REPLICA TAOS_DEF_ERROR_CODE(0, 0x0911) +#define TSDB_CODE_SYN_NOT_IN_NEW_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0912) #define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF) // tq diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 59599ee13432ac4c80140aad15d78c49bd829182..00118eac1ee7c9268771f5d6198e1b8bdf12ea89 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -380,17 +380,19 @@ void mndStop(SMnode *pMnode) { int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; SSyncMgmt *pMgmt = &pMnode->syncMgmt; - int32_t code = TAOS_SYNC_OTHER_ERROR; + int32_t code = 0; if (!syncEnvIsStart()) { mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType)); - return TAOS_SYNC_OTHER_ERROR; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; } SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync); if (pSyncNode == NULL) { mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType)); - return TAOS_SYNC_OTHER_ERROR; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; } char logBuf[512] = {0}; @@ -451,7 +453,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { tmsgSendRsp(&rsp); } else { mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType)); - code = TAOS_SYNC_OTHER_ERROR; + code = -1; } } else { if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { @@ -492,10 +494,13 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { tmsgSendRsp(&rsp); } else { mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType)); - code = TAOS_SYNC_OTHER_ERROR; + code = -1; } } + if (code != 0) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + } return code; } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 99400373569d4d3b6ac08c9bb47c734011fcf8a0..63708aef8a898fa24dab32ee9456d09f52f06afb 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -234,9 +234,9 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { int32_t code = syncPropose(pMgmt->sync, &rsp, isWeak); if (code == 0) { tsem_wait(&pMgmt->syncSem); - } else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) { + } else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { terrno = TSDB_CODE_APP_NOT_READY; - } else if (code == TAOS_SYNC_OTHER_ERROR) { + } else if (code == -1 && terrno == TSDB_CODE_SYN_INTERNAL_ERROR) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; } else { terrno = TSDB_CODE_APP_ERROR; @@ -257,13 +257,13 @@ void mndSyncStart(SMnode *pMnode) { syncStart(pMgmt->sync); mDebug("mnode sync started, id:%" PRId64 " standby:%d", pMgmt->sync, pMgmt->standby); -/* - if (pMgmt->standby) { - syncStartStandBy(pMgmt->sync); - } else { - syncStart(pMgmt->sync); - } -*/ + /* + if (pMgmt->standby) { + syncStartStandBy(pMgmt->sync); + } else { + syncStart(pMgmt->sync); + } + */ } void mndSyncStop(SMnode *pMnode) {} diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index cd64bc8a9c2e8af181ba217cdd12d6b51f1d9d2f..34ff9ffa0fd2deeb86a0580af1648bfedc84e125 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -296,7 +296,7 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) { } int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - int32_t ret = TAOS_SYNC_OTHER_ERROR; + int32_t ret = 0; if (syncEnvIsStart()) { SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync); @@ -381,15 +381,18 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { tmsgSendRsp(&rsp); } else { vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType); - ret = TAOS_SYNC_OTHER_ERROR; + ret = -1; } syncNodeRelease(pSyncNode); } else { vError("==vnodeProcessSyncReq== error syncEnv stop"); - ret = TAOS_SYNC_OTHER_ERROR; + ret = -1; } + if (ret != 0) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + } return ret; } diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 17c2c186be920704847fccb30b3dc218a2f45755..975dff9fb655a6f3f87fd9a66e0814afcfff6d1d 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -98,7 +98,8 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { if (code == 0) { vnodeAccumBlockMsg(pVnode, pMsg->msgType); - } else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) { + + } else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { SEpSet newEpSet = {0}; syncGetEpSet(pVnode->sync, &newEpSet); SEp *pEp = &newEpSet.eps[newEpSet.inUse]; @@ -247,29 +248,17 @@ static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); } -int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void **ppReader) { - return 0; -} +int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void **ppReader) { return 0; } -int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { - return 0; -} +int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { return 0; } -int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { - return 0; -} +int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { return 0; } -int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void **ppWriter) { - return 0; -} +int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void **ppWriter) { return 0; } -int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) { - return 0; -} +int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) { return 0; } -int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { - return 0; -} +int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { return 0; } static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 7b1fd010ab70d6310017a9310bb8134af3691f32..953294e489a1917ff912307ad4071c4d8c90385a 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -149,12 +149,14 @@ void syncStop(int64_t rid) { int32_t syncSetStandby(int64_t rid) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { - return TAOS_SYNC_OTHER_ERROR; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; } if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { taosReleaseRef(tsNodeRefId, pSyncNode->rid); - return TAOS_SYNC_OTHER_ERROR; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; } // state change @@ -177,7 +179,8 @@ int32_t syncSetStandby(int64_t rid) { int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { - return TAOS_SYNC_OTHER_ERROR; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; } ASSERT(rid == pSyncNode->rid); @@ -201,7 +204,8 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg if (!IamInNew) { taosReleaseRef(tsNodeRefId, pSyncNode->rid); - return TAOS_SYNC_NOT_IN_NEW_CONFIG; + terrno = TSDB_CODE_SYN_NOT_IN_NEW_CONFIG; + return -1; } char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg); @@ -219,7 +223,8 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { - return TAOS_SYNC_OTHER_ERROR; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; } ASSERT(rid == pSyncNode->rid); @@ -246,7 +251,8 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) { if (!IamInNew) { sError("sync reconfig error, not in new config"); taosReleaseRef(tsNodeRefId, pSyncNode->rid); - return TAOS_SYNC_NOT_IN_NEW_CONFIG; + terrno = TSDB_CODE_SYN_NOT_IN_NEW_CONFIG; + return -1; } char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg); @@ -272,13 +278,15 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) { int32_t syncLeaderTransfer(int64_t rid) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { - return TAOS_SYNC_OTHER_ERROR; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; } ASSERT(rid == pSyncNode->rid); if (pSyncNode->peersNum == 0) { taosReleaseRef(tsNodeRefId, pSyncNode->rid); - return TAOS_SYNC_OTHER_ERROR; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; } SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[0]; @@ -291,7 +299,8 @@ int32_t syncLeaderTransfer(int64_t rid) { int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { - return TAOS_SYNC_OTHER_ERROR; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; } ASSERT(rid == pSyncNode->rid); int32_t ret = 0; @@ -299,7 +308,8 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) { if (pSyncNode->replicaNum == 1) { sError("only one replica, cannot drop leader"); taosReleaseRef(tsNodeRefId, pSyncNode->rid); - return TAOS_SYNC_ONLY_ONE_REPLICA; + terrno = TSDB_CODE_SYN_ONE_REPLICA; + return -1; } SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId); @@ -538,11 +548,12 @@ void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) { } int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { - int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS; + int32_t ret = 0; SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { - return TAOS_SYNC_OTHER_ERROR; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; } assert(rid == pSyncNode->rid); sDebug("vgId:%d sync event propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType)); @@ -553,7 +564,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { } int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) { - int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS; + int32_t ret = 0; sDebug("vgId:%d sync event propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType)); if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { @@ -567,14 +578,17 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); if (pSyncNode->FpEqMsg != NULL && (*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) { - ret = TAOS_SYNC_PROPOSE_SUCCESS; + ret = 0; } else { + ret = -1; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; sError("syncPropose pSyncNode->FpEqMsg is NULL"); } syncClientRequestDestroy(pSyncMsg); } else { + ret = -1; + terrno = TSDB_CODE_SYN_NOT_LEADER; sError("syncPropose not leader, %s", syncUtilState2String(pSyncNode->state)); - ret = TAOS_SYNC_PROPOSE_NOT_LEADER; } return ret; diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp index 10b54d0aa492a8002762048f6c8e304df97dbfa5..3ee2b458d6044fed8377d2f5a32d9e2c44351957 100644 --- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp +++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp @@ -338,7 +338,7 @@ int main(int argc, char** argv) { if (alreadySend < writeRecordNum) { SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex); int32_t ret = syncPropose(rid, pRpcMsg, false); - if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) { + if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { sTrace("%s value%d write not leader", s, alreadySend); } else { assert(ret == 0); diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index 1e64a8a6f7b28e6caf1cd8ffc4dceffd4aadf345..b1dc53d80403e2e62707e9377a890874aceff9d4 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -251,7 +251,7 @@ int main(int argc, char** argv) { if (alreadySend < writeRecordNum) { SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex); int32_t ret = syncPropose(rid, pRpcMsg, false); - if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) { + if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { sTrace("%s value%d write not leader", s, alreadySend); } else { assert(ret == 0); diff --git a/source/libs/sync/test/syncReplicateTest.cpp b/source/libs/sync/test/syncReplicateTest.cpp index bf9e34fffbc52121001ac0d9010d7a0b18869f88..d955e64f815ff9b739642c7bb06ea1fcd8eb61e8 100644 --- a/source/libs/sync/test/syncReplicateTest.cpp +++ b/source/libs/sync/test/syncReplicateTest.cpp @@ -188,7 +188,7 @@ int main(int argc, char** argv) { if (alreadySend < writeRecordNum) { SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex); int32_t ret = syncPropose(rid, pRpcMsg, false); - if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) { + if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { sTrace("%s value%d write not leader", s, alreadySend); } else { assert(ret == 0); diff --git a/source/libs/sync/test/syncTestTool.cpp b/source/libs/sync/test/syncTestTool.cpp index ebcd7368cc430b38a62a051edff687c85e9ea3e0..21049454f44b40cc583579c1eb899311ab374ad7 100644 --- a/source/libs/sync/test/syncTestTool.cpp +++ b/source/libs/sync/test/syncTestTool.cpp @@ -391,7 +391,7 @@ int main(int argc, char** argv) { if (alreadySend < writeRecordNum) { SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex); int32_t ret = syncPropose(rid, pRpcMsg, false); - if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) { + if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { sTrace("%s value%d write not leader, leaderTransferWait:%d", simpleStr, alreadySend, leaderTransferWait); } else { assert(ret == 0); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 8e6284d2cbc40027b883564e506101e724afd8ef..621f947b64c92bdb7582d2a1f0916f977792a18e 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -413,6 +413,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGLEN, "Invalid msg length") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGTYPE, "Invalid msg type") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_LEADER, "Sync not leader") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_ONE_REPLICA, "Sync one replica") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_IN_NEW_CONFIG, "Sync not in new config") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error") // wal