From 2dc480adf632360694843fd181665308cf1d85e8 Mon Sep 17 00:00:00 2001 From: lichuang Date: Fri, 29 Oct 2021 14:31:21 +0800 Subject: [PATCH] [TD-10645][raft]sync manager --- include/libs/sync/sync.h | 2 +- source/libs/sync/inc/{raftInt.h => raft.h} | 18 ++- source/libs/sync/inc/syncInt.h | 57 +++++++++ source/libs/sync/src/sync.c | 130 ++++++++++++++++++++- 4 files changed, 189 insertions(+), 18 deletions(-) rename source/libs/sync/inc/{raftInt.h => raft.h} (74%) create mode 100644 source/libs/sync/inc/syncInt.h diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 9ffd74c229..f9d348d77e 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -152,7 +152,7 @@ int32_t syncPropose(SSyncNode* syncNode, SSyncBuffer buffer, void* pData, bool i // int32_t syncRemoveNode(SSyncNode syncNode, const SNodeInfo *pNode); -extern int32_t syncDebugFlag; +extern int32_t sDebugFlag; #ifdef __cplusplus } diff --git a/source/libs/sync/inc/raftInt.h b/source/libs/sync/inc/raft.h similarity index 74% rename from source/libs/sync/inc/raftInt.h rename to source/libs/sync/inc/raft.h index 75c1c2187f..78c0c97ed6 100644 --- a/source/libs/sync/inc/raftInt.h +++ b/source/libs/sync/inc/raft.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019 TAOS Data, Inc. + * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 @@ -13,15 +13,11 @@ * along with this program. If not, see . */ -#ifndef _TD_RAFT_INT_H_ -#define _TD_RAFT_INT_H_ +#ifndef _TD_LIBS_SYNC_RAFT_H +#define _TD_LIBS_SYNC_RAFT_H -#ifdef __cplusplus -extern "C" { -#endif +typedef struct SSyncRaft { + +} SSyncRaft; -#ifdef __cplusplus -} -#endif - -#endif /*_TD_RAFT_INT_H_*/ \ No newline at end of file +#endif /* _TD_LIBS_SYNC_RAFT_H */ \ No newline at end of file diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h new file mode 100644 index 0000000000..33cbd836a1 --- /dev/null +++ b/source/libs/sync/inc/syncInt.h @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_LIBS_SYNC_INT_H +#define _TD_LIBS_SYNC_INT_H + +#include "thash.h" +#include "os.h" +#include "sync.h" +#include "raft.h" +#include "tlog.h" + +#define TAOS_SYNC_MAX_WORKER 3 + +typedef struct SSyncWorker { + pthread_t thread; +} SSyncWorker; + +struct SSyncNode { + pthread_mutex_t mutex; + SyncGroupId vgId; + SSyncRaft raft; +}; + +typedef struct SSyncManager { + pthread_mutex_t mutex; + + // worker threads + SSyncWorker worker[TAOS_SYNC_MAX_WORKER]; + + // vgroup hash table + SHashObj* vgroupTable; + +} SSyncManager; + +extern SSyncManager* gSyncManager; + +#define syncFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYNC FATAL ", 255, __VA_ARGS__); }} while(0) +#define syncError(...) do { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("SYNC ERROR ", 255, __VA_ARGS__); }} while(0) +#define syncWarn(...) do { if (sDebugFlag & DEBUG_WARN) { taosPrintLog("SYNC WARN ", 255, __VA_ARGS__); }} while(0) +#define syncInfo(...) do { if (sDebugFlag & DEBUG_INFO) { taosPrintLog("SYNC ", 255, __VA_ARGS__); }} while(0) +#define syncDebug(...) do { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYNC ", sDebugFlag, __VA_ARGS__); }} while(0) +#define syncTrace(...) do { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYNC ", sDebugFlag, __VA_ARGS__); }} while(0) + +#endif /* _TD_LIBS_SYNC_INT_H */ \ No newline at end of file diff --git a/source/libs/sync/src/sync.c b/source/libs/sync/src/sync.c index 879f2d4f6d..a974a17ad2 100644 --- a/source/libs/sync/src/sync.c +++ b/source/libs/sync/src/sync.c @@ -13,14 +13,132 @@ * along with this program. If not, see . */ -#include "sync.h" +#include "syncInt.h" -int32_t syncInit() { return 0; } +SSyncManager* gSyncManager = NULL; -void syncCleanUp() {} +static int syncOpenWorkerPool(SSyncManager* syncManager); +static int syncCloseWorkerPool(SSyncManager* syncManager); +static void *syncWorkerMain(void *argv); -SSyncNode* syncStart(const SSyncInfo* pInfo) { return NULL; } +int32_t syncInit() { + if (gSyncManager != NULL) { + return 0; + } -void syncStop(const SSyncNode* pNode) {} + gSyncManager = (SSyncManager*)malloc(sizeof(SSyncManager)); + if (gSyncManager == NULL) { + syncError("malloc SSyncManager fail"); + return -1; + } -void syncReconfig(const SSyncNode* pNode, const SSyncCluster* pCfg) {} \ No newline at end of file + pthread_mutex_init(&gSyncManager->mutex, NULL); + // init worker pool + if (syncOpenWorkerPool(gSyncManager) != 0) { + syncCleanUp(); + return -1; + } + + // init vgroup hash table + gSyncManager->vgroupTable = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (gSyncManager->vgroupTable == NULL) { + syncCleanUp(); + return -1; + } + return 0; +} + +void syncCleanUp() { + if (gSyncManager == NULL) { + return; + } + pthread_mutex_lock(&gSyncManager->mutex); + if (gSyncManager->vgroupTable) { + taosHashCleanup(gSyncManager->vgroupTable); + } + syncCloseWorkerPool(gSyncManager); + pthread_mutex_unlock(&gSyncManager->mutex); + pthread_mutex_destroy(&gSyncManager->mutex); + free(gSyncManager); + gSyncManager = NULL; +} + +SSyncNode* syncStart(const SSyncInfo* pInfo) { + pthread_mutex_lock(&gSyncManager->mutex); + + SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &pInfo->vgId, sizeof(SyncGroupId)); + if (ppNode != NULL) { + syncInfo("vgroup %d already exist", pInfo->vgId); + pthread_mutex_unlock(&gSyncManager->mutex); + return *ppNode; + } + + SSyncNode *pNode = (SSyncNode*)malloc(sizeof(SSyncNode)); + if (pNode == NULL) { + syncInfo("malloc vgroup %d node fail", pInfo->vgId); + pthread_mutex_unlock(&gSyncManager->mutex); + return NULL; + } + + pthread_mutex_init(&pNode->mutex, NULL); + + taosHashPut(gSyncManager->vgroupTable, &pInfo->vgId, sizeof(SyncGroupId), &pNode, sizeof(SSyncNode *)); + + pthread_mutex_unlock(&gSyncManager->mutex); + return NULL; +} + +void syncStop(const SSyncNode* pNode) { + pthread_mutex_lock(&gSyncManager->mutex); + + SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &pNode->vgId, sizeof(SyncGroupId)); + if (ppNode == NULL) { + syncInfo("vgroup %d not exist", pNode->vgId); + pthread_mutex_unlock(&gSyncManager->mutex); + return; + } + assert(*ppNode == pNode); + + taosHashRemove(gSyncManager->vgroupTable, &pNode->vgId, sizeof(SyncGroupId)); + pthread_mutex_unlock(&gSyncManager->mutex); + + pthread_mutex_destroy(&pNode->mutex); + free(*ppNode); +} + +void syncReconfig(const SSyncNode* pNode, const SSyncCluster* pCfg) {} + +static int syncOpenWorkerPool(SSyncManager* syncManager) { + int i; + pthread_attr_t thattr; + + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + + for (i = 0; i < TAOS_SYNC_MAX_WORKER; ++i) { + SSyncWorker* pWorker = &(syncManager->worker[i]); + + if (pthread_create(&(pWorker->thread), &thattr, (void *)syncWorkerMain, pWorker) != 0) { + syncError("failed to create sync worker since %s", strerror(errno)); + + return -1; + } + } + + pthread_attr_destroy(&thattr); + + return 0; +} + +static int syncCloseWorkerPool(SSyncManager* syncManager) { + return 0; +} + +static void *syncWorkerMain(void *argv) { + SSyncWorker* pWorker = (SSyncWorker *)argv; + + taosBlockSIGPIPE(); + setThreadName("syncWorker"); + + return NULL; +} \ No newline at end of file -- GitLab