diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index faae5e062538342ccf7755b8cfaea6702572475c..2e6cb4c406e513f360f59003c0f7bd6ecf910a78 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -16,9 +16,9 @@ #ifndef _TD_VNODE_TSDB_H_ #define _TD_VNODE_TSDB_H_ -//#include "../tsdb/tsdbFile2.h" -//#include "../tsdb/tsdbMerge.h" -//#include "../tsdb/tsdbSttFileRW.h" +// #include "../tsdb/tsdbFile2.h" +// #include "../tsdb/tsdbMerge.h" +// #include "../tsdb/tsdbSttFileRW.h" #include "tsimplehash.h" #include "vnodeInt.h" @@ -703,20 +703,20 @@ typedef struct { typedef struct SSttBlockLoadInfo { SBlockData blockData[2]; - void* pBlockArray; - - SArray *aSttBlk; - int32_t blockIndex[2]; // to denote the loaded block in the corresponding position. - int32_t currentLoadBlockIndex; - int32_t loadBlocks; - double elapsedTime; - STSchema *pSchema; - int16_t *colIds; - int32_t numOfCols; - bool checkRemainingRow; - bool isLast; - bool sttBlockLoaded; - int32_t numOfStt; + void *pBlockArray; + + SArray *aSttBlk; + int32_t blockIndex[2]; // to denote the loaded block in the corresponding position. + int32_t currentLoadBlockIndex; + int32_t loadBlocks; + double elapsedTime; + STSchema *pSchema; + int16_t *colIds; + int32_t numOfCols; + bool checkRemainingRow; + bool isLast; + bool sttBlockLoaded; + int32_t numOfStt; // keep the last access position, this position may be used to reduce the binary times for // starting last block data for a new table @@ -775,19 +775,19 @@ struct SDiskDataBuilder { }; typedef struct SLDataIter { - SRBTreeNode node; - SSttBlk *pSttBlk; - int32_t iStt; - int8_t backward; - int32_t iSttBlk; - int32_t iRow; - SRowInfo rInfo; - uint64_t uid; - STimeWindow timeWindow; - SVersionRange verRange; - SSttBlockLoadInfo *pBlockLoadInfo; - bool ignoreEarlierTs; - struct SSttFileReader* pReader; + SRBTreeNode node; + SSttBlk *pSttBlk; + int32_t iStt; + int8_t backward; + int32_t iSttBlk; + int32_t iRow; + SRowInfo rInfo; + uint64_t uid; + STimeWindow timeWindow; + SVersionRange verRange; + SSttBlockLoadInfo *pBlockLoadInfo; + bool ignoreEarlierTs; + struct SSttFileReader *pReader; } SLDataIter; #define tMergeTreeGetRow(_t) (&((_t)->pIter->rInfo.row)) @@ -795,21 +795,21 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo, bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter); -int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb* pTsdb, uint64_t suid, uint64_t uid, +int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb *pTsdb, uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo, - bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter* pLDataIter, void* pCurrentFileSet); - + bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter, + void *pCurrentFileSet); -void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); -bool tMergeTreeNext(SMergeTree *pMTree); -bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree); -void tMergeTreeClose(SMergeTree *pMTree); +void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); +bool tMergeTreeNext(SMergeTree *pMTree); +bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree); +void tMergeTreeClose(SMergeTree *pMTree); SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols, int32_t numOfStt); void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el); void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); -void destroySttBlockReader(SLDataIter* pLDataIter, int32_t numOfIter); +void destroySttBlockReader(SLDataIter *pLDataIter, int32_t numOfIter); // tsdbCache ============================================================================================== typedef struct SCacheRowsReader { diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index 4de7705919790da4e0be2e4867aa845f59f8592d..db7abf443c111ac2efba988efb442733c0f5b81f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -142,10 +142,10 @@ static int32_t tsdbCommitTombData(SCommitter2 *committer) { for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->tombIterMerger));) { if (record->ekey < committer->ctx->minKey) { - continue; + goto _next; } else if (record->skey > committer->ctx->maxKey) { committer->ctx->maxKey = TMIN(record->skey, committer->ctx->maxKey); - continue; + goto _next; } if (record->ekey > committer->ctx->maxKey) { @@ -158,6 +158,7 @@ static int32_t tsdbCommitTombData(SCommitter2 *committer) { code = tsdbFSetWriteTombRecord(committer->writer, record); TSDB_CHECK_CODE(code, lino, _exit); + _next: code = tsdbIterMergerNext(committer->tombIterMerger); TSDB_CHECK_CODE(code, lino, _exit); } diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index 36902286ebdca336d6d1a956c1e77b3ec39c9b78..3c1dccc4f3cbfce446961bf723e3893833e426ce 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -269,7 +269,7 @@ int32_t tDFileSetCmprFn(const void *p1, const void *p2) { return 0; } -static void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t) { +void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t) { SVnode *pVnode = pTsdb->pVnode; if (pVnode->pTfs) { if (current) { diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index 776fc5d7290847927d6147b4acf1999d590786d0..5cc2771565fe19d860a16e57c28ea2f136b1e0a6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -14,6 +14,7 @@ */ #include "tsdbFS2.h" +#include "tsdbUpgrade.h" extern int vnodeScheduleTask(int (*execute)(void *), void *arg); extern int vnodeScheduleTaskEx(int tpid, int (*execute)(void *), void *arg); @@ -28,12 +29,6 @@ enum { TSDB_FS_STATE_CLOSE, }; -typedef enum { - TSDB_FCURRENT = 1, - TSDB_FCURRENT_C, // for commit - TSDB_FCURRENT_M, // for merge -} EFCurrentT; - static const char *gCurrentFname[] = { [TSDB_FCURRENT] = "current.json", [TSDB_FCURRENT_C] = "current.c.json", @@ -73,7 +68,7 @@ static int32_t destroy_fs(STFileSystem **fs) { return 0; } -static int32_t current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype) { +int32_t current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype) { if (pTsdb->pVnode->pTfs) { snprintf(fname, // TSDB_FILENAME_LEN, // @@ -161,7 +156,7 @@ _exit: return code; } -static int32_t save_fs(const TFileSetArray *arr, const char *fname) { +int32_t save_fs(const TFileSetArray *arr, const char *fname) { int32_t code = 0; int32_t lino = 0; @@ -375,11 +370,6 @@ static int32_t tsdbFSScanAndFix(STFileSystem *fs) { return 0; } -static int32_t update_fs_if_needed(STFileSystem *pFS) { - // TODO - return 0; -} - static int32_t tsdbFSDupState(STFileSystem *fs) { int32_t code; @@ -405,9 +395,6 @@ static int32_t open_fs(STFileSystem *fs, int8_t rollback) { int32_t lino = 0; STsdb *pTsdb = fs->tsdb; - code = update_fs_if_needed(fs); - TSDB_CHECK_CODE(code, lino, _exit); - char fCurrent[TSDB_FILENAME_LEN]; char cCurrent[TSDB_FILENAME_LEN]; char mCurrent[TSDB_FILENAME_LEN]; @@ -483,10 +470,13 @@ static int32_t fset_cmpr_fn(const struct STFileSet *pSet1, const struct STFileSe } static int32_t edit_fs(STFileSystem *fs, const TFileOpArray *opArray) { - int32_t code = 0; - int32_t lino = 0; - TFileSetArray *fsetArray = fs->fSetArrTmp; + int32_t code = 0; + int32_t lino = 0; + code = tsdbFSDupState(fs); + if (code) return code; + + TFileSetArray *fsetArray = fs->fSetArrTmp; STFileSet *fset = NULL; const STFileOp *op; TARRAY2_FOREACH_PTR(opArray, op) { @@ -527,6 +517,9 @@ int32_t tsdbOpenFS(STsdb *pTsdb, STFileSystem **fs, int8_t rollback) { int32_t code; int32_t lino; + code = tsdbCheckAndUpgradeFileSystem(pTsdb, rollback); + TSDB_CHECK_CODE(code, lino, _exit); + code = create_fs(pTsdb, fs); TSDB_CHECK_CODE(code, lino, _exit); diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.h b/source/dnode/vnode/src/tsdb/tsdbFS2.h index 36156d066206760181bd0d9098244d0106b42711..8dff77a6bca6c78177bd4b12f92257137891cb58 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.h @@ -38,6 +38,12 @@ typedef enum { TSDB_BG_TASK_COMPACT, } EFSBgTaskT; +typedef enum { + TSDB_FCURRENT = 1, + TSDB_FCURRENT_C, // for commit + TSDB_FCURRENT_M, // for merge +} EFCurrentT; + /* Exposed APIs */ // open/close int32_t tsdbOpenFS(STsdb *pTsdb, STFileSystem **fs, int8_t rollback); diff --git a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c new file mode 100644 index 0000000000000000000000000000000000000000..22be8de23aef5f5b6f520b9e38e0ba92d52d5dba --- /dev/null +++ b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c @@ -0,0 +1,190 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tsdbUpgrade.h" + +// old +extern void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t); + +// new +extern int32_t save_fs(const TFileSetArray *arr, const char *fname); +extern int32_t current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype); + +static int32_t tsdbUpgradeFileSet(STsdb *tsdb, SDFileSet *pDFileSet, TFileSetArray *fileSetArray) { + int32_t code = 0; + int32_t lino = 0; + + SDataFReader *reader; + + code = tsdbDataFReaderOpen(&reader, tsdb, pDFileSet); + TSDB_CHECK_CODE(code, lino, _exit); + + // .head + { + SArray *aBlockIdx = NULL; + SMapData mDataBlk[1] = {0}; + SBrinBlock brinBlock[1] = {0}; + TBrinBlkArray brinBlkArray[1] = {0}; + + if ((aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tsdbReadBlockIdx(reader, aBlockIdx); + TSDB_CHECK_CODE(code, lino, _exit); + + for (int32_t i = 0; i < taosArrayGetSize(aBlockIdx); ++i) { + SBlockIdx *pBlockIdx = taosArrayGet(aBlockIdx, i); + + code = tsdbReadDataBlk(reader, pBlockIdx, mDataBlk); + TSDB_CHECK_CODE(code, lino, _exit); + + for (int32_t j = 0; j < mDataBlk->nItem; ++j) { + SDataBlk dataBlk[1]; + + tMapDataGetItemByIdx(mDataBlk, j, dataBlk, tGetDataBlk); + + SBrinRecord record = { + .suid = pBlockIdx->suid, + .uid = pBlockIdx->uid, + .firstKey = dataBlk->minKey.ts, + .firstKeyVer = dataBlk->minKey.version, + .lastKey = dataBlk->maxKey.ts, + .lastKeyVer = dataBlk->maxKey.version, + .minVer = dataBlk->minVer, + .maxVer = dataBlk->maxVer, + .blockOffset = dataBlk->aSubBlock->offset, + .smaOffset = dataBlk->smaInfo.offset, + .blockSize = dataBlk->aSubBlock->szBlock, + .blockKeySize = dataBlk->aSubBlock->szKey, + .smaSize = dataBlk->smaInfo.size, + .numRow = dataBlk->nRow, + .count = dataBlk->nRow, + }; + + if (dataBlk->hasDup) { + ASSERT(0); + // TODO: need to get count + // record.count = 0; + } + + code = tBrinBlockPut(brinBlock, &record); + TSDB_CHECK_CODE(code, lino, _exit); + + if (BRIN_BLOCK_SIZE(brinBlock) >= tsdb->pVnode->config.tsdbCfg.maxRows) { + // TODO + tBrinBlockClear(brinBlock); + } + } + } + + if (BRIN_BLOCK_SIZE(brinBlock) > 0) { + // TODO + ASSERT(0); + } + + // TODO + ASSERT(0); + + TARRAY2_DESTROY(brinBlkArray, NULL); + tBrinBlockDestroy(brinBlock); + taosArrayDestroy(aBlockIdx); + tMapDataClear(mDataBlk); + } + + // .data + + // .sma + + // .stt + for (int32_t i = 0; i < pDFileSet->nSttF; ++i) { + // TODO + } + + tsdbDataFReaderClose(&reader); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbUpgradeTombFile(STsdb *tsdb, SDelFile *pDelFile, TFileSetArray *fileSetArray) { + int32_t code = 0; + int32_t lino = 0; + + // TODO + + ASSERT(0); +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbDoUpgradeFileSystem(STsdb *tsdb, int8_t rollback) { + int32_t code = 0; + int32_t lino = 0; + + TFileSetArray fileSetArray[1] = {0}; + + // load old file system and convert + code = tsdbFSOpen(tsdb, rollback); + TSDB_CHECK_CODE(code, lino, _exit); + + for (int32_t i = 0; i < taosArrayGetSize(tsdb->fs.aDFileSet); i++) { + SDFileSet *pDFileSet = taosArrayGet(tsdb->fs.aDFileSet, i); + + code = tsdbUpgradeFileSet(tsdb, pDFileSet, fileSetArray); + TSDB_CHECK_CODE(code, lino, _exit); + } + + if (tsdb->fs.pDelFile != NULL) { + code = tsdbUpgradeTombFile(tsdb, tsdb->fs.pDelFile, fileSetArray); + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tsdbFSClose(tsdb); + TSDB_CHECK_CODE(code, lino, _exit); + + // save new file system + char fname[TSDB_FILENAME_LEN]; + current_fname(tsdb, fname, TSDB_FCURRENT); + + code = save_fs(fileSetArray, NULL); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); + } + return code; +} + +int32_t tsdbCheckAndUpgradeFileSystem(STsdb *tsdb, int8_t rollback) { + char fname[TSDB_FILENAME_LEN]; + + tsdbGetCurrentFName(tsdb, fname, NULL); + if (!taosCheckExistFile(fname)) return 0; + + int32_t code = tsdbDoUpgradeFileSystem(tsdb, rollback); + if (code) return code; + + taosRemoveFile(fname); + return 0; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbUpgrade.h b/source/dnode/vnode/src/tsdb/tsdbUpgrade.h new file mode 100644 index 0000000000000000000000000000000000000000..4dec009613efcac6ef01298ca3f82b4e2a9bdcaf --- /dev/null +++ b/source/dnode/vnode/src/tsdb/tsdbUpgrade.h @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tsdb.h" +#include "tsdbDef.h" +#include "tsdbFS2.h" +#include "tsdbUtil2.h" + +#ifndef _TSDB_UPGRADE_H_ +#define _TSDB_UPGRADE_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t tsdbCheckAndUpgradeFileSystem(STsdb *tsdb, int8_t rollback); + +#ifdef __cplusplus +} +#endif + +#endif /*_TSDB_UPGRADE_H_*/ \ No newline at end of file