diff --git a/src/inc/tsync.h b/src/inc/tsync.h index 398e1bf97c739406d1f8f568c998d891191b71bf..293e382519137f7e5e1d181ae262788a85a9e766 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -85,6 +85,9 @@ typedef void (*FNotifyFlowCtrl)(int32_t vgId, int32_t level); // when data file is synced successfully, notity app typedef int32_t (*FNotifyFileSynced)(int32_t vgId, uint64_t fversion); +// get file version +typedef int32_t (*FGetFileVersion)(int32_t vgId, uint64_t *fver); + typedef struct { int32_t vgId; // vgroup ID uint64_t version; // initial version @@ -97,6 +100,7 @@ typedef struct { FNotifyRole notifyRole; FNotifyFlowCtrl notifyFlowCtrl; FNotifyFileSynced notifyFileSynced; + FGetFileVersion getFileVersion; } SSyncInfo; typedef void *tsync_h; diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index 05b7adc5f44b5853cd177443d062167c77681dce..7dc04d70d0e8a0815a1a58051bf79890696c7fa8 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -139,6 +139,7 @@ typedef struct SsyncPeer { char id[TSDB_EP_LEN + 32]; // peer vgId + end point uint64_t version; uint64_t sversion; // track the peer version in retrieve process + uint64_t lastVer; // track the file version while retrieve int32_t syncFd; int32_t peerFd; // forward FD int32_t numOfRetrieves; // number of retrieves tried @@ -172,6 +173,7 @@ typedef struct SSyncNode { FNotifyRole notifyRole; FNotifyFlowCtrl notifyFlowCtrl; FNotifyFileSynced notifyFileSynced; + FGetFileVersion getFileVersion; pthread_mutex_t mutex; } SSyncNode; diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index d2d6d2d7fae03e2341c58e73ff4984046655d682..af3241f2172dd12738618716011867e875c1ecfd 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -196,6 +196,7 @@ int64_t syncStart(const SSyncInfo *pInfo) { pNode->confirmForward = pInfo->confirmForward; pNode->notifyFlowCtrl = pInfo->notifyFlowCtrl; pNode->notifyFileSynced = pInfo->notifyFileSynced; + pNode->getFileVersion = pInfo->getFileVersion; pNode->selfIndex = -1; pNode->vgId = pInfo->vgId; diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index ed6b63c92deaa1f077311599ff3e88469bd77593..1f5b560c87f11a791e3b78c9c9c0e9e12a3e8c93 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -52,12 +52,12 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, int32_t sindex, int32_t eindex static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { SSyncNode *pNode = pPeer->pSyncNode; - SFileInfo minfo; memset(&minfo, 0, sizeof(minfo)); /* = {0}; */ // master file info - SFileInfo sinfo; memset(&sinfo, 0, sizeof(sinfo)); /* = {0}; */ // slave file info - SFileAck fileAck; + SFileInfo minfo = {0}; + SFileInfo sinfo = {0}; + SFileAck fileAck = {0}; int32_t code = -1; char name[TSDB_FILENAME_LEN * 2] = {0}; - uint32_t pindex = 0; // index in last restore + uint32_t pindex = 0; // index in last restore bool fileChanged = false; *fversion = 0; diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 060badba9dc188c2741d280d600c709ee2740099..e199ce98451a98345e3d5d79d2647e03d4f91b22 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -25,85 +25,32 @@ #include "tsync.h" #include "syncInt.h" -static int32_t syncAddIntoWatchList(SSyncPeer *pPeer, char *name) { - sDebug("%s, start to monitor:%s", pPeer->id, name); - - if (pPeer->notifyFd <= 0) { - pPeer->watchNum = 0; - pPeer->notifyFd = inotify_init1(IN_NONBLOCK); - if (pPeer->notifyFd < 0) { - sError("%s, failed to init inotify since %s", pPeer->id, strerror(errno)); - return -1; - } - - if (pPeer->watchFd == NULL) pPeer->watchFd = malloc(sizeof(int32_t) * tsMaxWatchFiles); - if (pPeer->watchFd == NULL) { - sError("%s, failed to allocate watchFd", pPeer->id); - return -1; - } - - memset(pPeer->watchFd, -1, sizeof(int32_t) * tsMaxWatchFiles); - } - - int32_t *wd = pPeer->watchFd + pPeer->watchNum; - - if (*wd >= 0) { - if (inotify_rm_watch(pPeer->notifyFd, *wd) < 0) { - sError("%s, failed to remove wd:%d since %s", pPeer->id, *wd, strerror(errno)); - return -1; - } +static int32_t syncAreFilesModified(SSyncNode *pNode, SSyncPeer *pPeer) { + if (pNode->getFileVersion == NULL) return 0; + + uint64_t fver = 0; + int32_t code = (*pNode->getFileVersion)(pNode->vgId, &fver); + if (code != 0) { + sInfo("%s, file are modified while retrieve, lastver:%" PRIu64, pPeer->id, pPeer->lastVer); + return code; } - *wd = inotify_add_watch(pPeer->notifyFd, name, IN_MODIFY | IN_DELETE); - if (*wd == -1) { - sError("%s, failed to add %s since %s", pPeer->id, name, strerror(errno)); + if (fver != pPeer->lastVer) { + sInfo("%s, file are modified while retrieve, fver:%" PRIu64 " lastver:%" PRIu64, pPeer->id, fver, pPeer->lastVer); return -1; - } else { - sDebug("%s, monitor %s, wd:%d watchNum:%d", pPeer->id, name, *wd, pPeer->watchNum); } - pPeer->watchNum = (pPeer->watchNum + 1) % tsMaxWatchFiles; - return 0; } -static int32_t syncAreFilesModified(SSyncPeer *pPeer) { - if (pPeer->notifyFd <= 0) return 0; - - char buf[2048]; - int32_t len = read(pPeer->notifyFd, buf, sizeof(buf)); - if (len < 0 && errno != EAGAIN) { - sError("%s, failed to read notify FD since %s", pPeer->id, strerror(errno)); - return -1; - } - - int32_t code = 0; - if (len > 0) { - const struct inotify_event *event; - char *ptr; - for (ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) { - event = (const struct inotify_event *)ptr; - if ((event->mask & IN_MODIFY) || (event->mask & IN_DELETE)) { - sDebug("%s, processed file is changed", pPeer->id); - pPeer->fileChanged = 1; - code = 1; - break; - } - } - } - - return code; -} - static int32_t syncRetrieveFile(SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; - SFileInfo fileInfo; - SFileAck fileAck; + SFileInfo fileInfo = {0}; + SFileAck fileAck = {0}; int32_t code = -1; char name[TSDB_FILENAME_LEN * 2] = {0}; - memset(&fileInfo, 0, sizeof(fileInfo)); - memset(&fileAck, 0, sizeof(fileAck)); + if (pNode->getFileVersion) (*pNode->getFileVersion)(pNode->vgId, &pPeer->lastVer); while (1) { // retrieve file info @@ -136,15 +83,6 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { // set the peer sync version pPeer->sversion = fileInfo.fversion; - // get the full path to file - snprintf(name, sizeof(name), "%s/%s", pNode->path, fileInfo.name); - - // add the file into watch list - if (syncAddIntoWatchList(pPeer, name) < 0) { - sError("%s, failed to watch file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); - break; - } - // if sync is not required, continue if (fileAck.sync == 0) { fileInfo.index++; @@ -152,6 +90,9 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { continue; } + // get the full path to file + snprintf(name, sizeof(name), "%s/%s", pNode->path, fileInfo.name); + // send the file to peer int32_t sfd = open(name, O_RDONLY); if (sfd < 0) { @@ -170,10 +111,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { fileInfo.index++; // check if processed files are modified - if (syncAreFilesModified(pPeer) != 0) { - sInfo("%s, file:%s are modified while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); - break; - } + if (syncAreFilesModified(pNode, pPeer) != 0) break; } if (code < 0) { @@ -308,9 +246,9 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) { SSyncNode *pNode = pPeer->pSyncNode; int32_t code = -1; - char fname[TSDB_FILENAME_LEN * 2]; // full path to wal file + char fname[TSDB_FILENAME_LEN * 2] = {0}; // full path to wal file - if (syncAreFilesModified(pPeer) != 0) return -1; + if (syncAreFilesModified(pNode, pPeer) != 0) return -1; while (1) { int32_t once = 0; // last WAL has once ever been processed @@ -431,7 +369,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { index++; - if (syncAreFilesModified(pPeer) != 0) break; + if (syncAreFilesModified(pNode, pPeer) != 0) break; } if (code == 0) { diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index 7fc9b100eff50bd6a68a1dd5f1f48acbb290c530..e468c2e83e1560b0711e42d827da9604473f47b0 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -44,6 +44,7 @@ typedef struct { int8_t role; int8_t accessState; int8_t isFull; + int8_t isCommiting; uint64_t version; // current version uint64_t fversion; // version on saved data file void *wqueue; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index cd6d2ea7c0a4842207140d2e4cab1d30722998ad..1b2f3ee93cb401a815b2eaba0ac29f5277b82fb3 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -38,6 +38,7 @@ static void vnodeCtrlFlow(int32_t vgId, int32_t level); static int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion); static void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code); static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam); +static int32_t vnodeGetFileVersion(int32_t vgId, uint64_t *fver); #ifndef _SYNC int64_t syncStart(const SSyncInfo *info) { return NULL; } @@ -352,6 +353,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { syncInfo.notifyRole = vnodeNotifyRole; syncInfo.notifyFlowCtrl = vnodeCtrlFlow; syncInfo.notifyFileSynced = vnodeNotifyFileSynced; + syncInfo.getFileVersion = vnodeGetFileVersion; pVnode->sync = syncStart(&syncInfo); #ifndef _SYNC @@ -597,18 +599,19 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { vnodeRelease(pVnode); } -// TODO: this is a simple implement static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) { SVnodeObj *pVnode = arg; if (eno != TSDB_CODE_SUCCESS) { vError("vgId:%d, failed to commit since %s, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, tstrerror(eno), pVnode->fversion, pVnode->version); + pVnode->isCommiting = 0; pVnode->isFull = 1; return 0; } if (status == TSDB_STATUS_COMMIT_START) { + pVnode->isCommiting = 1; pVnode->fversion = pVnode->version; vDebug("vgId:%d, start commit, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version); if (pVnode->status != TAOS_VN_STATUS_INIT) { @@ -619,6 +622,7 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) { if (status == TSDB_STATUS_COMMIT_OVER) { vDebug("vgId:%d, commit over, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version); + pVnode->isCommiting = 0; pVnode->isFull = 0; if (pVnode->status != TAOS_VN_STATUS_INIT) { walRemoveOneOldFile(pVnode->wal); @@ -765,3 +769,21 @@ static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void vnodeRelease(pVnode); return code; } + +static int32_t vnodeGetFileVersion(int32_t vgId, uint64_t *fver) { + SVnodeObj *pVnode = vnodeAcquire(vgId); + if (pVnode == NULL) { + vError("vgId:%d, vnode not found while write to cache", vgId); + return -1; + } + + int32_t code = 0; + if (pVnode->isCommiting) { + code = -1; + } else { + *fver = pVnode->fversion; + } + + vnodeRelease(pVnode); + return code; +} diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 57bd407cd1f47ab875c57326f361f80103ecc2b6..360a631e1df299db45ca2fea34741a29e0956c2e 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -288,7 +288,7 @@ static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) { } else { code = vnodePerformFlowCtrl(pWrite); if (code == 0) { - vTrace("vgId:%d, write into vwqueue after flowctrl", pVnode->vgId); + vDebug("vgId:%d, write into vwqueue after flowctrl", pVnode->vgId); pWrite->processedCount = 0; taosWriteQitem(pVnode->wqueue, pWrite->qtype, pWrite); }