diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index eaa073348a90ad4e3bb9469dc0705475aa981ae8..8e7f18dc3b5cc4711808346a46118f77b196887d 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -108,8 +108,7 @@ static void syncModuleInitFunc() { tstrncpy(tsNodeFqdn, tsLocalFqdn, sizeof(tsNodeFqdn)); } -void *syncStart(const SSyncInfo *pInfo) -{ +void *syncStart(const SSyncInfo *pInfo) { const SSyncCfg *pCfg = &pInfo->syncCfg; SSyncNode *pNode = (SSyncNode *) calloc(sizeof(SSyncNode), 1); @@ -189,9 +188,8 @@ void *syncStart(const SSyncInfo *pInfo) return pNode; } -void syncStop(void *param) -{ - SSyncNode *pNode = param; +void syncStop(void *param) { + SSyncNode * pNode = param; SSyncPeer *pPeer; if (pNode == NULL) return; @@ -215,9 +213,8 @@ void syncStop(void *param) syncDecNodeRef(pNode); } -int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) -{ - SSyncNode *pNode = param; +int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { + SSyncNode * pNode = param; int i, j; if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG; @@ -283,10 +280,9 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) return 0; } -int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) -{ - SSyncNode *pNode = param; - SSyncPeer *pPeer; +int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { + SSyncNode * pNode = param; + SSyncPeer * pPeer; SSyncHead *pSyncHead; SWalHead *pWalHead = data; int fwdLen; @@ -334,9 +330,8 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) return code; } -void syncConfirmForward(void *param, uint64_t version, int32_t code) -{ - SSyncNode *pNode = param; +void syncConfirmForward(void *param, uint64_t version, int32_t code) { + SSyncNode *pNode = param; if (pNode == NULL) return; if (pNode->quorum <= 1) return; @@ -387,10 +382,9 @@ void syncRecover(void *param) { pthread_mutex_unlock(&(pNode->mutex)); } -int syncGetNodesRole(void *param, SNodesRole *pNodesRole) -{ +int syncGetNodesRole(void *param, SNodesRole *pNodesRole) { SSyncNode *pNode = param; - + pNodesRole->selfIndex = pNode->selfIndex; for (int i=0; ireplica; ++i) { pNodesRole->nodeId[i] = pNode->peerInfo[i]->nodeId; @@ -400,8 +394,7 @@ int syncGetNodesRole(void *param, SNodesRole *pNodesRole) return 0; } -static void syncAddArbitrator(SSyncNode *pNode) -{ +static void syncAddArbitrator(SSyncNode *pNode) { SSyncPeer *pPeer = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]; // if not configured, return right away @@ -454,13 +447,11 @@ static void syncDecNodeRef(SSyncNode *pNode) } } -void syncAddPeerRef(SSyncPeer *pPeer) -{ +void syncAddPeerRef(SSyncPeer *pPeer) { atomic_add_fetch_8(&pPeer->refCount, 1); } -int syncDecPeerRef(SSyncPeer *pPeer) -{ +int syncDecPeerRef(SSyncPeer *pPeer) { if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) { syncDecNodeRef(pPeer->pSyncNode); @@ -473,18 +464,16 @@ int syncDecPeerRef(SSyncPeer *pPeer) return 1; } -static void syncClosePeerConn(SSyncPeer *pPeer) -{ +static void syncClosePeerConn(SSyncPeer *pPeer) { taosTmrStopA(&pPeer->timer); taosClose(pPeer->syncFd); - if (pPeer->peerFd >=0) { + if (pPeer->peerFd >= 0) { pPeer->peerFd = -1; taosFreeTcpConn(pPeer->pConn); } } -static void syncRemovePeer(SSyncPeer *pPeer) -{ +static void syncRemovePeer(SSyncPeer *pPeer) { sInfo("%s, it is removed", pPeer->id); pPeer->ip = 0; @@ -492,8 +481,7 @@ static void syncRemovePeer(SSyncPeer *pPeer) syncDecPeerRef(pPeer); } -static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) -{ +static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { uint32_t ip = taosGetIpFromFqdn(pInfo->nodeFqdn); if (ip == -1) return NULL; @@ -523,25 +511,24 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) return pPeer; } -void syncBroadcastStatus(SSyncNode *pNode) -{ +void syncBroadcastStatus(SSyncNode *pNode) { SSyncPeer *pPeer; for (int i = 0; i < pNode->replica; ++i) { - if ( i == pNode->selfIndex ) continue; + if (i == pNode->selfIndex) continue; pPeer = pNode->peerInfo[i]; syncSendPeersStatusMsgToPeer(pPeer, 1); } -} +} static void syncResetFlowCtrl(SSyncNode *pNode) { - for (int i = 0; i < pNode->replica; ++i) { pNode->peerInfo[i]->numOfRetrieves = 0; } - if (pNode->notifyFlowCtrl) - (*pNode->notifyFlowCtrl)(pNode->ahandle, 0); + if (pNode->notifyFlowCtrl) { + (*pNode->notifyFlowCtrl)(pNode->ahandle, 0); + } } static void syncChooseMaster(SSyncNode *pNode) { @@ -598,9 +585,9 @@ static void syncChooseMaster(SSyncNode *pNode) { } else { sDebug("vgId:%d, failed to choose master", pNode->vgId); } -} - -static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) { +} + +static SSyncPeer *syncCheckMaster(SSyncNode *pNode) { int onlineNum = 0; int index = -1; int replica = pNode->replica; @@ -617,7 +604,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) { replica = pNode->replica + 1; } - if (onlineNum <= replica*0.5) { + if (onlineNum <= replica * 0.5) { if (nodeRole != TAOS_SYNC_ROLE_UNSYNCED) { nodeRole = TAOS_SYNC_ROLE_UNSYNCED; pNode->peerInfo[pNode->selfIndex]->role = nodeRole; @@ -625,13 +612,13 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) { sInfo("vgId:%d, change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica); } } else { - for (int i=0; ireplica; ++i) { + for (int i = 0; i < pNode->replica; ++i) { SSyncPeer *pTemp = pNode->peerInfo[i]; - if ( pTemp->role != TAOS_SYNC_ROLE_MASTER ) continue; - if ( index < 0 ) { + if (pTemp->role != TAOS_SYNC_ROLE_MASTER) continue; + if (index < 0) { index = i; - } else { // multiple masters, it shall not happen - if ( i == pNode->selfIndex ) { + } else { // multiple masters, it shall not happen + if (i == pNode->selfIndex) { sError("%s, peer is master, work as slave instead", pTemp->id); nodeRole = TAOS_SYNC_ROLE_SLAVE; (*pNode->notifyRole)(pNode->ahandle, nodeRole); @@ -640,7 +627,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) { } } - SSyncPeer *pMaster = (index>=0) ? pNode->peerInfo[index]:NULL; + SSyncPeer *pMaster = (index >= 0) ? pNode->peerInfo[index] : NULL; return pMaster; } @@ -649,7 +636,7 @@ static int syncValidateMaster(SSyncPeer *pPeer) { int code = 0; if (nodeRole == TAOS_SYNC_ROLE_MASTER && nodeVersion < pPeer->version) { - sDebug("%s, slave has higher version, restart all connections!!!", pPeer->id); + sDebug("%s, slave has higher version, restart all connections!!!", pPeer->id); nodeRole = TAOS_SYNC_ROLE_UNSYNCED; (*pNode->notifyRole)(pNode->ahandle, nodeRole); code = -1; @@ -658,13 +645,12 @@ static int syncValidateMaster(SSyncPeer *pPeer) { if ( i == pNode->selfIndex ) continue; syncRestartPeer(pNode->peerInfo[i]); } - } + } return code; } -static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t newRole) -{ +static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t newRole) { SSyncNode *pNode = pPeer->pSyncNode; int8_t peerOldRole = pPeer->role; int8_t selfOldRole = nodeRole; @@ -686,14 +672,14 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne if (syncValidateMaster(pPeer) < 0) return; if (nodeRole == TAOS_SYNC_ROLE_UNSYNCED) { - if ( nodeVersion < pMaster->version) { + if (nodeVersion < pMaster->version) { syncRequired = 1; } else { sInfo("%s is master, work as slave, ver:%" PRIu64, pMaster->id, pMaster->version); nodeRole = TAOS_SYNC_ROLE_SLAVE; (*pNode->notifyRole)(pNode->ahandle, nodeRole); } - } else if ( nodeRole == TAOS_SYNC_ROLE_SLAVE && pMaster == pPeer) { + } else if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pMaster == pPeer) { // nodeVersion = pMaster->version; } } else { @@ -734,20 +720,18 @@ static void syncRestartPeer(SSyncPeer *pPeer) { pPeer->sstatus = TAOS_SYNC_STATUS_INIT; int ret = strcmp(pPeer->fqdn, tsNodeFqdn); - if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort) ) - taosTmrReset(syncCheckPeerConnection, tsSyncTimer*1000, pPeer, syncTmrCtrl, &pPeer->timer); + if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort)) + taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer); } -void syncRestartConnection(SSyncPeer *pPeer) -{ +void syncRestartConnection(SSyncPeer *pPeer) { if (pPeer->ip == 0) return; syncRestartPeer(pPeer); syncCheckRole(pPeer, NULL, TAOS_SYNC_ROLE_OFFLINE); } -static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) -{ +static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; sDebug("%s, sync-req is received", pPeer->id); @@ -782,8 +766,7 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) } } -static void syncNotStarted(void *param, void *tmrId) -{ +static void syncNotStarted(void *param, void *tmrId) { SSyncPeer *pPeer = param; SSyncNode *pNode = pPeer->pSyncNode; @@ -803,14 +786,13 @@ static void syncTryRecoverFromMaster(void *param, void *tmrId) { pthread_mutex_unlock(&(pNode->mutex)); } -static void syncRecoverFromMaster(SSyncPeer *pPeer) -{ - SSyncNode *pNode = pPeer->pSyncNode; +static void syncRecoverFromMaster(SSyncPeer *pPeer) { + SSyncNode *pNode = pPeer->pSyncNode; - if ( nodeSStatus != TAOS_SYNC_STATUS_INIT) { + if (nodeSStatus != TAOS_SYNC_STATUS_INIT) { sDebug("%s, sync is already started, status:%d", pPeer->id, nodeSStatus); return; - } + } taosTmrStopA(&pPeer->timer); if (tsSyncNum >= tsMaxSyncNum) { @@ -840,9 +822,8 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) return; } -static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) -{ - SSyncNode *pNode = pPeer->pSyncNode; +static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) { + SSyncNode * pNode = pPeer->pSyncNode; SFwdRsp *pFwdRsp = (SFwdRsp *) cont; SSyncFwds *pSyncFwds = pNode->pSyncFwds; SFwdInfo *pFwdInfo; @@ -862,10 +843,8 @@ static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) } } - -static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) -{ - SSyncNode *pNode = pPeer->pSyncNode; +static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { + SSyncNode * pNode = pPeer->pSyncNode; SWalHead *pHead = (SWalHead *)cont; sDebug("%s, forward is received, ver:%" PRIu64, pPeer->id, pHead->version); @@ -884,9 +863,8 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) return; } -static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) -{ - SSyncNode *pNode = pPeer->pSyncNode; +static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) { + SSyncNode * pNode = pPeer->pSyncNode; SPeersStatus *pPeersStatus = (SPeersStatus *)cont; sDebug("%s, status msg received, self:%s ver:%" PRIu64 " peer:%s ver:%" PRIu64 ", ack:%d", pPeer->id, @@ -909,10 +887,10 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { } // head.len = htonl(head.len); - if (pHead->len <0) { + if (pHead->len < 0) { sError("%s, invalid pkt length, len:%d", pPeer->id, pHead->len); return -1; - } + } int bytes = taosReadMsg(pPeer->peerFd, cont, pHead->len); if (bytes != pHead->len) { @@ -923,9 +901,8 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { return 0; } -static int syncProcessPeerMsg(void *param, void *buffer) -{ - SSyncPeer *pPeer = param; +static int syncProcessPeerMsg(void *param, void *buffer) { + SSyncPeer * pPeer = param; SSyncHead head; char *cont = (char *)buffer; @@ -953,8 +930,7 @@ static int syncProcessPeerMsg(void *param, void *buffer) #define statusMsgLen sizeof(SSyncHead)+sizeof(SPeersStatus)+sizeof(SPeerStatus)*TAOS_SYNC_MAX_REPLICA -static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) -{ +static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) { SSyncNode *pNode = pPeer->pSyncNode; char msg[statusMsgLen] = {0}; @@ -1011,7 +987,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { firstPkt.port = tsSyncPort; firstPkt.sourceId = pNode->vgId; // tell arbitrator its vgId - if ( write(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) { + if (write(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) { sDebug("%s, connection to peer server is setup", pPeer->id); pPeer->peerFd = connFd; pPeer->role = TAOS_SYNC_ROLE_UNSYNCED; @@ -1024,8 +1000,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { } } -static void syncCheckPeerConnection(void *param, void *tmrId) -{ +static void syncCheckPeerConnection(void *param, void *tmrId) { SSyncPeer *pPeer = param; SSyncNode *pNode = pPeer->pSyncNode; @@ -1037,8 +1012,7 @@ static void syncCheckPeerConnection(void *param, void *tmrId) pthread_mutex_unlock(&(pNode->mutex)); } -static void syncCreateRestoreDataThread(SSyncPeer *pPeer) -{ +static void syncCreateRestoreDataThread(SSyncPeer *pPeer) { taosTmrStopA(&pPeer->timer); pthread_attr_t thattr; @@ -1059,8 +1033,7 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) } } -static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) -{ +static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) { char ipstr[24]; int i; @@ -1137,8 +1110,7 @@ static void syncProcessBrokenLink(void *param) { syncDecNodeRef(pNode); } -static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) -{ +static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) { SSyncFwds *pSyncFwds = pNode->pSyncFwds; uint64_t time = taosGetTimestampMs(); @@ -1160,8 +1132,7 @@ static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) sDebug("vgId:%d, fwd info is saved, ver:%" PRIu64 " fwds:%d ", pNode->vgId, version, pSyncFwds->fwds); } -static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) -{ +static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) { SSyncFwds *pSyncFwds = pNode->pSyncFwds; int fwds = pSyncFwds->fwds; @@ -1178,8 +1149,7 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) } } -static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code) -{ +static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code) { int confirm = 0; if (pFwdInfo->code == 0) pFwdInfo->code = code; @@ -1200,8 +1170,7 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code } } -static void syncMonitorFwdInfos(void *param, void *tmrId) -{ +static void syncMonitorFwdInfos(void *param, void *tmrId) { SSyncNode *pNode = param; SSyncFwds *pSyncFwds = pNode->pSyncFwds; uint64_t time = taosGetTimestampMs(); @@ -1220,6 +1189,3 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl); } - - - diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index 5c9b7343655645b60ff872205c954059c7aa9b4a..2a0bee3726d939cc9da436af36ce02d45dad34d5 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -48,8 +48,7 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, uint32_t sindex, uint32_t eind } } -static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) -{ +static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { SSyncNode *pNode = pPeer->pSyncNode; SFileInfo minfo; memset(&minfo, 0, sizeof(minfo)); /* = {0}; */ // master file info SFileInfo sinfo; memset(&sinfo, 0, sizeof(sinfo)); /* = {0}; */ // slave file info @@ -130,8 +129,7 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) return code; } -static int syncRestoreWal(SSyncPeer *pPeer) -{ +static int syncRestoreWal(SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; int ret, code = -1; @@ -172,8 +170,7 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) return offset; } -static int syncProcessBufferedFwd(SSyncPeer *pPeer) -{ +static int syncProcessBufferedFwd(SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; SRecvBuffer *pRecv = pNode->pRecv; int forwards = 0; @@ -201,8 +198,7 @@ static int syncProcessBufferedFwd(SSyncPeer *pPeer) return pRecv->code; } -int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) -{ +int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) { SSyncNode *pNode = pPeer->pSyncNode; SRecvBuffer *pRecv = pNode->pRecv; @@ -222,8 +218,7 @@ int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) return pRecv->code; } -static void syncCloseRecvBuffer(SSyncNode *pNode) -{ +static void syncCloseRecvBuffer(SSyncNode *pNode) { if (pNode->pRecv) { taosTFree(pNode->pRecv->buffer); } @@ -231,8 +226,7 @@ static void syncCloseRecvBuffer(SSyncNode *pNode) taosTFree(pNode->pRecv); } -static int syncOpenRecvBuffer(SSyncNode *pNode) -{ +static int syncOpenRecvBuffer(SSyncNode *pNode) { syncCloseRecvBuffer(pNode); SRecvBuffer *pRecv = calloc(sizeof(SRecvBuffer), 1); @@ -253,8 +247,7 @@ static int syncOpenRecvBuffer(SSyncNode *pNode) return 0; } -static int syncRestoreDataStepByStep(SSyncPeer *pPeer) -{ +static int syncRestoreDataStepByStep(SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; nodeSStatus = TAOS_SYNC_STATUS_FILE; uint64_t fversion = 0; @@ -292,10 +285,9 @@ static int syncRestoreDataStepByStep(SSyncPeer *pPeer) return 0; } -void *syncRestoreData(void *param) -{ - SSyncPeer *pPeer = (SSyncPeer *)param; - SSyncNode *pNode = pPeer->pSyncNode; +void *syncRestoreData(void *param) { + SSyncPeer *pPeer = (SSyncPeer *)param; + SSyncNode *pNode = pPeer->pSyncNode; taosBlockSIGPIPE(); __sync_fetch_and_add(&tsSyncNum, 1); @@ -326,4 +318,3 @@ void *syncRestoreData(void *param) return NULL; } - diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 0cc3e7326a57accb8ce589a9dd95e6478648a2eb..8aa317b1ac7c770b6b102545716656541b8bb67e 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -27,11 +27,10 @@ #include "tsync.h" #include "syncInt.h" -static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) -{ +static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) { sDebug("%s, start to monitor:%s", pPeer->id, name); - if (pPeer->notifyFd <=0) { + if (pPeer->notifyFd <= 0) { pPeer->watchNum = 0; pPeer->notifyFd = inotify_init1(IN_NONBLOCK); if (pPeer->notifyFd < 0) { @@ -70,9 +69,8 @@ static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) return 0; } -static int syncAreFilesModified(SSyncPeer *pPeer) -{ - if (pPeer->notifyFd <=0) return 0; +static int syncAreFilesModified(SSyncPeer *pPeer) { + if (pPeer->notifyFd <= 0) return 0; char buf[2048]; int len = read(pPeer->notifyFd, buf, sizeof(buf)); @@ -96,12 +94,11 @@ static int syncAreFilesModified(SSyncPeer *pPeer) } } - return code; + return code; } -static int syncRetrieveFile(SSyncPeer *pPeer) -{ - SSyncNode *pNode = pPeer->pSyncNode; +static int syncRetrieveFile(SSyncPeer *pPeer) { + SSyncNode * pNode = pPeer->pSyncNode; SFileInfo fileInfo; SFileAck fileAck; int code = -1; @@ -128,7 +125,7 @@ static int syncRetrieveFile(SSyncPeer *pPeer) // wait for the ack from peer ret = taosReadMsg(pPeer->syncFd, &(fileAck), sizeof(fileAck)); - if (ret <0) break; + if (ret < 0) break; // set the peer sync version pPeer->sversion = fileInfo.fversion; @@ -148,11 +145,11 @@ static int syncRetrieveFile(SSyncPeer *pPeer) // send the file to peer int sfd = open(name, O_RDONLY); - if ( sfd < 0 ) break; + if (sfd < 0) break; ret = taosTSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size); - close(sfd); - if (ret <0) break; + close(sfd); + if (ret < 0) break; sDebug("%s, %s is sent, size:%" PRId64, pPeer->id, name, fileInfo.size); fileInfo.index++; @@ -170,8 +167,7 @@ static int syncRetrieveFile(SSyncPeer *pPeer) /* if only a partial record is read out, set the IN_MODIFY flag in event, so upper layer will reload the file to get a complete record */ -static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent) -{ +static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent) { int ret; ret = read(sfd, pHead, sizeof(SWalHead)); @@ -185,7 +181,7 @@ static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent) } ret = read(sfd, pHead->cont, pHead->len); - if (ret <0) return -1; + if (ret < 0) return -1; if (ret != pHead->len) { // file is not at end yet, it shall be reloaded @@ -194,10 +190,9 @@ static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent) } return sizeof(SWalHead) + pHead->len; -} +} -static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) -{ +static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) { pPeer->watchNum = 0; taosClose(pPeer->notifyFd); pPeer->notifyFd = inotify_init1(IN_NONBLOCK); @@ -221,18 +216,17 @@ static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) return -1; } - return 0; + return 0; } -static uint32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) -{ +static uint32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) { char buf[2048]; int len = read(pPeer->notifyFd, buf, sizeof(buf)); - if (len <0 && errno != EAGAIN) { - sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno)); + if (len < 0 && errno != EAGAIN) { + sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno)); return -1; } - + if (len == 0) return 0; struct inotify_event *event; @@ -248,8 +242,7 @@ static uint32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) return 0; } -static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset, uint32_t *pEvent) -{ +static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset, uint32_t *pEvent) { SWalHead *pHead = (SWalHead *) malloc(640000); int code = -1; int32_t bytes = 0; @@ -261,9 +254,12 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, sDebug("%s, retrieve last wal, offset:%" PRId64 " fversion:%" PRIu64, pPeer->id, offset, fversion); while (1) { - int wsize = syncReadOneWalRecord(sfd, pHead, pEvent); - if (wsize <0) break; - if (wsize == 0) { code = 0; break; } + int wsize = syncReadOneWalRecord(sfd, pHead, pEvent); + if (wsize < 0) break; + if (wsize == 0) { + code = 0; + break; + } sDebug("%s, last wal is forwarded, ver:%" PRIu64, pPeer->id, pHead->version); int ret = taosWriteMsg(pPeer->syncFd, pHead, wsize); @@ -286,8 +282,7 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, return -1; } -static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) -{ +static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) { SSyncNode *pNode = pPeer->pSyncNode; int code = -1; char fname[TSDB_FILENAME_LEN * 2]; // full path to wal file @@ -350,12 +345,16 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) } if (code < 0) break; - if (pPeer->sversion >= fversion && fversion > 0) break; + if (pPeer->sversion >= fversion && fversion > 0) break; - index++; wname[0] = 0; + index++; + wname[0] = 0; code = (*pNode->getWalInfo)(pNode->ahandle, wname, &index); - if ( code < 0) break; - if ( wname[0] == 0 ) {code = 0; break;} + if (code < 0) break; + if (wname[0] == 0) { + code = 0; + break; + } // current last wal is closed, there is a new one sDebug("%s, last wal is closed, try new one", pPeer->id); @@ -366,9 +365,8 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) return code; } -static int syncRetrieveWal(SSyncPeer *pPeer) -{ - SSyncNode *pNode = pPeer->pSyncNode; +static int syncRetrieveWal(SSyncPeer *pPeer) { + SSyncNode * pNode = pPeer->pSyncNode; char fname[TSDB_FILENAME_LEN * 3]; char wname[TSDB_FILENAME_LEN * 2]; int32_t size; @@ -396,7 +394,7 @@ static int syncRetrieveWal(SSyncPeer *pPeer) // send wal file, // inotify is not required, old wal file won't be modified, even remove is ok - if ( stat(fname, &fstat) < 0 ) break; + if (stat(fname, &fstat) < 0) break; size = fstat.st_size; sDebug("%s, retrieve wal:%s size:%d", pPeer->id, fname, size); @@ -425,9 +423,8 @@ static int syncRetrieveWal(SSyncPeer *pPeer) return code; } -static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) -{ - SSyncNode *pNode = pPeer->pSyncNode; +static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) { + SSyncNode *pNode = pPeer->pSyncNode; SFirstPkt firstPkt; memset(&firstPkt, 0, sizeof(firstPkt)); @@ -462,9 +459,8 @@ static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) return 0; } -void *syncRetrieveData(void *param) -{ - SSyncPeer *pPeer = (SSyncPeer *)param; +void *syncRetrieveData(void *param) { + SSyncPeer * pPeer = (SSyncPeer *)param; SSyncNode *pNode = pPeer->pSyncNode; taosBlockSIGPIPE(); diff --git a/src/sync/src/taosTcpPool.c b/src/sync/src/taosTcpPool.c index b523728bf9357e17602c75e3822284c2871e8511..2f064ceb3690e4e8111d67fd2fce224add74717c 100644 --- a/src/sync/src/taosTcpPool.c +++ b/src/sync/src/taosTcpPool.c @@ -48,8 +48,7 @@ static void *taosProcessTcpData(void *param); static SThreadObj *taosGetTcpThread(SPoolObj *pPool); static void taosStopPoolThread(SThreadObj* pThread); -void *taosOpenTcpThreadPool(SPoolInfo *pInfo) -{ +void *taosOpenTcpThreadPool(SPoolInfo *pInfo) { pthread_attr_t thattr; SPoolObj *pPool = calloc(sizeof(SPoolObj), 1); @@ -89,8 +88,7 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) return pPool; } -void taosCloseTcpThreadPool(void *param) -{ +void taosCloseTcpThreadPool(void *param) { SPoolObj *pPool = (SPoolObj *)param; SThreadObj *pThread; @@ -107,8 +105,7 @@ void taosCloseTcpThreadPool(void *param) uDebug("%p TCP pool is closed", pPool); } -void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) -{ +void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) { struct epoll_event event; SPoolObj *pPool = (SPoolObj *)param; @@ -145,9 +142,8 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) return pConn; } -void taosFreeTcpConn(void *param) -{ - SConnObj *pConn = (SConnObj *)param; +void taosFreeTcpConn(void *param) { + SConnObj * pConn = (SConnObj *)param; SThreadObj *pThread = pConn->pThread; uDebug("%p TCP connection will be closed, fd:%d", pThread, pConn->fd); diff --git a/src/sync/src/tarbitrator.c b/src/sync/src/tarbitrator.c index 625c0d68386535297ffb1e92e112e1a18d58a48b..d281642a47a21d54f507414b234c6c2f5fbc234d 100644 --- a/src/sync/src/tarbitrator.c +++ b/src/sync/src/tarbitrator.c @@ -156,9 +156,8 @@ static void arbProcessBrokenLink(void *param) { taosTFree(pNode); } -static int arbProcessPeerMsg(void *param, void *buffer) -{ - SNodeConn *pNode = param; +static int arbProcessPeerMsg(void *param, void *buffer) { + SNodeConn * pNode = param; SSyncHead head; int bytes = 0; char *cont = (char *)buffer; @@ -180,7 +179,6 @@ static int arbProcessPeerMsg(void *param, void *buffer) } static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context) { - struct sigaction act = {{0}}; act.sa_handler = SIG_IGN; sigaction(SIGTERM, &act, NULL); @@ -192,4 +190,3 @@ static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context) // inform main thread to exit tsem_post(&tsArbSem); } - diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 6a94c9c6d6681d967c856019db46ef9ab43e8083..176a45116837eeb9c62be490de1d871659f741ce 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -290,6 +290,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->sync = syncStart(&syncInfo); if (pVnode->sync == NULL) { + vError("vgId:%d, failed to open sync module, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica, + tstrerror(terrno)); vnodeCleanUp(pVnode); return terrno; }