diff --git a/src/sync/CMakeLists.txt b/src/sync/CMakeLists.txt index 60271c771ca0a01bd449cb878fe2269759250fd3..aa38a56f38146bafda8a10ca786bf085bb4ea339 100644 --- a/src/sync/CMakeLists.txt +++ b/src/sync/CMakeLists.txt @@ -5,12 +5,12 @@ INCLUDE_DIRECTORIES(inc) AUX_SOURCE_DIRECTORY(src SRC) IF (TD_LINUX) - LIST(REMOVE_ITEM SRC src/tarbitrator.c) + LIST(REMOVE_ITEM SRC src/syncArbitrator.c) ADD_LIBRARY(sync ${SRC}) TARGET_LINK_LIBRARIES(sync tutil pthread common) - LIST(APPEND BIN_SRC src/tarbitrator.c) - LIST(APPEND BIN_SRC src/taosTcpPool.c) + LIST(APPEND BIN_SRC src/syncArbitrator.c) + LIST(APPEND BIN_SRC src/syncTcp.c) ADD_EXECUTABLE(tarbitrator ${BIN_SRC}) TARGET_LINK_LIBRARIES(tarbitrator sync common osdetail tutil) diff --git a/src/sync/inc/taosTcpPool.h b/src/sync/inc/syncTcp.h similarity index 77% rename from src/sync/inc/taosTcpPool.h rename to src/sync/inc/syncTcp.h index 41043b0cd4c886616d5cecd2739eae684052c395..7db51f2a7115ccf23ee42d17cbe62f0798adf260 100644 --- a/src/sync/inc/taosTcpPool.h +++ b/src/sync/inc/syncTcp.h @@ -13,16 +13,13 @@ * along with this program. If not, see . */ -#ifndef TDENGINE_TCP_POOL_H -#define TDENGINE_TCP_POOL_H +#ifndef TDENGINE_SYNC_TCP_POOL_H +#define TDENGINE_SYNC_TCP_POOL_H #ifdef __cplusplus extern "C" { #endif -typedef void *ttpool_h; -typedef void *tthread_h; - typedef struct { int32_t numOfThreads; uint32_t serverIp; @@ -33,10 +30,10 @@ typedef struct { void (*processIncomingConn)(int32_t fd, uint32_t ip); } SPoolInfo; -ttpool_h taosOpenTcpThreadPool(SPoolInfo *pInfo); -void taosCloseTcpThreadPool(ttpool_h); -void * taosAllocateTcpConn(void *, void *ahandle, int32_t connFd); -void taosFreeTcpConn(void *); +void *syncOpenTcpThreadPool(SPoolInfo *pInfo); +void syncCloseTcpThreadPool(void *); +void *syncAllocateTcpConn(void *, void *ahandle, int32_t connFd); +void syncFreeTcpConn(void *); #ifdef __cplusplus } diff --git a/src/sync/src/tarbitrator.c b/src/sync/src/syncArbitrator.c similarity index 90% rename from src/sync/src/tarbitrator.c rename to src/sync/src/syncArbitrator.c index 4016042de2135f732dfeb2bbc3b0fdc65b1b63f6..971fac26aa02d27211da88aa9e7b6412b539ef16 100644 --- a/src/sync/src/tarbitrator.c +++ b/src/sync/src/syncArbitrator.c @@ -22,17 +22,17 @@ #include "tsocket.h" #include "tglobal.h" #include "taoserror.h" -#include "taosTcpPool.h" #include "twal.h" #include "tsync.h" #include "syncInt.h" +#include "syncTcp.h" -static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context); -static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp); -static void arbProcessBrokenLink(void *param); -static int32_t arbProcessPeerMsg(void *param, void *buffer); -static tsem_t tsArbSem; -static ttpool_h tsArbTcpPool; +static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context); +static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp); +static void arbProcessBrokenLink(void *param); +static int32_t arbProcessPeerMsg(void *param, void *buffer); +static tsem_t tsArbSem; +static void * tsArbTcpPool; typedef struct { char id[TSDB_EP_LEN + 24]; @@ -90,7 +90,7 @@ int32_t main(int32_t argc, char *argv[]) { info.processBrokenLink = arbProcessBrokenLink; info.processIncomingMsg = arbProcessPeerMsg; info.processIncomingConn = arbProcessIncommingConnection; - tsArbTcpPool = taosOpenTcpThreadPool(&info); + tsArbTcpPool = syncOpenTcpThreadPool(&info); if (tsArbTcpPool == NULL) { sDebug("failed to open TCP thread pool, exit..."); @@ -101,8 +101,8 @@ int32_t main(int32_t argc, char *argv[]) { tsem_wait(&tsArbSem); - taosCloseTcpThreadPool(tsArbTcpPool); - sInfo("TAOS arbitrator is shut down\n"); + syncCloseTcpThreadPool(tsArbTcpPool); + sInfo("TAOS arbitrator is shut down"); closelog(); return 0; @@ -138,7 +138,7 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { sDebug("%s, arbitrator request is accepted", pNode->id); pNode->nodeFd = connFd; - pNode->pConn = taosAllocateTcpConn(tsArbTcpPool, pNode, connFd); + pNode->pConn = syncAllocateTcpConn(tsArbTcpPool, pNode, connFd); return; } diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index aae5dab3cdd5d394307dbbc8c5c4b0040a05d4a7..847dbc2f5dd63ed1daa763eb0f2280522acf64fb 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -23,10 +23,10 @@ #include "tsocket.h" #include "tglobal.h" #include "taoserror.h" -#include "taosTcpPool.h" #include "tqueue.h" #include "twal.h" #include "tsync.h" +#include "syncTcp.h" #include "syncInt.h" // global configurable @@ -39,10 +39,10 @@ int32_t tsSyncTimer = 1; int32_t tsSyncNum; // number of sync in process in whole system char tsNodeFqdn[TSDB_FQDN_LEN]; -static ttpool_h tsTcpPool; -static void * tsSyncTmrCtrl = NULL; -static void * tsVgIdHash; -static int32_t tsSyncRefId = -1; +static void * tsTcpPool; +static void * tsSyncTmrCtrl = NULL; +static void * tsVgIdHash; +static int32_t tsSyncRefId = -1; // local functions static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer); @@ -117,7 +117,7 @@ int32_t syncInit() { info.processIncomingMsg = syncProcessPeerMsg; info.processIncomingConn = syncProcessIncommingConnection; - tsTcpPool = taosOpenTcpThreadPool(&info); + tsTcpPool = syncOpenTcpThreadPool(&info); if (tsTcpPool == NULL) { sError("failed to init tcpPool"); return -1; @@ -126,7 +126,7 @@ int32_t syncInit() { tsSyncTmrCtrl = taosTmrInit(1000, 50, 10000, "SYNC"); if (tsSyncTmrCtrl == NULL) { sError("failed to init tmrCtrl"); - taosCloseTcpThreadPool(tsTcpPool); + syncCloseTcpThreadPool(tsTcpPool); tsTcpPool = NULL; return -1; } @@ -135,7 +135,7 @@ int32_t syncInit() { if (tsVgIdHash == NULL) { sError("failed to init tsVgIdHash"); taosTmrCleanUp(tsSyncTmrCtrl); - taosCloseTcpThreadPool(tsTcpPool); + syncCloseTcpThreadPool(tsTcpPool); tsTcpPool = NULL; tsSyncTmrCtrl = NULL; return -1; @@ -155,7 +155,7 @@ int32_t syncInit() { void syncCleanUp() { if (tsTcpPool) { - taosCloseTcpThreadPool(tsTcpPool); + syncCloseTcpThreadPool(tsTcpPool); tsTcpPool = NULL; } @@ -509,7 +509,7 @@ static void syncClosePeerConn(SSyncPeer *pPeer) { taosClose(pPeer->syncFd); if (pPeer->peerFd >= 0) { pPeer->peerFd = -1; - taosFreeTcpConn(pPeer->pConn); + syncFreeTcpConn(pPeer->pConn); } } @@ -1065,7 +1065,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { sDebug("%s, connection to peer server is setup, pfd:%d sfd:%d tranId:%u", pPeer->id, connFd, pPeer->syncFd, firstPkt.tranId); pPeer->peerFd = connFd; pPeer->role = TAOS_SYNC_ROLE_UNSYNCED; - pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd); + pPeer->pConn = syncAllocateTcpConn(tsTcpPool, pPeer, connFd); syncAddPeerRef(pPeer); } else { sDebug("%s, failed to setup peer connection to server since %s, try later", pPeer->id, strerror(errno)); @@ -1159,7 +1159,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { sDebug("%s, TCP connection is up, pfd:%d sfd:%d, old pfd:%d", pPeer->id, connFd, pPeer->syncFd, pPeer->peerFd); syncClosePeerConn(pPeer); pPeer->peerFd = connFd; - pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd); + pPeer->pConn = syncAllocateTcpConn(tsTcpPool, pPeer, connFd); syncAddPeerRef(pPeer); sDebug("%s, ready to exchange data", pPeer->id); syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_EXCHANGE_DATA, syncGenTranId()); diff --git a/src/sync/src/taosTcpPool.c b/src/sync/src/syncTcp.c similarity index 89% rename from src/sync/src/taosTcpPool.c rename to src/sync/src/syncTcp.c index eb05cf7c6f8d8d9a068b60c54f0ae1426e62c429..7bfdc4e440ee886065a4cd1b8f59d09e09556a47 100644 --- a/src/sync/src/taosTcpPool.c +++ b/src/sync/src/syncTcp.c @@ -19,10 +19,10 @@ #include "tutil.h" #include "tsocket.h" #include "taoserror.h" -#include "taosTcpPool.h" #include "twal.h" #include "tsync.h" #include "syncInt.h" +#include "syncTcp.h" typedef struct SThreadObj { pthread_t thread; @@ -47,12 +47,12 @@ typedef struct { int32_t closedByApp; } SConnObj; -static void *taosAcceptPeerTcpConnection(void *argv); -static void *taosProcessTcpData(void *param); -static void taosStopPoolThread(SThreadObj *pThread); -static SThreadObj *taosGetTcpThread(SPoolObj *pPool); +static void *syncAcceptPeerTcpConnection(void *argv); +static void *syncProcessTcpData(void *param); +static void syncStopPoolThread(SThreadObj *pThread); +static SThreadObj *syncGetTcpThread(SPoolObj *pPool); -void *taosOpenTcpThreadPool(SPoolInfo *pInfo) { +void *syncOpenTcpThreadPool(SPoolInfo *pInfo) { pthread_attr_t thattr; SPoolObj *pPool = calloc(sizeof(SPoolObj), 1); @@ -80,7 +80,7 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) { pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); - if (pthread_create(&(pPool->thread), &thattr, (void *)taosAcceptPeerTcpConnection, pPool) != 0) { + if (pthread_create(&(pPool->thread), &thattr, (void *)syncAcceptPeerTcpConnection, pPool) != 0) { sError("failed to create accept thread for TCP server since %s", strerror(errno)); close(pPool->acceptFd); tfree(pPool->pThread); @@ -94,7 +94,7 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) { return pPool; } -void taosCloseTcpThreadPool(void *param) { +void syncCloseTcpThreadPool(void *param) { SPoolObj * pPool = param; SThreadObj *pThread; @@ -103,7 +103,7 @@ void taosCloseTcpThreadPool(void *param) { for (int32_t i = 0; i < pPool->info.numOfThreads; ++i) { pThread = pPool->pThread[i]; - if (pThread) taosStopPoolThread(pThread); + if (pThread) syncStopPoolThread(pThread); } sDebug("%p TCP pool is closed", pPool); @@ -112,7 +112,7 @@ void taosCloseTcpThreadPool(void *param) { tfree(pPool); } -void *taosAllocateTcpConn(void *param, void *pPeer, int32_t connFd) { +void *syncAllocateTcpConn(void *param, void *pPeer, int32_t connFd) { struct epoll_event event; SPoolObj *pPool = param; @@ -122,7 +122,7 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int32_t connFd) { return NULL; } - SThreadObj *pThread = taosGetTcpThread(pPool); + SThreadObj *pThread = syncGetTcpThread(pPool); if (pThread == NULL) { tfree(pConn); return NULL; @@ -149,7 +149,7 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int32_t connFd) { return pConn; } -void taosFreeTcpConn(void *param) { +void syncFreeTcpConn(void *param) { SConnObj * pConn = param; SThreadObj *pThread = pConn->pThread; @@ -175,7 +175,7 @@ static void taosProcessBrokenLink(SConnObj *pConn) { #define maxEvents 10 -static void *taosProcessTcpData(void *param) { +static void *syncProcessTcpData(void *param) { SThreadObj *pThread = (SThreadObj *)param; SPoolObj * pPool = pThread->pPool; SPoolInfo * pInfo = &pPool->info; @@ -222,7 +222,7 @@ static void *taosProcessTcpData(void *param) { if (pConn->closedByApp == 0) { if ((*pInfo->processIncomingMsg)(pConn->ahandle, buffer) < 0) { - taosFreeTcpConn(pConn); + syncFreeTcpConn(pConn); continue; } } @@ -239,7 +239,7 @@ static void *taosProcessTcpData(void *param) { return NULL; } -static void *taosAcceptPeerTcpConnection(void *argv) { +static void *syncAcceptPeerTcpConnection(void *argv) { SPoolObj * pPool = (SPoolObj *)argv; SPoolInfo *pInfo = &pPool->info; @@ -268,7 +268,7 @@ static void *taosAcceptPeerTcpConnection(void *argv) { return NULL; } -static SThreadObj *taosGetTcpThread(SPoolObj *pPool) { +static SThreadObj *syncGetTcpThread(SPoolObj *pPool) { SThreadObj *pThread = pPool->pThread[pPool->nextId]; if (pThread) return pThread; @@ -286,7 +286,7 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) { pthread_attr_t thattr; pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); - int32_t ret = pthread_create(&(pThread->thread), &thattr, (void *)taosProcessTcpData, pThread); + int32_t ret = pthread_create(&(pThread->thread), &thattr, (void *)syncProcessTcpData, pThread); pthread_attr_destroy(&thattr); if (ret != 0) { @@ -303,7 +303,7 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) { return pThread; } -static void taosStopPoolThread(SThreadObj *pThread) { +static void syncStopPoolThread(SThreadObj *pThread) { pthread_t thread = pThread->thread; if (!taosCheckPthreadValid(thread)) { return;