diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index b9049996788884002d30c9f7edaf039e513576e0..861c3366f662dc5b915867ff7087d46c2233121c 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -30,21 +30,19 @@ #include "syncInt.h" // global configurable -int tsMaxSyncNum = 2; -int tsSyncTcpThreads = 2; -int tsMaxWatchFiles = 500; -int tsMaxFwdInfo = 200; -int tsSyncTimer = 1; -//int sDebugFlag = 135; -//char tsArbitrator[TSDB_FQDN_LEN] = {0}; +int tsMaxSyncNum = 2; +int tsSyncTcpThreads = 2; +int tsMaxWatchFiles = 500; +int tsMaxFwdInfo = 200; +int tsSyncTimer = 1; // module global, not configurable -int tsSyncNum; // number of sync in process in whole system -char tsNodeFqdn[TSDB_FQDN_LEN]; +int tsSyncNum; // number of sync in process in whole system +char tsNodeFqdn[TSDB_FQDN_LEN]; -static ttpool_h tsTcpPool; -static void *syncTmrCtrl = NULL; -static void *vgIdHash; +static ttpool_h tsTcpPool; +static void * syncTmrCtrl = NULL; +static void * vgIdHash; // local functions static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer); @@ -127,7 +125,7 @@ void syncCleanUp() { if (vgIdHash) { taosHashCleanup(vgIdHash); - vgIdHash = NULL; + vgIdHash = NULL; } sInfo("sync module is cleaned up"); @@ -154,7 +152,7 @@ void *syncStart(const SSyncInfo *pInfo) { pNode->confirmForward = pInfo->confirmForward; pNode->notifyFlowCtrl = pInfo->notifyFlowCtrl; pNode->notifyFileSynced = pInfo->notifyFileSynced; - + pNode->selfIndex = -1; pNode->vgId = pInfo->vgId; pNode->replica = pCfg->replica; @@ -164,8 +162,9 @@ void *syncStart(const SSyncInfo *pInfo) { for (int i = 0; i < pCfg->replica; ++i) { const SNodeInfo *pNodeInfo = pCfg->nodeInfo + i; pNode->peerInfo[i] = syncAddPeer(pNode, pNodeInfo); - if ((strcmp(pNodeInfo->nodeFqdn, tsNodeFqdn) == 0) && (pNodeInfo->nodePort == tsSyncPort)) + if ((strcmp(pNodeInfo->nodeFqdn, tsNodeFqdn) == 0) && (pNodeInfo->nodePort == tsSyncPort)) { pNode->selfIndex = i; + } } if (pNode->selfIndex < 0) { @@ -197,16 +196,17 @@ void *syncStart(const SSyncInfo *pInfo) { syncAddArbitrator(pNode); syncAddNodeRef(pNode); taosHashPut(vgIdHash, (const char *)&pNode->vgId, sizeof(int32_t), (char *)(&pNode), sizeof(SSyncNode *)); - - if (pNode->notifyRole) - (*pNode->notifyRole)(pNode->ahandle, nodeRole); + + if (pNode->notifyRole) { + (*pNode->notifyRole)(pNode->ahandle, nodeRole); + } return pNode; } void syncStop(void *param) { - SSyncNode * pNode = param; - SSyncPeer *pPeer; + SSyncNode *pNode = param; + SSyncPeer *pPeer; if (pNode == NULL) return; sInfo("vgId:%d, cleanup sync", pNode->vgId); @@ -215,7 +215,7 @@ void syncStop(void *param) { for (int i = 0; i < pNode->replica; ++i) { pPeer = pNode->peerInfo[i]; - if (pPeer) syncRemovePeer(pPeer); + if (pPeer) syncRemovePeer(pPeer); } pPeer = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]; @@ -230,19 +230,19 @@ void syncStop(void *param) { } int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { - SSyncNode * pNode = param; - int i, j; + SSyncNode *pNode = param; + int i, j; if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG; - sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], - pNewCfg->replica, pNode->replica); + sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica, + pNode->replica); pthread_mutex_lock(&(pNode->mutex)); for (i = 0; i < pNode->replica; ++i) { for (j = 0; j < pNewCfg->replica; ++j) { - if ((strcmp(pNode->peerInfo[i]->fqdn, pNewCfg->nodeInfo[j].nodeFqdn) == 0) && - (pNode->peerInfo[i]->port == pNewCfg->nodeInfo[j].nodePort)) + if ((strcmp(pNode->peerInfo[i]->fqdn, pNewCfg->nodeInfo[j].nodeFqdn) == 0) && + (pNode->peerInfo[i]->port == pNewCfg->nodeInfo[j].nodePort)) break; } @@ -257,8 +257,8 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { const SNodeInfo *pNewNode = &pNewCfg->nodeInfo[i]; for (j = 0; j < pNode->replica; ++j) { - if (pNode->peerInfo[j] && (strcmp(pNode->peerInfo[j]->fqdn, pNewNode->nodeFqdn) == 0) && - (pNode->peerInfo[j]->port == pNewNode->nodePort)) + if (pNode->peerInfo[j] && (strcmp(pNode->peerInfo[j]->fqdn, pNewNode->nodeFqdn) == 0) && + (pNode->peerInfo[j]->port == pNewNode->nodePort)) break; } @@ -268,8 +268,9 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { newPeers[i] = pNode->peerInfo[j]; } - if ((strcmp(pNewNode->nodeFqdn, tsNodeFqdn) == 0) && (pNewNode->nodePort == tsSyncPort)) + if ((strcmp(pNewNode->nodeFqdn, tsNodeFqdn) == 0) && (pNewNode->nodePort == tsSyncPort)) { pNode->selfIndex = i; + } } pNode->replica = pNewCfg->replica; @@ -277,8 +278,9 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { if (pNode->quorum > pNode->replica) pNode->quorum = pNode->replica; memcpy(pNode->peerInfo, newPeers, sizeof(SSyncPeer *) * pNewCfg->replica); - for (i = pNewCfg->replica; i < TAOS_SYNC_MAX_REPLICA; ++i) + for (i = pNewCfg->replica; i < TAOS_SYNC_MAX_REPLICA; ++i) { pNode->peerInfo[i] = NULL; + } syncAddArbitrator(pNode); @@ -290,43 +292,44 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { pthread_mutex_unlock(&(pNode->mutex)); - sInfo("vgId:%d, %d replicas are configured, quorum:%d role:%s", pNode->vgId, pNode->replica, pNode->quorum, syncRole[nodeRole]); + sInfo("vgId:%d, %d replicas are configured, quorum:%d role:%s", pNode->vgId, pNode->replica, pNode->quorum, + syncRole[nodeRole]); syncBroadcastStatus(pNode); return 0; } int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { - SSyncNode * pNode = param; - SSyncPeer * pPeer; - SSyncHead *pSyncHead; - SWalHead *pWalHead = data; - int fwdLen; - int code = 0; + SSyncNode *pNode = param; + SSyncPeer *pPeer; + SSyncHead *pSyncHead; + SWalHead * pWalHead = data; + int fwdLen; + int code = 0; if (pNode == NULL) return 0; // always update version nodeVersion = pWalHead->version; - if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER ) return 0; + if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0; // only pkt from RPC or CQ can be forwarded if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0; // a hacker way to improve the performance - pSyncHead = (SSyncHead *) ( ((char *)pWalHead) - sizeof(SSyncHead)); + pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead)); pSyncHead->type = TAOS_SMSG_FORWARD; pSyncHead->pversion = 0; - pSyncHead->len = sizeof(SWalHead) + pWalHead->len; - fwdLen = pSyncHead->len + sizeof(SSyncHead); //include the WAL and SYNC head + pSyncHead->len = sizeof(SWalHead) + pWalHead->len; + fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head pthread_mutex_lock(&(pNode->mutex)); for (int i = 0; i < pNode->replica; ++i) { pPeer = pNode->peerInfo[i]; - if (pPeer == NULL || pPeer->peerFd <0) continue; - if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue; - + if (pPeer == NULL || pPeer->peerFd < 0) continue; + if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue; + if (pNode->quorum > 1 && code == 0) { syncSaveFwdInfo(pNode, pWalHead->version, mhandle); code = 1; @@ -351,12 +354,12 @@ void syncConfirmForward(void *param, uint64_t version, int32_t code) { if (pNode == NULL) return; if (pNode->quorum <= 1) return; - SSyncPeer *pPeer = pNode->pMaster; + SSyncPeer *pPeer = pNode->pMaster; if (pPeer == NULL) return; char msg[sizeof(SSyncHead) + sizeof(SFwdRsp)] = {0}; - SSyncHead *pHead = (SSyncHead *) msg; + SSyncHead *pHead = (SSyncHead *)msg; pHead->type = TAOS_SMSG_FORWARD_RSP; pHead->len = sizeof(SFwdRsp); @@ -379,7 +382,7 @@ void syncRecover(void *param) { SSyncNode *pNode = param; SSyncPeer *pPeer; - // to do: add a few lines to check if recover is OK + // to do: add a few lines to check if recover is OK // if take this node to unsync state, the whole system may not work nodeRole = TAOS_SYNC_ROLE_UNSYNCED; @@ -389,7 +392,7 @@ void syncRecover(void *param) { pthread_mutex_lock(&(pNode->mutex)); for (int i = 0; i < pNode->replica; ++i) { - pPeer = (SSyncPeer *) pNode->peerInfo[i]; + pPeer = (SSyncPeer *)pNode->peerInfo[i]; if (pPeer->peerFd >= 0) { syncRestartConnection(pPeer); } @@ -402,7 +405,7 @@ int syncGetNodesRole(void *param, SNodesRole *pNodesRole) { SSyncNode *pNode = param; pNodesRole->selfIndex = pNode->selfIndex; - for (int i=0; ireplica; ++i) { + for (int i = 0; i < pNode->replica; ++i) { pNodesRole->nodeId[i] = pNode->peerInfo[i]->nodeId; pNodesRole->role[i] = pNode->peerInfo[i]->role; } @@ -426,7 +429,7 @@ static void syncAddArbitrator(SSyncNode *pNode) { if (-1 == ret) { nodeInfo.nodePort = tsArbitratorPort; } - + if (pPeer) { if ((strcmp(nodeInfo.nodeFqdn, pPeer->fqdn) == 0) && (nodeInfo.nodePort == pPeer->port)) { return; @@ -434,18 +437,16 @@ static void syncAddArbitrator(SSyncNode *pNode) { syncRemovePeer(pPeer); pNode->peerInfo[TAOS_SYNC_MAX_REPLICA] = NULL; } - } + } pNode->peerInfo[TAOS_SYNC_MAX_REPLICA] = syncAddPeer(pNode, &nodeInfo); } -static void syncAddNodeRef(SSyncNode *pNode) -{ - atomic_add_fetch_8(&pNode->refCount, 1); +static void syncAddNodeRef(SSyncNode *pNode) { + atomic_add_fetch_8(&pNode->refCount, 1); } -static void syncDecNodeRef(SSyncNode *pNode) -{ +static void syncDecNodeRef(SSyncNode *pNode) { if (atomic_sub_fetch_8(&pNode->refCount, 1) == 0) { pthread_mutex_destroy(&pNode->mutex); taosTFree(pNode->pRecv); @@ -455,7 +456,7 @@ static void syncDecNodeRef(SSyncNode *pNode) } void syncAddPeerRef(SSyncPeer *pPeer) { - atomic_add_fetch_8(&pPeer->refCount, 1); + atomic_add_fetch_8(&pPeer->refCount, 1); } int syncDecPeerRef(SSyncPeer *pPeer) { @@ -491,8 +492,8 @@ static void syncRemovePeer(SSyncPeer *pPeer) { static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { uint32_t ip = taosGetIpFromFqdn(pInfo->nodeFqdn); if (ip == -1) return NULL; - - SSyncPeer *pPeer = (SSyncPeer *) calloc(1, sizeof(SSyncPeer)); + + SSyncPeer *pPeer = (SSyncPeer *)calloc(1, sizeof(SSyncPeer)); if (pPeer == NULL) return NULL; pPeer->nodeId = pInfo->nodeId; @@ -511,9 +512,9 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { int ret = strcmp(pPeer->fqdn, tsNodeFqdn); if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) { sDebug("%s, start to check peer connection", pPeer->id); - taosTmrReset(syncCheckPeerConnection, 100 + (pNode->vgId*10)%100, pPeer, syncTmrCtrl, &pPeer->timer); + taosTmrReset(syncCheckPeerConnection, 100 + (pNode->vgId * 10) % 100, pPeer, syncTmrCtrl, &pPeer->timer); } - + syncAddNodeRef(pNode); return pPeer; } @@ -547,16 +548,18 @@ static void syncChooseMaster(SSyncNode *pNode) { sDebug("vgId:%d, choose master", pNode->vgId); for (int i = 0; i < pNode->replica; ++i) { - if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE) + if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE) { onlineNum++; + } } if (onlineNum == pNode->replica) { // if all peers are online, peer with highest version shall be master index = 0; for (int i = 1; i < pNode->replica; ++i) { - if (pNode->peerInfo[i]->version > pNode->peerInfo[index]->version) + if (pNode->peerInfo[i]->version > pNode->peerInfo[index]->version) { index = i; + } } } @@ -573,8 +576,9 @@ static void syncChooseMaster(SSyncNode *pNode) { //slave with highest version shall be master pPeer = pNode->peerInfo[i]; if (pPeer->role == TAOS_SYNC_ROLE_SLAVE || pPeer->role == TAOS_SYNC_ROLE_MASTER) { - if (index < 0 || pPeer->version > pNode->peerInfo[index]->version) + if (index < 0 || pPeer->version > pNode->peerInfo[index]->version) { index = i; + } } } } @@ -600,8 +604,9 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) { int replica = pNode->replica; for (int i = 0; i < pNode->replica; ++i) { - if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE) + if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE) { onlineNum++; + } } // add arbitrator connection @@ -649,7 +654,7 @@ static int syncValidateMaster(SSyncPeer *pPeer) { code = -1; for (int i = 0; i < pNode->replica; ++i) { - if ( i == pNode->selfIndex ) continue; + if (i == pNode->selfIndex) continue; syncRestartPeer(pNode->peerInfo[i]); } } @@ -666,12 +671,11 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne pNode->peerInfo[pNode->selfIndex]->version = nodeVersion; pPeer->role = newRole; - sDebug("%s, own role:%s, new peer role:%s", pPeer->id, - syncRole[nodeRole], syncRole[pPeer->role]); + sDebug("%s, own role:%s, new peer role:%s", pPeer->id, syncRole[nodeRole], syncRole[pPeer->role]); SSyncPeer *pMaster = syncCheckMaster(pNode); - if ( pMaster ) { + if (pMaster) { // master is there pNode->pMaster = pMaster; sDebug("%s, it is the master, ver:%" PRIu64, pMaster->id, pMaster->version); @@ -696,27 +700,30 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne for (i = 0; i < pNode->replica; ++i) { SSyncPeer *pTemp = pNode->peerInfo[i]; if (pTemp->role != peersStatus[i].role) break; - if ((pTemp->role != TAOS_SYNC_ROLE_OFFLINE) && (pTemp->version != peersStatus[i].version)) break; + if ((pTemp->role != TAOS_SYNC_ROLE_OFFLINE) && (pTemp->version != peersStatus[i].version)) break; } - + if (i >= pNode->replica) consistent = 1; } else { if (pNode->replica == 2) consistent = 1; } - if (consistent) + if (consistent) { syncChooseMaster(pNode); + } } if (syncRequired) { syncRecoverFromMaster(pMaster); } - if (peerOldRole != newRole || nodeRole != selfOldRole) + if (peerOldRole != newRole || nodeRole != selfOldRole) { syncBroadcastStatus(pNode); + } - if (nodeRole != TAOS_SYNC_ROLE_MASTER) + if (nodeRole != TAOS_SYNC_ROLE_MASTER) { syncResetFlowCtrl(pNode); + } } static void syncRestartPeer(SSyncPeer *pPeer) { @@ -727,8 +734,9 @@ static void syncRestartPeer(SSyncPeer *pPeer) { pPeer->sstatus = TAOS_SYNC_STATUS_INIT; int ret = strcmp(pPeer->fqdn, tsNodeFqdn); - if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort)) + if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort)) { taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer); + } } void syncRestartConnection(SSyncPeer *pPeer) { @@ -752,13 +760,13 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) { if (pPeer->sstatus != TAOS_SYNC_STATUS_INIT) { sDebug("%s, sync is already started", pPeer->id); - return; // already started + return; // already started } // start a new thread to retrieve the data syncAddPeerRef(pPeer); - pthread_attr_t thattr; - pthread_t thread; + pthread_attr_t thattr; + pthread_t thread; pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED); int ret = pthread_create(&thread, &thattr, syncRetrieveData, pPeer); @@ -785,8 +793,8 @@ static void syncNotStarted(void *param, void *tmrId) { } static void syncTryRecoverFromMaster(void *param, void *tmrId) { - SSyncPeer *pPeer = param; - SSyncNode *pNode = pPeer->pSyncNode; + SSyncPeer *pPeer = param; + SSyncNode *pNode = pPeer->pSyncNode; pthread_mutex_lock(&(pNode->mutex)); syncRecoverFromMaster(pPeer); @@ -810,7 +818,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) { return; } - sDebug("%s, try to sync", pPeer->id) + sDebug("%s, try to sync", pPeer->id); SFirstPkt firstPkt; memset(&firstPkt, 0, sizeof(firstPkt)); @@ -819,49 +827,47 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) { firstPkt.syncHead.len = sizeof(firstPkt) - sizeof(SSyncHead); tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); firstPkt.port = tsSyncPort; - taosTmrReset(syncNotStarted, tsSyncTimer*1000, pPeer, syncTmrCtrl, &pPeer->timer); + taosTmrReset(syncNotStarted, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer); - if (write(pPeer->peerFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt) ) { + if (write(pPeer->peerFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) { sError("%s, failed to send sync-req to peer", pPeer->id); } else { nodeSStatus = TAOS_SYNC_STATUS_START; sInfo("%s, sync-req is sent", pPeer->id); } - - return; } static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) { - SSyncNode * pNode = pPeer->pSyncNode; - SFwdRsp *pFwdRsp = (SFwdRsp *) cont; - SSyncFwds *pSyncFwds = pNode->pSyncFwds; - SFwdInfo *pFwdInfo; + SSyncNode *pNode = pPeer->pSyncNode; + SFwdRsp * pFwdRsp = (SFwdRsp *)cont; + SSyncFwds *pSyncFwds = pNode->pSyncFwds; + SFwdInfo * pFwdInfo; sDebug("%s, forward-rsp is received, ver:%" PRIu64, pPeer->id, pFwdRsp->version); SFwdInfo *pFirst = pSyncFwds->fwdInfo + pSyncFwds->first; if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) { // find the forwardInfo from first - for (int i=0; ifwds; ++i) { - pFwdInfo = pSyncFwds->fwdInfo + (i+pSyncFwds->first)%tsMaxFwdInfo; + for (int i = 0; i < pSyncFwds->fwds; ++i) { + pFwdInfo = pSyncFwds->fwdInfo + (i + pSyncFwds->first) % tsMaxFwdInfo; if (pFwdRsp->version == pFwdInfo->version) break; } - + syncProcessFwdAck(pNode, pFwdInfo, pFwdRsp->code); syncRemoveConfirmedFwdInfo(pNode); } } static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { - SSyncNode * pNode = pPeer->pSyncNode; - SWalHead *pHead = (SWalHead *)cont; + SSyncNode *pNode = pPeer->pSyncNode; + SWalHead * pHead = (SWalHead *)cont; sDebug("%s, forward is received, ver:%" PRIu64, pPeer->id, pHead->version); if (nodeRole == TAOS_SYNC_ROLE_SLAVE) { - //nodeVersion = pHead->version; + // nodeVersion = pHead->version; (*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD); - } else { + } else { if (nodeSStatus != TAOS_SYNC_STATUS_INIT) { syncSaveIntoBuffer(pPeer, pHead); } else { @@ -882,12 +888,13 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) { pPeer->version = pPeersStatus->version; syncCheckRole(pPeer, pPeersStatus->peersStatus, pPeersStatus->role); - if (pPeersStatus->ack) + if (pPeersStatus->ack) { syncSendPeersStatusMsgToPeer(pPeer, 0); + } } static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { - if (pPeer->peerFd <0) return -1; + if (pPeer->peerFd < 0) return -1; int hlen = taosReadMsg(pPeer->peerFd, pHead, sizeof(SSyncHead)); if (hlen != sizeof(SSyncHead)) { @@ -911,9 +918,9 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { } static int syncProcessPeerMsg(void *param, void *buffer) { - SSyncPeer * pPeer = param; - SSyncHead head; - char *cont = (char *)buffer; + SSyncPeer *pPeer = param; + SSyncHead head; + char * cont = (char *)buffer; SSyncNode *pNode = pPeer->pSyncNode; pthread_mutex_lock(&(pNode->mutex)); @@ -937,16 +944,16 @@ static int syncProcessPeerMsg(void *param, void *buffer) { return code; } -#define statusMsgLen sizeof(SSyncHead)+sizeof(SPeersStatus)+sizeof(SPeerStatus)*TAOS_SYNC_MAX_REPLICA +#define statusMsgLen sizeof(SSyncHead) + sizeof(SPeersStatus) + sizeof(SPeerStatus) * TAOS_SYNC_MAX_REPLICA static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) { SSyncNode *pNode = pPeer->pSyncNode; char msg[statusMsgLen] = {0}; - if (pPeer->peerFd <0 || pPeer->ip ==0) return; + if (pPeer->peerFd < 0 || pPeer->ip == 0) return; - SSyncHead *pHead = (SSyncHead *) msg; - SPeersStatus *pPeersStatus = (SPeersStatus *) (msg + sizeof(SSyncHead)); + SSyncHead * pHead = (SSyncHead *)msg; + SPeersStatus *pPeersStatus = (SPeersStatus *)(msg + sizeof(SSyncHead)); pHead->type = TAOS_SMSG_STATUS; pHead->len = statusMsgLen - sizeof(SSyncHead); @@ -984,28 +991,28 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { int connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); if (connFd < 0) { sDebug("%s, failed to open tcp socket(%s)", pPeer->id, strerror(errno)); - taosTmrReset(syncCheckPeerConnection, tsSyncTimer *1000, pPeer, syncTmrCtrl, &pPeer->timer); + taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer); return; } SFirstPkt firstPkt; memset(&firstPkt, 0, sizeof(firstPkt)); - firstPkt.syncHead.vgId = pPeer->nodeId ? pNode->vgId:0; + firstPkt.syncHead.vgId = pPeer->nodeId ? pNode->vgId : 0; firstPkt.syncHead.type = TAOS_SMSG_STATUS; - tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); + tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); firstPkt.port = tsSyncPort; firstPkt.sourceId = pNode->vgId; // tell arbitrator its vgId if (write(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) { sDebug("%s, connection to peer server is setup", pPeer->id); - pPeer->peerFd = connFd; + pPeer->peerFd = connFd; pPeer->role = TAOS_SYNC_ROLE_UNSYNCED; pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd); syncAddPeerRef(pPeer); } else { sDebug("try later"); close(connFd); - taosTmrReset(syncCheckPeerConnection, tsSyncTimer *1000, pPeer, syncTmrCtrl, &pPeer->timer); + taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer); } } @@ -1016,7 +1023,7 @@ static void syncCheckPeerConnection(void *param, void *tmrId) { pthread_mutex_lock(&(pNode->mutex)); sDebug("%s, check peer connection", pPeer->id); - syncSetupPeerConnection(pPeer); + syncSetupPeerConnection(pPeer); pthread_mutex_unlock(&(pNode->mutex)); } @@ -1025,7 +1032,7 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) { taosTmrStopA(&pPeer->timer); pthread_attr_t thattr; - pthread_t thread; + pthread_t thread; pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED); @@ -1037,15 +1044,15 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) { sError("%s, failed to create sync thread", pPeer->id); taosClose(pPeer->syncFd); syncDecPeerRef(pPeer); - } else { + } else { sInfo("%s, sync connection is up", pPeer->id); } } static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) { - char ipstr[24]; - int i; - + char ipstr[24]; + int i; + tinet_ntoa(ipstr, sourceIp); sDebug("peer TCP connection from ip:%s", ipstr); @@ -1070,8 +1077,7 @@ static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) { SSyncPeer *pPeer; for (i = 0; i < pNode->replica; ++i) { pPeer = pNode->peerInfo[i]; - if (pPeer && (strcmp(pPeer->fqdn, firstPkt.fqdn) == 0) && (pPeer->port == firstPkt.port)) - break; + if (pPeer && (strcmp(pPeer->fqdn, firstPkt.fqdn) == 0) && (pPeer->port == firstPkt.port)) break; } pPeer = (i < pNode->replica) ? pNode->peerInfo[i] : NULL; @@ -1096,8 +1102,6 @@ static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) { } pthread_mutex_unlock(&(pNode->mutex)); - - return; } static void syncProcessBrokenLink(void *param) { @@ -1126,10 +1130,12 @@ static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) { if (pSyncFwds->fwds >= tsMaxFwdInfo) { pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo; pSyncFwds->fwds--; - } + } + + if (pSyncFwds->fwds > 0) { + pSyncFwds->last = (pSyncFwds->last + 1) % tsMaxFwdInfo; + } - if (pSyncFwds->fwds > 0) - pSyncFwds->last = (pSyncFwds->last+1) % tsMaxFwdInfo; SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->last; pFwdInfo->version = version; pFwdInfo->mhandle = mhandle; @@ -1145,14 +1151,14 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) { SSyncFwds *pSyncFwds = pNode->pSyncFwds; int fwds = pSyncFwds->fwds; - for (int i=0; ifwdInfo + pSyncFwds->first; + for (int i = 0; i < fwds; ++i) { + SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->first; if (pFwdInfo->confirmed == 0) break; - pSyncFwds->first = (pSyncFwds->first+1) % tsMaxFwdInfo; + pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo; pSyncFwds->fwds--; if (pSyncFwds->fwds == 0) pSyncFwds->first = pSyncFwds->last; - //sDebug("vgId:%d, fwd info is removed, ver:%d, fwds:%d", + // sDebug("vgId:%d, fwd info is removed, ver:%d, fwds:%d", // pNode->vgId, pFwdInfo->version, pSyncFwds->fwds); memset(pFwdInfo, 0, sizeof(SFwdInfo)); } @@ -1164,12 +1170,14 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code if (code == 0) { pFwdInfo->acks++; - if (pFwdInfo->acks >= pNode->quorum-1) + if (pFwdInfo->acks >= pNode->quorum - 1) { confirm = 1; + } } else { pFwdInfo->nacks++; - if (pFwdInfo->nacks > pNode->replica-pNode->quorum) + if (pFwdInfo->nacks > pNode->replica - pNode->quorum) { confirm = 1; + } } if (confirm && pFwdInfo->confirmed == 0) { @@ -1186,15 +1194,15 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { if (pSyncFwds->fwds > 0) { pthread_mutex_lock(&(pNode->mutex)); - for (int i=0; ifwds; ++i) { - SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first+i) % tsMaxFwdInfo; + for (int i = 0; i < pSyncFwds->fwds; ++i) { + SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo; if (time - pFwdInfo->time < 2000) break; syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_RPC_NETWORK_UNAVAIL); } syncRemoveConfirmedFwdInfo(pNode); pthread_mutex_unlock(&(pNode->mutex)); - } - + } + pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl); }