From 9189ef5a876b7ffc5e7d20c165ee76e9ed6512b8 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 18 Jan 2021 10:25:15 +0000 Subject: [PATCH] first sync draft --- src/inc/taoserror.h | 1 + src/tsdb/inc/tsdbFile.h | 8 +- src/tsdb/inc/tsdbint.h | 1 + src/tsdb/src/tsdbCommit.c | 8 +- src/tsdb/src/tsdbFile.c | 26 +-- src/tsdb/src/tsdbSync.c | 365 +++++++++++++++++++++++++++++++++++++- 6 files changed, 391 insertions(+), 18 deletions(-) diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 6e62457261..43ac666f70 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -243,6 +243,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_FILE_ALREADY_EXISTS, 0, 0x0610, "File alrea TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECONFIGURE, 0, 0x0611, "Need to reconfigure table") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO, 0, 0x0612, "Invalid information to create table") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_AVAIL_DISK, 0, 0x0613, "No available disk") +TAOS_DEFINE_ERROR(TSDB_CODE_TDB_MESSED_MSG, 0, 0x0614, "TSDB messed message") // query TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, 0, 0x0700, "Invalid handle") diff --git a/src/tsdb/inc/tsdbFile.h b/src/tsdb/inc/tsdbFile.h index 6ab439518e..200a310e18 100644 --- a/src/tsdb/inc/tsdbFile.h +++ b/src/tsdb/inc/tsdbFile.h @@ -58,10 +58,12 @@ void tsdbInitMFileEx(SMFile* pMFile, SMFile* pOMFile); int tsdbEncodeSMFile(void** buf, SMFile* pMFile); void* tsdbDecodeSMFile(void* buf, SMFile* pMFile); int tsdbApplyMFileChange(SMFile* from, SMFile* to); -int tsdbCreateMFile(SMFile* pMFile); +int tsdbCreateMFile(SMFile* pMFile, bool updateHeader); int tsdbUpdateMFileHeader(SMFile* pMFile); int tsdbLoadMFileHeader(SMFile* pMFile, SMFInfo* pInfo); int tsdbScanAndTryFixMFile(SMFile* pMFile); +int tsdbEncodeMFInfo(void** buf, SMFInfo* pInfo); +void* tsdbDecodeMFInfo(void* buf, SMFInfo* pInfo); static FORCE_INLINE void tsdbSetMFileInfo(SMFile* pMFile, SMFInfo* pInfo) { pMFile->info = *pInfo; } @@ -171,7 +173,7 @@ void tsdbInitDFile(SDFile* pDFile, SDiskID did, int vid, int fid, uint32_t ver, void tsdbInitDFileEx(SDFile* pDFile, SDFile* pODFile); int tsdbEncodeSDFile(void** buf, SDFile* pDFile); void* tsdbDecodeSDFile(void* buf, SDFile* pDFile); -int tsdbCreateDFile(SDFile* pDFile); +int tsdbCreateDFile(SDFile* pDFile, bool updateHeader); int tsdbUpdateDFileHeader(SDFile* pDFile); static FORCE_INLINE void tsdbSetDFileInfo(SDFile* pDFile, SDFInfo* pInfo) { pDFile->info = *pInfo; } @@ -300,7 +302,7 @@ void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet); int tsdbEncodeDFileSet(void** buf, SDFileSet* pSet); void* tsdbDecodeDFileSet(void* buf, SDFileSet* pSet); int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to); -int tsdbCreateDFileSet(SDFileSet* pSet); +int tsdbCreateDFileSet(SDFileSet* pSet, bool updateHeader); int tsdbUpdateDFileSetHeader(SDFileSet* pSet); int tsdbScanAndTryFixDFileSet(SDFileSet* pSet); diff --git a/src/tsdb/inc/tsdbint.h b/src/tsdb/inc/tsdbint.h index 65aae83550..fcd5ffa6a7 100644 --- a/src/tsdb/inc/tsdbint.h +++ b/src/tsdb/inc/tsdbint.h @@ -40,6 +40,7 @@ #include "hash.h" #include "tarray.h" #include "tfs.h" +#include "tsocket.h" #include "tsdb.h" diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 3c28b43df8..1bed22bf23 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -145,7 +145,7 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) { did.id = TFS_PRIMARY_ID; tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo))); - if (tsdbCreateMFile(&mf) < 0) { + if (tsdbCreateMFile(&mf, true) < 0) { return -1; } } else { @@ -1285,7 +1285,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid // Create a new FSET to write data tsdbInitDFileSet(pWSet, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo))); - if (tsdbCreateDFileSet(pWSet) < 0) { + if (tsdbCreateDFileSet(pWSet, true) < 0) { if (pCommith->isRFileSet) { tsdbCloseAndUnsetFSet(&(pCommith->readh)); } @@ -1304,7 +1304,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid // TSDB_FILE_HEAD SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith); tsdbInitDFile(pWHeadf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD); - if (tsdbCreateDFile(pWHeadf) < 0) { + if (tsdbCreateDFile(pWHeadf, true) < 0) { if (pCommith->isRFileSet) { tsdbCloseAndUnsetFSet(&(pCommith->readh)); return -1; @@ -1344,7 +1344,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid tsdbInitDFile(pWLastf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_LAST); pCommith->isLFileSame = false; - if (tsdbCreateDFile(pWLastf) < 0) { + if (tsdbCreateDFile(pWLastf, true) < 0) { tsdbCloseDFileSet(pWSet); tsdbRemoveDFile(pWHeadf); if (pCommith->isRFileSet) { diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 8ed93b2015..74dee5e98b 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -24,8 +24,6 @@ static const char *TSDB_FNAME_SUFFIX[] = { }; static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname); -static int tsdbEncodeMFInfo(void **buf, SMFInfo *pInfo); -static void *tsdbDecodeMFInfo(void *buf, SMFInfo *pInfo); static int tsdbRollBackMFile(SMFile *pMFile); static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo); static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo); @@ -86,17 +84,21 @@ int tsdbApplyMFileChange(SMFile *from, SMFile *to) { return 0; } -int tsdbCreateMFile(SMFile *pMFile) { +int tsdbCreateMFile(SMFile *pMFile, bool updateHeader) { ASSERT(pMFile->info.size == 0 && pMFile->info.magic == TSDB_FILE_INIT_MAGIC); char buf[TSDB_FILE_HEAD_SIZE] = "\0"; - pMFile->fd = open(TSDB_FILE_FULL_NAME(pMFile), O_WRONLY | O_CREAT | O_EXCL, 0755); + pMFile->fd = open(TSDB_FILE_FULL_NAME(pMFile), O_WRONLY | O_CREAT | O_TRUNC, 0755); if (pMFile->fd < 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; } + if (!updateHeader) { + return 0; + } + void *ptr = buf; tsdbEncodeMFInfo(&ptr, &(pMFile->info)); @@ -179,7 +181,7 @@ int tsdbScanAndTryFixMFile(SMFile *pMFile) { return 0; } -static int tsdbEncodeMFInfo(void **buf, SMFInfo *pInfo) { +int tsdbEncodeMFInfo(void **buf, SMFInfo *pInfo) { int tlen = 0; tlen += taosEncodeVariantI64(buf, pInfo->size); @@ -191,7 +193,7 @@ static int tsdbEncodeMFInfo(void **buf, SMFInfo *pInfo) { return tlen; } -static void *tsdbDecodeMFInfo(void *buf, SMFInfo *pInfo) { +void *tsdbDecodeMFInfo(void *buf, SMFInfo *pInfo) { buf = taosDecodeVariantI64(buf, &(pInfo->size)); buf = taosDecodeVariantI64(buf, &(pInfo->tombSize)); buf = taosDecodeVariantI64(buf, &(pInfo->nRecords)); @@ -260,17 +262,21 @@ void *tsdbDecodeSDFile(void *buf, SDFile *pDFile) { return buf; } -int tsdbCreateDFile(SDFile *pDFile) { +int tsdbCreateDFile(SDFile *pDFile, bool updateHeader) { ASSERT(pDFile->info.size == 0 && pDFile->info.magic == TSDB_FILE_INIT_MAGIC); char buf[TSDB_FILE_HEAD_SIZE] = "\0"; - pDFile->fd = open(TSDB_FILE_FULL_NAME(pDFile), O_WRONLY | O_CREAT | O_EXCL, 0755); + pDFile->fd = open(TSDB_FILE_FULL_NAME(pDFile), O_WRONLY | O_CREAT | O_TRUNC, 0755); if (pDFile->fd < 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; } + if (!updateHeader) { + return 0; + } + void *ptr = buf; tsdbEncodeDFInfo(&ptr, &(pDFile->info)); @@ -457,9 +463,9 @@ int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to) { return 0; } -int tsdbCreateDFileSet(SDFileSet *pSet) { +int tsdbCreateDFileSet(SDFileSet *pSet, bool updateHeader) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { - if (tsdbCreateDFile(TSDB_DFILE_IN_SET(pSet, ftype)) < 0) { + if (tsdbCreateDFile(TSDB_DFILE_IN_SET(pSet, ftype), updateHeader) < 0) { tsdbCloseDFileSet(pSet); tsdbRemoveDFileSet(pSet); return -1; diff --git a/src/tsdb/src/tsdbSync.c b/src/tsdb/src/tsdbSync.c index 6dea4a4e57..4e7dcb23e9 100644 --- a/src/tsdb/src/tsdbSync.c +++ b/src/tsdb/src/tsdbSync.c @@ -11,4 +11,367 @@ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . - */ \ No newline at end of file + */ + +#include "tsdbint.h" + +static int tsdbSyncSendMeta(STsdbRepo *pRepo, int socketFd, SMFile *pmf); +static int tsdbSyncRecvMeta(STsdbRepo *pRepo, int socketFd); +static int tsdbSyncSendDFileSet(STsdbRepo *pRepo, int socketFd, SDFileSet *pOSet); +static int tsdbSyncRecvDFileSet(STsdbRepo *pRepo, int socketFd); +static bool tsdbIsFSetSame(SDFileSet *pSet1, SDFileSet *pSet2); + +int tsdbSyncSend(STsdbRepo *pRepo, int socketFd) { + STsdbFS * pfs = REPO_FS(pRepo); + SFSIter fsiter; + SDFileSet *pSet; + + // Disable commit while syncing TSDB files + sem_wait(&(pRepo->readyToCommit)); + + // Sync send meta file + if (tsdbSyncSendMeta(pRepo, socketFd, pfs->cstatus->pmf) < 0) { + tsdbError("vgId:%d failed to sync send meta file since %s", REPO_ID(pRepo), tstrerror(terrno)); + sem_post(&(pRepo->readyToCommit)); + return -1; + } + + // Sync send SDFileSet + tsdbFSIterInit(&fsiter, pfs, TSDB_FS_ITER_FORWARD); + + while ((pSet = tsdbFSIterNext(&fsiter)) != NULL) { + if (tsdbSyncSendDFileSet(pRepo, socketFd, pSet) < 0) { + sem_post(&(pRepo->readyToCommit)); + return -1; + } + } + + // Enable commit + sem_post(&(pRepo->readyToCommit)); + return 0; +} + +int tsdbSyncRecv(STsdbRepo *pRepo, int socketFd) { + SFSIter fsiter; + SDFileSet *pSet; + SDFileSet dset; + SDFileSet *pRecvSet = &dset; + uint32_t tlen; + char buf[128]; + void * pBuf = NULL; + + tsdbStartFSTxn(pRepo, 0, 0); + + // Sync recv meta file from remote + if (tsdbSyncRecvMeta(pRepo, socketFd) < 0) { + // TODO + goto _err; + } + + // Sync recv SDFileSet + tsdbFSIterInit(&fsiter, REPO_FS(pRepo), TSDB_FS_ITER_FORWARD); + pSet = tsdbFSIterNext(&fsiter); + + if (taosReadMsg(socketFd, buf, sizeof(uint32_t)) < 0) { + // TODO + goto _err; + } + + taosDecodeFixedU32(buf, &tlen); + if (tlen == 0) { + // No more remote files + pRecvSet = NULL; + } else { + // Has remote files + if (tsdbMakeRoom(&pBuf, tlen) < 0) { + // TODO + goto _err; + } + + if (taosReadMsg(socketFd, pBuf, tlen) < tlen) { + // TODO + goto _err; + } + + pRecvSet = &dset; + tsdbDecodeDFileSet(pBuf, pRecvSet); + } + + while (true) { + if (pSet == NULL && pRecvSet == NULL) break; + + if (pSet == NULL) { + // TODO: local not has, copy from remote + // Process the next remote fset(next pRecvSet) + } else { + if (pRecvSet == NULL) { + // Remote not has, just remove this file + pSet = tsdbFSIterNext(&fsiter); + } else { + if (pSet->fid == pRecvSet->fid) { + if (tsdbIsFSetSame(pSet, pRecvSet)) { + tsdbUpdateDFileSet(REPO_FS(pRepo), pSet); + } else { + // Copy from remote + } + pSet = tsdbFSIterNext(&fsiter); + // Process the next remote fset + } else if (pSet->fid < pRecvSet->fid) { + // Remote has not, just remove this file + pSet = tsdbFSIterNext(&fsiter); + } else { + // not has, copy pRecvSet from remote + // Process the next remote fset + } + } + + } + } + + tsdbEndFSTxn(pRepo); + return 0; + +_err: + taosTZfree(pBuf); + tsdbEndFSTxnWithError(REPO_FS(pRepo)); + return -1; +} + +static int tsdbSyncSendMeta(STsdbRepo *pRepo, int socketFd, SMFile *pmf) { + void * pBuf = NULL; + uint32_t tlen = 0; + void * ptr; + SMFile mf; + SMFile * pMFile = NULL; + + if (pmf) { + // copy out + mf = *pmf; + pMFile = &mf; + } + + if (pMFile) { + tlen = tsdbEncodeMFInfo(NULL, TSDB_FILE_INFO(pMFile)) + sizeof(TSCKSUM); + } + + if (tsdbMakeRoom(&pBuf, sizeof(tlen) + tlen) < 0) { + return -1; + } + + ptr = pBuf; + taosEncodeFixedU32(&ptr, tlen); + if (pMFile) { + tsdbEncodeMFInfo(&ptr, TSDB_FILE_INFO(pMFile)); + taosCalcChecksumAppend(0, (uint8_t *)pBuf, POINTER_DISTANCE(ptr, pBuf)); + ptr = POINTER_SHIFT(ptr, sizeof(TSCKSUM)); + } + + if (taosWriteMsg(socketFd, pBuf, POINTER_DISTANCE(ptr, pBuf)) < POINTER_DISTANCE(ptr, pBuf)) { + tsdbError("vgId:%d failed to sync meta file since %s", REPO_ID(pRepo), strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (pMFile == NULL) { + // No meta file, no need to send + return 0; + } + + bool shouldSend = false; + { + // TODO: Recv command to know if need to send file + } + + if (shouldSend) { + if (tsdbOpenMFile(pMFile, O_RDONLY) < 0) { + tsdbError("vgId:%d failed to open meta file since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _err; + } + + if (taosSendFile(socketFd, TSDB_FILE_FD(pMFile), 0, pMFile->info.size) < pMFile->info.size) { + tsdbError("vgId:%d failed to send meta file since %s", REPO_ID(pRepo), strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + tsdbCloseMFile(pMFile); + goto _err; + } + + tsdbCloseMFile(pMFile); + } + + return 0; + +_err: + taosTZfree(pBuf); + return -1; +} + +static int tsdbSyncRecvMeta(STsdbRepo *pRepo, int socketFd) { + uint32_t tlen; + char buf[128]; + void * pBuf = NULL; + SMFInfo mfInfo; + SMFile * pMFile = pRepo->fs->cstatus->pmf; + SMFile mf; + + if (taosReadMsg(socketFd, (void *)buf, sizeof(int32_t)) < sizeof(int32_t)) { + tsdbError("vgId:%d failed to sync recv meta file since %s", REPO_ID(pRepo), strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + taosDecodeFixedU32(buf, &tlen); + + // Remote not has meta file, just remove meta file (do nothing) + if (tlen == 0) { + // TODO: need to notify remote? + return 0; + } + + if (tsdbMakeRoom(&pBuf, tlen) < 0) { + tsdbError("vgId:%d failed to sync recv meta file since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _err; + } + + if (taosReadMsg(socketFd, pBuf, tlen) < tlen) { + tsdbError("vgId:%d failed to sync recv meta file since %s", REPO_ID(pRepo), strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (!taosCheckChecksumWhole((uint8_t *)pBuf, tlen)) { + tsdbError("vgId:%d failed to sync recv meta file since %s", REPO_ID(pRepo), strerror(errno)); + terrno = TSDB_CODE_TDB_MESSED_MSG; + goto _err; + } + + void *ptr = pBuf; + ptr = tsdbDecodeMFInfo(ptr, &mfInfo); + + if (pMFile != NULL && memcmp(&(pMFile->info), &mfInfo, sizeof(SMInfo)) == 0) { + // has file and same as remote, just keep the old one + tsdbUpdateMFile(REPO_FS(pRepo), pMFile); + // Notify remote that no need to send meta file + { + // TODO + } + } else { + // Need to copy meta file from remote + SDiskID did = {.level = TFS_PRIMARY_LEVEL, .id = TFS_PRIMARY_ID}; + tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo))); + mf.info = mfInfo; + + // Create new file + if (tsdbCreateMFile(&mf, false) < 0) { + tsdbError("vgId:%d failed to create meta file since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _err; + } + + // Notify remote to send meta file + { + // TODO + } + + if (taosCopyFds(socketFd, mf.fd, mfInfo.size) < 0) { + tsdbError("vgId:%d failed to sync recv meta file since %s", REPO_ID(pRepo), strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + tsdbCloseMFile(&mf); + tsdbRemoveMFile(&mf); + goto _err; + } + + TSDB_FILE_FSYNC(&mf); + tsdbCloseMFile(&mf); + tsdbUpdateMFile(REPO_FS(pRepo), &mf); + } + + return 0; + +_err: + taosTZfree(pBuf); + return -1; +} + +static int tsdbSyncSendDFileSet(STsdbRepo *pRepo, int socketFd, SDFileSet *pOSet) { + void * pBuf = NULL; + uint32_t tlen = 0; + void * ptr; + SDFileSet dset; + SDFileSet *pSet = NULL; + + if (pOSet) { + dset = *pOSet; + pSet = &dset; + } + + if (pSet) { + tlen = tsdbEncodeDFileSet(NULL, pSet) + sizeof(TSCKSUM); + } + + if (tsdbMakeRoom(&pBuf, sizeof(tlen) + tlen) < 0) { + return -1; + } + + ptr = pBuf; + taosEncodeFixedU32(&ptr, tlen); + if (pSet) { + tsdbEncodeDFileSet(&ptr, pSet); + taosCalcChecksumAppend(0, (uint8_t *)pBuf, tlen); + ptr = POINTER_SHIFT(ptr, sizeof(TSCKSUM)); + } + + if (taosWriteMsg(socketFd, pBuf, POINTER_DISTANCE(ptr, pBuf)) < POINTER_DISTANCE(ptr, pBuf)) { + // TODO + goto _err; + } + + if (pSet == NULL) { + // No need to wait + return 0; + } + + bool shouldSend = false; + { + // TODO: Recv command to know if need to send file + } + + if (shouldSend) { + for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { + SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype); + + if (tsdbOpenDFile(pDFile, O_RDONLY) < 0) { + // TODO + goto _err; + } + + if (taosSendFile(socketFd, TSDB_FILE_FD(pDFile), 0, pDFile->info.size) < pDFile->info.size) { + // TODO + tsdbCloseDFile(pDFile); + goto _err; + } + + tsdbCloseDFile(pDFile); + } + } + + taosTZfree(pBuf); + return 0; + +_err: + taosTZfree(pBuf); + return -1; +} + +static UNUSED_FUNC int tsdbSyncRecvDFileSet(STsdbRepo *pRepo, int socketFd) { + // TODO + return 0; +} + +static bool tsdbIsFSetSame(SDFileSet *pSet1, SDFileSet *pSet2) { + for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { + if (memcmp(TSDB_FILE_INFO(TSDB_DFILE_IN_SET(pSet1, ftype)), TSDB_FILE_INFO(TSDB_DFILE_IN_SET(pSet2, ftype)), + sizeof(SDFInfo)) != 0) { + return false; + } + } + + return true; +} \ No newline at end of file -- GitLab