提交 1e94822a 编写于 作者: S Shengliang Guan

TD-1926

上级 810023ca
......@@ -85,6 +85,9 @@ typedef void (*FNotifyFlowCtrl)(int32_t vgId, int32_t level);
// when data file is synced successfully, notity app
typedef int32_t (*FNotifyFileSynced)(int32_t vgId, uint64_t fversion);
// get file version
typedef int32_t (*FGetFileVersion)(int32_t vgId, uint64_t *fver);
typedef struct {
int32_t vgId; // vgroup ID
uint64_t version; // initial version
......@@ -97,6 +100,7 @@ typedef struct {
FNotifyRole notifyRole;
FNotifyFlowCtrl notifyFlowCtrl;
FNotifyFileSynced notifyFileSynced;
FGetFileVersion getFileVersion;
} SSyncInfo;
typedef void *tsync_h;
......
......@@ -139,6 +139,7 @@ typedef struct SsyncPeer {
char id[TSDB_EP_LEN + 32]; // peer vgId + end point
uint64_t version;
uint64_t sversion; // track the peer version in retrieve process
uint64_t lastVer; // track the file version while retrieve
int32_t syncFd;
int32_t peerFd; // forward FD
int32_t numOfRetrieves; // number of retrieves tried
......@@ -172,6 +173,7 @@ typedef struct SSyncNode {
FNotifyRole notifyRole;
FNotifyFlowCtrl notifyFlowCtrl;
FNotifyFileSynced notifyFileSynced;
FGetFileVersion getFileVersion;
pthread_mutex_t mutex;
} SSyncNode;
......
......@@ -196,6 +196,7 @@ int64_t syncStart(const SSyncInfo *pInfo) {
pNode->confirmForward = pInfo->confirmForward;
pNode->notifyFlowCtrl = pInfo->notifyFlowCtrl;
pNode->notifyFileSynced = pInfo->notifyFileSynced;
pNode->getFileVersion = pInfo->getFileVersion;
pNode->selfIndex = -1;
pNode->vgId = pInfo->vgId;
......
......@@ -52,12 +52,12 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, int32_t sindex, int32_t eindex
static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
SSyncNode *pNode = pPeer->pSyncNode;
SFileInfo minfo; memset(&minfo, 0, sizeof(minfo)); /* = {0}; */ // master file info
SFileInfo sinfo; memset(&sinfo, 0, sizeof(sinfo)); /* = {0}; */ // slave file info
SFileAck fileAck;
SFileInfo minfo = {0};
SFileInfo sinfo = {0};
SFileAck fileAck = {0};
int32_t code = -1;
char name[TSDB_FILENAME_LEN * 2] = {0};
uint32_t pindex = 0; // index in last restore
uint32_t pindex = 0; // index in last restore
bool fileChanged = false;
*fversion = 0;
......
......@@ -25,85 +25,32 @@
#include "tsync.h"
#include "syncInt.h"
static int32_t syncAddIntoWatchList(SSyncPeer *pPeer, char *name) {
sDebug("%s, start to monitor:%s", pPeer->id, name);
if (pPeer->notifyFd <= 0) {
pPeer->watchNum = 0;
pPeer->notifyFd = inotify_init1(IN_NONBLOCK);
if (pPeer->notifyFd < 0) {
sError("%s, failed to init inotify since %s", pPeer->id, strerror(errno));
return -1;
}
if (pPeer->watchFd == NULL) pPeer->watchFd = malloc(sizeof(int32_t) * tsMaxWatchFiles);
if (pPeer->watchFd == NULL) {
sError("%s, failed to allocate watchFd", pPeer->id);
return -1;
}
memset(pPeer->watchFd, -1, sizeof(int32_t) * tsMaxWatchFiles);
}
int32_t *wd = pPeer->watchFd + pPeer->watchNum;
if (*wd >= 0) {
if (inotify_rm_watch(pPeer->notifyFd, *wd) < 0) {
sError("%s, failed to remove wd:%d since %s", pPeer->id, *wd, strerror(errno));
return -1;
}
static int32_t syncAreFilesModified(SSyncNode *pNode, SSyncPeer *pPeer) {
if (pNode->getFileVersion == NULL) return 0;
uint64_t fver = 0;
int32_t code = (*pNode->getFileVersion)(pNode->vgId, &fver);
if (code != 0) {
sInfo("%s, file are modified while retrieve, lastver:%" PRIu64, pPeer->id, pPeer->lastVer);
return code;
}
*wd = inotify_add_watch(pPeer->notifyFd, name, IN_MODIFY | IN_DELETE);
if (*wd == -1) {
sError("%s, failed to add %s since %s", pPeer->id, name, strerror(errno));
if (fver != pPeer->lastVer) {
sInfo("%s, file are modified while retrieve, fver:%" PRIu64 " lastver:%" PRIu64, pPeer->id, fver, pPeer->lastVer);
return -1;
} else {
sDebug("%s, monitor %s, wd:%d watchNum:%d", pPeer->id, name, *wd, pPeer->watchNum);
}
pPeer->watchNum = (pPeer->watchNum + 1) % tsMaxWatchFiles;
return 0;
}
static int32_t syncAreFilesModified(SSyncPeer *pPeer) {
if (pPeer->notifyFd <= 0) return 0;
char buf[2048];
int32_t len = read(pPeer->notifyFd, buf, sizeof(buf));
if (len < 0 && errno != EAGAIN) {
sError("%s, failed to read notify FD since %s", pPeer->id, strerror(errno));
return -1;
}
int32_t code = 0;
if (len > 0) {
const struct inotify_event *event;
char *ptr;
for (ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) {
event = (const struct inotify_event *)ptr;
if ((event->mask & IN_MODIFY) || (event->mask & IN_DELETE)) {
sDebug("%s, processed file is changed", pPeer->id);
pPeer->fileChanged = 1;
code = 1;
break;
}
}
}
return code;
}
static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode;
SFileInfo fileInfo;
SFileAck fileAck;
SFileInfo fileInfo = {0};
SFileAck fileAck = {0};
int32_t code = -1;
char name[TSDB_FILENAME_LEN * 2] = {0};
memset(&fileInfo, 0, sizeof(fileInfo));
memset(&fileAck, 0, sizeof(fileAck));
if (pNode->getFileVersion) (*pNode->getFileVersion)(pNode->vgId, &pPeer->lastVer);
while (1) {
// retrieve file info
......@@ -136,15 +83,6 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
// set the peer sync version
pPeer->sversion = fileInfo.fversion;
// get the full path to file
snprintf(name, sizeof(name), "%s/%s", pNode->path, fileInfo.name);
// add the file into watch list
if (syncAddIntoWatchList(pPeer, name) < 0) {
sError("%s, failed to watch file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
break;
}
// if sync is not required, continue
if (fileAck.sync == 0) {
fileInfo.index++;
......@@ -152,6 +90,9 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
continue;
}
// get the full path to file
snprintf(name, sizeof(name), "%s/%s", pNode->path, fileInfo.name);
// send the file to peer
int32_t sfd = open(name, O_RDONLY);
if (sfd < 0) {
......@@ -170,10 +111,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
fileInfo.index++;
// check if processed files are modified
if (syncAreFilesModified(pPeer) != 0) {
sInfo("%s, file:%s are modified while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
break;
}
if (syncAreFilesModified(pNode, pPeer) != 0) break;
}
if (code < 0) {
......@@ -308,9 +246,9 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi
static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) {
SSyncNode *pNode = pPeer->pSyncNode;
int32_t code = -1;
char fname[TSDB_FILENAME_LEN * 2]; // full path to wal file
char fname[TSDB_FILENAME_LEN * 2] = {0}; // full path to wal file
if (syncAreFilesModified(pPeer) != 0) return -1;
if (syncAreFilesModified(pNode, pPeer) != 0) return -1;
while (1) {
int32_t once = 0; // last WAL has once ever been processed
......@@ -431,7 +369,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
index++;
if (syncAreFilesModified(pPeer) != 0) break;
if (syncAreFilesModified(pNode, pPeer) != 0) break;
}
if (code == 0) {
......
......@@ -44,6 +44,7 @@ typedef struct {
int8_t role;
int8_t accessState;
int8_t isFull;
int8_t isCommiting;
uint64_t version; // current version
uint64_t fversion; // version on saved data file
void *wqueue;
......
......@@ -38,6 +38,7 @@ static void vnodeCtrlFlow(int32_t vgId, int32_t level);
static int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion);
static void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code);
static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam);
static int32_t vnodeGetFileVersion(int32_t vgId, uint64_t *fver);
#ifndef _SYNC
int64_t syncStart(const SSyncInfo *info) { return NULL; }
......@@ -352,6 +353,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
syncInfo.notifyRole = vnodeNotifyRole;
syncInfo.notifyFlowCtrl = vnodeCtrlFlow;
syncInfo.notifyFileSynced = vnodeNotifyFileSynced;
syncInfo.getFileVersion = vnodeGetFileVersion;
pVnode->sync = syncStart(&syncInfo);
#ifndef _SYNC
......@@ -597,18 +599,19 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
vnodeRelease(pVnode);
}
// TODO: this is a simple implement
static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
SVnodeObj *pVnode = arg;
if (eno != TSDB_CODE_SUCCESS) {
vError("vgId:%d, failed to commit since %s, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, tstrerror(eno),
pVnode->fversion, pVnode->version);
pVnode->isCommiting = 0;
pVnode->isFull = 1;
return 0;
}
if (status == TSDB_STATUS_COMMIT_START) {
pVnode->isCommiting = 1;
pVnode->fversion = pVnode->version;
vDebug("vgId:%d, start commit, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
if (pVnode->status != TAOS_VN_STATUS_INIT) {
......@@ -619,6 +622,7 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
if (status == TSDB_STATUS_COMMIT_OVER) {
vDebug("vgId:%d, commit over, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
pVnode->isCommiting = 0;
pVnode->isFull = 0;
if (pVnode->status != TAOS_VN_STATUS_INIT) {
walRemoveOneOldFile(pVnode->wal);
......@@ -765,3 +769,21 @@ static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void
vnodeRelease(pVnode);
return code;
}
static int32_t vnodeGetFileVersion(int32_t vgId, uint64_t *fver) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while write to cache", vgId);
return -1;
}
int32_t code = 0;
if (pVnode->isCommiting) {
code = -1;
} else {
*fver = pVnode->fversion;
}
vnodeRelease(pVnode);
return code;
}
......@@ -288,7 +288,7 @@ static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) {
} else {
code = vnodePerformFlowCtrl(pWrite);
if (code == 0) {
vTrace("vgId:%d, write into vwqueue after flowctrl", pVnode->vgId);
vDebug("vgId:%d, write into vwqueue after flowctrl", pVnode->vgId);
pWrite->processedCount = 0;
taosWriteQitem(pVnode->wqueue, pWrite->qtype, pWrite);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册