diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 2e6cb4c406e513f360f59003c0f7bd6ecf910a78..d33b8b00bf1a4e43523ff950bf70b386a171219e 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -370,12 +370,12 @@ struct STsdb { TdThreadRwlock rwLock; SMemTable *mem; SMemTable *imem; - STsdbFS fs; + STsdbFS fs; // old SLRUCache *lruCache; TdThreadMutex lruMutex; SLRUCache *biCache; TdThreadMutex biMutex; - struct STFileSystem *pFS; + struct STFileSystem *pFS; // new SRocksCache rCache; }; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index db7abf443c111ac2efba988efb442733c0f5b81f..e316a79d92e6e293bfbe5eb80231cb5e148d3485 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -122,6 +122,14 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) { committer->ctx->tbid->uid = row->uid; } + int64_t ts = TSDBROW_TS(&row->row); + if (ts > committer->ctx->maxKey) { + committer->ctx->nextKey = TMIN(committer->ctx->nextKey, ts); + code = tsdbIterMergerSkipTableData(committer->dataIterMerger, (TABLEID *)row); + TSDB_CHECK_CODE(code, lino, _exit); + continue; + } + code = tsdbFSetWriteRow(committer->writer, row); TSDB_CHECK_CODE(code, lino, _exit); diff --git a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.h b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.h index b26d2f743ae76f698ee9061d5bc5f35af4cb3a7f..bc9b784e16a239ff3c9b9e6250662730b8237a8b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.h +++ b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.h @@ -25,7 +25,6 @@ extern "C" { typedef TARRAY2(SSttBlk) TSttBlkArray; typedef TARRAY2(SStatisBlk) TStatisBlkArray; -typedef TARRAY2(STombBlk) TTombBlkArray; // SSttFileReader ========================================== typedef struct SSttFileReader SSttFileReader; diff --git a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c index 22be8de23aef5f5b6f520b9e38e0ba92d52d5dba..e348e60e74fd52dc6cca56a6a03478b187f880ac 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c +++ b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c @@ -123,13 +123,130 @@ _exit: return code; } +static int32_t tsdbDumpTombDataToFSet(STsdb *tsdb, SDelFReader *reader, SArray *aDelIdx, STFileSet *fset) { + int32_t code = 0; + int32_t lino = 0; + + SArray *aDelData = NULL; + int64_t minKey, maxKey; + STombBlock tombBlock[1] = {0}; + TTombBlkArray tombBlkArray[1] = {0}; + STsdbFD *fd = NULL; + + tsdbFidKeyRange(fset->fid, tsdb->keepCfg.days, tsdb->keepCfg.precision, &minKey, &maxKey); + + if ((aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + for (int32_t i = 0; i < taosArrayGetSize(aDelIdx); ++i) { + SDelIdx *pDelIdx = taosArrayGet(aDelIdx, i); + + code = tsdbReadDelData(reader, pDelIdx, aDelData); + TSDB_CHECK_CODE(code, lino, _exit); + + for (int32_t j = 0; j < taosArrayGetSize(aDelData); ++j) { + SDelData *pDelData = taosArrayGet(aDelData, j); + + if (pDelData->sKey > maxKey || pDelData->eKey < minKey) { + continue; + } + + STombRecord record = { + .suid = pDelIdx->suid, + .uid = pDelIdx->uid, + .version = pDelData->version, + .skey = pDelData->sKey, + .ekey = pDelData->eKey, + }; + + code = tTombBlockPut(tombBlock, &record); + TSDB_CHECK_CODE(code, lino, _exit); + + if (TOMB_BLOCK_SIZE(tombBlock) >= tsdb->pVnode->config.tsdbCfg.maxRows) { + if (fd == NULL) { + STFile file = { + .type = TSDB_FTYPE_TOMB, + .did = {0}, // TODO + .fid = fset->fid, + .cid = 0, // TODO + }; + + code = tsdbTFileObjInit(tsdb, &file, &fset->farr[TSDB_FTYPE_TOMB]); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbOpenFile(fset->farr[TSDB_FTYPE_TOMB]->fname, tsdb->pVnode->config.tsdbPageSize, + TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC, &fd); + TSDB_CHECK_CODE(code, lino, _exit); + + uint8_t hdr[TSDB_FHDR_SIZE] = {0}; + code = tsdbWriteFile(fd, 0, hdr, TSDB_FHDR_SIZE); + TSDB_CHECK_CODE(code, lino, _exit); + fset->farr[TSDB_FTYPE_TOMB]->f->size += sizeof(hdr); + } + + // TODO + tTombBlockClear(tombBlock); + } + } + } + + if (TOMB_BLOCK_SIZE(tombBlock) > 0) { + // TODO + tTombBlockClear(tombBlock); + } + + if (TARRAY2_SIZE(tombBlkArray) > 0) { + // TODO + } + + if (fd) { + code = tsdbFsyncFile(fd); + TSDB_CHECK_CODE(code, lino, _exit); + + tsdbCloseFile(&fd); + } + TARRAY2_DESTROY(tombBlkArray, NULL); + tTombBlockDestroy(tombBlock); + taosArrayDestroy(aDelData); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); + } + return code; +} + static int32_t tsdbUpgradeTombFile(STsdb *tsdb, SDelFile *pDelFile, TFileSetArray *fileSetArray) { int32_t code = 0; int32_t lino = 0; - // TODO + SDelFReader *reader = NULL; + SArray *aDelIdx = NULL; + + if ((aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tsdbDelFReaderOpen(&reader, pDelFile, tsdb); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbReadDelIdx(reader, aDelIdx); + TSDB_CHECK_CODE(code, lino, _exit); + + if (taosArrayGetSize(aDelIdx) > 0) { + STFileSet *fset; + TARRAY2_FOREACH(fileSetArray, fset) { + code = tsdbDumpTombDataToFSet(tsdb, reader, aDelIdx, fset); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + + tsdbDelFReaderClose(&reader); + taosArrayDestroy(aDelIdx); - ASSERT(0); _exit: if (code) { TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); @@ -166,9 +283,11 @@ static int32_t tsdbDoUpgradeFileSystem(STsdb *tsdb, int8_t rollback) { char fname[TSDB_FILENAME_LEN]; current_fname(tsdb, fname, TSDB_FCURRENT); - code = save_fs(fileSetArray, NULL); + code = save_fs(fileSetArray, fname); TSDB_CHECK_CODE(code, lino, _exit); + TARRAY2_DESTROY(fileSetArray, tsdbTFileSetClear); + _exit: if (code) { TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil2.h b/source/dnode/vnode/src/tsdb/tsdbUtil2.h index 69226477bab1e327f08669a159f0e507d86b94e3..87a98203b265efbf11cce382610d9157ef70eca0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil2.h +++ b/source/dnode/vnode/src/tsdb/tsdbUtil2.h @@ -58,6 +58,8 @@ typedef struct { int8_t rsvd[7]; } STombBlk; +typedef TARRAY2(STombBlk) TTombBlkArray; + #define TOMB_BLOCK_SIZE(db) TARRAY2_SIZE((db)->suid) int32_t tTombBlockInit(STombBlock *tombBlock);