提交 9189ef5a 编写于 作者: H Hongze Cheng

first sync draft

上级 a52ca80a
...@@ -243,6 +243,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_FILE_ALREADY_EXISTS, 0, 0x0610, "File alrea ...@@ -243,6 +243,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_FILE_ALREADY_EXISTS, 0, 0x0610, "File alrea
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECONFIGURE, 0, 0x0611, "Need to reconfigure table") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECONFIGURE, 0, 0x0611, "Need to reconfigure table")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO, 0, 0x0612, "Invalid information to create table") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO, 0, 0x0612, "Invalid information to create table")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_AVAIL_DISK, 0, 0x0613, "No available disk") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_AVAIL_DISK, 0, 0x0613, "No available disk")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_MESSED_MSG, 0, 0x0614, "TSDB messed message")
// query // query
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, 0, 0x0700, "Invalid handle") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, 0, 0x0700, "Invalid handle")
......
...@@ -58,10 +58,12 @@ void tsdbInitMFileEx(SMFile* pMFile, SMFile* pOMFile); ...@@ -58,10 +58,12 @@ void tsdbInitMFileEx(SMFile* pMFile, SMFile* pOMFile);
int tsdbEncodeSMFile(void** buf, SMFile* pMFile); int tsdbEncodeSMFile(void** buf, SMFile* pMFile);
void* tsdbDecodeSMFile(void* buf, SMFile* pMFile); void* tsdbDecodeSMFile(void* buf, SMFile* pMFile);
int tsdbApplyMFileChange(SMFile* from, SMFile* to); int tsdbApplyMFileChange(SMFile* from, SMFile* to);
int tsdbCreateMFile(SMFile* pMFile); int tsdbCreateMFile(SMFile* pMFile, bool updateHeader);
int tsdbUpdateMFileHeader(SMFile* pMFile); int tsdbUpdateMFileHeader(SMFile* pMFile);
int tsdbLoadMFileHeader(SMFile* pMFile, SMFInfo* pInfo); int tsdbLoadMFileHeader(SMFile* pMFile, SMFInfo* pInfo);
int tsdbScanAndTryFixMFile(SMFile* pMFile); int tsdbScanAndTryFixMFile(SMFile* pMFile);
int tsdbEncodeMFInfo(void** buf, SMFInfo* pInfo);
void* tsdbDecodeMFInfo(void* buf, SMFInfo* pInfo);
static FORCE_INLINE void tsdbSetMFileInfo(SMFile* pMFile, SMFInfo* pInfo) { pMFile->info = *pInfo; } static FORCE_INLINE void tsdbSetMFileInfo(SMFile* pMFile, SMFInfo* pInfo) { pMFile->info = *pInfo; }
...@@ -171,7 +173,7 @@ void tsdbInitDFile(SDFile* pDFile, SDiskID did, int vid, int fid, uint32_t ver, ...@@ -171,7 +173,7 @@ void tsdbInitDFile(SDFile* pDFile, SDiskID did, int vid, int fid, uint32_t ver,
void tsdbInitDFileEx(SDFile* pDFile, SDFile* pODFile); void tsdbInitDFileEx(SDFile* pDFile, SDFile* pODFile);
int tsdbEncodeSDFile(void** buf, SDFile* pDFile); int tsdbEncodeSDFile(void** buf, SDFile* pDFile);
void* tsdbDecodeSDFile(void* buf, SDFile* pDFile); void* tsdbDecodeSDFile(void* buf, SDFile* pDFile);
int tsdbCreateDFile(SDFile* pDFile); int tsdbCreateDFile(SDFile* pDFile, bool updateHeader);
int tsdbUpdateDFileHeader(SDFile* pDFile); int tsdbUpdateDFileHeader(SDFile* pDFile);
static FORCE_INLINE void tsdbSetDFileInfo(SDFile* pDFile, SDFInfo* pInfo) { pDFile->info = *pInfo; } static FORCE_INLINE void tsdbSetDFileInfo(SDFile* pDFile, SDFInfo* pInfo) { pDFile->info = *pInfo; }
...@@ -300,7 +302,7 @@ void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet); ...@@ -300,7 +302,7 @@ void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet);
int tsdbEncodeDFileSet(void** buf, SDFileSet* pSet); int tsdbEncodeDFileSet(void** buf, SDFileSet* pSet);
void* tsdbDecodeDFileSet(void* buf, SDFileSet* pSet); void* tsdbDecodeDFileSet(void* buf, SDFileSet* pSet);
int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to); int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to);
int tsdbCreateDFileSet(SDFileSet* pSet); int tsdbCreateDFileSet(SDFileSet* pSet, bool updateHeader);
int tsdbUpdateDFileSetHeader(SDFileSet* pSet); int tsdbUpdateDFileSetHeader(SDFileSet* pSet);
int tsdbScanAndTryFixDFileSet(SDFileSet* pSet); int tsdbScanAndTryFixDFileSet(SDFileSet* pSet);
......
...@@ -40,6 +40,7 @@ ...@@ -40,6 +40,7 @@
#include "hash.h" #include "hash.h"
#include "tarray.h" #include "tarray.h"
#include "tfs.h" #include "tfs.h"
#include "tsocket.h"
#include "tsdb.h" #include "tsdb.h"
......
...@@ -145,7 +145,7 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) { ...@@ -145,7 +145,7 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) {
did.id = TFS_PRIMARY_ID; did.id = TFS_PRIMARY_ID;
tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo))); tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)));
if (tsdbCreateMFile(&mf) < 0) { if (tsdbCreateMFile(&mf, true) < 0) {
return -1; return -1;
} }
} else { } else {
...@@ -1285,7 +1285,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid ...@@ -1285,7 +1285,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
// Create a new FSET to write data // Create a new FSET to write data
tsdbInitDFileSet(pWSet, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo))); tsdbInitDFileSet(pWSet, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)));
if (tsdbCreateDFileSet(pWSet) < 0) { if (tsdbCreateDFileSet(pWSet, true) < 0) {
if (pCommith->isRFileSet) { if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh)); tsdbCloseAndUnsetFSet(&(pCommith->readh));
} }
...@@ -1304,7 +1304,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid ...@@ -1304,7 +1304,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
// TSDB_FILE_HEAD // TSDB_FILE_HEAD
SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith); SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith);
tsdbInitDFile(pWHeadf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD); tsdbInitDFile(pWHeadf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD);
if (tsdbCreateDFile(pWHeadf) < 0) { if (tsdbCreateDFile(pWHeadf, true) < 0) {
if (pCommith->isRFileSet) { if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh)); tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1; return -1;
...@@ -1344,7 +1344,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid ...@@ -1344,7 +1344,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
tsdbInitDFile(pWLastf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_LAST); tsdbInitDFile(pWLastf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_LAST);
pCommith->isLFileSame = false; pCommith->isLFileSame = false;
if (tsdbCreateDFile(pWLastf) < 0) { if (tsdbCreateDFile(pWLastf, true) < 0) {
tsdbCloseDFileSet(pWSet); tsdbCloseDFileSet(pWSet);
tsdbRemoveDFile(pWHeadf); tsdbRemoveDFile(pWHeadf);
if (pCommith->isRFileSet) { if (pCommith->isRFileSet) {
......
...@@ -24,8 +24,6 @@ static const char *TSDB_FNAME_SUFFIX[] = { ...@@ -24,8 +24,6 @@ static const char *TSDB_FNAME_SUFFIX[] = {
}; };
static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname); static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname);
static int tsdbEncodeMFInfo(void **buf, SMFInfo *pInfo);
static void *tsdbDecodeMFInfo(void *buf, SMFInfo *pInfo);
static int tsdbRollBackMFile(SMFile *pMFile); static int tsdbRollBackMFile(SMFile *pMFile);
static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo); static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo);
static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo); static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo);
...@@ -86,17 +84,21 @@ int tsdbApplyMFileChange(SMFile *from, SMFile *to) { ...@@ -86,17 +84,21 @@ int tsdbApplyMFileChange(SMFile *from, SMFile *to) {
return 0; return 0;
} }
int tsdbCreateMFile(SMFile *pMFile) { int tsdbCreateMFile(SMFile *pMFile, bool updateHeader) {
ASSERT(pMFile->info.size == 0 && pMFile->info.magic == TSDB_FILE_INIT_MAGIC); ASSERT(pMFile->info.size == 0 && pMFile->info.magic == TSDB_FILE_INIT_MAGIC);
char buf[TSDB_FILE_HEAD_SIZE] = "\0"; char buf[TSDB_FILE_HEAD_SIZE] = "\0";
pMFile->fd = open(TSDB_FILE_FULL_NAME(pMFile), O_WRONLY | O_CREAT | O_EXCL, 0755); pMFile->fd = open(TSDB_FILE_FULL_NAME(pMFile), O_WRONLY | O_CREAT | O_TRUNC, 0755);
if (pMFile->fd < 0) { if (pMFile->fd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
if (!updateHeader) {
return 0;
}
void *ptr = buf; void *ptr = buf;
tsdbEncodeMFInfo(&ptr, &(pMFile->info)); tsdbEncodeMFInfo(&ptr, &(pMFile->info));
...@@ -179,7 +181,7 @@ int tsdbScanAndTryFixMFile(SMFile *pMFile) { ...@@ -179,7 +181,7 @@ int tsdbScanAndTryFixMFile(SMFile *pMFile) {
return 0; return 0;
} }
static int tsdbEncodeMFInfo(void **buf, SMFInfo *pInfo) { int tsdbEncodeMFInfo(void **buf, SMFInfo *pInfo) {
int tlen = 0; int tlen = 0;
tlen += taosEncodeVariantI64(buf, pInfo->size); tlen += taosEncodeVariantI64(buf, pInfo->size);
...@@ -191,7 +193,7 @@ static int tsdbEncodeMFInfo(void **buf, SMFInfo *pInfo) { ...@@ -191,7 +193,7 @@ static int tsdbEncodeMFInfo(void **buf, SMFInfo *pInfo) {
return tlen; return tlen;
} }
static void *tsdbDecodeMFInfo(void *buf, SMFInfo *pInfo) { void *tsdbDecodeMFInfo(void *buf, SMFInfo *pInfo) {
buf = taosDecodeVariantI64(buf, &(pInfo->size)); buf = taosDecodeVariantI64(buf, &(pInfo->size));
buf = taosDecodeVariantI64(buf, &(pInfo->tombSize)); buf = taosDecodeVariantI64(buf, &(pInfo->tombSize));
buf = taosDecodeVariantI64(buf, &(pInfo->nRecords)); buf = taosDecodeVariantI64(buf, &(pInfo->nRecords));
...@@ -260,17 +262,21 @@ void *tsdbDecodeSDFile(void *buf, SDFile *pDFile) { ...@@ -260,17 +262,21 @@ void *tsdbDecodeSDFile(void *buf, SDFile *pDFile) {
return buf; return buf;
} }
int tsdbCreateDFile(SDFile *pDFile) { int tsdbCreateDFile(SDFile *pDFile, bool updateHeader) {
ASSERT(pDFile->info.size == 0 && pDFile->info.magic == TSDB_FILE_INIT_MAGIC); ASSERT(pDFile->info.size == 0 && pDFile->info.magic == TSDB_FILE_INIT_MAGIC);
char buf[TSDB_FILE_HEAD_SIZE] = "\0"; char buf[TSDB_FILE_HEAD_SIZE] = "\0";
pDFile->fd = open(TSDB_FILE_FULL_NAME(pDFile), O_WRONLY | O_CREAT | O_EXCL, 0755); pDFile->fd = open(TSDB_FILE_FULL_NAME(pDFile), O_WRONLY | O_CREAT | O_TRUNC, 0755);
if (pDFile->fd < 0) { if (pDFile->fd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
if (!updateHeader) {
return 0;
}
void *ptr = buf; void *ptr = buf;
tsdbEncodeDFInfo(&ptr, &(pDFile->info)); tsdbEncodeDFInfo(&ptr, &(pDFile->info));
...@@ -457,9 +463,9 @@ int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to) { ...@@ -457,9 +463,9 @@ int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to) {
return 0; return 0;
} }
int tsdbCreateDFileSet(SDFileSet *pSet) { int tsdbCreateDFileSet(SDFileSet *pSet, bool updateHeader) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
if (tsdbCreateDFile(TSDB_DFILE_IN_SET(pSet, ftype)) < 0) { if (tsdbCreateDFile(TSDB_DFILE_IN_SET(pSet, ftype), updateHeader) < 0) {
tsdbCloseDFileSet(pSet); tsdbCloseDFileSet(pSet);
tsdbRemoveDFileSet(pSet); tsdbRemoveDFileSet(pSet);
return -1; return -1;
......
...@@ -12,3 +12,366 @@ ...@@ -12,3 +12,366 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "tsdbint.h"
static int tsdbSyncSendMeta(STsdbRepo *pRepo, int socketFd, SMFile *pmf);
static int tsdbSyncRecvMeta(STsdbRepo *pRepo, int socketFd);
static int tsdbSyncSendDFileSet(STsdbRepo *pRepo, int socketFd, SDFileSet *pOSet);
static int tsdbSyncRecvDFileSet(STsdbRepo *pRepo, int socketFd);
static bool tsdbIsFSetSame(SDFileSet *pSet1, SDFileSet *pSet2);
int tsdbSyncSend(STsdbRepo *pRepo, int socketFd) {
STsdbFS * pfs = REPO_FS(pRepo);
SFSIter fsiter;
SDFileSet *pSet;
// Disable commit while syncing TSDB files
sem_wait(&(pRepo->readyToCommit));
// Sync send meta file
if (tsdbSyncSendMeta(pRepo, socketFd, pfs->cstatus->pmf) < 0) {
tsdbError("vgId:%d failed to sync send meta file since %s", REPO_ID(pRepo), tstrerror(terrno));
sem_post(&(pRepo->readyToCommit));
return -1;
}
// Sync send SDFileSet
tsdbFSIterInit(&fsiter, pfs, TSDB_FS_ITER_FORWARD);
while ((pSet = tsdbFSIterNext(&fsiter)) != NULL) {
if (tsdbSyncSendDFileSet(pRepo, socketFd, pSet) < 0) {
sem_post(&(pRepo->readyToCommit));
return -1;
}
}
// Enable commit
sem_post(&(pRepo->readyToCommit));
return 0;
}
int tsdbSyncRecv(STsdbRepo *pRepo, int socketFd) {
SFSIter fsiter;
SDFileSet *pSet;
SDFileSet dset;
SDFileSet *pRecvSet = &dset;
uint32_t tlen;
char buf[128];
void * pBuf = NULL;
tsdbStartFSTxn(pRepo, 0, 0);
// Sync recv meta file from remote
if (tsdbSyncRecvMeta(pRepo, socketFd) < 0) {
// TODO
goto _err;
}
// Sync recv SDFileSet
tsdbFSIterInit(&fsiter, REPO_FS(pRepo), TSDB_FS_ITER_FORWARD);
pSet = tsdbFSIterNext(&fsiter);
if (taosReadMsg(socketFd, buf, sizeof(uint32_t)) < 0) {
// TODO
goto _err;
}
taosDecodeFixedU32(buf, &tlen);
if (tlen == 0) {
// No more remote files
pRecvSet = NULL;
} else {
// Has remote files
if (tsdbMakeRoom(&pBuf, tlen) < 0) {
// TODO
goto _err;
}
if (taosReadMsg(socketFd, pBuf, tlen) < tlen) {
// TODO
goto _err;
}
pRecvSet = &dset;
tsdbDecodeDFileSet(pBuf, pRecvSet);
}
while (true) {
if (pSet == NULL && pRecvSet == NULL) break;
if (pSet == NULL) {
// TODO: local not has, copy from remote
// Process the next remote fset(next pRecvSet)
} else {
if (pRecvSet == NULL) {
// Remote not has, just remove this file
pSet = tsdbFSIterNext(&fsiter);
} else {
if (pSet->fid == pRecvSet->fid) {
if (tsdbIsFSetSame(pSet, pRecvSet)) {
tsdbUpdateDFileSet(REPO_FS(pRepo), pSet);
} else {
// Copy from remote
}
pSet = tsdbFSIterNext(&fsiter);
// Process the next remote fset
} else if (pSet->fid < pRecvSet->fid) {
// Remote has not, just remove this file
pSet = tsdbFSIterNext(&fsiter);
} else {
// not has, copy pRecvSet from remote
// Process the next remote fset
}
}
}
}
tsdbEndFSTxn(pRepo);
return 0;
_err:
taosTZfree(pBuf);
tsdbEndFSTxnWithError(REPO_FS(pRepo));
return -1;
}
static int tsdbSyncSendMeta(STsdbRepo *pRepo, int socketFd, SMFile *pmf) {
void * pBuf = NULL;
uint32_t tlen = 0;
void * ptr;
SMFile mf;
SMFile * pMFile = NULL;
if (pmf) {
// copy out
mf = *pmf;
pMFile = &mf;
}
if (pMFile) {
tlen = tsdbEncodeMFInfo(NULL, TSDB_FILE_INFO(pMFile)) + sizeof(TSCKSUM);
}
if (tsdbMakeRoom(&pBuf, sizeof(tlen) + tlen) < 0) {
return -1;
}
ptr = pBuf;
taosEncodeFixedU32(&ptr, tlen);
if (pMFile) {
tsdbEncodeMFInfo(&ptr, TSDB_FILE_INFO(pMFile));
taosCalcChecksumAppend(0, (uint8_t *)pBuf, POINTER_DISTANCE(ptr, pBuf));
ptr = POINTER_SHIFT(ptr, sizeof(TSCKSUM));
}
if (taosWriteMsg(socketFd, pBuf, POINTER_DISTANCE(ptr, pBuf)) < POINTER_DISTANCE(ptr, pBuf)) {
tsdbError("vgId:%d failed to sync meta file since %s", REPO_ID(pRepo), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (pMFile == NULL) {
// No meta file, no need to send
return 0;
}
bool shouldSend = false;
{
// TODO: Recv command to know if need to send file
}
if (shouldSend) {
if (tsdbOpenMFile(pMFile, O_RDONLY) < 0) {
tsdbError("vgId:%d failed to open meta file since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
if (taosSendFile(socketFd, TSDB_FILE_FD(pMFile), 0, pMFile->info.size) < pMFile->info.size) {
tsdbError("vgId:%d failed to send meta file since %s", REPO_ID(pRepo), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
tsdbCloseMFile(pMFile);
goto _err;
}
tsdbCloseMFile(pMFile);
}
return 0;
_err:
taosTZfree(pBuf);
return -1;
}
static int tsdbSyncRecvMeta(STsdbRepo *pRepo, int socketFd) {
uint32_t tlen;
char buf[128];
void * pBuf = NULL;
SMFInfo mfInfo;
SMFile * pMFile = pRepo->fs->cstatus->pmf;
SMFile mf;
if (taosReadMsg(socketFd, (void *)buf, sizeof(int32_t)) < sizeof(int32_t)) {
tsdbError("vgId:%d failed to sync recv meta file since %s", REPO_ID(pRepo), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
taosDecodeFixedU32(buf, &tlen);
// Remote not has meta file, just remove meta file (do nothing)
if (tlen == 0) {
// TODO: need to notify remote?
return 0;
}
if (tsdbMakeRoom(&pBuf, tlen) < 0) {
tsdbError("vgId:%d failed to sync recv meta file since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
if (taosReadMsg(socketFd, pBuf, tlen) < tlen) {
tsdbError("vgId:%d failed to sync recv meta file since %s", REPO_ID(pRepo), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (!taosCheckChecksumWhole((uint8_t *)pBuf, tlen)) {
tsdbError("vgId:%d failed to sync recv meta file since %s", REPO_ID(pRepo), strerror(errno));
terrno = TSDB_CODE_TDB_MESSED_MSG;
goto _err;
}
void *ptr = pBuf;
ptr = tsdbDecodeMFInfo(ptr, &mfInfo);
if (pMFile != NULL && memcmp(&(pMFile->info), &mfInfo, sizeof(SMInfo)) == 0) {
// has file and same as remote, just keep the old one
tsdbUpdateMFile(REPO_FS(pRepo), pMFile);
// Notify remote that no need to send meta file
{
// TODO
}
} else {
// Need to copy meta file from remote
SDiskID did = {.level = TFS_PRIMARY_LEVEL, .id = TFS_PRIMARY_ID};
tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)));
mf.info = mfInfo;
// Create new file
if (tsdbCreateMFile(&mf, false) < 0) {
tsdbError("vgId:%d failed to create meta file since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
// Notify remote to send meta file
{
// TODO
}
if (taosCopyFds(socketFd, mf.fd, mfInfo.size) < 0) {
tsdbError("vgId:%d failed to sync recv meta file since %s", REPO_ID(pRepo), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
tsdbCloseMFile(&mf);
tsdbRemoveMFile(&mf);
goto _err;
}
TSDB_FILE_FSYNC(&mf);
tsdbCloseMFile(&mf);
tsdbUpdateMFile(REPO_FS(pRepo), &mf);
}
return 0;
_err:
taosTZfree(pBuf);
return -1;
}
static int tsdbSyncSendDFileSet(STsdbRepo *pRepo, int socketFd, SDFileSet *pOSet) {
void * pBuf = NULL;
uint32_t tlen = 0;
void * ptr;
SDFileSet dset;
SDFileSet *pSet = NULL;
if (pOSet) {
dset = *pOSet;
pSet = &dset;
}
if (pSet) {
tlen = tsdbEncodeDFileSet(NULL, pSet) + sizeof(TSCKSUM);
}
if (tsdbMakeRoom(&pBuf, sizeof(tlen) + tlen) < 0) {
return -1;
}
ptr = pBuf;
taosEncodeFixedU32(&ptr, tlen);
if (pSet) {
tsdbEncodeDFileSet(&ptr, pSet);
taosCalcChecksumAppend(0, (uint8_t *)pBuf, tlen);
ptr = POINTER_SHIFT(ptr, sizeof(TSCKSUM));
}
if (taosWriteMsg(socketFd, pBuf, POINTER_DISTANCE(ptr, pBuf)) < POINTER_DISTANCE(ptr, pBuf)) {
// TODO
goto _err;
}
if (pSet == NULL) {
// No need to wait
return 0;
}
bool shouldSend = false;
{
// TODO: Recv command to know if need to send file
}
if (shouldSend) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype);
if (tsdbOpenDFile(pDFile, O_RDONLY) < 0) {
// TODO
goto _err;
}
if (taosSendFile(socketFd, TSDB_FILE_FD(pDFile), 0, pDFile->info.size) < pDFile->info.size) {
// TODO
tsdbCloseDFile(pDFile);
goto _err;
}
tsdbCloseDFile(pDFile);
}
}
taosTZfree(pBuf);
return 0;
_err:
taosTZfree(pBuf);
return -1;
}
static UNUSED_FUNC int tsdbSyncRecvDFileSet(STsdbRepo *pRepo, int socketFd) {
// TODO
return 0;
}
static bool tsdbIsFSetSame(SDFileSet *pSet1, SDFileSet *pSet2) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
if (memcmp(TSDB_FILE_INFO(TSDB_DFILE_IN_SET(pSet1, ftype)), TSDB_FILE_INFO(TSDB_DFILE_IN_SET(pSet2, ftype)),
sizeof(SDFInfo)) != 0) {
return false;
}
}
return true;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册