提交 0090d022 编写于 作者: H Hongze Cheng

more code

上级 9abfe52c
......@@ -67,13 +67,15 @@ static FORCE_INLINE int32_t tarray2_make_room( //
return 0;
}
#define TARRAY2_INIT(a) \
#define TARRAY2_INIT_EX(a, size_, capacity_, data_) \
do { \
(a)->size = 0; \
(a)->capacity = 0; \
(a)->data = NULL; \
(a)->size = (size_); \
(a)->capacity = (capacity_); \
(a)->data = (data_); \
} while (0)
#define TARRAY2_INIT(a) TARRAY2_INIT_EX(a, 0, 0, NULL)
#define TARRAY2_FREE(a) \
do { \
if ((a)->data) { \
......
......@@ -13,43 +13,49 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdbFS.h"
#include "tsdbUtil.h"
#ifndef _TSDB_STT_FILE_WRITER_H
#define _TSDB_STT_FILE_WRITER_H
#include "tsdbFS.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef TARRAY2(SSttBlk) TSttBlkArray;
typedef TARRAY2(SDelBlk) TDelBlkArray;
typedef TARRAY2(STbStatisBlk) TStatisBlkArray;
// SSttFileReader ==========================================
typedef struct SSttFSegReader SSttFSegReader;
typedef struct SSttFileReader SSttFileReader;
typedef struct SSttFileReaderConfig SSttFileReaderConfig;
typedef struct SSttSegReader SSttSegReader;
typedef TARRAY2(SSttSegReader *) TSttSegReaderArray;
// SSttFileReader
int32_t tsdbSttFReaderOpen(const SSttFileReaderConfig *config, SSttFileReader **ppReader);
int32_t tsdbSttFReaderClose(SSttFileReader **ppReader);
int32_t tsdbSttFReaderOpen(const SSttFileReaderConfig *config, SSttFileReader **reader);
int32_t tsdbSttFReaderClose(SSttFileReader **reader);
int32_t tsdbSttFReaderGetSegReader(SSttFileReader *reader, const TSttSegReaderArray **segReaderArray);
// SSttSegReader
int32_t tsdbSttFReadBloomFilter(SSttSegReader *reader, const void *pFilter);
// SSttFSegReader
int32_t tsdbSttFSegReaderOpen(SSttFileReader *pReader, SSttFSegReader **ppSegReader, int32_t nSegment);
int32_t tsdbSttFSegReaderClose(SSttFSegReader **ppSegReader);
int32_t tsdbSttFSegReadBloomFilter(SSttFSegReader *pSegReader, const void *pFilter);
int32_t tsdbSttFSegReadStatisBlk(SSttFSegReader *pSegReader, const SArray *pStatis);
int32_t tsdbSttFSegReadDelBlk(SSttFSegReader *pSegReader, const SArray *pDelBlk);
int32_t tsdbSttFSegReadSttBlk(SSttFSegReader *pSegReader, const SArray *pSttBlk);
int32_t tsdbSttFSegReadStatisBlock(SSttFSegReader *pSegReader, const void *pBlock);
int32_t tsdbSttFSegReadDelBlock(SSttFSegReader *pSegReader, const void *pBlock);
int32_t tsdbSttFSegReadSttBlock(SSttFSegReader *pSegReader, const void *pBlock);
int32_t tsdbSttFReadSttBlk(SSttSegReader *reader, const TSttBlkArray **sttBlkArray);
int32_t tsdbSttFReadDelBlk(SSttSegReader *reader, const TDelBlkArray **delBlkArray);
int32_t tsdbSttFReadStatisBlk(SSttSegReader *reader, const TStatisBlkArray **statisBlkArray);
int32_t tsdbSttFReadSttBlock(SSttSegReader *reader, const SSttBlk *sttBlk, SBlockData *bData);
int32_t tsdbSttFReadDelBlock(SSttSegReader *reader, const SDelBlk *delBlk, SDelBlock *dData);
int32_t tsdbSttFReadStatisBlock(SSttSegReader *reader, const STbStatisBlk *statisBlk, STbStatisBlock *sData);
struct SSttFileReaderConfig {
STsdb *pTsdb;
STsdb *tsdb;
SSkmInfo *pSkmTb;
SSkmInfo *pSkmRow;
uint8_t **aBuf;
// TODO
int32_t szPage;
STFile file;
};
// SSttFileWriter ==========================================
......
......@@ -199,7 +199,7 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
// open the reader
SSttFileReader *reader;
SSttFileReaderConfig config = {
.pTsdb = merger->tsdb,
.tsdb = merger->tsdb,
// TODO
};
code = tsdbSttFReaderOpen(&config, &reader);
......
......@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "dev.h"
#include "inc/tsdbSttFReaderWriter.h"
typedef struct {
int64_t prevFooter;
......@@ -23,73 +23,221 @@ typedef struct {
// SSttFReader ============================================================
struct SSttFileReader {
SSttFileReaderConfig *config;
// TODO
SSttFileReaderConfig config;
TSttSegReaderArray segReaderArray;
STsdbFD *fd;
};
struct SSttSegReader {
SSttFileReader *reader;
struct {
bool bloomFilterLoaded;
bool sttBlkLoaded;
bool delBlkLoaded;
bool statisBlkLoaded;
} ctx;
SFSttFooter footer;
void *bloomFilter;
TSttBlkArray sttBlkArray;
TDelBlkArray delBlkArray;
TStatisBlkArray statisBlkArray;
};
// SSttFileReader
int32_t tsdbSttFReaderOpen(const SSttFileReaderConfig *config, SSttFileReader **ppReader) {
static int32_t tsdbSttSegReaderOpen(SSttFileReader *reader, int64_t offset, SSttSegReader **segReader) {
int32_t code = 0;
// TODO
int32_t lino = 0;
int32_t vid = TD_VID(reader->config.tsdb->pVnode);
ASSERT(offset >= TSDB_FHDR_SIZE);
segReader[0] = taosMemoryCalloc(1, sizeof(*segReader[0]));
if (segReader[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
segReader[0]->reader = reader;
code = tsdbReadFile(reader->fd, offset, (uint8_t *)(&segReader[0]->footer), sizeof(segReader[0]->footer));
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
taosMemoryFree(segReader[0]);
segReader[0] = NULL;
}
return code;
}
int32_t tsdbSttFReaderClose(SSttFileReader **ppReader) {
int32_t code = 0;
static int32_t tsdbSttSegReaderClose(SSttSegReader **segReader) {
if (!segReader[0]) return 0;
if (segReader[0]->ctx.bloomFilterLoaded) {
// TODO
return code;
}
if (segReader[0]->ctx.sttBlkLoaded) {
TARRAY2_FREE(&segReader[0]->sttBlkArray);
}
if (segReader[0]->ctx.delBlkLoaded) {
TARRAY2_FREE(&segReader[0]->delBlkArray);
}
if (segReader[0]->ctx.statisBlkLoaded) {
TARRAY2_FREE(&segReader[0]->statisBlkArray);
}
taosMemoryFree(segReader[0]);
segReader[0] = NULL;
return 0;
}
// SSttFSegReader
int32_t tsdbSttFSegReaderOpen(SSttFileReader *pReader, SSttFSegReader **ppSegReader, int32_t nSegment) {
int32_t tsdbSttFReaderOpen(const SSttFileReaderConfig *config, SSttFileReader **reader) {
int32_t code = 0;
// TODO
int32_t lino = 0;
int32_t vid = TD_VID(config->tsdb->pVnode);
reader[0] = taosMemoryCalloc(1, sizeof(*reader[0]));
if (reader[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
reader[0]->config = config[0];
TARRAY2_INIT(&reader[0]->segReaderArray);
// open file
char fname[TSDB_FILENAME_LEN];
tsdbTFileName(config->tsdb, &config->file, fname);
code = tsdbOpenFile(fname, config->szPage, TD_FILE_READ, &reader[0]->fd);
TSDB_CHECK_CODE(code, lino, _exit);
// open each segment reader
int64_t size = config->file.size;
while (size > 0) {
SSttSegReader *segReader;
code = tsdbSttSegReaderOpen(reader[0], size - sizeof(SFSttFooter), &segReader);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND(&reader[0]->segReaderArray, segReader);
TSDB_CHECK_CODE(code, lino, _exit);
size = segReader->footer.prevFooter;
}
ASSERT(TARRAY2_SIZE(&reader[0]->segReaderArray) == config->file.stt.nseg);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
tsdbSttFReaderClose(reader);
}
return code;
}
int32_t tsdbSttFSegReaderClose(SSttFSegReader **ppSegReader) {
int32_t code = 0;
// TODO
return code;
int32_t tsdbSttFReaderClose(SSttFileReader **reader) {
tsdbCloseFile(&reader[0]->fd);
TARRAY2_CLEAR_FREE(&reader[0]->segReaderArray, tsdbSttSegReaderClose);
taosMemoryFree(reader[0]);
reader[0] = NULL;
return 0;
}
int32_t tsdbSttFSegReadBloomFilter(SSttFSegReader *pSegReader, const void *pFilter) {
int32_t code = 0;
// TODO
return code;
int32_t tsdbSttFReaderGetSegReader(SSttFileReader *reader, const TSttSegReaderArray **segReaderArray) {
segReaderArray[0] = &reader->segReaderArray;
return 0;
}
int32_t tsdbSttFSegReadStatisBlk(SSttFSegReader *pSegReader, const SArray *pStatis) {
// SSttFSegReader
int32_t tsdbSttFReadBloomFilter(SSttSegReader *reader, const void *pFilter) {
int32_t code = 0;
// TODO
return code;
}
int32_t tsdbSttFSegReadDelBlk(SSttFSegReader *pSegReader, const SArray *pDelBlk) {
int32_t code = 0;
// TODO
return code;
int32_t tsdbSttFReadStatisBlk(SSttSegReader *reader, const TStatisBlkArray **statisBlkArray) {
if (!reader->ctx.statisBlkLoaded) {
SFDataPtr fptr = reader->footer.dict[2];
if (fptr.size > 0) {
ASSERT(fptr.size % sizeof(STbStatisBlk) == 0);
int32_t size = fptr.size / sizeof(STbStatisBlk);
void *data = taosMemoryMalloc(fptr.size);
if (!data) return TSDB_CODE_OUT_OF_MEMORY;
int32_t code = tsdbReadFile(reader->reader->fd, fptr.offset, data, fptr.size);
if (code) return code;
TARRAY2_INIT_EX(&reader->statisBlkArray, size, size, data);
} else {
TARRAY2_INIT(&reader->statisBlkArray);
}
reader->ctx.statisBlkLoaded = true;
}
statisBlkArray[0] = &reader->statisBlkArray;
return 0;
}
int32_t tsdbSttFSegReadSttBlk(SSttFSegReader *pSegReader, const SArray *pSttBlk) {
int32_t code = 0;
// TODO
return code;
int32_t tsdbSttFReadDelBlk(SSttSegReader *reader, const TDelBlkArray **delBlkArray) {
if (!reader->ctx.delBlkLoaded) {
SFDataPtr fptr = reader->footer.dict[3];
if (fptr.size > 0) {
ASSERT(fptr.size % sizeof(SDelBlk) == 0);
int32_t size = fptr.size / sizeof(SDelBlk);
void *data = taosMemoryMalloc(fptr.size);
if (!data) return TSDB_CODE_OUT_OF_MEMORY;
int32_t code = tsdbReadFile(reader->reader->fd, fptr.offset, data, fptr.size);
if (code) return code;
TARRAY2_INIT_EX(&reader->delBlkArray, size, size, data);
} else {
TARRAY2_INIT(&reader->delBlkArray);
}
reader->ctx.delBlkLoaded = true;
}
delBlkArray[0] = &reader->delBlkArray;
return 0;
}
int32_t tsdbSttFReadSttBlk(SSttSegReader *reader, const TSttBlkArray **sttBlkArray) {
if (!reader->ctx.sttBlkLoaded) {
SFDataPtr fptr = reader->footer.dict[1];
if (fptr.size > 0) {
ASSERT(fptr.size % sizeof(SSttBlk) == 0);
int32_t size = fptr.size / sizeof(SSttBlk);
void *data = taosMemoryMalloc(fptr.size);
if (!data) return TSDB_CODE_OUT_OF_MEMORY;
int32_t code = tsdbReadFile(reader->reader->fd, fptr.offset, data, fptr.size);
if (code) return code;
TARRAY2_INIT_EX(&reader->sttBlkArray, size, size, data);
} else {
TARRAY2_INIT(&reader->sttBlkArray);
}
reader->ctx.sttBlkLoaded = true;
}
sttBlkArray[0] = &reader->sttBlkArray;
return 0;
}
int32_t tsdbSttFSegReadStatisBlock(SSttFSegReader *pSegReader, const void *pBlock) {
int32_t tsdbSttFReadSttBlock(SSttSegReader *reader, const SSttBlk *sttBlk, SBlockData *bData) {
int32_t code = 0;
// TODO
return code;
}
int32_t tsdbSttFSegReadDelBlock(SSttFSegReader *pSegReader, const void *pBlock) {
int32_t tsdbSttFReadDelBlock(SSttSegReader *reader, const SDelBlk *delBlk, SDelBlock *dData) {
int32_t code = 0;
// TODO
return code;
}
int32_t tsdbSttFSegReadSttBlock(SSttFSegReader *pSegReader, const void *pBlock) {
int32_t tsdbSttFReadStatisBlock(SSttSegReader *reader, const STbStatisBlk *statisBlk, STbStatisBlock *sData) {
int32_t code = 0;
// TODO
return code;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册