提交 2dc480ad 编写于 作者: L lichuang

[TD-10645][raft]<feature>sync manager

上级 4e902908
......@@ -152,7 +152,7 @@ int32_t syncPropose(SSyncNode* syncNode, SSyncBuffer buffer, void* pData, bool i
// int32_t syncRemoveNode(SSyncNode syncNode, const SNodeInfo *pNode);
extern int32_t syncDebugFlag;
extern int32_t sDebugFlag;
#ifdef __cplusplus
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
......@@ -13,15 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_RAFT_INT_H_
#define _TD_RAFT_INT_H_
#ifndef _TD_LIBS_SYNC_RAFT_H
#define _TD_LIBS_SYNC_RAFT_H
#ifdef __cplusplus
extern "C" {
#endif
typedef struct SSyncRaft {
} SSyncRaft;
#ifdef __cplusplus
}
#endif
#endif /*_TD_RAFT_INT_H_*/
\ No newline at end of file
#endif /* _TD_LIBS_SYNC_RAFT_H */
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_INT_H
#define _TD_LIBS_SYNC_INT_H
#include "thash.h"
#include "os.h"
#include "sync.h"
#include "raft.h"
#include "tlog.h"
#define TAOS_SYNC_MAX_WORKER 3
typedef struct SSyncWorker {
pthread_t thread;
} SSyncWorker;
struct SSyncNode {
pthread_mutex_t mutex;
SyncGroupId vgId;
SSyncRaft raft;
};
typedef struct SSyncManager {
pthread_mutex_t mutex;
// worker threads
SSyncWorker worker[TAOS_SYNC_MAX_WORKER];
// vgroup hash table
SHashObj* vgroupTable;
} SSyncManager;
extern SSyncManager* gSyncManager;
#define syncFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYNC FATAL ", 255, __VA_ARGS__); }} while(0)
#define syncError(...) do { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("SYNC ERROR ", 255, __VA_ARGS__); }} while(0)
#define syncWarn(...) do { if (sDebugFlag & DEBUG_WARN) { taosPrintLog("SYNC WARN ", 255, __VA_ARGS__); }} while(0)
#define syncInfo(...) do { if (sDebugFlag & DEBUG_INFO) { taosPrintLog("SYNC ", 255, __VA_ARGS__); }} while(0)
#define syncDebug(...) do { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYNC ", sDebugFlag, __VA_ARGS__); }} while(0)
#define syncTrace(...) do { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYNC ", sDebugFlag, __VA_ARGS__); }} while(0)
#endif /* _TD_LIBS_SYNC_INT_H */
\ No newline at end of file
......@@ -13,14 +13,132 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sync.h"
#include "syncInt.h"
int32_t syncInit() { return 0; }
SSyncManager* gSyncManager = NULL;
void syncCleanUp() {}
static int syncOpenWorkerPool(SSyncManager* syncManager);
static int syncCloseWorkerPool(SSyncManager* syncManager);
static void *syncWorkerMain(void *argv);
SSyncNode* syncStart(const SSyncInfo* pInfo) { return NULL; }
int32_t syncInit() {
if (gSyncManager != NULL) {
return 0;
}
void syncStop(const SSyncNode* pNode) {}
gSyncManager = (SSyncManager*)malloc(sizeof(SSyncManager));
if (gSyncManager == NULL) {
syncError("malloc SSyncManager fail");
return -1;
}
void syncReconfig(const SSyncNode* pNode, const SSyncCluster* pCfg) {}
\ No newline at end of file
pthread_mutex_init(&gSyncManager->mutex, NULL);
// init worker pool
if (syncOpenWorkerPool(gSyncManager) != 0) {
syncCleanUp();
return -1;
}
// init vgroup hash table
gSyncManager->vgroupTable = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (gSyncManager->vgroupTable == NULL) {
syncCleanUp();
return -1;
}
return 0;
}
void syncCleanUp() {
if (gSyncManager == NULL) {
return;
}
pthread_mutex_lock(&gSyncManager->mutex);
if (gSyncManager->vgroupTable) {
taosHashCleanup(gSyncManager->vgroupTable);
}
syncCloseWorkerPool(gSyncManager);
pthread_mutex_unlock(&gSyncManager->mutex);
pthread_mutex_destroy(&gSyncManager->mutex);
free(gSyncManager);
gSyncManager = NULL;
}
SSyncNode* syncStart(const SSyncInfo* pInfo) {
pthread_mutex_lock(&gSyncManager->mutex);
SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &pInfo->vgId, sizeof(SyncGroupId));
if (ppNode != NULL) {
syncInfo("vgroup %d already exist", pInfo->vgId);
pthread_mutex_unlock(&gSyncManager->mutex);
return *ppNode;
}
SSyncNode *pNode = (SSyncNode*)malloc(sizeof(SSyncNode));
if (pNode == NULL) {
syncInfo("malloc vgroup %d node fail", pInfo->vgId);
pthread_mutex_unlock(&gSyncManager->mutex);
return NULL;
}
pthread_mutex_init(&pNode->mutex, NULL);
taosHashPut(gSyncManager->vgroupTable, &pInfo->vgId, sizeof(SyncGroupId), &pNode, sizeof(SSyncNode *));
pthread_mutex_unlock(&gSyncManager->mutex);
return NULL;
}
void syncStop(const SSyncNode* pNode) {
pthread_mutex_lock(&gSyncManager->mutex);
SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &pNode->vgId, sizeof(SyncGroupId));
if (ppNode == NULL) {
syncInfo("vgroup %d not exist", pNode->vgId);
pthread_mutex_unlock(&gSyncManager->mutex);
return;
}
assert(*ppNode == pNode);
taosHashRemove(gSyncManager->vgroupTable, &pNode->vgId, sizeof(SyncGroupId));
pthread_mutex_unlock(&gSyncManager->mutex);
pthread_mutex_destroy(&pNode->mutex);
free(*ppNode);
}
void syncReconfig(const SSyncNode* pNode, const SSyncCluster* pCfg) {}
static int syncOpenWorkerPool(SSyncManager* syncManager) {
int i;
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
for (i = 0; i < TAOS_SYNC_MAX_WORKER; ++i) {
SSyncWorker* pWorker = &(syncManager->worker[i]);
if (pthread_create(&(pWorker->thread), &thattr, (void *)syncWorkerMain, pWorker) != 0) {
syncError("failed to create sync worker since %s", strerror(errno));
return -1;
}
}
pthread_attr_destroy(&thattr);
return 0;
}
static int syncCloseWorkerPool(SSyncManager* syncManager) {
return 0;
}
static void *syncWorkerMain(void *argv) {
SSyncWorker* pWorker = (SSyncWorker *)argv;
taosBlockSIGPIPE();
setThreadName("syncWorker");
return NULL;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册