提交 e9a0210e 编写于 作者: H Hongze Cheng

more code

上级 ef524f69
......@@ -44,11 +44,24 @@ typedef struct SDataFileReaderConfig {
int32_t tsdbDataFileReaderOpen(const char *fname[/* TSDB_FTYPE_MAX */], const SDataFileReaderConfig *config,
SDataFileReader **reader);
int32_t tsdbDataFileReaderClose(SDataFileReader **reader);
// .head
int32_t tsdbDataFileReadBrinBlk(SDataFileReader *reader, const TBrinBlkArray **brinBlkArray);
int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinBlk, SBrinBlock *brinBlock);
// .data
int32_t tsdbDataFileReadBlockData(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData);
int32_t tsdbDataFileReadBlockDataByCol(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData,
STSchema *pTSchema, int32_t cidArr[], int32_t numCid);
// .sma
int32_t tsdbDataFileReadBlockSma(SDataFileReader *reader);
// .tomb
int32_t tsdbDataFileReadTombBlk(SDataFileReader *reader, const TTombBlkArray **tombBlkArray);
int32_t tsdbDataFileReadTombBlock(SDataFileReader *reader, const STombBlk *tombBlk, STombBlock *tData);
#if 1
int32_t tsdbDataFileReadBlockIdx(SDataFileReader *reader, const TBlockIdxArray **blockIdxArray);
int32_t tsdbDataFileReadDataBlk(SDataFileReader *reader, const SBlockIdx *blockIdx, const TDataBlkArray **dataBlkArray);
int32_t tsdbDataFileReadDataBlock(SDataFileReader *reader, const SDataBlk *dataBlk, SBlockData *bData);
int32_t tsdbDataFileReadTombBlk(SDataFileReader *reader, const TTombBlkArray **tombBlkArray);
int32_t tsdbDataFileReadTombBlock(SDataFileReader *reader, const STombBlk *tombBlk, STombBlock *tData);
#endif
// SDataFileWriter =============================================
typedef struct SDataFileWriter SDataFileWriter;
......@@ -72,9 +85,11 @@ typedef struct SDataFileWriterConfig {
int32_t tsdbDataFileWriterOpen(const SDataFileWriterConfig *config, SDataFileWriter **writer);
int32_t tsdbDataFileWriterClose(SDataFileWriter **writer, bool abort, TFileOpArray *opArr);
int32_t tsdbDataFileWriteTSData(SDataFileWriter *writer, SRowInfo *row);
int32_t tsdbDataFileWriteTSDataBlock(SDataFileWriter *writer, SBlockData *bData);
int32_t tsdbDataFileFlushTSDataBlock(SDataFileWriter *writer);
int32_t tsdbDataFileWriteTombRecord(SDataFileWriter *writer, const STombRecord *record);
#ifdef __cplusplus
......
......@@ -165,13 +165,19 @@ typedef union {
} SBrinBlock;
typedef struct {
SFDataPtr dp[1];
TABLEID minTbid;
TABLEID maxTbid;
int64_t minVer;
int64_t maxVer;
SFDataPtr dp[1];
int32_t numRec;
int32_t size[15];
int8_t cmprAlg;
int8_t rsvd[7];
} SBrinBlk;
typedef TARRAY2(SBrinBlk) TBrinBlkArray;
#define BRIN_BLOCK_SIZE(db) TARRAY2_SIZE((db)->suid)
int32_t tBrinBlockInit(SBrinBlock *brinBlock);
......
......@@ -16,7 +16,10 @@
#include "inc/tsdbDataFileRW.h"
typedef struct {
SFDataPtr brinBlkPtr[1];
#if 1
SFDataPtr blockIdxPtr[1];
#endif
SFDataPtr rsrvd[2];
} SHeadFooter;
......@@ -27,40 +30,48 @@ typedef struct {
// SDataFileReader =============================================
struct SDataFileReader {
struct SDataFileReaderConfig config[1];
SDataFileReaderConfig config[1];
uint8_t *bufArr[5];
struct {
bool headFooterLoaded;
bool tombFooterLoaded;
bool blockIdxLoaded;
bool tombBlkLoaded;
bool headFooterLoaded;
bool tombFooterLoaded;
bool brinBlkLoaded;
bool tombBlkLoaded;
#if 1
TABLEID tbid[1];
bool blockIdxLoaded;
#endif
} ctx[1];
STsdbFD *fd[TSDB_FTYPE_MAX];
SHeadFooter headFooter[1];
STombFooter tombFooter[1];
TBlockIdxArray blockIdxArray[1];
SHeadFooter headFooter[1];
STombFooter tombFooter[1];
TBrinBlkArray brinBlkArray[1];
TTombBlkArray tombBlkArray[1];
#if 1
TDataBlkArray dataBlkArray[1];
TTombBlkArray tombBlkArray[1];
TBlockIdxArray blockIdxArray[1];
#endif
};
static int32_t tsdbDataFileReadHeadFooter(SDataFileReader *reader) {
if (reader->fd[TSDB_FTYPE_HEAD] == NULL //
|| reader->ctx->headFooterLoaded) {
return 0;
}
if (reader->ctx->headFooterLoaded) return 0;
int32_t code = 0;
int32_t lino = 0;
code =
tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], reader->config->files[TSDB_FTYPE_HEAD].file.size - sizeof(SHeadFooter),
(uint8_t *)reader->headFooter, sizeof(SHeadFooter));
TSDB_CHECK_CODE(code, lino, _exit);
int32_t ftype = TSDB_FTYPE_HEAD;
if (reader->fd[ftype]) {
code = tsdbReadFile(reader->fd[ftype], reader->config->files[ftype].file.size - sizeof(SHeadFooter),
(uint8_t *)reader->headFooter, sizeof(SHeadFooter));
TSDB_CHECK_CODE(code, lino, _exit);
}
reader->ctx->headFooterLoaded = true;
_exit:
......@@ -71,18 +82,17 @@ _exit:
}
static int32_t tsdbDataFileReadTombFooter(SDataFileReader *reader) {
if (reader->fd[TSDB_FTYPE_TOMB] == NULL //
|| reader->ctx->tombFooterLoaded) {
return 0;
}
if (reader->ctx->tombFooterLoaded) return 0;
int32_t code = 0;
int32_t lino = 0;
code =
tsdbReadFile(reader->fd[TSDB_FTYPE_TOMB], reader->config->files[TSDB_FTYPE_TOMB].file.size - sizeof(STombFooter),
(uint8_t *)reader->tombFooter, sizeof(STombFooter));
TSDB_CHECK_CODE(code, lino, _exit);
int32_t ftype = TSDB_FTYPE_TOMB;
if (reader->fd[ftype]) {
code = tsdbReadFile(reader->fd[ftype], reader->config->files[ftype].file.size - sizeof(STombFooter),
(uint8_t *)reader->tombFooter, sizeof(STombFooter));
TSDB_CHECK_CODE(code, lino, _exit);
}
reader->ctx->tombFooterLoaded = true;
_exit:
......@@ -97,13 +107,13 @@ int32_t tsdbDataFileReaderOpen(const char *fname[], const SDataFileReaderConfig
int32_t lino = 0;
reader[0] = taosMemoryCalloc(1, sizeof(**reader));
if (!reader[0]) {
if (reader[0] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
reader[0]->config[0] = config[0];
if (!reader[0]->config->bufArr) {
if (reader[0]->config->bufArr == NULL) {
reader[0]->config->bufArr = reader[0]->bufArr;
}
......@@ -135,22 +145,158 @@ _exit:
int32_t tsdbDataFileReaderClose(SDataFileReader **reader) {
if (reader[0] == NULL) return 0;
TARRAY2_DESTROY(reader[0]->tombBlkArray, NULL);
TARRAY2_DESTROY(reader[0]->brinBlkArray, NULL);
#if 1
TARRAY2_DESTROY(reader[0]->dataBlkArray, NULL);
TARRAY2_DESTROY(reader[0]->blockIdxArray, NULL);
#endif
for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
tsdbCloseFile(&reader[0]->fd[i]);
if (reader[0]->fd[i]) {
tsdbCloseFile(&reader[0]->fd[i]);
}
}
for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->bufArr); ++i) {
tFree(reader[0]->bufArr[i]);
}
taosMemoryFree(reader[0]);
reader[0] = NULL;
return 0;
}
int32_t tsdbDataFileReadBrinBlk(SDataFileReader *reader, const TBrinBlkArray **brinBlkArray) {
int32_t code = 0;
int32_t lino = 0;
if (!reader->ctx->brinBlkLoaded) {
code = tsdbDataFileReadHeadFooter(reader);
TSDB_CHECK_CODE(code, lino, _exit);
if (reader->headFooter->brinBlkPtr->size > 0) {
void *data = taosMemoryMalloc(reader->headFooter->brinBlkPtr->size);
if (data == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], reader->headFooter->brinBlkPtr->offset, data,
reader->headFooter->brinBlkPtr->size);
if (code) {
taosMemoryFree(data);
TSDB_CHECK_CODE(code, lino, _exit);
}
int32_t size = reader->headFooter->brinBlkPtr->size / sizeof(SBrinBlk);
TARRAY2_INIT_EX(reader->brinBlkArray, size, size, data);
} else {
TARRAY2_INIT(reader->brinBlkArray);
}
reader->ctx->brinBlkLoaded = true;
}
brinBlkArray[0] = reader->brinBlkArray;
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
}
return code;
}
int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinBlk, SBrinBlock *brinBlock) {
int32_t code = 0;
int32_t lino = 0;
code = tRealloc(&reader->config->bufArr[0], brinBlk->dp->size);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], brinBlk->dp->offset, reader->config->bufArr[0], brinBlk->dp->size);
TSDB_CHECK_CODE(code, lino, _exit);
int32_t size = 0;
tBrinBlockClear(brinBlock);
for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr1); i++) {
code = tsdbDecmprData(reader->config->bufArr[0] + size, brinBlk->size[i], TSDB_DATA_TYPE_BIGINT, brinBlk->cmprAlg,
&reader->config->bufArr[1], brinBlk->numRec * sizeof(int64_t), &reader->config->bufArr[2]);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND_BATCH(&brinBlock->dataArr1[i], reader->config->bufArr[1], brinBlk->numRec);
TSDB_CHECK_CODE(code, lino, _exit);
size += brinBlk->size[i];
}
for (int32_t i = 0, j = ARRAY_SIZE(brinBlock->dataArr1); i < ARRAY_SIZE(brinBlock->dataArr2); i++, j++) {
code = tsdbDecmprData(reader->config->bufArr[0] + size, brinBlk->size[j], TSDB_DATA_TYPE_INT, brinBlk->cmprAlg,
&reader->config->bufArr[1], brinBlk->numRec * sizeof(int32_t), &reader->config->bufArr[2]);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND_BATCH(&brinBlock->dataArr2[i], reader->config->bufArr[1], brinBlk->numRec);
TSDB_CHECK_CODE(code, lino, _exit);
size += brinBlk->size[j];
}
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
}
return code;
}
int32_t tsdbDataFileReadBlockData(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData) {
int32_t code = 0;
int32_t lino = 0;
code = tRealloc(&reader->config->bufArr[0], record->blockSize);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, reader->config->bufArr[0], record->blockSize);
TSDB_CHECK_CODE(code, lino, _exit);
code = tDecmprBlockData(reader->config->bufArr[0], record->blockSize, bData, &reader->config->bufArr[1]);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
}
return code;
}
int32_t tsdbDataFileReadBlockDataByCol(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData,
STSchema *pTSchema, int32_t cidArr[], int32_t numCid) {
int32_t code = 0;
int32_t lino = 0;
// TODO
ASSERT(0);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
}
return code;
}
int32_t tsdbDataFileReadBlockSma(SDataFileReader *reader) {
int32_t code = 0;
int32_t lino = 0;
// TODO
ASSERT(0);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
}
return code;
}
int32_t tsdbDataFileReadBlockIdx(SDataFileReader *reader, const TBlockIdxArray **blockIdxArray) {
int32_t code = 0;
int32_t lino = 0;
......@@ -251,32 +397,32 @@ int32_t tsdbDataFileReadTombBlk(SDataFileReader *reader, const TTombBlkArray **t
int32_t code = 0;
int32_t lino = 0;
code = tsdbDataFileReadTombFooter(reader);
TSDB_CHECK_CODE(code, lino, _exit);
if (reader->fd[TSDB_FTYPE_TOMB] && !reader->ctx->tombBlkLoaded) {
if (!reader->ctx->tombBlkLoaded) {
code = tsdbDataFileReadTombFooter(reader);
TSDB_CHECK_CODE(code, lino, _exit);
TARRAY2_CLEAR(reader->tombBlkArray, NULL);
if (reader->tombFooter->tombBlkPtr->size) {
code = tRealloc(&reader->config->bufArr[0], reader->tombFooter->tombBlkPtr->size);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd[TSDB_FTYPE_TOMB], reader->tombFooter->tombBlkPtr->offset,
reader->config->bufArr[0], reader->tombFooter->tombBlkPtr->size);
TSDB_CHECK_CODE(code, lino, _exit);
if (reader->tombFooter->tombBlkPtr->size > 0) {
void *data = taosMemoryMalloc(reader->tombFooter->tombBlkPtr->size);
if (data == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
int32_t size = reader->tombFooter->tombBlkPtr->size / sizeof(STombBlk);
for (int32_t i = 0; i < size; ++i) {
code = TARRAY2_APPEND_PTR(reader->tombBlkArray, ((STombBlk *)reader->config->bufArr[0]) + i);
code = tsdbReadFile(reader->fd[TSDB_FTYPE_TOMB], reader->tombFooter->tombBlkPtr->offset, data,
reader->tombFooter->tombBlkPtr->size);
if (code) {
taosMemoryFree(data);
TSDB_CHECK_CODE(code, lino, _exit);
}
int32_t size = reader->tombFooter->tombBlkPtr->size / sizeof(STombBlk);
TARRAY2_INIT_EX(reader->tombBlkArray, size, size, data);
} else {
TARRAY2_INIT(reader->tombBlkArray);
}
reader->ctx->tombBlkLoaded = true;
}
tombBlkArray[0] = reader->tombBlkArray;
_exit:
......@@ -296,19 +442,19 @@ int32_t tsdbDataFileReadTombBlock(SDataFileReader *reader, const STombBlk *tombB
code = tsdbReadFile(reader->fd[TSDB_FTYPE_TOMB], tombBlk->dp->offset, reader->config->bufArr[0], tombBlk->dp->size);
TSDB_CHECK_CODE(code, lino, _exit);
tTombBlockClear(tData);
int32_t size = 0;
tTombBlockClear(tData);
for (int32_t i = 0; i < ARRAY_SIZE(tData->dataArr); ++i) {
code = tsdbDecmprData(reader->config->bufArr[0] + size, tombBlk->size[i], TSDB_DATA_TYPE_BIGINT, TWO_STAGE_COMP,
&reader->config->bufArr[1], 0, &reader->config->bufArr[2]);
code = tsdbDecmprData(reader->config->bufArr[0] + size, tombBlk->size[i], TSDB_DATA_TYPE_BIGINT, tombBlk->cmprAlg,
&reader->config->bufArr[1], sizeof(int64_t) * tombBlk->numRec, &reader->config->bufArr[2]);
TSDB_CHECK_CODE(code, lino, _exit);
for (int32_t j = 0; j < tombBlk->numRec; j++) {
code = TARRAY2_APPEND_PTR(tData->dataArr + i, ((int64_t *)reader->config->bufArr[1]) + j);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = TARRAY2_APPEND_BATCH(&tData->dataArr[i], reader->config->bufArr[1], tombBlk->numRec);
TSDB_CHECK_CODE(code, lino, _exit);
size += tombBlk->size[i];
}
ASSERT(size == tombBlk->dp->size);
_exit:
if (code) {
......
......@@ -254,8 +254,6 @@ int32_t tsdbSttFileReadTombBlock(SSttSegReader *reader, const STombBlk *tombBlk,
int32_t code = 0;
int32_t lino = 0;
tTombBlockClear(dData);
code = tRealloc(&reader->reader->config->bufArr[0], tombBlk->dp->size);
TSDB_CHECK_CODE(code, lino, _exit);
......@@ -263,6 +261,7 @@ int32_t tsdbSttFileReadTombBlock(SSttSegReader *reader, const STombBlk *tombBlk,
if (code) TSDB_CHECK_CODE(code, lino, _exit);
int64_t size = 0;
tTombBlockClear(dData);
for (int32_t i = 0; i < ARRAY_SIZE(dData->dataArr); ++i) {
code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, tombBlk->size[i], TSDB_DATA_TYPE_BIGINT,
tombBlk->cmprAlg, &reader->reader->config->bufArr[1], sizeof(int64_t) * tombBlk->numRec,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册