提交 d5a2cf1e 编写于 作者: P plum-lihui

Merge branch '3.0' of github.com:taosdata/TDengine into 3.0

......@@ -35,6 +35,7 @@ typedef enum {
TAOS_SYNC_STATE_FOLLOWER = 100,
TAOS_SYNC_STATE_CANDIDATE = 101,
TAOS_SYNC_STATE_LEADER = 102,
TAOS_SYNC_STATE_ERROR = 103,
} ESyncState;
typedef struct SSyncBuffer {
......@@ -68,17 +69,20 @@ typedef struct SSnapshot {
typedef struct SSyncFSM {
void* data;
// when value in pBuf finish a raft flow, FpCommitCb is called, code indicates the result
// when value in pMsg finish a raft flow, FpCommitCb is called, code indicates the result
// user can do something according to the code and isWeak. for example, write data into tsdb
void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pBuf, SyncIndex index, bool isWeak, int32_t code);
void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SyncIndex index, bool isWeak, int32_t code,
ESyncState state);
// when value in pBuf has been written into local log store, FpPreCommitCb is called, code indicates the result
// when value in pMsg has been written into local log store, FpPreCommitCb is called, code indicates the result
// user can do something according to the code and isWeak. for example, write data into tsdb
void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pBuf, SyncIndex index, bool isWeak, int32_t code);
void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SyncIndex index, bool isWeak, int32_t code,
ESyncState state);
// when log entry is updated by a new one, FpRollBackCb is called
// user can do something to roll back. for example, delete data from tsdb, or just ignore it
void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pBuf, SyncIndex index, bool isWeak, int32_t code);
void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SyncIndex index, bool isWeak, int32_t code,
ESyncState state);
// user should implement this function, use "data" to take snapshot into "snapshot"
int32_t (*FpTakeSnapshot)(SSnapshot* snapshot);
......@@ -157,10 +161,14 @@ void syncCleanUp();
int64_t syncStart(const SSyncInfo* pSyncInfo);
void syncStop(int64_t rid);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg);
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); // use this function
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak); // just for compatibility
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
ESyncState syncGetMyRole(int64_t rid);
void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole);
// propose with sequence number, to implement linearizable semantics
int32_t syncPropose2(int64_t rid, const SRpcMsg* pMsg, bool isWeak, uint64_t seqNum);
// for compatibility, the same as syncPropose
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
extern int32_t sDebugFlag;
......
......@@ -47,6 +47,8 @@ SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore);
SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore);
cJSON* logStore2Json(SSyncLogStore* pLogStore);
char* logStore2Str(SSyncLogStore* pLogStore);
cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore);
char* logStoreSimple2Str(SSyncLogStore* pLogStore);
// for debug
void logStorePrint(SSyncLogStore* pLogStore);
......@@ -54,6 +56,11 @@ void logStorePrint2(char* s, SSyncLogStore* pLogStore);
void logStoreLog(SSyncLogStore* pLogStore);
void logStoreLog2(char* s, SSyncLogStore* pLogStore);
void logStoreSimplePrint(SSyncLogStore* pLogStore);
void logStoreSimplePrint2(char* s, SSyncLogStore* pLogStore);
void logStoreSimpleLog(SSyncLogStore* pLogStore);
void logStoreSimpleLog2(char* s, SSyncLogStore* pLogStore);
#ifdef __cplusplus
}
#endif
......
......@@ -120,6 +120,11 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// reject request
if ((pMsg->term < ths->pRaftStore->currentTerm) ||
((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) {
sTrace(
"syncNodeOnAppendEntriesCb --> reject, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, "
"logOK:%d",
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild();
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
......@@ -137,6 +142,11 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// return to follower state
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
sTrace(
"syncNodeOnAppendEntriesCb --> return to follower, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, "
"ths->state:%d, logOK:%d",
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);
syncNodeBecomeFollower(ths);
// ret or reply?
......@@ -145,89 +155,60 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// accept request
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) {
bool preMatch = false;
if (pMsg->prevLogIndex == SYNC_INDEX_INVALID &&
ths->pLogStore->getLastIndex(ths->pLogStore) == SYNC_INDEX_INVALID) {
preMatch = true;
}
if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
SSyncRaftEntry* pPreEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogIndex);
assert(pPreEntry != NULL);
if (pMsg->prevLogTerm == pPreEntry->term) {
preMatch = true;
}
syncEntryDestory(pPreEntry);
}
// preIndex = -1, or has preIndex entry in local log
assert(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore));
if (preMatch) {
// must has preIndex in local log
assert(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore));
// has extra entries (> preIndex) in local log
bool hasExtraEntries = pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore);
bool hasExtraEntries = pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore);
bool hasAppendEntries = pMsg->dataLen > 0;
// has entries in SyncAppendEntries msg
bool hasAppendEntries = pMsg->dataLen > 0;
if (hasExtraEntries && hasAppendEntries) {
// conflict
bool conflict = false;
sTrace(
"syncNodeOnAppendEntriesCb --> accept, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, "
"logOK:%d, hasExtraEntries:%d, hasAppendEntries:%d",
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, hasExtraEntries, hasAppendEntries);
SyncIndex extraIndex = pMsg->prevLogIndex + 1;
SSyncRaftEntry* pExtraEntry = logStoreGetEntry(ths->pLogStore, extraIndex);
assert(pExtraEntry != NULL);
if (hasExtraEntries && hasAppendEntries) {
// not conflict by default
bool conflict = false;
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
assert(pAppendEntry != NULL);
SyncIndex extraIndex = pMsg->prevLogIndex + 1;
SSyncRaftEntry* pExtraEntry = logStoreGetEntry(ths->pLogStore, extraIndex);
assert(pExtraEntry != NULL);
assert(extraIndex == pAppendEntry->index);
if (pExtraEntry->term == pAppendEntry->term) {
conflict = true;
}
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
assert(pAppendEntry != NULL);
if (conflict) {
// roll back
SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore);
SyncIndex delEnd = extraIndex;
// log not match, conflict
assert(extraIndex == pAppendEntry->index);
if (pExtraEntry->term != pAppendEntry->term) {
conflict = true;
}
// notice! reverse roll back!
for (SyncIndex index = delEnd; index >= delBegin; --index) {
if (ths->pFsm->FpRollBackCb != NULL) {
SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index);
assert(pRollBackEntry != NULL);
if (conflict) {
// roll back
SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore);
SyncIndex delEnd = extraIndex;
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, pRollBackEntry->index, pRollBackEntry->isWeak, 0);
rpcFreeCont(rpcMsg.pCont);
syncEntryDestory(pRollBackEntry);
}
}
sTrace("syncNodeOnAppendEntriesCb --> conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin, delEnd);
// delete confict entries
ths->pLogStore->truncate(ths->pLogStore, extraIndex);
// notice! reverse roll back!
for (SyncIndex index = delEnd; index >= delBegin; --index) {
if (ths->pFsm->FpRollBackCb != NULL) {
SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index);
assert(pRollBackEntry != NULL);
// append new entries
ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
// pre commit
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL) {
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 0);
}
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, pRollBackEntry->index, pRollBackEntry->isWeak, 0, ths->state);
rpcFreeCont(rpcMsg.pCont);
syncEntryDestory(pRollBackEntry);
}
rpcFreeCont(rpcMsg.pCont);
}
// free memory
syncEntryDestory(pExtraEntry);
syncEntryDestory(pAppendEntry);
} else if (hasExtraEntries && !hasAppendEntries) {
// do nothing
} else if (!hasExtraEntries && hasAppendEntries) {
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
assert(pAppendEntry != NULL);
// delete confict entries
ths->pLogStore->truncate(ths->pLogStore, extraIndex);
// append new entries
ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
......@@ -237,53 +218,63 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL) {
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 0);
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 2, ths->state);
}
}
rpcFreeCont(rpcMsg.pCont);
}
// free memory
syncEntryDestory(pAppendEntry);
// free memory
syncEntryDestory(pExtraEntry);
syncEntryDestory(pAppendEntry);
} else if (!hasExtraEntries && !hasAppendEntries) {
// do nothing
} else if (hasExtraEntries && !hasAppendEntries) {
// do nothing
} else {
assert(0);
}
} else if (!hasExtraEntries && hasAppendEntries) {
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
assert(pAppendEntry != NULL);
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild();
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm;
pReply->success = true;
// append new entries
ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
if (hasAppendEntries) {
pReply->matchIndex = pMsg->prevLogIndex + 1;
} else {
pReply->matchIndex = pMsg->prevLogIndex;
// pre commit
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL) {
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 3, ths->state);
}
}
rpcFreeCont(rpcMsg.pCont);
SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
// free memory
syncEntryDestory(pAppendEntry);
syncAppendEntriesReplyDestroy(pReply);
} else if (!hasExtraEntries && !hasAppendEntries) {
// do nothing
} else {
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild();
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm;
pReply->success = false;
pReply->matchIndex = SYNC_INDEX_INVALID;
assert(0);
}
SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncAppendEntriesReplyDestroy(pReply);
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild();
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm;
pReply->success = true;
if (hasAppendEntries) {
pReply->matchIndex = pMsg->prevLogIndex + 1;
} else {
pReply->matchIndex = pMsg->prevLogIndex;
}
SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncAppendEntriesReplyDestroy(pReply);
// maybe update commit index from leader
if (pMsg->commitIndex > ths->commitIndex) {
// has commit entry in local
......@@ -308,7 +299,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (ths->pFsm->FpCommitCb != NULL) {
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0);
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0, ths->state);
}
rpcFreeCont(rpcMsg.pCont);
......
......@@ -48,6 +48,9 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
return ret;
}
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pNextIndex", ths->pNextIndex);
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pMatchIndex", ths->pMatchIndex);
// no need this code, because if I receive reply.term, then I must have sent for that term.
// if (pMsg->term > ths->pRaftStore->currentTerm) {
// syncNodeUpdateTerm(ths, pMsg->term);
......@@ -77,5 +80,8 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
}
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pNextIndex", ths->pNextIndex);
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pMatchIndex", ths->pMatchIndex);
return ret;
}
......@@ -63,7 +63,14 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
if (pEntry->term == pSyncNode->pRaftStore->currentTerm) {
// update commit index
newCommitIndex = index;
sTrace("syncMaybeAdvanceCommitIndex maybe to update, newCommitIndex:%ld commit, pSyncNode->commitIndex:%ld",
newCommitIndex, pSyncNode->commitIndex);
break;
} else {
sTrace(
"syncMaybeAdvanceCommitIndex can not commit due to term not equal, pEntry->term:%lu, "
"pSyncNode->pRaftStore->currentTerm:%lu",
pEntry->term, pSyncNode->pRaftStore->currentTerm);
}
}
}
......@@ -91,7 +98,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (pSyncNode->pFsm->FpCommitCb != NULL) {
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0);
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0, pSyncNode->state);
}
rpcFreeCont(rpcMsg.pCont);
......
......@@ -257,13 +257,6 @@ static void *syncIOConsumerFunc(void *param) {
assert(pSyncMsg != NULL);
io->FpOnSyncPing(io->pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg);
/*
pSyncMsg = syncPingBuild(pRpcMsg->contLen);
syncPingFromRpcMsg(pRpcMsg, pSyncMsg);
// memcpy(pSyncMsg, tmpRpcMsg.pCont, tmpRpcMsg.contLen);
io->FpOnSyncPing(io->pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg);
*/
}
} else if (pRpcMsg->msgType == SYNC_PING_REPLY) {
......
......@@ -104,9 +104,26 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
}
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t ret = 0;
int32_t ret = syncPropose2(rid, pMsg, isWeak, 0);
return ret;
}
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t ret = syncPropose(rid, pMsg, isWeak);
return ret;
}
ESyncState syncGetMyRole(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return TAOS_SYNC_STATE_ERROR;
}
assert(rid == pSyncNode->rid);
return pSyncNode->state;
}
// todo : get pointer from rid
int32_t syncPropose2(int64_t rid, const SRpcMsg* pMsg, bool isWeak, uint64_t seqNum) {
int32_t ret = 0;
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return -1;
......@@ -114,7 +131,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
assert(rid == pSyncNode->rid);
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, 0, isWeak);
SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, seqNum, isWeak);
SRpcMsg rpcMsg;
syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
......@@ -122,7 +139,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
ret = 0;
} else {
sTrace("syncForwardToPeer not leader, %s", syncUtilState2String(pSyncNode->state));
sTrace("syncPropose not leader, %s", syncUtilState2String(pSyncNode->state));
ret = -1; // todo : need define err code !!
}
......@@ -130,19 +147,6 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
return ret;
}
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t ret = syncPropose(rid, pMsg, isWeak);
return ret;
}
ESyncState syncGetMyRole(int64_t rid) {
// todo : get pointer from rid
SSyncNode* pSyncNode = NULL;
return pSyncNode->state;
}
void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {}
// open/close --------------
SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode));
......@@ -848,7 +852,7 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg
if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL) {
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0);
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0, ths->state);
}
}
rpcFreeCont(rpcMsg.pCont);
......@@ -863,7 +867,7 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg
if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL) {
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, -2);
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 1, ths->state);
}
}
rpcFreeCont(rpcMsg.pCont);
......
......@@ -834,7 +834,7 @@ cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg) {
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastLogIndex);
snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->lastLogIndex);
cJSON_AddStringToObject(pRoot, "lastLogIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastLogTerm);
cJSON_AddStringToObject(pRoot, "lastLogTerm", u64buf);
......@@ -1127,14 +1127,14 @@ cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg) {
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->prevLogIndex);
cJSON_AddStringToObject(pRoot, "pre_log_index", u64buf);
snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->prevLogIndex);
cJSON_AddStringToObject(pRoot, "prevLogIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->prevLogTerm);
cJSON_AddStringToObject(pRoot, "pre_log_term", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->commitIndex);
cJSON_AddStringToObject(pRoot, "commit_index", u64buf);
snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->commitIndex);
cJSON_AddStringToObject(pRoot, "commitIndex", u64buf);
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
char* s;
......@@ -1288,7 +1288,7 @@ cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) {
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
cJSON_AddNumberToObject(pRoot, "success", pMsg->success);
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->matchIndex);
snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->matchIndex);
cJSON_AddStringToObject(pRoot, "matchIndex", u64buf);
}
......
......@@ -165,6 +165,34 @@ char* logStore2Str(SSyncLogStore* pLogStore) {
return serialized;
}
cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore) {
char u64buf[128];
SSyncLogStoreData* pData = (SSyncLogStoreData*)pLogStore->data;
cJSON* pRoot = cJSON_CreateObject();
if (pData != NULL && pData->pWal != NULL) {
snprintf(u64buf, sizeof(u64buf), "%p", pData->pSyncNode);
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal);
cJSON_AddStringToObject(pRoot, "pWal", u64buf);
snprintf(u64buf, sizeof(u64buf), "%ld", logStoreLastIndex(pLogStore));
cJSON_AddStringToObject(pRoot, "LastIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", logStoreLastTerm(pLogStore));
cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);
}
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SSyncLogStoreSimple", pRoot);
return pJson;
}
char* logStoreSimple2Str(SSyncLogStore* pLogStore) {
cJSON* pJson = logStoreSimple2Json(pLogStore);
char* serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// for debug -----------------
void logStorePrint(SSyncLogStore* pLogStore) {
char* serialized = logStore2Str(pLogStore);
......@@ -191,3 +219,30 @@ void logStoreLog2(char* s, SSyncLogStore* pLogStore) {
sTrace("logStorePrint | len:%lu | %s | %s", strlen(serialized), s, serialized);
free(serialized);
}
// for debug -----------------
void logStoreSimplePrint(SSyncLogStore* pLogStore) {
char* serialized = logStoreSimple2Str(pLogStore);
printf("logStoreSimplePrint | len:%lu | %s \n", strlen(serialized), serialized);
fflush(NULL);
free(serialized);
}
void logStoreSimplePrint2(char* s, SSyncLogStore* pLogStore) {
char* serialized = logStoreSimple2Str(pLogStore);
printf("logStoreSimplePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
fflush(NULL);
free(serialized);
}
void logStoreSimpleLog(SSyncLogStore* pLogStore) {
char* serialized = logStoreSimple2Str(pLogStore);
sTrace("logStoreSimpleLog | len:%lu | %s", strlen(serialized), serialized);
free(serialized);
}
void logStoreSimpleLog2(char* s, SSyncLogStore* pLogStore) {
char* serialized = logStoreSimple2Str(pLogStore);
sTrace("logStoreSimpleLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
free(serialized);
}
\ No newline at end of file
......@@ -49,6 +49,10 @@
int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
assert(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
syncIndexMgrLog2("==syncNodeAppendEntriesPeers== pNextIndex", pSyncNode->pNextIndex);
syncIndexMgrLog2("==syncNodeAppendEntriesPeers== pMatchIndex", pSyncNode->pMatchIndex);
logStoreSimpleLog2("==syncNodeAppendEntriesPeers==", pSyncNode->pLogStore);
int32_t ret = 0;
for (int i = 0; i < pSyncNode->peersNum; ++i) {
SRaftId* pDestId = &(pSyncNode->peersId[i]);
......@@ -99,6 +103,8 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
pMsg->prevLogTerm = preLogTerm;
pMsg->commitIndex = pSyncNode->commitIndex;
syncAppendEntriesLog2("==syncNodeAppendEntriesPeers==", pMsg);
// send AppendEntries
syncNodeAppendEntries(pSyncNode, pDestId, pMsg);
syncAppendEntriesDestroy(pMsg);
......
......@@ -29,10 +29,14 @@ add_executable(syncPingTimerTest2 "")
add_executable(syncPingSelfTest "")
add_executable(syncElectTest "")
add_executable(syncElectTest2 "")
add_executable(syncElectTest3 "")
add_executable(syncEncodeTest "")
add_executable(syncWriteTest "")
add_executable(syncReplicateTest "")
add_executable(syncReplicateTest2 "")
add_executable(syncReplicateLoadTest "")
add_executable(syncRefTest "")
add_executable(syncLogStoreCheck "")
target_sources(syncTest
......@@ -159,6 +163,10 @@ target_sources(syncElectTest2
PRIVATE
"syncElectTest2.cpp"
)
target_sources(syncElectTest3
PRIVATE
"syncElectTest3.cpp"
)
target_sources(syncEncodeTest
PRIVATE
"syncEncodeTest.cpp"
......@@ -171,10 +179,22 @@ target_sources(syncReplicateTest
PRIVATE
"syncReplicateTest.cpp"
)
target_sources(syncReplicateTest2
PRIVATE
"syncReplicateTest2.cpp"
)
target_sources(syncReplicateLoadTest
PRIVATE
"syncReplicateLoadTest.cpp"
)
target_sources(syncRefTest
PRIVATE
"syncRefTest.cpp"
)
target_sources(syncLogStoreCheck
PRIVATE
"syncLogStoreCheck.cpp"
)
target_include_directories(syncTest
......@@ -332,6 +352,11 @@ target_include_directories(syncElectTest2
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncElectTest3
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncEncodeTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
......@@ -347,11 +372,26 @@ target_include_directories(syncReplicateTest
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncReplicateTest2
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncReplicateLoadTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncRefTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncLogStoreCheck
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries(syncTest
......@@ -478,6 +518,10 @@ target_link_libraries(syncElectTest2
sync
gtest_main
)
target_link_libraries(syncElectTest3
sync
gtest_main
)
target_link_libraries(syncEncodeTest
sync
gtest_main
......@@ -490,10 +534,22 @@ target_link_libraries(syncReplicateTest
sync
gtest_main
)
target_link_libraries(syncReplicateTest2
sync
gtest_main
)
target_link_libraries(syncReplicateLoadTest
sync
gtest_main
)
target_link_libraries(syncRefTest
sync
gtest_main
)
target_link_libraries(syncLogStoreCheck
sync
gtest_main
)
enable_testing()
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "tref.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
uint16_t ports[] = {7010, 7110, 7210, 7310, 7410};
int32_t replicaNum = 3;
int32_t myIndex = 0;
SRaftId ids[TSDB_MAX_REPLICA];
SSyncInfo syncInfo;
SSyncFSM* pFsm;
SWal* pWal;
int64_t syncNodeInit() {
syncInfo.vgId = 1234;
syncInfo.rpcClient = gSyncIO->clientRpc;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "./elect3_test_%d", myIndex);
int code = walInit();
assert(code == 0);
SWalCfg walCfg;
memset(&walCfg, 0, sizeof(SWalCfg));
walCfg.vgId = syncInfo.vgId;
walCfg.fsyncPeriod = 1000;
walCfg.retentionPeriod = 1000;
walCfg.rollPeriod = 1000;
walCfg.retentionSize = 1000;
walCfg.segSize = 1000;
walCfg.level = TAOS_WAL_FSYNC;
char tmpdir[128];
snprintf(tmpdir, sizeof(tmpdir), "./elect3_test_wal_%d", myIndex);
pWal = walOpen(tmpdir, &walCfg);
assert(pWal != NULL);
syncInfo.pWal = pWal;
SSyncCfg* pCfg = &syncInfo.syncCfg;
pCfg->myIndex = myIndex;
pCfg->replicaNum = replicaNum;
for (int i = 0; i < replicaNum; ++i) {
pCfg->nodeInfo[i].nodePort = ports[i];
snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1");
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
}
int64_t rid = syncStart(&syncInfo);
assert(rid > 0);
SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid);
assert(pSyncNode != NULL);
pSyncNode->hbBaseLine = 500;
pSyncNode->electBaseLine = 1500;
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
gSyncIO->pSyncNode = pSyncNode;
syncNodeRelease(pSyncNode);
return rid;
}
void initRaftId(SSyncNode* pSyncNode) {
for (int i = 0; i < replicaNum; ++i) {
ids[i] = pSyncNode->replicasId[i];
char* s = syncUtilRaftId2Str(&ids[i]);
printf("raftId[%d] : %s\n", i, s);
free(s);
}
}
int main(int argc, char** argv) {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
myIndex = 0;
if (argc >= 2) {
myIndex = atoi(argv[1]);
}
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
assert(ret == 0);
ret = syncInit();
assert(ret == 0);
int64_t rid = syncNodeInit();
assert(rid > 0);
SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid);
assert(pSyncNode != NULL);
syncNodePrint2((char*)"", pSyncNode);
initRaftId(pSyncNode);
//---------------------------
while (1) {
sTrace(
"elect sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, electTimerMS:%d",
pSyncNode->state, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm,
pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS);
taosMsleep(1000);
}
syncNodeRelease(pSyncNode);
return 0;
}
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
uint16_t ports[] = {7010, 7110, 7210, 7310, 7410};
int32_t replicaNum = 1;
int32_t myIndex = 0;
SRaftId ids[TSDB_MAX_REPLICA];
SSyncInfo syncInfo;
SSyncFSM* pFsm;
SWal* pWal;
SSyncNode* pSyncNode;
SSyncNode* syncNodeInit(const char* path) {
syncInfo.vgId = 1234;
syncInfo.rpcClient = gSyncIO->clientRpc;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./log_check");
int code = walInit();
assert(code == 0);
SWalCfg walCfg;
memset(&walCfg, 0, sizeof(SWalCfg));
walCfg.vgId = syncInfo.vgId;
walCfg.fsyncPeriod = 1000;
walCfg.retentionPeriod = 1000;
walCfg.rollPeriod = 1000;
walCfg.retentionSize = 1000;
walCfg.segSize = 1000;
walCfg.level = TAOS_WAL_FSYNC;
pWal = walOpen(path, &walCfg);
assert(pWal != NULL);
syncInfo.pWal = pWal;
SSyncCfg* pCfg = &syncInfo.syncCfg;
pCfg->myIndex = myIndex;
pCfg->replicaNum = replicaNum;
for (int i = 0; i < replicaNum; ++i) {
pCfg->nodeInfo[i].nodePort = ports[i];
snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1");
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
}
pSyncNode = syncNodeOpen(&syncInfo);
assert(pSyncNode != NULL);
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
gSyncIO->pSyncNode = pSyncNode;
return pSyncNode;
}
SSyncNode* logStoreCheck(const char* path) { return syncNodeInit(path); }
int main(int argc, char** argv) {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
myIndex = 0;
if (argc >= 2) {
myIndex = atoi(argv[1]);
}
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
assert(ret == 0);
ret = syncEnvStart();
assert(ret == 0);
pSyncNode = logStoreCheck(argv[1]);
assert(pSyncNode != NULL);
logStorePrint2((char*)"logStoreCheck", pSyncNode->pLogStore);
return 0;
}
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncRaftEntry.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
uint16_t ports[] = {7010, 7110, 7210, 7310, 7410};
int32_t replicaNum = 3;
int32_t myIndex = 0;
SRaftId ids[TSDB_MAX_REPLICA];
SSyncInfo syncInfo;
SSyncFSM *pFsm;
SWal * pWal;
void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
ESyncState state) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
pFsm, index, isWeak, code, state, syncUtilState2String(state));
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
}
void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
ESyncState state) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, index, isWeak,
code, state, syncUtilState2String(state));
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
}
void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
ESyncState state) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
pFsm, index, isWeak, code, state, syncUtilState2String(state));
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
}
void initFsm() {
pFsm = (SSyncFSM *)malloc(sizeof(SSyncFSM));
pFsm->FpCommitCb = CommitCb;
pFsm->FpPreCommitCb = PreCommitCb;
pFsm->FpRollBackCb = RollBackCb;
}
int64_t syncNodeInit() {
syncInfo.vgId = 1234;
syncInfo.rpcClient = gSyncIO->clientRpc;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "./replicate2_test_%d", myIndex);
int code = walInit();
assert(code == 0);
SWalCfg walCfg;
memset(&walCfg, 0, sizeof(SWalCfg));
walCfg.vgId = syncInfo.vgId;
walCfg.fsyncPeriod = 1000;
walCfg.retentionPeriod = 1000;
walCfg.rollPeriod = 1000;
walCfg.retentionSize = 1000;
walCfg.segSize = 1000;
walCfg.level = TAOS_WAL_FSYNC;
char tmpdir[128];
snprintf(tmpdir, sizeof(tmpdir), "./replicate2_test_wal_%d", myIndex);
pWal = walOpen(tmpdir, &walCfg);
assert(pWal != NULL);
syncInfo.pWal = pWal;
SSyncCfg *pCfg = &syncInfo.syncCfg;
pCfg->myIndex = myIndex;
pCfg->replicaNum = replicaNum;
for (int i = 0; i < replicaNum; ++i) {
pCfg->nodeInfo[i].nodePort = ports[i];
snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1");
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
}
int64_t rid = syncStart(&syncInfo);
assert(rid > 0);
SSyncNode *pSyncNode = (SSyncNode *)syncNodeAcquire(rid);
assert(pSyncNode != NULL);
// pSyncNode->hbBaseLine = 500;
// pSyncNode->electBaseLine = 1500;
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
gSyncIO->pSyncNode = pSyncNode;
syncNodeRelease(pSyncNode);
return rid;
}
void initRaftId(SSyncNode *pSyncNode) {
for (int i = 0; i < replicaNum; ++i) {
ids[i] = pSyncNode->replicasId[i];
char *s = syncUtilRaftId2Str(&ids[i]);
printf("raftId[%d] : %s\n", i, s);
free(s);
}
}
SRpcMsg *step0(int i) {
SRpcMsg *pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg));
memset(pMsg, 0, sizeof(SRpcMsg));
pMsg->msgType = 9999;
pMsg->contLen = 128;
pMsg->pCont = malloc(pMsg->contLen);
snprintf((char *)(pMsg->pCont), pMsg->contLen, "value-%u-%d", ports[myIndex], i);
return pMsg;
}
SyncClientRequest *step1(const SRpcMsg *pMsg) {
SyncClientRequest *pRetMsg = syncClientRequestBuild2(pMsg, 123, true);
return pRetMsg;
}
int main(int argc, char **argv) {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
void logTest();
myIndex = 0;
if (argc >= 2) {
myIndex = atoi(argv[1]);
}
int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]);
assert(ret == 0);
initFsm();
ret = syncInit();
assert(ret == 0);
int64_t rid = syncNodeInit();
assert(rid > 0);
SSyncNode *pSyncNode = (SSyncNode *)syncNodeAcquire(rid);
assert(pSyncNode != NULL);
syncNodePrint2((char *)"", pSyncNode);
initRaftId(pSyncNode);
// only load ...
while (1) {
sTrace(
"replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, "
"electTimerMS:%d",
pSyncNode->state, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm,
pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS);
taosMsleep(1000);
}
return 0;
}
......@@ -28,19 +28,29 @@ SSyncFSM * pFsm;
SWal * pWal;
SSyncNode *gSyncNode;
void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) {
printf("==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code);
syncRpcMsgPrint2((char *)"==CommitCb==", (SRpcMsg *)pBuf);
void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
ESyncState state) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
pFsm, index, isWeak, code, state, syncUtilState2String(state));
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
}
void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) {
printf("==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code);
syncRpcMsgPrint2((char *)"==PreCommitCb==", (SRpcMsg *)pBuf);
void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
ESyncState state) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, index, isWeak,
code, state, syncUtilState2String(state));
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
}
void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) {
printf("==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code);
syncRpcMsgPrint2((char *)"==RollBackCb==", (SRpcMsg *)pBuf);
void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
ESyncState state) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
pFsm, index, isWeak, code, state, syncUtilState2String(state));
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
}
void initFsm() {
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncRaftEntry.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
uint16_t ports[] = {7010, 7110, 7210, 7310, 7410};
int32_t replicaNum = 3;
int32_t myIndex = 0;
SRaftId ids[TSDB_MAX_REPLICA];
SSyncInfo syncInfo;
SSyncFSM *pFsm;
SWal * pWal;
void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
ESyncState state) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
pFsm, index, isWeak, code, state, syncUtilState2String(state));
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
}
void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
ESyncState state) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, index, isWeak,
code, state, syncUtilState2String(state));
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
}
void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
ESyncState state) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
pFsm, index, isWeak, code, state, syncUtilState2String(state));
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
}
void initFsm() {
pFsm = (SSyncFSM *)malloc(sizeof(SSyncFSM));
pFsm->FpCommitCb = CommitCb;
pFsm->FpPreCommitCb = PreCommitCb;
pFsm->FpRollBackCb = RollBackCb;
}
int64_t syncNodeInit() {
syncInfo.vgId = 1234;
syncInfo.rpcClient = gSyncIO->clientRpc;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "./replicate2_test_%d", myIndex);
int code = walInit();
assert(code == 0);
SWalCfg walCfg;
memset(&walCfg, 0, sizeof(SWalCfg));
walCfg.vgId = syncInfo.vgId;
walCfg.fsyncPeriod = 1000;
walCfg.retentionPeriod = 1000;
walCfg.rollPeriod = 1000;
walCfg.retentionSize = 1000;
walCfg.segSize = 1000;
walCfg.level = TAOS_WAL_FSYNC;
char tmpdir[128];
snprintf(tmpdir, sizeof(tmpdir), "./replicate2_test_wal_%d", myIndex);
pWal = walOpen(tmpdir, &walCfg);
assert(pWal != NULL);
syncInfo.pWal = pWal;
SSyncCfg *pCfg = &syncInfo.syncCfg;
pCfg->myIndex = myIndex;
pCfg->replicaNum = replicaNum;
for (int i = 0; i < replicaNum; ++i) {
pCfg->nodeInfo[i].nodePort = ports[i];
snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1");
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
}
int64_t rid = syncStart(&syncInfo);
assert(rid > 0);
SSyncNode *pSyncNode = (SSyncNode *)syncNodeAcquire(rid);
assert(pSyncNode != NULL);
// pSyncNode->hbBaseLine = 500;
// pSyncNode->electBaseLine = 1500;
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
gSyncIO->pSyncNode = pSyncNode;
syncNodeRelease(pSyncNode);
return rid;
}
void initRaftId(SSyncNode *pSyncNode) {
for (int i = 0; i < replicaNum; ++i) {
ids[i] = pSyncNode->replicasId[i];
char *s = syncUtilRaftId2Str(&ids[i]);
printf("raftId[%d] : %s\n", i, s);
free(s);
}
}
SRpcMsg *step0(int i) {
SRpcMsg *pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg));
memset(pMsg, 0, sizeof(SRpcMsg));
pMsg->msgType = 9999;
pMsg->contLen = 128;
pMsg->pCont = malloc(pMsg->contLen);
snprintf((char *)(pMsg->pCont), pMsg->contLen, "value-%u-%d", ports[myIndex], i);
return pMsg;
}
SyncClientRequest *step1(const SRpcMsg *pMsg) {
SyncClientRequest *pRetMsg = syncClientRequestBuild2(pMsg, 123, true);
return pRetMsg;
}
int main(int argc, char **argv) {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
void logTest();
myIndex = 0;
if (argc >= 2) {
myIndex = atoi(argv[1]);
}
int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]);
assert(ret == 0);
initFsm();
ret = syncInit();
assert(ret == 0);
int64_t rid = syncNodeInit();
assert(rid > 0);
SSyncNode *pSyncNode = (SSyncNode *)syncNodeAcquire(rid);
assert(pSyncNode != NULL);
syncNodePrint2((char *)"", pSyncNode);
initRaftId(pSyncNode);
for (int i = 0; i < 30; ++i) {
// step0
SRpcMsg *pMsg0 = step0(i);
syncRpcMsgPrint2((char *)"==step0==", pMsg0);
syncPropose(rid, pMsg0, true);
taosMsleep(1000);
free(pMsg0);
sTrace(
"syncPropose sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, "
"electTimerMS:%d",
pSyncNode->state, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm,
pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS);
}
while (1) {
sTrace(
"replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, "
"electTimerMS:%d",
pSyncNode->state, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm,
pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS);
taosMsleep(1000);
}
return 0;
}
......@@ -28,19 +28,29 @@ SSyncFSM * pFsm;
SWal * pWal;
SSyncNode *gSyncNode;
void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) {
printf("==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code);
syncRpcMsgPrint2((char *)"==CommitCb==", (SRpcMsg *)pBuf);
void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
ESyncState state) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
pFsm, index, isWeak, code, state, syncUtilState2String(state));
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
}
void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) {
printf("==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code);
syncRpcMsgPrint2((char *)"==PreCommitCb==", (SRpcMsg *)pBuf);
void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
ESyncState state) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, index, isWeak,
code, state, syncUtilState2String(state));
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
}
void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) {
printf("==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code);
syncRpcMsgPrint2((char *)"==RollBackCb==", (SRpcMsg *)pBuf);
void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
ESyncState state) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
pFsm, index, isWeak, code, state, syncUtilState2String(state));
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
}
void initFsm() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册