diff --git a/src/inc/tsync.h b/src/inc/tsync.h index ff9c9901bd91e05f443ada68508cdccbc13ef066..ca0f70d104d603d176d89dd5b92979433f390466 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -103,6 +103,9 @@ typedef struct { typedef void* tsync_h; +int32_t syncInit(); +void syncCleanUp(); + tsync_h syncStart(const SSyncInfo *); void syncStop(tsync_h shandle); int32_t syncReconfig(tsync_h shandle, const SSyncCfg *); diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index da40160bc19fea6babf2d2cddf9ea6b6f261f2fe..68f2ed122c989c66e967c6f92e1c470ab897d72c 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -42,11 +42,9 @@ int tsSyncTimer = 1; int tsSyncNum; // number of sync in process in whole system char tsNodeFqdn[TSDB_FQDN_LEN]; -static int tsNodeNum; // number of nodes in system static ttpool_h tsTcpPool; static void *syncTmrCtrl = NULL; static void *vgIdHash; -static pthread_once_t syncModuleInit = PTHREAD_ONCE_INIT; // local functions static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer); @@ -75,7 +73,7 @@ char* syncRole[] = { "master" }; -static void syncModuleInitFunc() { +int32_t syncInit() { SPoolInfo info; info.numOfThreads = tsSyncTcpThreads; @@ -87,25 +85,52 @@ static void syncModuleInitFunc() { info.processIncomingConn = syncProcessIncommingConnection; tsTcpPool = taosOpenTcpThreadPool(&info); - if (tsTcpPool == NULL) return; + if (tsTcpPool == NULL) { + sError("failed to init tcpPool"); + return -1; + } syncTmrCtrl = taosTmrInit(1000, 50, 10000, "SYNC"); if (syncTmrCtrl == NULL) { + sError("failed to init tmrCtrl"); taosCloseTcpThreadPool(tsTcpPool); tsTcpPool = NULL; - return; + return -1; } - + vgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true); if (vgIdHash == NULL) { + sError("failed to init vgIdHash"); taosTmrCleanUp(syncTmrCtrl); taosCloseTcpThreadPool(tsTcpPool); tsTcpPool = NULL; syncTmrCtrl = NULL; - return; - } + return -1; + } tstrncpy(tsNodeFqdn, tsLocalFqdn, sizeof(tsNodeFqdn)); + sInfo("sync module initialized successfully"); + + return 0; +} + +void syncCleanUp() { + if (tsTcpPool) { + taosCloseTcpThreadPool(tsTcpPool); + tsTcpPool = NULL; + } + + if (syncTmrCtrl) { + taosTmrCleanUp(syncTmrCtrl); + syncTmrCtrl = NULL; + } + + if (vgIdHash) { + vgIdHash = NULL; + taosHashCleanup(vgIdHash); + } + + sInfo("sync module is cleaned up"); } void *syncStart(const SSyncInfo *pInfo) { @@ -118,15 +143,6 @@ void *syncStart(const SSyncInfo *pInfo) { return NULL; } - pthread_once(&syncModuleInit, syncModuleInitFunc); - if (tsTcpPool == NULL) { - free(pNode); - syncModuleInit = PTHREAD_ONCE_INIT; - sError("failed to init sync module(%s)", tstrerror(errno)); - return NULL; - } - - atomic_add_fetch_32(&tsNodeNum, 1); tstrncpy(pNode->path, pInfo->path, sizeof(pNode->path)); pthread_mutex_init(&pNode->mutex, NULL); @@ -435,17 +451,6 @@ static void syncDecNodeRef(SSyncNode *pNode) taosTFree(pNode->pRecv); taosTFree(pNode->pSyncFwds); taosTFree(pNode); - - if (atomic_sub_fetch_32(&tsNodeNum, 1) == 0) { - if (tsTcpPool) taosCloseTcpThreadPool(tsTcpPool); - if (syncTmrCtrl) taosTmrCleanUp(syncTmrCtrl); - if (vgIdHash) taosHashCleanup(vgIdHash); - syncTmrCtrl = NULL; - tsTcpPool = NULL; - vgIdHash = NULL; - syncModuleInit = PTHREAD_ONCE_INIT; - sDebug("sync module is cleaned up"); - } } } diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 4a2d9859b943c10550ae69b119627f6025b073bd..a4e88fb9468e47f91e908ea669456d52b2592765 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -57,6 +57,9 @@ void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code) {} #endif int32_t vnodeInitResources() { + int code = syncInit(); + if (code != 0) return code; + vnodeInitWriteFp(); vnodeInitReadFp(); @@ -70,11 +73,12 @@ int32_t vnodeInitResources() { } void vnodeCleanupResources() { - if (tsDnodeVnodesHash != NULL) { taosHashCleanup(tsDnodeVnodesHash); tsDnodeVnodesHash = NULL; } + + syncCleanUp(); } int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {