提交 966a5e18 编写于 作者: S Shengliang Guan

rename some files

上级 7c1c99d3
...@@ -5,12 +5,12 @@ INCLUDE_DIRECTORIES(inc) ...@@ -5,12 +5,12 @@ INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC) AUX_SOURCE_DIRECTORY(src SRC)
IF (TD_LINUX) IF (TD_LINUX)
LIST(REMOVE_ITEM SRC src/tarbitrator.c) LIST(REMOVE_ITEM SRC src/syncArbitrator.c)
ADD_LIBRARY(sync ${SRC}) ADD_LIBRARY(sync ${SRC})
TARGET_LINK_LIBRARIES(sync tutil pthread common) TARGET_LINK_LIBRARIES(sync tutil pthread common)
LIST(APPEND BIN_SRC src/tarbitrator.c) LIST(APPEND BIN_SRC src/syncArbitrator.c)
LIST(APPEND BIN_SRC src/taosTcpPool.c) LIST(APPEND BIN_SRC src/syncTcp.c)
ADD_EXECUTABLE(tarbitrator ${BIN_SRC}) ADD_EXECUTABLE(tarbitrator ${BIN_SRC})
TARGET_LINK_LIBRARIES(tarbitrator sync common osdetail tutil) TARGET_LINK_LIBRARIES(tarbitrator sync common osdetail tutil)
......
...@@ -13,16 +13,13 @@ ...@@ -13,16 +13,13 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef TDENGINE_TCP_POOL_H #ifndef TDENGINE_SYNC_TCP_POOL_H
#define TDENGINE_TCP_POOL_H #define TDENGINE_SYNC_TCP_POOL_H
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
typedef void *ttpool_h;
typedef void *tthread_h;
typedef struct { typedef struct {
int32_t numOfThreads; int32_t numOfThreads;
uint32_t serverIp; uint32_t serverIp;
...@@ -33,10 +30,10 @@ typedef struct { ...@@ -33,10 +30,10 @@ typedef struct {
void (*processIncomingConn)(int32_t fd, uint32_t ip); void (*processIncomingConn)(int32_t fd, uint32_t ip);
} SPoolInfo; } SPoolInfo;
ttpool_h taosOpenTcpThreadPool(SPoolInfo *pInfo); void *syncOpenTcpThreadPool(SPoolInfo *pInfo);
void taosCloseTcpThreadPool(ttpool_h); void syncCloseTcpThreadPool(void *);
void * taosAllocateTcpConn(void *, void *ahandle, int32_t connFd); void *syncAllocateTcpConn(void *, void *ahandle, int32_t connFd);
void taosFreeTcpConn(void *); void syncFreeTcpConn(void *);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -22,17 +22,17 @@ ...@@ -22,17 +22,17 @@
#include "tsocket.h" #include "tsocket.h"
#include "tglobal.h" #include "tglobal.h"
#include "taoserror.h" #include "taoserror.h"
#include "taosTcpPool.h"
#include "twal.h" #include "twal.h"
#include "tsync.h" #include "tsync.h"
#include "syncInt.h" #include "syncInt.h"
#include "syncTcp.h"
static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context); static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context);
static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp); static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp);
static void arbProcessBrokenLink(void *param); static void arbProcessBrokenLink(void *param);
static int32_t arbProcessPeerMsg(void *param, void *buffer); static int32_t arbProcessPeerMsg(void *param, void *buffer);
static tsem_t tsArbSem; static tsem_t tsArbSem;
static ttpool_h tsArbTcpPool; static void * tsArbTcpPool;
typedef struct { typedef struct {
char id[TSDB_EP_LEN + 24]; char id[TSDB_EP_LEN + 24];
...@@ -90,7 +90,7 @@ int32_t main(int32_t argc, char *argv[]) { ...@@ -90,7 +90,7 @@ int32_t main(int32_t argc, char *argv[]) {
info.processBrokenLink = arbProcessBrokenLink; info.processBrokenLink = arbProcessBrokenLink;
info.processIncomingMsg = arbProcessPeerMsg; info.processIncomingMsg = arbProcessPeerMsg;
info.processIncomingConn = arbProcessIncommingConnection; info.processIncomingConn = arbProcessIncommingConnection;
tsArbTcpPool = taosOpenTcpThreadPool(&info); tsArbTcpPool = syncOpenTcpThreadPool(&info);
if (tsArbTcpPool == NULL) { if (tsArbTcpPool == NULL) {
sDebug("failed to open TCP thread pool, exit..."); sDebug("failed to open TCP thread pool, exit...");
...@@ -101,8 +101,8 @@ int32_t main(int32_t argc, char *argv[]) { ...@@ -101,8 +101,8 @@ int32_t main(int32_t argc, char *argv[]) {
tsem_wait(&tsArbSem); tsem_wait(&tsArbSem);
taosCloseTcpThreadPool(tsArbTcpPool); syncCloseTcpThreadPool(tsArbTcpPool);
sInfo("TAOS arbitrator is shut down\n"); sInfo("TAOS arbitrator is shut down");
closelog(); closelog();
return 0; return 0;
...@@ -138,7 +138,7 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { ...@@ -138,7 +138,7 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
sDebug("%s, arbitrator request is accepted", pNode->id); sDebug("%s, arbitrator request is accepted", pNode->id);
pNode->nodeFd = connFd; pNode->nodeFd = connFd;
pNode->pConn = taosAllocateTcpConn(tsArbTcpPool, pNode, connFd); pNode->pConn = syncAllocateTcpConn(tsArbTcpPool, pNode, connFd);
return; return;
} }
......
...@@ -23,10 +23,10 @@ ...@@ -23,10 +23,10 @@
#include "tsocket.h" #include "tsocket.h"
#include "tglobal.h" #include "tglobal.h"
#include "taoserror.h" #include "taoserror.h"
#include "taosTcpPool.h"
#include "tqueue.h" #include "tqueue.h"
#include "twal.h" #include "twal.h"
#include "tsync.h" #include "tsync.h"
#include "syncTcp.h"
#include "syncInt.h" #include "syncInt.h"
// global configurable // global configurable
...@@ -39,10 +39,10 @@ int32_t tsSyncTimer = 1; ...@@ -39,10 +39,10 @@ int32_t tsSyncTimer = 1;
int32_t tsSyncNum; // number of sync in process in whole system int32_t tsSyncNum; // number of sync in process in whole system
char tsNodeFqdn[TSDB_FQDN_LEN]; char tsNodeFqdn[TSDB_FQDN_LEN];
static ttpool_h tsTcpPool; static void * tsTcpPool;
static void * tsSyncTmrCtrl = NULL; static void * tsSyncTmrCtrl = NULL;
static void * tsVgIdHash; static void * tsVgIdHash;
static int32_t tsSyncRefId = -1; static int32_t tsSyncRefId = -1;
// local functions // local functions
static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer); static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer);
...@@ -117,7 +117,7 @@ int32_t syncInit() { ...@@ -117,7 +117,7 @@ int32_t syncInit() {
info.processIncomingMsg = syncProcessPeerMsg; info.processIncomingMsg = syncProcessPeerMsg;
info.processIncomingConn = syncProcessIncommingConnection; info.processIncomingConn = syncProcessIncommingConnection;
tsTcpPool = taosOpenTcpThreadPool(&info); tsTcpPool = syncOpenTcpThreadPool(&info);
if (tsTcpPool == NULL) { if (tsTcpPool == NULL) {
sError("failed to init tcpPool"); sError("failed to init tcpPool");
return -1; return -1;
...@@ -126,7 +126,7 @@ int32_t syncInit() { ...@@ -126,7 +126,7 @@ int32_t syncInit() {
tsSyncTmrCtrl = taosTmrInit(1000, 50, 10000, "SYNC"); tsSyncTmrCtrl = taosTmrInit(1000, 50, 10000, "SYNC");
if (tsSyncTmrCtrl == NULL) { if (tsSyncTmrCtrl == NULL) {
sError("failed to init tmrCtrl"); sError("failed to init tmrCtrl");
taosCloseTcpThreadPool(tsTcpPool); syncCloseTcpThreadPool(tsTcpPool);
tsTcpPool = NULL; tsTcpPool = NULL;
return -1; return -1;
} }
...@@ -135,7 +135,7 @@ int32_t syncInit() { ...@@ -135,7 +135,7 @@ int32_t syncInit() {
if (tsVgIdHash == NULL) { if (tsVgIdHash == NULL) {
sError("failed to init tsVgIdHash"); sError("failed to init tsVgIdHash");
taosTmrCleanUp(tsSyncTmrCtrl); taosTmrCleanUp(tsSyncTmrCtrl);
taosCloseTcpThreadPool(tsTcpPool); syncCloseTcpThreadPool(tsTcpPool);
tsTcpPool = NULL; tsTcpPool = NULL;
tsSyncTmrCtrl = NULL; tsSyncTmrCtrl = NULL;
return -1; return -1;
...@@ -155,7 +155,7 @@ int32_t syncInit() { ...@@ -155,7 +155,7 @@ int32_t syncInit() {
void syncCleanUp() { void syncCleanUp() {
if (tsTcpPool) { if (tsTcpPool) {
taosCloseTcpThreadPool(tsTcpPool); syncCloseTcpThreadPool(tsTcpPool);
tsTcpPool = NULL; tsTcpPool = NULL;
} }
...@@ -509,7 +509,7 @@ static void syncClosePeerConn(SSyncPeer *pPeer) { ...@@ -509,7 +509,7 @@ static void syncClosePeerConn(SSyncPeer *pPeer) {
taosClose(pPeer->syncFd); taosClose(pPeer->syncFd);
if (pPeer->peerFd >= 0) { if (pPeer->peerFd >= 0) {
pPeer->peerFd = -1; pPeer->peerFd = -1;
taosFreeTcpConn(pPeer->pConn); syncFreeTcpConn(pPeer->pConn);
} }
} }
...@@ -1065,7 +1065,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { ...@@ -1065,7 +1065,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
sDebug("%s, connection to peer server is setup, pfd:%d sfd:%d tranId:%u", pPeer->id, connFd, pPeer->syncFd, firstPkt.tranId); sDebug("%s, connection to peer server is setup, pfd:%d sfd:%d tranId:%u", pPeer->id, connFd, pPeer->syncFd, firstPkt.tranId);
pPeer->peerFd = connFd; pPeer->peerFd = connFd;
pPeer->role = TAOS_SYNC_ROLE_UNSYNCED; pPeer->role = TAOS_SYNC_ROLE_UNSYNCED;
pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd); pPeer->pConn = syncAllocateTcpConn(tsTcpPool, pPeer, connFd);
syncAddPeerRef(pPeer); syncAddPeerRef(pPeer);
} else { } else {
sDebug("%s, failed to setup peer connection to server since %s, try later", pPeer->id, strerror(errno)); sDebug("%s, failed to setup peer connection to server since %s, try later", pPeer->id, strerror(errno));
...@@ -1159,7 +1159,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { ...@@ -1159,7 +1159,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
sDebug("%s, TCP connection is up, pfd:%d sfd:%d, old pfd:%d", pPeer->id, connFd, pPeer->syncFd, pPeer->peerFd); sDebug("%s, TCP connection is up, pfd:%d sfd:%d, old pfd:%d", pPeer->id, connFd, pPeer->syncFd, pPeer->peerFd);
syncClosePeerConn(pPeer); syncClosePeerConn(pPeer);
pPeer->peerFd = connFd; pPeer->peerFd = connFd;
pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd); pPeer->pConn = syncAllocateTcpConn(tsTcpPool, pPeer, connFd);
syncAddPeerRef(pPeer); syncAddPeerRef(pPeer);
sDebug("%s, ready to exchange data", pPeer->id); sDebug("%s, ready to exchange data", pPeer->id);
syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_EXCHANGE_DATA, syncGenTranId()); syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_EXCHANGE_DATA, syncGenTranId());
......
...@@ -19,10 +19,10 @@ ...@@ -19,10 +19,10 @@
#include "tutil.h" #include "tutil.h"
#include "tsocket.h" #include "tsocket.h"
#include "taoserror.h" #include "taoserror.h"
#include "taosTcpPool.h"
#include "twal.h" #include "twal.h"
#include "tsync.h" #include "tsync.h"
#include "syncInt.h" #include "syncInt.h"
#include "syncTcp.h"
typedef struct SThreadObj { typedef struct SThreadObj {
pthread_t thread; pthread_t thread;
...@@ -47,12 +47,12 @@ typedef struct { ...@@ -47,12 +47,12 @@ typedef struct {
int32_t closedByApp; int32_t closedByApp;
} SConnObj; } SConnObj;
static void *taosAcceptPeerTcpConnection(void *argv); static void *syncAcceptPeerTcpConnection(void *argv);
static void *taosProcessTcpData(void *param); static void *syncProcessTcpData(void *param);
static void taosStopPoolThread(SThreadObj *pThread); static void syncStopPoolThread(SThreadObj *pThread);
static SThreadObj *taosGetTcpThread(SPoolObj *pPool); static SThreadObj *syncGetTcpThread(SPoolObj *pPool);
void *taosOpenTcpThreadPool(SPoolInfo *pInfo) { void *syncOpenTcpThreadPool(SPoolInfo *pInfo) {
pthread_attr_t thattr; pthread_attr_t thattr;
SPoolObj *pPool = calloc(sizeof(SPoolObj), 1); SPoolObj *pPool = calloc(sizeof(SPoolObj), 1);
...@@ -80,7 +80,7 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) { ...@@ -80,7 +80,7 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) {
pthread_attr_init(&thattr); pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&(pPool->thread), &thattr, (void *)taosAcceptPeerTcpConnection, pPool) != 0) { if (pthread_create(&(pPool->thread), &thattr, (void *)syncAcceptPeerTcpConnection, pPool) != 0) {
sError("failed to create accept thread for TCP server since %s", strerror(errno)); sError("failed to create accept thread for TCP server since %s", strerror(errno));
close(pPool->acceptFd); close(pPool->acceptFd);
tfree(pPool->pThread); tfree(pPool->pThread);
...@@ -94,7 +94,7 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) { ...@@ -94,7 +94,7 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) {
return pPool; return pPool;
} }
void taosCloseTcpThreadPool(void *param) { void syncCloseTcpThreadPool(void *param) {
SPoolObj * pPool = param; SPoolObj * pPool = param;
SThreadObj *pThread; SThreadObj *pThread;
...@@ -103,7 +103,7 @@ void taosCloseTcpThreadPool(void *param) { ...@@ -103,7 +103,7 @@ void taosCloseTcpThreadPool(void *param) {
for (int32_t i = 0; i < pPool->info.numOfThreads; ++i) { for (int32_t i = 0; i < pPool->info.numOfThreads; ++i) {
pThread = pPool->pThread[i]; pThread = pPool->pThread[i];
if (pThread) taosStopPoolThread(pThread); if (pThread) syncStopPoolThread(pThread);
} }
sDebug("%p TCP pool is closed", pPool); sDebug("%p TCP pool is closed", pPool);
...@@ -112,7 +112,7 @@ void taosCloseTcpThreadPool(void *param) { ...@@ -112,7 +112,7 @@ void taosCloseTcpThreadPool(void *param) {
tfree(pPool); tfree(pPool);
} }
void *taosAllocateTcpConn(void *param, void *pPeer, int32_t connFd) { void *syncAllocateTcpConn(void *param, void *pPeer, int32_t connFd) {
struct epoll_event event; struct epoll_event event;
SPoolObj *pPool = param; SPoolObj *pPool = param;
...@@ -122,7 +122,7 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int32_t connFd) { ...@@ -122,7 +122,7 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int32_t connFd) {
return NULL; return NULL;
} }
SThreadObj *pThread = taosGetTcpThread(pPool); SThreadObj *pThread = syncGetTcpThread(pPool);
if (pThread == NULL) { if (pThread == NULL) {
tfree(pConn); tfree(pConn);
return NULL; return NULL;
...@@ -149,7 +149,7 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int32_t connFd) { ...@@ -149,7 +149,7 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int32_t connFd) {
return pConn; return pConn;
} }
void taosFreeTcpConn(void *param) { void syncFreeTcpConn(void *param) {
SConnObj * pConn = param; SConnObj * pConn = param;
SThreadObj *pThread = pConn->pThread; SThreadObj *pThread = pConn->pThread;
...@@ -175,7 +175,7 @@ static void taosProcessBrokenLink(SConnObj *pConn) { ...@@ -175,7 +175,7 @@ static void taosProcessBrokenLink(SConnObj *pConn) {
#define maxEvents 10 #define maxEvents 10
static void *taosProcessTcpData(void *param) { static void *syncProcessTcpData(void *param) {
SThreadObj *pThread = (SThreadObj *)param; SThreadObj *pThread = (SThreadObj *)param;
SPoolObj * pPool = pThread->pPool; SPoolObj * pPool = pThread->pPool;
SPoolInfo * pInfo = &pPool->info; SPoolInfo * pInfo = &pPool->info;
...@@ -222,7 +222,7 @@ static void *taosProcessTcpData(void *param) { ...@@ -222,7 +222,7 @@ static void *taosProcessTcpData(void *param) {
if (pConn->closedByApp == 0) { if (pConn->closedByApp == 0) {
if ((*pInfo->processIncomingMsg)(pConn->ahandle, buffer) < 0) { if ((*pInfo->processIncomingMsg)(pConn->ahandle, buffer) < 0) {
taosFreeTcpConn(pConn); syncFreeTcpConn(pConn);
continue; continue;
} }
} }
...@@ -239,7 +239,7 @@ static void *taosProcessTcpData(void *param) { ...@@ -239,7 +239,7 @@ static void *taosProcessTcpData(void *param) {
return NULL; return NULL;
} }
static void *taosAcceptPeerTcpConnection(void *argv) { static void *syncAcceptPeerTcpConnection(void *argv) {
SPoolObj * pPool = (SPoolObj *)argv; SPoolObj * pPool = (SPoolObj *)argv;
SPoolInfo *pInfo = &pPool->info; SPoolInfo *pInfo = &pPool->info;
...@@ -268,7 +268,7 @@ static void *taosAcceptPeerTcpConnection(void *argv) { ...@@ -268,7 +268,7 @@ static void *taosAcceptPeerTcpConnection(void *argv) {
return NULL; return NULL;
} }
static SThreadObj *taosGetTcpThread(SPoolObj *pPool) { static SThreadObj *syncGetTcpThread(SPoolObj *pPool) {
SThreadObj *pThread = pPool->pThread[pPool->nextId]; SThreadObj *pThread = pPool->pThread[pPool->nextId];
if (pThread) return pThread; if (pThread) return pThread;
...@@ -286,7 +286,7 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) { ...@@ -286,7 +286,7 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) {
pthread_attr_t thattr; pthread_attr_t thattr;
pthread_attr_init(&thattr); pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
int32_t ret = pthread_create(&(pThread->thread), &thattr, (void *)taosProcessTcpData, pThread); int32_t ret = pthread_create(&(pThread->thread), &thattr, (void *)syncProcessTcpData, pThread);
pthread_attr_destroy(&thattr); pthread_attr_destroy(&thattr);
if (ret != 0) { if (ret != 0) {
...@@ -303,7 +303,7 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) { ...@@ -303,7 +303,7 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) {
return pThread; return pThread;
} }
static void taosStopPoolThread(SThreadObj *pThread) { static void syncStopPoolThread(SThreadObj *pThread) {
pthread_t thread = pThread->thread; pthread_t thread = pThread->thread;
if (!taosCheckPthreadValid(thread)) { if (!taosCheckPthreadValid(thread)) {
return; return;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册