未验证 提交 8a72d444 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #13909 from taosdata/fix/mnodesync2

refactor(sync): add syncIsReady
...@@ -24,7 +24,7 @@ extern "C" { ...@@ -24,7 +24,7 @@ extern "C" {
#include "tdef.h" #include "tdef.h"
#include "tmsgcb.h" #include "tmsgcb.h"
#define SYNC_INDEX_BEGIN 0 #define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_INVALID -1 #define SYNC_INDEX_INVALID -1
typedef uint64_t SyncNodeId; typedef uint64_t SyncNodeId;
...@@ -182,6 +182,7 @@ void syncStart(int64_t rid); ...@@ -182,6 +182,7 @@ void syncStart(int64_t rid);
void syncStop(int64_t rid); void syncStop(int64_t rid);
int32_t syncSetStandby(int64_t rid); int32_t syncSetStandby(int64_t rid);
ESyncState syncGetMyRole(int64_t rid); ESyncState syncGetMyRole(int64_t rid);
bool syncIsReady(int64_t rid);
const char* syncGetMyRoleStr(int64_t rid); const char* syncGetMyRoleStr(int64_t rid);
SyncTerm syncGetMyTerm(int64_t rid); SyncTerm syncGetMyTerm(int64_t rid);
void syncGetEpSet(int64_t rid, SEpSet* pEpSet); void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
......
...@@ -221,17 +221,17 @@ void mndCleanupSync(SMnode *pMnode) { ...@@ -221,17 +221,17 @@ void mndCleanupSync(SMnode *pMnode) {
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
SRpcMsg rsp = {.code = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)}; SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
rsp.pCont = rpcMallocCont(rsp.contLen); req.pCont = rpcMallocCont(req.contLen);
if (rsp.pCont == NULL) return -1; if (req.pCont == NULL) return -1;
memcpy(rsp.pCont, pRaw, rsp.contLen); memcpy(req.pCont, pRaw, req.contLen);
pMgmt->errCode = 0; pMgmt->errCode = 0;
pMgmt->transId = transId; pMgmt->transId = transId;
mTrace("trans:%d, will be proposed", pMgmt->transId); mTrace("trans:%d, will be proposed", pMgmt->transId);
const bool isWeak = false; const bool isWeak = false;
int32_t code = syncPropose(pMgmt->sync, &rsp, isWeak); int32_t code = syncPropose(pMgmt->sync, &req, isWeak);
if (code == 0) { if (code == 0) {
tsem_wait(&pMgmt->syncSem); tsem_wait(&pMgmt->syncSem);
} else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { } else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
...@@ -242,7 +242,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { ...@@ -242,7 +242,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
terrno = TSDB_CODE_APP_ERROR; terrno = TSDB_CODE_APP_ERROR;
} }
rpcFreeCont(rsp.pCont); rpcFreeCont(req.pCont);
if (code != 0) { if (code != 0) {
mError("trans:%d, failed to propose, code:0x%x", pMgmt->transId, code); mError("trans:%d, failed to propose, code:0x%x", pMgmt->transId, code);
return code; return code;
......
...@@ -195,6 +195,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S ...@@ -195,6 +195,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S
cJSON* syncNode2Json(const SSyncNode* pSyncNode); cJSON* syncNode2Json(const SSyncNode* pSyncNode);
char* syncNode2Str(const SSyncNode* pSyncNode); char* syncNode2Str(const SSyncNode* pSyncNode);
char* syncNode2SimpleStr(const SSyncNode* pSyncNode); char* syncNode2SimpleStr(const SSyncNode* pSyncNode);
bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex, bool* isDrop); void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex, bool* isDrop);
SSyncNode* syncNodeAcquire(int64_t rid); SSyncNode* syncNodeAcquire(int64_t rid);
...@@ -230,6 +231,8 @@ int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncInd ...@@ -230,6 +231,8 @@ int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncInd
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag); int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag);
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId); bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId);
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId); SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId);
......
...@@ -713,7 +713,8 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -713,7 +713,8 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
// delete confict entries // delete confict entries
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin); code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin);
ASSERT(code == 0); ASSERT(code == 0);
sDebug("vgId:%d sync event log truncate, from %ld to %ld", ths->vgId, delBegin, delEnd); sDebug("vgId:%d sync event currentTerm:%lu log truncate, from %ld to %ld", ths->vgId, ths->pRaftStore->currentTerm,
delBegin, delEnd);
logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore); logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore);
return code; return code;
...@@ -994,8 +995,8 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs ...@@ -994,8 +995,8 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
SyncIndex commitEnd = snapshot.lastApplyIndex; SyncIndex commitEnd = snapshot.lastApplyIndex;
ths->commitIndex = snapshot.lastApplyIndex; ths->commitIndex = snapshot.lastApplyIndex;
sDebug("vgId:%d sync event commit by snapshot from index:%ld to index:%ld, %s", ths->vgId, commitBegin, sDebug("vgId:%d sync event currentTerm:%lu commit by snapshot from index:%ld to index:%ld, %s", ths->vgId,
commitEnd, syncUtilState2String(ths->state)); ths->pRaftStore->currentTerm, commitBegin, commitEnd, syncUtilState2String(ths->state));
} }
SyncIndex beginIndex = ths->commitIndex + 1; SyncIndex beginIndex = ths->commitIndex + 1;
......
...@@ -190,18 +190,19 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ...@@ -190,18 +190,19 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
if (gRaftDetailLog) { if (gRaftDetailLog) {
char* s = snapshotSender2Str(pSender); char* s = snapshotSender2Str(pSender);
sDebug( sDebug(
"vgId:%d sync event snapshot send to %s:%d start sender first time, lastApplyIndex:%ld lastApplyTerm:%lu " "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d start sender first time, lastApplyIndex:%ld "
"lastConfigIndex:%ld" "lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu "
"sender:%s", "sender:%s",
ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, ths->vgId, ths->pRaftStore->currentTerm, host, port, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastConfigIndex, s); pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm, s);
taosMemoryFree(s); taosMemoryFree(s);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event snapshot send to %s:%d start sender first time, lastApplyIndex:%ld " "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d start sender first time, lastApplyIndex:%ld "
"lastApplyTerm:%lu lastConfigIndex:%ld", "lastApplyTerm:%lu lastConfigIndex:%ld privateTerm:%lu",
ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, ths->vgId, ths->pRaftStore->currentTerm, host, port, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastConfigIndex); pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm);
} }
} }
......
...@@ -56,8 +56,9 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { ...@@ -56,8 +56,9 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
SyncIndex commitEnd = snapshot.lastApplyIndex; SyncIndex commitEnd = snapshot.lastApplyIndex;
pSyncNode->commitIndex = snapshot.lastApplyIndex; pSyncNode->commitIndex = snapshot.lastApplyIndex;
sDebug("vgId:%d sync event commit by snapshot from index:%ld to index:%ld, %s", pSyncNode->vgId, sDebug("vgId:%d sync event currentTerm:%lu commit by snapshot from index:%ld to index:%ld, %s", pSyncNode->vgId,
pSyncNode->commitIndex, snapshot.lastApplyIndex, syncUtilState2String(pSyncNode->state)); pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, snapshot.lastApplyIndex,
syncUtilState2String(pSyncNode->state));
} }
// update commit index // update commit index
......
...@@ -187,7 +187,9 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg ...@@ -187,7 +187,9 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg
ASSERT(rid == pSyncNode->rid); ASSERT(rid == pSyncNode->rid);
int32_t ret = 0; int32_t ret = 0;
bool IamInNew = false; bool IamInNew = syncNodeInConfig(pSyncNode, pNewCfg);
#if 0
for (int i = 0; i < pNewCfg->replicaNum; ++i) { for (int i = 0; i < pNewCfg->replicaNum; ++i) {
if (strcmp((pNewCfg->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 && if (strcmp((pNewCfg->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
(pNewCfg->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) { (pNewCfg->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
...@@ -203,6 +205,7 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg ...@@ -203,6 +205,7 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg
} }
*/ */
} }
#endif
if (!IamInNew) { if (!IamInNew) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid); taosReleaseRef(tsNodeRefId, pSyncNode->rid);
...@@ -230,7 +233,9 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) { ...@@ -230,7 +233,9 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) {
} }
ASSERT(rid == pSyncNode->rid); ASSERT(rid == pSyncNode->rid);
bool IamInNew = false; bool IamInNew = syncNodeInConfig(pSyncNode, pNewCfg);
#if 0
for (int i = 0; i < pNewCfg->replicaNum; ++i) { for (int i = 0; i < pNewCfg->replicaNum; ++i) {
if (strcmp((pNewCfg->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 && if (strcmp((pNewCfg->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
(pNewCfg->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) { (pNewCfg->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
...@@ -249,6 +254,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) { ...@@ -249,6 +254,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) {
} }
*/ */
} }
#endif
if (!IamInNew) { if (!IamInNew) {
sError("sync reconfig error, not in new config"); sError("sync reconfig error, not in new config");
...@@ -377,6 +383,17 @@ ESyncState syncGetMyRole(int64_t rid) { ...@@ -377,6 +383,17 @@ ESyncState syncGetMyRole(int64_t rid) {
return state; return state;
} }
bool syncIsReady(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return false;
}
assert(rid == pSyncNode->rid);
bool b = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && pSyncNode->restoreFinish;
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return b;
}
bool syncIsRestoreFinish(int64_t rid) { bool syncIsRestoreFinish(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
...@@ -558,7 +575,8 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { ...@@ -558,7 +575,8 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
return -1; return -1;
} }
assert(rid == pSyncNode->rid); assert(rid == pSyncNode->rid);
sDebug("vgId:%d sync event propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType)); sDebug("vgId:%d sync event currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId,
pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pMsg->msgType), pMsg->msgType);
ret = syncNodePropose(pSyncNode, pMsg, isWeak); ret = syncNodePropose(pSyncNode, pMsg, isWeak);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); taosReleaseRef(tsNodeRefId, pSyncNode->rid);
...@@ -567,7 +585,8 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { ...@@ -567,7 +585,8 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) { int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) {
int32_t ret = 0; int32_t ret = 0;
sDebug("vgId:%d sync event propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType)); sDebug("vgId:%d sync event currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId,
pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pMsg->msgType), pMsg->msgType);
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
SRespStub stub; SRespStub stub;
...@@ -600,8 +619,6 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) ...@@ -600,8 +619,6 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak)
SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo; SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo;
sDebug("vgId:%d sync event sync open", pSyncInfo->vgId);
SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode)); SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode));
assert(pSyncNode != NULL); assert(pSyncNode != NULL);
memset(pSyncNode, 0, sizeof(SSyncNode)); memset(pSyncNode, 0, sizeof(SSyncNode));
...@@ -815,6 +832,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { ...@@ -815,6 +832,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// snapshot meta // snapshot meta
// pSyncNode->sMeta.lastConfigIndex = -1; // pSyncNode->sMeta.lastConfigIndex = -1;
sDebug("vgId:%d sync event currentTerm:%lu sync open", pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm);
return pSyncNode; return pSyncNode;
} }
...@@ -860,7 +879,7 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) { ...@@ -860,7 +879,7 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
} }
void syncNodeClose(SSyncNode* pSyncNode) { void syncNodeClose(SSyncNode* pSyncNode) {
sDebug("vgId:%d sync event sync close", pSyncNode->vgId); sDebug("vgId:%d sync event currentTerm:%lu sync close", pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm);
int32_t ret; int32_t ret;
assert(pSyncNode != NULL); assert(pSyncNode != NULL);
...@@ -1258,9 +1277,36 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { ...@@ -1258,9 +1277,36 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
return s; return s;
} }
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex, bool* isDrop) { bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) {
bool b1 = false;
bool b2 = false;
for (int i = 0; i < config->replicaNum; ++i) {
if (strcmp((config->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
(config->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
b1 = true;
break;
}
}
for (int i = 0; i < config->replicaNum; ++i) {
SRaftId raftId;
raftId.addr = syncUtilAddr2U64((config->nodeInfo)[i].nodeFqdn, (config->nodeInfo)[i].nodePort);
raftId.vgId = pSyncNode->vgId;
if (syncUtilSameId(&raftId, &(pSyncNode->myRaftId))) {
b2 = true;
break;
}
}
ASSERT(b1 == b2);
return b1;
}
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex, bool* isDrop) {
SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg; SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg;
pSyncNode->pRaftCfg->cfg = *newConfig; pSyncNode->pRaftCfg->cfg = *pNewConfig;
pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex; pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex;
int32_t ret = 0; int32_t ret = 0;
...@@ -1272,7 +1318,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l ...@@ -1272,7 +1318,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA]; SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
oldSenders[i] = (pSyncNode->senders)[i]; oldSenders[i] = (pSyncNode->senders)[i];
sDebug("vgId:%d sync event save senders %d, %p", pSyncNode->vgId, i, oldSenders[i]); sDebug("vgId:%d sync event currentTerm:%lu save senders %d, %p, privateTerm:%lu", pSyncNode->vgId,
pSyncNode->pRaftStore->currentTerm, i, oldSenders[i], oldSenders[i]->privateTerm);
if (gRaftDetailLog) { if (gRaftDetailLog) {
; ;
} }
...@@ -1324,8 +1371,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l ...@@ -1324,8 +1371,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
char host[128]; char host[128];
uint16_t port; uint16_t port;
syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port); syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);
sDebug("vgId:%d sync event reset sender for %lu, newIndex:%d, %s:%d, %p", pSyncNode->vgId, sDebug("vgId:%d sync event currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, privateTerm:%lu",
(pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j]); pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, (pSyncNode->replicasId)[i].addr, i, host, port,
oldSenders[j], oldSenders[j]->privateTerm);
(pSyncNode->senders)[i] = oldSenders[j]; (pSyncNode->senders)[i] = oldSenders[j];
oldSenders[j] = NULL; oldSenders[j] = NULL;
reset = true; reset = true;
...@@ -1333,8 +1381,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l ...@@ -1333,8 +1381,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
// reset replicaIndex // reset replicaIndex
int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex; int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex;
(pSyncNode->senders)[i]->replicaIndex = i; (pSyncNode->senders)[i]->replicaIndex = i;
sDebug("vgId:%d sync event udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", pSyncNode->vgId, sDebug("vgId:%d sync event currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d",
oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], reset); pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, oldreplicaIndex, i, host, port,
(pSyncNode->senders)[i], reset);
} }
} }
} }
...@@ -1343,7 +1392,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l ...@@ -1343,7 +1392,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
if ((pSyncNode->senders)[i] == NULL) { if ((pSyncNode->senders)[i] == NULL) {
(pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i); (pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);
sDebug("vgId:%d sync event create new sender %p replicaIndex:%d", pSyncNode->vgId, (pSyncNode->senders)[i], i); sDebug("vgId:%d sync event currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, (pSyncNode->senders)[i], i,
(pSyncNode->senders)[i]->privateTerm);
} }
} }
...@@ -1351,13 +1402,16 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l ...@@ -1351,13 +1402,16 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
if (oldSenders[i] != NULL) { if (oldSenders[i] != NULL) {
snapshotSenderDestroy(oldSenders[i]); snapshotSenderDestroy(oldSenders[i]);
sDebug("vgId:%d sync event delete old sender %p replicaIndex:%d", pSyncNode->vgId, oldSenders[i], i); sDebug("vgId:%d sync event currentTerm:%lu delete old sender %p replicaIndex:%d", pSyncNode->vgId,
pSyncNode->pRaftStore->currentTerm, oldSenders[i], i);
oldSenders[i] = NULL; oldSenders[i] = NULL;
} }
} }
bool IamInOld = false; bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
bool IamInNew = false; bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
#if 0
for (int i = 0; i < oldConfig.replicaNum; ++i) { for (int i = 0; i < oldConfig.replicaNum; ++i) {
if (strcmp((oldConfig.nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 && if (strcmp((oldConfig.nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
(oldConfig.nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) { (oldConfig.nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
...@@ -1373,6 +1427,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l ...@@ -1373,6 +1427,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
break; break;
} }
} }
#endif
*isDrop = true; *isDrop = true;
if (IamInOld && !IamInNew) { if (IamInOld && !IamInNew) {
...@@ -1381,6 +1436,10 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l ...@@ -1381,6 +1436,10 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
*isDrop = false; *isDrop = false;
} }
// may be add me to a new raft group
if (IamInOld && IamInNew && oldConfig.replicaNum == 1) {
}
if (IamInNew) { if (IamInNew) {
pSyncNode->pRaftCfg->isStandBy = 0; // change isStandBy to normal pSyncNode->pRaftCfg->isStandBy = 0; // change isStandBy to normal
} }
...@@ -1406,14 +1465,19 @@ void syncNodeRelease(SSyncNode* pNode) { taosReleaseRef(tsNodeRefId, pNode->rid) ...@@ -1406,14 +1465,19 @@ void syncNodeRelease(SSyncNode* pNode) { taosReleaseRef(tsNodeRefId, pNode->rid)
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
if (term > pSyncNode->pRaftStore->currentTerm) { if (term > pSyncNode->pRaftStore->currentTerm) {
raftStoreSetTerm(pSyncNode->pRaftStore, term); raftStoreSetTerm(pSyncNode->pRaftStore, term);
syncNodeBecomeFollower(pSyncNode, "update term"); char tmpBuf[64];
snprintf(tmpBuf, sizeof(tmpBuf), "update term to %lu", term);
syncNodeBecomeFollower(pSyncNode, tmpBuf);
raftStoreClearVote(pSyncNode->pRaftStore); raftStoreClearVote(pSyncNode->pRaftStore);
} }
} }
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
sDebug("vgId:%d sync event become follower, isStandBy:%d, replicaNum:%d, %s", pSyncNode->vgId, sDebug(
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, debugStr); "vgId:%d sync event currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, "
"restoreFinish:%d, %s",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum,
pSyncNode->restoreFinish, debugStr);
// maybe clear leader cache // maybe clear leader cache
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
...@@ -1447,8 +1511,12 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { ...@@ -1447,8 +1511,12 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
// /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>> // /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
// //
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
sDebug("vgId:%d sync event become leader, isStandBy:%d, replicaNum:%d %s", pSyncNode->vgId, // reset restoreFinish
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, debugStr); pSyncNode->restoreFinish = false;
sDebug("vgId:%d sync event currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, restoreFinish:%d, %s",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum,
pSyncNode->restoreFinish, debugStr);
// state change // state change
pSyncNode->state = TAOS_SYNC_STATE_LEADER; pSyncNode->state = TAOS_SYNC_STATE_LEADER;
...@@ -2022,21 +2090,13 @@ const char* syncStr(ESyncState state) { ...@@ -2022,21 +2090,13 @@ const char* syncStr(ESyncState state) {
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) { static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg); SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg);
/* sDebug("vgId:%d sync event currentTerm:%lu begin leader transfer", ths->vgId, ths->pRaftStore->currentTerm);
char host[128];
uint16_t port;
syncUtilU642Addr(pSyncLeaderTransfer->newLeaderId.addr, host, sizeof(host), &port);
sDebug("vgId:%d sync event, maybe leader transfer to %s:%d %lu", ths->vgId, host, port,
pSyncLeaderTransfer->newLeaderId.addr);
*/
sDebug("vgId:%d sync event, begin leader transfer", ths->vgId);
if (strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 && if (strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort) { pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort) {
sDebug("vgId:%d sync event, maybe leader transfer to %s:%d %lu", ths->vgId, sDebug("vgId:%d sync event currentTerm:%lu maybe leader transfer to %s:%d %lu", ths->vgId,
pSyncLeaderTransfer->newNodeInfo.nodeFqdn, pSyncLeaderTransfer->newNodeInfo.nodePort, ths->pRaftStore->currentTerm, pSyncLeaderTransfer->newNodeInfo.nodeFqdn,
pSyncLeaderTransfer->newLeaderId.addr); pSyncLeaderTransfer->newNodeInfo.nodePort, pSyncLeaderTransfer->newLeaderId.addr);
// reset elect timer now! // reset elect timer now!
int32_t electMS = 1; int32_t electMS = 1;
...@@ -2069,6 +2129,21 @@ static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE ...@@ -2069,6 +2129,21 @@ static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
return 0; return 0;
} }
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
for (int i = 0; i < pNewCfg->replicaNum; ++i) {
SRaftId raftId;
raftId.addr = syncUtilAddr2U64((pNewCfg->nodeInfo)[i].nodeFqdn, (pNewCfg->nodeInfo)[i].nodePort);
raftId.vgId = ths->vgId;
if (syncUtilSameId(&(ths->myRaftId), &raftId)) {
pNewCfg->myIndex = i;
return 0;
}
}
return -1;
}
static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) { static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg; SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg;
...@@ -2077,19 +2152,23 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE ...@@ -2077,19 +2152,23 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
ASSERT(ret == 0); ASSERT(ret == 0);
// update new config myIndex // update new config myIndex
bool IamInNew = false; syncNodeUpdateNewConfigIndex(ths, &newSyncCfg);
for (int i = 0; i < newSyncCfg.replicaNum; ++i) {
if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 && bool IamInNew = syncNodeInConfig(ths, &newSyncCfg);
ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) {
newSyncCfg.myIndex = i; /*
IamInNew = true; for (int i = 0; i < newSyncCfg.replicaNum; ++i) {
break; if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 &&
} ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) {
} newSyncCfg.myIndex = i;
IamInNew = true;
break;
}
}
*/
bool isDrop; bool isDrop;
// if (IamInNew || (!IamInNew && ths->state != TAOS_SYNC_STATE_LEADER)) {
if (IamInNew) { if (IamInNew) {
syncNodeUpdateConfig(ths, &newSyncCfg, pEntry->index, &isDrop); syncNodeUpdateConfig(ths, &newSyncCfg, pEntry->index, &isDrop);
...@@ -2139,8 +2218,8 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE ...@@ -2139,8 +2218,8 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) { int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
int32_t code = 0; int32_t code = 0;
ESyncState state = flag; ESyncState state = flag;
sDebug("vgId:%d sync event commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", ths->vgId, beginIndex, sDebug("vgId:%d sync event currentTerm:%lu commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", ths->vgId,
endIndex, syncUtilState2String(state)); ths->pRaftStore->currentTerm, beginIndex, endIndex, syncUtilState2String(state));
// execute fsm // execute fsm
if (ths->pFsm != NULL) { if (ths->pFsm != NULL) {
...@@ -2188,7 +2267,8 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, ...@@ -2188,7 +2267,8 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
ths->pFsm->FpRestoreFinishCb(ths->pFsm); ths->pFsm->FpRestoreFinishCb(ths->pFsm);
} }
ths->restoreFinish = true; ths->restoreFinish = true;
sDebug("vgId:%d sync event restore finish, index:%ld", ths->vgId, pEntry->index); sDebug("vgId:%d sync event currentTerm:%lu restore finish, %s, index:%ld", ths->vgId,
ths->pRaftStore->currentTerm, syncUtilState2String(ths->state), pEntry->index);
} }
} }
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include "syncRaftLog.h" #include "syncRaftLog.h"
#include "syncRaftCfg.h" #include "syncRaftCfg.h"
#include "syncRaftStore.h"
#include "wal.h" #include "wal.h"
// refactor, log[0 .. n] ==> log[m .. n] // refactor, log[0 .. n] ==> log[m .. n]
...@@ -162,9 +163,10 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr ...@@ -162,9 +163,10 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
walFsync(pWal, true); walFsync(pWal, true);
sDebug("vgId:%d sync event write index:%ld, %s, isStandBy:%d, msgType:%s, originalRpcType:%s", pData->pSyncNode->vgId, sDebug("vgId:%d sync event currentTerm:%lu write index:%ld, %s, isStandBy:%d, msgType:%s,%d, originalRpcType:%s,%d",
pEntry->index, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->pRaftCfg->isStandBy, pData->pSyncNode->vgId, pData->pSyncNode->pRaftStore->currentTerm, pEntry->index,
TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType)); syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->pRaftCfg->isStandBy,
TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType);
return code; return code;
} }
...@@ -320,7 +322,12 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { ...@@ -320,7 +322,12 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
walFsync(pWal, true); walFsync(pWal, true);
sDebug("sync event old write wal: %ld", pEntry->index); sDebug(
"vgId:%d sync event currentTerm:%lu old write index:%ld, %s, isStandBy:%d, msgType:%s,%d, originalRpcType:%s,%d",
pData->pSyncNode->vgId, pData->pSyncNode->pRaftStore->currentTerm, pEntry->index,
syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->pRaftCfg->isStandBy, TMSG_INFO(pEntry->msgType),
pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType);
return code; return code;
} }
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "syncRespMgr.h" #include "syncRespMgr.h"
#include "syncRaftStore.h"
SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) { SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) {
SSyncRespMgr *pObj = (SSyncRespMgr *)taosMemoryMalloc(sizeof(SSyncRespMgr)); SSyncRespMgr *pObj = (SSyncRespMgr *)taosMemoryMalloc(sizeof(SSyncRespMgr));
...@@ -45,8 +46,9 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) { ...@@ -45,8 +46,9 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) {
taosHashPut(pObj->pRespHash, &keyCode, sizeof(keyCode), pStub, sizeof(SRespStub)); taosHashPut(pObj->pRespHash, &keyCode, sizeof(keyCode), pStub, sizeof(SRespStub));
SSyncNode *pSyncNode = pObj->data; SSyncNode *pSyncNode = pObj->data;
sDebug("vgId:%d sync event resp mgr add, type:%s seq:%lu handle:%p", pSyncNode->vgId, sDebug("vgId:%d sync event currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p", pSyncNode->vgId,
TMSG_INFO(pStub->rpcMsg.msgType), keyCode, pStub->rpcMsg.info.handle); pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode,
pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle);
taosThreadMutexUnlock(&(pObj->mutex)); taosThreadMutexUnlock(&(pObj->mutex));
return keyCode; return keyCode;
...@@ -69,8 +71,9 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) { ...@@ -69,8 +71,9 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) {
memcpy(pStub, pTmp, sizeof(SRespStub)); memcpy(pStub, pTmp, sizeof(SRespStub));
SSyncNode *pSyncNode = pObj->data; SSyncNode *pSyncNode = pObj->data;
sDebug("vgId:%d sync event resp mgr get, type:%s seq:%lu handle:%p", pSyncNode->vgId, sDebug("vgId:%d sync event currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p ahandle:%p",
TMSG_INFO(pStub->rpcMsg.msgType), index, pStub->rpcMsg.info.handle); pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType,
index, pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle);
taosThreadMutexUnlock(&(pObj->mutex)); taosThreadMutexUnlock(&(pObj->mutex));
return 1; // get one object return 1; // get one object
...@@ -87,8 +90,9 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu ...@@ -87,8 +90,9 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu
memcpy(pStub, pTmp, sizeof(SRespStub)); memcpy(pStub, pTmp, sizeof(SRespStub));
SSyncNode *pSyncNode = pObj->data; SSyncNode *pSyncNode = pObj->data;
sDebug("vgId:%d sync event resp mgr get and del, type:%s seq:%lu handle:%p", pSyncNode->vgId, sDebug("vgId:%d sync event currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p ahandle:%p",
TMSG_INFO(pStub->rpcMsg.msgType), index, pStub->rpcMsg.info.handle); pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType,
index, pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle);
taosHashRemove(pObj->pRespHash, &index, sizeof(index)); taosHashRemove(pObj->pRespHash, &index, sizeof(index));
taosThreadMutexUnlock(&(pObj->mutex)); taosThreadMutexUnlock(&(pObj->mutex));
......
...@@ -141,18 +141,22 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { ...@@ -141,18 +141,22 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug( sDebug(
"vgId:%d sync event snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld "
"lastConfigIndex:%ld send " "lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu send "
"msg:%s", "msg:%s",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, msgStr); pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm, msgStr);
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld "
"lastConfigIndex:%ld", "lastApplyTerm:%lu "
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, "lastConfigIndex:%ld privateTerm:%lu",
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex); pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm);
} }
syncSnapshotSendDestroy(pMsg); syncSnapshotSendDestroy(pMsg);
...@@ -279,25 +283,31 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { ...@@ -279,25 +283,31 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug( sDebug(
"vgId:%d sync event snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld "
"lastConfigIndex:%ld send " "lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu send "
"msg:%s", "msg:%s",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, msgStr); pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm, msgStr);
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld "
"lastConfigIndex:%ld", "lastApplyTerm:%lu "
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, "lastConfigIndex:%ld privateTerm:%lu",
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex); pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm);
} }
} else { } else {
sDebug( sDebug(
"vgId:%d sync event snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu " "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld "
"lastConfigIndex:%ld", "lastApplyTerm:%lu "
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, "lastConfigIndex:%ld privateTerm:%lu",
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex); pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm);
} }
syncSnapshotSendDestroy(pMsg); syncSnapshotSendDestroy(pMsg);
...@@ -328,12 +338,15 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { ...@@ -328,12 +338,15 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug("vgId:%d sync event snapshot send to %s:%d resend seq:%d ack:%d send msg:%s", pSender->pSyncNode->vgId, sDebug(
host, port, pSender->seq, pSender->ack, msgStr); "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu send msg:%s",
pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->privateTerm, msgStr);
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sDebug("vgId:%d sync event snapshot send to %s:%d resend seq:%d ack:%d", pSender->pSyncNode->vgId, host, port, sDebug("vgId:%d sync event currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu",
pSender->seq, pSender->ack); pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq,
pSender->ack, pSender->privateTerm);
} }
syncSnapshotSendDestroy(pMsg); syncSnapshotSendDestroy(pMsg);
...@@ -485,7 +498,7 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) { ...@@ -485,7 +498,7 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) {
pReceiver->start = false; pReceiver->start = false;
if (apply) { if (apply) {
++(pReceiver->privateTerm); // ++(pReceiver->privateTerm);
} }
if (gRaftDetailLog) { if (gRaftDetailLog) {
...@@ -566,16 +579,17 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -566,16 +579,17 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug( sDebug(
"vgId:%d sync event snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, " "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld, recv msg:%s", "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s",
pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
msgStr); pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, " "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld", "lastConfigIndex:%ld privateTerm:%lu",
pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm);
} }
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
...@@ -590,6 +604,12 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -590,6 +604,12 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) { if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) {
int32_t oldReplicaNum = pSyncNode->replicaNum; int32_t oldReplicaNum = pSyncNode->replicaNum;
// update new config myIndex
SSyncCfg newSyncCfg = pMsg->lastConfig;
syncNodeUpdateNewConfigIndex(pSyncNode, &newSyncCfg);
bool IamInNew = syncNodeInConfig(pSyncNode, &newSyncCfg);
#if 0
// update new config myIndex // update new config myIndex
bool IamInNew = false; bool IamInNew = false;
SSyncCfg newSyncCfg = pMsg->lastConfig; SSyncCfg newSyncCfg = pMsg->lastConfig;
...@@ -601,17 +621,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -601,17 +621,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
break; break;
} }
} }
#endif
bool isDrop; bool isDrop;
if (IamInNew) { if (IamInNew) {
sDebug("vgId:%d sync event update config by snapshot, lastIndex:%ld, lastTerm:%lu, lastConfigIndex:%ld ", sDebug(
pSyncNode->vgId, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); "vgId:%d sync event currentTerm:%lu update config by snapshot, lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld ",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pMsg->lastIndex, pMsg->lastTerm,
pMsg->lastConfigIndex);
syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop); syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event do not update config by snapshot, I am not in newCfg, lastIndex:%ld, lastTerm:%lu, " "vgId:%d sync event currentTerm:%lu do not update config by snapshot, I am not in newCfg, "
"lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld ", "lastConfigIndex:%ld ",
pSyncNode->vgId, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pMsg->lastIndex, pMsg->lastTerm,
pMsg->lastConfigIndex);
} }
// change isStandBy to normal // change isStandBy to normal
...@@ -636,19 +662,20 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -636,19 +662,20 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore); char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore);
sDebug( sDebug(
"vgId:%d sync event snapshot recv from %s:%d finish, update log begin index:%ld, " "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d finish, update log begin index:%ld, "
"snapshot.lastApplyIndex:%ld, " "snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, raft log:%s", "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu, raft log:%s",
pSyncNode->vgId, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pMsg->lastIndex + 1,
snapshot.lastConfigIndex, logSimpleStr); snapshot.lastApplyIndex, snapshot.lastApplyTerm, snapshot.lastConfigIndex, pReceiver->privateTerm,
logSimpleStr);
taosMemoryFree(logSimpleStr); taosMemoryFree(logSimpleStr);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event snapshot recv from %s:%d finish, update log begin index:%ld, " "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d finish, update log begin index:%ld, "
"snapshot.lastApplyIndex:%ld, " "snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld", "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu",
pSyncNode->vgId, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pMsg->lastIndex + 1,
snapshot.lastConfigIndex); snapshot.lastApplyIndex, snapshot.lastApplyTerm, snapshot.lastConfigIndex, pReceiver->privateTerm);
} }
pReceiver->pWriter = NULL; pReceiver->pWriter = NULL;
...@@ -659,17 +686,17 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -659,17 +686,17 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug( sDebug(
"vgId:%d sync event snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, " "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld, recv msg:%s", "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s",
pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pReceiver->pSyncNode->vgId, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack,
pMsg->lastConfigIndex, msgStr); pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, " "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld", "lastConfigIndex:%ld, privateTerm:%lu",
pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pReceiver->pSyncNode->vgId, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack,
pMsg->lastConfigIndex); pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm);
} }
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) { } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
...@@ -684,18 +711,20 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -684,18 +711,20 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug( sDebug(
"vgId:%d sync event snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, " "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, "
"lastConfigIndex:%ld, recv " "lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv "
"msg:%s", "msg:%s",
pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pReceiver->pSyncNode->vgId, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack,
pMsg->lastConfigIndex, msgStr); pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, " "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, "
"lastConfigIndex:%ld", "lastTerm:%lu, "
pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, "lastConfigIndex:%ld, privateTerm:%lu",
pMsg->lastConfigIndex); pReceiver->pSyncNode->vgId, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack,
pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm);
} }
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
...@@ -715,16 +744,19 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -715,16 +744,19 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug( sDebug(
"vgId:%d sync event snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, " "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, "
"lastConfigIndex:%ld, recv msg:%s", "lastTerm:%lu, "
pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s",
msgStr); pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, " "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, "
"lastConfigIndex:%ld", "lastTerm:%lu, "
pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); "lastConfigIndex:%ld, privateTerm:%lu",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm);
} }
} else { } else {
......
...@@ -297,7 +297,7 @@ void usage(char* exe) { ...@@ -297,7 +297,7 @@ void usage(char* exe) {
SRpcMsg* createRpcMsg(int i, int count, int myIndex) { SRpcMsg* createRpcMsg(int i, int count, int myIndex) {
SRpcMsg* pMsg = (SRpcMsg*)taosMemoryMalloc(sizeof(SRpcMsg)); SRpcMsg* pMsg = (SRpcMsg*)taosMemoryMalloc(sizeof(SRpcMsg));
memset(pMsg, 0, sizeof(SRpcMsg)); memset(pMsg, 0, sizeof(SRpcMsg));
pMsg->msgType = 9999; pMsg->msgType = TDMT_VND_SUBMIT;
pMsg->contLen = 256; pMsg->contLen = 256;
pMsg->pCont = rpcMallocCont(pMsg->contLen); pMsg->pCont = rpcMallocCont(pMsg->contLen);
snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-%ld", myIndex, i, count, taosGetTimestampMs()); snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-%ld", myIndex, i, count, taosGetTimestampMs());
...@@ -384,8 +384,10 @@ int main(int argc, char** argv) { ...@@ -384,8 +384,10 @@ int main(int argc, char** argv) {
leaderTransferWait++; leaderTransferWait++;
if (leaderTransferWait == 7) { if (leaderTransferWait == 7) {
sTrace("begin leader transfer ..."); if (leaderTransfer) {
int32_t ret = syncLeaderTransfer(rid); sTrace("begin leader transfer ...");
int32_t ret = syncLeaderTransfer(rid);
}
} }
if (alreadySend < writeRecordNum) { if (alreadySend < writeRecordNum) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册