提交 8d789bba 编写于 作者: M Minghao Li

sync enq

上级 f7239014
......@@ -138,6 +138,8 @@ typedef struct SSyncInfo {
void* rpcClient;
int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg);
void* queue;
int32_t (*FpEqMsg)(void* queue, SRpcMsg* pMsg);
} SSyncInfo;
......@@ -147,13 +149,10 @@ typedef struct SSyncNode SSyncNode;
int32_t syncInit();
void syncCleanUp();
int64_t syncStart(const SSyncInfo* pSyncInfo);
void syncStop(int64_t rid);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg);
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak);
// int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak);
int64_t syncStart(const SSyncInfo* pSyncInfo);
void syncStop(int64_t rid);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg);
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak);
ESyncState syncGetMyRole(int64_t rid);
void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole);
......
......@@ -26,6 +26,9 @@ extern "C" {
#include "syncInt.h"
#include "taosdef.h"
void syncNodeElect(SSyncNode* pSyncNode);
void syncNodeRequestVotePeers(SSyncNode* pSyncNode);
#ifdef __cplusplus
}
#endif
......
......@@ -58,9 +58,10 @@ extern SSyncIO *gSyncIO;
int32_t syncIOStart(char *host, uint16_t port);
int32_t syncIOStop();
int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg);
int32_t syncIOTickQ();
int32_t syncIOTickPing();
int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg);
int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -113,6 +113,8 @@ typedef struct SSyncNode {
char path[TSDB_FILENAME_LEN];
void* rpcClient;
int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg);
void* queue;
int32_t (*FpEqMsg)(void* queue, SRpcMsg* pMsg);
// init internal
SNodeInfo me;
......@@ -149,19 +151,19 @@ typedef struct SSyncNode {
// timer
tmr_h pPingTimer;
int32_t pingTimerMS;
uint8_t pingTimerStart;
uint8_t pingTimerEnable;
TAOS_TMR_CALLBACK FpPingTimer; // Timer Fp
uint64_t pingTimerCounter;
tmr_h pElectTimer;
int32_t electTimerMS;
uint8_t electTimerStart;
uint8_t electTimerEnable;
TAOS_TMR_CALLBACK FpElectTimer; // Timer Fp
uint64_t electTimerCounter;
tmr_h pHeartbeatTimer;
int32_t heartbeatTimerMS;
uint8_t heartbeatTimerStart;
uint8_t heartbeatTimerEnable;
TAOS_TMR_CALLBACK FpHeartbeatTimer; // Timer Fp
uint64_t heartbeatTimerCounter;
......@@ -180,8 +182,24 @@ void syncNodeClose(SSyncNode* pSyncNode);
void syncNodePingAll(SSyncNode* pSyncNode);
void syncNodePingPeers(SSyncNode* pSyncNode);
void syncNodePingSelf(SSyncNode* pSyncNode);
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeResetHeartbeatTimer(SSyncNode* pSyncNode, int32_t ms);
void syncNodeBecomeFollower(SSyncNode* pSyncNode);
void syncNodeBecomeLeader(SSyncNode* pSyncNode);
void syncNodeFollower2Candidate(SSyncNode* pSyncNode);
void syncNodeCandidate2Leader(SSyncNode* pSyncNode);
void syncNodeLeader2Follower(SSyncNode* pSyncNode);
void syncNodeCandidate2Follower(SSyncNode* pSyncNode);
#ifdef __cplusplus
}
......
......@@ -30,6 +30,7 @@ extern "C" {
// encode as uint32
typedef enum ESyncMessageType {
SYNC_UNKNOWN = 99,
SYNC_PING = 101,
SYNC_PING_REPLY = 103,
SYNC_CLIENT_REQUEST = 105,
......@@ -40,6 +41,10 @@ typedef enum ESyncMessageType {
SYNC_APPEND_ENTRIES_REPLY = 115,
} ESyncMessageType;
// ---------------------------------------------
cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg);
cJSON* syncRpcUnknownMsg2Json();
// ---------------------------------------------
typedef struct SyncPing {
uint32_t bytes;
......
......@@ -26,6 +26,8 @@ extern "C" {
#include "syncInt.h"
#include "taosdef.h"
void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode);
#ifdef __cplusplus
}
#endif
......
......@@ -14,3 +14,7 @@
*/
#include "syncElection.h"
void syncNodeElect(SSyncNode* pSyncNode) {}
void syncNodeRequestVotePeers(SSyncNode* pSyncNode) {}
\ No newline at end of file
......@@ -40,17 +40,6 @@ static void syncIOTickPingFunc(void *param, void *tmrId);
// ----------------------------
// public function ------------
int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) {
sTrace(
"<--- syncIOSendMsg ---> clientRpc:%p, numOfEps:%d, inUse:%d, destAddr:%s-%u, pMsg->ahandle:%p, pMsg->handle:%p, "
"pMsg->msgType:%d, pMsg->contLen:%d",
clientRpc, pEpSet->numOfEps, pEpSet->inUse, pEpSet->eps[0].fqdn, pEpSet->eps[0].port, pMsg->ahandle, pMsg->handle,
pMsg->msgType, pMsg->contLen);
pMsg->handle = NULL;
rpcSendRequest(clientRpc, pEpSet, pMsg, NULL);
return 0;
}
int32_t syncIOStart(char *host, uint16_t port) {
gSyncIO = syncIOCreate(host, port);
assert(gSyncIO != NULL);
......@@ -83,6 +72,35 @@ int32_t syncIOTickPing() {
return ret;
}
int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) {
sTrace(
"<--- syncIOSendMsg ---> clientRpc:%p, numOfEps:%d, inUse:%d, destAddr:%s-%u, pMsg->ahandle:%p, pMsg->handle:%p, "
"pMsg->msgType:%d, pMsg->contLen:%d",
clientRpc, pEpSet->numOfEps, pEpSet->inUse, pEpSet->eps[0].fqdn, pEpSet->eps[0].port, pMsg->ahandle, pMsg->handle,
pMsg->msgType, pMsg->contLen);
{
cJSON *pJson = syncRpcMsg2Json(pMsg);
char * serialized = cJSON_Print(pJson);
sTrace("process syncMessage send: pMsg:%s ", serialized);
free(serialized);
cJSON_Delete(pJson);
}
pMsg->handle = NULL;
rpcSendRequest(clientRpc, pEpSet, pMsg, NULL);
return 0;
}
int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg) {
SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
STaosQueue *pMsgQ = queue;
taosWriteQitem(pMsgQ, pTemp);
return 0;
}
// local function ------------
static int32_t syncIOStartInternal(SSyncIO *io) {
taosBlockSIGPIPE();
......@@ -215,6 +233,7 @@ static void *syncIOConsumerFunc(void *param) {
syncPingFromRpcMsg(pRpcMsg, pSyncMsg);
// memcpy(pSyncMsg, tmpRpcMsg.pCont, tmpRpcMsg.contLen);
io->FpOnSyncPing(io->pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg);
}
} else if (pRpcMsg->msgType == SYNC_PING_REPLY) {
......@@ -223,6 +242,7 @@ static void *syncIOConsumerFunc(void *param) {
pSyncMsg = syncPingReplyBuild(pRpcMsg->contLen);
syncPingReplyFromRpcMsg(pRpcMsg, pSyncMsg);
io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg);
}
} else {
;
......
......@@ -56,8 +56,6 @@ void syncStop(int64_t rid) {}
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { return 0; }
// int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak) { return 0; }
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak) { return 0; }
ESyncState syncGetMyRole(int64_t rid) { return TAOS_SYNC_STATE_LEADER; }
......@@ -76,6 +74,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
pSyncNode->rpcClient = pSyncInfo->rpcClient;
pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
pSyncNode->queue = pSyncInfo->queue;
pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg;
pSyncNode->me = pSyncInfo->syncCfg.nodeInfo[pSyncInfo->syncCfg.myIndex];
pSyncNode->peersNum = pSyncInfo->syncCfg.replicaNum - 1;
......@@ -93,7 +93,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
pSyncNode->pPingTimer = NULL;
pSyncNode->pingTimerMS = 1000;
atomic_store_8(&pSyncNode->pingTimerStart, 0);
atomic_store_8(&pSyncNode->pingTimerEnable, 0);
pSyncNode->FpPingTimer = syncNodePingTimerCb;
pSyncNode->pingTimerCounter = 0;
......@@ -148,22 +148,76 @@ void syncNodePingSelf(SSyncNode* pSyncNode) {
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
if (pSyncNode->pPingTimer == NULL) {
pSyncNode->pPingTimer =
taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerCounter, pSyncNode, gSyncEnv->pTimerManager);
taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager);
} else {
taosTmrReset(pSyncNode->FpPingTimer, pSyncNode->pingTimerCounter, pSyncNode, gSyncEnv->pTimerManager,
taosTmrReset(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
&pSyncNode->pPingTimer);
}
atomic_store_8(&pSyncNode->pingTimerStart, 1);
atomic_store_8(&pSyncNode->pingTimerEnable, 1);
return 0;
}
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
atomic_store_8(&pSyncNode->pingTimerStart, 0);
pSyncNode->pingTimerCounter = TIMER_MAX_MS;
atomic_store_8(&pSyncNode->pingTimerEnable, 0);
pSyncNode->pingTimerMS = TIMER_MAX_MS;
return 0;
}
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode) {
if (pSyncNode->pElectTimer == NULL) {
pSyncNode->pElectTimer =
taosTmrStart(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager);
} else {
taosTmrReset(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
&pSyncNode->pElectTimer);
}
atomic_store_8(&pSyncNode->electTimerEnable, 1);
return 0;
}
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
atomic_store_8(&pSyncNode->electTimerEnable, 0);
pSyncNode->electTimerMS = TIMER_MAX_MS;
return 0;
}
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode, int32_t ms) { return 0; }
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
if (pSyncNode->pHeartbeatTimer == NULL) {
pSyncNode->pHeartbeatTimer =
taosTmrStart(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager);
} else {
taosTmrReset(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
&pSyncNode->pHeartbeatTimer);
}
atomic_store_8(&pSyncNode->heartbeatTimerEnable, 1);
return 0;
}
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
atomic_store_8(&pSyncNode->heartbeatTimerEnable, 0);
pSyncNode->heartbeatTimerMS = TIMER_MAX_MS;
return 0;
}
int32_t syncNodeResetHeartbeatTimer(SSyncNode* pSyncNode, int32_t ms) { return 0; }
void syncNodeBecomeFollower(SSyncNode* pSyncNode) {}
void syncNodeBecomeLeader(SSyncNode* pSyncNode) {}
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {}
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {}
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {}
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {}
// ------ local funciton ---------
static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
sTrace("syncNodePing pSyncNode:%p ", pSyncNode);
......@@ -204,7 +258,6 @@ static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pM
}
static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
sTrace("syncNodeSendMsgById pSyncNode:%p ", pSyncNode);
SEpSet epSet;
syncUtilraftId2EpSet(destRaftId, &epSet);
pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg);
......@@ -225,7 +278,7 @@ static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
{
cJSON* pJson = syncPing2Json(pMsg);
char* serialized = cJSON_Print(pJson);
sTrace("syncNodeOnPingCb syncNodePing pMsg:%s ", serialized);
sTrace("process syncMessage recv: syncNodeOnPingCb pMsg:%s ", serialized);
free(serialized);
cJSON_Delete(pJson);
}
......@@ -245,7 +298,7 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
{
cJSON* pJson = syncPingReply2Json(pMsg);
char* serialized = cJSON_Print(pJson);
sTrace("syncNodeOnPingReplyCb syncNodePing pMsg:%s ", serialized);
sTrace("process syncMessage recv: syncNodeOnPingReplyCb pMsg:%s ", serialized);
free(serialized);
cJSON_Delete(pJson);
}
......@@ -275,7 +328,7 @@ static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesR
static void syncNodePingTimerCb(void* param, void* tmrId) {
SSyncNode* pSyncNode = (SSyncNode*)param;
if (atomic_load_8(&pSyncNode->pingTimerStart)) {
if (atomic_load_8(&pSyncNode->pingTimerEnable)) {
++(pSyncNode->pingTimerCounter);
// pSyncNode->pingTimerMS += 100;
......@@ -289,6 +342,6 @@ static void syncNodePingTimerCb(void* param, void* tmrId) {
taosTmrReset(syncNodePingTimerCb, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager,
&pSyncNode->pPingTimer);
} else {
sTrace("syncNodePingTimerCb: pingTimerStart:%u ", pSyncNode->pingTimerStart);
sTrace("syncNodePingTimerCb: pingTimerEnable:%u ", pSyncNode->pingTimerEnable);
}
}
\ No newline at end of file
......@@ -20,6 +20,59 @@
void onMessage(SRaft* pRaft, void* pMsg) {}
// ---------------------------------------------
cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) {
cJSON* pRoot;
if (pRpcMsg->msgType == SYNC_PING) {
SyncPing* pSyncMsg = (SyncPing*)pRpcMsg->pCont;
pRoot = syncPing2Json(pSyncMsg);
} else if (pRpcMsg->msgType == SYNC_PING_REPLY) {
SyncPingReply* pSyncMsg = (SyncPingReply*)pRpcMsg->pCont;
pRoot = syncPingReply2Json(pSyncMsg);
} else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST) {
pRoot = syncRpcUnknownMsg2Json();
} else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST_REPLY) {
pRoot = syncRpcUnknownMsg2Json();
} else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE) {
SyncRequestVote* pSyncMsg = (SyncRequestVote*)pRpcMsg->pCont;
pRoot = syncRequestVote2Json(pSyncMsg);
} else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE_REPLY) {
SyncRequestVoteReply* pSyncMsg = (SyncRequestVoteReply*)pRpcMsg->pCont;
pRoot = syncRequestVoteReply2Json(pSyncMsg);
} else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES) {
SyncAppendEntries* pSyncMsg = (SyncAppendEntries*)pRpcMsg->pCont;
pRoot = syncAppendEntries2Json(pSyncMsg);
} else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES_REPLY) {
SyncAppendEntriesReply* pSyncMsg = (SyncAppendEntriesReply*)pRpcMsg->pCont;
pRoot = syncAppendEntriesReply2Json(pSyncMsg);
} else {
pRoot = syncRpcUnknownMsg2Json();
}
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "RpcMsg", pRoot);
return pJson;
}
cJSON* syncRpcUnknownMsg2Json() {
cJSON* pRoot = cJSON_CreateObject();
cJSON_AddNumberToObject(pRoot, "msgType", SYNC_UNKNOWN);
cJSON_AddStringToObject(pRoot, "data", "known message");
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncPing", pRoot);
return pJson;
}
// ---- message process SyncPing----
SyncPing* syncPingBuild(uint32_t dataLen) {
uint32_t bytes = SYNC_PING_FIX_LEN + dataLen;
......
......@@ -14,3 +14,5 @@
*/
#include "syncReplication.h"
void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {}
\ No newline at end of file
......@@ -8,6 +8,7 @@ add_executable(syncIOSendMsgTest "")
add_executable(syncIOSendMsgClientTest "")
add_executable(syncIOSendMsgServerTest "")
add_executable(syncRaftStoreTest "")
add_executable(syncEnqTest "")
target_sources(syncTest
......@@ -50,6 +51,10 @@ target_sources(syncRaftStoreTest
PRIVATE
"syncRaftStoreTest.cpp"
)
target_sources(syncEnqTest
PRIVATE
"syncEnqTest.cpp"
)
target_include_directories(syncTest
......@@ -102,6 +107,11 @@ target_include_directories(syncRaftStoreTest
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncEnqTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries(syncTest
......@@ -144,6 +154,10 @@ target_link_libraries(syncRaftStoreTest
sync
gtest_main
)
target_link_libraries(syncEnqTest
sync
gtest_main
)
enable_testing()
......
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
uint16_t ports[3] = {7010, 7110, 7210};
SSyncNode* doSync(int myIndex) {
SSyncFSM* pFsm;
SSyncInfo syncInfo;
syncInfo.vgId = 1;
syncInfo.rpcClient = gSyncIO->clientRpc;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_sync_ping");
SSyncCfg* pCfg = &syncInfo.syncCfg;
pCfg->myIndex = myIndex;
pCfg->replicaNum = 3;
pCfg->nodeInfo[0].nodePort = ports[0];
snprintf(pCfg->nodeInfo[0].nodeFqdn, sizeof(pCfg->nodeInfo[0].nodeFqdn), "%s", "127.0.0.1");
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
pCfg->nodeInfo[1].nodePort = ports[1];
snprintf(pCfg->nodeInfo[1].nodeFqdn, sizeof(pCfg->nodeInfo[1].nodeFqdn), "%s", "127.0.0.1");
// taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn);
pCfg->nodeInfo[2].nodePort = ports[2];
snprintf(pCfg->nodeInfo[2].nodeFqdn, sizeof(pCfg->nodeInfo[2].nodeFqdn), "%s", "127.0.0.1");
// taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn);
SSyncNode* pSyncNode = syncNodeOpen(&syncInfo);
assert(pSyncNode != NULL);
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->pSyncNode = pSyncNode;
return pSyncNode;
}
void timerPingAll(void* param, void* tmrId) {
SSyncNode* pSyncNode = (SSyncNode*)param;
syncNodePingAll(pSyncNode);
}
int main(int argc, char** argv) {
// taosInitLog((char*)"syncPingTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
logTest();
int myIndex = 0;
if (argc >= 2) {
myIndex = atoi(argv[1]);
if (myIndex > 2 || myIndex < 0) {
fprintf(stderr, "myIndex:%d error. should be 0 - 2", myIndex);
return 1;
}
}
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
assert(ret == 0);
ret = syncEnvStart();
assert(ret == 0);
SSyncNode* pSyncNode = doSync(myIndex);
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
for (int i = 0; i < 10; ++i) {
SyncPingReply* pSyncMsg = syncPingReplyBuild3(&pSyncNode->raftId, &pSyncNode->raftId);
SRpcMsg rpcMsg;
syncPingReply2RpcMsg(pSyncMsg, &rpcMsg);
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
taosMsleep(1000);
}
while (1) {
taosMsleep(1000);
}
return 0;
}
......@@ -22,6 +22,8 @@ SSyncNode* doSync(int myIndex) {
syncInfo.vgId = 1;
syncInfo.rpcClient = gSyncIO->clientRpc;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_sync_ping");
......@@ -80,11 +82,9 @@ int main(int argc, char** argv) {
ret = syncNodeStartPingTimer(pSyncNode);
assert(ret == 0);
/*
taosMsleep(10000);
ret = syncNodeStopPingTimer(pSyncNode);
assert(ret == 0);
*/
taosMsleep(10000);
ret = syncNodeStopPingTimer(pSyncNode);
assert(ret == 0);
while (1) {
taosMsleep(1000);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册