提交 ece8c475 编写于 作者: S Shengliang Guan

TD-1926

上级 21a0c741
......@@ -261,9 +261,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CONFIG, 0, 0x0900, "Invalid Sy
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync module not enabled")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_VERSION, 0, 0x0902, "Invalid Sync version")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_CONFIRM_EXPIRED, 0, 0x0903, "Sync confirm expired")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_VND_COMMITING, 0, 0x0904, "Vnode is commiting")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_FILE_CHNAGED, 0, 0x0905, "Vnode file is changed")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_APP_ERROR, 0, 0x1000, "Unexpected generic error in sync")
// wal
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal")
......
......@@ -86,7 +86,7 @@ typedef void (*FNotifyFlowCtrl)(int32_t vgId, int32_t level);
typedef int32_t (*FNotifyFileSynced)(int32_t vgId, uint64_t fversion);
// get file version
typedef int32_t (*FGetFileVersion)(int32_t vgId, uint64_t *fver);
typedef int32_t (*FGetVersion)(int32_t vgId, uint64_t *fver, uint64_t *vver);
typedef struct {
int32_t vgId; // vgroup ID
......@@ -100,7 +100,7 @@ typedef struct {
FNotifyRole notifyRole;
FNotifyFlowCtrl notifyFlowCtrl;
FNotifyFileSynced notifyFileSynced;
FGetFileVersion getFileVersion;
FGetVersion getVersion;
} SSyncInfo;
typedef void *tsync_h;
......
......@@ -251,6 +251,16 @@ static void sdbNotifyRole(int32_t vgId, int8_t role) {
sdbUpdateMnodeRoles();
}
static int32_t sdbNotifyFileSynced(int32_t vgId, uint64_t fversion) { return 0; }
static void sdbNotifyFlowCtrl(int32_t vgId, int32_t level) {}
static int32_t sdbGetSyncVersion(int32_t vgId, uint64_t *fver, uint64_t *vver) {
*fver = 0;
*vver = 0;
return 0;
}
// failed to forward, need revert insert
static void sdbHandleFailedConfirm(SSdbRow *pRow) {
SWalHead *pHead = pRow->pHead;
......@@ -372,11 +382,14 @@ void sdbUpdateSync(void *pMnodes) {
syncInfo.version = sdbGetVersion();
syncInfo.syncCfg = syncCfg;
sprintf(syncInfo.path, "%s", tsMnodeDir);
syncInfo.getWalInfo = sdbGetWalInfo;
syncInfo.getFileInfo = sdbGetFileInfo;
syncInfo.getWalInfo = sdbGetWalInfo;
syncInfo.writeToCache = sdbWriteFwdToQueue;
syncInfo.confirmForward = sdbConfirmForward;
syncInfo.notifyRole = sdbNotifyRole;
syncInfo.notifyFileSynced = sdbNotifyFileSynced;
syncInfo.notifyFlowCtrl = sdbNotifyFlowCtrl;
syncInfo.getVersion = sdbGetSyncVersion;
tsSdbMgmt.cfg = syncCfg;
if (tsSdbMgmt.sync) {
......
......@@ -139,16 +139,14 @@ typedef struct SsyncPeer {
char id[TSDB_EP_LEN + 32]; // peer vgId + end point
uint64_t version;
uint64_t sversion; // track the peer version in retrieve process
uint64_t lastVer; // track the file version while retrieve
uint64_t lastFileVer; // track the file version while retrieve
uint64_t lastWalVer; // track the wal version while retrieve
int32_t syncFd;
int32_t peerFd; // forward FD
int32_t numOfRetrieves; // number of retrieves tried
int32_t fileChanged; // a flag to indicate file is changed during retrieving process
void * timer;
void * pConn;
int32_t notifyFd;
int32_t watchNum;
int32_t *watchFd;
int32_t refCount; // reference count
struct SSyncNode *pSyncNode;
} SSyncPeer;
......@@ -173,7 +171,7 @@ typedef struct SSyncNode {
FNotifyRole notifyRole;
FNotifyFlowCtrl notifyFlowCtrl;
FNotifyFileSynced notifyFileSynced;
FGetFileVersion getFileVersion;
FGetVersion getVersion;
pthread_mutex_t mutex;
} SSyncNode;
......
......@@ -196,7 +196,7 @@ int64_t syncStart(const SSyncInfo *pInfo) {
pNode->confirmForward = pInfo->confirmForward;
pNode->notifyFlowCtrl = pInfo->notifyFlowCtrl;
pNode->notifyFileSynced = pInfo->notifyFileSynced;
pNode->getFileVersion = pInfo->getFileVersion;
pNode->getVersion = pInfo->getVersion;
pNode->selfIndex = -1;
pNode->vgId = pInfo->vgId;
......@@ -498,7 +498,6 @@ int32_t syncDecPeerRef(SSyncPeer *pPeer) {
taosReleaseRef(tsSyncRefId, pPeer->pSyncNode->rid);
sDebug("%s, resource is freed", pPeer->id);
tfree(pPeer->watchFd);
tfree(pPeer);
return 0;
}
......
......@@ -26,35 +26,73 @@
#include "tsync.h"
#include "syncInt.h"
static int32_t syncAreFilesModified(SSyncNode *pNode, SSyncPeer *pPeer) {
if (pNode->getFileVersion == NULL) return TSDB_CODE_SUCCESS;
static int32_t syncGetWalVersion(SSyncNode *pNode, SSyncPeer *pPeer) {
uint64_t fver, wver;
int32_t code = (*pNode->getVersion)(pNode->vgId, &fver, &wver);
if (code != 0) {
sDebug("%s, vnode is commiting while retrieve, last wver:%" PRIu64, pPeer->id, pPeer->lastWalVer);
return -1;
}
pPeer->lastWalVer = wver;
return code;
}
static bool syncIsWalModified(SSyncNode *pNode, SSyncPeer *pPeer) {
uint64_t fver, wver;
int32_t code = (*pNode->getVersion)(pNode->vgId, &fver, &wver);
if (code != 0) {
sDebug("%s, vnode is commiting while retrieve, last wver:%" PRIu64, pPeer->id, pPeer->lastWalVer);
return true;
}
if (wver != pPeer->lastWalVer) {
sDebug("%s, wal is modified while retrieve, wver:%" PRIu64 ", last:%" PRIu64, pPeer->id, wver, pPeer->lastWalVer);
return true;
}
return false;
}
uint64_t fver = 0;
int32_t code = (*pNode->getFileVersion)(pNode->vgId, &fver);
static int32_t syncGetFileVersion(SSyncNode *pNode, SSyncPeer *pPeer) {
uint64_t fver, wver;
int32_t code = (*pNode->getVersion)(pNode->vgId, &fver, &wver);
if (code != 0) {
sInfo("%s, vnode is commiting while retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastVer);
sDebug("%s, vnode is commiting while retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastFileVer);
return -1;
}
pPeer->lastFileVer = fver;
return code;
}
static bool syncAreFilesModified(SSyncNode *pNode, SSyncPeer *pPeer) {
uint64_t fver, wver;
int32_t code = (*pNode->getVersion)(pNode->vgId, &fver, &wver);
if (code != 0) {
sDebug("%s, vnode is commiting while retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastFileVer);
pPeer->fileChanged = 1;
return TSDB_CODE_SYN_VND_COMMITING;
return true;
}
if (fver != pPeer->lastVer) {
sInfo("%s, files are modified while retrieve, fver:%" PRIu64 ", last fver:%" PRIu64, pPeer->id, fver, pPeer->lastVer);
if (fver != pPeer->lastFileVer) {
sDebug("%s, files are modified while retrieve, fver:%" PRIu64 ", last:%" PRIu64, pPeer->id, fver, pPeer->lastFileVer);
pPeer->fileChanged = 1;
return TSDB_CODE_SYN_FILE_CHNAGED;
return true;
}
pPeer->fileChanged = 0;
return TSDB_CODE_SUCCESS;
return false;
}
static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode;
SFileInfo fileInfo; memset(&fileInfo, 0, sizeof(SFileInfo));
SFileAck fileAck = {0};
int32_t code = TSDB_CODE_SYN_APP_ERROR;
int32_t code = -1;
char name[TSDB_FILENAME_LEN * 2] = {0};
if (pNode->getFileVersion) (*pNode->getFileVersion)(pNode->vgId, &pPeer->lastVer);
if (syncGetFileVersion(pNode, pPeer) < 0) return -1;
while (1) {
// retrieve file info
......@@ -120,8 +158,10 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
fileInfo.index++;
// check if processed files are modified
code = syncAreFilesModified(pNode, pPeer);
if (code != TSDB_CODE_SUCCESS) break;
if (syncAreFilesModified(pNode, pPeer)) {
code = -1;
break;
}
}
if (code != TSDB_CODE_SUCCESS) {
......@@ -131,117 +171,90 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
return code;
}
/* if only a partial record is read out, set the IN_MODIFY flag in event,
so upper layer will reload the file to get a complete record */
static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead, uint32_t *pEvent) {
int32_t ret;
// if only a partial record is read out, upper layer will reload the file to get a complete record
static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead) {
int32_t ret = read(sfd, pHead, sizeof(SWalHead));
if (ret < 0) {
sError("sfd:%d, failed to read wal head since %s, ret:%d", sfd, strerror(errno), ret);
return -1;
}
ret = read(sfd, pHead, sizeof(SWalHead));
if (ret < 0) return -1;
if (ret == 0) return 0;
if (ret == 0) {
sDebug("sfd:%d, read to end of the wal, ret:%d", sfd, ret);
return 0;
}
if (ret != sizeof(SWalHead)) {
// file is not at end yet, it shall be reloaded
*pEvent = *pEvent | IN_MODIFY;
sDebug("sfd:%d, a partial wal head is read out, ret:%d", sfd, ret);
return 0;
}
assert(pHead->len <= TSDB_MAX_WAL_SIZE);
ret = read(sfd, pHead->cont, pHead->len);
if (ret < 0) return -1;
if (ret < 0) {
sError("sfd:%d, failed to read wal content since %s, ret:%d", sfd, strerror(errno), ret);
return -1;
}
if (ret != pHead->len) {
// file is not at end yet, it shall be reloaded
*pEvent = *pEvent | IN_MODIFY;
sDebug("sfd:%d, a partial wal conetnt is read out, ret:%d", sfd, ret);
return 0;
}
return sizeof(SWalHead) + pHead->len;
}
static int32_t syncMonitorLastWal(SSyncPeer *pPeer, char *name) {
pPeer->watchNum = 0;
taosClose(pPeer->notifyFd);
pPeer->notifyFd = inotify_init1(IN_NONBLOCK);
if (pPeer->notifyFd < 0) {
sError("%s, failed to init inotify since %s", pPeer->id, strerror(errno));
return -1;
}
if (pPeer->watchFd == NULL) pPeer->watchFd = malloc(sizeof(int32_t) * tsMaxWatchFiles);
if (pPeer->watchFd == NULL) {
sError("%s, failed to allocate watchFd", pPeer->id);
return -1;
}
memset(pPeer->watchFd, -1, sizeof(int32_t) * tsMaxWatchFiles);
int32_t *wd = pPeer->watchFd;
*wd = inotify_add_watch(pPeer->notifyFd, name, IN_MODIFY | IN_CLOSE_WRITE);
if (*wd == -1) {
sError("%s, failed to watch last wal since %s", pPeer->id, strerror(errno));
static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset) {
int32_t sfd = open(name, O_RDONLY);
if (sfd < 0) {
sError("%s, failed to open wal:%s for retrieve since:%s", pPeer->id, name, tstrerror(errno));
return -1;
}
return 0;
}
static int32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) {
char buf[2048];
int32_t len = read(pPeer->notifyFd, buf, sizeof(buf));
if (len < 0 && errno != EAGAIN) {
sError("%s, failed to read notify FD since %s", pPeer->id, strerror(errno));
int32_t code = taosLSeek(sfd, offset, SEEK_SET);
if (code < 0) {
sError("%s, failed to seek %" PRId64 " in wal:%s for retrieve since:%s", pPeer->id, offset, name, tstrerror(errno));
close(sfd);
return -1;
}
if (len == 0) return 0;
struct inotify_event *event;
for (char *ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) {
event = (struct inotify_event *)ptr;
if (event->mask & IN_MODIFY) *pEvent = *pEvent | IN_MODIFY;
if (event->mask & IN_CLOSE_WRITE) *pEvent = *pEvent | IN_CLOSE_WRITE;
}
if (pEvent != 0) sDebug("%s, last wal event:0x%x", pPeer->id, *pEvent);
sDebug("%s, retrieve last wal:%s, offset:%" PRId64 " fver:%" PRIu64, pPeer->id, name, offset, fversion);
return 0;
}
static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset, uint32_t *pEvent) {
SWalHead *pHead = malloc(SYNC_MAX_SIZE);
int32_t code = -1;
int32_t bytes = 0;
int32_t sfd;
sfd = open(name, O_RDONLY);
if (sfd < 0) {
free(pHead);
return -1;
while (1) {
code = syncReadOneWalRecord(sfd, pHead);
if (code < 0) {
sError("%s, failed to read one record from wal:%s", pPeer->id, name);
break;
}
(void)lseek(sfd, offset, SEEK_SET);
sDebug("%s, retrieve last wal, offset:%" PRId64 " fver:%" PRIu64, pPeer->id, offset, fversion);
while (1) {
int32_t wsize = syncReadOneWalRecord(sfd, pHead, pEvent);
if (wsize < 0) break;
if (wsize == 0) {
code = 0;
if (code == 0) {
code = bytes;
sDebug("%s, read to the end of wal, bytes:%d", pPeer->id, bytes);
break;
}
sTrace("%s, last wal is forwarded, hver:%" PRIu64, pPeer->id, pHead->version);
int32_t wsize = code;
int32_t ret = taosWriteMsg(pPeer->syncFd, pHead, wsize);
if (ret != wsize) break;
pPeer->sversion = pHead->version;
if (ret != wsize) {
code = -1;
sError("%s, failed to forward wal since %s, hver:%" PRIu64, pPeer->id, strerror(errno), pHead->version);
break;
}
pPeer->sversion = pHead->version;
bytes += wsize;
if (pHead->version >= fversion && fversion > 0) {
code = 0;
bytes = 0;
sDebug("%s, retrieve wal finished, fver:%" PRIu64, pPeer->id, fversion);
break;
}
}
......@@ -249,92 +262,61 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi
free(pHead);
close(sfd);
if (code == 0) return bytes;
return -1;
return code;
}
static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) {
SSyncNode *pNode = pPeer->pSyncNode;
int32_t code = -1;
char fname[TSDB_FILENAME_LEN * 2] = {0}; // full path to wal file
if (syncAreFilesModified(pNode, pPeer) != 0) return -1;
while (1) {
int32_t once = 0; // last WAL has once ever been processed
int64_t offset = 0;
uint64_t fversion = 0;
uint32_t event = 0;
char fname[TSDB_FILENAME_LEN * 2] = {0}; // full path to wal file
// get full path to wal file
snprintf(fname, sizeof(fname), "%s/%s", pNode->path, wname);
sDebug("%s, start to retrieve last wal:%s", pPeer->id, fname);
// monitor last wal
if (syncMonitorLastWal(pPeer, fname) < 0) break;
while (1) {
int32_t bytes = syncRetrieveLastWal(pPeer, fname, fversion, offset, &event);
if (bytes < 0) break;
if (syncAreFilesModified(pNode, pPeer)) return -1;
if (syncGetWalVersion(pNode, pPeer) < 0) return -1;
int32_t bytes = syncRetrieveLastWal(pPeer, fname, fversion, offset);
if (bytes < 0) {
sDebug("%s, failed to retrieve last wal", pPeer->id);
return bytes;
}
// check file changes
if (syncCheckLastWalChanges(pPeer, &event) < 0) break;
bool walModified = syncIsWalModified(pNode, pPeer);
// if file is not updated or updated once, set the fversion and sstatus
if (((event & IN_MODIFY) == 0) || once) {
if (!walModified || once) {
if (fversion == 0) {
pPeer->sstatus = TAOS_SYNC_STATUS_CACHE; // start to forward pkt
sDebug("%s, fversion is 0 then set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
fversion = nodeVersion; // must read data to fversion
sDebug("%s, fversion is 0 and set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
}
}
// if all data up to fversion is read out, it is over
if (pPeer->sversion >= fversion && fversion > 0) {
code = 0;
sDebug("%s, data up to fver:%" PRIu64 " has been read out, bytes:%d", pPeer->id, fversion, bytes);
break;
return 0;
}
// if all data are read out, and no update
if ((bytes == 0) && ((event & IN_MODIFY) == 0)) {
// wal file is closed, break
if (event & IN_CLOSE_WRITE) {
code = 0;
sDebug("%s, current wal is closed", pPeer->id);
break;
}
if (bytes == 0 && !walModified) {
// wal not closed, it means some data not flushed to disk, wait for a while
usleep(10000);
}
// if bytes>0, file is updated, or fversion is not reached but file still open, read again
// if bytes > 0, file is updated, or fversion is not reached but file still open, read again
once = 1;
offset += bytes;
sDebug("%s, retrieve last wal, bytes:%d", pPeer->id, bytes);
event = event & (~IN_MODIFY); // clear IN_MODIFY flag
}
if (code < 0) break;
if (pPeer->sversion >= fversion && fversion > 0) break;
index++;
wname[0] = 0;
code = (*pNode->getWalInfo)(pNode->vgId, wname, &index);
if (code < 0) break;
if (wname[0] == 0) {
code = 0;
break;
}
// current last wal is closed, there is a new one
sDebug("%s, last wal is closed, try new one", pPeer->id);
sDebug("%s, continue retrieve last wal, bytes:%d offset:%" PRId64, pPeer->id, bytes, offset);
}
taosClose(pPeer->notifyFd);
return code;
return -1;
}
static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
......@@ -377,7 +359,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
close(sfd);
if (code < 0) break;
if (syncAreFilesModified(pNode, pPeer) != 0) break;
if (syncAreFilesModified(pNode, pPeer)) break;
}
if (code == 0) {
......@@ -474,7 +456,6 @@ void *syncRetrieveData(void *param) {
}
pPeer->fileChanged = 0;
taosClose(pPeer->notifyFd);
taosClose(pPeer->syncFd);
syncDecPeerRef(pPeer);
......
......@@ -38,7 +38,7 @@ static void vnodeCtrlFlow(int32_t vgId, int32_t level);
static int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion);
static void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code);
static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam);
static int32_t vnodeGetFileVersion(int32_t vgId, uint64_t *fver);
static int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver);
#ifndef _SYNC
int64_t syncStart(const SSyncInfo *info) { return NULL; }
......@@ -353,7 +353,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
syncInfo.notifyRole = vnodeNotifyRole;
syncInfo.notifyFlowCtrl = vnodeCtrlFlow;
syncInfo.notifyFileSynced = vnodeNotifyFileSynced;
syncInfo.getFileVersion = vnodeGetFileVersion;
syncInfo.getVersion = vnodeGetVersion;
pVnode->sync = syncStart(&syncInfo);
#ifndef _SYNC
......@@ -771,7 +771,7 @@ static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void
return code;
}
static int32_t vnodeGetFileVersion(int32_t vgId, uint64_t *fver) {
static int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while write to cache", vgId);
......@@ -780,10 +780,11 @@ static int32_t vnodeGetFileVersion(int32_t vgId, uint64_t *fver) {
int32_t code = 0;
if (pVnode->isCommiting) {
vDebug("vgId:%d, vnode is commiting while get file version", vgId);
vDebug("vgId:%d, vnode is commiting while get version", vgId);
code = -1;
} else {
*fver = pVnode->fversion;
*wver = pVnode->version;
}
vnodeRelease(pVnode);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册