提交 eaf6142c 编写于 作者: M Minghao Li

sync modify timer

上级 c9efe3de
...@@ -137,7 +137,9 @@ typedef struct SSyncInfo { ...@@ -137,7 +137,9 @@ typedef struct SSyncInfo {
SSyncCfg syncCfg; SSyncCfg syncCfg;
char path[TSDB_FILENAME_LEN]; char path[TSDB_FILENAME_LEN];
SSyncFSM* pFsm; SSyncFSM* pFsm;
int32_t (*FpSendMsg)(void* handle, const SEpSet* pEpSet, SRpcMsg* pMsg);
void* rpcClient;
int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg);
} SSyncInfo; } SSyncInfo;
......
...@@ -34,17 +34,18 @@ typedef struct SSyncEnv { ...@@ -34,17 +34,18 @@ typedef struct SSyncEnv {
tmr_h pEnvTickTimer; tmr_h pEnvTickTimer;
tmr_h pTimerManager; tmr_h pTimerManager;
char name[128]; char name[128];
} SSyncEnv; } SSyncEnv;
extern SSyncEnv *gSyncEnv; extern SSyncEnv* gSyncEnv;
int32_t syncEnvStart(); int32_t syncEnvStart();
int32_t syncEnvStop(); int32_t syncEnvStop();
tmr_h syncEnvStartTimer(TAOS_TMR_CALLBACK fp, int mseconds, void *param); tmr_h syncEnvStartTimer(TAOS_TMR_CALLBACK fp, int mseconds, void* param);
void syncEnvStopTimer(tmr_h *pTimer); void syncEnvStopTimer(tmr_h* pTimer);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -30,10 +30,10 @@ extern "C" { ...@@ -30,10 +30,10 @@ extern "C" {
#include "trpc.h" #include "trpc.h"
typedef struct SSyncIO { typedef struct SSyncIO {
void *serverRpc; void * serverRpc;
void *clientRpc; void * clientRpc;
STaosQueue *pMsgQ; STaosQueue *pMsgQ;
STaosQset *pQset; STaosQset * pQset;
pthread_t tid; pthread_t tid;
int8_t isStart; int8_t isStart;
......
...@@ -146,7 +146,8 @@ typedef struct SSyncNode { ...@@ -146,7 +146,8 @@ typedef struct SSyncNode {
int32_t (*FpOnAppendEntriesReply)(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg); int32_t (*FpOnAppendEntriesReply)(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg);
// passed from outside // passed from outside
int32_t (*FpSendMsg)(void* handle, const SEpSet* pEpSet, SRpcMsg* pMsg); void* rpcClient;
int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg);
} SSyncNode; } SSyncNode;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_UTIL_H
#define _TD_LIBS_SYNC_UTIL_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "syncMessage.h"
#include "taosdef.h"
void nodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet);
void raftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet);
void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg);
#ifdef __cplusplus
}
#endif
#endif /*_TD_LIBS_SYNC_UTIL_H*/
...@@ -18,10 +18,14 @@ ...@@ -18,10 +18,14 @@
#include "syncEnv.h" #include "syncEnv.h"
#include "syncInt.h" #include "syncInt.h"
#include "syncRaft.h" #include "syncRaft.h"
#include "syncUtil.h"
static int32_t tsNodeRefId = -1; static int32_t tsNodeRefId = -1;
// ------ local funciton --------- // ------ local funciton ---------
static int32_t doSyncNodeSendMsgById(SRaftId* destRaftId, struct SSyncNode* pSyncNode, SRpcMsg* pMsg);
static int32_t doSyncNodeSendMsgByInfo(SNodeInfo* nodeInfo, struct SSyncNode* pSyncNode, SRpcMsg* pMsg);
static int32_t doSyncNodePing(struct SSyncNode* ths, const SyncPing* pMsg); static int32_t doSyncNodePing(struct SSyncNode* ths, const SyncPing* pMsg);
static int32_t onSyncNodePing(struct SSyncNode* ths, SyncPing* pMsg); static int32_t onSyncNodePing(struct SSyncNode* ths, SyncPing* pMsg);
static int32_t onSyncNodePingReply(struct SSyncNode* ths, SyncPingReply* pMsg); static int32_t onSyncNodePingReply(struct SSyncNode* ths, SyncPingReply* pMsg);
...@@ -68,7 +72,9 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { ...@@ -68,7 +72,9 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
pSyncNode->FpPingTimer = syncNodePingTimerCb; pSyncNode->FpPingTimer = syncNodePingTimerCb;
pSyncNode->pingTimerCounter = 0; pSyncNode->pingTimerCounter = 0;
pSyncNode->rpcClient = pSyncInfo->rpcClient;
pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg; pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
pSyncNode->FpOnPing = onSyncNodePing; pSyncNode->FpOnPing = onSyncNodePing;
pSyncNode->FpOnPingReply = onSyncNodePingReply; pSyncNode->FpOnPingReply = onSyncNodePingReply;
pSyncNode->FpOnRequestVote = onSyncNodeRequestVote; pSyncNode->FpOnRequestVote = onSyncNodeRequestVote;
...@@ -84,7 +90,11 @@ void syncNodeClose(SSyncNode* pSyncNode) { ...@@ -84,7 +90,11 @@ void syncNodeClose(SSyncNode* pSyncNode) {
free(pSyncNode); free(pSyncNode);
} }
void syncNodePingAll(SSyncNode* pSyncNode) { sTrace("syncNodePingAll %p ", pSyncNode); } void syncNodePingAll(SSyncNode* pSyncNode) {
sTrace("syncNodePingAll %p ", pSyncNode);
SyncPing msg;
doSyncNodePing(pSyncNode, &msg);
}
void syncNodePingPeers(SSyncNode* pSyncNode) {} void syncNodePingPeers(SSyncNode* pSyncNode) {}
...@@ -110,8 +120,30 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) { ...@@ -110,8 +120,30 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
} }
// ------ local funciton --------- // ------ local funciton ---------
static int32_t doSyncNodeSendMsgById(SRaftId* destRaftId, struct SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet;
raftId2EpSet(destRaftId, &epSet);
pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg);
return 0;
}
static int32_t doSyncNodeSendMsgByInfo(SNodeInfo* nodeInfo, struct SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet;
nodeInfo2EpSet(nodeInfo, &epSet);
pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg);
return 0;
}
static int32_t doSyncNodePing(struct SSyncNode* ths, const SyncPing* pMsg) { static int32_t doSyncNodePing(struct SSyncNode* ths, const SyncPing* pMsg) {
int32_t ret = 0; int32_t ret;
for (int i = 0; i < ths->syncCfg.replicaNum; ++i) {
SRpcMsg* rpcMsg;
syncPing2RpcMsg(pMsg, rpcMsg);
doSyncNodeSendMsgByInfo(&ths->syncCfg.nodeInfo[i], ths, rpcMsg);
}
return ret; return ret;
} }
...@@ -167,6 +199,6 @@ static void syncNodePingTimerCb(void* param, void* tmrId) { ...@@ -167,6 +199,6 @@ static void syncNodePingTimerCb(void* param, void* tmrId) {
taosTmrReset(syncNodePingTimerCb, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager, taosTmrReset(syncNodePingTimerCb, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager,
&pSyncNode->pPingTimer); &pSyncNode->pPingTimer);
syncNodePingSelf(pSyncNode); syncNodePingAll(pSyncNode);
} }
} }
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "syncUtil.h"
void nodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet) {}
void raftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) {}
void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg) {}
\ No newline at end of file
...@@ -18,6 +18,7 @@ SSyncNode* doSync() { ...@@ -18,6 +18,7 @@ SSyncNode* doSync() {
SSyncInfo syncInfo; SSyncInfo syncInfo;
syncInfo.vgId = 1; syncInfo.vgId = 1;
syncInfo.rpcClient = gSyncIO->clientRpc;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_sync_ping"); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_sync_ping");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册