diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 59f25a0765b1ef37c792333f658bbc47817a1669..1bdab4d5038078132d749d367cb6cdd1016599e0 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -264,9 +264,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CONFIG, 0, 0x0900, "Invalid Sy TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync module not enabled") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_VERSION, 0, 0x0902, "Invalid Sync version") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_CONFIRM_EXPIRED, 0, 0x0903, "Sync confirm expired") -TAOS_DEFINE_ERROR(TSDB_CODE_SYN_VND_COMMITING, 0, 0x0904, "Vnode is commiting") -TAOS_DEFINE_ERROR(TSDB_CODE_SYN_FILE_CHNAGED, 0, 0x0905, "Vnode file is changed") -TAOS_DEFINE_ERROR(TSDB_CODE_SYN_APP_ERROR, 0, 0x1000, "Unexpected generic error in sync") // wal TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal") diff --git a/src/inc/tsync.h b/src/inc/tsync.h index 293e382519137f7e5e1d181ae262788a85a9e766..1303195ef149a98b6ddd7ffe55615d27eaeba04d 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -86,7 +86,7 @@ typedef void (*FNotifyFlowCtrl)(int32_t vgId, int32_t level); typedef int32_t (*FNotifyFileSynced)(int32_t vgId, uint64_t fversion); // get file version -typedef int32_t (*FGetFileVersion)(int32_t vgId, uint64_t *fver); +typedef int32_t (*FGetVersion)(int32_t vgId, uint64_t *fver, uint64_t *vver); typedef struct { int32_t vgId; // vgroup ID @@ -100,7 +100,7 @@ typedef struct { FNotifyRole notifyRole; FNotifyFlowCtrl notifyFlowCtrl; FNotifyFileSynced notifyFileSynced; - FGetFileVersion getFileVersion; + FGetVersion getVersion; } SSyncInfo; typedef void *tsync_h; diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 2e6ba8175d4d1d3f9780bd051e88f655da8f0f7d..96b840ab195621515c4c78e9c985e2eec6650cc4 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -251,6 +251,16 @@ static void sdbNotifyRole(int32_t vgId, int8_t role) { sdbUpdateMnodeRoles(); } +static int32_t sdbNotifyFileSynced(int32_t vgId, uint64_t fversion) { return 0; } + +static void sdbNotifyFlowCtrl(int32_t vgId, int32_t level) {} + +static int32_t sdbGetSyncVersion(int32_t vgId, uint64_t *fver, uint64_t *vver) { + *fver = 0; + *vver = 0; + return 0; +} + // failed to forward, need revert insert static void sdbHandleFailedConfirm(SSdbRow *pRow) { SWalHead *pHead = pRow->pHead; @@ -372,11 +382,14 @@ void sdbUpdateSync(void *pMnodes) { syncInfo.version = sdbGetVersion(); syncInfo.syncCfg = syncCfg; sprintf(syncInfo.path, "%s", tsMnodeDir); - syncInfo.getWalInfo = sdbGetWalInfo; syncInfo.getFileInfo = sdbGetFileInfo; + syncInfo.getWalInfo = sdbGetWalInfo; syncInfo.writeToCache = sdbWriteFwdToQueue; syncInfo.confirmForward = sdbConfirmForward; syncInfo.notifyRole = sdbNotifyRole; + syncInfo.notifyFileSynced = sdbNotifyFileSynced; + syncInfo.notifyFlowCtrl = sdbNotifyFlowCtrl; + syncInfo.getVersion = sdbGetSyncVersion; tsSdbMgmt.cfg = syncCfg; if (tsSdbMgmt.sync) { diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index 7dc04d70d0e8a0815a1a58051bf79890696c7fa8..6d0c52284f7f2429ccfa20389ccd66b2447610b9 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -139,16 +139,14 @@ typedef struct SsyncPeer { char id[TSDB_EP_LEN + 32]; // peer vgId + end point uint64_t version; uint64_t sversion; // track the peer version in retrieve process - uint64_t lastVer; // track the file version while retrieve + uint64_t lastFileVer; // track the file version while retrieve + uint64_t lastWalVer; // track the wal version while retrieve int32_t syncFd; int32_t peerFd; // forward FD int32_t numOfRetrieves; // number of retrieves tried int32_t fileChanged; // a flag to indicate file is changed during retrieving process void * timer; void * pConn; - int32_t notifyFd; - int32_t watchNum; - int32_t *watchFd; int32_t refCount; // reference count struct SSyncNode *pSyncNode; } SSyncPeer; @@ -173,7 +171,7 @@ typedef struct SSyncNode { FNotifyRole notifyRole; FNotifyFlowCtrl notifyFlowCtrl; FNotifyFileSynced notifyFileSynced; - FGetFileVersion getFileVersion; + FGetVersion getVersion; pthread_mutex_t mutex; } SSyncNode; diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index ce85c940ffeded796414386f85e6e9520d663310..adac532f2d4d9ad9f15459d8d406c196d0de0308 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -196,7 +196,7 @@ int64_t syncStart(const SSyncInfo *pInfo) { pNode->confirmForward = pInfo->confirmForward; pNode->notifyFlowCtrl = pInfo->notifyFlowCtrl; pNode->notifyFileSynced = pInfo->notifyFileSynced; - pNode->getFileVersion = pInfo->getFileVersion; + pNode->getVersion = pInfo->getVersion; pNode->selfIndex = -1; pNode->vgId = pInfo->vgId; @@ -498,7 +498,6 @@ int32_t syncDecPeerRef(SSyncPeer *pPeer) { taosReleaseRef(tsSyncRefId, pPeer->pSyncNode->rid); sDebug("%s, resource is freed", pPeer->id); - tfree(pPeer->watchFd); tfree(pPeer); return 0; } diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 305ec492b928a9a7d3e6cda92be29a53e0bd0b1d..36b197dd46484c7ce676391aea5dc4efaf00b9d8 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -26,39 +26,78 @@ #include "tsync.h" #include "syncInt.h" -static int32_t syncAreFilesModified(SSyncNode *pNode, SSyncPeer *pPeer) { - if (pNode->getFileVersion == NULL) return TSDB_CODE_SUCCESS; +static int32_t syncGetWalVersion(SSyncNode *pNode, SSyncPeer *pPeer) { + uint64_t fver, wver; + int32_t code = (*pNode->getVersion)(pNode->vgId, &fver, &wver); + if (code != 0) { + sDebug("%s, vnode is commiting while retrieve, last wver:%" PRIu64, pPeer->id, pPeer->lastWalVer); + return -1; + } + + pPeer->lastWalVer = wver; + return code; +} + +static bool syncIsWalModified(SSyncNode *pNode, SSyncPeer *pPeer) { + uint64_t fver, wver; + int32_t code = (*pNode->getVersion)(pNode->vgId, &fver, &wver); + if (code != 0) { + sDebug("%s, vnode is commiting while retrieve, last wver:%" PRIu64, pPeer->id, pPeer->lastWalVer); + return true; + } + + if (wver != pPeer->lastWalVer) { + sDebug("%s, wal is modified while retrieve, wver:%" PRIu64 ", last:%" PRIu64, pPeer->id, wver, pPeer->lastWalVer); + return true; + } + + return false; +} - uint64_t fver = 0; - int32_t code = (*pNode->getFileVersion)(pNode->vgId, &fver); +static int32_t syncGetFileVersion(SSyncNode *pNode, SSyncPeer *pPeer) { + uint64_t fver, wver; + int32_t code = (*pNode->getVersion)(pNode->vgId, &fver, &wver); if (code != 0) { - sInfo("%s, vnode is commiting while retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastVer); + sDebug("%s, vnode is commiting while retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastFileVer); + return -1; + } + + pPeer->lastFileVer = fver; + return code; +} + +static bool syncAreFilesModified(SSyncNode *pNode, SSyncPeer *pPeer) { + uint64_t fver, wver; + int32_t code = (*pNode->getVersion)(pNode->vgId, &fver, &wver); + if (code != 0) { + sDebug("%s, vnode is commiting while retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastFileVer); pPeer->fileChanged = 1; - return TSDB_CODE_SYN_VND_COMMITING; + return true; } - if (fver != pPeer->lastVer) { - sInfo("%s, files are modified while retrieve, fver:%" PRIu64 ", last fver:%" PRIu64, pPeer->id, fver, pPeer->lastVer); + if (fver != pPeer->lastFileVer) { + sDebug("%s, files are modified while retrieve, fver:%" PRIu64 ", last:%" PRIu64, pPeer->id, fver, pPeer->lastFileVer); pPeer->fileChanged = 1; - return TSDB_CODE_SYN_FILE_CHNAGED; + return true; } pPeer->fileChanged = 0; - return TSDB_CODE_SUCCESS; + return false; } static int32_t syncRetrieveFile(SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; SFileInfo fileInfo; memset(&fileInfo, 0, sizeof(SFileInfo)); SFileAck fileAck = {0}; - int32_t code = TSDB_CODE_SYN_APP_ERROR; + int32_t code = -1; char name[TSDB_FILENAME_LEN * 2] = {0}; - if (pNode->getFileVersion) (*pNode->getFileVersion)(pNode->vgId, &pPeer->lastVer); + if (syncGetFileVersion(pNode, pPeer) < 0) return -1; while (1) { // retrieve file info fileInfo.name[0] = 0; + fileInfo.size = 0; fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX, &fileInfo.size, &fileInfo.fversion); // fileInfo.size = htonl(size); @@ -67,14 +106,14 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { // send the file info int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo)); if (ret < 0) { - code = TAOS_SYSTEM_ERROR(errno); + code = -1; sError("%s, failed to write file:%s info while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); break; } // if no file anymore, break if (fileInfo.magic == 0 || fileInfo.name[0] == 0) { - code = TSDB_CODE_SUCCESS; + code = 0; sDebug("%s, no more files to sync", pPeer->id); break; } @@ -82,7 +121,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { // wait for the ack from peer ret = taosReadMsg(pPeer->syncFd, &fileAck, sizeof(fileAck)); if (ret < 0) { - code = TAOS_SYSTEM_ERROR(errno); + code = -1; sError("%s, failed to read file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); break; } @@ -103,7 +142,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { // send the file to peer int32_t sfd = open(name, O_RDONLY); if (sfd < 0) { - code = TAOS_SYSTEM_ERROR(errno); + code = -1; sError("%s, failed to open file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); break; } @@ -111,7 +150,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { ret = taosSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size); close(sfd); if (ret < 0) { - code = TAOS_SYSTEM_ERROR(errno); + code = -1; sError("%s, failed to send file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); break; } @@ -120,128 +159,103 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { fileInfo.index++; // check if processed files are modified - code = syncAreFilesModified(pNode, pPeer); - if (code != TSDB_CODE_SUCCESS) break; + if (syncAreFilesModified(pNode, pPeer)) { + code = -1; + break; + } } if (code != TSDB_CODE_SUCCESS) { - sError("%s, failed to retrieve file since %s", pPeer->id, tstrerror(code)); + sError("%s, failed to retrieve file, code:0x%x", pPeer->id, code); } return code; } -/* if only a partial record is read out, set the IN_MODIFY flag in event, - so upper layer will reload the file to get a complete record */ -static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead, uint32_t *pEvent) { - int32_t ret; +// if only a partial record is read out, upper layer will reload the file to get a complete record +static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead) { + int32_t ret = read(sfd, pHead, sizeof(SWalHead)); + if (ret < 0) { + sError("sfd:%d, failed to read wal head since %s, ret:%d", sfd, strerror(errno), ret); + return -1; + } - ret = read(sfd, pHead, sizeof(SWalHead)); - if (ret < 0) return -1; - if (ret == 0) return 0; + if (ret == 0) { + sTrace("sfd:%d, read to the end of file, ret:%d", sfd, ret); + return 0; + } if (ret != sizeof(SWalHead)) { // file is not at end yet, it shall be reloaded - *pEvent = *pEvent | IN_MODIFY; + sDebug("sfd:%d, a partial wal head is read out, ret:%d", sfd, ret); return 0; } assert(pHead->len <= TSDB_MAX_WAL_SIZE); ret = read(sfd, pHead->cont, pHead->len); - if (ret < 0) return -1; + if (ret < 0) { + sError("sfd:%d, failed to read wal content since %s, ret:%d", sfd, strerror(errno), ret); + return -1; + } if (ret != pHead->len) { // file is not at end yet, it shall be reloaded - *pEvent = *pEvent | IN_MODIFY; + sDebug("sfd:%d, a partial wal conetnt is read out, ret:%d", sfd, ret); return 0; } return sizeof(SWalHead) + pHead->len; } -static int32_t syncMonitorLastWal(SSyncPeer *pPeer, char *name) { - pPeer->watchNum = 0; - taosClose(pPeer->notifyFd); - pPeer->notifyFd = inotify_init1(IN_NONBLOCK); - if (pPeer->notifyFd < 0) { - sError("%s, failed to init inotify since %s", pPeer->id, strerror(errno)); - return -1; - } - - if (pPeer->watchFd == NULL) pPeer->watchFd = malloc(sizeof(int32_t) * tsMaxWatchFiles); - if (pPeer->watchFd == NULL) { - sError("%s, failed to allocate watchFd", pPeer->id); - return -1; - } - - memset(pPeer->watchFd, -1, sizeof(int32_t) * tsMaxWatchFiles); - int32_t *wd = pPeer->watchFd; - - *wd = inotify_add_watch(pPeer->notifyFd, name, IN_MODIFY | IN_CLOSE_WRITE); - if (*wd == -1) { - sError("%s, failed to watch last wal since %s", pPeer->id, strerror(errno)); +static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset) { + int32_t sfd = open(name, O_RDONLY); + if (sfd < 0) { + sError("%s, failed to open wal:%s for retrieve since:%s", pPeer->id, name, tstrerror(errno)); return -1; } - return 0; -} - -static int32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) { - char buf[2048]; - int32_t len = read(pPeer->notifyFd, buf, sizeof(buf)); - if (len < 0 && errno != EAGAIN) { - sError("%s, failed to read notify FD since %s", pPeer->id, strerror(errno)); + int32_t code = taosLSeek(sfd, offset, SEEK_SET); + if (code < 0) { + sError("%s, failed to seek %" PRId64 " in wal:%s for retrieve since:%s", pPeer->id, offset, name, tstrerror(errno)); + close(sfd); return -1; } - if (len == 0) return 0; - - struct inotify_event *event; - for (char *ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) { - event = (struct inotify_event *)ptr; - if (event->mask & IN_MODIFY) *pEvent = *pEvent | IN_MODIFY; - if (event->mask & IN_CLOSE_WRITE) *pEvent = *pEvent | IN_CLOSE_WRITE; - } - - if (pEvent != 0) sDebug("%s, last wal event:0x%x", pPeer->id, *pEvent); - - return 0; -} + sDebug("%s, retrieve last wal:%s, offset:%" PRId64 " fver:%" PRIu64, pPeer->id, name, offset, fversion); -static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset, uint32_t *pEvent) { SWalHead *pHead = malloc(SYNC_MAX_SIZE); - int32_t code = -1; int32_t bytes = 0; - int32_t sfd; - - sfd = open(name, O_RDONLY); - if (sfd < 0) { - free(pHead); - return -1; - } - - (void)lseek(sfd, offset, SEEK_SET); - sDebug("%s, retrieve last wal, offset:%" PRId64 " fver:%" PRIu64, pPeer->id, offset, fversion); while (1) { - int32_t wsize = syncReadOneWalRecord(sfd, pHead, pEvent); - if (wsize < 0) break; - if (wsize == 0) { - code = 0; + code = syncReadOneWalRecord(sfd, pHead); + if (code < 0) { + sError("%s, failed to read one record from wal:%s", pPeer->id, name); + break; + } + + if (code == 0) { + code = bytes; + sDebug("%s, read to the end of wal, bytes:%d", pPeer->id, bytes); break; } sTrace("%s, last wal is forwarded, hver:%" PRIu64, pPeer->id, pHead->version); + + int32_t wsize = code; int32_t ret = taosWriteMsg(pPeer->syncFd, pHead, wsize); - if (ret != wsize) break; - pPeer->sversion = pHead->version; + if (ret != wsize) { + code = -1; + sError("%s, failed to forward wal since %s, hver:%" PRIu64, pPeer->id, strerror(errno), pHead->version); + break; + } + pPeer->sversion = pHead->version; bytes += wsize; if (pHead->version >= fversion && fversion > 0) { code = 0; - bytes = 0; + sDebug("%s, retrieve wal finished, hver:%" PRIu64 " fver:%" PRIu64, pPeer->id, pHead->version, fversion); break; } } @@ -249,92 +263,62 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi free(pHead); close(sfd); - if (code == 0) return bytes; - return -1; + return code; } static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) { SSyncNode *pNode = pPeer->pSyncNode; - int32_t code = -1; + int32_t once = 0; // last WAL has once ever been processed + int64_t offset = 0; + uint64_t fversion = 0; char fname[TSDB_FILENAME_LEN * 2] = {0}; // full path to wal file - if (syncAreFilesModified(pNode, pPeer) != 0) return -1; + // get full path to wal file + snprintf(fname, sizeof(fname), "%s/%s", pNode->path, wname); + sDebug("%s, start to retrieve last wal:%s", pPeer->id, fname); while (1) { - int32_t once = 0; // last WAL has once ever been processed - int64_t offset = 0; - uint64_t fversion = 0; - uint32_t event = 0; - - // get full path to wal file - snprintf(fname, sizeof(fname), "%s/%s", pNode->path, wname); - sDebug("%s, start to retrieve last wal:%s", pPeer->id, fname); - - // monitor last wal - if (syncMonitorLastWal(pPeer, fname) < 0) break; - - while (1) { - int32_t bytes = syncRetrieveLastWal(pPeer, fname, fversion, offset, &event); - if (bytes < 0) break; + if (syncAreFilesModified(pNode, pPeer)) return -1; + if (syncGetWalVersion(pNode, pPeer) < 0) return -1; - // check file changes - if (syncCheckLastWalChanges(pPeer, &event) < 0) break; - - // if file is not updated or updated once, set the fversion and sstatus - if (((event & IN_MODIFY) == 0) || once) { - if (fversion == 0) { - pPeer->sstatus = TAOS_SYNC_STATUS_CACHE; // start to forward pkt - sDebug("%s, fversion is 0 then set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); - fversion = nodeVersion; // must read data to fversion - } - } + int32_t bytes = syncRetrieveLastWal(pPeer, fname, fversion, offset); + if (bytes < 0) { + sDebug("%s, failed to retrieve last wal", pPeer->id); + return bytes; + } - // if all data up to fversion is read out, it is over - if (pPeer->sversion >= fversion && fversion > 0) { - code = 0; - sDebug("%s, data up to fver:%" PRIu64 " has been read out, bytes:%d", pPeer->id, fversion, bytes); - break; - } + // check file changes + bool walModified = syncIsWalModified(pNode, pPeer); - // if all data are read out, and no update - if ((bytes == 0) && ((event & IN_MODIFY) == 0)) { - // wal file is closed, break - if (event & IN_CLOSE_WRITE) { - code = 0; - sDebug("%s, current wal is closed", pPeer->id); - break; - } - - // wal not closed, it means some data not flushed to disk, wait for a while - usleep(10000); + // if file is not updated or updated once, set the fversion and sstatus + if (!walModified || once) { + if (fversion == 0) { + pPeer->sstatus = TAOS_SYNC_STATUS_CACHE; // start to forward pkt + fversion = nodeVersion; // must read data to fversion + sDebug("%s, set sstatus:%s and fver:%" PRIu64, pPeer->id, syncStatus[pPeer->sstatus], fversion); } - - // if bytes>0, file is updated, or fversion is not reached but file still open, read again - once = 1; - offset += bytes; - sDebug("%s, retrieve last wal, bytes:%d", pPeer->id, bytes); - event = event & (~IN_MODIFY); // clear IN_MODIFY flag } - if (code < 0) break; - if (pPeer->sversion >= fversion && fversion > 0) break; + // if all data up to fversion is read out, it is over + if (pPeer->sversion >= fversion && fversion > 0) { + sDebug("%s, data up to fver:%" PRIu64 " has been read out, bytes:%d sver:%" PRIu64, pPeer->id, fversion, bytes, + pPeer->sversion); + return 0; + } - index++; - wname[0] = 0; - code = (*pNode->getWalInfo)(pNode->vgId, wname, &index); - if (code < 0) break; - if (wname[0] == 0) { - code = 0; - break; + // if all data are read out, and no update + if (bytes == 0 && !walModified) { + // wal not closed, it means some data not flushed to disk, wait for a while + usleep(10000); } - // current last wal is closed, there is a new one - sDebug("%s, last wal is closed, try new one", pPeer->id); + // if bytes > 0, file is updated, or fversion is not reached but file still open, read again + once = 1; + offset += bytes; + sDebug("%s, continue retrieve last wal, bytes:%d offset:%" PRId64, pPeer->id, bytes, offset); } - taosClose(pPeer->notifyFd); - - return code; + return -1; } static int32_t syncRetrieveWal(SSyncPeer *pPeer) { @@ -342,7 +326,6 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { char fname[TSDB_FILENAME_LEN * 3]; char wname[TSDB_FILENAME_LEN * 2]; int32_t size; - struct stat fstat; int32_t code = -1; int64_t index = 0; @@ -350,9 +333,14 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { // retrieve wal info wname[0] = 0; code = (*pNode->getWalInfo)(pNode->vgId, wname, &index); - if (code < 0) break; // error + if (code < 0) { + sError("%s, failed to get wal info since:%s, code:0x%x", pPeer->id, strerror(errno), code); + break; + } + if (wname[0] == 0) { // no wal file - sDebug("%s, no wal file", pPeer->id); + code = 0; + sDebug("%s, no wal file anymore", pPeer->id); break; } @@ -364,20 +352,35 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { // get the full path to wal file snprintf(fname, sizeof(fname), "%s/%s", pNode->path, wname); - // send wal file, - // inotify is not required, old wal file won't be modified, even remove is ok - if (stat(fname, &fstat) < 0) break; - size = fstat.st_size; + // send wal file, old wal file won't be modified, even remove is ok + struct stat fstat; + if (stat(fname, &fstat) < 0) { + code = -1; + sDebug("%s, failed to stat wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code); + break; + } + size = fstat.st_size; sDebug("%s, retrieve wal:%s size:%d", pPeer->id, fname, size); + int32_t sfd = open(fname, O_RDONLY); - if (sfd < 0) break; + if (sfd < 0) { + code = -1; + sError("%s, failed to open wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code); + break; + } code = taosSendFile(pPeer->syncFd, sfd, NULL, size); close(sfd); - if (code < 0) break; + if (code < 0) { + sError("%s, failed to send wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code); + break; + } - if (syncAreFilesModified(pNode, pPeer) != 0) break; + if (syncAreFilesModified(pNode, pPeer)) { + code = -1; + break; + } } if (code == 0) { @@ -386,9 +389,9 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { SWalHead walHead; memset(&walHead, 0, sizeof(walHead)); - code = taosWriteMsg(pPeer->syncFd, &walHead, sizeof(walHead)); + taosWriteMsg(pPeer->syncFd, &walHead, sizeof(walHead)); } else { - sError("%s, failed to send wal since %s", pPeer->id, strerror(errno)); + sError("%s, failed to send wal since %s, code:0x%x", pPeer->id, strerror(errno), code); } return code; @@ -428,7 +431,7 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) { pPeer->sversion = 0; pPeer->sstatus = TAOS_SYNC_STATUS_FILE; sInfo("%s, start to retrieve files, set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); - if (syncRetrieveFile(pPeer) < 0) { + if (syncRetrieveFile(pPeer) != 0) { sError("%s, failed to retrieve files", pPeer->id); return -1; } @@ -437,8 +440,9 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) { if (pPeer->sversion == 0) pPeer->sversion = 1; sInfo("%s, start to retrieve wals", pPeer->id); - if (syncRetrieveWal(pPeer) < 0) { - sError("%s, failed to retrieve wals", pPeer->id); + int32_t code = syncRetrieveWal(pPeer); + if (code != 0) { + sError("%s, failed to retrieve wals, code:0x%x", pPeer->id, code); return -1; } @@ -474,7 +478,6 @@ void *syncRetrieveData(void *param) { } pPeer->fileChanged = 0; - taosClose(pPeer->notifyFd); taosClose(pPeer->syncFd); syncDecPeerRef(pPeer); diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 0b29fcc9088195384d42318e0e71e69535571976..204dd2f3dd5e460b4110fcf3279a68ae13c85096 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -38,7 +38,7 @@ static void vnodeCtrlFlow(int32_t vgId, int32_t level); static int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion); static void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code); static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam); -static int32_t vnodeGetFileVersion(int32_t vgId, uint64_t *fver); +static int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver); #ifndef _SYNC int64_t syncStart(const SSyncInfo *info) { return NULL; } @@ -353,7 +353,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { syncInfo.notifyRole = vnodeNotifyRole; syncInfo.notifyFlowCtrl = vnodeCtrlFlow; syncInfo.notifyFileSynced = vnodeNotifyFileSynced; - syncInfo.getFileVersion = vnodeGetFileVersion; + syncInfo.getVersion = vnodeGetVersion; pVnode->sync = syncStart(&syncInfo); #ifndef _SYNC @@ -771,7 +771,7 @@ static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void return code; } -static int32_t vnodeGetFileVersion(int32_t vgId, uint64_t *fver) { +static int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver) { SVnodeObj *pVnode = vnodeAcquire(vgId); if (pVnode == NULL) { vError("vgId:%d, vnode not found while write to cache", vgId); @@ -780,10 +780,11 @@ static int32_t vnodeGetFileVersion(int32_t vgId, uint64_t *fver) { int32_t code = 0; if (pVnode->isCommiting) { - vDebug("vgId:%d, vnode is commiting while get file version", vgId); + vDebug("vgId:%d, vnode is commiting while get version", vgId); code = -1; } else { *fver = pVnode->fversion; + *wver = pVnode->version; } vnodeRelease(pVnode);