提交 40a591bd 编写于 作者: C Cary Xu

sync compatibility for upgrading

上级 466c05a4
...@@ -324,7 +324,7 @@ typedef struct { ...@@ -324,7 +324,7 @@ typedef struct {
} \ } \
} while (0); } while (0);
void tsdbInitDFileSet(SDFileSet* pSet, SDiskID did, int vid, int fid, uint32_t ver); void tsdbInitDFileSet(SDFileSet* pSet, SDiskID did, int vid, int fid, uint32_t ver, uint8_t nFiles);
void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet); void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet);
int tsdbEncodeDFileSet(void** buf, SDFileSet* pSet); int tsdbEncodeDFileSet(void** buf, SDFileSet* pSet);
void* tsdbDecodeDFileSet(void* buf, SDFileSet* pSet, uint32_t sfver); void* tsdbDecodeDFileSet(void* buf, SDFileSet* pSet, uint32_t sfver);
......
...@@ -139,7 +139,7 @@ int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn) { ...@@ -139,7 +139,7 @@ int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn) {
if (did.level > TSDB_FSET_LEVEL(pSet)) { if (did.level > TSDB_FSET_LEVEL(pSet)) {
// Need to move the FSET to higher level // Need to move the FSET to higher level
tsdbInitDFileSet(&nSet, did, REPO_ID(pRepo), pSet->fid, FS_TXN_VERSION(pfs)); tsdbInitDFileSet(&nSet, did, REPO_ID(pRepo), pSet->fid, FS_TXN_VERSION(pfs), TSDB_FILE_MAX);
if (tsdbCopyDFileSet(pSet, &nSet) < 0) { if (tsdbCopyDFileSet(pSet, &nSet) < 0) {
tsdbError("vgId:%d failed to copy FSET %d from level %d to level %d since %s", REPO_ID(pRepo), pSet->fid, tsdbError("vgId:%d failed to copy FSET %d from level %d to level %d since %s", REPO_ID(pRepo), pSet->fid,
...@@ -1598,7 +1598,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid ...@@ -1598,7 +1598,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
// Set and open commit FSET // Set and open commit FSET
if (pSet == NULL || did.level > TSDB_FSET_LEVEL(pSet)) { if (pSet == NULL || did.level > TSDB_FSET_LEVEL(pSet)) {
// Create a new FSET to write data // Create a new FSET to write data
tsdbInitDFileSet(pWSet, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo))); tsdbInitDFileSet(pWSet, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_MAX);
if (tsdbCreateDFileSet(pWSet, true) < 0) { if (tsdbCreateDFileSet(pWSet, true) < 0) {
tsdbError("vgId:%d failed to create FSET %d at level %d disk id %d since %s", REPO_ID(pRepo), tsdbError("vgId:%d failed to create FSET %d at level %d disk id %d since %s", REPO_ID(pRepo),
......
...@@ -197,7 +197,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) { ...@@ -197,7 +197,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
} }
tsdbInitDFileSet(TSDB_COMPACT_WSET(pComph), did, REPO_ID(pRepo), TSDB_FSET_FID(pSet), tsdbInitDFileSet(TSDB_COMPACT_WSET(pComph), did, REPO_ID(pRepo), TSDB_FSET_FID(pSet),
FS_TXN_VERSION(REPO_FS(pRepo))); FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_MAX);
if (tsdbCreateDFileSet(TSDB_COMPACT_WSET(pComph), true) < 0) { if (tsdbCreateDFileSet(TSDB_COMPACT_WSET(pComph), true) < 0) {
tsdbError("vgId:%d failed to compact FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno)); tsdbError("vgId:%d failed to compact FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbCompactFSetEnd(pComph); tsdbCompactFSetEnd(pComph);
......
...@@ -100,7 +100,8 @@ static int tsdbDecodeDFileSetArray(void **originBuf, void *buf, SArray *pArray, ...@@ -100,7 +100,8 @@ static int tsdbDecodeDFileSetArray(void **originBuf, void *buf, SArray *pArray,
buf = taosDecodeFixedU64(buf, &nset); buf = taosDecodeFixedU64(buf, &nset);
if (pSFSHeader->version == TSDB_FS_VER_0) { if (pSFSHeader->version == TSDB_FS_VER_0) {
uint64_t extendedSize = pSFSHeader->len + nset * TSDB_FILE_MAX * sizeof(TSDB_FVER_TYPE); // record fver in new version of 'current' file
uint64_t extendedSize = pSFSHeader->len + nset * TSDB_FILE_MAX * sizeof(TSDB_FVER_TYPE);
if (taosTSizeof(*originBuf) < extendedSize) { if (taosTSizeof(*originBuf) < extendedSize) {
size_t ptrDistance = POINTER_DISTANCE(buf, *originBuf); size_t ptrDistance = POINTER_DISTANCE(buf, *originBuf);
if (tsdbMakeRoom(originBuf, (size_t)extendedSize) < 0) { if (tsdbMakeRoom(originBuf, (size_t)extendedSize) < 0) {
......
...@@ -563,12 +563,12 @@ static int tsdbRollBackDFile(SDFile *pDFile) { ...@@ -563,12 +563,12 @@ static int tsdbRollBackDFile(SDFile *pDFile) {
} }
// ============== Operations on SDFileSet // ============== Operations on SDFileSet
void tsdbInitDFileSet(SDFileSet *pSet, SDiskID did, int vid, int fid, uint32_t ver) { void tsdbInitDFileSet(SDFileSet *pSet, SDiskID did, int vid, int fid, uint32_t ver, uint8_t nFiles) {
pSet->fid = fid; pSet->fid = fid;
pSet->state = 0; pSet->state = 0;
pSet->nFiles = TSDB_FILE_MAX; pSet->nFiles = nFiles;
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < nFiles; ftype++) {
SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype); SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype);
tsdbInitDFile(pDFile, did, vid, fid, ver, ftype); tsdbInitDFile(pDFile, did, vid, fid, ver, ftype);
} }
...@@ -651,7 +651,7 @@ int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to) { ...@@ -651,7 +651,7 @@ int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to) {
} }
int tsdbCreateDFileSet(SDFileSet *pSet, bool updateHeader) { int tsdbCreateDFileSet(SDFileSet *pSet, bool updateHeader) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) {
if (tsdbCreateDFile(TSDB_DFILE_IN_SET(pSet, ftype), updateHeader, ftype) < 0) { if (tsdbCreateDFile(TSDB_DFILE_IN_SET(pSet, ftype), updateHeader, ftype) < 0) {
tsdbCloseDFileSet(pSet); tsdbCloseDFileSet(pSet);
tsdbRemoveDFileSet(pSet); tsdbRemoveDFileSet(pSet);
......
...@@ -466,7 +466,7 @@ static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch) { ...@@ -466,7 +466,7 @@ static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch) {
return -1; return -1;
} }
tsdbInitDFileSet(&fset, did, REPO_ID(pRepo), pSynch->pdf->fid, FS_TXN_VERSION(pfs)); tsdbInitDFileSet(&fset, did, REPO_ID(pRepo), pSynch->pdf->fid, FS_TXN_VERSION(pfs), pSynch->pdf->nFiles);
// Create new FSET // Create new FSET
if (tsdbCreateDFileSet(&fset, false) < 0) { if (tsdbCreateDFileSet(&fset, false) < 0) {
...@@ -474,7 +474,7 @@ static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch) { ...@@ -474,7 +474,7 @@ static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch) {
return -1; return -1;
} }
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < pSynch->pdf->nFiles; ftype++) {
SDFile *pDFile = TSDB_DFILE_IN_SET(&fset, ftype); // local file SDFile *pDFile = TSDB_DFILE_IN_SET(&fset, ftype); // local file
SDFile *pRDFile = TSDB_DFILE_IN_SET(pSynch->pdf, ftype); // remote file SDFile *pRDFile = TSDB_DFILE_IN_SET(pSynch->pdf, ftype); // remote file
...@@ -550,7 +550,10 @@ static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch) { ...@@ -550,7 +550,10 @@ static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch) {
} }
static bool tsdbIsTowFSetSame(SDFileSet *pSet1, SDFileSet *pSet2) { static bool tsdbIsTowFSetSame(SDFileSet *pSet1, SDFileSet *pSet2) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { if (pSet1->nFiles != pSet2->nFiles) {
return false;
}
for (TSDB_FILE_T ftype = 0; ftype < pSet1->nFiles; ftype++) {
SDFile *pDFile1 = TSDB_DFILE_IN_SET(pSet1, ftype); SDFile *pDFile1 = TSDB_DFILE_IN_SET(pSet1, ftype);
SDFile *pDFile2 = TSDB_DFILE_IN_SET(pSet2, ftype); SDFile *pDFile2 = TSDB_DFILE_IN_SET(pSet2, ftype);
...@@ -592,7 +595,7 @@ static int32_t tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet) { ...@@ -592,7 +595,7 @@ static int32_t tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet) {
if (toSend) { if (toSend) {
tsdbInfo("vgId:%d, fileset:%d will be sent", REPO_ID(pRepo), pSet->fid); tsdbInfo("vgId:%d, fileset:%d will be sent", REPO_ID(pRepo), pSet->fid);
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) {
SDFile df = *TSDB_DFILE_IN_SET(pSet, ftype); SDFile df = *TSDB_DFILE_IN_SET(pSet, ftype);
if (tsdbOpenDFile(&df, O_RDONLY) < 0) { if (tsdbOpenDFile(&df, O_RDONLY) < 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册