diff --git a/src/inc/tsync.h b/src/inc/tsync.h index 4dae86bbed538a0801251f62723a471215655e24..1c25197835d2977949d969c190e7374bd80b412c 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -56,16 +56,6 @@ typedef struct { int32_t role[TAOS_SYNC_MAX_REPLICA]; } SNodesRole; -/* - if name is empty(name[0] is zero), get the file from index or after, but not larger than eindex. If a file - is found between index and eindex, index shall be updated, name shall be set, size shall be set to - file size, and file magic number shall be returned. - - if name is provided(name[0] is not zero), get the named file at the specified index. If not there, return - zero. If it is there, set the size to file size, and return file magic number. Index shall not be updated. -*/ -typedef uint32_t (*FGetFileInfo)(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion); - // get the wal file from index or after // return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file typedef int32_t (*FGetWalInfo)(int32_t vgId, char *fileName, int64_t *fileId); @@ -83,24 +73,31 @@ typedef void (*FNotifyRole)(int32_t vgId, int8_t role); typedef void (*FNotifyFlowCtrl)(int32_t vgId, int32_t level); // when data file is synced successfully, notity app -typedef int32_t (*FNotifyFileSynced)(int32_t vgId, uint64_t fversion); +typedef void (*FStartSyncFile)(int32_t vgId); +typedef void (*FStopSyncFile)(int32_t vgId, uint64_t fversion); // get file version typedef int32_t (*FGetVersion)(int32_t vgId, uint64_t *fver, uint64_t *vver); +typedef int32_t (*FSendFile)(void *tsdb, int32_t socketFd); +typedef int32_t (*FRecvFile)(void *tsdb, int32_t socketFd); + typedef struct { int32_t vgId; // vgroup ID uint64_t version; // initial version SSyncCfg syncCfg; // configuration from mgmt char path[TSDB_FILENAME_LEN]; // path to the file - FGetFileInfo getFileInfo; - FGetWalInfo getWalInfo; - FWriteToCache writeToCache; + void * pTsdb; + FGetWalInfo getWalInfoFp; + FWriteToCache writeToCacheFp; FConfirmForward confirmForward; - FNotifyRole notifyRole; - FNotifyFlowCtrl notifyFlowCtrl; - FNotifyFileSynced notifyFileSynced; - FGetVersion getVersion; + FNotifyRole notifyRoleFp; + FNotifyFlowCtrl notifyFlowCtrlFp; + FStartSyncFile startSyncFileFp; + FStopSyncFile stopSyncFileFp; + FGetVersion getVersionFp; + FSendFile sendFileFp; + FRecvFile recvFileFp; } SSyncInfo; typedef void *tsync_h; diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index ae495108b3a3d0afd479057d9828ae4c69d4537f..21bfe568cd238499a94cf13214fad384e869ad91 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -242,11 +242,6 @@ void sdbUpdateMnodeRoles() { mnodeUpdateMnodeEpSet(NULL); } -static uint32_t sdbGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) { - sdbUpdateMnodeRoles(); - return 0; -} - static int32_t sdbGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId) { return walGetWalFile(tsSdbMgmt.wal, fileName, fileId); } @@ -262,7 +257,9 @@ static void sdbNotifyRole(int32_t vgId, int8_t role) { sdbUpdateMnodeRoles(); } -static int32_t sdbNotifyFileSynced(int32_t vgId, uint64_t fversion) { return 0; } +static void sdbStartFileSync(int32_t vgId) {} + +static void sdbStopFileSync(int32_t vgId, uint64_t fversion) {} static void sdbNotifyFlowCtrl(int32_t vgId, int32_t level) {} @@ -396,14 +393,14 @@ int32_t sdbUpdateSync(void *pMnodes) { syncInfo.version = sdbGetVersion(); syncInfo.syncCfg = syncCfg; sprintf(syncInfo.path, "%s", tsMnodeDir); - syncInfo.getFileInfo = sdbGetFileInfo; - syncInfo.getWalInfo = sdbGetWalInfo; - syncInfo.writeToCache = sdbWriteFwdToQueue; + syncInfo.getWalInfoFp = sdbGetWalInfo; + syncInfo.writeToCacheFp = sdbWriteFwdToQueue; syncInfo.confirmForward = sdbConfirmForward; - syncInfo.notifyRole = sdbNotifyRole; - syncInfo.notifyFileSynced = sdbNotifyFileSynced; - syncInfo.notifyFlowCtrl = sdbNotifyFlowCtrl; - syncInfo.getVersion = sdbGetSyncVersion; + syncInfo.notifyRoleFp = sdbNotifyRole; + syncInfo.startSyncFileFp = sdbStartFileSync; + syncInfo.stopSyncFileFp = sdbStopFileSync; + syncInfo.notifyFlowCtrlFp = sdbNotifyFlowCtrl; + syncInfo.getVersionFp = sdbGetSyncVersion; tsSdbMgmt.cfg = syncCfg; if (tsSdbMgmt.sync) { diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index eef687d64753bdecfddaf63759ba1d3bdd8fa2bd..d0a667da654279edf9d5bd0976d85866715cfb12 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -108,14 +108,17 @@ typedef struct SSyncNode { SSyncFwds * pSyncFwds; // saved forward info if quorum >1 void * pFwdTimer; void * pRoleTimer; - FGetFileInfo getFileInfo; - FGetWalInfo getWalInfo; - FWriteToCache writeToCache; + void * pTsdb; + FGetWalInfo getWalInfoFp; + FWriteToCache writeToCacheFp; FConfirmForward confirmForward; - FNotifyRole notifyRole; - FNotifyFlowCtrl notifyFlowCtrl; - FNotifyFileSynced notifyFileSynced; - FGetVersion getVersion; + FNotifyRole notifyRoleFp; + FNotifyFlowCtrl notifyFlowCtrlFp; + FStartSyncFile startSyncFileFp; + FStopSyncFile stopSyncFileFp; + FGetVersion getVersionFp; + FSendFile sendFileFp; + FRecvFile recvFileFp; pthread_mutex_t mutex; } SSyncNode; diff --git a/src/sync/inc/syncMsg.h b/src/sync/inc/syncMsg.h index 73f4223c882ff9ce544984b37843070cf579c44a..5724927daf94df793ced6f9def6a0d25acab647c 100644 --- a/src/sync/inc/syncMsg.h +++ b/src/sync/inc/syncMsg.h @@ -98,16 +98,12 @@ typedef struct { typedef struct { SSyncHead head; - char name[TSDB_FILENAME_LEN]; - uint32_t magic; - uint32_t index; uint64_t fversion; - int64_t size; -} SFileInfo; +} SFileVersion; typedef struct { SSyncHead head; - int8_t sync; + int8_t ack; } SFileAck; typedef struct { @@ -134,7 +130,7 @@ void syncBuildSyncSetupMsg(SSyncMsg *pMsg, int32_t vgId); void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId); void syncBuildFileAck(SFileAck *pMsg, int32_t vgId); -void syncBuildFileInfo(SFileInfo *pMsg, int32_t vgId); +void syncBuildFileVersion(SFileVersion *pMsg, int32_t vgId); #ifdef __cplusplus } diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 1981196525161737255c9807af36c4babc9d20a3..510aa84d7d247193648713808b058fe39e6f1671 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -174,19 +174,22 @@ int64_t syncStart(const SSyncInfo *pInfo) { tstrncpy(pNode->path, pInfo->path, sizeof(pNode->path)); pthread_mutex_init(&pNode->mutex, NULL); - pNode->getFileInfo = pInfo->getFileInfo; - pNode->getWalInfo = pInfo->getWalInfo; - pNode->writeToCache = pInfo->writeToCache; - pNode->notifyRole = pInfo->notifyRole; + pNode->getWalInfoFp = pInfo->getWalInfoFp; + pNode->writeToCacheFp = pInfo->writeToCacheFp; + pNode->notifyRoleFp = pInfo->notifyRoleFp; pNode->confirmForward = pInfo->confirmForward; - pNode->notifyFlowCtrl = pInfo->notifyFlowCtrl; - pNode->notifyFileSynced = pInfo->notifyFileSynced; - pNode->getVersion = pInfo->getVersion; + pNode->notifyFlowCtrlFp = pInfo->notifyFlowCtrlFp; + pNode->startSyncFileFp = pInfo->startSyncFileFp; + pNode->stopSyncFileFp = pInfo->stopSyncFileFp; + pNode->getVersionFp = pInfo->getVersionFp; + pNode->sendFileFp = pInfo->sendFileFp; + pNode->recvFileFp = pInfo->recvFileFp; pNode->selfIndex = -1; pNode->vgId = pInfo->vgId; pNode->replica = pCfg->replica; pNode->quorum = pCfg->quorum; + pNode->pTsdb = pInfo->pTsdb; if (pNode->quorum > pNode->replica) pNode->quorum = pNode->replica; pNode->refCount = 1; @@ -248,8 +251,8 @@ int64_t syncStart(const SSyncInfo *pInfo) { syncAddArbitrator(pNode); taosHashPut(tsVgIdHash, &pNode->vgId, sizeof(int32_t), &pNode, sizeof(SSyncNode *)); - if (pNode->notifyRole) { - (*pNode->notifyRole)(pNode->vgId, nodeRole); + if (pNode->notifyRoleFp) { + (*pNode->notifyRoleFp)(pNode->vgId, nodeRole); } syncStartCheckPeerConn(pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]); // arb @@ -357,7 +360,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { if (pNewCfg->replica <= 1) { sInfo("vgId:%d, no peers are configured, work as master!", pNode->vgId); nodeRole = TAOS_SYNC_ROLE_MASTER; - (*pNode->notifyRole)(pNode->vgId, nodeRole); + (*pNode->notifyRoleFp)(pNode->vgId, nodeRole); } syncStartCheckPeerConn(pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]); // arb @@ -417,7 +420,7 @@ void syncRecover(int64_t rid) { // if take this node to unsync state, the whole system may not work nodeRole = TAOS_SYNC_ROLE_UNSYNCED; - (*pNode->notifyRole)(pNode->vgId, nodeRole); + (*pNode->notifyRoleFp)(pNode->vgId, nodeRole); nodeVersion = 0; pthread_mutex_lock(&pNode->mutex); @@ -625,8 +628,8 @@ static void syncResetFlowCtrl(SSyncNode *pNode) { pNode->peerInfo[index]->numOfRetrieves = 0; } - if (pNode->notifyFlowCtrl) { - (*pNode->notifyFlowCtrl)(pNode->vgId, 0); + if (pNode->notifyFlowCtrlFp) { + (*pNode->notifyFlowCtrlFp)(pNode->vgId, 0); } } @@ -694,7 +697,7 @@ static void syncChooseMaster(SSyncNode *pNode) { taosMsleep(SYNC_WAIT_AFTER_CHOOSE_MASTER); syncResetFlowCtrl(pNode); - (*pNode->notifyRole)(pNode->vgId, nodeRole); + (*pNode->notifyRoleFp)(pNode->vgId, nodeRole); } else { pPeer = pNode->peerInfo[index]; sInfo("%s, it shall work as master", pPeer->id); @@ -730,7 +733,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) { nodeRole = TAOS_SYNC_ROLE_UNSYNCED; sInfo("vgId:%d, self change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica); } - (*pNode->notifyRole)(pNode->vgId, nodeRole); + (*pNode->notifyRoleFp)(pNode->vgId, nodeRole); } } else { for (int32_t index = 0; index < pNode->replica; ++index) { @@ -742,7 +745,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) { if (masterIndex == pNode->selfIndex) { sError("%s, peer is master, work as slave instead", pTemp->id); nodeRole = TAOS_SYNC_ROLE_SLAVE; - (*pNode->notifyRole)(pNode->vgId, nodeRole); + (*pNode->notifyRoleFp)(pNode->vgId, nodeRole); } } } @@ -759,7 +762,7 @@ static int32_t syncValidateMaster(SSyncPeer *pPeer) { if (nodeRole == TAOS_SYNC_ROLE_MASTER && nodeVersion < pPeer->version) { sDebug("%s, peer has higher sver:%" PRIu64 ", restart all peer connections", pPeer->id, pPeer->version); nodeRole = TAOS_SYNC_ROLE_UNSYNCED; - (*pNode->notifyRole)(pNode->vgId, nodeRole); + (*pNode->notifyRoleFp)(pNode->vgId, nodeRole); code = -1; for (int32_t index = 0; index < pNode->replica; ++index) { @@ -796,7 +799,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus* peersStatus, int8_t new } else { sInfo("%s, is master, work as slave, self sver:%" PRIu64, pMaster->id, nodeVersion); nodeRole = TAOS_SYNC_ROLE_SLAVE; - (*pNode->notifyRole)(pNode->vgId, nodeRole); + (*pNode->notifyRoleFp)(pNode->vgId, nodeRole); } } else if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pMaster == pPeer) { sDebug("%s, is master, continue work as slave, self sver:%" PRIu64, pMaster->id, nodeVersion); @@ -989,7 +992,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { if (nodeRole == TAOS_SYNC_ROLE_SLAVE) { // nodeVersion = pHead->version; - (*pNode->writeToCache)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL); + (*pNode->writeToCacheFp)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL); } else { if (nodeSStatus != TAOS_SYNC_STATUS_INIT) { syncSaveIntoBuffer(pPeer, pHead); diff --git a/src/sync/src/syncMsg.c b/src/sync/src/syncMsg.c index 034f9a98a70c7373b74a84d24d478b52d7bf4df9..e16b9f5b4ae87dff72448adb2cfae2ea9132d3f5 100644 --- a/src/sync/src/syncMsg.c +++ b/src/sync/src/syncMsg.c @@ -101,9 +101,9 @@ void syncBuildFileAck(SFileAck *pMsg, int32_t vgId) { syncBuildHead(&pMsg->head); } -void syncBuildFileInfo(SFileInfo *pMsg, int32_t vgId) { +void syncBuildFileVersion(SFileVersion *pMsg, int32_t vgId) { pMsg->head.type = TAOS_SMSG_SYNC_FILE; pMsg->head.vgId = vgId; - pMsg->head.len = sizeof(SFileInfo) - sizeof(SSyncHead); + pMsg->head.len = sizeof(SFileVersion) - sizeof(SSyncHead); syncBuildHead(&pMsg->head); } \ No newline at end of file diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index 78520c660875ccd25d42857b89359fbd15f33b35..2360ca7bf3f865da0f7f30a40b1cdf5e3cdf39b6 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -25,139 +25,44 @@ #include "tsync.h" #include "syncInt.h" -static void syncRemoveExtraFile(SSyncPeer *pPeer, int32_t sindex, int32_t eindex) { - char name[TSDB_FILENAME_LEN * 2] = {0}; - char fname[TSDB_FILENAME_LEN * 3] = {0}; - uint32_t magic; - uint64_t fversion; - int64_t size; - uint32_t index = sindex; +static int32_t syncRecvFileVersion(SSyncPeer *pPeer, uint64_t *fversion) { SSyncNode *pNode = pPeer->pSyncNode; - if (sindex < 0 || eindex < sindex) return; - - sDebug("%s, extra files will be removed between sindex:%d and eindex:%d", pPeer->id, sindex, eindex); - - while (1) { - name[0] = 0; - magic = (*pNode->getFileInfo)(pNode->vgId, name, &index, eindex, &size, &fversion); - if (magic == 0) break; - - snprintf(fname, sizeof(fname), "%s/%s", pNode->path, name); - (void)remove(fname); - sInfo("%s, %s is removed for its extra", pPeer->id, fname); + SFileVersion fileVersion; + memset(&fileVersion, 0, sizeof(SFileVersion)); + int32_t ret = taosReadMsg(pPeer->syncFd, &fileVersion, sizeof(SFileVersion)); + if (ret != sizeof(SFileVersion)) { + sError("%s, failed to read fver since %s", pPeer->id, strerror(errno)); + return -1; + } - index++; - if (index > eindex) break; + SFileAck fileVersionAck; + memset(&fileVersionAck, 0, sizeof(SFileAck)); + syncBuildFileAck(&fileVersionAck, pNode->vgId); + ret = taosReadMsg(pPeer->syncFd, &fileVersionAck, sizeof(SFileAck)); + if (ret != sizeof(SFileAck)) { + sError("%s, failed to write fver ack since %s", pPeer->id, strerror(errno)); + return -1; } + + *fversion = htobe64(fileVersion.fversion); + return 0; } static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { SSyncNode *pNode = pPeer->pSyncNode; - SFileInfo minfo; memset(&minfo, 0, sizeof(SFileInfo)); /* = {0}; */ - SFileInfo sinfo; memset(&sinfo, 0, sizeof(SFileInfo)); /* = {0}; */ - SFileAck fileAck; memset(&fileAck, 0, sizeof(SFileAck)); - int32_t code = -1; - char name[TSDB_FILENAME_LEN * 2] = {0}; - uint32_t pindex = 0; // index in last restore - bool fileChanged = false; - - *fversion = 0; - sinfo.index = -1; - while (1) { - // read file info - minfo.index = -1; - int32_t ret = taosReadMsg(pPeer->syncFd, &minfo, sizeof(SFileInfo)); - if (ret != sizeof(SFileInfo) || minfo.index == -1) { - sError("%s, failed to read fileinfo while restore file since %s", pPeer->id, strerror(errno)); - break; - } - - assert(ret == sizeof(SFileInfo)); - ret = syncCheckHead((SSyncHead *)(&minfo)); - if (ret != 0) { - sError("%s, failed to check fileinfo while restore file since %s", pPeer->id, strerror(ret)); - break; - } - - // if no more file from master, break; - if (minfo.name[0] == 0 || minfo.magic == 0) { - sDebug("%s, no more files to restore", pPeer->id); - - // remove extra files after the current index - if (sinfo.index != -1) syncRemoveExtraFile(pPeer, sinfo.index + 1, TAOS_SYNC_MAX_INDEX); - code = 0; - break; - } - - sDebug("%s, file:%s info is received from master, index:%d size:%" PRId64 " fver:%" PRIu64 " magic:%u", pPeer->id, - minfo.name, minfo.index, minfo.size, minfo.fversion, minfo.magic); - - // remove extra files on slave between the current and last index - syncRemoveExtraFile(pPeer, pindex + 1, minfo.index - 1); - pindex = minfo.index; - - // check the file info - sinfo = minfo; - sinfo.magic = (*pNode->getFileInfo)(pNode->vgId, sinfo.name, &sinfo.index, TAOS_SYNC_MAX_INDEX, &sinfo.size, &sinfo.fversion); - sDebug("%s, local file:%s info, index:%d size:%" PRId64 " fver:%" PRIu64 " magic:%u", pPeer->id, sinfo.name, - sinfo.index, sinfo.size, sinfo.fversion, sinfo.magic); - - // if file not there or magic is not the same, file shall be synced - memset(&fileAck, 0, sizeof(SFileAck)); - syncBuildFileAck(&fileAck, pNode->vgId); - fileAck.sync = (sinfo.magic != minfo.magic || sinfo.size != minfo.size || sinfo.name[0] == 0) ? 1 : 0; - // send file ack - ret = taosWriteMsg(pPeer->syncFd, &fileAck, sizeof(SFileAck)); - if (ret != sizeof(SFileAck)) { - sError("%s, failed to write file:%s ack while restore file since %s", pPeer->id, minfo.name, strerror(errno)); - break; - } - - // if sync is not required, continue - if (fileAck.sync == 0) { - sDebug("%s, %s is the same", pPeer->id, minfo.name); - continue; - } else { - sDebug("%s, %s will be received, size:%" PRId64, pPeer->id, minfo.name, minfo.size); - } - - // if sync is required, open file, receive from master, and write to file - // get the full path to file - minfo.name[sizeof(minfo.name) - 1] = 0; - snprintf(name, sizeof(name), "%s/%s", pNode->path, minfo.name); - - int32_t dfd = open(name, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO); - if (dfd < 0) { - sError("%s, failed to open file:%s while restore file since %s", pPeer->id, minfo.name, strerror(errno)); - break; - } - - ret = taosCopyFds(pPeer->syncFd, dfd, minfo.size); - fsync(dfd); - close(dfd); - if (ret < 0) { - sError("%s, failed to copy file:%s while restore file since %s", pPeer->id, minfo.name, strerror(errno)); - break; - } - - fileChanged = true; - sDebug("%s, %s is received, size:%" PRId64, pPeer->id, minfo.name, minfo.size); - } - - if (code == 0 && fileChanged) { - // data file is changed, code shall be set to 1 - *fversion = minfo.fversion; - code = 1; - sDebug("%s, file changed after restore file, fver:%" PRIu64, pPeer->id, *fversion); + if (pNode->recvFileFp && (*pNode->recvFileFp)(pNode->pTsdb, pPeer->syncFd) != 0) { + sError("%s, failed to restore file", pPeer->id); + return -1; } - if (code < 0) { - sError("%s, failed to restore %s since %s", pPeer->id, name, strerror(errno)); + if (syncRecvFileVersion(pPeer, fversion) < 0) { + return -1; } - return code; + sInfo("%s, all files are restored, fver:%" PRIu64, pPeer->id, *fversion); + return 0; } static int32_t syncRestoreWal(SSyncPeer *pPeer, uint64_t *wver) { @@ -195,7 +100,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer, uint64_t *wver) { } lastVer = pHead->version; - ret = (*pNode->writeToCache)(pNode->vgId, pHead, TAOS_QTYPE_WAL, NULL); + ret = (*pNode->writeToCacheFp)(pNode->vgId, pHead, TAOS_QTYPE_WAL, NULL); if (ret != 0) { sError("%s, failed to restore record since %s, hver:%" PRIu64, pPeer->id, tstrerror(ret), pHead->version); break; @@ -215,7 +120,7 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) { SSyncNode *pNode = pPeer->pSyncNode; SWalHead * pHead = (SWalHead *)offset; - (*pNode->writeToCache)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL); + (*pNode->writeToCacheFp)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL); offset += pHead->len + sizeof(SWalHead); return offset; @@ -315,20 +220,15 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) { sDebug("%s, send sync rsp to peer, tranId:%u", pPeer->id, rsp.tranId); sInfo("%s, start to restore file, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); + (*pNode->startSyncFileFp)(pNode->vgId); + int32_t code = syncRestoreFile(pPeer, &fversion); if (code < 0) { sError("%s, failed to restore file", pPeer->id); return -1; } - // if code > 0, data file is changed, notify app, and pass the version - if (code > 0 && pNode->notifyFileSynced) { - if ((*pNode->notifyFileSynced)(pNode->vgId, fversion) < 0) { - sError("%s, app not in ready state", pPeer->id); - return -1; - } - } - + (*pNode->stopSyncFileFp)(pNode->vgId, fversion); nodeVersion = fversion; sInfo("%s, start to restore wal, fver:%" PRIu64, pPeer->id, nodeVersion); @@ -368,7 +268,7 @@ void *syncRestoreData(void *param) { __sync_fetch_and_add(&tsSyncNum, 1); sInfo("%s, start to restore data, sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); - (*pNode->notifyRole)(pNode->vgId, TAOS_SYNC_ROLE_SYNCING); + (*pNode->notifyRoleFp)(pNode->vgId, TAOS_SYNC_ROLE_SYNCING); if (syncOpenRecvBuffer(pNode) < 0) { sError("%s, failed to allocate recv buffer, restart connection", pPeer->id); @@ -385,7 +285,7 @@ void *syncRestoreData(void *param) { } } - (*pNode->notifyRole)(pNode->vgId, nodeRole); + (*pNode->notifyRoleFp)(pNode->vgId, nodeRole); nodeSStatus = TAOS_SYNC_STATUS_INIT; sInfo("%s, restore data over, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index cb2379583f853be70bc2e735c81f711a18dc9b52..71c1ebd0521cb8c18d021a747c04eb9f8276ae66 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -28,7 +28,7 @@ static int32_t syncGetWalVersion(SSyncNode *pNode, SSyncPeer *pPeer) { uint64_t fver, wver; - int32_t code = (*pNode->getVersion)(pNode->vgId, &fver, &wver); + int32_t code = (*pNode->getVersionFp)(pNode->vgId, &fver, &wver); if (code != 0) { sDebug("%s, vnode is commiting while retrieve, last wver:%" PRIu64, pPeer->id, pPeer->lastWalVer); return -1; @@ -40,7 +40,7 @@ static int32_t syncGetWalVersion(SSyncNode *pNode, SSyncPeer *pPeer) { static bool syncIsWalModified(SSyncNode *pNode, SSyncPeer *pPeer) { uint64_t fver, wver; - int32_t code = (*pNode->getVersion)(pNode->vgId, &fver, &wver); + int32_t code = (*pNode->getVersionFp)(pNode->vgId, &fver, &wver); if (code != 0) { sDebug("%s, vnode is commiting while retrieve, last wver:%" PRIu64, pPeer->id, pPeer->lastWalVer); return true; @@ -56,7 +56,7 @@ static bool syncIsWalModified(SSyncNode *pNode, SSyncPeer *pPeer) { static int32_t syncGetFileVersion(SSyncNode *pNode, SSyncPeer *pPeer) { uint64_t fver, wver; - int32_t code = (*pNode->getVersion)(pNode->vgId, &fver, &wver); + int32_t code = (*pNode->getVersionFp)(pNode->vgId, &fver, &wver); if (code != 0) { sDebug("%s, vnode is commiting while get fver for retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastFileVer); return -1; @@ -68,7 +68,7 @@ static int32_t syncGetFileVersion(SSyncNode *pNode, SSyncPeer *pPeer) { static bool syncAreFilesModified(SSyncNode *pNode, SSyncPeer *pPeer) { uint64_t fver, wver; - int32_t code = (*pNode->getVersion)(pNode->vgId, &fver, &wver); + int32_t code = (*pNode->getVersionFp)(pNode->vgId, &fver, &wver); if (code != 0) { sDebug("%s, vnode is commiting while retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastFileVer); pPeer->fileChanged = 1; @@ -85,104 +85,54 @@ static bool syncAreFilesModified(SSyncNode *pNode, SSyncPeer *pPeer) { return false; } -static int32_t syncRetrieveFile(SSyncPeer *pPeer) { +static int32_t syncSendFileVersion(SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; - SFileInfo fileInfo; memset(&fileInfo, 0, sizeof(SFileInfo)); - SFileAck fileAck; memset(&fileAck, 0, sizeof(SFileAck)); - int32_t code = -1; - char name[TSDB_FILENAME_LEN * 2] = {0}; - if (syncGetFileVersion(pNode, pPeer) < 0) { - pPeer->fileChanged = 1; + SFileVersion fileVersion; + memset(&fileVersion, 0, sizeof(SFileVersion)); + syncBuildFileVersion(&fileVersion, pNode->vgId); + + uint64_t fver = pPeer->lastFileVer; + fileVersion.fversion = htobe64(fver); + int32_t ret = taosWriteMsg(pPeer->syncFd, &fileVersion, sizeof(SFileVersion)); + if (ret != sizeof(SFileVersion)) { + sError("%s, failed to write fver:%" PRIu64 " since %s", pPeer->id, fver, strerror(errno)); return -1; } - while (1) { - // retrieve file info - fileInfo.name[0] = 0; - fileInfo.size = 0; - fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX, - &fileInfo.size, &fileInfo.fversion); - syncBuildFileInfo(&fileInfo, pNode->vgId); - sDebug("%s, file:%s info is sent, index:%d size:%" PRId64 " fver:%" PRIu64 " magic:%u", pPeer->id, fileInfo.name, - fileInfo.index, fileInfo.size, fileInfo.fversion, fileInfo.magic); - - // send the file info - int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(SFileInfo)); - if (ret != sizeof(SFileInfo)) { - code = -1; - sError("%s, failed to write file:%s info while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); - break; - } - - // if no file anymore, break - if (fileInfo.magic == 0 || fileInfo.name[0] == 0) { - code = 0; - sDebug("%s, no more files to sync", pPeer->id); - break; - } - - // wait for the ack from peer - ret = taosReadMsg(pPeer->syncFd, &fileAck, sizeof(SFileAck)); - if (ret != sizeof(SFileAck)) { - code = -1; - sError("%s, failed to read file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); - break; - } - - ret = syncCheckHead((SSyncHead*)(&fileAck)); - if (ret != 0) { - code = -1; - sError("%s, failed to check file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(ret)); - break; - } - - // set the peer sync version - pPeer->sversion = fileInfo.fversion; - - // if sync is not required, continue - if (fileAck.sync == 0) { - fileInfo.index++; - sDebug("%s, %s is the same, fver:%" PRIu64, pPeer->id, fileInfo.name, fileInfo.fversion); - continue; - } else { - sDebug("%s, %s will be sent, fver:%" PRIu64, pPeer->id, fileInfo.name, fileInfo.fversion); - } + SFileAck fileAck; + memset(&fileAck, 0, sizeof(SFileAck)); + ret = taosReadMsg(pPeer->syncFd, &fileAck, sizeof(SFileAck)); + if (ret != sizeof(SFileAck)) { + sError("%s, failed to read fver ack since %s", pPeer->id, strerror(errno)); + return -1; + } - // get the full path to file - snprintf(name, sizeof(name), "%s/%s", pNode->path, fileInfo.name); + // set the peer sync version + pPeer->sversion = fver; - // send the file to peer - int32_t sfd = open(name, O_RDONLY); - if (sfd < 0) { - code = -1; - sError("%s, failed to open file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); - break; - } + return 0; +} - ret = taosSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size); - close(sfd); - if (ret < 0) { - code = -1; - sError("%s, failed to send file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); - break; - } +static int32_t syncRetrieveFile(SSyncPeer *pPeer) { + SSyncNode *pNode = pPeer->pSyncNode; - sDebug("%s, file:%s is sent, size:%" PRId64, pPeer->id, fileInfo.name, fileInfo.size); - fileInfo.index++; + if (syncGetFileVersion(pNode, pPeer) < 0) { + pPeer->fileChanged = 1; + return -1; + } - // check if processed files are modified - if (syncAreFilesModified(pNode, pPeer)) { - code = -1; - break; - } + if (pNode->sendFileFp && (*pNode->sendFileFp)(pNode->pTsdb, pPeer->syncFd) != 0) { + sError("%s, failed to retrieve file", pPeer->id); + return -1; } - if (code != TSDB_CODE_SUCCESS) { - sError("%s, failed to retrieve file, code:0x%x", pPeer->id, code); + if (syncSendFileVersion(pPeer) < 0) { + return -1; } - return code; + sInfo("%s, all files are retrieved", pPeer->id); + return 0; } // if only a partial record is read out, upper layer will reload the file to get a complete record @@ -346,7 +296,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { while (1) { // retrieve wal info wname[0] = 0; - code = (*pNode->getWalInfo)(pNode->vgId, wname, &index); + code = (*pNode->getWalInfoFp)(pNode->vgId, wname, &index); if (code < 0) { sError("%s, failed to get wal info since:%s, code:0x%x", pPeer->id, strerror(errno), code); break; @@ -478,7 +428,7 @@ void *syncRetrieveData(void *param) { sInfo("%s, start to retrieve data, sstatus:%s, numOfRetrieves:%d", pPeer->id, syncStatus[pPeer->sstatus], pPeer->numOfRetrieves); - if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, pPeer->numOfRetrieves); + if (pNode->notifyFlowCtrlFp) (*pNode->notifyFlowCtrlFp)(pNode->vgId, pPeer->numOfRetrieves); pPeer->syncFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); if (pPeer->syncFd < 0) { @@ -498,10 +448,10 @@ void *syncRetrieveData(void *param) { pPeer->numOfRetrieves++; } else { pPeer->numOfRetrieves = 0; - // if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, 0); + // if (pNode->notifyFlowCtrlFp) (*pNode->notifyFlowCtrlFp)(pNode->vgId, 0); } - if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, 0); + if (pNode->notifyFlowCtrlFp) (*pNode->notifyFlowCtrlFp)(pNode->vgId, 0); pPeer->fileChanged = 0; taosClose(pPeer->syncFd); diff --git a/src/sync/test/syncServer.c b/src/sync/test/syncServer.c index 161105d86c9a45fb1b8827f01701a43a3e08000c..eeaa6a08c2e47d103b62d6023dde74341585f65f 100644 --- a/src/sync/test/syncServer.c +++ b/src/sync/test/syncServer.c @@ -296,11 +296,10 @@ void initSync() { pCfg->replica = 1; pCfg->quorum = 1; syncInfo.vgId = 1; - syncInfo.getFileInfo = getFileInfo; - syncInfo.getWalInfo = getWalInfo; - syncInfo.writeToCache = writeToCache; + syncInfo.getWalInfoFp = getWalInfo; + syncInfo.writeToCacheFp = writeToCache; syncInfo.confirmForward = confirmForward; - syncInfo.notifyRole = notifyRole; + syncInfo.notifyRoleFp = notifyRole; pCfg->nodeInfo[0].nodeId = 1; pCfg->nodeInfo[0].nodePort = 7010; diff --git a/src/vnode/inc/vnodeMain.h b/src/vnode/inc/vnodeMain.h index e1ddcdc36aa1fbf434b138f8d6fef5966e1fbc3e..73591bc10d97dfbe519cf9e5c1f73a96c8fa0854 100644 --- a/src/vnode/inc/vnodeMain.h +++ b/src/vnode/inc/vnodeMain.h @@ -26,8 +26,6 @@ int32_t vnodeDrop(int32_t vgId); int32_t vnodeOpen(int32_t vgId); int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg); int32_t vnodeClose(int32_t vgId); - -int32_t vnodeReset(SVnodeObj *pVnode); void vnodeCleanUp(SVnodeObj *pVnode); void vnodeDestroy(SVnodeObj *pVnode); diff --git a/src/vnode/inc/vnodeSync.h b/src/vnode/inc/vnodeSync.h index ae02ca17cb50ed35480916d624004e33c74cc611..c9ac25c2274d81cd08c52a77cd3cc76a27c7a0d5 100644 --- a/src/vnode/inc/vnodeSync.h +++ b/src/vnode/inc/vnodeSync.h @@ -25,7 +25,8 @@ uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t ei int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId); void vnodeNotifyRole(int32_t vgId, int8_t role); void vnodeCtrlFlow(int32_t vgId, int32_t level); -int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion); +void vnodeStartSyncFile(int32_t vgId); +void vnodeStopSyncFile(int32_t vgId, uint64_t fversion); void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code); int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam); int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver); diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 049c4d57a4012c20e1ec49b1e6ca7e9b4322b57b..633651bf6397a04c88f97dcab6d7ebcdc2cfc576 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -294,14 +294,15 @@ int32_t vnodeOpen(int32_t vgId) { syncInfo.version = pVnode->version; syncInfo.syncCfg = pVnode->syncCfg; tstrncpy(syncInfo.path, rootDir, TSDB_FILENAME_LEN); - syncInfo.getWalInfo = vnodeGetWalInfo; - syncInfo.getFileInfo = vnodeGetFileInfo; - syncInfo.writeToCache = vnodeWriteToCache; + syncInfo.getWalInfoFp = vnodeGetWalInfo; + syncInfo.writeToCacheFp = vnodeWriteToCache; syncInfo.confirmForward = vnodeConfirmForard; - syncInfo.notifyRole = vnodeNotifyRole; - syncInfo.notifyFlowCtrl = vnodeCtrlFlow; - syncInfo.notifyFileSynced = vnodeNotifyFileSynced; - syncInfo.getVersion = vnodeGetVersion; + syncInfo.notifyRoleFp = vnodeNotifyRole; + syncInfo.notifyFlowCtrlFp = vnodeCtrlFlow; + syncInfo.startSyncFileFp = vnodeStartSyncFile; + syncInfo.stopSyncFileFp = vnodeStopSyncFile; + syncInfo.getVersionFp = vnodeGetVersion; + syncInfo.pTsdb = pVnode->tsdb; pVnode->sync = syncStart(&syncInfo); if (pVnode->sync <= 0) { @@ -453,34 +454,3 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) { return 0; } - -int32_t vnodeReset(SVnodeObj *pVnode) { - if (!vnodeSetResetStatus(pVnode)) { - return -1; - } - - void *tsdb = pVnode->tsdb; - pVnode->tsdb = NULL; - - // acquire vnode - int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); - - if (refCount > 3) { - tsem_wait(&pVnode->sem); - } - - // close tsdb, then open tsdb - tsdbCloseRepo(tsdb, 0); - STsdbAppH appH = {0}; - appH.appH = (void *)pVnode; - appH.notifyStatus = vnodeProcessTsdbStatus; - appH.cqH = pVnode->cq; - appH.cqCreateFunc = cqCreate; - appH.cqDropFunc = cqDrop; - pVnode->tsdb = tsdbOpenRepo(&(pVnode->tsdbCfg), &appH); - - vnodeSetReadyStatus(pVnode); - vnodeRelease(pVnode); - - return 0; -} \ No newline at end of file diff --git a/src/vnode/src/vnodeSync.c b/src/vnode/src/vnodeSync.c index c67132c41f2a925c5d3224f8b59f0e94b5e8f1c3..627783c391d45b37830cb2b6d851fa6dd3261819 100644 --- a/src/vnode/src/vnodeSync.c +++ b/src/vnode/src/vnodeSync.c @@ -20,6 +20,7 @@ #include "dnode.h" #include "vnodeVersion.h" #include "vnodeMain.h" +#include "vnodeStatus.h" uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fver) { SVnodeObj *pVnode = vnodeAcquire(vgId); @@ -83,22 +84,34 @@ void vnodeCtrlFlow(int32_t vgId, int32_t level) { vnodeRelease(pVnode); } -int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion) { +void vnodeStartSyncFile(int32_t vgId) { SVnodeObj *pVnode = vnodeAcquire(vgId); if (pVnode == NULL) { - vError("vgId:%d, vnode not found while notify file synced", vgId); - return 0; + vError("vgId:%d, vnode not found while start filesync", vgId); + return; + } + + vDebug("vgId:%d, datafile will be synced", vgId); + vnodeSetResetStatus(pVnode); + + vnodeRelease(pVnode); +} + +void vnodeStopSyncFile(int32_t vgId, uint64_t fversion) { + SVnodeObj *pVnode = vnodeAcquire(vgId); + if (pVnode == NULL) { + vError("vgId:%d, vnode not found while stop filesync", vgId); + return; } pVnode->fversion = fversion; pVnode->version = fversion; vnodeSaveVersion(pVnode); - vDebug("vgId:%d, data file is synced, fver:%" PRIu64 " vver:%" PRIu64, vgId, fversion, fversion); - int32_t code = vnodeReset(pVnode); + vDebug("vgId:%d, datafile is synced, fver:%" PRIu64 " vver:%" PRIu64, vgId, fversion, fversion); + vnodeSetReadyStatus(pVnode); vnodeRelease(pVnode); - return code; } void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code) {