提交 cc0727ae 编写于 作者: H Hongze Cheng

more code

上级 014bfc4a
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "tsdbFSet2.h" #include "tsdbFSet2.h"
static int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl) { int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl) {
if (!(lvl[0] = taosMemoryMalloc(sizeof(SSttLvl)))) return TSDB_CODE_OUT_OF_MEMORY; if (!(lvl[0] = taosMemoryMalloc(sizeof(SSttLvl)))) return TSDB_CODE_OUT_OF_MEMORY;
lvl[0]->level = level; lvl[0]->level = level;
TARRAY2_INIT(lvl[0]->fobjArr); TARRAY2_INIT(lvl[0]->fobjArr);
......
...@@ -15,13 +15,6 @@ ...@@ -15,13 +15,6 @@
#include "tsdbSttFileRW.h" #include "tsdbSttFileRW.h"
typedef struct {
SFDataPtr sttBlkPtr[1];
SFDataPtr statisBlkPtr[1];
SFDataPtr tombBlkPtr[1];
SFDataPtr rsrvd[2];
} SSttFooter;
// SSttFReader ============================================================ // SSttFReader ============================================================
struct SSttFileReader { struct SSttFileReader {
SSttFileReaderConfig config[1]; SSttFileReaderConfig config[1];
...@@ -602,7 +595,7 @@ _exit: ...@@ -602,7 +595,7 @@ _exit:
return code; return code;
} }
int32_t tsdbFileDoWriteSttBlk(STsdbFD *fd, const TSttBlkArray *sttBlkArray, SFDataPtr *ptr, int64_t *fileSize) { int32_t tsdbFileWriteSttBlk(STsdbFD *fd, const TSttBlkArray *sttBlkArray, SFDataPtr *ptr, int64_t *fileSize) {
ptr->size = TARRAY2_DATA_LEN(sttBlkArray); ptr->size = TARRAY2_DATA_LEN(sttBlkArray);
if (ptr->size > 0) { if (ptr->size > 0) {
ptr->offset = *fileSize; ptr->offset = *fileSize;
...@@ -621,7 +614,7 @@ static int32_t tsdbSttFileDoWriteSttBlk(SSttFileWriter *writer) { ...@@ -621,7 +614,7 @@ static int32_t tsdbSttFileDoWriteSttBlk(SSttFileWriter *writer) {
int32_t code = 0; int32_t code = 0;
int32_t lino; int32_t lino;
code = tsdbFileDoWriteSttBlk(writer->fd, writer->sttBlkArray, writer->footer->sttBlkPtr, &writer->file->size); code = tsdbFileWriteSttBlk(writer->fd, writer->sttBlkArray, writer->footer->sttBlkPtr, &writer->file->size);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
...@@ -680,7 +673,7 @@ _exit: ...@@ -680,7 +673,7 @@ _exit:
return code; return code;
} }
int32_t tsdbSttFileDoWriteFooterImpl(STsdbFD *fd, const SSttFooter *footer, int64_t *fileSize) { int32_t tsdbFileWriteSttFooter(STsdbFD *fd, const SSttFooter *footer, int64_t *fileSize) {
int32_t code = tsdbWriteFile(fd, *fileSize, (const uint8_t *)footer, sizeof(*footer)); int32_t code = tsdbWriteFile(fd, *fileSize, (const uint8_t *)footer, sizeof(*footer));
if (code) return code; if (code) return code;
*fileSize += sizeof(*footer); *fileSize += sizeof(*footer);
...@@ -688,7 +681,7 @@ int32_t tsdbSttFileDoWriteFooterImpl(STsdbFD *fd, const SSttFooter *footer, int6 ...@@ -688,7 +681,7 @@ int32_t tsdbSttFileDoWriteFooterImpl(STsdbFD *fd, const SSttFooter *footer, int6
} }
static int32_t tsdbSttFileDoWriteFooter(SSttFileWriter *writer) { static int32_t tsdbSttFileDoWriteFooter(SSttFileWriter *writer) {
return tsdbSttFileDoWriteFooterImpl(writer->fd, writer->footer, &writer->file->size); return tsdbFileWriteSttFooter(writer->fd, writer->footer, &writer->file->size);
} }
static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) { static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) {
......
...@@ -26,6 +26,13 @@ extern "C" { ...@@ -26,6 +26,13 @@ extern "C" {
typedef TARRAY2(SSttBlk) TSttBlkArray; typedef TARRAY2(SSttBlk) TSttBlkArray;
typedef TARRAY2(SStatisBlk) TStatisBlkArray; typedef TARRAY2(SStatisBlk) TStatisBlkArray;
typedef struct {
SFDataPtr sttBlkPtr[1];
SFDataPtr statisBlkPtr[1];
SFDataPtr tombBlkPtr[1];
SFDataPtr rsrvd[2];
} SSttFooter;
// SSttFileReader ========================================== // SSttFileReader ==========================================
typedef struct SSttFileReader SSttFileReader; typedef struct SSttFileReader SSttFileReader;
typedef struct SSttFileReaderConfig SSttFileReaderConfig; typedef struct SSttFileReaderConfig SSttFileReaderConfig;
......
...@@ -26,6 +26,9 @@ extern int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t ...@@ -26,6 +26,9 @@ extern int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t
TBrinBlkArray *brinBlkArray, uint8_t **bufArr); TBrinBlkArray *brinBlkArray, uint8_t **bufArr);
extern int32_t tsdbFileWriteBrinBlk(STsdbFD *fd, TBrinBlkArray *brinBlkArray, SFDataPtr *ptr, int64_t *fileSize); extern int32_t tsdbFileWriteBrinBlk(STsdbFD *fd, TBrinBlkArray *brinBlkArray, SFDataPtr *ptr, int64_t *fileSize);
extern int32_t tsdbFileWriteHeadFooter(STsdbFD *fd, int64_t *fileSize, const SHeadFooter *footer); extern int32_t tsdbFileWriteHeadFooter(STsdbFD *fd, int64_t *fileSize, const SHeadFooter *footer);
extern int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl);
extern int32_t tsdbFileWriteSttBlk(STsdbFD *fd, const TSttBlkArray *sttBlkArray, SFDataPtr *ptr, int64_t *fileSize);
extern int32_t tsdbFileWriteSttFooter(STsdbFD *fd, const SSttFooter *footer, int64_t *fileSize);
static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset) { static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset) {
int32_t code = 0; int32_t code = 0;
...@@ -170,7 +173,20 @@ static int32_t tsdbUpgradeData(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader * ...@@ -170,7 +173,20 @@ static int32_t tsdbUpgradeData(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
// TODO if (fset->farr[TSDB_FTYPE_HEAD] == NULL) {
return 0;
}
STFile file = {
.type = TSDB_FTYPE_DATA,
.did = pDFileSet->diskId,
.fid = fset->fid,
.cid = pDFileSet->pDataF->commitID,
.size = pDFileSet->pDataF->size,
};
code = tsdbTFileObjInit(tsdb, &file, &fset->farr[TSDB_FTYPE_DATA]);
TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
if (code) { if (code) {
...@@ -183,7 +199,20 @@ static int32_t tsdbUpgradeSma(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *r ...@@ -183,7 +199,20 @@ static int32_t tsdbUpgradeSma(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *r
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
// TODO if (fset->farr[TSDB_FTYPE_HEAD] == NULL) {
return 0;
}
STFile file = {
.type = TSDB_FTYPE_SMA,
.did = pDFileSet->diskId,
.fid = fset->fid,
.cid = pDFileSet->pSmaF->commitID,
.size = pDFileSet->pSmaF->size,
};
code = tsdbTFileObjInit(tsdb, &file, &fset->farr[TSDB_FTYPE_SMA]);
TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
if (code) { if (code) {
...@@ -192,11 +221,96 @@ _exit: ...@@ -192,11 +221,96 @@ _exit:
return code; return code;
} }
static int32_t tsdbUpgradeSttFile(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset,
int32_t iStt, SSttLvl *lvl) {
int32_t code = 0;
int32_t lino = 0;
SArray *aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
if (aSttBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbReadSttBlk(reader, iStt, aSttBlk);
TSDB_CHECK_CODE(code, lino, _exit);
if (taosArrayGetSize(aSttBlk) > 0) {
SSttFile *pSttF = pDFileSet->aSttF[iStt];
STFileObj *fobj;
struct {
int32_t szPage;
// writer
STsdbFD *fd;
TSttBlkArray sttBlkArray[1];
SSttFooter footer[1];
} ctx[1] = {{
.szPage = tsdb->pVnode->config.tsdbPageSize,
}};
STFile file = {
.type = TSDB_FTYPE_STT,
.did = pDFileSet->diskId,
.fid = fset->fid,
.cid = pSttF->commitID,
.size = pSttF->size,
};
code = tsdbTFileObjInit(tsdb, &file, &fobj);
TSDB_CHECK_CODE(code, lino, _exit1);
code = tsdbOpenFile(fobj->fname, ctx->szPage, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd);
TSDB_CHECK_CODE(code, lino, _exit1);
for (int32_t iSttBlk = 0; iSttBlk < taosArrayGetSize(aSttBlk); iSttBlk++) {
code = TARRAY2_APPEND_PTR(ctx->sttBlkArray, (SSttBlk *)taosArrayGet(aSttBlk, iSttBlk));
TSDB_CHECK_CODE(code, lino, _exit1);
}
code = tsdbFileWriteSttBlk(ctx->fd, ctx->sttBlkArray, ctx->footer->sttBlkPtr, &fobj->f->size);
TSDB_CHECK_CODE(code, lino, _exit1);
code = tsdbFileWriteSttFooter(ctx->fd, ctx->footer, &fobj->f->size);
TSDB_CHECK_CODE(code, lino, _exit1);
code = tsdbFsyncFile(ctx->fd);
TSDB_CHECK_CODE(code, lino, _exit1);
tsdbCloseFile(&ctx->fd);
code = TARRAY2_APPEND(lvl->fobjArr, fobj);
TSDB_CHECK_CODE(code, lino, _exit1);
_exit1:
TARRAY2_DESTROY(ctx->sttBlkArray, NULL);
}
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
}
taosArrayDestroy(aSttBlk);
return code;
}
static int32_t tsdbUpgradeStt(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset) { static int32_t tsdbUpgradeStt(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
// TODO if (pDFileSet->nSttF == 0) {
return 0;
}
SSttLvl *lvl;
code = tsdbSttLvlInit(0, &lvl);
TSDB_CHECK_CODE(code, lino, _exit);
for (int32_t iStt = 0; iStt < pDFileSet->nSttF; ++iStt) {
code = tsdbUpgradeSttFile(tsdb, pDFileSet, reader, fset, iStt, lvl);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = TARRAY2_APPEND(fset->lvlArr, lvl);
TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
if (code) { if (code) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册