diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 34ded9f26372682f2686a1bd9b70c081bad69b91..0b88424015e22e12e704e26949b5748df25ab424 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -72,6 +72,7 @@ typedef struct SDelFReader SDelFReader; #define HAS_VALUE ((int8_t)0x4) // tsdbUtil.c ============================================================================================== // TSDBROW +#define TSDBROW_SVERSION(ROW) TD_ROW_SVER((ROW)->pTSRow) #define tsdbRowFromTSRow(VERSION, TSROW) ((TSDBROW){.type = 0, .version = (VERSION), .pTSRow = (TSROW)}); #define tsdbRowFromBlockData(BLOCKDATA, IROW) ((TSDBROW){.type = 1, .pBlockData = (BLOCKDATA), .pTSRow = (IROW)}); TSDBKEY tsdbRowKey(TSDBROW *pRow); @@ -149,9 +150,18 @@ void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, S TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter); bool tsdbTbDataIterNext(STbDataIter *pIter); // tsdbFile.c ============================================================================================== +// SDataFSet +// SHeadFile +void tsdbHeadFileName(STsdb *pTsdb, SHeadFile *pFile, char fname[]); +// SDataFile +void tsdbDataFileName(STsdb *pTsdb, SDataFile *pFile, char fname[]); +// SLastFile +void tsdbLastFileName(STsdb *pTsdb, SLastFile *pFile, char fname[]); +// SSmaFile +void tsdbSmaFileName(STsdb *pTsdb, SSmaFile *pFile, char fname[]); // SDelFile #define tsdbDelFileCreate() ((SDelFile){.info = KEYINFO_INIT_VAL, .size = 0, .offset = 0}) -char *tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile); +void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]); // tsdbFS.c ============================================================================================== typedef struct STsdbFS STsdbFS; int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS); @@ -442,6 +452,8 @@ struct SSmaFile { }; struct SDFileSet { + SDiskID diskId; + int32_t nRef; SHeadFile *pHeadFile; SDataFile *pDataFile; SLastFile *pLastFile; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 38ee34b5b45179584ee357e0fe08dc72470fdd13..5c01333fc7d1bb089c9df116966da658fa910252 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -339,10 +339,16 @@ _err: static int32_t tsdbCommitMemoryData(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, TSDBKEY eKey, bool toDataOnly) { - int32_t code = 0; - TSDBROW *pRow; - SBlock block = tBlockInit(); + int32_t code = 0; + TSDBROW *pRow; + STSchema *pTSchema = NULL; // TODO + TSDBKEY key; + SBlock *pBlock = &pCommitter->nBlock; + + if (pIter == NULL) goto _exit; + tBlockReset(pBlock); + tBlockDataReset(&pCommitter->nBlockData); while (true) { pRow = tsdbTbDataIterGet(pIter); @@ -354,30 +360,55 @@ static int32_t tsdbCommitMemoryData(SCommitter *pCommitter, SBlockIdx *pBlockIdx } } - code = tBlockDataAppendRow(&pCommitter->nBlockData, pRow, NULL /*TODO*/); + // update schema + if (pTSchema == NULL || pTSchema->version != TSDBROW_SVERSION(pRow)) { + // TODO + // pTSchema = NULL; + } + + // append row + code = tBlockDataAppendRow(&pCommitter->nBlockData, pRow, pTSchema); if (code) goto _err; + // update info + key = tsdbRowKey(pRow); + if (tsdbKeyCmprFn(&key, &pBlock->info.maxKey) > 0) pBlock->info.maxKey = key; + if (tsdbKeyCmprFn(&key, &pBlock->info.minKey) < 0) pBlock->info.minKey = key; + if (key.version > pBlock->info.maxVersion) pBlock->info.maxVersion = key.version; + if (key.version < pBlock->info.minVerion) pBlock->info.minVerion = key.version; + + // iter next + tsdbTbDataIterNext(pIter); + + // check write if (pCommitter->nBlockData.nRow < pCommitter->maxRow * 4 / 5) { continue; } _write_block_data: if (!toDataOnly && pCommitter->nBlockData.nRow < pCommitter->minKey) { - block.last = 1; + pCommitter->nBlock.last = 1; } else { - block.last = 0; + pCommitter->nBlock.last = 0; } - code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->nBlockData, NULL, NULL, pBlockIdx, &block); + code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->nBlockData, NULL, NULL, pBlockIdx, pBlock); if (code) goto _err; - code = tMapDataPutItem(&pCommitter->nBlockMap, &block, tPutBlock); + code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock); if (code) goto _err; - tBlockReset(&block); + // update info + if (tsdbKeyCmprFn(&pBlock->info.minKey, &pBlockIdx->info.minKey) < 0) pBlock->info.minKey = pBlockIdx->info.minKey; + if (tsdbKeyCmprFn(&pBlock->info.maxKey, &pBlockIdx->info.maxKey) < 0) pBlock->info.maxKey = pBlockIdx->info.maxKey; + if (pBlock->info.minVerion < pBlockIdx->info.minVerion) pBlockIdx->info.minVerion = pBlock->info.minVerion; + if (pBlock->info.maxVersion < pBlockIdx->info.maxVersion) pBlockIdx->info.maxVersion = pBlock->info.maxVersion; + + tBlockReset(pBlock); tBlockDataReset(&pCommitter->nBlockData); } +_exit: return code; _err: @@ -465,14 +496,12 @@ _err: static int32_t tsdbMergeCommit(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, SBlock *pBlock, int8_t isLastBlock) { - int32_t code = 0; - TSDBROW *pRow; - SBlock block = tBlockInit(); - SBlockData nBlockData; - TSDBKEY key; - int32_t c; + int32_t code = 0; + TSDBROW *pRow; + TSDBKEY key; + int32_t c; - if (pBlock == NULL) { + if (pBlock == NULL) { // (pIter && pBlock == NULL) key.ts = pCommitter->maxKey; key.version = INT64_MAX; code = tsdbCommitMemoryData(pCommitter, pBlockIdx, pIter, key, 0); @@ -481,12 +510,14 @@ static int32_t tsdbMergeCommit(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STb // merge code = tsdbMergeCommitImpl(pCommitter, pBlockIdx, pIter, pBlock, 0); if (code) goto _err; - } else { + } else { // pBlock && pBlock->last == 0 && (pIter == NULL || pIter) // memory - key.ts = pBlock->info.minKey.ts; - key.version = pBlock->info.minKey.version - 1; - code = tsdbCommitMemoryData(pCommitter, pBlockIdx, pIter, key, 1); - if (code) goto _err; + if (pIter) { + key.ts = pBlock->info.minKey.ts; + key.version = pBlock->info.minKey.version - 1; + code = tsdbCommitMemoryData(pCommitter, pBlockIdx, pIter, key, 1); + if (code) goto _err; + } // merge or move block pRow = tsdbTbDataIterGet(pIter); diff --git a/source/dnode/vnode/src/tsdb/tsdbFile.c b/source/dnode/vnode/src/tsdb/tsdbFile.c index 561a68e928cd799c3309763c0cafe35cde9bccf6..0e7ac9f0d7a0535a8e7c36cec8c9ca664c51cfe3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile.c @@ -15,17 +15,29 @@ #include "tsdb.h" -static const char *tsdbFileSuffix[] = {".tombstone", ".cache", ".index", ".data", ".last", ".sma", ""}; +static const char *tsdbFileSuffix[] = {".del", ".cache", ".head", ".data", ".last", ".sma", ""}; -// .tombstone +// SHeadFile =============================================== +void tsdbHeadFileName(STsdb *pTsdb, SHeadFile *pFile, char fname[]) { + // TODO +} -// SDelFile =============================================== -char *tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile) { - char *pName = NULL; - int32_t size; +// SDataFile =============================================== +void tsdbDataFileName(STsdb *pTsdb, SDataFile *pFile, char fname[]) { + // TODO +} +// SLastFile =============================================== +void tsdbLastFileName(STsdb *pTsdb, SLastFile *pFile, char fname[]) { // TODO - // sprintf(pName, "", pTsdb->path, ); +} - return pName; +// SSmaFile =============================================== +void tsdbSmaFileName(STsdb *pTsdb, SSmaFile *pFile, char fname[]) { + // TODO +} + +// SDelFile =============================================== +void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]) { + // snprintf(fname, TSDB_FILENAME_LEN, "", pTsdb->path); } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index cd8cde135e8511f4dda36cc3ce34d1dba649542f..2554f320a50621d0572b9a434a74030430c6a731 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -218,6 +218,7 @@ void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDa pIter->pTbData = pTbData; pIter->backward = backward; pIter->pRow = NULL; + pIter->row.type = 0; if (pFrom == NULL) { // create from head or tail if (backward) { diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index af5732e2c850ce833a4d8a257df5352ccb050852..dd3570a88419969a0637d08331532914e75663c1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -727,6 +727,7 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS int32_t code = 0; if (pRow->type == 0) { + ASSERT(pTSchema); code = tsdbBlockDataAppendRow0(pBlockData, pRow, pTSchema); } else if (pRow->type == 1) { code = tsdbBlockDataAppendRow1(pBlockData, pRow);