diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 4776d1cda79bf26cab94c9af52dfbf2c6da762de..be73d8d383e231235a4bdb07635f077803f1a03a 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -115,7 +115,7 @@ int tsdbDropTable(TSDB_REPO_T *pRepo, STableId tableId); int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg); TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid); -uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int32_t *size); +uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size); // the TSDB repository info typedef struct STsdbRepoInfo { diff --git a/src/inc/tsync.h b/src/inc/tsync.h index 2dfac7ec3282bc2f404eeffa4ccc7e85ad6cf78a..ff9c9901bd91e05f443ada68508cdccbc13ef066 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -64,7 +64,7 @@ typedef struct { if name is provided(name[0] is not zero), get the named file at the specified index. If not there, return zero. If it is there, set the size to file size, and return file magic number. Index shall not be updated. */ -typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion); +typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion); // get the wal file from index or after // return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 3651aa8aade8d2f8b5393532d395c8f4809a715b..9efb804734558278b9d6e9efeb29c4d03b65e21d 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -224,7 +224,7 @@ void sdbUpdateMnodeRoles() { mnodeUpdateMnodeEpSet(); } -static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion) { +static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) { sdbUpdateMnodeRoles(); return 0; } diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index be1e01cb2371b82acf4cf63e896a6fed68399222..cd1252f4b414f5ad2dcd56e2a7969ab433e40452 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -74,7 +74,7 @@ typedef struct { uint32_t magic; uint32_t index; uint64_t fversion; - int32_t size; + int64_t size; } SFileInfo; typedef struct { diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index b6babd41eb8d0be6986db3647bde747260ca84af..b92ef4cf260f831c3ddb3c1ebb34b7d4735f3056 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -108,8 +108,7 @@ static void syncModuleInitFunc() { tstrncpy(tsNodeFqdn, tsLocalFqdn, sizeof(tsNodeFqdn)); } -void *syncStart(const SSyncInfo *pInfo) -{ +void *syncStart(const SSyncInfo *pInfo) { const SSyncCfg *pCfg = &pInfo->syncCfg; SSyncNode *pNode = (SSyncNode *) calloc(sizeof(SSyncNode), 1); @@ -189,9 +188,8 @@ void *syncStart(const SSyncInfo *pInfo) return pNode; } -void syncStop(void *param) -{ - SSyncNode *pNode = param; +void syncStop(void *param) { + SSyncNode * pNode = param; SSyncPeer *pPeer; if (pNode == NULL) return; @@ -215,9 +213,8 @@ void syncStop(void *param) syncDecNodeRef(pNode); } -int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) -{ - SSyncNode *pNode = param; +int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { + SSyncNode * pNode = param; int i, j; if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG; @@ -283,10 +280,9 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) return 0; } -int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) -{ - SSyncNode *pNode = param; - SSyncPeer *pPeer; +int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { + SSyncNode * pNode = param; + SSyncPeer * pPeer; SSyncHead *pSyncHead; SWalHead *pWalHead = data; int fwdLen; @@ -334,9 +330,8 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) return code; } -void syncConfirmForward(void *param, uint64_t version, int32_t code) -{ - SSyncNode *pNode = param; +void syncConfirmForward(void *param, uint64_t version, int32_t code) { + SSyncNode *pNode = param; if (pNode == NULL) return; if (pNode->quorum <= 1) return; @@ -387,10 +382,9 @@ void syncRecover(void *param) { pthread_mutex_unlock(&(pNode->mutex)); } -int syncGetNodesRole(void *param, SNodesRole *pNodesRole) -{ +int syncGetNodesRole(void *param, SNodesRole *pNodesRole) { SSyncNode *pNode = param; - + pNodesRole->selfIndex = pNode->selfIndex; for (int i=0; ireplica; ++i) { pNodesRole->nodeId[i] = pNode->peerInfo[i]->nodeId; @@ -400,8 +394,7 @@ int syncGetNodesRole(void *param, SNodesRole *pNodesRole) return 0; } -static void syncAddArbitrator(SSyncNode *pNode) -{ +static void syncAddArbitrator(SSyncNode *pNode) { SSyncPeer *pPeer = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]; // if not configured, return right away @@ -456,13 +449,11 @@ static void syncDecNodeRef(SSyncNode *pNode) } } -void syncAddPeerRef(SSyncPeer *pPeer) -{ +void syncAddPeerRef(SSyncPeer *pPeer) { atomic_add_fetch_8(&pPeer->refCount, 1); } -int syncDecPeerRef(SSyncPeer *pPeer) -{ +int syncDecPeerRef(SSyncPeer *pPeer) { if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) { syncDecNodeRef(pPeer->pSyncNode); @@ -475,18 +466,16 @@ int syncDecPeerRef(SSyncPeer *pPeer) return 1; } -static void syncClosePeerConn(SSyncPeer *pPeer) -{ +static void syncClosePeerConn(SSyncPeer *pPeer) { taosTmrStopA(&pPeer->timer); taosClose(pPeer->syncFd); - if (pPeer->peerFd >=0) { + if (pPeer->peerFd >= 0) { pPeer->peerFd = -1; taosFreeTcpConn(pPeer->pConn); } } -static void syncRemovePeer(SSyncPeer *pPeer) -{ +static void syncRemovePeer(SSyncPeer *pPeer) { sInfo("%s, it is removed", pPeer->id); pPeer->ip = 0; @@ -494,8 +483,7 @@ static void syncRemovePeer(SSyncPeer *pPeer) syncDecPeerRef(pPeer); } -static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) -{ +static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { uint32_t ip = taosGetIpFromFqdn(pInfo->nodeFqdn); if (ip == -1) return NULL; @@ -525,25 +513,24 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) return pPeer; } -void syncBroadcastStatus(SSyncNode *pNode) -{ +void syncBroadcastStatus(SSyncNode *pNode) { SSyncPeer *pPeer; for (int i = 0; i < pNode->replica; ++i) { - if ( i == pNode->selfIndex ) continue; + if (i == pNode->selfIndex) continue; pPeer = pNode->peerInfo[i]; syncSendPeersStatusMsgToPeer(pPeer, 1); } -} +} static void syncResetFlowCtrl(SSyncNode *pNode) { - for (int i = 0; i < pNode->replica; ++i) { pNode->peerInfo[i]->numOfRetrieves = 0; } - if (pNode->notifyFlowCtrl) - (*pNode->notifyFlowCtrl)(pNode->ahandle, 0); + if (pNode->notifyFlowCtrl) { + (*pNode->notifyFlowCtrl)(pNode->ahandle, 0); + } } static void syncChooseMaster(SSyncNode *pNode) { @@ -600,9 +587,9 @@ static void syncChooseMaster(SSyncNode *pNode) { } else { sDebug("vgId:%d, failed to choose master", pNode->vgId); } -} - -static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) { +} + +static SSyncPeer *syncCheckMaster(SSyncNode *pNode) { int onlineNum = 0; int index = -1; int replica = pNode->replica; @@ -619,7 +606,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) { replica = pNode->replica + 1; } - if (onlineNum <= replica*0.5) { + if (onlineNum <= replica * 0.5) { if (nodeRole != TAOS_SYNC_ROLE_UNSYNCED) { nodeRole = TAOS_SYNC_ROLE_UNSYNCED; pNode->peerInfo[pNode->selfIndex]->role = nodeRole; @@ -627,13 +614,13 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) { sInfo("vgId:%d, change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica); } } else { - for (int i=0; ireplica; ++i) { + for (int i = 0; i < pNode->replica; ++i) { SSyncPeer *pTemp = pNode->peerInfo[i]; - if ( pTemp->role != TAOS_SYNC_ROLE_MASTER ) continue; - if ( index < 0 ) { + if (pTemp->role != TAOS_SYNC_ROLE_MASTER) continue; + if (index < 0) { index = i; - } else { // multiple masters, it shall not happen - if ( i == pNode->selfIndex ) { + } else { // multiple masters, it shall not happen + if (i == pNode->selfIndex) { sError("%s, peer is master, work as slave instead", pTemp->id); nodeRole = TAOS_SYNC_ROLE_SLAVE; (*pNode->notifyRole)(pNode->ahandle, nodeRole); @@ -642,7 +629,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) { } } - SSyncPeer *pMaster = (index>=0) ? pNode->peerInfo[index]:NULL; + SSyncPeer *pMaster = (index >= 0) ? pNode->peerInfo[index] : NULL; return pMaster; } @@ -651,7 +638,7 @@ static int syncValidateMaster(SSyncPeer *pPeer) { int code = 0; if (nodeRole == TAOS_SYNC_ROLE_MASTER && nodeVersion < pPeer->version) { - sDebug("%s, slave has higher version, restart all connections!!!", pPeer->id); + sDebug("%s, slave has higher version, restart all connections!!!", pPeer->id); nodeRole = TAOS_SYNC_ROLE_UNSYNCED; (*pNode->notifyRole)(pNode->ahandle, nodeRole); code = -1; @@ -660,13 +647,12 @@ static int syncValidateMaster(SSyncPeer *pPeer) { if ( i == pNode->selfIndex ) continue; syncRestartPeer(pNode->peerInfo[i]); } - } + } return code; } -static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t newRole) -{ +static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t newRole) { SSyncNode *pNode = pPeer->pSyncNode; int8_t peerOldRole = pPeer->role; int8_t selfOldRole = nodeRole; @@ -688,14 +674,14 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne if (syncValidateMaster(pPeer) < 0) return; if (nodeRole == TAOS_SYNC_ROLE_UNSYNCED) { - if ( nodeVersion < pMaster->version) { + if (nodeVersion < pMaster->version) { syncRequired = 1; } else { sInfo("%s is master, work as slave, ver:%" PRIu64, pMaster->id, pMaster->version); nodeRole = TAOS_SYNC_ROLE_SLAVE; (*pNode->notifyRole)(pNode->ahandle, nodeRole); } - } else if ( nodeRole == TAOS_SYNC_ROLE_SLAVE && pMaster == pPeer) { + } else if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pMaster == pPeer) { // nodeVersion = pMaster->version; } } else { @@ -736,20 +722,18 @@ static void syncRestartPeer(SSyncPeer *pPeer) { pPeer->sstatus = TAOS_SYNC_STATUS_INIT; int ret = strcmp(pPeer->fqdn, tsNodeFqdn); - if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort) ) - taosTmrReset(syncCheckPeerConnection, tsSyncTimer*1000, pPeer, syncTmrCtrl, &pPeer->timer); + if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort)) + taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer); } -void syncRestartConnection(SSyncPeer *pPeer) -{ +void syncRestartConnection(SSyncPeer *pPeer) { if (pPeer->ip == 0) return; syncRestartPeer(pPeer); syncCheckRole(pPeer, NULL, TAOS_SYNC_ROLE_OFFLINE); } -static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) -{ +static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; sDebug("%s, sync-req is received", pPeer->id); @@ -784,8 +768,7 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) } } -static void syncNotStarted(void *param, void *tmrId) -{ +static void syncNotStarted(void *param, void *tmrId) { SSyncPeer *pPeer = param; SSyncNode *pNode = pPeer->pSyncNode; @@ -805,14 +788,13 @@ static void syncTryRecoverFromMaster(void *param, void *tmrId) { pthread_mutex_unlock(&(pNode->mutex)); } -static void syncRecoverFromMaster(SSyncPeer *pPeer) -{ - SSyncNode *pNode = pPeer->pSyncNode; +static void syncRecoverFromMaster(SSyncPeer *pPeer) { + SSyncNode *pNode = pPeer->pSyncNode; - if ( nodeSStatus != TAOS_SYNC_STATUS_INIT) { + if (nodeSStatus != TAOS_SYNC_STATUS_INIT) { sDebug("%s, sync is already started, status:%d", pPeer->id, nodeSStatus); return; - } + } taosTmrStopA(&pPeer->timer); if (tsSyncNum >= tsMaxSyncNum) { @@ -842,9 +824,8 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) return; } -static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) -{ - SSyncNode *pNode = pPeer->pSyncNode; +static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) { + SSyncNode * pNode = pPeer->pSyncNode; SFwdRsp *pFwdRsp = (SFwdRsp *) cont; SSyncFwds *pSyncFwds = pNode->pSyncFwds; SFwdInfo *pFwdInfo; @@ -864,10 +845,8 @@ static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) } } - -static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) -{ - SSyncNode *pNode = pPeer->pSyncNode; +static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { + SSyncNode * pNode = pPeer->pSyncNode; SWalHead *pHead = (SWalHead *)cont; sDebug("%s, forward is received, ver:%" PRIu64, pPeer->id, pHead->version); @@ -886,9 +865,8 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) return; } -static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) -{ - SSyncNode *pNode = pPeer->pSyncNode; +static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) { + SSyncNode * pNode = pPeer->pSyncNode; SPeersStatus *pPeersStatus = (SPeersStatus *)cont; sDebug("%s, status msg received, self:%s ver:%" PRIu64 " peer:%s ver:%" PRIu64 ", ack:%d", pPeer->id, @@ -911,10 +889,10 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { } // head.len = htonl(head.len); - if (pHead->len <0) { + if (pHead->len < 0) { sError("%s, invalid pkt length, len:%d", pPeer->id, pHead->len); return -1; - } + } int bytes = taosReadMsg(pPeer->peerFd, cont, pHead->len); if (bytes != pHead->len) { @@ -925,9 +903,8 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { return 0; } -static int syncProcessPeerMsg(void *param, void *buffer) -{ - SSyncPeer *pPeer = param; +static int syncProcessPeerMsg(void *param, void *buffer) { + SSyncPeer * pPeer = param; SSyncHead head; char *cont = (char *)buffer; @@ -955,8 +932,7 @@ static int syncProcessPeerMsg(void *param, void *buffer) #define statusMsgLen sizeof(SSyncHead)+sizeof(SPeersStatus)+sizeof(SPeerStatus)*TAOS_SYNC_MAX_REPLICA -static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) -{ +static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) { SSyncNode *pNode = pPeer->pSyncNode; char msg[statusMsgLen] = {0}; @@ -1013,7 +989,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { firstPkt.port = tsSyncPort; firstPkt.sourceId = pNode->vgId; // tell arbitrator its vgId - if ( write(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) { + if (write(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) { sDebug("%s, connection to peer server is setup", pPeer->id); pPeer->peerFd = connFd; pPeer->role = TAOS_SYNC_ROLE_UNSYNCED; @@ -1026,8 +1002,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { } } -static void syncCheckPeerConnection(void *param, void *tmrId) -{ +static void syncCheckPeerConnection(void *param, void *tmrId) { SSyncPeer *pPeer = param; SSyncNode *pNode = pPeer->pSyncNode; @@ -1039,8 +1014,7 @@ static void syncCheckPeerConnection(void *param, void *tmrId) pthread_mutex_unlock(&(pNode->mutex)); } -static void syncCreateRestoreDataThread(SSyncPeer *pPeer) -{ +static void syncCreateRestoreDataThread(SSyncPeer *pPeer) { taosTmrStopA(&pPeer->timer); pthread_attr_t thattr; @@ -1061,8 +1035,7 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) } } -static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) -{ +static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) { char ipstr[24]; int i; @@ -1139,8 +1112,7 @@ static void syncProcessBrokenLink(void *param) { syncDecNodeRef(pNode); } -static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) -{ +static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) { SSyncFwds *pSyncFwds = pNode->pSyncFwds; uint64_t time = taosGetTimestampMs(); @@ -1162,8 +1134,7 @@ static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) sDebug("vgId:%d, fwd info is saved, ver:%" PRIu64 " fwds:%d ", pNode->vgId, version, pSyncFwds->fwds); } -static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) -{ +static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) { SSyncFwds *pSyncFwds = pNode->pSyncFwds; int fwds = pSyncFwds->fwds; @@ -1180,8 +1151,7 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) } } -static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code) -{ +static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code) { int confirm = 0; if (pFwdInfo->code == 0) pFwdInfo->code = code; @@ -1202,8 +1172,7 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code } } -static void syncMonitorFwdInfos(void *param, void *tmrId) -{ +static void syncMonitorFwdInfos(void *param, void *tmrId) { SSyncNode *pNode = param; SSyncFwds *pSyncFwds = pNode->pSyncFwds; uint64_t time = taosGetTimestampMs(); @@ -1222,6 +1191,3 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl); } - - - diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index 505882b20943e47322a81da6491e59dd550d6577..2a0bee3726d939cc9da436af36ce02d45dad34d5 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -28,7 +28,7 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, uint32_t sindex, uint32_t eind char fname[TSDB_FILENAME_LEN*3] = {0}; uint32_t magic; uint64_t fversion; - int32_t size; + int64_t size; uint32_t index = sindex; SSyncNode *pNode = pPeer->pSyncNode; @@ -48,8 +48,7 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, uint32_t sindex, uint32_t eind } } -static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) -{ +static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { SSyncNode *pNode = pPeer->pSyncNode; SFileInfo minfo; memset(&minfo, 0, sizeof(minfo)); /* = {0}; */ // master file info SFileInfo sinfo; memset(&sinfo, 0, sizeof(sinfo)); /* = {0}; */ // slave file info @@ -113,7 +112,7 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) close(dfd); if (ret<0) break; - sDebug("%s, %s is received, size:%d", pPeer->id, minfo.name, minfo.size); + sDebug("%s, %s is received, size:%" PRId64, pPeer->id, minfo.name, minfo.size); } @@ -130,8 +129,7 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) return code; } -static int syncRestoreWal(SSyncPeer *pPeer) -{ +static int syncRestoreWal(SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; int ret, code = -1; @@ -172,8 +170,7 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) return offset; } -static int syncProcessBufferedFwd(SSyncPeer *pPeer) -{ +static int syncProcessBufferedFwd(SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; SRecvBuffer *pRecv = pNode->pRecv; int forwards = 0; @@ -201,8 +198,7 @@ static int syncProcessBufferedFwd(SSyncPeer *pPeer) return pRecv->code; } -int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) -{ +int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) { SSyncNode *pNode = pPeer->pSyncNode; SRecvBuffer *pRecv = pNode->pRecv; @@ -222,8 +218,7 @@ int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) return pRecv->code; } -static void syncCloseRecvBuffer(SSyncNode *pNode) -{ +static void syncCloseRecvBuffer(SSyncNode *pNode) { if (pNode->pRecv) { taosTFree(pNode->pRecv->buffer); } @@ -231,8 +226,7 @@ static void syncCloseRecvBuffer(SSyncNode *pNode) taosTFree(pNode->pRecv); } -static int syncOpenRecvBuffer(SSyncNode *pNode) -{ +static int syncOpenRecvBuffer(SSyncNode *pNode) { syncCloseRecvBuffer(pNode); SRecvBuffer *pRecv = calloc(sizeof(SRecvBuffer), 1); @@ -253,8 +247,7 @@ static int syncOpenRecvBuffer(SSyncNode *pNode) return 0; } -static int syncRestoreDataStepByStep(SSyncPeer *pPeer) -{ +static int syncRestoreDataStepByStep(SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; nodeSStatus = TAOS_SYNC_STATUS_FILE; uint64_t fversion = 0; @@ -292,10 +285,9 @@ static int syncRestoreDataStepByStep(SSyncPeer *pPeer) return 0; } -void *syncRestoreData(void *param) -{ - SSyncPeer *pPeer = (SSyncPeer *)param; - SSyncNode *pNode = pPeer->pSyncNode; +void *syncRestoreData(void *param) { + SSyncPeer *pPeer = (SSyncPeer *)param; + SSyncNode *pNode = pPeer->pSyncNode; taosBlockSIGPIPE(); __sync_fetch_and_add(&tsSyncNum, 1); @@ -326,4 +318,3 @@ void *syncRestoreData(void *param) return NULL; } - diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 1dd1cda343e3d507e1f251eedb427791bd812e63..8aa317b1ac7c770b6b102545716656541b8bb67e 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -27,11 +27,10 @@ #include "tsync.h" #include "syncInt.h" -static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) -{ +static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) { sDebug("%s, start to monitor:%s", pPeer->id, name); - if (pPeer->notifyFd <=0) { + if (pPeer->notifyFd <= 0) { pPeer->watchNum = 0; pPeer->notifyFd = inotify_init1(IN_NONBLOCK); if (pPeer->notifyFd < 0) { @@ -70,9 +69,8 @@ static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) return 0; } -static int syncAreFilesModified(SSyncPeer *pPeer) -{ - if (pPeer->notifyFd <=0) return 0; +static int syncAreFilesModified(SSyncPeer *pPeer) { + if (pPeer->notifyFd <= 0) return 0; char buf[2048]; int len = read(pPeer->notifyFd, buf, sizeof(buf)); @@ -96,12 +94,11 @@ static int syncAreFilesModified(SSyncPeer *pPeer) } } - return code; + return code; } -static int syncRetrieveFile(SSyncPeer *pPeer) -{ - SSyncNode *pNode = pPeer->pSyncNode; +static int syncRetrieveFile(SSyncPeer *pPeer) { + SSyncNode * pNode = pPeer->pSyncNode; SFileInfo fileInfo; SFileAck fileAck; int code = -1; @@ -128,7 +125,7 @@ static int syncRetrieveFile(SSyncPeer *pPeer) // wait for the ack from peer ret = taosReadMsg(pPeer->syncFd, &(fileAck), sizeof(fileAck)); - if (ret <0) break; + if (ret < 0) break; // set the peer sync version pPeer->sversion = fileInfo.fversion; @@ -148,13 +145,13 @@ static int syncRetrieveFile(SSyncPeer *pPeer) // send the file to peer int sfd = open(name, O_RDONLY); - if ( sfd < 0 ) break; + if (sfd < 0) break; ret = taosTSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size); - close(sfd); - if (ret <0) break; + close(sfd); + if (ret < 0) break; - sDebug("%s, %s is sent, size:%d", pPeer->id, name, fileInfo.size); + sDebug("%s, %s is sent, size:%" PRId64, pPeer->id, name, fileInfo.size); fileInfo.index++; // check if processed files are modified @@ -170,8 +167,7 @@ static int syncRetrieveFile(SSyncPeer *pPeer) /* if only a partial record is read out, set the IN_MODIFY flag in event, so upper layer will reload the file to get a complete record */ -static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent) -{ +static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent) { int ret; ret = read(sfd, pHead, sizeof(SWalHead)); @@ -185,7 +181,7 @@ static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent) } ret = read(sfd, pHead->cont, pHead->len); - if (ret <0) return -1; + if (ret < 0) return -1; if (ret != pHead->len) { // file is not at end yet, it shall be reloaded @@ -194,10 +190,9 @@ static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent) } return sizeof(SWalHead) + pHead->len; -} +} -static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) -{ +static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) { pPeer->watchNum = 0; taosClose(pPeer->notifyFd); pPeer->notifyFd = inotify_init1(IN_NONBLOCK); @@ -221,18 +216,17 @@ static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) return -1; } - return 0; + return 0; } -static uint32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) -{ +static uint32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) { char buf[2048]; int len = read(pPeer->notifyFd, buf, sizeof(buf)); - if (len <0 && errno != EAGAIN) { - sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno)); + if (len < 0 && errno != EAGAIN) { + sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno)); return -1; } - + if (len == 0) return 0; struct inotify_event *event; @@ -248,8 +242,7 @@ static uint32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) return 0; } -static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset, uint32_t *pEvent) -{ +static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset, uint32_t *pEvent) { SWalHead *pHead = (SWalHead *) malloc(640000); int code = -1; int32_t bytes = 0; @@ -261,9 +254,12 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, sDebug("%s, retrieve last wal, offset:%" PRId64 " fversion:%" PRIu64, pPeer->id, offset, fversion); while (1) { - int wsize = syncReadOneWalRecord(sfd, pHead, pEvent); - if (wsize <0) break; - if (wsize == 0) { code = 0; break; } + int wsize = syncReadOneWalRecord(sfd, pHead, pEvent); + if (wsize < 0) break; + if (wsize == 0) { + code = 0; + break; + } sDebug("%s, last wal is forwarded, ver:%" PRIu64, pPeer->id, pHead->version); int ret = taosWriteMsg(pPeer->syncFd, pHead, wsize); @@ -286,8 +282,7 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, return -1; } -static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) -{ +static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) { SSyncNode *pNode = pPeer->pSyncNode; int code = -1; char fname[TSDB_FILENAME_LEN * 2]; // full path to wal file @@ -350,12 +345,16 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) } if (code < 0) break; - if (pPeer->sversion >= fversion && fversion > 0) break; + if (pPeer->sversion >= fversion && fversion > 0) break; - index++; wname[0] = 0; + index++; + wname[0] = 0; code = (*pNode->getWalInfo)(pNode->ahandle, wname, &index); - if ( code < 0) break; - if ( wname[0] == 0 ) {code = 0; break;} + if (code < 0) break; + if (wname[0] == 0) { + code = 0; + break; + } // current last wal is closed, there is a new one sDebug("%s, last wal is closed, try new one", pPeer->id); @@ -366,9 +365,8 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) return code; } -static int syncRetrieveWal(SSyncPeer *pPeer) -{ - SSyncNode *pNode = pPeer->pSyncNode; +static int syncRetrieveWal(SSyncPeer *pPeer) { + SSyncNode * pNode = pPeer->pSyncNode; char fname[TSDB_FILENAME_LEN * 3]; char wname[TSDB_FILENAME_LEN * 2]; int32_t size; @@ -396,7 +394,7 @@ static int syncRetrieveWal(SSyncPeer *pPeer) // send wal file, // inotify is not required, old wal file won't be modified, even remove is ok - if ( stat(fname, &fstat) < 0 ) break; + if (stat(fname, &fstat) < 0) break; size = fstat.st_size; sDebug("%s, retrieve wal:%s size:%d", pPeer->id, fname, size); @@ -425,9 +423,8 @@ static int syncRetrieveWal(SSyncPeer *pPeer) return code; } -static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) -{ - SSyncNode *pNode = pPeer->pSyncNode; +static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) { + SSyncNode *pNode = pPeer->pSyncNode; SFirstPkt firstPkt; memset(&firstPkt, 0, sizeof(firstPkt)); @@ -462,9 +459,8 @@ static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) return 0; } -void *syncRetrieveData(void *param) -{ - SSyncPeer *pPeer = (SSyncPeer *)param; +void *syncRetrieveData(void *param) { + SSyncPeer * pPeer = (SSyncPeer *)param; SSyncNode *pNode = pPeer->pSyncNode; taosBlockSIGPIPE(); diff --git a/src/sync/src/taosTcpPool.c b/src/sync/src/taosTcpPool.c index b523728bf9357e17602c75e3822284c2871e8511..2f064ceb3690e4e8111d67fd2fce224add74717c 100644 --- a/src/sync/src/taosTcpPool.c +++ b/src/sync/src/taosTcpPool.c @@ -48,8 +48,7 @@ static void *taosProcessTcpData(void *param); static SThreadObj *taosGetTcpThread(SPoolObj *pPool); static void taosStopPoolThread(SThreadObj* pThread); -void *taosOpenTcpThreadPool(SPoolInfo *pInfo) -{ +void *taosOpenTcpThreadPool(SPoolInfo *pInfo) { pthread_attr_t thattr; SPoolObj *pPool = calloc(sizeof(SPoolObj), 1); @@ -89,8 +88,7 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) return pPool; } -void taosCloseTcpThreadPool(void *param) -{ +void taosCloseTcpThreadPool(void *param) { SPoolObj *pPool = (SPoolObj *)param; SThreadObj *pThread; @@ -107,8 +105,7 @@ void taosCloseTcpThreadPool(void *param) uDebug("%p TCP pool is closed", pPool); } -void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) -{ +void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) { struct epoll_event event; SPoolObj *pPool = (SPoolObj *)param; @@ -145,9 +142,8 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) return pConn; } -void taosFreeTcpConn(void *param) -{ - SConnObj *pConn = (SConnObj *)param; +void taosFreeTcpConn(void *param) { + SConnObj * pConn = (SConnObj *)param; SThreadObj *pThread = pConn->pThread; uDebug("%p TCP connection will be closed, fd:%d", pThread, pConn->fd); diff --git a/src/sync/src/tarbitrator.c b/src/sync/src/tarbitrator.c index 4e55cc475f0b48f9786debf75532af16a48db787..96e3392cc52b990497c207e2ec598b14f3c33cbe 100644 --- a/src/sync/src/tarbitrator.c +++ b/src/sync/src/tarbitrator.c @@ -152,9 +152,8 @@ static void arbProcessBrokenLink(void *param) { taosTFree(pNode); } -static int arbProcessPeerMsg(void *param, void *buffer) -{ - SNodeConn *pNode = param; +static int arbProcessPeerMsg(void *param, void *buffer) { + SNodeConn * pNode = param; SSyncHead head; int bytes = 0; char *cont = (char *)buffer; @@ -176,7 +175,6 @@ static int arbProcessPeerMsg(void *param, void *buffer) } static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context) { - struct sigaction act = {{0}}; act.sa_handler = SIG_IGN; sigaction(SIGTERM, &act, NULL); @@ -188,4 +186,3 @@ static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context) // inform main thread to exit tsem_post(&tsArbSem); } - diff --git a/src/sync/test/syncServer.c b/src/sync/test/syncServer.c index 44019ad96ec2d3ce321db339578564fb74a9bca1..b802905038f5ac7cb67321d4e6db637283d7875e 100644 --- a/src/sync/test/syncServer.c +++ b/src/sync/test/syncServer.c @@ -234,7 +234,7 @@ void processRequestMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { taosWriteQitem(qhandle, TAOS_QTYPE_RPC, pTemp); } -uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion) +uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) { uint32_t magic; struct stat fstat; diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 7936ea423f23e627cc86b0390699b6efb1ae5b53..e7a86798ee8e3d168bacfb374f13fd45638c43e9 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -475,7 +475,7 @@ int tsdbUpdateFileHeader(SFile* pFile); int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo); void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo); void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup); -void tsdbGetFileInfoImpl(char* fname, uint32_t* magic, int32_t* size); +void tsdbGetFileInfoImpl(char* fname, uint32_t* magic, int64_t* size); void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey); // ------------------ tsdbRWHelper.c diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 154d7280241ed1501b2058485e6863dd3055c10f..29e46a88afa5e74f85e0075ceeff3d7071d67a6b 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -424,7 +424,7 @@ void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) { } } -void tsdbGetFileInfoImpl(char *fname, uint32_t *magic, int32_t *size) { +void tsdbGetFileInfoImpl(char *fname, uint32_t *magic, int64_t *size) { char buf[TSDB_FILE_HEAD_SIZE] = "\0"; uint32_t version = 0; STsdbFileInfo info = {0}; @@ -445,7 +445,7 @@ void tsdbGetFileInfoImpl(char *fname, uint32_t *magic, int32_t *size) { close(fd); *magic = info.magic; - *size = (int32_t)offset; + *size = offset; return; diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 827e5c3c6d9d7cdeb752ce3013eca4d7fc15bf2e..ea912ad1f44100a37c7deff16a3c6f7d139d8cb0 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -212,7 +212,7 @@ int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg * return 0; } -uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int32_t *size) { +uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size) { STsdbRepo *pRepo = (STsdbRepo *)repo; // STsdbMeta *pMeta = pRepo->tsdbMeta; STsdbFileH *pFileH = pRepo->tsdbFileH; diff --git a/src/util/inc/tkvstore.h b/src/util/inc/tkvstore.h index 3b4e8e375717f71d7305aba765ed7bc3685a46d1..b2b0ff05f58478e3778d3abab72ae3511f683aca 100644 --- a/src/util/inc/tkvstore.h +++ b/src/util/inc/tkvstore.h @@ -58,7 +58,7 @@ int tdKVStoreStartCommit(SKVStore *pStore); int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLen); int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid); int tdKVStoreEndCommit(SKVStore *pStore); -void tsdbGetStoreInfo(char *fname, uint32_t *magic, int32_t *size); +void tsdbGetStoreInfo(char *fname, uint32_t *magic, int64_t *size); #ifdef __cplusplus } diff --git a/src/util/src/tkvstore.c b/src/util/src/tkvstore.c index 696819e5d0c9c806512ba14040c5a2a997bbe390..6ba1d87d92ecf216bfde346f9d3ff1563515d34d 100644 --- a/src/util/src/tkvstore.c +++ b/src/util/src/tkvstore.c @@ -332,7 +332,7 @@ int tdKVStoreEndCommit(SKVStore *pStore) { return 0; } -void tsdbGetStoreInfo(char *fname, uint32_t *magic, int32_t *size) { +void tsdbGetStoreInfo(char *fname, uint32_t *magic, int64_t *size) { char buf[TD_KVSTORE_HEADER_SIZE] = "\0"; SStoreInfo info = {0}; @@ -349,7 +349,7 @@ void tsdbGetStoreInfo(char *fname, uint32_t *magic, int32_t *size) { close(fd); *magic = info.magic; - *size = (int32_t)offset; + *size = offset; return; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index c723c766e1d1b63b57f781b6b0f83fd58ae8ab2c..176a45116837eeb9c62be490de1d871659f741ce 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -41,7 +41,7 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode); static int32_t vnodeSaveVersion(SVnodeObj *pVnode); static int32_t vnodeReadVersion(SVnodeObj *pVnode); static int vnodeProcessTsdbStatus(void *arg, int status); -static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion); +static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion); static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); static void vnodeNotifyRole(void *ahandle, int8_t role); static void vnodeCtrlFlow(void *handle, int32_t mseconds); @@ -290,6 +290,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->sync = syncStart(&syncInfo); if (pVnode->sync == NULL) { + vError("vgId:%d, failed to open sync module, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica, + tstrerror(terrno)); vnodeCleanUp(pVnode); return terrno; } @@ -536,7 +538,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status) { return 0; } -static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion) { +static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) { SVnodeObj *pVnode = ahandle; *fversion = pVnode->fversion; return tsdbGetFileInfo(pVnode->tsdb, name, index, eindex, size);