diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index b5c4997337edb7c7ffb4d7dc1c59707e6bdc4955..517a9e9bc83fb454bb4e2c43e31cbd8c90ad91cd 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -113,6 +113,7 @@ static void dnodeCleanupTmr() { int32_t dnodeInitSystem() { dnodeSetRunStatus(TSDB_RUN_STATUS_INITIALIZE); tscEmbedded = 1; + taosIgnSIGPIPE(); taosBlockSIGPIPE(); taosResolveCRC(); taosInitGlobalCfg(); @@ -120,7 +121,6 @@ int32_t dnodeInitSystem() { taosSetCoreDump(); taosInitNotes(); dnodeInitTmr(); - signal(SIGPIPE, SIG_IGN); if (dnodeCreateDir(tsLogDir) < 0) { printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno)); diff --git a/src/os/inc/osSocket.h b/src/os/inc/osSocket.h index baf7687dd03e1ee4f6dd92e3204244b3d31b7a1f..13d3fa407921ebf93992d3d0da5c8dabc41f5e44 100644 --- a/src/os/inc/osSocket.h +++ b/src/os/inc/osSocket.h @@ -59,6 +59,7 @@ extern "C" { // TAOS_OS_FUNC_SOCKET int32_t taosSetNonblocking(SOCKET sock, int32_t on); +void taosIgnSIGPIPE(); void taosBlockSIGPIPE(); // TAOS_OS_FUNC_SOCKET_SETSOCKETOPT diff --git a/src/os/src/detail/osSocket.c b/src/os/src/detail/osSocket.c index c7c9d774271555ae5989aa18b7e00d325a0eddde..729471247f884977e65d86166cabf06641581e2f 100644 --- a/src/os/src/detail/osSocket.c +++ b/src/os/src/detail/osSocket.c @@ -39,6 +39,10 @@ int32_t taosSetNonblocking(SOCKET sock, int32_t on) { return 0; } +void taosIgnSIGPIPE() { + signal(SIGPIPE, SIG_IGN); +} + void taosBlockSIGPIPE() { sigset_t signal_mask; sigemptyset(&signal_mask); diff --git a/src/os/src/windows/wSocket.c b/src/os/src/windows/wSocket.c index 3b091b269931e644d9f8c2c01ba3c9cb9ddc520c..9697c5e65fa374255c82596ec99d0c8b3e5b9aad 100644 --- a/src/os/src/windows/wSocket.c +++ b/src/os/src/windows/wSocket.c @@ -46,6 +46,7 @@ int32_t taosSetNonblocking(SOCKET sock, int32_t on) { return 0; } +void taosIgnSIGPIPE() {} void taosBlockSIGPIPE() {} int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) { diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index ae6a5a4fe4accbc671c2f72744a47da25611f083..47090cfa0cc6997a20c6803b8fc299f08e407bb5 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -38,7 +38,7 @@ extern "C" { #define SYNC_MAX_FWDS 512 #define SYNC_FWD_TIMER 300 #define SYNC_ROLE_TIMER 15000 // ms -#define SYNC_CHECK_INTERVAL 1 // ms +#define SYNC_CHECK_INTERVAL 1000 // ms #define SYNC_WAIT_AFTER_CHOOSE_MASTER 10 // ms #define nodeRole pNode->peerInfo[pNode->selfIndex]->role diff --git a/src/sync/inc/syncTcp.h b/src/sync/inc/syncTcp.h index 7db51f2a7115ccf23ee42d17cbe62f0798adf260..d4674fee6bd07d5ab68284399d9a6502dbd5ec2c 100644 --- a/src/sync/inc/syncTcp.h +++ b/src/sync/inc/syncTcp.h @@ -25,14 +25,14 @@ typedef struct { uint32_t serverIp; int16_t port; int32_t bufferSize; - void (*processBrokenLink)(void *ahandle); - int32_t (*processIncomingMsg)(void *ahandle, void *buffer); + void (*processBrokenLink)(int64_t handleId); + int32_t (*processIncomingMsg)(int64_t handleId, void *buffer); void (*processIncomingConn)(int32_t fd, uint32_t ip); } SPoolInfo; void *syncOpenTcpThreadPool(SPoolInfo *pInfo); void syncCloseTcpThreadPool(void *); -void *syncAllocateTcpConn(void *, void *ahandle, int32_t connFd); +void *syncAllocateTcpConn(void *, int64_t rid, int32_t connFd); void syncFreeTcpConn(void *); #ifdef __cplusplus diff --git a/src/sync/src/syncArbitrator.c b/src/sync/src/syncArbitrator.c index 1cb2b8f30269c572bc76ddf9c04491d9b77bb3bc..fed0774346693fbd2d5ef725bfa620433664cee8 100644 --- a/src/sync/src/syncArbitrator.c +++ b/src/sync/src/syncArbitrator.c @@ -29,8 +29,8 @@ static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context); static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp); -static void arbProcessBrokenLink(void *param); -static int32_t arbProcessPeerMsg(void *param, void *buffer); +static void arbProcessBrokenLink(int64_t rid); +static int32_t arbProcessPeerMsg(int64_t rid, void *buffer); static tsem_t tsArbSem; static void * tsArbTcpPool; @@ -138,20 +138,20 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { sDebug("%s, arbitrator request is accepted", pNode->id); pNode->nodeFd = connFd; - pNode->pConn = syncAllocateTcpConn(tsArbTcpPool, pNode, connFd); + pNode->pConn = syncAllocateTcpConn(tsArbTcpPool, (int64_t)pNode, connFd); return; } -static void arbProcessBrokenLink(void *param) { - SNodeConn *pNode = param; +static void arbProcessBrokenLink(int64_t rid) { + SNodeConn *pNode = (SNodeConn *)rid; sDebug("%s, TCP link is broken since %s, close connection", pNode->id, strerror(errno)); tfree(pNode); } -static int32_t arbProcessPeerMsg(void *param, void *buffer) { - SNodeConn *pNode = param; +static int32_t arbProcessPeerMsg(int64_t rid, void *buffer) { + SNodeConn *pNode = (SNodeConn *)rid; SSyncHead head; int32_t bytes = 0; char * cont = (char *)buffer; diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index c30fb9092530d6740ab0b3f845ce3ce16a0089bd..e3bd53c0e0ca6fd073a2f13f63651d94b1fb88b8 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -43,8 +43,8 @@ static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer); static void syncRecoverFromMaster(SSyncPeer *pPeer); static void syncCheckPeerConnection(void *param, void *tmrId); static int32_t syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId); -static void syncProcessBrokenLink(void *param); -static int32_t syncProcessPeerMsg(void *param, void *buffer); +static void syncProcessBrokenLink(int64_t rid); +static int32_t syncProcessPeerMsg(int64_t rid, void *buffer); static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp); static void syncRemovePeer(SSyncPeer *pPeer); static void syncAddArbitrator(SSyncNode *pNode); @@ -543,7 +543,8 @@ static void syncClosePeerConn(SSyncPeer *pPeer) { taosClose(pPeer->syncFd); if (pPeer->peerFd >= 0) { pPeer->peerFd = -1; - syncFreeTcpConn(pPeer->pConn); + void *pConn = pPeer->pConn; + if (pConn != NULL) syncFreeTcpConn(pPeer->pConn); } } @@ -1025,8 +1026,7 @@ static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead) { return 0; } -static int32_t syncProcessPeerMsg(void *param, void *buffer) { - int64_t rid = (int64_t)param; +static int32_t syncProcessPeerMsg(int64_t rid, void *buffer) { SSyncPeer *pPeer = syncAcquirePeer(rid); if (pPeer == NULL) return -1; @@ -1115,7 +1115,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { sDebug("%s, connection to peer server is setup, pfd:%d sfd:%d tranId:%u", pPeer->id, connFd, pPeer->syncFd, msg.tranId); pPeer->peerFd = connFd; pPeer->role = TAOS_SYNC_ROLE_UNSYNCED; - pPeer->pConn = syncAllocateTcpConn(tsTcpPool, pPeer, connFd); + pPeer->pConn = syncAllocateTcpConn(tsTcpPool, pPeer->rid, connFd); } else { sDebug("%s, failed to setup peer connection to server since %s, try later", pPeer->id, strerror(errno)); taosClose(connFd); @@ -1222,7 +1222,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { sDebug("%s, TCP connection is up, pfd:%d sfd:%d, old pfd:%d", pPeer->id, connFd, pPeer->syncFd, pPeer->peerFd); syncClosePeerConn(pPeer); pPeer->peerFd = connFd; - pPeer->pConn = syncAllocateTcpConn(tsTcpPool, pPeer, connFd); + pPeer->pConn = syncAllocateTcpConn(tsTcpPool, pPeer->rid, connFd); sDebug("%s, ready to exchange data", pPeer->id); syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_EXCHANGE_DATA, syncGenTranId()); } @@ -1231,8 +1231,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { pthread_mutex_unlock(&pNode->mutex); } -static void syncProcessBrokenLink(void *param) { - int64_t rid = (int64_t)param; +static void syncProcessBrokenLink(int64_t rid) { SSyncPeer *pPeer = syncAcquirePeer(rid); if (pPeer == NULL) return; diff --git a/src/sync/src/syncTcp.c b/src/sync/src/syncTcp.c index fa8d486e400b448dc3814a1800f842d254735a93..474466673798ba027518a4bf5c3375492ce9fb60 100644 --- a/src/sync/src/syncTcp.c +++ b/src/sync/src/syncTcp.c @@ -42,7 +42,7 @@ typedef struct SPoolObj { typedef struct { SThreadObj *pThread; - void * ahandle; + int64_t handleId; int32_t fd; int32_t closedByApp; } SConnObj; @@ -112,7 +112,7 @@ void syncCloseTcpThreadPool(void *param) { tfree(pPool); } -void *syncAllocateTcpConn(void *param, void *pPeer, int32_t connFd) { +void *syncAllocateTcpConn(void *param, int64_t rid, int32_t connFd) { struct epoll_event event; SPoolObj *pPool = param; @@ -130,7 +130,7 @@ void *syncAllocateTcpConn(void *param, void *pPeer, int32_t connFd) { pConn->fd = connFd; pConn->pThread = pThread; - pConn->ahandle = (void *)(((SSyncPeer *)pPeer)->rid); + pConn->handleId = rid; pConn->closedByApp = 0; event.events = EPOLLIN | EPOLLRDHUP; @@ -164,7 +164,7 @@ static void taosProcessBrokenLink(SConnObj *pConn) { SPoolInfo * pInfo = &pPool->info; if (pConn->closedByApp == 0) shutdown(pConn->fd, SHUT_WR); - (*pInfo->processBrokenLink)(pConn->ahandle); + (*pInfo->processBrokenLink)(pConn->handleId); pThread->numOfFds--; epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pConn->fd, NULL); @@ -221,7 +221,7 @@ static void *syncProcessTcpData(void *param) { } if (pConn->closedByApp == 0) { - if ((*pInfo->processIncomingMsg)(pConn->ahandle, buffer) < 0) { + if ((*pInfo->processIncomingMsg)(pConn->handleId, buffer) < 0) { syncFreeTcpConn(pConn); continue; } diff --git a/src/util/src/tsocket.c b/src/util/src/tsocket.c index 2937124bebb32a2a48d317ec2eae2b2aa08defd3..99501a08eae8e4a8bea754e188833debd92673e0 100644 --- a/src/util/src/tsocket.c +++ b/src/util/src/tsocket.c @@ -18,6 +18,10 @@ #include "tsocket.h" #include "taoserror.h" +#ifndef SIGPIPE + #define SIGPIPE EPIPE +#endif + int32_t taosGetFqdn(char *fqdn) { char hostname[1024]; hostname[1023] = '\0';