diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 82108acf7b95de38e1987e21e636674cc846c1a0..d75c5424ea8956f8bf9f75b1dc98ab3815ed0ae8 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -97,14 +97,17 @@ typedef struct SRaftId { } SRaftId; typedef struct SSyncNode { - int8_t replica; - int8_t quorum; - int32_t refCount; - int64_t rid; - SyncGroupId vgId; SSyncCfg syncCfg; char path[TSDB_FILENAME_LEN]; + SSyncFSM* pFsm; + + // passed from outside + void* rpcClient; + int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg); + + int32_t refCount; + int64_t rid; SNodeInfo me; SNodeInfo peers[TSDB_MAX_REPLICA]; @@ -112,7 +115,6 @@ typedef struct SSyncNode { ESyncRole role; SRaftId raftId; - SSyncFSM* pFsm; tmr_h pPingTimer; int32_t pingTimerMS; @@ -145,10 +147,6 @@ typedef struct SSyncNode { int32_t (*FpOnAppendEntriesReply)(SSyncNode* ths, SyncAppendEntriesReply* pMsg); - // passed from outside - void* rpcClient; - int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg); - } SSyncNode; SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo); diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 5c27f9ec3ac957a6e3460453a1ef72391d6feb9e..c035ad5d6b88f8e6554faff7ec21eb257670f027 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -30,19 +30,26 @@ static int32_t doSyncIOOnMsg(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, S static int32_t doSyncIODestroy(SSyncIO *io); static SSyncIO *syncIOCreate(); -static void * syncIOConsumer(void *param); +static void *syncIOConsumer(void *param); static int syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey); static void syncIODoReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static void syncIODoRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static void syncIOTick(void *param, void *tmrId); // ---------------------------- -int32_t syncIOSendMsg(void *handle, const SEpSet *pEpSet, SRpcMsg *pMsg) { return 0; } +int32_t syncIOSendMsg(void *handle, const SEpSet *pEpSet, SRpcMsg *pMsg) { + pMsg->handle = NULL; + rpcSendRequest(handle, pEpSet, pMsg, NULL); + return 0; +} int32_t syncIOStart() { gSyncIO = syncIOCreate(); assert(gSyncIO != NULL); + int32_t ret = doSyncIOStart(gSyncIO); + assert(ret == 0); + return 0; } @@ -67,15 +74,14 @@ static void syncIOTick(void *param, void *tmrId) { taosWriteQitem(io->pMsgQ, pTemp); - bool b = taosTmrReset(syncIOTick, 1000, io, io->syncTimerManager, io->syncTimer); - assert(b); + taosTmrReset(syncIOTick, 1000, io, io->syncTimerManager, io->syncTimer); } static void *syncIOConsumer(void *param) { SSyncIO *io = param; STaosQall *qall; - SRpcMsg * pRpcMsg, rpcMsg; + SRpcMsg *pRpcMsg, rpcMsg; int type; qall = taosAllocateQall(); @@ -215,8 +221,8 @@ static int32_t doSyncIOStart(SSyncIO *io) { } // start tmr thread - io->syncTimerManager = taosTmrInit(1000, 50, 10000, "SYNC"); - io->syncTimer = taosTmrStart(syncIOTick, 1000, io, io->syncTimerManager); + // io->syncTimerManager = taosTmrInit(1000, 50, 10000, "SYNC"); + // io->syncTimer = taosTmrStart(syncIOTick, 1000, io, io->syncTimerManager); return 0; } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 4d9e5887cd79c7f215049c4f75ce3f4f7464b268..0aa3e5606292a11a32801e03dfbc33d23267abb2 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -69,15 +69,34 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { assert(pSyncNode != NULL); memset(pSyncNode, 0, sizeof(SSyncNode)); + pSyncNode->vgId = pSyncInfo->vgId; + pSyncNode->syncCfg = pSyncInfo->syncCfg; + memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path)); + pSyncNode->pFsm = pSyncInfo->pFsm; + + pSyncNode->rpcClient = pSyncInfo->rpcClient; + pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg; + + pSyncNode->me = pSyncInfo->syncCfg.nodeInfo[pSyncInfo->syncCfg.myIndex]; + pSyncNode->peersNum = pSyncInfo->syncCfg.replicaNum - 1; + + int j = 0; + for (int i = 0; i < pSyncInfo->syncCfg.replicaNum; ++i) { + if (i != pSyncInfo->syncCfg.myIndex) { + pSyncNode->peers[j] = pSyncInfo->syncCfg.nodeInfo[i]; + j++; + } + } + + pSyncNode->role = TAOS_SYNC_STATE_FOLLOWER; + syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncNode->vgId, &pSyncNode->raftId); + pSyncNode->pPingTimer = NULL; pSyncNode->pingTimerMS = 1000; atomic_store_8(&pSyncNode->pingTimerStart, 0); pSyncNode->FpPingTimer = syncNodePingTimerCb; pSyncNode->pingTimerCounter = 0; - pSyncNode->rpcClient = pSyncInfo->rpcClient; - pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg; - pSyncNode->FpOnPing = syncNodeOnPingCb; pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb; pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb; @@ -97,10 +116,11 @@ void syncNodePingAll(SSyncNode* pSyncNode) { sTrace("syncNodePingAll %p ", pSyncNode); int32_t ret = 0; for (int i = 0; i < pSyncNode->syncCfg.replicaNum; ++i) { - SyncPing* pSyncPing; - SRaftId raftId; - syncUtilnodeInfo2raftId(&pSyncNode->syncCfg.nodeInfo[i], pSyncNode->vgId, &raftId); - ret = syncNodePing(pSyncNode, &raftId, pSyncPing); + SyncPing* pMsg = syncPingBuild(strlen("ping") + 1); + memcpy(pMsg->data, "ping", strlen("ping") + 1); + syncUtilnodeInfo2raftId(&pSyncNode->syncCfg.nodeInfo[i], pSyncNode->vgId, &pMsg->destId); + pMsg->srcId = pSyncNode->raftId; + ret = syncNodePing(pSyncNode, &pMsg->destId, pMsg); assert(ret == 0); } } @@ -118,10 +138,11 @@ void syncNodePingPeers(SSyncNode* pSyncNode) { void syncNodePingSelf(SSyncNode* pSyncNode) { int32_t ret = 0; - SyncPing* pSyncPing; - SRaftId raftId; - syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncNode->vgId, &raftId); - ret = syncNodePing(pSyncNode, &raftId, pSyncPing); + SyncPing* pMsg = syncPingBuild(strlen("ping") + 1); + memcpy(pMsg->data, "ping", strlen("ping") + 1); + pMsg->destId = pSyncNode->raftId; + pMsg->srcId = pSyncNode->raftId; + ret = syncNodePing(pSyncNode, &pMsg->destId, pMsg); assert(ret == 0); } @@ -146,10 +167,10 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) { // ------ local funciton --------- static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) { - int32_t ret = 0; - SRpcMsg* rpcMsg; - syncPing2RpcMsg(pMsg, rpcMsg); - syncNodeSendMsgById(destRaftId, pSyncNode, rpcMsg); + int32_t ret = 0; + SRpcMsg rpcMsg; + syncPing2RpcMsg(pMsg, &rpcMsg); + syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg); return ret; } diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 080840bbf6403d8cfb0400d0a5e54ab82ba87953..0066ab783e1a3f2586fa84b8fb6cd22cefb5bd72 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -22,7 +22,7 @@ uint64_t syncUtilAddr2U64(const char* host, uint16_t port) { uint64_t u64; uint32_t hostU32 = (uint32_t)inet_addr(host); - assert(hostU32 != (uint32_t)-1); + // assert(hostU32 != (uint32_t)-1); u64 = (((uint64_t)hostU32) << 32) | (((uint32_t)port) << 16); return u64; }