提交 b881b872 编写于 作者: H Hongze Cheng

more code

上级 cc0727ae
......@@ -15,10 +15,9 @@
#include "tsdbDataFileRW.h"
extern int32_t tsdbFileDoWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize,
TTombBlkArray *tombBlkArray, uint8_t **bufArr);
extern int32_t tsdbFileDoWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr,
int64_t *fileSize);
extern int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize,
TTombBlkArray *tombBlkArray, uint8_t **bufArr);
extern int32_t tsdbFileWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, int64_t *fileSize);
// SDataFileReader =============================================
struct SDataFileReader {
......@@ -1161,8 +1160,8 @@ static int32_t tsdbDataFileDoWriteTombBlock(SDataFileWriter *writer) {
int32_t code = 0;
int32_t lino = 0;
code = tsdbFileDoWriteTombBlock(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlock, writer->config->cmprAlg,
&writer->files[TSDB_FTYPE_TOMB].size, writer->tombBlkArray, writer->config->bufArr);
code = tsdbFileWriteTombBlock(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlock, writer->config->cmprAlg,
&writer->files[TSDB_FTYPE_TOMB].size, writer->tombBlkArray, writer->config->bufArr);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
......@@ -1178,8 +1177,8 @@ static int32_t tsdbDataFileDoWriteTombBlk(SDataFileWriter *writer) {
int32_t code = 0;
int32_t lino = 0;
code = tsdbFileDoWriteTombBlk(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlkArray, writer->tombFooter->tombBlkPtr,
&writer->files[TSDB_FTYPE_TOMB].size);
code = tsdbFileWriteTombBlk(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlkArray, writer->tombFooter->tombBlkPtr,
&writer->files[TSDB_FTYPE_TOMB].size);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
......@@ -1189,14 +1188,19 @@ _exit:
return code;
}
int32_t tsdbFileWriteTombFooter(STsdbFD *fd, const STombFooter *footer, int64_t *fileSize) {
int32_t code = tsdbWriteFile(fd, *fileSize, (const uint8_t *)footer, sizeof(*footer));
if (code) return code;
*fileSize += sizeof(*footer);
return 0;
}
static int32_t tsdbDataFileWriteTombFooter(SDataFileWriter *writer) {
int32_t code = 0;
int32_t lino = 0;
code = tsdbWriteFile(writer->fd[TSDB_FTYPE_TOMB], writer->files[TSDB_FTYPE_TOMB].size,
(const uint8_t *)writer->tombFooter, sizeof(STombFooter));
code = tsdbFileWriteTombFooter(writer->fd[TSDB_FTYPE_TOMB], writer->tombFooter, &writer->files[TSDB_FTYPE_TOMB].size);
TSDB_CHECK_CODE(code, lino, _exit);
writer->files[TSDB_FTYPE_TOMB].size += sizeof(STombFooter);
_exit:
if (code) {
......
......@@ -522,8 +522,8 @@ _exit:
return code;
}
int32_t tsdbFileDoWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize,
TTombBlkArray *tombBlkArray, uint8_t **bufArr) {
int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize,
TTombBlkArray *tombBlkArray, uint8_t **bufArr) {
int32_t code;
if (TOMB_BLOCK_SIZE(tombBlock) == 0) return 0;
......@@ -584,8 +584,8 @@ static int32_t tsdbSttFileDoWriteTombBlock(SSttFileWriter *writer) {
int32_t code = 0;
int32_t lino = 0;
code = tsdbFileDoWriteTombBlock(writer->fd, writer->tombBlock, writer->config->cmprAlg, &writer->file->size,
writer->tombBlkArray, writer->config->bufArr);
code = tsdbFileWriteTombBlock(writer->fd, writer->tombBlock, writer->config->cmprAlg, &writer->file->size,
writer->tombBlkArray, writer->config->bufArr);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
......@@ -644,7 +644,7 @@ _exit:
return code;
}
int32_t tsdbFileDoWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, int64_t *fileSize) {
int32_t tsdbFileWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, int64_t *fileSize) {
ptr->size = TARRAY2_DATA_LEN(tombBlkArray);
if (ptr->size > 0) {
ptr->offset = *fileSize;
......@@ -663,7 +663,7 @@ static int32_t tsdbSttFileDoWriteTombBlk(SSttFileWriter *writer) {
int32_t code = 0;
int32_t lino = 0;
code = tsdbFileDoWriteTombBlk(writer->fd, writer->tombBlkArray, writer->footer->tombBlkPtr, &writer->file->size);
code = tsdbFileWriteTombBlk(writer->fd, writer->tombBlkArray, writer->footer->tombBlkPtr, &writer->file->size);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
......
......@@ -29,6 +29,10 @@ extern int32_t tsdbFileWriteHeadFooter(STsdbFD *fd, int64_t *fileSize, const SHe
extern int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl);
extern int32_t tsdbFileWriteSttBlk(STsdbFD *fd, const TSttBlkArray *sttBlkArray, SFDataPtr *ptr, int64_t *fileSize);
extern int32_t tsdbFileWriteSttFooter(STsdbFD *fd, const SSttFooter *footer, int64_t *fileSize);
extern int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize,
TTombBlkArray *tombBlkArray, uint8_t **bufArr);
extern int32_t tsdbFileWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, int64_t *fileSize);
extern int32_t tsdbFileWriteTombFooter(STsdbFD *fd, const STombFooter *footer, int64_t *fileSize);
static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset) {
int32_t code = 0;
......@@ -362,35 +366,102 @@ _exit:
return code;
}
static int32_t tsdbUpgradeOpenTombFile(STsdb *tsdb, STFileSet *fset, STsdbFD **fd, STFileObj **fobj, bool *toStt) {
int32_t code = 0;
int32_t lino = 0;
if (TARRAY2_SIZE(fset->lvlArr) == 0) { // to .tomb file
*toStt = false;
STFile file = {
.type = TSDB_FTYPE_TOMB,
.did = fset->farr[TSDB_FTYPE_HEAD]->f->did,
.fid = fset->fid,
.cid = 0,
.size = 0,
};
code = tsdbTFileObjInit(tsdb, &file, fobj);
TSDB_CHECK_CODE(code, lino, _exit);
fset->farr[TSDB_FTYPE_TOMB] = *fobj;
} else { // to .stt file
*toStt = true;
SSttLvl *lvl = TARRAY2_GET(fset->lvlArr, 0);
STFile file = {
.type = TSDB_FTYPE_STT,
.did = TARRAY2_GET(lvl->fobjArr, 0)->f->did,
.fid = fset->fid,
.cid = 0,
.size = 0,
};
code = tsdbTFileObjInit(tsdb, &file, fobj);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND(lvl->fobjArr, fobj[0]);
TSDB_CHECK_CODE(code, lino, _exit);
}
char fname[TSDB_FILENAME_LEN] = {0};
code = tsdbOpenFile(fobj[0]->fname, tsdb->pVnode->config.tsdbPageSize,
TD_FILE_READ | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_CREATE, fd);
TSDB_CHECK_CODE(code, lino, _exit);
uint8_t hdr[TSDB_FHDR_SIZE] = {0};
code = tsdbWriteFile(fd[0], 0, hdr, TSDB_FHDR_SIZE);
TSDB_CHECK_CODE(code, lino, _exit);
fobj[0]->f->size += TSDB_FHDR_SIZE;
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
}
return code;
}
static int32_t tsdbDumpTombDataToFSet(STsdb *tsdb, SDelFReader *reader, SArray *aDelIdx, STFileSet *fset) {
int32_t code = 0;
int32_t lino = 0;
SArray *aDelData = NULL;
int64_t minKey, maxKey;
STombBlock tombBlock[1] = {0};
TTombBlkArray tombBlkArray[1] = {0};
STsdbFD *fd = NULL;
struct {
// context
bool toStt;
int8_t cmprAlg;
int32_t maxRow;
int64_t minKey;
int64_t maxKey;
uint8_t *bufArr[8];
// reader
SArray *aDelData;
// writer
STsdbFD *fd;
STFileObj *fobj;
STombBlock tombBlock[1];
TTombBlkArray tombBlkArray[1];
STombFooter tombFooter[1];
SSttFooter sttFooter[1];
} ctx[1] = {{
.maxRow = tsdb->pVnode->config.tsdbCfg.maxRows,
.cmprAlg = tsdb->pVnode->config.tsdbCfg.compression,
}};
tsdbFidKeyRange(fset->fid, tsdb->keepCfg.days, tsdb->keepCfg.precision, &minKey, &maxKey);
tsdbFidKeyRange(fset->fid, tsdb->keepCfg.days, tsdb->keepCfg.precision, &ctx->minKey, &ctx->maxKey);
if ((aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) {
if ((ctx->aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
for (int32_t i = 0; i < taosArrayGetSize(aDelIdx); ++i) {
SDelIdx *pDelIdx = taosArrayGet(aDelIdx, i);
for (int32_t iDelIdx = 0; iDelIdx < taosArrayGetSize(aDelIdx); iDelIdx++) {
SDelIdx *pDelIdx = (SDelIdx *)taosArrayGet(aDelIdx, iDelIdx);
code = tsdbReadDelData(reader, pDelIdx, aDelData);
code = tsdbReadDelData(reader, pDelIdx, ctx->aDelData);
TSDB_CHECK_CODE(code, lino, _exit);
for (int32_t j = 0; j < taosArrayGetSize(aDelData); ++j) {
SDelData *pDelData = taosArrayGet(aDelData, j);
if (pDelData->sKey > maxKey || pDelData->eKey < minKey) {
continue;
}
for (int32_t iDelData = 0; iDelData < taosArrayGetSize(ctx->aDelData); iDelData++) {
SDelData *pDelData = (SDelData *)taosArrayGet(ctx->aDelData, iDelData);
STombRecord record = {
.suid = pDelIdx->suid,
......@@ -400,64 +471,62 @@ static int32_t tsdbDumpTombDataToFSet(STsdb *tsdb, SDelFReader *reader, SArray *
.ekey = pDelData->eKey,
};
code = tTombBlockPut(tombBlock, &record);
code = tTombBlockPut(ctx->tombBlock, &record);
TSDB_CHECK_CODE(code, lino, _exit);
if (TOMB_BLOCK_SIZE(tombBlock) >= tsdb->pVnode->config.tsdbCfg.maxRows) {
if (fd == NULL) {
STFile file = {
.type = TSDB_FTYPE_TOMB,
.did = {0}, // TODO
.fid = fset->fid,
.cid = 0, // TODO
};
code = tsdbTFileObjInit(tsdb, &file, &fset->farr[TSDB_FTYPE_TOMB]);
if (TOMB_BLOCK_SIZE(ctx->tombBlock) > ctx->maxRow) {
if (ctx->fd == NULL) {
code = tsdbUpgradeOpenTombFile(tsdb, fset, &ctx->fd, &ctx->fobj, &ctx->toStt);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbOpenFile(fset->farr[TSDB_FTYPE_TOMB]->fname, tsdb->pVnode->config.tsdbPageSize,
TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC, &fd);
TSDB_CHECK_CODE(code, lino, _exit);
uint8_t hdr[TSDB_FHDR_SIZE] = {0};
code = tsdbWriteFile(fd, 0, hdr, TSDB_FHDR_SIZE);
TSDB_CHECK_CODE(code, lino, _exit);
fset->farr[TSDB_FTYPE_TOMB]->f->size += sizeof(hdr);
}
// TODO
tTombBlockClear(tombBlock);
code = tsdbFileWriteTombBlock(ctx->fd, ctx->tombBlock, ctx->cmprAlg, &ctx->fobj->f->size, ctx->tombBlkArray,
ctx->bufArr);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
}
if (TOMB_BLOCK_SIZE(tombBlock) > 0) {
// TODO
tTombBlockClear(tombBlock);
if (TOMB_BLOCK_SIZE(ctx->tombBlock) > 0) {
if (ctx->fd == NULL) {
code = tsdbUpgradeOpenTombFile(tsdb, fset, &ctx->fd, &ctx->fobj, &ctx->toStt);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbFileWriteTombBlock(ctx->fd, ctx->tombBlock, ctx->cmprAlg, &ctx->fobj->f->size, ctx->tombBlkArray,
ctx->bufArr);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (TARRAY2_SIZE(tombBlkArray) > 0) {
// TODO
}
if (ctx->fd != NULL) {
if (ctx->toStt) {
code = tsdbFileWriteTombBlk(ctx->fd, ctx->tombBlkArray, ctx->sttFooter->tombBlkPtr, &ctx->fobj->f->size);
TSDB_CHECK_CODE(code, lino, _exit);
if (fd) {
// write footer
code = tsdbFileWriteSttFooter(ctx->fd, ctx->sttFooter, &ctx->fobj->f->size);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
code = tsdbFileWriteTombBlk(ctx->fd, ctx->tombBlkArray, ctx->tombFooter->tombBlkPtr, &ctx->fobj->f->size);
TSDB_CHECK_CODE(code, lino, _exit);
// sync and close
code = tsdbFsyncFile(fd);
code = tsdbFileWriteTombFooter(ctx->fd, ctx->tombFooter, &ctx->fobj->f->size);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbFsyncFile(ctx->fd);
TSDB_CHECK_CODE(code, lino, _exit);
tsdbCloseFile(&fd);
}
// clear
TARRAY2_DESTROY(tombBlkArray, NULL);
tTombBlockDestroy(tombBlock);
taosArrayDestroy(aDelData);
tsdbCloseFile(&ctx->fd);
}
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
}
for (int32_t i = 0; i < ARRAY_SIZE(ctx->bufArr); i++) {
tFree(ctx->bufArr[i]);
}
TARRAY2_DESTROY(ctx->tombBlkArray, NULL);
tTombBlockDestroy(ctx->tombBlock);
taosArrayDestroy(ctx->aDelData);
return code;
}
......@@ -487,13 +556,12 @@ static int32_t tsdbUpgradeTombFile(STsdb *tsdb, SDelFile *pDelFile, TFileSetArra
}
}
tsdbDelFReaderClose(&reader);
taosArrayDestroy(aDelIdx);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
}
tsdbDelFReaderClose(&reader);
taosArrayDestroy(aDelIdx);
return code;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册