提交 4b18bd71 编写于 作者: M Minghao Li

refactor(sync): adjust errno

上级 c9f6c204
......@@ -24,7 +24,7 @@ extern "C" {
#include "tdef.h"
#include "tmsgcb.h"
#define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_INVALID -1
typedef uint64_t SyncNodeId;
......@@ -44,14 +44,6 @@ typedef enum {
TAOS_SYNC_STATE_ERROR = 103,
} ESyncState;
typedef enum {
TAOS_SYNC_PROPOSE_SUCCESS = 0,
TAOS_SYNC_PROPOSE_NOT_LEADER = 1,
TAOS_SYNC_ONLY_ONE_REPLICA = 2,
TAOS_SYNC_NOT_IN_NEW_CONFIG = 3,
TAOS_SYNC_OTHER_ERROR = 100,
} ESyncProposeCode;
typedef enum {
TAOS_SYNC_FSM_CB_SUCCESS = 0,
TAOS_SYNC_FSM_CB_OTHER_ERROR = 1,
......
......@@ -411,6 +411,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SYN_INVALID_MSGTYPE TAOS_DEF_ERROR_CODE(0, 0x090A)
#define TSDB_CODE_SYN_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x0910)
#define TSDB_CODE_SYN_ONE_REPLICA TAOS_DEF_ERROR_CODE(0, 0x0911)
#define TSDB_CODE_SYN_NOT_IN_NEW_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0912)
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
// tq
......
......@@ -380,17 +380,19 @@ void mndStop(SMnode *pMnode) {
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
int32_t code = TAOS_SYNC_OTHER_ERROR;
int32_t code = 0;
if (!syncEnvIsStart()) {
mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType));
return TAOS_SYNC_OTHER_ERROR;
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync);
if (pSyncNode == NULL) {
mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType));
return TAOS_SYNC_OTHER_ERROR;
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
char logBuf[512] = {0};
......@@ -451,7 +453,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
tmsgSendRsp(&rsp);
} else {
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
code = TAOS_SYNC_OTHER_ERROR;
code = -1;
}
} else {
if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
......@@ -492,10 +494,13 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
tmsgSendRsp(&rsp);
} else {
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
code = TAOS_SYNC_OTHER_ERROR;
code = -1;
}
}
if (code != 0) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
}
return code;
}
......
......@@ -234,9 +234,9 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
int32_t code = syncPropose(pMgmt->sync, &rsp, isWeak);
if (code == 0) {
tsem_wait(&pMgmt->syncSem);
} else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) {
} else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
terrno = TSDB_CODE_APP_NOT_READY;
} else if (code == TAOS_SYNC_OTHER_ERROR) {
} else if (code == -1 && terrno == TSDB_CODE_SYN_INTERNAL_ERROR) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
} else {
terrno = TSDB_CODE_APP_ERROR;
......@@ -257,13 +257,13 @@ void mndSyncStart(SMnode *pMnode) {
syncStart(pMgmt->sync);
mDebug("mnode sync started, id:%" PRId64 " standby:%d", pMgmt->sync, pMgmt->standby);
/*
if (pMgmt->standby) {
syncStartStandBy(pMgmt->sync);
} else {
syncStart(pMgmt->sync);
}
*/
/*
if (pMgmt->standby) {
syncStartStandBy(pMgmt->sync);
} else {
syncStart(pMgmt->sync);
}
*/
}
void mndSyncStop(SMnode *pMnode) {}
......
......@@ -296,7 +296,7 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
}
int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
int32_t ret = TAOS_SYNC_OTHER_ERROR;
int32_t ret = 0;
if (syncEnvIsStart()) {
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
......@@ -381,15 +381,18 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
tmsgSendRsp(&rsp);
} else {
vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType);
ret = TAOS_SYNC_OTHER_ERROR;
ret = -1;
}
syncNodeRelease(pSyncNode);
} else {
vError("==vnodeProcessSyncReq== error syncEnv stop");
ret = TAOS_SYNC_OTHER_ERROR;
ret = -1;
}
if (ret != 0) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
}
return ret;
}
......
......@@ -98,7 +98,8 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
if (code == 0) {
vnodeAccumBlockMsg(pVnode, pMsg->msgType);
} else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) {
} else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
SEpSet newEpSet = {0};
syncGetEpSet(pVnode->sync, &newEpSet);
SEp *pEp = &newEpSet.eps[newEpSet.inUse];
......@@ -247,29 +248,17 @@ static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
}
int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void **ppReader) {
return 0;
}
int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void **ppReader) { return 0; }
int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) {
return 0;
}
int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { return 0; }
int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
return 0;
}
int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { return 0; }
int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void **ppWriter) {
return 0;
}
int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void **ppWriter) { return 0; }
int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) {
return 0;
}
int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) { return 0; }
int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
return 0;
}
int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { return 0; }
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
......
......@@ -149,12 +149,14 @@ void syncStop(int64_t rid) {
int32_t syncSetStandby(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return TAOS_SYNC_OTHER_ERROR;
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return TAOS_SYNC_OTHER_ERROR;
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
// state change
......@@ -177,7 +179,8 @@ int32_t syncSetStandby(int64_t rid) {
int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return TAOS_SYNC_OTHER_ERROR;
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
ASSERT(rid == pSyncNode->rid);
......@@ -201,7 +204,8 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg
if (!IamInNew) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return TAOS_SYNC_NOT_IN_NEW_CONFIG;
terrno = TSDB_CODE_SYN_NOT_IN_NEW_CONFIG;
return -1;
}
char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg);
......@@ -219,7 +223,8 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg
int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return TAOS_SYNC_OTHER_ERROR;
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
ASSERT(rid == pSyncNode->rid);
......@@ -246,7 +251,8 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) {
if (!IamInNew) {
sError("sync reconfig error, not in new config");
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return TAOS_SYNC_NOT_IN_NEW_CONFIG;
terrno = TSDB_CODE_SYN_NOT_IN_NEW_CONFIG;
return -1;
}
char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg);
......@@ -272,13 +278,15 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) {
int32_t syncLeaderTransfer(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return TAOS_SYNC_OTHER_ERROR;
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
ASSERT(rid == pSyncNode->rid);
if (pSyncNode->peersNum == 0) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return TAOS_SYNC_OTHER_ERROR;
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[0];
......@@ -291,7 +299,8 @@ int32_t syncLeaderTransfer(int64_t rid) {
int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return TAOS_SYNC_OTHER_ERROR;
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
ASSERT(rid == pSyncNode->rid);
int32_t ret = 0;
......@@ -299,7 +308,8 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) {
if (pSyncNode->replicaNum == 1) {
sError("only one replica, cannot drop leader");
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return TAOS_SYNC_ONLY_ONE_REPLICA;
terrno = TSDB_CODE_SYN_ONE_REPLICA;
return -1;
}
SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId);
......@@ -538,11 +548,12 @@ void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) {
}
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS;
int32_t ret = 0;
SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return TAOS_SYNC_OTHER_ERROR;
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
assert(rid == pSyncNode->rid);
sDebug("vgId:%d sync event propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType));
......@@ -553,7 +564,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
}
int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) {
int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS;
int32_t ret = 0;
sDebug("vgId:%d sync event propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType));
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
......@@ -567,14 +578,17 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak)
syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
if (pSyncNode->FpEqMsg != NULL && (*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) {
ret = TAOS_SYNC_PROPOSE_SUCCESS;
ret = 0;
} else {
ret = -1;
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
sError("syncPropose pSyncNode->FpEqMsg is NULL");
}
syncClientRequestDestroy(pSyncMsg);
} else {
ret = -1;
terrno = TSDB_CODE_SYN_NOT_LEADER;
sError("syncPropose not leader, %s", syncUtilState2String(pSyncNode->state));
ret = TAOS_SYNC_PROPOSE_NOT_LEADER;
}
return ret;
......
......@@ -338,7 +338,7 @@ int main(int argc, char** argv) {
if (alreadySend < writeRecordNum) {
SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex);
int32_t ret = syncPropose(rid, pRpcMsg, false);
if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) {
if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
sTrace("%s value%d write not leader", s, alreadySend);
} else {
assert(ret == 0);
......
......@@ -251,7 +251,7 @@ int main(int argc, char** argv) {
if (alreadySend < writeRecordNum) {
SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex);
int32_t ret = syncPropose(rid, pRpcMsg, false);
if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) {
if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
sTrace("%s value%d write not leader", s, alreadySend);
} else {
assert(ret == 0);
......
......@@ -188,7 +188,7 @@ int main(int argc, char** argv) {
if (alreadySend < writeRecordNum) {
SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex);
int32_t ret = syncPropose(rid, pRpcMsg, false);
if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) {
if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
sTrace("%s value%d write not leader", s, alreadySend);
} else {
assert(ret == 0);
......
......@@ -391,7 +391,7 @@ int main(int argc, char** argv) {
if (alreadySend < writeRecordNum) {
SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex);
int32_t ret = syncPropose(rid, pRpcMsg, false);
if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) {
if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
sTrace("%s value%d write not leader, leaderTransferWait:%d", simpleStr, alreadySend, leaderTransferWait);
} else {
assert(ret == 0);
......
......@@ -413,6 +413,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGLEN, "Invalid msg length")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGTYPE, "Invalid msg type")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_LEADER, "Sync not leader")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_ONE_REPLICA, "Sync one replica")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_IN_NEW_CONFIG, "Sync not in new config")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error")
// wal
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册