提交 5b7261d6 编写于 作者: L lichuang

[TD-10645][raft]<feature>add sync node timer

上级 c319d1cb
......@@ -29,5 +29,6 @@ typedef struct SSyncRaft {
int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo);
int32_t syncRaftStep(SSyncRaft* pRaft, const RaftMessage* pMsg);
int32_t syncRaftTick(SSyncRaft* pRaft);
#endif /* _TD_LIBS_SYNC_RAFT_H */
\ No newline at end of file
......@@ -73,4 +73,6 @@ static FORCE_INLINE bool syncIsInternalMsg(const RaftMessage* pMsg) {
return pMsg->msgType == RAFT_MSG_INTERNAL_PROP;
}
void syncFreeMessage(const RaftMessage* pMsg);
#endif /* _TD_LIBS_SYNC_RAFT_MESSAGE_H */
\ No newline at end of file
......@@ -30,8 +30,10 @@ typedef struct SSyncWorker {
struct SSyncNode {
pthread_mutex_t mutex;
int32_t refCount;
SyncGroupId vgId;
SSyncRaft raft;
void* syncTimer;
};
typedef struct SSyncManager {
......@@ -46,6 +48,9 @@ typedef struct SSyncManager {
// vgroup hash table
SHashObj* vgroupTable;
// timer manager
void* syncTimerManager;
} SSyncManager;
extern SSyncManager* gSyncManager;
......
......@@ -67,8 +67,10 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
}
int32_t syncRaftStep(SSyncRaft* pRaft, const RaftMessage* pMsg) {
if (!syncIsInternalMsg(pMsg)) {
free(pMsg);
}
syncFreeMessage(pMsg);
return 0;
}
int32_t syncRaftTick(SSyncRaft* pRaft) {
return 0;
}
\ No newline at end of file
......@@ -15,3 +15,8 @@
#include "raft_message.h"
void syncFreeMessage(const RaftMessage* pMsg) {
if (!syncIsInternalMsg(pMsg)) {
free((RaftMessage*)pMsg);
}
}
\ No newline at end of file
......@@ -14,12 +14,16 @@
*/
#include "syncInt.h"
#include "ttimer.h"
SSyncManager* gSyncManager = NULL;
#define SYNC_TICK_TIMER 50
static int syncOpenWorkerPool(SSyncManager* syncManager);
static int syncCloseWorkerPool(SSyncManager* syncManager);
static void *syncWorkerMain(void *argv);
static void syncNodeTick(void *param, void *tmrId);
int32_t syncInit() {
if (gSyncManager != NULL) {
......@@ -33,6 +37,14 @@ int32_t syncInit() {
}
pthread_mutex_init(&gSyncManager->mutex, NULL);
// init sync timer manager
gSyncManager->syncTimerManager = taosTmrInit(1000, 50, 10000, "SYNC");
if (gSyncManager->syncTimerManager == NULL) {
syncCleanUp();
return -1;
}
// init worker pool
if (syncOpenWorkerPool(gSyncManager) != 0) {
syncCleanUp();
......@@ -56,6 +68,7 @@ void syncCleanUp() {
if (gSyncManager->vgroupTable) {
taosHashCleanup(gSyncManager->vgroupTable);
}
taosTmrCleanUp(gSyncManager->syncTimerManager);
syncCloseWorkerPool(gSyncManager);
pthread_mutex_unlock(&gSyncManager->mutex);
pthread_mutex_destroy(&gSyncManager->mutex);
......@@ -80,6 +93,8 @@ SSyncNode* syncStart(const SSyncInfo* pInfo) {
return NULL;
}
pNode->syncTimer = taosTmrStart(syncNodeTick, SYNC_TICK_TIMER, (void*)pInfo->vgId, gSyncManager->syncTimerManager);
// start raft
pNode->raft.pNode = pNode;
if (syncRaftStart(&pNode->raft, pInfo) != 0) {
......@@ -106,6 +121,7 @@ void syncStop(const SSyncNode* pNode) {
return;
}
assert(*ppNode == pNode);
taosTmrStop(pNode->syncTimer);
taosHashRemove(gSyncManager->vgroupTable, &pNode->vgId, sizeof(SyncGroupId));
pthread_mutex_unlock(&gSyncManager->mutex);
......@@ -159,3 +175,18 @@ static void *syncWorkerMain(void *argv) {
return NULL;
}
static void syncNodeTick(void *param, void *tmrId) {
SyncGroupId vgId = (SyncGroupId)param;
SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &vgId, sizeof(SyncGroupId));
if (ppNode == NULL) {
return;
}
SSyncNode *pNode = *ppNode;
pthread_mutex_lock(&pNode->mutex);
syncRaftTick(&pNode->raft);
pthread_mutex_unlock(&pNode->mutex);
pNode->syncTimer = taosTmrStart(syncNodeTick, SYNC_TICK_TIMER, (void*)pNode->vgId, gSyncManager->syncTimerManager);
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册