提交 4e18484b 编写于 作者: S Shengliang Guan

TD-1382 TD-1383

上级 880aaf57
...@@ -103,6 +103,9 @@ typedef struct { ...@@ -103,6 +103,9 @@ typedef struct {
typedef void* tsync_h; typedef void* tsync_h;
int32_t syncInit();
void syncCleanUp();
tsync_h syncStart(const SSyncInfo *); tsync_h syncStart(const SSyncInfo *);
void syncStop(tsync_h shandle); void syncStop(tsync_h shandle);
int32_t syncReconfig(tsync_h shandle, const SSyncCfg *); int32_t syncReconfig(tsync_h shandle, const SSyncCfg *);
......
...@@ -42,11 +42,9 @@ int tsSyncTimer = 1; ...@@ -42,11 +42,9 @@ int tsSyncTimer = 1;
int tsSyncNum; // number of sync in process in whole system int tsSyncNum; // number of sync in process in whole system
char tsNodeFqdn[TSDB_FQDN_LEN]; char tsNodeFqdn[TSDB_FQDN_LEN];
static int tsNodeNum; // number of nodes in system
static ttpool_h tsTcpPool; static ttpool_h tsTcpPool;
static void *syncTmrCtrl = NULL; static void *syncTmrCtrl = NULL;
static void *vgIdHash; static void *vgIdHash;
static pthread_once_t syncModuleInit = PTHREAD_ONCE_INIT;
// local functions // local functions
static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer); static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer);
...@@ -75,7 +73,7 @@ char* syncRole[] = { ...@@ -75,7 +73,7 @@ char* syncRole[] = {
"master" "master"
}; };
static void syncModuleInitFunc() { int32_t syncInit() {
SPoolInfo info; SPoolInfo info;
info.numOfThreads = tsSyncTcpThreads; info.numOfThreads = tsSyncTcpThreads;
...@@ -87,25 +85,52 @@ static void syncModuleInitFunc() { ...@@ -87,25 +85,52 @@ static void syncModuleInitFunc() {
info.processIncomingConn = syncProcessIncommingConnection; info.processIncomingConn = syncProcessIncommingConnection;
tsTcpPool = taosOpenTcpThreadPool(&info); tsTcpPool = taosOpenTcpThreadPool(&info);
if (tsTcpPool == NULL) return; if (tsTcpPool == NULL) {
sError("failed to init tcpPool");
return -1;
}
syncTmrCtrl = taosTmrInit(1000, 50, 10000, "SYNC"); syncTmrCtrl = taosTmrInit(1000, 50, 10000, "SYNC");
if (syncTmrCtrl == NULL) { if (syncTmrCtrl == NULL) {
sError("failed to init tmrCtrl");
taosCloseTcpThreadPool(tsTcpPool); taosCloseTcpThreadPool(tsTcpPool);
tsTcpPool = NULL; tsTcpPool = NULL;
return; return -1;
} }
vgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true); vgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true);
if (vgIdHash == NULL) { if (vgIdHash == NULL) {
sError("failed to init vgIdHash");
taosTmrCleanUp(syncTmrCtrl); taosTmrCleanUp(syncTmrCtrl);
taosCloseTcpThreadPool(tsTcpPool); taosCloseTcpThreadPool(tsTcpPool);
tsTcpPool = NULL; tsTcpPool = NULL;
syncTmrCtrl = NULL; syncTmrCtrl = NULL;
return; return -1;
} }
tstrncpy(tsNodeFqdn, tsLocalFqdn, sizeof(tsNodeFqdn)); tstrncpy(tsNodeFqdn, tsLocalFqdn, sizeof(tsNodeFqdn));
sInfo("sync module initialized successfully");
return 0;
}
void syncCleanUp() {
if (tsTcpPool) {
taosCloseTcpThreadPool(tsTcpPool);
tsTcpPool = NULL;
}
if (syncTmrCtrl) {
taosTmrCleanUp(syncTmrCtrl);
syncTmrCtrl = NULL;
}
if (vgIdHash) {
vgIdHash = NULL;
taosHashCleanup(vgIdHash);
}
sInfo("sync module is cleaned up");
} }
void *syncStart(const SSyncInfo *pInfo) { void *syncStart(const SSyncInfo *pInfo) {
...@@ -118,15 +143,6 @@ void *syncStart(const SSyncInfo *pInfo) { ...@@ -118,15 +143,6 @@ void *syncStart(const SSyncInfo *pInfo) {
return NULL; return NULL;
} }
pthread_once(&syncModuleInit, syncModuleInitFunc);
if (tsTcpPool == NULL) {
free(pNode);
syncModuleInit = PTHREAD_ONCE_INIT;
sError("failed to init sync module(%s)", tstrerror(errno));
return NULL;
}
atomic_add_fetch_32(&tsNodeNum, 1);
tstrncpy(pNode->path, pInfo->path, sizeof(pNode->path)); tstrncpy(pNode->path, pInfo->path, sizeof(pNode->path));
pthread_mutex_init(&pNode->mutex, NULL); pthread_mutex_init(&pNode->mutex, NULL);
...@@ -435,17 +451,6 @@ static void syncDecNodeRef(SSyncNode *pNode) ...@@ -435,17 +451,6 @@ static void syncDecNodeRef(SSyncNode *pNode)
taosTFree(pNode->pRecv); taosTFree(pNode->pRecv);
taosTFree(pNode->pSyncFwds); taosTFree(pNode->pSyncFwds);
taosTFree(pNode); taosTFree(pNode);
if (atomic_sub_fetch_32(&tsNodeNum, 1) == 0) {
if (tsTcpPool) taosCloseTcpThreadPool(tsTcpPool);
if (syncTmrCtrl) taosTmrCleanUp(syncTmrCtrl);
if (vgIdHash) taosHashCleanup(vgIdHash);
syncTmrCtrl = NULL;
tsTcpPool = NULL;
vgIdHash = NULL;
syncModuleInit = PTHREAD_ONCE_INIT;
sDebug("sync module is cleaned up");
}
} }
} }
......
...@@ -57,6 +57,9 @@ void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code) {} ...@@ -57,6 +57,9 @@ void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code) {}
#endif #endif
int32_t vnodeInitResources() { int32_t vnodeInitResources() {
int code = syncInit();
if (code != 0) return code;
vnodeInitWriteFp(); vnodeInitWriteFp();
vnodeInitReadFp(); vnodeInitReadFp();
...@@ -70,11 +73,12 @@ int32_t vnodeInitResources() { ...@@ -70,11 +73,12 @@ int32_t vnodeInitResources() {
} }
void vnodeCleanupResources() { void vnodeCleanupResources() {
if (tsDnodeVnodesHash != NULL) { if (tsDnodeVnodesHash != NULL) {
taosHashCleanup(tsDnodeVnodesHash); taosHashCleanup(tsDnodeVnodesHash);
tsDnodeVnodesHash = NULL; tsDnodeVnodesHash = NULL;
} }
syncCleanUp();
} }
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册