未验证 提交 53480b07 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #7005 from taosdata/fix/TD-5512

[TD-5512]<fix>: fix arbitrator offline caused by vnode destroying
...@@ -25,7 +25,7 @@ typedef struct { ...@@ -25,7 +25,7 @@ typedef struct {
uint32_t serverIp; uint32_t serverIp;
int16_t port; int16_t port;
int32_t bufferSize; int32_t bufferSize;
void (*processBrokenLink)(int64_t handleId); void (*processBrokenLink)(int64_t handleId, int32_t closedByApp);
int32_t (*processIncomingMsg)(int64_t handleId, void *buffer); int32_t (*processIncomingMsg)(int64_t handleId, void *buffer);
void (*processIncomingConn)(SOCKET fd, uint32_t ip); void (*processIncomingConn)(SOCKET fd, uint32_t ip);
} SPoolInfo; } SPoolInfo;
......
...@@ -30,7 +30,7 @@ ...@@ -30,7 +30,7 @@
extern void syncProcessTestMsg(SSyncMsg *pMsg, SOCKET connFd); extern void syncProcessTestMsg(SSyncMsg *pMsg, SOCKET connFd);
static void arbSignalHandler(int32_t signum, void *sigInfo, void *context); static void arbSignalHandler(int32_t signum, void *sigInfo, void *context);
static void arbProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp); static void arbProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp);
static void arbProcessBrokenLink(int64_t rid); static void arbProcessBrokenLink(int64_t rid, int32_t closedByApp);
static int32_t arbProcessPeerMsg(int64_t rid, void *buffer); static int32_t arbProcessPeerMsg(int64_t rid, void *buffer);
static tsem_t tsArbSem; static tsem_t tsArbSem;
static void * tsArbTcpPool; static void * tsArbTcpPool;
...@@ -147,10 +147,10 @@ static void arbProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) { ...@@ -147,10 +147,10 @@ static void arbProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) {
return; return;
} }
static void arbProcessBrokenLink(int64_t rid) { static void arbProcessBrokenLink(int64_t rid, int32_t closedByApp) {
SNodeConn *pNode = (SNodeConn *)rid; SNodeConn *pNode = (SNodeConn *)rid;
sDebug("%s, TCP link is broken since %s, close connection", pNode->id, strerror(errno)); sDebug("%s, TCP link is broken since %s, closedByApp:%d", pNode->id, strerror(errno), closedByApp);
tfree(pNode); tfree(pNode);
} }
......
...@@ -43,7 +43,7 @@ static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer); ...@@ -43,7 +43,7 @@ static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer);
static void syncRecoverFromMaster(SSyncPeer *pPeer); static void syncRecoverFromMaster(SSyncPeer *pPeer);
static void syncCheckPeerConnection(void *param, void *tmrId); static void syncCheckPeerConnection(void *param, void *tmrId);
static int32_t syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId); static int32_t syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId);
static void syncProcessBrokenLink(int64_t rid); static void syncProcessBrokenLink(int64_t rid, int32_t closedByApp);
static int32_t syncProcessPeerMsg(int64_t rid, void *buffer); static int32_t syncProcessPeerMsg(int64_t rid, void *buffer);
static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp); static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp);
static void syncRemovePeer(SSyncPeer *pPeer); static void syncRemovePeer(SSyncPeer *pPeer);
...@@ -1308,7 +1308,7 @@ static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) { ...@@ -1308,7 +1308,7 @@ static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) {
pthread_mutex_unlock(&pNode->mutex); pthread_mutex_unlock(&pNode->mutex);
} }
static void syncProcessBrokenLink(int64_t rid) { static void syncProcessBrokenLink(int64_t rid, int32_t closedByApp) {
SSyncPeer *pPeer = syncAcquirePeer(rid); SSyncPeer *pPeer = syncAcquirePeer(rid);
if (pPeer == NULL) return; if (pPeer == NULL) return;
...@@ -1316,9 +1316,10 @@ static void syncProcessBrokenLink(int64_t rid) { ...@@ -1316,9 +1316,10 @@ static void syncProcessBrokenLink(int64_t rid) {
pthread_mutex_lock(&pNode->mutex); pthread_mutex_lock(&pNode->mutex);
sDebug("%s, TCP link is broken since %s, pfd:%d sfd:%d", pPeer->id, strerror(errno), pPeer->peerFd, pPeer->syncFd); sDebug("%s, TCP link is broken since %s, pfd:%d sfd:%d closedByApp:%d",
pPeer->id, strerror(errno), pPeer->peerFd, pPeer->syncFd, closedByApp);
pPeer->peerFd = -1; pPeer->peerFd = -1;
if (pPeer->isArb) { if (!closedByApp && pPeer->isArb) {
tsArbOnline = 0; tsArbOnline = 0;
} }
......
...@@ -177,7 +177,7 @@ static void taosProcessBrokenLink(SConnObj *pConn) { ...@@ -177,7 +177,7 @@ static void taosProcessBrokenLink(SConnObj *pConn) {
SPoolInfo * pInfo = &pPool->info; SPoolInfo * pInfo = &pPool->info;
if (pConn->closedByApp == 0) shutdown(pConn->fd, SHUT_WR); if (pConn->closedByApp == 0) shutdown(pConn->fd, SHUT_WR);
(*pInfo->processBrokenLink)(pConn->handleId); (*pInfo->processBrokenLink)(pConn->handleId, pConn->closedByApp);
pThread->numOfFds--; pThread->numOfFds--;
epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pConn->fd, NULL); epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pConn->fd, NULL);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册