提交 d17d6c50 编写于 作者: H Hongze Cheng

more work

上级 0359254b
...@@ -72,6 +72,7 @@ typedef struct SDelFReader SDelFReader; ...@@ -72,6 +72,7 @@ typedef struct SDelFReader SDelFReader;
#define HAS_VALUE ((int8_t)0x4) #define HAS_VALUE ((int8_t)0x4)
// tsdbUtil.c ============================================================================================== // tsdbUtil.c ==============================================================================================
// TSDBROW // TSDBROW
#define TSDBROW_SVERSION(ROW) TD_ROW_SVER((ROW)->pTSRow)
#define tsdbRowFromTSRow(VERSION, TSROW) ((TSDBROW){.type = 0, .version = (VERSION), .pTSRow = (TSROW)}); #define tsdbRowFromTSRow(VERSION, TSROW) ((TSDBROW){.type = 0, .version = (VERSION), .pTSRow = (TSROW)});
#define tsdbRowFromBlockData(BLOCKDATA, IROW) ((TSDBROW){.type = 1, .pBlockData = (BLOCKDATA), .pTSRow = (IROW)}); #define tsdbRowFromBlockData(BLOCKDATA, IROW) ((TSDBROW){.type = 1, .pBlockData = (BLOCKDATA), .pTSRow = (IROW)});
TSDBKEY tsdbRowKey(TSDBROW *pRow); TSDBKEY tsdbRowKey(TSDBROW *pRow);
...@@ -149,9 +150,18 @@ void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, S ...@@ -149,9 +150,18 @@ void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, S
TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter); TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter);
bool tsdbTbDataIterNext(STbDataIter *pIter); bool tsdbTbDataIterNext(STbDataIter *pIter);
// tsdbFile.c ============================================================================================== // tsdbFile.c ==============================================================================================
// SDataFSet
// SHeadFile
void tsdbHeadFileName(STsdb *pTsdb, SHeadFile *pFile, char fname[]);
// SDataFile
void tsdbDataFileName(STsdb *pTsdb, SDataFile *pFile, char fname[]);
// SLastFile
void tsdbLastFileName(STsdb *pTsdb, SLastFile *pFile, char fname[]);
// SSmaFile
void tsdbSmaFileName(STsdb *pTsdb, SSmaFile *pFile, char fname[]);
// SDelFile // SDelFile
#define tsdbDelFileCreate() ((SDelFile){.info = KEYINFO_INIT_VAL, .size = 0, .offset = 0}) #define tsdbDelFileCreate() ((SDelFile){.info = KEYINFO_INIT_VAL, .size = 0, .offset = 0})
char *tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile); void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]);
// tsdbFS.c ============================================================================================== // tsdbFS.c ==============================================================================================
typedef struct STsdbFS STsdbFS; typedef struct STsdbFS STsdbFS;
int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS); int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS);
...@@ -442,6 +452,8 @@ struct SSmaFile { ...@@ -442,6 +452,8 @@ struct SSmaFile {
}; };
struct SDFileSet { struct SDFileSet {
SDiskID diskId;
int32_t nRef;
SHeadFile *pHeadFile; SHeadFile *pHeadFile;
SDataFile *pDataFile; SDataFile *pDataFile;
SLastFile *pLastFile; SLastFile *pLastFile;
......
...@@ -341,8 +341,14 @@ static int32_t tsdbCommitMemoryData(SCommitter *pCommitter, SBlockIdx *pBlockIdx ...@@ -341,8 +341,14 @@ static int32_t tsdbCommitMemoryData(SCommitter *pCommitter, SBlockIdx *pBlockIdx
bool toDataOnly) { bool toDataOnly) {
int32_t code = 0; int32_t code = 0;
TSDBROW *pRow; TSDBROW *pRow;
SBlock block = tBlockInit(); STSchema *pTSchema = NULL; // TODO
TSDBKEY key;
SBlock *pBlock = &pCommitter->nBlock;
if (pIter == NULL) goto _exit;
tBlockReset(pBlock);
tBlockDataReset(&pCommitter->nBlockData);
while (true) { while (true) {
pRow = tsdbTbDataIterGet(pIter); pRow = tsdbTbDataIterGet(pIter);
...@@ -354,30 +360,55 @@ static int32_t tsdbCommitMemoryData(SCommitter *pCommitter, SBlockIdx *pBlockIdx ...@@ -354,30 +360,55 @@ static int32_t tsdbCommitMemoryData(SCommitter *pCommitter, SBlockIdx *pBlockIdx
} }
} }
code = tBlockDataAppendRow(&pCommitter->nBlockData, pRow, NULL /*TODO*/); // update schema
if (pTSchema == NULL || pTSchema->version != TSDBROW_SVERSION(pRow)) {
// TODO
// pTSchema = NULL;
}
// append row
code = tBlockDataAppendRow(&pCommitter->nBlockData, pRow, pTSchema);
if (code) goto _err; if (code) goto _err;
// update info
key = tsdbRowKey(pRow);
if (tsdbKeyCmprFn(&key, &pBlock->info.maxKey) > 0) pBlock->info.maxKey = key;
if (tsdbKeyCmprFn(&key, &pBlock->info.minKey) < 0) pBlock->info.minKey = key;
if (key.version > pBlock->info.maxVersion) pBlock->info.maxVersion = key.version;
if (key.version < pBlock->info.minVerion) pBlock->info.minVerion = key.version;
// iter next
tsdbTbDataIterNext(pIter);
// check write
if (pCommitter->nBlockData.nRow < pCommitter->maxRow * 4 / 5) { if (pCommitter->nBlockData.nRow < pCommitter->maxRow * 4 / 5) {
continue; continue;
} }
_write_block_data: _write_block_data:
if (!toDataOnly && pCommitter->nBlockData.nRow < pCommitter->minKey) { if (!toDataOnly && pCommitter->nBlockData.nRow < pCommitter->minKey) {
block.last = 1; pCommitter->nBlock.last = 1;
} else { } else {
block.last = 0; pCommitter->nBlock.last = 0;
} }
code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->nBlockData, NULL, NULL, pBlockIdx, &block); code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->nBlockData, NULL, NULL, pBlockIdx, pBlock);
if (code) goto _err; if (code) goto _err;
code = tMapDataPutItem(&pCommitter->nBlockMap, &block, tPutBlock); code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
if (code) goto _err; if (code) goto _err;
tBlockReset(&block); // update info
if (tsdbKeyCmprFn(&pBlock->info.minKey, &pBlockIdx->info.minKey) < 0) pBlock->info.minKey = pBlockIdx->info.minKey;
if (tsdbKeyCmprFn(&pBlock->info.maxKey, &pBlockIdx->info.maxKey) < 0) pBlock->info.maxKey = pBlockIdx->info.maxKey;
if (pBlock->info.minVerion < pBlockIdx->info.minVerion) pBlockIdx->info.minVerion = pBlock->info.minVerion;
if (pBlock->info.maxVersion < pBlockIdx->info.maxVersion) pBlockIdx->info.maxVersion = pBlock->info.maxVersion;
tBlockReset(pBlock);
tBlockDataReset(&pCommitter->nBlockData); tBlockDataReset(&pCommitter->nBlockData);
} }
_exit:
return code; return code;
_err: _err:
...@@ -467,12 +498,10 @@ static int32_t tsdbMergeCommit(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STb ...@@ -467,12 +498,10 @@ static int32_t tsdbMergeCommit(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STb
int8_t isLastBlock) { int8_t isLastBlock) {
int32_t code = 0; int32_t code = 0;
TSDBROW *pRow; TSDBROW *pRow;
SBlock block = tBlockInit();
SBlockData nBlockData;
TSDBKEY key; TSDBKEY key;
int32_t c; int32_t c;
if (pBlock == NULL) { if (pBlock == NULL) { // (pIter && pBlock == NULL)
key.ts = pCommitter->maxKey; key.ts = pCommitter->maxKey;
key.version = INT64_MAX; key.version = INT64_MAX;
code = tsdbCommitMemoryData(pCommitter, pBlockIdx, pIter, key, 0); code = tsdbCommitMemoryData(pCommitter, pBlockIdx, pIter, key, 0);
...@@ -481,12 +510,14 @@ static int32_t tsdbMergeCommit(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STb ...@@ -481,12 +510,14 @@ static int32_t tsdbMergeCommit(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STb
// merge // merge
code = tsdbMergeCommitImpl(pCommitter, pBlockIdx, pIter, pBlock, 0); code = tsdbMergeCommitImpl(pCommitter, pBlockIdx, pIter, pBlock, 0);
if (code) goto _err; if (code) goto _err;
} else { } else { // pBlock && pBlock->last == 0 && (pIter == NULL || pIter)
// memory // memory
if (pIter) {
key.ts = pBlock->info.minKey.ts; key.ts = pBlock->info.minKey.ts;
key.version = pBlock->info.minKey.version - 1; key.version = pBlock->info.minKey.version - 1;
code = tsdbCommitMemoryData(pCommitter, pBlockIdx, pIter, key, 1); code = tsdbCommitMemoryData(pCommitter, pBlockIdx, pIter, key, 1);
if (code) goto _err; if (code) goto _err;
}
// merge or move block // merge or move block
pRow = tsdbTbDataIterGet(pIter); pRow = tsdbTbDataIterGet(pIter);
......
...@@ -15,17 +15,29 @@ ...@@ -15,17 +15,29 @@
#include "tsdb.h" #include "tsdb.h"
static const char *tsdbFileSuffix[] = {".tombstone", ".cache", ".index", ".data", ".last", ".sma", ""}; static const char *tsdbFileSuffix[] = {".del", ".cache", ".head", ".data", ".last", ".sma", ""};
// .tombstone // SHeadFile ===============================================
void tsdbHeadFileName(STsdb *pTsdb, SHeadFile *pFile, char fname[]) {
// TODO
}
// SDelFile =============================================== // SDataFile ===============================================
char *tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile) { void tsdbDataFileName(STsdb *pTsdb, SDataFile *pFile, char fname[]) {
char *pName = NULL; // TODO
int32_t size; }
// SLastFile ===============================================
void tsdbLastFileName(STsdb *pTsdb, SLastFile *pFile, char fname[]) {
// TODO // TODO
// sprintf(pName, "", pTsdb->path, ); }
return pName; // SSmaFile ===============================================
void tsdbSmaFileName(STsdb *pTsdb, SSmaFile *pFile, char fname[]) {
// TODO
}
// SDelFile ===============================================
void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]) {
// snprintf(fname, TSDB_FILENAME_LEN, "", pTsdb->path);
} }
\ No newline at end of file
...@@ -218,6 +218,7 @@ void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDa ...@@ -218,6 +218,7 @@ void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDa
pIter->pTbData = pTbData; pIter->pTbData = pTbData;
pIter->backward = backward; pIter->backward = backward;
pIter->pRow = NULL; pIter->pRow = NULL;
pIter->row.type = 0;
if (pFrom == NULL) { if (pFrom == NULL) {
// create from head or tail // create from head or tail
if (backward) { if (backward) {
......
...@@ -727,6 +727,7 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS ...@@ -727,6 +727,7 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
int32_t code = 0; int32_t code = 0;
if (pRow->type == 0) { if (pRow->type == 0) {
ASSERT(pTSchema);
code = tsdbBlockDataAppendRow0(pBlockData, pRow, pTSchema); code = tsdbBlockDataAppendRow0(pBlockData, pRow, pTSchema);
} else if (pRow->type == 1) { } else if (pRow->type == 1) {
code = tsdbBlockDataAppendRow1(pBlockData, pRow); code = tsdbBlockDataAppendRow1(pBlockData, pRow);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册