diff --git a/include/util/tarray2.h b/include/util/tarray2.h
index db0fc759768cdaee63541df04e8c009905f1e07a..9b67c84563569d657131929a0185c8d70e4b3ce4 100644
--- a/include/util/tarray2.h
+++ b/include/util/tarray2.h
@@ -67,13 +67,15 @@ static FORCE_INLINE int32_t tarray2_make_room( //
return 0;
}
-#define TARRAY2_INIT(a) \
- do { \
- (a)->size = 0; \
- (a)->capacity = 0; \
- (a)->data = NULL; \
+#define TARRAY2_INIT_EX(a, size_, capacity_, data_) \
+ do { \
+ (a)->size = (size_); \
+ (a)->capacity = (capacity_); \
+ (a)->data = (data_); \
} while (0)
+#define TARRAY2_INIT(a) TARRAY2_INIT_EX(a, 0, 0, NULL)
+
#define TARRAY2_FREE(a) \
do { \
if ((a)->data) { \
diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h
index 65e95aa88f9f0100f4b66a50c8e57e9a9927694a..cf7d0765eb1f318eab08da6a454238e8256434da 100644
--- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h
+++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h
@@ -13,43 +13,49 @@
* along with this program. If not, see .
*/
+#include "tsdbFS.h"
+#include "tsdbUtil.h"
+
#ifndef _TSDB_STT_FILE_WRITER_H
#define _TSDB_STT_FILE_WRITER_H
-#include "tsdbFS.h"
-
#ifdef __cplusplus
extern "C" {
#endif
typedef TARRAY2(SSttBlk) TSttBlkArray;
+typedef TARRAY2(SDelBlk) TDelBlkArray;
+typedef TARRAY2(STbStatisBlk) TStatisBlkArray;
// SSttFileReader ==========================================
-typedef struct SSttFSegReader SSttFSegReader;
typedef struct SSttFileReader SSttFileReader;
typedef struct SSttFileReaderConfig SSttFileReaderConfig;
+typedef struct SSttSegReader SSttSegReader;
+typedef TARRAY2(SSttSegReader *) TSttSegReaderArray;
// SSttFileReader
-int32_t tsdbSttFReaderOpen(const SSttFileReaderConfig *config, SSttFileReader **ppReader);
-int32_t tsdbSttFReaderClose(SSttFileReader **ppReader);
+int32_t tsdbSttFReaderOpen(const SSttFileReaderConfig *config, SSttFileReader **reader);
+int32_t tsdbSttFReaderClose(SSttFileReader **reader);
+int32_t tsdbSttFReaderGetSegReader(SSttFileReader *reader, const TSttSegReaderArray **segReaderArray);
+
+// SSttSegReader
+int32_t tsdbSttFReadBloomFilter(SSttSegReader *reader, const void *pFilter);
-// SSttFSegReader
-int32_t tsdbSttFSegReaderOpen(SSttFileReader *pReader, SSttFSegReader **ppSegReader, int32_t nSegment);
-int32_t tsdbSttFSegReaderClose(SSttFSegReader **ppSegReader);
-int32_t tsdbSttFSegReadBloomFilter(SSttFSegReader *pSegReader, const void *pFilter);
-int32_t tsdbSttFSegReadStatisBlk(SSttFSegReader *pSegReader, const SArray *pStatis);
-int32_t tsdbSttFSegReadDelBlk(SSttFSegReader *pSegReader, const SArray *pDelBlk);
-int32_t tsdbSttFSegReadSttBlk(SSttFSegReader *pSegReader, const SArray *pSttBlk);
-int32_t tsdbSttFSegReadStatisBlock(SSttFSegReader *pSegReader, const void *pBlock);
-int32_t tsdbSttFSegReadDelBlock(SSttFSegReader *pSegReader, const void *pBlock);
-int32_t tsdbSttFSegReadSttBlock(SSttFSegReader *pSegReader, const void *pBlock);
+int32_t tsdbSttFReadSttBlk(SSttSegReader *reader, const TSttBlkArray **sttBlkArray);
+int32_t tsdbSttFReadDelBlk(SSttSegReader *reader, const TDelBlkArray **delBlkArray);
+int32_t tsdbSttFReadStatisBlk(SSttSegReader *reader, const TStatisBlkArray **statisBlkArray);
+
+int32_t tsdbSttFReadSttBlock(SSttSegReader *reader, const SSttBlk *sttBlk, SBlockData *bData);
+int32_t tsdbSttFReadDelBlock(SSttSegReader *reader, const SDelBlk *delBlk, SDelBlock *dData);
+int32_t tsdbSttFReadStatisBlock(SSttSegReader *reader, const STbStatisBlk *statisBlk, STbStatisBlock *sData);
struct SSttFileReaderConfig {
- STsdb *pTsdb;
+ STsdb *tsdb;
SSkmInfo *pSkmTb;
SSkmInfo *pSkmRow;
uint8_t **aBuf;
- // TODO
+ int32_t szPage;
+ STFile file;
};
// SSttFileWriter ==========================================
diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c
index 69a726f7d1e57abdce42bf81786c23cbc3130dd5..b4176d9526f0983d66bcf77327d9550ce31b7fca 100644
--- a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c
+++ b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c
@@ -199,7 +199,7 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
// open the reader
SSttFileReader *reader;
SSttFileReaderConfig config = {
- .pTsdb = merger->tsdb,
+ .tsdb = merger->tsdb,
// TODO
};
code = tsdbSttFReaderOpen(&config, &reader);
diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c b/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c
index e2f241d8f45c7755c886bc358ba032681cd7daf5..669368c8618bafe5b5193da3d763d103c23f986c 100644
--- a/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c
+++ b/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c
@@ -13,7 +13,7 @@
* along with this program. If not, see .
*/
-#include "dev.h"
+#include "inc/tsdbSttFReaderWriter.h"
typedef struct {
int64_t prevFooter;
@@ -23,73 +23,221 @@ typedef struct {
// SSttFReader ============================================================
struct SSttFileReader {
- SSttFileReaderConfig *config;
- // TODO
+ SSttFileReaderConfig config;
+ TSttSegReaderArray segReaderArray;
+ STsdbFD *fd;
+};
+
+struct SSttSegReader {
+ SSttFileReader *reader;
+
+ struct {
+ bool bloomFilterLoaded;
+ bool sttBlkLoaded;
+ bool delBlkLoaded;
+ bool statisBlkLoaded;
+ } ctx;
+
+ SFSttFooter footer;
+ void *bloomFilter;
+ TSttBlkArray sttBlkArray;
+ TDelBlkArray delBlkArray;
+ TStatisBlkArray statisBlkArray;
};
// SSttFileReader
-int32_t tsdbSttFReaderOpen(const SSttFileReaderConfig *config, SSttFileReader **ppReader) {
+static int32_t tsdbSttSegReaderOpen(SSttFileReader *reader, int64_t offset, SSttSegReader **segReader) {
int32_t code = 0;
- // TODO
+ int32_t lino = 0;
+ int32_t vid = TD_VID(reader->config.tsdb->pVnode);
+
+ ASSERT(offset >= TSDB_FHDR_SIZE);
+
+ segReader[0] = taosMemoryCalloc(1, sizeof(*segReader[0]));
+ if (segReader[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
+
+ segReader[0]->reader = reader;
+ code = tsdbReadFile(reader->fd, offset, (uint8_t *)(&segReader[0]->footer), sizeof(segReader[0]->footer));
+ TSDB_CHECK_CODE(code, lino, _exit);
+
+_exit:
+ if (code) {
+ tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
+ taosMemoryFree(segReader[0]);
+ segReader[0] = NULL;
+ }
return code;
}
-int32_t tsdbSttFReaderClose(SSttFileReader **ppReader) {
- int32_t code = 0;
- // TODO
- return code;
+static int32_t tsdbSttSegReaderClose(SSttSegReader **segReader) {
+ if (!segReader[0]) return 0;
+
+ if (segReader[0]->ctx.bloomFilterLoaded) {
+ // TODO
+ }
+ if (segReader[0]->ctx.sttBlkLoaded) {
+ TARRAY2_FREE(&segReader[0]->sttBlkArray);
+ }
+ if (segReader[0]->ctx.delBlkLoaded) {
+ TARRAY2_FREE(&segReader[0]->delBlkArray);
+ }
+ if (segReader[0]->ctx.statisBlkLoaded) {
+ TARRAY2_FREE(&segReader[0]->statisBlkArray);
+ }
+ taosMemoryFree(segReader[0]);
+ segReader[0] = NULL;
+ return 0;
}
-// SSttFSegReader
-int32_t tsdbSttFSegReaderOpen(SSttFileReader *pReader, SSttFSegReader **ppSegReader, int32_t nSegment) {
+int32_t tsdbSttFReaderOpen(const SSttFileReaderConfig *config, SSttFileReader **reader) {
int32_t code = 0;
- // TODO
+ int32_t lino = 0;
+ int32_t vid = TD_VID(config->tsdb->pVnode);
+
+ reader[0] = taosMemoryCalloc(1, sizeof(*reader[0]));
+ if (reader[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
+
+ reader[0]->config = config[0];
+ TARRAY2_INIT(&reader[0]->segReaderArray);
+
+ // open file
+ char fname[TSDB_FILENAME_LEN];
+ tsdbTFileName(config->tsdb, &config->file, fname);
+ code = tsdbOpenFile(fname, config->szPage, TD_FILE_READ, &reader[0]->fd);
+ TSDB_CHECK_CODE(code, lino, _exit);
+
+ // open each segment reader
+ int64_t size = config->file.size;
+ while (size > 0) {
+ SSttSegReader *segReader;
+
+ code = tsdbSttSegReaderOpen(reader[0], size - sizeof(SFSttFooter), &segReader);
+ TSDB_CHECK_CODE(code, lino, _exit);
+
+ code = TARRAY2_APPEND(&reader[0]->segReaderArray, segReader);
+ TSDB_CHECK_CODE(code, lino, _exit);
+
+ size = segReader->footer.prevFooter;
+ }
+
+ ASSERT(TARRAY2_SIZE(&reader[0]->segReaderArray) == config->file.stt.nseg);
+
+_exit:
+ if (code) {
+ tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
+ tsdbSttFReaderClose(reader);
+ }
return code;
}
-int32_t tsdbSttFSegReaderClose(SSttFSegReader **ppSegReader) {
- int32_t code = 0;
- // TODO
- return code;
+int32_t tsdbSttFReaderClose(SSttFileReader **reader) {
+ tsdbCloseFile(&reader[0]->fd);
+ TARRAY2_CLEAR_FREE(&reader[0]->segReaderArray, tsdbSttSegReaderClose);
+ taosMemoryFree(reader[0]);
+ reader[0] = NULL;
+ return 0;
}
-int32_t tsdbSttFSegReadBloomFilter(SSttFSegReader *pSegReader, const void *pFilter) {
- int32_t code = 0;
- // TODO
- return code;
+int32_t tsdbSttFReaderGetSegReader(SSttFileReader *reader, const TSttSegReaderArray **segReaderArray) {
+ segReaderArray[0] = &reader->segReaderArray;
+ return 0;
}
-int32_t tsdbSttFSegReadStatisBlk(SSttFSegReader *pSegReader, const SArray *pStatis) {
+// SSttFSegReader
+int32_t tsdbSttFReadBloomFilter(SSttSegReader *reader, const void *pFilter) {
int32_t code = 0;
// TODO
return code;
}
-int32_t tsdbSttFSegReadDelBlk(SSttFSegReader *pSegReader, const SArray *pDelBlk) {
- int32_t code = 0;
- // TODO
- return code;
+int32_t tsdbSttFReadStatisBlk(SSttSegReader *reader, const TStatisBlkArray **statisBlkArray) {
+ if (!reader->ctx.statisBlkLoaded) {
+ SFDataPtr fptr = reader->footer.dict[2];
+ if (fptr.size > 0) {
+ ASSERT(fptr.size % sizeof(STbStatisBlk) == 0);
+
+ int32_t size = fptr.size / sizeof(STbStatisBlk);
+ void *data = taosMemoryMalloc(fptr.size);
+ if (!data) return TSDB_CODE_OUT_OF_MEMORY;
+
+ int32_t code = tsdbReadFile(reader->reader->fd, fptr.offset, data, fptr.size);
+ if (code) return code;
+
+ TARRAY2_INIT_EX(&reader->statisBlkArray, size, size, data);
+ } else {
+ TARRAY2_INIT(&reader->statisBlkArray);
+ }
+
+ reader->ctx.statisBlkLoaded = true;
+ }
+
+ statisBlkArray[0] = &reader->statisBlkArray;
+ return 0;
}
-int32_t tsdbSttFSegReadSttBlk(SSttFSegReader *pSegReader, const SArray *pSttBlk) {
- int32_t code = 0;
- // TODO
- return code;
+int32_t tsdbSttFReadDelBlk(SSttSegReader *reader, const TDelBlkArray **delBlkArray) {
+ if (!reader->ctx.delBlkLoaded) {
+ SFDataPtr fptr = reader->footer.dict[3];
+ if (fptr.size > 0) {
+ ASSERT(fptr.size % sizeof(SDelBlk) == 0);
+
+ int32_t size = fptr.size / sizeof(SDelBlk);
+ void *data = taosMemoryMalloc(fptr.size);
+ if (!data) return TSDB_CODE_OUT_OF_MEMORY;
+
+ int32_t code = tsdbReadFile(reader->reader->fd, fptr.offset, data, fptr.size);
+ if (code) return code;
+
+ TARRAY2_INIT_EX(&reader->delBlkArray, size, size, data);
+ } else {
+ TARRAY2_INIT(&reader->delBlkArray);
+ }
+
+ reader->ctx.delBlkLoaded = true;
+ }
+
+ delBlkArray[0] = &reader->delBlkArray;
+ return 0;
+}
+
+int32_t tsdbSttFReadSttBlk(SSttSegReader *reader, const TSttBlkArray **sttBlkArray) {
+ if (!reader->ctx.sttBlkLoaded) {
+ SFDataPtr fptr = reader->footer.dict[1];
+ if (fptr.size > 0) {
+ ASSERT(fptr.size % sizeof(SSttBlk) == 0);
+
+ int32_t size = fptr.size / sizeof(SSttBlk);
+ void *data = taosMemoryMalloc(fptr.size);
+ if (!data) return TSDB_CODE_OUT_OF_MEMORY;
+
+ int32_t code = tsdbReadFile(reader->reader->fd, fptr.offset, data, fptr.size);
+ if (code) return code;
+
+ TARRAY2_INIT_EX(&reader->sttBlkArray, size, size, data);
+ } else {
+ TARRAY2_INIT(&reader->sttBlkArray);
+ }
+
+ reader->ctx.sttBlkLoaded = true;
+ }
+
+ sttBlkArray[0] = &reader->sttBlkArray;
+ return 0;
}
-int32_t tsdbSttFSegReadStatisBlock(SSttFSegReader *pSegReader, const void *pBlock) {
+int32_t tsdbSttFReadSttBlock(SSttSegReader *reader, const SSttBlk *sttBlk, SBlockData *bData) {
int32_t code = 0;
// TODO
return code;
}
-int32_t tsdbSttFSegReadDelBlock(SSttFSegReader *pSegReader, const void *pBlock) {
+int32_t tsdbSttFReadDelBlock(SSttSegReader *reader, const SDelBlk *delBlk, SDelBlock *dData) {
int32_t code = 0;
// TODO
return code;
}
-int32_t tsdbSttFSegReadSttBlock(SSttFSegReader *pSegReader, const void *pBlock) {
+int32_t tsdbSttFReadStatisBlock(SSttSegReader *reader, const STbStatisBlk *statisBlk, STbStatisBlock *sData) {
int32_t code = 0;
// TODO
return code;