From 24c5f02cc9df41c8643fa66ab38837b36f215ff9 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 27 Sep 2020 13:47:58 +0000 Subject: [PATCH] TD-1617 --- src/sync/inc/syncInt.h | 40 ++++++++++++++--------------- src/sync/src/syncMain.c | 3 +-- src/sync/src/tarbitrator.c | 52 +++++++++++++++++++------------------- 3 files changed, 47 insertions(+), 48 deletions(-) diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index cd1252f4b4..94df664219 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -114,26 +114,26 @@ typedef struct { } SSyncFwds; typedef struct SsyncPeer { - int32_t nodeId; - uint32_t ip; - uint16_t port; - char fqdn[TSDB_FQDN_LEN]; // peer ip string - char id[TSDB_EP_LEN+16]; // peer vgId + end point - int8_t role; - int8_t sstatus; // sync status - uint64_t version; - uint64_t sversion; // track the peer version in retrieve process - int syncFd; - int peerFd; // forward FD - int numOfRetrieves; // number of retrieves tried - int fileChanged; // a flag to indicate file is changed during retrieving process - void *timer; - void *pConn; - int notifyFd; - int watchNum; - int *watchFd; - int8_t refCount; // reference count - struct SSyncNode *pSyncNode; + int32_t nodeId; + uint32_t ip; + uint16_t port; + char fqdn[TSDB_FQDN_LEN]; // peer ip string + char id[TSDB_EP_LEN + 32]; // peer vgId + end point + int8_t role; + int8_t sstatus; // sync status + uint64_t version; + uint64_t sversion; // track the peer version in retrieve process + int syncFd; + int peerFd; // forward FD + int numOfRetrieves; // number of retrieves tried + int fileChanged; // a flag to indicate file is changed during retrieving process + void * timer; + void * pConn; + int notifyFd; + int watchNum; + int * watchFd; + int8_t refCount; // reference count + struct SSyncNode *pSyncNode; } SSyncPeer; typedef struct SSyncNode { diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 9b73558ba8..b8300dc75e 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -671,7 +671,6 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne int8_t selfOldRole = nodeRole; int8_t i, syncRequired = 0; - pNode->peerInfo[pNode->selfIndex]->version = nodeVersion; pPeer->role = newRole; sDebug("%s, own role:%s, new peer role:%s", pPeer->id, syncRole[nodeRole], syncRole[pPeer->role]); @@ -923,7 +922,7 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { static int syncProcessPeerMsg(void *param, void *buffer) { SSyncPeer *pPeer = param; SSyncHead head; - char * cont = (char *)buffer; + char * cont = buffer; SSyncNode *pNode = pPeer->pSyncNode; pthread_mutex_lock(&(pNode->mutex)); diff --git a/src/sync/src/tarbitrator.c b/src/sync/src/tarbitrator.c index b704b1ecae..360ea93f6c 100644 --- a/src/sync/src/tarbitrator.c +++ b/src/sync/src/tarbitrator.c @@ -27,29 +27,29 @@ #include "tsync.h" #include "syncInt.h" -static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context); -static void arbProcessIncommingConnection(int connFd, uint32_t sourceIp); -static void arbProcessBrokenLink(void *param); -static int arbProcessPeerMsg(void *param, void *buffer); +static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context); +static void arbProcessIncommingConnection(int connFd, uint32_t sourceIp); +static void arbProcessBrokenLink(void *param); +static int arbProcessPeerMsg(void *param, void *buffer); static tsem_t tsArbSem; static ttpool_h tsArbTcpPool; typedef struct { - char id[TSDB_EP_LEN+24]; + char id[TSDB_EP_LEN + 24]; int nodeFd; void *pConn; } SNodeConn; int main(int argc, char *argv[]) { - char arbLogPath[TSDB_FILENAME_LEN + 16] = {0}; + char arbLogPath[TSDB_FILENAME_LEN + 16] = {0}; - for (int i=1; i TSDB_FILENAME_LEN) continue; + } else if (strcmp(argv[i], "-g") == 0 && i < argc - 1) { + if (strlen(argv[++i]) > TSDB_FILENAME_LEN) continue; tstrncpy(arbLogPath, argv[i], sizeof(arbLogPath)); } else { printf("\nusage: %s [options] \n", argv[0]); @@ -62,8 +62,8 @@ int main(int argc, char *argv[]) { } sDebugFlag = debugFlag; - - if (tsem_init(&tsArbSem, 0, 0) != 0) { + + if (tsem_init(&tsArbSem, 0, 0) != 0) { printf("failed to create exit semphore\n"); exit(EXIT_FAILURE); } @@ -91,10 +91,10 @@ int main(int argc, char *argv[]) { info.processIncomingMsg = arbProcessPeerMsg; info.processIncomingConn = arbProcessIncommingConnection; tsArbTcpPool = taosOpenTcpThreadPool(&info); - + if (tsArbTcpPool == NULL) { - sDebug("failed to open TCP thread pool, exit..."); - return -1; + sDebug("failed to open TCP thread pool, exit..."); + return -1; } sInfo("TAOS arbitrator: %s:%d is running", tsNodeFqdn, tsArbitratorPort); @@ -108,9 +108,8 @@ int main(int argc, char *argv[]) { return 0; } -static void arbProcessIncommingConnection(int connFd, uint32_t sourceIp) -{ - char ipstr[24]; +static void arbProcessIncommingConnection(int connFd, uint32_t sourceIp) { + char ipstr[24]; tinet_ntoa(ipstr, sourceIp); sDebug("peer TCP connection from ip:%s", ipstr); @@ -121,15 +120,16 @@ static void arbProcessIncommingConnection(int connFd, uint32_t sourceIp) return; } - SNodeConn *pNode = (SNodeConn *) calloc(sizeof(SNodeConn), 1); + SNodeConn *pNode = (SNodeConn *)calloc(sizeof(SNodeConn), 1); if (pNode == NULL) { sError("failed to allocate memory(%s)", strerror(errno)); taosCloseSocket(connFd); return; } - snprintf(pNode->id, sizeof(pNode->id), "vgId:%d peer:%s:%d", firstPkt.sourceId, firstPkt.fqdn, firstPkt.port); - if (firstPkt.syncHead.vgId) { + firstPkt.fqdn[sizeof(firstPkt.fqdn) - 1] = 0; + snprintf(pNode->id, sizeof(pNode->id), "vgId:%d peer:%s:%d", firstPkt.sourceId, firstPkt.fqdn, firstPkt.port); + if (firstPkt.syncHead.vgId) { sDebug("%s, vgId in head is not zero, close the connection", pNode->id); taosTFree(pNode); taosCloseSocket(connFd); @@ -151,10 +151,10 @@ static void arbProcessBrokenLink(void *param) { } static int arbProcessPeerMsg(void *param, void *buffer) { - SNodeConn * pNode = param; - SSyncHead head; - int bytes = 0; - char *cont = (char *)buffer; + SNodeConn *pNode = param; + SSyncHead head; + int bytes = 0; + char * cont = (char *)buffer; int hlen = taosReadMsg(pNode->nodeFd, &head, sizeof(head)); if (hlen != sizeof(head)) { -- GitLab