提交 6151c818 编写于 作者: S Shengliang Guan


上级 46397968
......@@ -56,16 +56,6 @@ typedef struct {
int32_t role[TAOS_SYNC_MAX_REPLICA];
} SNodesRole;
If name is empty (name[0] is zero), get the file at index or after, but not beyond eindex. If a file
is found between index and eindex, index shall be updated, name shall be set, size shall be set to the
file size, and the file magic number shall be returned.
If name is provided (name[0] is not zero), get the named file at the specified index. If it is not there, return
zero. If it is there, set size to the file size and return the file magic number. Index shall not be updated.
typedef uint32_t (*FGetFileInfo)(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion);
// get the wal file from index or after
// return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file
typedef int32_t (*FGetWalInfo)(int32_t vgId, char *fileName, int64_t *fileId);
......@@ -83,24 +73,31 @@ typedef void (*FNotifyRole)(int32_t vgId, int8_t role);
// flow-control notification for a vgroup; callers in the sync code pass level 0 when resetting
// flow control and the peer's retrieve count while retrieving data
typedef void (*FNotifyFlowCtrl)(int32_t vgId, int32_t level);
// when data file is synced successfully, notify app with the synced file version;
// the sync code treats a negative return value as "app not in ready state"
typedef int32_t (*FNotifyFileSynced)(int32_t vgId, uint64_t fversion);
// NOTE(review): presumably called before data-file sync starts for the vgroup; no call site is
// visible in this view, confirm against the sync code
typedef void (*FStartSyncFile)(int32_t vgId);
// called by the restore path with the vgroup ID and the file version obtained while restoring files
typedef void (*FStopSyncFile)(int32_t vgId, uint64_t fversion);
// get file version (fver) and a second version through vver (callers name it wver);
// callers treat any non-zero return value as failure
typedef int32_t (*FGetVersion)(int32_t vgId, uint64_t *fver, uint64_t *vver);
// send data files over socketFd; invoked with the node's pTsdb handle and the peer sync socket,
// a non-zero return value is treated as failure
typedef int32_t (*FSendFile)(void *tsdb, int32_t socketFd);
// receive data files from socketFd; invoked with the node's pTsdb handle and the peer sync socket,
// a non-zero return value is treated as failure
typedef int32_t (*FRecvFile)(void *tsdb, int32_t socketFd);
// Start-up parameters handed to syncStart(): vgroup identity, sync configuration, and the
// application callbacks; syncStart() copies the callbacks and pTsdb into the sync node.
// NOTE(review): both un-suffixed and "Fp"-suffixed callback members are listed below; this looks
// like two revisions of the struct shown together -- confirm which set is current.
typedef struct {
int32_t vgId; // vgroup ID
uint64_t version; // initial version
SSyncCfg syncCfg; // configuration from mgmt
char path[TSDB_FILENAME_LEN]; // path to the file
FGetFileInfo getFileInfo;  // file lookup callback (returns the file magic number, 0 if no file)
FGetWalInfo getWalInfo;  // WAL file lookup callback
FWriteToCache writeToCache;  // hands a received WAL/forward record to the application
void * pTsdb;  // opaque handle passed as first argument to sendFileFp / recvFileFp
FGetWalInfo getWalInfoFp;  // WAL file lookup callback
FWriteToCache writeToCacheFp;  // hands a received WAL/forward record to the application
FConfirmForward confirmForward;  // NOTE(review): presumably confirms a forwarded write -- no call site visible here
FNotifyRole notifyRole;  // called with (vgId, role) whenever the node role changes
FNotifyFlowCtrl notifyFlowCtrl;  // flow-control notification; may be NULL (callers check before use)
FNotifyFileSynced notifyFileSynced;  // data-file-synced notification; may be NULL (caller checks before use)
FGetVersion getVersion;  // returns file/WAL versions; non-zero return means failure
FNotifyRole notifyRoleFp;  // called with (vgId, role) whenever the node role changes
FNotifyFlowCtrl notifyFlowCtrlFp;  // flow-control notification; may be NULL (callers check before use)
FStartSyncFile startSyncFileFp;  // NOTE(review): presumably invoked when file sync starts -- confirm
FStopSyncFile stopSyncFileFp;  // called with (vgId, fversion) by the restore path
FGetVersion getVersionFp;  // returns file/WAL versions; non-zero return means failure
FSendFile sendFileFp;  // sends data files over the peer socket; may be NULL (caller checks before use)
FRecvFile recvFileFp;  // receives data files from the peer socket; may be NULL (caller checks before use)
} SSyncInfo;
typedef void *tsync_h;
......@@ -242,11 +242,6 @@ void sdbUpdateMnodeRoles() {
static uint32_t sdbGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) {
return 0;
static int32_t sdbGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId) {
return walGetWalFile(tsSdbMgmt.wal, fileName, fileId);
......@@ -262,7 +257,9 @@ static void sdbNotifyRole(int32_t vgId, int8_t role) {
// Stub sync callbacks for sdb: each one ignores its arguments and does nothing.

// File-synced notification stub; always reports success (0).
static int32_t sdbNotifyFileSynced(int32_t vgId, uint64_t fversion) {
  (void)vgId;
  (void)fversion;
  return 0;
}

// Start-of-file-sync stub; intentionally empty.
static void sdbStartFileSync(int32_t vgId) {
  (void)vgId;
}

// End-of-file-sync stub; intentionally empty.
static void sdbStopFileSync(int32_t vgId, uint64_t fversion) {
  (void)vgId;
  (void)fversion;
}

// Flow-control notification stub; intentionally empty.
static void sdbNotifyFlowCtrl(int32_t vgId, int32_t level) {
  (void)vgId;
  (void)level;
}
......@@ -396,14 +393,14 @@ int32_t sdbUpdateSync(void *pMnodes) {
syncInfo.version = sdbGetVersion();
syncInfo.syncCfg = syncCfg;
sprintf(syncInfo.path, "%s", tsMnodeDir);
syncInfo.getFileInfo = sdbGetFileInfo;
syncInfo.getWalInfo = sdbGetWalInfo;
syncInfo.writeToCache = sdbWriteFwdToQueue;
syncInfo.getWalInfoFp = sdbGetWalInfo;
syncInfo.writeToCacheFp = sdbWriteFwdToQueue;
syncInfo.confirmForward = sdbConfirmForward;
syncInfo.notifyRole = sdbNotifyRole;
syncInfo.notifyFileSynced = sdbNotifyFileSynced;
syncInfo.notifyFlowCtrl = sdbNotifyFlowCtrl;
syncInfo.getVersion = sdbGetSyncVersion;
syncInfo.notifyRoleFp = sdbNotifyRole;
syncInfo.startSyncFileFp = sdbStartFileSync;
syncInfo.stopSyncFileFp = sdbStopFileSync;
syncInfo.notifyFlowCtrlFp = sdbNotifyFlowCtrl;
syncInfo.getVersionFp = sdbGetSyncVersion;
tsSdbMgmt.cfg = syncCfg;
if (tsSdbMgmt.sync) {
......@@ -108,14 +108,17 @@ typedef struct SSyncNode {
SSyncFwds * pSyncFwds; // saved forward info if quorum >1
void * pFwdTimer;
void * pRoleTimer;
FGetFileInfo getFileInfo;
FGetWalInfo getWalInfo;
FWriteToCache writeToCache;
void * pTsdb;
FGetWalInfo getWalInfoFp;
FWriteToCache writeToCacheFp;
FConfirmForward confirmForward;
FNotifyRole notifyRole;
FNotifyFlowCtrl notifyFlowCtrl;
FNotifyFileSynced notifyFileSynced;
FGetVersion getVersion;
FNotifyRole notifyRoleFp;
FNotifyFlowCtrl notifyFlowCtrlFp;
FStartSyncFile startSyncFileFp;
FStopSyncFile stopSyncFileFp;
FGetVersion getVersionFp;
FSendFile sendFileFp;
FRecvFile recvFileFp;
pthread_mutex_t mutex;
} SSyncNode;
......@@ -98,16 +98,12 @@ typedef struct {
typedef struct {
SSyncHead head;
uint32_t magic;
uint32_t index;
uint64_t fversion;
int64_t size;
} SFileInfo;
} SFileVersion;
// Acknowledgement message exchanged on the peer sync socket; initialised by syncBuildFileAck().
typedef struct {
SSyncHead head;  // common sync message header
int8_t sync;  // set by the receiver: 1 if the announced file must be transferred, 0 if the local copy matches
int8_t ack;  // NOTE(review): no reader or writer of this member is visible here -- confirm its meaning
} SFileAck;
typedef struct {
......@@ -134,7 +130,7 @@ void syncBuildSyncSetupMsg(SSyncMsg *pMsg, int32_t vgId);
void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId);
void syncBuildFileAck(SFileAck *pMsg, int32_t vgId);
void syncBuildFileInfo(SFileInfo *pMsg, int32_t vgId);
void syncBuildFileVersion(SFileVersion *pMsg, int32_t vgId);
#ifdef __cplusplus
......@@ -174,19 +174,22 @@ int64_t syncStart(const SSyncInfo *pInfo) {
tstrncpy(pNode->path, pInfo->path, sizeof(pNode->path));
pthread_mutex_init(&pNode->mutex, NULL);
pNode->getFileInfo = pInfo->getFileInfo;
pNode->getWalInfo = pInfo->getWalInfo;
pNode->writeToCache = pInfo->writeToCache;
pNode->notifyRole = pInfo->notifyRole;
pNode->getWalInfoFp = pInfo->getWalInfoFp;
pNode->writeToCacheFp = pInfo->writeToCacheFp;
pNode->notifyRoleFp = pInfo->notifyRoleFp;
pNode->confirmForward = pInfo->confirmForward;
pNode->notifyFlowCtrl = pInfo->notifyFlowCtrl;
pNode->notifyFileSynced = pInfo->notifyFileSynced;
pNode->getVersion = pInfo->getVersion;
pNode->notifyFlowCtrlFp = pInfo->notifyFlowCtrlFp;
pNode->startSyncFileFp = pInfo->startSyncFileFp;
pNode->stopSyncFileFp = pInfo->stopSyncFileFp;
pNode->getVersionFp = pInfo->getVersionFp;
pNode->sendFileFp = pInfo->sendFileFp;
pNode->recvFileFp = pInfo->recvFileFp;
pNode->selfIndex = -1;
pNode->vgId = pInfo->vgId;
pNode->replica = pCfg->replica;
pNode->quorum = pCfg->quorum;
pNode->pTsdb = pInfo->pTsdb;
if (pNode->quorum > pNode->replica) pNode->quorum = pNode->replica;
pNode->refCount = 1;
......@@ -248,8 +251,8 @@ int64_t syncStart(const SSyncInfo *pInfo) {
taosHashPut(tsVgIdHash, &pNode->vgId, sizeof(int32_t), &pNode, sizeof(SSyncNode *));
if (pNode->notifyRole) {
(*pNode->notifyRole)(pNode->vgId, nodeRole);
if (pNode->notifyRoleFp) {
(*pNode->notifyRoleFp)(pNode->vgId, nodeRole);
syncStartCheckPeerConn(pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]); // arb
......@@ -357,7 +360,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
if (pNewCfg->replica <= 1) {
sInfo("vgId:%d, no peers are configured, work as master!", pNode->vgId);
(*pNode->notifyRole)(pNode->vgId, nodeRole);
(*pNode->notifyRoleFp)(pNode->vgId, nodeRole);
syncStartCheckPeerConn(pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]); // arb
......@@ -417,7 +420,7 @@ void syncRecover(int64_t rid) {
// if this node is taken to the unsynced state, the whole system may not work
(*pNode->notifyRole)(pNode->vgId, nodeRole);
(*pNode->notifyRoleFp)(pNode->vgId, nodeRole);
nodeVersion = 0;
......@@ -625,8 +628,8 @@ static void syncResetFlowCtrl(SSyncNode *pNode) {
pNode->peerInfo[index]->numOfRetrieves = 0;
if (pNode->notifyFlowCtrl) {
(*pNode->notifyFlowCtrl)(pNode->vgId, 0);
if (pNode->notifyFlowCtrlFp) {
(*pNode->notifyFlowCtrlFp)(pNode->vgId, 0);
......@@ -694,7 +697,7 @@ static void syncChooseMaster(SSyncNode *pNode) {
(*pNode->notifyRole)(pNode->vgId, nodeRole);
(*pNode->notifyRoleFp)(pNode->vgId, nodeRole);
} else {
pPeer = pNode->peerInfo[index];
sInfo("%s, it shall work as master", pPeer->id);
......@@ -730,7 +733,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
sInfo("vgId:%d, self change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica);
(*pNode->notifyRole)(pNode->vgId, nodeRole);
(*pNode->notifyRoleFp)(pNode->vgId, nodeRole);
} else {
for (int32_t index = 0; index < pNode->replica; ++index) {
......@@ -742,7 +745,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
if (masterIndex == pNode->selfIndex) {
sError("%s, peer is master, work as slave instead", pTemp->id);
(*pNode->notifyRole)(pNode->vgId, nodeRole);
(*pNode->notifyRoleFp)(pNode->vgId, nodeRole);
......@@ -759,7 +762,7 @@ static int32_t syncValidateMaster(SSyncPeer *pPeer) {
if (nodeRole == TAOS_SYNC_ROLE_MASTER && nodeVersion < pPeer->version) {
sDebug("%s, peer has higher sver:%" PRIu64 ", restart all peer connections", pPeer->id, pPeer->version);
(*pNode->notifyRole)(pNode->vgId, nodeRole);
(*pNode->notifyRoleFp)(pNode->vgId, nodeRole);
code = -1;
for (int32_t index = 0; index < pNode->replica; ++index) {
......@@ -796,7 +799,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus* peersStatus, int8_t new
} else {
sInfo("%s, is master, work as slave, self sver:%" PRIu64, pMaster->id, nodeVersion);
(*pNode->notifyRole)(pNode->vgId, nodeRole);
(*pNode->notifyRoleFp)(pNode->vgId, nodeRole);
} else if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pMaster == pPeer) {
sDebug("%s, is master, continue work as slave, self sver:%" PRIu64, pMaster->id, nodeVersion);
......@@ -989,7 +992,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
// nodeVersion = pHead->version;
(*pNode->writeToCache)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL);
(*pNode->writeToCacheFp)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL);
} else {
if (nodeSStatus != TAOS_SYNC_STATUS_INIT) {
syncSaveIntoBuffer(pPeer, pHead);
......@@ -101,9 +101,9 @@ void syncBuildFileAck(SFileAck *pMsg, int32_t vgId) {
void syncBuildFileInfo(SFileInfo *pMsg, int32_t vgId) {
void syncBuildFileVersion(SFileVersion *pMsg, int32_t vgId) {
pMsg->head.type = TAOS_SMSG_SYNC_FILE;
pMsg->head.vgId = vgId;
pMsg->head.len = sizeof(SFileInfo) - sizeof(SSyncHead);
pMsg->head.len = sizeof(SFileVersion) - sizeof(SSyncHead);
\ No newline at end of file
......@@ -25,139 +25,44 @@
#include "tsync.h"
#include "syncInt.h"
static void syncRemoveExtraFile(SSyncPeer *pPeer, int32_t sindex, int32_t eindex) {
char name[TSDB_FILENAME_LEN * 2] = {0};
char fname[TSDB_FILENAME_LEN * 3] = {0};
uint32_t magic;
uint64_t fversion;
int64_t size;
uint32_t index = sindex;
static int32_t syncRecvFileVersion(SSyncPeer *pPeer, uint64_t *fversion) {
SSyncNode *pNode = pPeer->pSyncNode;
if (sindex < 0 || eindex < sindex) return;
sDebug("%s, extra files will be removed between sindex:%d and eindex:%d", pPeer->id, sindex, eindex);
while (1) {
name[0] = 0;
magic = (*pNode->getFileInfo)(pNode->vgId, name, &index, eindex, &size, &fversion);
if (magic == 0) break;
snprintf(fname, sizeof(fname), "%s/%s", pNode->path, name);
sInfo("%s, %s is removed for its extra", pPeer->id, fname);
SFileVersion fileVersion;
memset(&fileVersion, 0, sizeof(SFileVersion));
int32_t ret = taosReadMsg(pPeer->syncFd, &fileVersion, sizeof(SFileVersion));
if (ret != sizeof(SFileVersion)) {
sError("%s, failed to read fver since %s", pPeer->id, strerror(errno));
return -1;
if (index > eindex) break;
SFileAck fileVersionAck;
memset(&fileVersionAck, 0, sizeof(SFileAck));
syncBuildFileAck(&fileVersionAck, pNode->vgId);
ret = taosReadMsg(pPeer->syncFd, &fileVersionAck, sizeof(SFileAck));
if (ret != sizeof(SFileAck)) {
sError("%s, failed to write fver ack since %s", pPeer->id, strerror(errno));
return -1;
*fversion = htobe64(fileVersion.fversion);
return 0;
static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
SSyncNode *pNode = pPeer->pSyncNode;
SFileInfo minfo; memset(&minfo, 0, sizeof(SFileInfo)); /* = {0}; */
SFileInfo sinfo; memset(&sinfo, 0, sizeof(SFileInfo)); /* = {0}; */
SFileAck fileAck; memset(&fileAck, 0, sizeof(SFileAck));
int32_t code = -1;
char name[TSDB_FILENAME_LEN * 2] = {0};
uint32_t pindex = 0; // index in last restore
bool fileChanged = false;
*fversion = 0;
sinfo.index = -1;
while (1) {
// read file info
minfo.index = -1;
int32_t ret = taosReadMsg(pPeer->syncFd, &minfo, sizeof(SFileInfo));
if (ret != sizeof(SFileInfo) || minfo.index == -1) {
sError("%s, failed to read fileinfo while restore file since %s", pPeer->id, strerror(errno));
assert(ret == sizeof(SFileInfo));
ret = syncCheckHead((SSyncHead *)(&minfo));
if (ret != 0) {
sError("%s, failed to check fileinfo while restore file since %s", pPeer->id, strerror(ret));
// if no more file from master, break;
if (minfo.name[0] == 0 || minfo.magic == 0) {
sDebug("%s, no more files to restore", pPeer->id);
// remove extra files after the current index
if (sinfo.index != -1) syncRemoveExtraFile(pPeer, sinfo.index + 1, TAOS_SYNC_MAX_INDEX);
code = 0;
sDebug("%s, file:%s info is received from master, index:%d size:%" PRId64 " fver:%" PRIu64 " magic:%u", pPeer->id,
minfo.name, minfo.index, minfo.size, minfo.fversion, minfo.magic);
// remove extra files on slave between the current and last index
syncRemoveExtraFile(pPeer, pindex + 1, minfo.index - 1);
pindex = minfo.index;
// check the file info
sinfo = minfo;
sinfo.magic = (*pNode->getFileInfo)(pNode->vgId, sinfo.name, &sinfo.index, TAOS_SYNC_MAX_INDEX, &sinfo.size, &sinfo.fversion);
sDebug("%s, local file:%s info, index:%d size:%" PRId64 " fver:%" PRIu64 " magic:%u", pPeer->id, sinfo.name,
sinfo.index, sinfo.size, sinfo.fversion, sinfo.magic);
// if file not there or magic is not the same, file shall be synced
memset(&fileAck, 0, sizeof(SFileAck));
syncBuildFileAck(&fileAck, pNode->vgId);
fileAck.sync = (sinfo.magic != minfo.magic || sinfo.size != minfo.size || sinfo.name[0] == 0) ? 1 : 0;
// send file ack
ret = taosWriteMsg(pPeer->syncFd, &fileAck, sizeof(SFileAck));
if (ret != sizeof(SFileAck)) {
sError("%s, failed to write file:%s ack while restore file since %s", pPeer->id, minfo.name, strerror(errno));
// if sync is not required, continue
if (fileAck.sync == 0) {
sDebug("%s, %s is the same", pPeer->id, minfo.name);
} else {
sDebug("%s, %s will be received, size:%" PRId64, pPeer->id, minfo.name, minfo.size);
// if sync is required, open file, receive from master, and write to file
// get the full path to file
minfo.name[sizeof(minfo.name) - 1] = 0;
snprintf(name, sizeof(name), "%s/%s", pNode->path, minfo.name);
int32_t dfd = open(name, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
if (dfd < 0) {
sError("%s, failed to open file:%s while restore file since %s", pPeer->id, minfo.name, strerror(errno));
ret = taosCopyFds(pPeer->syncFd, dfd, minfo.size);
if (ret < 0) {
sError("%s, failed to copy file:%s while restore file since %s", pPeer->id, minfo.name, strerror(errno));
fileChanged = true;
sDebug("%s, %s is received, size:%" PRId64, pPeer->id, minfo.name, minfo.size);
if (code == 0 && fileChanged) {
// data file is changed, code shall be set to 1
*fversion = minfo.fversion;
code = 1;
sDebug("%s, file changed after restore file, fver:%" PRIu64, pPeer->id, *fversion);
if (pNode->recvFileFp && (*pNode->recvFileFp)(pNode->pTsdb, pPeer->syncFd) != 0) {
sError("%s, failed to restore file", pPeer->id);
return -1;
if (code < 0) {
sError("%s, failed to restore %s since %s", pPeer->id, name, strerror(errno));
if (syncRecvFileVersion(pPeer, fversion) < 0) {
return -1;
return code;
sInfo("%s, all files are restored, fver:%" PRIu64, pPeer->id, *fversion);
return 0;
static int32_t syncRestoreWal(SSyncPeer *pPeer, uint64_t *wver) {
......@@ -195,7 +100,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer, uint64_t *wver) {
lastVer = pHead->version;
ret = (*pNode->writeToCache)(pNode->vgId, pHead, TAOS_QTYPE_WAL, NULL);
ret = (*pNode->writeToCacheFp)(pNode->vgId, pHead, TAOS_QTYPE_WAL, NULL);
if (ret != 0) {
sError("%s, failed to restore record since %s, hver:%" PRIu64, pPeer->id, tstrerror(ret), pHead->version);
......@@ -215,7 +120,7 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) {
SSyncNode *pNode = pPeer->pSyncNode;
SWalHead * pHead = (SWalHead *)offset;
(*pNode->writeToCache)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL);
(*pNode->writeToCacheFp)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL);
offset += pHead->len + sizeof(SWalHead);
return offset;
......@@ -315,20 +220,15 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
sDebug("%s, send sync rsp to peer, tranId:%u", pPeer->id, rsp.tranId);
sInfo("%s, start to restore file, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
int32_t code = syncRestoreFile(pPeer, &fversion);
if (code < 0) {
sError("%s, failed to restore file", pPeer->id);
return -1;
// if code > 0, data file is changed, notify app, and pass the version
if (code > 0 && pNode->notifyFileSynced) {
if ((*pNode->notifyFileSynced)(pNode->vgId, fversion) < 0) {
sError("%s, app not in ready state", pPeer->id);
return -1;
(*pNode->stopSyncFileFp)(pNode->vgId, fversion);
nodeVersion = fversion;
sInfo("%s, start to restore wal, fver:%" PRIu64, pPeer->id, nodeVersion);
......@@ -368,7 +268,7 @@ void *syncRestoreData(void *param) {
__sync_fetch_and_add(&tsSyncNum, 1);
sInfo("%s, start to restore data, sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
(*pNode->notifyRole)(pNode->vgId, TAOS_SYNC_ROLE_SYNCING);
(*pNode->notifyRoleFp)(pNode->vgId, TAOS_SYNC_ROLE_SYNCING);
if (syncOpenRecvBuffer(pNode) < 0) {
sError("%s, failed to allocate recv buffer, restart connection", pPeer->id);
......@@ -385,7 +285,7 @@ void *syncRestoreData(void *param) {
(*pNode->notifyRole)(pNode->vgId, nodeRole);
(*pNode->notifyRoleFp)(pNode->vgId, nodeRole);
sInfo("%s, restore data over, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
......@@ -28,7 +28,7 @@
static int32_t syncGetWalVersion(SSyncNode *pNode, SSyncPeer *pPeer) {
uint64_t fver, wver;
int32_t code = (*pNode->getVersion)(pNode->vgId, &fver, &wver);
int32_t code = (*pNode->getVersionFp)(pNode->vgId, &fver, &wver);
if (code != 0) {
sDebug("%s, vnode is commiting while retrieve, last wver:%" PRIu64, pPeer->id, pPeer->lastWalVer);
return -1;
......@@ -40,7 +40,7 @@ static int32_t syncGetWalVersion(SSyncNode *pNode, SSyncPeer *pPeer) {
static bool syncIsWalModified(SSyncNode *pNode, SSyncPeer *pPeer) {
uint64_t fver, wver;
int32_t code = (*pNode->getVersion)(pNode->vgId, &fver, &wver);
int32_t code = (*pNode->getVersionFp)(pNode->vgId, &fver, &wver);
if (code != 0) {
sDebug("%s, vnode is commiting while retrieve, last wver:%" PRIu64, pPeer->id, pPeer->lastWalVer);
return true;
......@@ -56,7 +56,7 @@ static bool syncIsWalModified(SSyncNode *pNode, SSyncPeer *pPeer) {
static int32_t syncGetFileVersion(SSyncNode *pNode, SSyncPeer *pPeer) {
uint64_t fver, wver;
int32_t code = (*pNode->getVersion)(pNode->vgId, &fver, &wver);
int32_t code = (*pNode->getVersionFp)(pNode->vgId, &fver, &wver);
if (code != 0) {
sDebug("%s, vnode is commiting while get fver for retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastFileVer);
return -1;
......@@ -68,7 +68,7 @@ static int32_t syncGetFileVersion(SSyncNode *pNode, SSyncPeer *pPeer) {
static bool syncAreFilesModified(SSyncNode *pNode, SSyncPeer *pPeer) {
uint64_t fver, wver;
int32_t code = (*pNode->getVersion)(pNode->vgId, &fver, &wver);
int32_t code = (*pNode->getVersionFp)(pNode->vgId, &fver, &wver);
if (code != 0) {
sDebug("%s, vnode is commiting while retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastFileVer);
pPeer->fileChanged = 1;
......@@ -85,104 +85,54 @@ static bool syncAreFilesModified(SSyncNode *pNode, SSyncPeer *pPeer) {
return false;
static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
static int32_t syncSendFileVersion(SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode;
SFileInfo fileInfo; memset(&fileInfo, 0, sizeof(SFileInfo));
SFileAck fileAck; memset(&fileAck, 0, sizeof(SFileAck));
int32_t code = -1;
char name[TSDB_FILENAME_LEN * 2] = {0};
if (syncGetFileVersion(pNode, pPeer) < 0) {
pPeer->fileChanged = 1;
SFileVersion fileVersion;
memset(&fileVersion, 0, sizeof(SFileVersion));
syncBuildFileVersion(&fileVersion, pNode->vgId);
uint64_t fver = pPeer->lastFileVer;
fileVersion.fversion = htobe64(fver);
int32_t ret = taosWriteMsg(pPeer->syncFd, &fileVersion, sizeof(SFileVersion));
if (ret != sizeof(SFileVersion)) {
sError("%s, failed to write fver:%" PRIu64 " since %s", pPeer->id, fver, strerror(errno));
return -1;
while (1) {
// retrieve file info
fileInfo.name[0] = 0;
fileInfo.size = 0;
fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX,
&fileInfo.size, &fileInfo.fversion);
syncBuildFileInfo(&fileInfo, pNode->vgId);
sDebug("%s, file:%s info is sent, index:%d size:%" PRId64 " fver:%" PRIu64 " magic:%u", pPeer->id, fileInfo.name,
fileInfo.index, fileInfo.size, fileInfo.fversion, fileInfo.magic);
// send the file info
int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(SFileInfo));
if (ret != sizeof(SFileInfo)) {
code = -1;
sError("%s, failed to write file:%s info while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
// if no file anymore, break
if (fileInfo.magic == 0 || fileInfo.name[0] == 0) {
code = 0;
sDebug("%s, no more files to sync", pPeer->id);
// wait for the ack from peer
ret = taosReadMsg(pPeer->syncFd, &fileAck, sizeof(SFileAck));
if (ret != sizeof(SFileAck)) {
code = -1;
sError("%s, failed to read file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
ret = syncCheckHead((SSyncHead*)(&fileAck));
if (ret != 0) {
code = -1;
sError("%s, failed to check file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(ret));
// set the peer sync version
pPeer->sversion = fileInfo.fversion;
// if sync is not required, continue
if (fileAck.sync == 0) {
sDebug("%s, %s is the same, fver:%" PRIu64, pPeer->id, fileInfo.name, fileInfo.fversion);
} else {
sDebug("%s, %s will be sent, fver:%" PRIu64, pPeer->id, fileInfo.name, fileInfo.fversion);
SFileAck fileAck;
memset(&fileAck, 0, sizeof(SFileAck));
ret = taosReadMsg(pPeer->syncFd, &fileAck, sizeof(SFileAck));
if (ret != sizeof(SFileAck)) {
sError("%s, failed to read fver ack since %s", pPeer->id, strerror(errno));
return -1;
// get the full path to file
snprintf(name, sizeof(name), "%s/%s", pNode->path, fileInfo.name);
// set the peer sync version
pPeer->sversion = fver;
// send the file to peer
int32_t sfd = open(name, O_RDONLY);
if (sfd < 0) {
code = -1;
sError("%s, failed to open file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
return 0;
ret = taosSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size);
if (ret < 0) {
code = -1;
sError("%s, failed to send file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode;
sDebug("%s, file:%s is sent, size:%" PRId64, pPeer->id, fileInfo.name, fileInfo.size);
if (syncGetFileVersion(pNode, pPeer) < 0) {
pPeer->fileChanged = 1;
return -1;
// check if processed files are modified
if (syncAreFilesModified(pNode, pPeer)) {
code = -1;
if (pNode->sendFileFp && (*pNode->sendFileFp)(pNode->pTsdb, pPeer->syncFd) != 0) {
sError("%s, failed to retrieve file", pPeer->id);
return -1;
if (code != TSDB_CODE_SUCCESS) {
sError("%s, failed to retrieve file, code:0x%x", pPeer->id, code);
if (syncSendFileVersion(pPeer) < 0) {
return -1;
return code;
sInfo("%s, all files are retrieved", pPeer->id);
return 0;
// if only a partial record is read out, upper layer will reload the file to get a complete record
......@@ -346,7 +296,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
while (1) {
// retrieve wal info
wname[0] = 0;
code = (*pNode->getWalInfo)(pNode->vgId, wname, &index);
code = (*pNode->getWalInfoFp)(pNode->vgId, wname, &index);
if (code < 0) {
sError("%s, failed to get wal info since:%s, code:0x%x", pPeer->id, strerror(errno), code);
......@@ -478,7 +428,7 @@ void *syncRetrieveData(void *param) {
sInfo("%s, start to retrieve data, sstatus:%s, numOfRetrieves:%d", pPeer->id, syncStatus[pPeer->sstatus],
if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, pPeer->numOfRetrieves);
if (pNode->notifyFlowCtrlFp) (*pNode->notifyFlowCtrlFp)(pNode->vgId, pPeer->numOfRetrieves);
pPeer->syncFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0);
if (pPeer->syncFd < 0) {
......@@ -498,10 +448,10 @@ void *syncRetrieveData(void *param) {
} else {
pPeer->numOfRetrieves = 0;
// if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, 0);
// if (pNode->notifyFlowCtrlFp) (*pNode->notifyFlowCtrlFp)(pNode->vgId, 0);
if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, 0);
if (pNode->notifyFlowCtrlFp) (*pNode->notifyFlowCtrlFp)(pNode->vgId, 0);
pPeer->fileChanged = 0;
......@@ -296,11 +296,10 @@ void initSync() {
pCfg->replica = 1;
pCfg->quorum = 1;
syncInfo.vgId = 1;
syncInfo.getFileInfo = getFileInfo;
syncInfo.getWalInfo = getWalInfo;
syncInfo.writeToCache = writeToCache;
syncInfo.getWalInfoFp = getWalInfo;
syncInfo.writeToCacheFp = writeToCache;
syncInfo.confirmForward = confirmForward;
syncInfo.notifyRole = notifyRole;
syncInfo.notifyRoleFp = notifyRole;
pCfg->nodeInfo[0].nodeId = 1;
pCfg->nodeInfo[0].nodePort = 7010;
......@@ -26,8 +26,6 @@ int32_t vnodeDrop(int32_t vgId);
int32_t vnodeOpen(int32_t vgId);
int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeClose(int32_t vgId);
int32_t vnodeReset(SVnodeObj *pVnode);
void vnodeCleanUp(SVnodeObj *pVnode);
void vnodeDestroy(SVnodeObj *pVnode);
......@@ -25,7 +25,8 @@ uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t ei
int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId);
void vnodeNotifyRole(int32_t vgId, int8_t role);
void vnodeCtrlFlow(int32_t vgId, int32_t level);
int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion);
void vnodeStartSyncFile(int32_t vgId);
void vnodeStopSyncFile(int32_t vgId, uint64_t fversion);
void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code);
int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam);
int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver);
......@@ -294,14 +294,15 @@ int32_t vnodeOpen(int32_t vgId) {
syncInfo.version = pVnode->version;
syncInfo.syncCfg = pVnode->syncCfg;
tstrncpy(syncInfo.path, rootDir, TSDB_FILENAME_LEN);
syncInfo.getWalInfo = vnodeGetWalInfo;
syncInfo.getFileInfo = vnodeGetFileInfo;
syncInfo.writeToCache = vnodeWriteToCache;
syncInfo.getWalInfoFp = vnodeGetWalInfo;
syncInfo.writeToCacheFp = vnodeWriteToCache;
syncInfo.confirmForward = vnodeConfirmForard;
syncInfo.notifyRole = vnodeNotifyRole;
syncInfo.notifyFlowCtrl = vnodeCtrlFlow;
syncInfo.notifyFileSynced = vnodeNotifyFileSynced;
syncInfo.getVersion = vnodeGetVersion;
syncInfo.notifyRoleFp = vnodeNotifyRole;
syncInfo.notifyFlowCtrlFp = vnodeCtrlFlow;
syncInfo.startSyncFileFp = vnodeStartSyncFile;
syncInfo.stopSyncFileFp = vnodeStopSyncFile;
syncInfo.getVersionFp = vnodeGetVersion;
syncInfo.pTsdb = pVnode->tsdb;
pVnode->sync = syncStart(&syncInfo);
if (pVnode->sync <= 0) {
......@@ -453,34 +454,3 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
return 0;
int32_t vnodeReset(SVnodeObj *pVnode) {
if (!vnodeSetResetStatus(pVnode)) {
return -1;
void *tsdb = pVnode->tsdb;
pVnode->tsdb = NULL;
// acquire vnode
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
if (refCount > 3) {
// close tsdb, then open tsdb
tsdbCloseRepo(tsdb, 0);
STsdbAppH appH = {0};
appH.appH = (void *)pVnode;
appH.notifyStatus = vnodeProcessTsdbStatus;
appH.cqH = pVnode->cq;
appH.cqCreateFunc = cqCreate;
appH.cqDropFunc = cqDrop;
pVnode->tsdb = tsdbOpenRepo(&(pVnode->tsdbCfg), &appH);
return 0;
\ No newline at end of file
......@@ -20,6 +20,7 @@
#include "dnode.h"
#include "vnodeVersion.h"
#include "vnodeMain.h"
#include "vnodeStatus.h"
uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fver) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
......@@ -83,22 +84,34 @@ void vnodeCtrlFlow(int32_t vgId, int32_t level) {
int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion) {
void vnodeStartSyncFile(int32_t vgId) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while notify file synced", vgId);
return 0;
vError("vgId:%d, vnode not found while start filesync", vgId);
vDebug("vgId:%d, datafile will be synced", vgId);
void vnodeStopSyncFile(int32_t vgId, uint64_t fversion) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while stop filesync", vgId);
pVnode->fversion = fversion;
pVnode->version = fversion;
vDebug("vgId:%d, data file is synced, fver:%" PRIu64 " vver:%" PRIu64, vgId, fversion, fversion);
int32_t code = vnodeReset(pVnode);
vDebug("vgId:%d, datafile is synced, fver:%" PRIu64 " vver:%" PRIu64, vgId, fversion, fversion);
return code;
void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code) {
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册