提交 833d65f6 编写于 作者: S Shengliang Guan

TD-2524

上级 8afffbad
...@@ -113,6 +113,7 @@ static void dnodeCleanupTmr() { ...@@ -113,6 +113,7 @@ static void dnodeCleanupTmr() {
int32_t dnodeInitSystem() { int32_t dnodeInitSystem() {
dnodeSetRunStatus(TSDB_RUN_STATUS_INITIALIZE); dnodeSetRunStatus(TSDB_RUN_STATUS_INITIALIZE);
tscEmbedded = 1; tscEmbedded = 1;
taosIgnSIGPIPE();
taosBlockSIGPIPE(); taosBlockSIGPIPE();
taosResolveCRC(); taosResolveCRC();
taosInitGlobalCfg(); taosInitGlobalCfg();
...@@ -120,7 +121,6 @@ int32_t dnodeInitSystem() { ...@@ -120,7 +121,6 @@ int32_t dnodeInitSystem() {
taosSetCoreDump(); taosSetCoreDump();
taosInitNotes(); taosInitNotes();
dnodeInitTmr(); dnodeInitTmr();
signal(SIGPIPE, SIG_IGN);
if (dnodeCreateDir(tsLogDir) < 0) { if (dnodeCreateDir(tsLogDir) < 0) {
printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno)); printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno));
......
...@@ -59,6 +59,7 @@ extern "C" { ...@@ -59,6 +59,7 @@ extern "C" {
// TAOS_OS_FUNC_SOCKET // TAOS_OS_FUNC_SOCKET
int32_t taosSetNonblocking(SOCKET sock, int32_t on); int32_t taosSetNonblocking(SOCKET sock, int32_t on);
void taosIgnSIGPIPE();
void taosBlockSIGPIPE(); void taosBlockSIGPIPE();
// TAOS_OS_FUNC_SOCKET_SETSOCKETOPT // TAOS_OS_FUNC_SOCKET_SETSOCKETOPT
......
...@@ -39,6 +39,10 @@ int32_t taosSetNonblocking(SOCKET sock, int32_t on) { ...@@ -39,6 +39,10 @@ int32_t taosSetNonblocking(SOCKET sock, int32_t on) {
return 0; return 0;
} }
void taosIgnSIGPIPE() {
signal(SIGPIPE, SIG_IGN);
}
void taosBlockSIGPIPE() { void taosBlockSIGPIPE() {
sigset_t signal_mask; sigset_t signal_mask;
sigemptyset(&signal_mask); sigemptyset(&signal_mask);
......
...@@ -46,6 +46,7 @@ int32_t taosSetNonblocking(SOCKET sock, int32_t on) { ...@@ -46,6 +46,7 @@ int32_t taosSetNonblocking(SOCKET sock, int32_t on) {
return 0; return 0;
} }
void taosIgnSIGPIPE() {}
void taosBlockSIGPIPE() {} void taosBlockSIGPIPE() {}
int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) { int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) {
......
...@@ -38,7 +38,7 @@ extern "C" { ...@@ -38,7 +38,7 @@ extern "C" {
#define SYNC_MAX_FWDS 512 #define SYNC_MAX_FWDS 512
#define SYNC_FWD_TIMER 300 #define SYNC_FWD_TIMER 300
#define SYNC_ROLE_TIMER 15000 // ms #define SYNC_ROLE_TIMER 15000 // ms
#define SYNC_CHECK_INTERVAL 1 // ms #define SYNC_CHECK_INTERVAL 1000 // ms
#define SYNC_WAIT_AFTER_CHOOSE_MASTER 10 // ms #define SYNC_WAIT_AFTER_CHOOSE_MASTER 10 // ms
#define nodeRole pNode->peerInfo[pNode->selfIndex]->role #define nodeRole pNode->peerInfo[pNode->selfIndex]->role
......
...@@ -25,14 +25,14 @@ typedef struct { ...@@ -25,14 +25,14 @@ typedef struct {
uint32_t serverIp; uint32_t serverIp;
int16_t port; int16_t port;
int32_t bufferSize; int32_t bufferSize;
void (*processBrokenLink)(void *ahandle); void (*processBrokenLink)(int64_t handleId);
int32_t (*processIncomingMsg)(void *ahandle, void *buffer); int32_t (*processIncomingMsg)(int64_t handleId, void *buffer);
void (*processIncomingConn)(int32_t fd, uint32_t ip); void (*processIncomingConn)(int32_t fd, uint32_t ip);
} SPoolInfo; } SPoolInfo;
void *syncOpenTcpThreadPool(SPoolInfo *pInfo); void *syncOpenTcpThreadPool(SPoolInfo *pInfo);
void syncCloseTcpThreadPool(void *); void syncCloseTcpThreadPool(void *);
void *syncAllocateTcpConn(void *, void *ahandle, int32_t connFd); void *syncAllocateTcpConn(void *, int64_t rid, int32_t connFd);
void syncFreeTcpConn(void *); void syncFreeTcpConn(void *);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -29,8 +29,8 @@ ...@@ -29,8 +29,8 @@
static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context); static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context);
static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp); static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp);
static void arbProcessBrokenLink(void *param); static void arbProcessBrokenLink(int64_t rid);
static int32_t arbProcessPeerMsg(void *param, void *buffer); static int32_t arbProcessPeerMsg(int64_t rid, void *buffer);
static tsem_t tsArbSem; static tsem_t tsArbSem;
static void * tsArbTcpPool; static void * tsArbTcpPool;
...@@ -138,20 +138,20 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { ...@@ -138,20 +138,20 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
sDebug("%s, arbitrator request is accepted", pNode->id); sDebug("%s, arbitrator request is accepted", pNode->id);
pNode->nodeFd = connFd; pNode->nodeFd = connFd;
pNode->pConn = syncAllocateTcpConn(tsArbTcpPool, pNode, connFd); pNode->pConn = syncAllocateTcpConn(tsArbTcpPool, (int64_t)pNode, connFd);
return; return;
} }
static void arbProcessBrokenLink(void *param) { static void arbProcessBrokenLink(int64_t rid) {
SNodeConn *pNode = param; SNodeConn *pNode = (SNodeConn *)rid;
sDebug("%s, TCP link is broken since %s, close connection", pNode->id, strerror(errno)); sDebug("%s, TCP link is broken since %s, close connection", pNode->id, strerror(errno));
tfree(pNode); tfree(pNode);
} }
static int32_t arbProcessPeerMsg(void *param, void *buffer) { static int32_t arbProcessPeerMsg(int64_t rid, void *buffer) {
SNodeConn *pNode = param; SNodeConn *pNode = (SNodeConn *)rid;
SSyncHead head; SSyncHead head;
int32_t bytes = 0; int32_t bytes = 0;
char * cont = (char *)buffer; char * cont = (char *)buffer;
......
...@@ -43,8 +43,8 @@ static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer); ...@@ -43,8 +43,8 @@ static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer);
static void syncRecoverFromMaster(SSyncPeer *pPeer); static void syncRecoverFromMaster(SSyncPeer *pPeer);
static void syncCheckPeerConnection(void *param, void *tmrId); static void syncCheckPeerConnection(void *param, void *tmrId);
static int32_t syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId); static int32_t syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId);
static void syncProcessBrokenLink(void *param); static void syncProcessBrokenLink(int64_t rid);
static int32_t syncProcessPeerMsg(void *param, void *buffer); static int32_t syncProcessPeerMsg(int64_t rid, void *buffer);
static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp); static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp);
static void syncRemovePeer(SSyncPeer *pPeer); static void syncRemovePeer(SSyncPeer *pPeer);
static void syncAddArbitrator(SSyncNode *pNode); static void syncAddArbitrator(SSyncNode *pNode);
...@@ -543,7 +543,8 @@ static void syncClosePeerConn(SSyncPeer *pPeer) { ...@@ -543,7 +543,8 @@ static void syncClosePeerConn(SSyncPeer *pPeer) {
taosClose(pPeer->syncFd); taosClose(pPeer->syncFd);
if (pPeer->peerFd >= 0) { if (pPeer->peerFd >= 0) {
pPeer->peerFd = -1; pPeer->peerFd = -1;
syncFreeTcpConn(pPeer->pConn); void *pConn = pPeer->pConn;
if (pConn != NULL) syncFreeTcpConn(pPeer->pConn);
} }
} }
...@@ -1025,8 +1026,7 @@ static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead) { ...@@ -1025,8 +1026,7 @@ static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead) {
return 0; return 0;
} }
static int32_t syncProcessPeerMsg(void *param, void *buffer) { static int32_t syncProcessPeerMsg(int64_t rid, void *buffer) {
int64_t rid = (int64_t)param;
SSyncPeer *pPeer = syncAcquirePeer(rid); SSyncPeer *pPeer = syncAcquirePeer(rid);
if (pPeer == NULL) return -1; if (pPeer == NULL) return -1;
...@@ -1115,7 +1115,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { ...@@ -1115,7 +1115,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
sDebug("%s, connection to peer server is setup, pfd:%d sfd:%d tranId:%u", pPeer->id, connFd, pPeer->syncFd, msg.tranId); sDebug("%s, connection to peer server is setup, pfd:%d sfd:%d tranId:%u", pPeer->id, connFd, pPeer->syncFd, msg.tranId);
pPeer->peerFd = connFd; pPeer->peerFd = connFd;
pPeer->role = TAOS_SYNC_ROLE_UNSYNCED; pPeer->role = TAOS_SYNC_ROLE_UNSYNCED;
pPeer->pConn = syncAllocateTcpConn(tsTcpPool, pPeer, connFd); pPeer->pConn = syncAllocateTcpConn(tsTcpPool, pPeer->rid, connFd);
} else { } else {
sDebug("%s, failed to setup peer connection to server since %s, try later", pPeer->id, strerror(errno)); sDebug("%s, failed to setup peer connection to server since %s, try later", pPeer->id, strerror(errno));
taosClose(connFd); taosClose(connFd);
...@@ -1222,7 +1222,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { ...@@ -1222,7 +1222,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
sDebug("%s, TCP connection is up, pfd:%d sfd:%d, old pfd:%d", pPeer->id, connFd, pPeer->syncFd, pPeer->peerFd); sDebug("%s, TCP connection is up, pfd:%d sfd:%d, old pfd:%d", pPeer->id, connFd, pPeer->syncFd, pPeer->peerFd);
syncClosePeerConn(pPeer); syncClosePeerConn(pPeer);
pPeer->peerFd = connFd; pPeer->peerFd = connFd;
pPeer->pConn = syncAllocateTcpConn(tsTcpPool, pPeer, connFd); pPeer->pConn = syncAllocateTcpConn(tsTcpPool, pPeer->rid, connFd);
sDebug("%s, ready to exchange data", pPeer->id); sDebug("%s, ready to exchange data", pPeer->id);
syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_EXCHANGE_DATA, syncGenTranId()); syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_EXCHANGE_DATA, syncGenTranId());
} }
...@@ -1231,8 +1231,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { ...@@ -1231,8 +1231,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
pthread_mutex_unlock(&pNode->mutex); pthread_mutex_unlock(&pNode->mutex);
} }
static void syncProcessBrokenLink(void *param) { static void syncProcessBrokenLink(int64_t rid) {
int64_t rid = (int64_t)param;
SSyncPeer *pPeer = syncAcquirePeer(rid); SSyncPeer *pPeer = syncAcquirePeer(rid);
if (pPeer == NULL) return; if (pPeer == NULL) return;
......
...@@ -42,7 +42,7 @@ typedef struct SPoolObj { ...@@ -42,7 +42,7 @@ typedef struct SPoolObj {
typedef struct { typedef struct {
SThreadObj *pThread; SThreadObj *pThread;
void * ahandle; int64_t handleId;
int32_t fd; int32_t fd;
int32_t closedByApp; int32_t closedByApp;
} SConnObj; } SConnObj;
...@@ -112,7 +112,7 @@ void syncCloseTcpThreadPool(void *param) { ...@@ -112,7 +112,7 @@ void syncCloseTcpThreadPool(void *param) {
tfree(pPool); tfree(pPool);
} }
void *syncAllocateTcpConn(void *param, void *pPeer, int32_t connFd) { void *syncAllocateTcpConn(void *param, int64_t rid, int32_t connFd) {
struct epoll_event event; struct epoll_event event;
SPoolObj *pPool = param; SPoolObj *pPool = param;
...@@ -130,7 +130,7 @@ void *syncAllocateTcpConn(void *param, void *pPeer, int32_t connFd) { ...@@ -130,7 +130,7 @@ void *syncAllocateTcpConn(void *param, void *pPeer, int32_t connFd) {
pConn->fd = connFd; pConn->fd = connFd;
pConn->pThread = pThread; pConn->pThread = pThread;
pConn->ahandle = (void *)(((SSyncPeer *)pPeer)->rid); pConn->handleId = rid;
pConn->closedByApp = 0; pConn->closedByApp = 0;
event.events = EPOLLIN | EPOLLRDHUP; event.events = EPOLLIN | EPOLLRDHUP;
...@@ -164,7 +164,7 @@ static void taosProcessBrokenLink(SConnObj *pConn) { ...@@ -164,7 +164,7 @@ static void taosProcessBrokenLink(SConnObj *pConn) {
SPoolInfo * pInfo = &pPool->info; SPoolInfo * pInfo = &pPool->info;
if (pConn->closedByApp == 0) shutdown(pConn->fd, SHUT_WR); if (pConn->closedByApp == 0) shutdown(pConn->fd, SHUT_WR);
(*pInfo->processBrokenLink)(pConn->ahandle); (*pInfo->processBrokenLink)(pConn->handleId);
pThread->numOfFds--; pThread->numOfFds--;
epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pConn->fd, NULL); epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pConn->fd, NULL);
...@@ -221,7 +221,7 @@ static void *syncProcessTcpData(void *param) { ...@@ -221,7 +221,7 @@ static void *syncProcessTcpData(void *param) {
} }
if (pConn->closedByApp == 0) { if (pConn->closedByApp == 0) {
if ((*pInfo->processIncomingMsg)(pConn->ahandle, buffer) < 0) { if ((*pInfo->processIncomingMsg)(pConn->handleId, buffer) < 0) {
syncFreeTcpConn(pConn); syncFreeTcpConn(pConn);
continue; continue;
} }
......
...@@ -18,6 +18,10 @@ ...@@ -18,6 +18,10 @@
#include "tsocket.h" #include "tsocket.h"
#include "taoserror.h" #include "taoserror.h"
#ifndef SIGPIPE
#define SIGPIPE EPIPE
#endif
int32_t taosGetFqdn(char *fqdn) { int32_t taosGetFqdn(char *fqdn) {
char hostname[1024]; char hostname[1024];
hostname[1023] = '\0'; hostname[1023] = '\0';
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册