From 2caeccf623584feeacadb178c59a1d63359b170d Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 27 Sep 2020 15:34:44 +0000 Subject: [PATCH] TD-1617 --- src/sync/inc/syncInt.h | 1 - src/sync/inc/taosTcpPool.h | 1 - src/sync/src/syncMain.c | 34 +++--- src/sync/src/syncRestore.c | 79 ++++++------- src/sync/src/syncRetrieve.c | 179 ++++++++++++++--------------- src/sync/src/taosTcpPool.c | 81 ++++++------- src/sync/test/syncClient.c | 90 +++++++-------- src/sync/test/syncServer.c | 219 +++++++++++++++++------------------- 8 files changed, 336 insertions(+), 348 deletions(-) diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index 94df664219..f681810646 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -171,7 +171,6 @@ void syncBroadcastStatus(SSyncNode *pNode); void syncAddPeerRef(SSyncPeer *pPeer); int syncDecPeerRef(SSyncPeer *pPeer); - #ifdef __cplusplus } #endif diff --git a/src/sync/inc/taosTcpPool.h b/src/sync/inc/taosTcpPool.h index 5f7ca9ede5..261d190ad3 100644 --- a/src/sync/inc/taosTcpPool.h +++ b/src/sync/inc/taosTcpPool.h @@ -38,7 +38,6 @@ void taosCloseTcpThreadPool(ttpool_h); void *taosAllocateTcpConn(void *, void *ahandle, int connFd); void taosFreeTcpConn(void *); - #ifdef __cplusplus } #endif diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index ec5ffbfca7..c7b8959926 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -134,7 +134,7 @@ void syncCleanUp() { void *syncStart(const SSyncInfo *pInfo) { const SSyncCfg *pCfg = &pInfo->syncCfg; - SSyncNode *pNode = (SSyncNode *) calloc(sizeof(SSyncNode), 1); + SSyncNode *pNode = (SSyncNode *)calloc(sizeof(SSyncNode), 1); if (pNode == NULL) { sError("no memory to allocate syncNode"); terrno = TAOS_SYSTEM_ERROR(errno); @@ -168,7 +168,7 @@ void *syncStart(const SSyncInfo *pInfo) { } syncAddNodeRef(pNode); - + if (pNode->selfIndex < 0) { sInfo("vgId:%d, this node is not configured", pNode->vgId); terrno = TSDB_CODE_SYN_INVALID_CONFIG; @@ -176,11 +176,12 @@ void *syncStart(const SSyncInfo *pInfo) { return NULL; } - nodeVersion = pInfo->version; // set the initial version + nodeVersion = pInfo->version; // set the initial version nodeRole = (pNode->replica > 1) ? TAOS_SYNC_ROLE_UNSYNCED : TAOS_SYNC_ROLE_MASTER; - sInfo("vgId:%d, %d replicas are configured, quorum:%d role:%s", pNode->vgId, pNode->replica, pNode->quorum, syncRole[nodeRole]); + sInfo("vgId:%d, %d replicas are configured, quorum:%d role:%s", pNode->vgId, pNode->replica, pNode->quorum, + syncRole[nodeRole]); - pNode->pSyncFwds = calloc(sizeof(SSyncFwds) + tsMaxFwdInfo*sizeof(SFwdInfo), 1); + pNode->pSyncFwds = calloc(sizeof(SSyncFwds) + tsMaxFwdInfo * sizeof(SFwdInfo), 1); if (pNode->pSyncFwds == NULL) { sError("vgId:%d, no memory to allocate syncFwds", pNode->vgId); terrno = TAOS_SYSTEM_ERROR(errno); @@ -443,9 +444,7 @@ static void syncAddArbitrator(SSyncNode *pNode) { pNode->peerInfo[TAOS_SYNC_MAX_REPLICA] = syncAddPeer(pNode, &nodeInfo); } -static void syncAddNodeRef(SSyncNode *pNode) { - atomic_add_fetch_8(&pNode->refCount, 1); -} +static void syncAddNodeRef(SSyncNode *pNode) { atomic_add_fetch_8(&pNode->refCount, 1); } static void syncDecNodeRef(SSyncNode *pNode) { if (atomic_sub_fetch_8(&pNode->refCount, 1) == 0) { @@ -456,9 +455,7 @@ static void syncDecNodeRef(SSyncNode *pNode) { } } -void syncAddPeerRef(SSyncPeer *pPeer) { - atomic_add_fetch_8(&pPeer->refCount, 1); -} +void syncAddPeerRef(SSyncPeer *pPeer) { atomic_add_fetch_8(&pPeer->refCount, 1); } int syncDecPeerRef(SSyncPeer *pPeer) { if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) { @@ -501,6 +498,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { tstrncpy(pPeer->fqdn, pInfo->nodeFqdn, sizeof(pPeer->fqdn)); pPeer->ip = ip; pPeer->port = pInfo->nodePort; + pPeer->fqdn[sizeof(pPeer->fqdn) - 1] = 0; snprintf(pPeer->id, sizeof(pPeer->id), "vgId:%d peer:%s:%d", pNode->vgId, pPeer->fqdn, pPeer->port); pPeer->peerFd = -1; @@ -573,10 +571,10 @@ static void syncChooseMaster(SSyncNode *pNode) { replica = pNode->replica + 1; } - if (index < 0 && onlineNum > replica/2.0) { + if (index < 0 && onlineNum > replica / 2.0) { // over half of nodes are online for (int i = 0; i < pNode->replica; ++i) { - //slave with highest version shall be master + // slave with highest version shall be master pPeer = pNode->peerInfo[i]; if (pPeer->role == TAOS_SYNC_ROLE_SLAVE || pPeer->role == TAOS_SYNC_ROLE_MASTER) { if (index < 0 || pPeer->version > pNode->peerInfo[index]->version) { @@ -622,7 +620,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) { if (onlineNum <= replica * 0.5) { if (nodeRole != TAOS_SYNC_ROLE_UNSYNCED) { nodeRole = TAOS_SYNC_ROLE_UNSYNCED; - //pNode->peerInfo[pNode->selfIndex]->role = nodeRole; + // pNode->peerInfo[pNode->selfIndex]->role = nodeRole; (*pNode->notifyRole)(pNode->ahandle, nodeRole); sInfo("vgId:%d, change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica); } @@ -648,7 +646,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) { static int syncValidateMaster(SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; - int code = 0; + int code = 0; if (nodeRole == TAOS_SYNC_ROLE_MASTER && nodeVersion < pPeer->version) { sDebug("%s, slave has higher version, restart all connections!!!", pPeer->id); @@ -671,7 +669,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne int8_t selfOldRole = nodeRole; int8_t i, syncRequired = 0; - //pNode->peerInfo[pNode->selfIndex]->version = nodeVersion; + // pNode->peerInfo[pNode->selfIndex]->version = nodeVersion; pPeer->role = newRole; sDebug("%s, own role:%s, new peer role:%s", pPeer->id, syncRole[nodeRole], syncRole[pPeer->role]); @@ -877,8 +875,6 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { sError("%s, forward discarded, ver:%" PRIu64, pPeer->id, pHead->version); } } - - return; } static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) { @@ -1066,7 +1062,7 @@ static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) { return; } - int32_t vgId = firstPkt.syncHead.vgId; + int32_t vgId = firstPkt.syncHead.vgId; SSyncNode **ppNode = (SSyncNode **)taosHashGet(vgIdHash, (const char *)&vgId, sizeof(int32_t)); if (ppNode == NULL || *ppNode == NULL) { sError("vgId:%d, vgId could not be found", vgId); diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index 2a0bee3726..ebb6c3a0a9 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -23,10 +23,10 @@ #include "tsync.h" #include "syncInt.h" -static void syncRemoveExtraFile(SSyncPeer *pPeer, uint32_t sindex, uint32_t eindex) { - char name[TSDB_FILENAME_LEN*2] = {0}; - char fname[TSDB_FILENAME_LEN*3] = {0}; - uint32_t magic; +static void syncRemoveExtraFile(SSyncPeer *pPeer, int32_t sindex, int32_t eindex) { + char name[TSDB_FILENAME_LEN * 2] = {0}; + char fname[TSDB_FILENAME_LEN * 3] = {0}; + uint32_t magic; uint64_t fversion; int64_t size; uint32_t index = sindex; @@ -40,12 +40,12 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, uint32_t sindex, uint32_t eind if (magic == 0) break; snprintf(fname, sizeof(fname), "%s/%s", pNode->path, name); - remove(fname); + (void)remove(fname); sDebug("%s, %s is removed", pPeer->id, fname); index++; if (index > eindex) break; - } + } } static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { @@ -62,35 +62,36 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { while (1) { // read file info int ret = taosReadMsg(pPeer->syncFd, &(minfo), sizeof(minfo)); - if (ret < 0 ) break; + if (ret < 0) break; // if no more file from master, break; if (minfo.name[0] == 0 || minfo.magic == 0) { sDebug("%s, no more files to restore", pPeer->id); // remove extra files after the current index - syncRemoveExtraFile(pPeer, sinfo.index+1, TAOS_SYNC_MAX_INDEX); - code = 0; + syncRemoveExtraFile(pPeer, sinfo.index + 1, TAOS_SYNC_MAX_INDEX); + code = 0; break; } - + // remove extra files on slave between the current and last index - syncRemoveExtraFile(pPeer, pindex+1, minfo.index-1); + syncRemoveExtraFile(pPeer, pindex + 1, minfo.index - 1); pindex = minfo.index; // check the file info sinfo = minfo; sDebug("%s, get file info:%s", pPeer->id, minfo.name); - sinfo.magic = (*pNode->getFileInfo)(pNode->ahandle, sinfo.name, &sinfo.index, TAOS_SYNC_MAX_INDEX, &sinfo.size, &sinfo.fversion); + sinfo.magic = (*pNode->getFileInfo)(pNode->ahandle, sinfo.name, &sinfo.index, TAOS_SYNC_MAX_INDEX, &sinfo.size, + &sinfo.fversion); // if file not there or magic is not the same, file shall be synced memset(&fileAck, 0, sizeof(fileAck)); - fileAck.sync = (sinfo.magic != minfo.magic || sinfo.name[0] == 0) ? 1:0; + fileAck.sync = (sinfo.magic != minfo.magic || sinfo.name[0] == 0) ? 1 : 0; // send file ack ret = taosWriteMsg(pPeer->syncFd, &(fileAck), sizeof(fileAck)); - if (ret <0) break; - + if (ret < 0) break; + // if sync is not required, continue if (fileAck.sync == 0) { sDebug("%s, %s is the same", pPeer->id, minfo.name); @@ -99,10 +100,11 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { // if sync is required, open file, receive from master, and write to file // get the full path to file + minfo.name[sizeof(minfo.name) - 1] = 0; snprintf(name, sizeof(name), "%s/%s", pNode->path, minfo.name); int dfd = open(name, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO); - if ( dfd < 0 ) { + if (dfd < 0) { sError("%s, failed to open file:%s", pPeer->id, name); break; } @@ -110,16 +112,15 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { ret = taosCopyFds(pPeer->syncFd, dfd, minfo.size); fsync(dfd); close(dfd); - if (ret<0) break; + if (ret < 0) break; sDebug("%s, %s is received, size:%" PRId64, pPeer->id, minfo.name, minfo.size); - } if (code == 0 && (minfo.fversion != sinfo.fversion)) { - // data file is changed, code shall be set to 1 + // data file is changed, code shall be set to 1 *fversion = minfo.fversion; - code = 1; + code = 1; } if (code < 0) { @@ -130,8 +131,8 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { } static int syncRestoreWal(SSyncPeer *pPeer) { - SSyncNode *pNode = pPeer->pSyncNode; - int ret, code = -1; + SSyncNode *pNode = pPeer->pSyncNode; + int ret, code = -1; void *buffer = calloc(1024000, 1); // size for one record if (buffer == NULL) return -1; @@ -140,18 +141,21 @@ static int syncRestoreWal(SSyncPeer *pPeer) { while (1) { ret = taosReadMsg(pPeer->syncFd, pHead, sizeof(SWalHead)); - if (ret <0) break; + if (ret < 0) break; + + if (pHead->len == 0) { + code = 0; + break; + } // wal sync over - if (pHead->len == 0) {code = 0; break;} // wal sync over - ret = taosReadMsg(pPeer->syncFd, pHead->cont, pHead->len); - if (ret <0) break; + if (ret < 0) break; sDebug("%s, restore a record, ver:%" PRIu64, pPeer->id, pHead->version); (*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_WAL); } - if (code<0) { + if (code < 0) { sError("%s, failed to restore wal(%s)", pPeer->id, strerror(errno)); } @@ -159,10 +163,9 @@ static int syncRestoreWal(SSyncPeer *pPeer) { return code; } -static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) -{ +static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) { SSyncNode *pNode = pPeer->pSyncNode; - SWalHead *pHead = (SWalHead *) offset; + SWalHead * pHead = (SWalHead *)offset; (*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD); offset += pHead->len + sizeof(SWalHead); @@ -171,7 +174,7 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) } static int syncProcessBufferedFwd(SSyncPeer *pPeer) { - SSyncNode *pNode = pPeer->pSyncNode; + SSyncNode * pNode = pPeer->pSyncNode; SRecvBuffer *pRecv = pNode->pRecv; int forwards = 0; @@ -182,7 +185,7 @@ static int syncProcessBufferedFwd(SSyncPeer *pPeer) { offset = syncProcessOneBufferedFwd(pPeer, offset); forwards++; } - + pthread_mutex_lock(&pNode->mutex); while (forwards < pRecv->forwards && pRecv->code == 0) { @@ -199,7 +202,7 @@ static int syncProcessBufferedFwd(SSyncPeer *pPeer) { } int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) { - SSyncNode *pNode = pPeer->pSyncNode; + SSyncNode * pNode = pPeer->pSyncNode; SRecvBuffer *pRecv = pNode->pRecv; if (pRecv == NULL) return -1; @@ -259,9 +262,9 @@ static int syncRestoreDataStepByStep(SSyncPeer *pPeer) { return -1; } - // if code > 0, data file is changed, notify app, and pass the version + // if code > 0, data file is changed, notify app, and pass the version if (code > 0 && pNode->notifyFileSynced) { - if ( (*pNode->notifyFileSynced)(pNode->ahandle, fversion) < 0 ) { + if ((*pNode->notifyFileSynced)(pNode->ahandle, fversion) < 0) { sError("%s, app not in ready state", pPeer->id); return -1; } @@ -296,8 +299,8 @@ void *syncRestoreData(void *param) { if (syncOpenRecvBuffer(pNode) < 0) { sError("%s, failed to allocate recv buffer", pPeer->id); - } else { - if ( syncRestoreDataStepByStep(pPeer) == 0) { + } else { + if (syncRestoreDataStepByStep(pPeer) == 0) { sInfo("%s, it is synced successfully", pPeer->id); nodeRole = TAOS_SYNC_ROLE_SLAVE; syncBroadcastStatus(pNode); @@ -311,7 +314,7 @@ void *syncRestoreData(void *param) { (*pNode->notifyRole)(pNode->ahandle, nodeRole); nodeSStatus = TAOS_SYNC_STATUS_INIT; - taosClose(pPeer->syncFd) + taosClose(pPeer->syncFd); syncCloseRecvBuffer(pNode); __sync_fetch_and_sub(&tsSyncNum, 1); syncDecPeerRef(pPeer); diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 8aa317b1ac..6d0b847afe 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -38,13 +38,13 @@ static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) { return -1; } - if (pPeer->watchFd == NULL) pPeer->watchFd = malloc(sizeof(int)*tsMaxWatchFiles); + if (pPeer->watchFd == NULL) pPeer->watchFd = malloc(sizeof(int) * tsMaxWatchFiles); if (pPeer->watchFd == NULL) { sError("%s, failed to allocate watchFd", pPeer->id); return -1; } - memset(pPeer->watchFd, -1, sizeof(int)*tsMaxWatchFiles); + memset(pPeer->watchFd, -1, sizeof(int) * tsMaxWatchFiles); } int *wd = pPeer->watchFd + pPeer->watchNum; @@ -64,7 +64,7 @@ static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) { sDebug("%s, monitor %s, wd:%d watchNum:%d", pPeer->id, name, *wd, pPeer->watchNum); } - pPeer->watchNum = (pPeer->watchNum +1) % tsMaxWatchFiles; + pPeer->watchNum = (pPeer->watchNum + 1) % tsMaxWatchFiles; return 0; } @@ -72,20 +72,20 @@ static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) { static int syncAreFilesModified(SSyncPeer *pPeer) { if (pPeer->notifyFd <= 0) return 0; - char buf[2048]; - int len = read(pPeer->notifyFd, buf, sizeof(buf)); + char buf[2048]; + int len = read(pPeer->notifyFd, buf, sizeof(buf)); if (len < 0 && errno != EAGAIN) { - sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno)); + sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno)); return -1; } - - int code = 0; - if (len > 0) { + + int code = 0; + if (len > 0) { const struct inotify_event *event; char *ptr; for (ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) { - event = (const struct inotify_event *) ptr; - if ((event->mask & IN_MODIFY) || (event->mask & IN_DELETE)) { + event = (const struct inotify_event *)ptr; + if ((event->mask & IN_MODIFY) || (event->mask & IN_DELETE)) { sDebug("%s, processed file is changed", pPeer->id); pPeer->fileChanged = 1; code = 1; @@ -98,11 +98,11 @@ static int syncAreFilesModified(SSyncPeer *pPeer) { } static int syncRetrieveFile(SSyncPeer *pPeer) { - SSyncNode * pNode = pPeer->pSyncNode; - SFileInfo fileInfo; - SFileAck fileAck; - int code = -1; - char name[TSDB_FILENAME_LEN * 2] = {0}; + SSyncNode *pNode = pPeer->pSyncNode; + SFileInfo fileInfo; + SFileAck fileAck; + int code = -1; + char name[TSDB_FILENAME_LEN * 2] = {0}; memset(&fileInfo, 0, sizeof(fileInfo)); memset(&fileAck, 0, sizeof(fileAck)); @@ -110,17 +110,19 @@ static int syncRetrieveFile(SSyncPeer *pPeer) { while (1) { // retrieve file info fileInfo.name[0] = 0; - fileInfo.magic = (*pNode->getFileInfo)(pNode->ahandle, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX, &fileInfo.size, &fileInfo.fversion); - //fileInfo.size = htonl(size); + fileInfo.magic = (*pNode->getFileInfo)(pNode->ahandle, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX, + &fileInfo.size, &fileInfo.fversion); + // fileInfo.size = htonl(size); // send the file info int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo)); - if (ret < 0 ) break; + if (ret < 0) break; // if no file anymore, break - if (fileInfo.magic == 0 || fileInfo.name[0] == 0) { - sDebug("%s, no more files to sync", pPeer->id); - code = 0; break; + if (fileInfo.magic == 0 || fileInfo.name[0] == 0) { + sDebug("%s, no more files to sync", pPeer->id); + code = 0; + break; } // wait for the ack from peer @@ -132,29 +134,29 @@ static int syncRetrieveFile(SSyncPeer *pPeer) { // get the full path to file snprintf(name, sizeof(name), "%s/%s", pNode->path, fileInfo.name); - + // add the file into watch list - if ( syncAddIntoWatchList(pPeer, name) <0) break; + if (syncAddIntoWatchList(pPeer, name) < 0) break; // if sync is not required, continue if (fileAck.sync == 0) { - fileInfo.index++; - sDebug("%s, %s is the same", pPeer->id, fileInfo.name); - continue; + fileInfo.index++; + sDebug("%s, %s is the same", pPeer->id, fileInfo.name); + continue; } // send the file to peer int sfd = open(name, O_RDONLY); if (sfd < 0) break; - ret = taosTSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size); + ret = taosTSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size); close(sfd); if (ret < 0) break; - sDebug("%s, %s is sent, size:%" PRId64, pPeer->id, name, fileInfo.size); - fileInfo.index++; + sDebug("%s, %s is sent, size:%" PRId64, pPeer->id, name, fileInfo.size); + fileInfo.index++; - // check if processed files are modified + // check if processed files are modified if (syncAreFilesModified(pPeer) != 0) break; } @@ -201,15 +203,15 @@ static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) { return -1; } - if (pPeer->watchFd == NULL) pPeer->watchFd = malloc(sizeof(int)*tsMaxWatchFiles); + if (pPeer->watchFd == NULL) pPeer->watchFd = malloc(sizeof(int) * tsMaxWatchFiles); if (pPeer->watchFd == NULL) { sError("%s, failed to allocate watchFd", pPeer->id); return -1; } - memset(pPeer->watchFd, -1, sizeof(int)*tsMaxWatchFiles); + memset(pPeer->watchFd, -1, sizeof(int) * tsMaxWatchFiles); int *wd = pPeer->watchFd; - + *wd = inotify_add_watch(pPeer->notifyFd, name, IN_MODIFY | IN_CLOSE_WRITE); if (*wd == -1) { sError("%s, failed to watch last wal(%s)", pPeer->id, strerror(errno)); @@ -219,8 +221,8 @@ static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) { return 0; } -static uint32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) { - char buf[2048]; +static int32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) { + char buf[2048]; int len = read(pPeer->notifyFd, buf, sizeof(buf)); if (len < 0 && errno != EAGAIN) { sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno)); @@ -231,26 +233,29 @@ static uint32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) { struct inotify_event *event; for (char *ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) { - event = (struct inotify_event *) ptr; + event = (struct inotify_event *)ptr; if (event->mask & IN_MODIFY) *pEvent = *pEvent | IN_MODIFY; if (event->mask & IN_CLOSE_WRITE) *pEvent = *pEvent | IN_CLOSE_WRITE; } - if (pEvent != 0) - sDebug("%s, last wal event:0x%x", pPeer->id, *pEvent); + if (pEvent != 0) sDebug("%s, last wal event:0x%x", pPeer->id, *pEvent); return 0; } static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset, uint32_t *pEvent) { - SWalHead *pHead = (SWalHead *) malloc(640000); - int code = -1; - int32_t bytes = 0; - int sfd; + SWalHead *pHead = malloc(640000); + int code = -1; + int32_t bytes = 0; + int sfd; sfd = open(name, O_RDONLY); - if (sfd < 0) return -1; - lseek(sfd, offset, SEEK_SET); + if (sfd < 0) { + free(pHead); + return -1; + } + + (void)lseek(sfd, offset, SEEK_SET); sDebug("%s, retrieve last wal, offset:%" PRId64 " fversion:%" PRIu64, pPeer->id, offset, fversion); while (1) { @@ -263,34 +268,34 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, sDebug("%s, last wal is forwarded, ver:%" PRIu64, pPeer->id, pHead->version); int ret = taosWriteMsg(pPeer->syncFd, pHead, wsize); - if ( ret != wsize ) break; + if (ret != wsize) break; pPeer->sversion = pHead->version; bytes += wsize; - + if (pHead->version >= fversion && fversion > 0) { - code = 0; - bytes = 0; + code = 0; + bytes = 0; break; } } free(pHead); - taosClose(sfd); + close(sfd); if (code == 0) return bytes; return -1; } static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) { - SSyncNode *pNode = pPeer->pSyncNode; - int code = -1; - char fname[TSDB_FILENAME_LEN * 2]; // full path to wal file + SSyncNode *pNode = pPeer->pSyncNode; + int code = -1; + char fname[TSDB_FILENAME_LEN * 2]; // full path to wal file if (syncAreFilesModified(pPeer) != 0) return -1; while (1) { - int32_t once = 0; // last WAL has once ever been processed + int32_t once = 0; // last WAL has once ever been processed int64_t offset = 0; uint64_t fversion = 0; uint32_t event = 0; @@ -300,48 +305,48 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) { sDebug("%s, start to retrieve last wal:%s", pPeer->id, fname); // monitor last wal - if (syncMonitorLastWal(pPeer, fname) <0) break; + if (syncMonitorLastWal(pPeer, fname) < 0) break; while (1) { int32_t bytes = syncRetrieveLastWal(pPeer, fname, fversion, offset, &event); if (bytes < 0) break; // check file changes - if (syncCheckLastWalChanges(pPeer, &event) <0) break; + if (syncCheckLastWalChanges(pPeer, &event) < 0) break; // if file is not updated or updated once, set the fversion and sstatus if (((event & IN_MODIFY) == 0) || once) { if (fversion == 0) { pPeer->sstatus = TAOS_SYNC_STATUS_CACHE; // start to forward pkt - fversion = nodeVersion; // must read data to fversion + fversion = nodeVersion; // must read data to fversion } } // if all data up to fversion is read out, it is over if (pPeer->sversion >= fversion && fversion > 0) { - code = 0; + code = 0; sDebug("%s, data up to fversion:%ld has been read out, bytes:%d", pPeer->id, fversion, bytes); break; - } + } // if all data are read out, and no update if ((bytes == 0) && ((event & IN_MODIFY) == 0)) { // wal file is closed, break - if (event & IN_CLOSE_WRITE) { - code = 0; + if (event & IN_CLOSE_WRITE) { + code = 0; sDebug("%s, current wal is closed", pPeer->id); break; } - + // wal not closed, it means some data not flushed to disk, wait for a while usleep(10000); } - // if bytes>0, file is updated, or fversion is not reached but file still open, read again + // if bytes>0, file is updated, or fversion is not reached but file still open, read again once = 1; - offset += bytes; + offset += bytes; sDebug("%s, retrieve last wal, bytes:%d", pPeer->id, bytes); - event = event & (~IN_MODIFY); // clear IN_MODIFY flag + event = event & (~IN_MODIFY); // clear IN_MODIFY flag } if (code < 0) break; @@ -356,7 +361,7 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) { break; } - // current last wal is closed, there is a new one + // current last wal is closed, there is a new one sDebug("%s, last wal is closed, try new one", pPeer->id); } @@ -377,14 +382,14 @@ static int syncRetrieveWal(SSyncPeer *pPeer) { while (1) { // retrieve wal info wname[0] = 0; - code = (*pNode->getWalInfo)(pNode->ahandle, wname, &index); + code = (*pNode->getWalInfo)(pNode->ahandle, wname, &index); if (code < 0) break; // error if (wname[0] == 0) { // no wal file sDebug("%s, no wal file", pPeer->id); break; - } - - if (code == 0) { // last wal + } + + if (code == 0) { // last wal code = syncProcessLastWal(pPeer, wname, index); break; } @@ -392,26 +397,26 @@ static int syncRetrieveWal(SSyncPeer *pPeer) { // get the full path to wal file snprintf(fname, sizeof(fname), "%s/%s", pNode->path, wname); - // send wal file, + // send wal file, // inotify is not required, old wal file won't be modified, even remove is ok if (stat(fname, &fstat) < 0) break; size = fstat.st_size; - sDebug("%s, retrieve wal:%s size:%d", pPeer->id, fname, size); + sDebug("%s, retrieve wal:%s size:%d", pPeer->id, fname, size); int sfd = open(fname, O_RDONLY); if (sfd < 0) break; - code = taosTSendFile(pPeer->syncFd, sfd, NULL, size); - close(sfd); - if (code <0) break; + code = taosTSendFile(pPeer->syncFd, sfd, NULL, size); + close(sfd); + if (code < 0) break; - index++; + index++; - if (syncAreFilesModified(pPeer) != 0) break; + if (syncAreFilesModified(pPeer) != 0) break; } if (code == 0) { - sDebug("%s, wal retrieve is finished", pPeer->id); + sDebug("%s, wal retrieve is finished", pPeer->id); pPeer->sstatus = TAOS_SYNC_STATUS_CACHE; SWalHead walHead; memset(&walHead, 0, sizeof(walHead)); @@ -433,12 +438,12 @@ static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) { tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); firstPkt.port = tsSyncPort; - if (write(pPeer->syncFd, (char *) &firstPkt, sizeof(firstPkt)) < 0) { + if (write(pPeer->syncFd, (char *)&firstPkt, sizeof(firstPkt)) < 0) { sError("%s, failed to send syncCmd", pPeer->id); return -1; } - pPeer->sversion = 0; + pPeer->sversion = 0; pPeer->sstatus = TAOS_SYNC_STATUS_FILE; sDebug("%s, start to retrieve file", pPeer->id); if (syncRetrieveFile(pPeer) < 0) { @@ -447,8 +452,7 @@ static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) { } // if no files are synced, there must be wal to sync, sversion must be larger than one - if (pPeer->sversion == 0) - pPeer->sversion = 1; + if (pPeer->sversion == 0) pPeer->sversion = 1; sDebug("%s, start to retrieve wal", pPeer->id); if (syncRetrieveWal(pPeer) < 0) { @@ -460,8 +464,8 @@ static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) { } void *syncRetrieveData(void *param) { - SSyncPeer * pPeer = (SSyncPeer *)param; - SSyncNode *pNode = pPeer->pSyncNode; + SSyncPeer *pPeer = (SSyncPeer *)param; + SSyncNode *pNode = pPeer->pSyncNode; taosBlockSIGPIPE(); pPeer->fileChanged = 0; @@ -470,7 +474,7 @@ void *syncRetrieveData(void *param) { sError("%s, failed to open socket to sync", pPeer->id); } else { sInfo("%s, sync tcp is setup", pPeer->id); - + if (syncRetrieveDataStepByStep(pPeer) == 0) { sDebug("%s, sync retrieve process is successful", pPeer->id); } else { @@ -482,12 +486,11 @@ void *syncRetrieveData(void *param) { if (pPeer->fileChanged) { // if file is changed 3 times continuously, start flow control pPeer->numOfRetrieves++; - if (pPeer->numOfRetrieves >= 2 && pNode->notifyFlowCtrl) + if (pPeer->numOfRetrieves >= 2 && pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->ahandle, 4 << (pPeer->numOfRetrieves - 2)); } else { pPeer->numOfRetrieves = 0; - if (pNode->notifyFlowCtrl) - (*pNode->notifyFlowCtrl)(pNode->ahandle, 0); + if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->ahandle, 0); } pPeer->fileChanged = 0; diff --git a/src/sync/src/taosTcpPool.c b/src/sync/src/taosTcpPool.c index 2f064ceb36..eda822b1ec 100644 --- a/src/sync/src/taosTcpPool.c +++ b/src/sync/src/taosTcpPool.c @@ -45,8 +45,8 @@ typedef struct { static void *taosAcceptPeerTcpConnection(void *argv); static void *taosProcessTcpData(void *param); +static void taosStopPoolThread(SThreadObj *pThread); static SThreadObj *taosGetTcpThread(SPoolObj *pPool); -static void taosStopPoolThread(SThreadObj* pThread); void *taosOpenTcpThreadPool(SPoolInfo *pInfo) { pthread_attr_t thattr; @@ -58,8 +58,8 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) { } pPool->info = *pInfo; - - pPool->pThread = (SThreadObj **) calloc(sizeof(SThreadObj *), pInfo->numOfThreads); + + pPool->pThread = (SThreadObj **)calloc(sizeof(SThreadObj *), pInfo->numOfThreads); if (pPool->pThread == NULL) { uError("TCP server, no enough memory"); free(pPool); @@ -68,17 +68,19 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) { pPool->acceptFd = taosOpenTcpServerSocket(pInfo->serverIp, pInfo->port); if (pPool->acceptFd < 0) { - free(pPool->pThread); free(pPool); + free(pPool->pThread); + free(pPool); uError("failed to create TCP server socket, port:%d (%s)", pInfo->port, strerror(errno)); return NULL; } pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); - if (pthread_create(&(pPool->thread), &thattr, (void *) taosAcceptPeerTcpConnection, pPool) != 0) { + if (pthread_create(&(pPool->thread), &thattr, (void *)taosAcceptPeerTcpConnection, pPool) != 0) { uError("TCP server, failed to create accept thread, reason:%s", strerror(errno)); close(pPool->acceptFd); - free(pPool->pThread); free(pPool); + free(pPool->pThread); + free(pPool); return NULL; } @@ -89,29 +91,30 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) { } void taosCloseTcpThreadPool(void *param) { - SPoolObj *pPool = (SPoolObj *)param; - SThreadObj *pThread; + SPoolObj * pPool = (SPoolObj *)param; + SThreadObj *pThread; - shutdown(pPool->acceptFd, SHUT_RD); + shutdown(pPool->acceptFd, SHUT_RD); pthread_join(pPool->thread, NULL); for (int i = 0; i < pPool->info.numOfThreads; ++i) { pThread = pPool->pThread[i]; - if (pThread) taosStopPoolThread(pThread); + if (pThread) taosStopPoolThread(pThread); } + uDebug("%p TCP pool is closed", pPool); + taosTFree(pPool->pThread); free(pPool); - uDebug("%p TCP pool is closed", pPool); } void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) { struct epoll_event event; SPoolObj *pPool = (SPoolObj *)param; - SConnObj *pConn = (SConnObj *) calloc(sizeof(SConnObj), 1); + SConnObj *pConn = (SConnObj *)calloc(sizeof(SConnObj), 1); if (pConn == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); return NULL; } @@ -131,7 +134,7 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) { if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) { uError("failed to add fd:%d(%s)", connFd, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); free(pConn); pConn = NULL; } else { @@ -143,8 +146,8 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) { } void taosFreeTcpConn(void *param) { - SConnObj * pConn = (SConnObj *)param; - SThreadObj *pThread = pConn->pThread; + SConnObj * pConn = (SConnObj *)param; + SThreadObj *pThread = pConn->pThread; uDebug("%p TCP connection will be closed, fd:%d", pThread, pConn->fd); pConn->closedByApp = 1; @@ -153,9 +156,9 @@ void taosFreeTcpConn(void *param) { static void taosProcessBrokenLink(SConnObj *pConn) { SThreadObj *pThread = pConn->pThread; - SPoolObj *pPool = pThread->pPool; - SPoolInfo *pInfo = &pPool->info; - + SPoolObj * pPool = pThread->pPool; + SPoolInfo * pInfo = &pPool->info; + if (pConn->closedByApp == 0) shutdown(pConn->fd, SHUT_WR); (*pInfo->processBrokenLink)(pConn->ahandle); @@ -169,24 +172,24 @@ static void taosProcessBrokenLink(SConnObj *pConn) { #define maxEvents 10 static void *taosProcessTcpData(void *param) { - SThreadObj *pThread = (SThreadObj *) param; - SPoolObj *pPool = pThread->pPool; - SPoolInfo *pInfo = &pPool->info; - SConnObj *pConn = NULL; + SThreadObj *pThread = (SThreadObj *)param; + SPoolObj * pPool = pThread->pPool; + SPoolInfo * pInfo = &pPool->info; + SConnObj * pConn = NULL; struct epoll_event events[maxEvents]; void *buffer = malloc(pInfo->bufferSize); taosBlockSIGPIPE(); while (1) { - if (pThread->stop) break; + if (pThread->stop) break; int fdNum = epoll_wait(pThread->pollFd, events, maxEvents, TAOS_EPOLL_WAIT_TIME); if (pThread->stop) { uDebug("%p TCP epoll thread is exiting...", pThread); break; } - if (fdNum < 0) { + if (fdNum < 0) { uError("epoll_wait failed (%s)", strerror(errno)); continue; } @@ -215,27 +218,28 @@ static void *taosProcessTcpData(void *param) { taosFreeTcpConn(pConn); continue; } - } + } } } + uDebug("%p TCP epoll thread exits", pThread); + close(pThread->pollFd); free(pThread); free(buffer); - uDebug("%p TCP epoll thread exits", pThread); - return NULL; + return NULL; } static void *taosAcceptPeerTcpConnection(void *argv) { - SPoolObj *pPool = (SPoolObj *)argv; - SPoolInfo *pInfo = &pPool->info; + SPoolObj * pPool = (SPoolObj *)argv; + SPoolInfo *pInfo = &pPool->info; taosBlockSIGPIPE(); while (1) { struct sockaddr_in clientAddr; socklen_t addrlen = sizeof(clientAddr); - int connFd = accept(pPool->acceptFd, (struct sockaddr *) &clientAddr, &addrlen); + int connFd = accept(pPool->acceptFd, (struct sockaddr *)&clientAddr, &addrlen); if (connFd < 0) { if (errno == EINVAL) { uDebug("%p TCP server accept is exiting...", pPool); @@ -246,7 +250,7 @@ static void *taosAcceptPeerTcpConnection(void *argv) { } } - //uDebug("TCP connection from: 0x%x:%d", clientAddr.sin_addr.s_addr, clientAddr.sin_port); + // uDebug("TCP connection from: 0x%x:%d", clientAddr.sin_addr.s_addr, clientAddr.sin_port); taosKeepTcpAlive(connFd); (*pInfo->processIncomingConn)(connFd, clientAddr.sin_addr.s_addr); } @@ -260,7 +264,7 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) { if (pThread) return pThread; - pThread = (SThreadObj *) calloc(1, sizeof(SThreadObj)); + pThread = (SThreadObj *)calloc(1, sizeof(SThreadObj)); if (pThread == NULL) return NULL; pThread->pPool = pPool; @@ -273,7 +277,7 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) { pthread_attr_t thattr; pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); - int ret = pthread_create(&(pThread->thread), &thattr, (void *) taosProcessTcpData, pThread); + int ret = pthread_create(&(pThread->thread), &thattr, (void *)taosProcessTcpData, pThread); pthread_attr_destroy(&thattr); if (ret != 0) { @@ -290,20 +294,20 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) { return pThread; } -static void taosStopPoolThread(SThreadObj* pThread) { +static void taosStopPoolThread(SThreadObj *pThread) { pThread->stop = true; - + if (pThread->thread == pthread_self()) { pthread_detach(pthread_self()); return; } - // save thread ID into a local variable, since pThread is freed when the thread exits + // save thread ID into a local variable, since pThread is freed when the thread exits pthread_t thread = pThread->thread; // signal the thread to stop, try graceful method first, // and use pthread_cancel when failed - struct epoll_event event = { .events = EPOLLIN }; + struct epoll_event event = {.events = EPOLLIN}; eventfd_t fd = eventfd(1, 0); if (fd == -1) { // failed to create eventfd, call pthread_cancel instead, which may result in data corruption @@ -319,4 +323,3 @@ static void taosStopPoolThread(SThreadObj* pThread) { pthread_join(thread, NULL); taosClose(fd); } - diff --git a/src/sync/test/syncClient.c b/src/sync/test/syncClient.c index 16053d1088..23264dc8a0 100644 --- a/src/sync/test/syncClient.c +++ b/src/sync/test/syncClient.c @@ -25,31 +25,32 @@ typedef struct { int num; int numOfReqs; int msgSize; - tsem_t rspSem; - tsem_t *pOverSem; + tsem_t rspSem; + tsem_t * pOverSem; pthread_t thread; - void *pRpc; + void * pRpc; } SInfo; void processResponse(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { SInfo *pInfo = (SInfo *)pMsg->ahandle; - uDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code); + uDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, + pMsg->code); if (pEpSet) pInfo->epSet = *pEpSet; rpcFreeCont(pMsg->pCont); - tsem_post(&pInfo->rspSem); + tsem_post(&pInfo->rspSem); } int tcount = 0; void *sendRequest(void *param) { - SInfo *pInfo = (SInfo *)param; - SRpcMsg rpcMsg = {0}; - + SInfo * pInfo = (SInfo *)param; + SRpcMsg rpcMsg = {0}; + uDebug("thread:%d, start to send request", pInfo->index); - while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) { + while (pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) { pInfo->num++; rpcMsg.pCont = rpcMallocCont(pInfo->msgSize); rpcMsg.contLen = pInfo->msgSize; @@ -57,8 +58,9 @@ void *sendRequest(void *param) { rpcMsg.msgType = 1; uDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg); - if ( pInfo->num % 20000 == 0 ) + if (pInfo->num % 20000 == 0) { uInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); + } tsem_wait(&pInfo->rspSem); } @@ -72,12 +74,12 @@ int main(int argc, char *argv[]) { SRpcInit rpcInit; SRpcEpSet epSet; char secret[TSDB_KEY_LEN] = "mypassword"; - int msgSize = 128; - int numOfReqs = 0; - int appThreads = 1; - char serverIp[40] = "127.0.0.1"; - struct timeval systemTime; - int64_t startTime, endTime; + int msgSize = 128; + int numOfReqs = 0; + int appThreads = 1; + char serverIp[40] = "127.0.0.1"; + struct timeval systemTime; + int64_t startTime, endTime; pthread_attr_t thattr; // server info @@ -102,30 +104,30 @@ int main(int argc, char *argv[]) { rpcInit.spi = 1; rpcInit.connType = TAOS_CONN_CLIENT; - for (int i=1; iindex = i; pInfo->epSet = epSet; pInfo->numOfReqs = numOfReqs; @@ -177,18 +179,16 @@ int main(int argc, char *argv[]) { do { usleep(1); - } while ( tcount < appThreads); + } while (tcount < appThreads); gettimeofday(&systemTime, NULL); - endTime = systemTime.tv_sec*1000000 + systemTime.tv_usec; - float usedTime = (endTime - startTime)/1000.0; // mseconds + endTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec; + float usedTime = (endTime - startTime) / 1000.0; // mseconds - uInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs*appThreads); - uInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize); + uInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads); + uInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize); taosCloseLog(); return 0; } - - diff --git a/src/sync/test/syncServer.c b/src/sync/test/syncServer.c index b802905038..8e769a461e 100644 --- a/src/sync/test/syncServer.c +++ b/src/sync/test/syncServer.c @@ -24,28 +24,27 @@ #include "twal.h" #include "tsync.h" -int msgSize = 128; -int commit = 0; -int dataFd = -1; -void *qhandle = NULL; -int walNum = 0; +int msgSize = 128; +int commit = 0; +int dataFd = -1; +void * qhandle = NULL; +int walNum = 0; uint64_t tversion = 0; -void *syncHandle; -int role; -int nodeId; -char path[256]; -int numOfWrites ; +void * syncHandle; +int role; +int nodeId; +char path[256]; +int numOfWrites; SSyncInfo syncInfo; SSyncCfg *pCfg; -int writeIntoWal(SWalHead *pHead) -{ +int writeIntoWal(SWalHead *pHead) { if (dataFd < 0) { - char walName[280]; + char walName[280]; snprintf(walName, sizeof(walName), "%s/wal/wal.%d", path, walNum); - remove(walName); - dataFd = open(walName, O_CREAT | O_WRONLY, S_IRWXU | S_IRWXG | S_IRWXO); - if (dataFd < 0) { + (void)remove(walName); + dataFd = open(walName, O_CREAT | O_WRONLY, S_IRWXU | S_IRWXG | S_IRWXO); + if (dataFd < 0) { uInfo("failed to open wal file:%s(%s)", walName, strerror(errno)); return -1; } else { @@ -67,54 +66,52 @@ int writeIntoWal(SWalHead *pHead) dataFd = -1; numOfWrites = 0; } - + return 0; } -void confirmForward(void *ahandle, void *mhandle, int32_t code) -{ - SRpcMsg *pMsg = (SRpcMsg *)mhandle; +void confirmForward(void *ahandle, void *mhandle, int32_t code) { + SRpcMsg * pMsg = (SRpcMsg *)mhandle; SWalHead *pHead = (SWalHead *)(((char *)pMsg->pCont) - sizeof(SWalHead)); uDebug("ver:%" PRIu64 ", confirm is received", pHead->version); rpcFreeCont(pMsg->pCont); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg = {0}; rpcMsg.pCont = rpcMallocCont(msgSize); rpcMsg.contLen = msgSize; rpcMsg.handle = pMsg->handle; rpcMsg.code = code; rpcSendResponse(&rpcMsg); - taosFreeQitem(mhandle); + taosFreeQitem(mhandle); } int processRpcMsg(void *item) { - SRpcMsg *pMsg = (SRpcMsg *)item; - SWalHead *pHead = (SWalHead *)(((char *)pMsg->pCont) - sizeof(SWalHead)); - int code = -1; + SRpcMsg * pMsg = (SRpcMsg *)item; + SWalHead *pHead = (SWalHead *)(((char *)pMsg->pCont) - sizeof(SWalHead)); + int code = -1; if (role != TAOS_SYNC_ROLE_MASTER) { uError("not master, write failed, role:%s", syncRole[role]); } else { - pHead->version = ++tversion; pHead->msgType = pMsg->msgType; pHead->len = pMsg->contLen; uDebug("ver:%" PRIu64 ", pkt from client processed", pHead->version); - writeIntoWal(pHead); + writeIntoWal(pHead); syncForwardToPeer(syncHandle, pHead, item, TAOS_QTYPE_RPC); code = 0; } - if (pCfg->quorum <= 1) { - taosFreeQitem(item); + if (pCfg->quorum <= 1) { rpcFreeCont(pMsg->pCont); + taosFreeQitem(item); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg = {0}; rpcMsg.pCont = rpcMallocCont(msgSize); rpcMsg.contLen = msgSize; rpcMsg.handle = pMsg->handle; @@ -126,7 +123,6 @@ int processRpcMsg(void *item) { } int processFwdMsg(void *item) { - SWalHead *pHead = (SWalHead *)item; if (pHead->version <= tversion) { @@ -142,11 +138,11 @@ int processFwdMsg(void *item) { // write into cache -/* - if (pHead->handle) { - syncSendFwdAck(syncHandle, pHead->handle, 0); - } -*/ + /* + if (pHead->handle) { + syncSendFwdAck(syncHandle, pHead->handle, 0); + } + */ taosFreeQitem(item); @@ -154,7 +150,6 @@ int processFwdMsg(void *item) { } int processWalMsg(void *item) { - SWalHead *pHead = (SWalHead *)item; if (pHead->version <= tversion) { @@ -168,11 +163,11 @@ int processWalMsg(void *item) { // write into cache -/* - if (pHead->handle) { - syncSendFwdAck(syncHandle, pHead->handle, 0); - } -*/ + /* + if (pHead->handle) { + syncSendFwdAck(syncHandle, pHead->handle, 0); + } + */ taosFreeQitem(item); @@ -180,15 +175,15 @@ int processWalMsg(void *item) { } void *processWriteQueue(void *param) { - int type; - void *item; + int type; + void *item; while (1) { int ret = taosReadQitem(qhandle, &type, &item); if (ret <= 0) { usleep(1000); continue; - } + } if (type == TAOS_QTYPE_RPC) { processRpcMsg(item); @@ -196,8 +191,7 @@ void *processWriteQueue(void *param) { processWalMsg(item); } else if (type == TAOS_QTYPE_FWD) { processFwdMsg(item); - } - + } } return NULL; @@ -224,21 +218,19 @@ int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char } void processRequestMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { - SRpcMsg *pTemp; pTemp = taosAllocateQitem(sizeof(SRpcMsg)); memcpy(pTemp, pMsg, sizeof(SRpcMsg)); - + uDebug("request is received, type:%d, len:%d", pMsg->msgType, pMsg->contLen); - taosWriteQitem(qhandle, TAOS_QTYPE_RPC, pTemp); + taosWriteQitem(qhandle, TAOS_QTYPE_RPC, pTemp); } -uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) -{ - uint32_t magic; - struct stat fstat; - char aname[280]; +uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) { + uint32_t magic; + struct stat fstat; + char aname[280]; if (*index == 2) { uInfo("wait for a while ....."); @@ -246,15 +238,15 @@ uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex } if (name[0] == 0) { - // find the file + // find the file snprintf(aname, sizeof(aname), "%s/data/data.%d", path, *index); - sprintf(name, "data/data.%d", *index); + sprintf(name, "data/data.%d", *index); } else { snprintf(aname, sizeof(aname), "%s/%s", path, name); } uInfo("get file info:%s", aname); - if ( stat(aname, &fstat) < 0 ) return 0; + if (stat(aname, &fstat) < 0) return 0; *size = fstat.st_size; magic = fstat.st_size; @@ -262,24 +254,22 @@ uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex return magic; } -int getWalInfo(void *ahandle, char *name, uint32_t *index) { - - struct stat fstat; - char aname[280]; +int getWalInfo(void *ahandle, char *name, uint32_t *index) { + struct stat fstat; + char aname[280]; name[0] = 0; - if (*index + 1> walNum) return 0; + if (*index + 1 > walNum) return 0; snprintf(aname, sizeof(aname), "%s/wal/wal.%d", path, *index); - sprintf(name, "wal/wal.%d", *index); + sprintf(name, "wal/wal.%d", *index); uInfo("get wal info:%s", aname); - if ( stat(aname, &fstat) < 0 ) return -1; + if (stat(aname, &fstat) < 0) return -1; - if (*index >= walNum-1) return 0; // no more + if (*index >= walNum - 1) return 0; // no more return 1; - } int writeToCache(void *ahandle, void *data, int type) { @@ -290,24 +280,19 @@ int writeToCache(void *ahandle, void *data, int type) { int msgSize = pHead->len + sizeof(SWalHead); void *pMsg = taosAllocateQitem(msgSize); memcpy(pMsg, pHead, msgSize); - taosWriteQitem(qhandle, type, pMsg); + taosWriteQitem(qhandle, type, pMsg); return 0; } -void confirmFwd(void *ahandle, int64_t version) { - - return; -} +void confirmFwd(void *ahandle, int64_t version) { return; } void notifyRole(void *ahandle, int8_t r) { role = r; printf("current role:%s\n", syncRole[role]); } - void initSync() { - pCfg->replica = 1; pCfg->quorum = 1; syncInfo.vgId = 1; @@ -339,20 +324,18 @@ void initSync() { taosGetFqdn(pCfg->nodeInfo[4].nodeFqdn); } -void doSync() -{ - for (int i=0; i<5; ++i) { - if (tsSyncPort == pCfg->nodeInfo[i].nodePort) - nodeId = pCfg->nodeInfo[i].nodeId; +void doSync() { + for (int i = 0; i < 5; ++i) { + if (tsSyncPort == pCfg->nodeInfo[i].nodePort) nodeId = pCfg->nodeInfo[i].nodeId; } snprintf(path, sizeof(path), "/root/test/d%d", nodeId); - strcpy(syncInfo.path, path); + tstrncpy(syncInfo.path, path, sizeof(syncInfo.path)); - if ( syncHandle == NULL) { - syncHandle = syncStart(&syncInfo); + if (syncHandle == NULL) { + syncHandle = syncStart(&syncInfo); } else { - if (syncReconfig(syncHandle, pCfg) < 0) syncHandle = NULL; + if (syncReconfig(syncHandle, pCfg) < 0) syncHandle = NULL; } uInfo("nodeId:%d path:%s syncPort:%d", nodeId, path, tsSyncPort); @@ -361,39 +344,39 @@ void doSync() int main(int argc, char *argv[]) { SRpcInit rpcInit; char dataName[20] = "server.data"; - pCfg = &syncInfo.syncCfg; + pCfg = &syncInfo.syncCfg; initSync(); memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localPort = 7000; - rpcInit.label = "SER"; + rpcInit.localPort = 7000; + rpcInit.label = "SER"; rpcInit.numOfThreads = 1; - rpcInit.cfp = processRequestMsg; - rpcInit.sessions = 1000; - rpcInit.idleTime = tsShellActivityTimer*1500; - rpcInit.afp = retrieveAuthInfo; + rpcInit.cfp = processRequestMsg; + rpcInit.sessions = 1000; + rpcInit.idleTime = tsShellActivityTimer * 1500; + rpcInit.afp = retrieveAuthInfo; - for (int i=1; ireplica = atoi(argv[++i]); - } else if (strcmp(argv[i], "-q")==0 && i < argc-1) { + } else if (strcmp(argv[i], "-q") == 0 && i < argc - 1) { pCfg->quorum = atoi(argv[++i]); - } else if (strcmp(argv[i], "-d")==0 && i < argc-1) { + } else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) { rpcDebugFlag = atoi(argv[++i]); } else { printf("\nusage: %s [options] \n", argv[0]); @@ -411,10 +394,10 @@ int main(int argc, char *argv[]) { exit(0); } } - + uDebugFlag = rpcDebugFlag; - dDebugFlag = rpcDebugFlag; - //tmrDebugFlag = rpcDebugFlag; + dDebugFlag = rpcDebugFlag; + // tmrDebugFlag = rpcDebugFlag; tsAsyncLog = 0; taosInitLog("server.log", 1000000, 10); @@ -443,35 +426,39 @@ int main(int argc, char *argv[]) { SNodesRole nroles; while (1) { - char c = getchar(); + int c = getchar(); - switch(c) { + switch (c) { case '1': - pCfg->replica = 1; doSync(); - break; + pCfg->replica = 1; + doSync(); + break; case '2': - pCfg->replica = 2; doSync(); + pCfg->replica = 2; + doSync(); break; case '3': - pCfg->replica = 3; doSync(); + pCfg->replica = 3; + doSync(); break; case '4': - pCfg->replica = 4; doSync(); + pCfg->replica = 4; + doSync(); break; case '5': - pCfg->replica = 5; doSync(); + pCfg->replica = 5; + doSync(); break; case 's': syncGetNodesRole(syncHandle, &nroles); - for (int i=0; ireplica; ++i) + for (int i = 0; i < pCfg->replica; ++i) printf("=== nodeId:%d role:%s\n", nroles.nodeId[i], syncRole[nroles.role[i]]); break; default: break; } - if (c=='q') break; - + if (c == 'q') break; } syncStop(syncHandle); @@ -483,5 +470,3 @@ int main(int argc, char *argv[]) { return 0; } - - -- GitLab