提交 ec288ade 编写于 作者: H Hongze Cheng

change stt from multi seg to one seg

上级 2ad30e29
......@@ -46,9 +46,14 @@ typedef struct {
TABLEID tbid[1];
} ctx[1];
// reader
SSttFileReader *sttReader;
TTsdbIterArray iterArray[1];
SIterMerger *iterMerger;
// iter
TTsdbIterArray dataIterArray[1];
SIterMerger *dataIterMerger;
TTsdbIterArray tombIterArray[1];
SIterMerger *tombIterMerger;
// writer
SBlockData blockData[2];
......@@ -57,10 +62,11 @@ typedef struct {
SSttFileWriter *sttWriter;
} SCommitter2;
static int32_t tsdbCommitOpenNewSttWriter(SCommitter2 *committer) {
static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) {
int32_t code = 0;
int32_t lino = 0;
// stt writer
SSttFileWriterConfig config[1] = {{
.tsdb = committer->tsdb,
.maxRow = committer->maxRow,
......@@ -79,75 +85,6 @@ static int32_t tsdbCommitOpenNewSttWriter(SCommitter2 *committer) {
code = tsdbSttFileWriterOpen(config, &committer->sttWriter);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
} else {
tsdbDebug("vgId:%d %s success", TD_VID(committer->tsdb->pVnode), __func__);
}
return code;
}
static int32_t tsdbCommitOpenExistSttWriter(SCommitter2 *committer, const STFile *f) {
int32_t code = 0;
int32_t lino = 0;
SSttFileWriterConfig config[1] = {{
.tsdb = committer->tsdb,
.maxRow = committer->maxRow,
.szPage = committer->szPage,
.cmprAlg = committer->cmprAlg,
.compactVersion = committer->compactVersion,
.file = f[0],
}};
code = tsdbSttFileWriterOpen(config, &committer->sttWriter);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
} else {
tsdbDebug("vgId:%d %s success", TD_VID(committer->tsdb->pVnode), __func__);
}
return code;
}
static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) {
int32_t code = 0;
int32_t lino = 0;
// stt writer
if (committer->ctx->fset == NULL) {
code = tsdbCommitOpenNewSttWriter(committer);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
const SSttLvl *lvl0 = tsdbTFileSetGetSttLvl(committer->ctx->fset, 0);
if (lvl0 == NULL || TARRAY2_SIZE(lvl0->fobjArr) == 0) {
code = tsdbCommitOpenNewSttWriter(committer);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
STFileObj *fobj = TARRAY2_LAST(lvl0->fobjArr);
if (fobj->f->stt->nseg >= committer->sttTrigger) {
code = tsdbCommitOpenNewSttWriter(committer);
TSDB_CHECK_CODE(code, lino, _exit);
if (committer->sttTrigger == 1) {
SSttFileReaderConfig sttFileReaderConfig = {
.tsdb = committer->tsdb,
.szPage = committer->szPage,
.file = fobj->f[0],
};
code = tsdbSttFileReaderOpen(NULL, &sttFileReaderConfig, &committer->sttReader);
TSDB_CHECK_CODE(code, lino, _exit);
}
} else {
code = tsdbCommitOpenExistSttWriter(committer, fobj->f);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
}
// data writer
if (committer->sttTrigger == 1) {
// data writer
......@@ -182,49 +119,14 @@ _exit:
return code;
}
static int32_t tsdbCommitTSDataOpenIterMerger(SCommitter2 *committer) {
static int32_t tsdbCommitCloseWriter(SCommitter2 *committer) {
int32_t code = 0;
int32_t lino = 0;
ASSERT(TARRAY2_SIZE(committer->iterArray) == 0);
ASSERT(committer->iterMerger == NULL);
STsdbIter *iter;
STsdbIterConfig config[1] = {0};
// memtable iter
config->type = TSDB_ITER_TYPE_MEMT;
config->memt = committer->tsdb->imem;
config->from->ts = committer->ctx->minKey;
config->from->version = VERSION_MIN;
code = tsdbIterOpen(config, &iter);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND(committer->iterArray, iter);
TSDB_CHECK_CODE(code, lino, _exit);
// stt file iter
if (committer->sttReader) {
const TSttSegReaderArray *readerArray;
tsdbSttFileReaderGetSegReader(committer->sttReader, &readerArray);
SSttSegReader *segReader;
TARRAY2_FOREACH(readerArray, segReader) {
config->type = TSDB_ITER_TYPE_STT;
config->sttReader = segReader;
}
code = tsdbIterOpen(config, &iter);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND(committer->iterArray, iter);
code = tsdbSttFileWriterClose(&committer->sttWriter, 0, committer->fopArray);
TSDB_CHECK_CODE(code, lino, _exit);
}
// open iter merger
code = tsdbIterMergerOpen(committer->iterArray, &committer->iterMerger, false);
code = tsdbDataFileWriterClose(&committer->dataWriter, 0, committer->fopArray);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
......@@ -234,20 +136,6 @@ _exit:
return code;
}
static int32_t tsdbCommitTSDataCloseIterMerger(SCommitter2 *committer) {
int32_t code = 0;
int32_t lino = 0;
tsdbIterMergerClose(&committer->iterMerger);
TARRAY2_CLEAR(committer->iterArray, tsdbIterClose);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
}
return code;
}
static int32_t tsdbCommitTSDataToDataTableBegin(SCommitter2 *committer, const TABLEID *tbid) {
int32_t code = 0;
int32_t lino = 0;
......@@ -342,7 +230,7 @@ static int32_t tsdbCommitTSDataToData(SCommitter2 *committer) {
int32_t lino = 0;
SMetaInfo info;
for (SRowInfo *row; (row = tsdbIterMergerGetData(committer->iterMerger)) != NULL;) {
for (SRowInfo *row; (row = tsdbIterMergerGetData(committer->dataIterMerger)) != NULL;) {
if (row->uid != committer->ctx->tbid->uid) {
// end last table write
code = tsdbCommitTSDataToDataTableEnd(committer);
......@@ -350,7 +238,7 @@ static int32_t tsdbCommitTSDataToData(SCommitter2 *committer) {
// Ignore table of obsolescence
if (metaGetInfo(committer->tsdb->pVnode->pMeta, row->uid, &info, NULL) != 0) {
code = tsdbIterMergerSkipTableData(committer->iterMerger, (TABLEID *)row);
code = tsdbIterMergerSkipTableData(committer->dataIterMerger, (TABLEID *)row);
TSDB_CHECK_CODE(code, lino, _exit);
continue;
}
......@@ -389,7 +277,7 @@ static int32_t tsdbCommitTSDataToData(SCommitter2 *committer) {
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbIterMergerNext(committer->iterMerger);
code = tsdbIterMergerNext(committer->dataIterMerger);
TSDB_CHECK_CODE(code, lino, _exit);
}
......@@ -410,14 +298,14 @@ static int32_t tsdbCommitTSDataToStt(SCommitter2 *committer) {
ASSERT(committer->sttReader == NULL);
SMetaInfo info;
for (SRowInfo *row; (row = tsdbIterMergerGetData(committer->iterMerger)) != NULL;) {
for (SRowInfo *row; (row = tsdbIterMergerGetData(committer->dataIterMerger)) != NULL;) {
if (row->uid != committer->ctx->tbid->uid) {
committer->ctx->tbid->suid = row->suid;
committer->ctx->tbid->uid = row->uid;
// Ignore table of obsolescence
if (metaGetInfo(committer->tsdb->pVnode->pMeta, row->uid, &info, NULL) != 0) {
code = tsdbIterMergerSkipTableData(committer->iterMerger, committer->ctx->tbid);
code = tsdbIterMergerSkipTableData(committer->dataIterMerger, committer->ctx->tbid);
TSDB_CHECK_CODE(code, lino, _exit);
continue;
}
......@@ -426,13 +314,13 @@ static int32_t tsdbCommitTSDataToStt(SCommitter2 *committer) {
TSKEY ts = TSDBROW_TS(&row->row);
if (ts > committer->ctx->maxKey) {
committer->ctx->nextKey = TMIN(committer->ctx->nextKey, ts);
code = tsdbIterMergerSkipTableData(committer->iterMerger, committer->ctx->tbid);
code = tsdbIterMergerSkipTableData(committer->dataIterMerger, committer->ctx->tbid);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
code = tsdbSttFileWriteRow(committer->sttWriter, row);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbIterMergerNext(committer->iterMerger);
code = tsdbIterMergerNext(committer->dataIterMerger);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
......@@ -448,12 +336,6 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) {
int32_t code = 0;
int32_t lino = 0;
if (committer->tsdb->imem->nRow == 0) goto _exit;
// open iter and iter merger
code = tsdbCommitTSDataOpenIterMerger(committer);
TSDB_CHECK_CODE(code, lino, _exit);
// loop iter
if (committer->sttTrigger == 1) {
code = tsdbCommitTSDataToData(committer);
......@@ -463,9 +345,34 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) {
TSDB_CHECK_CODE(code, lino, _exit);
}
// close iter and iter merger
code = tsdbCommitTSDataCloseIterMerger(committer);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
}
return code;
}
static int32_t tsdbCommitTombData(SCommitter2 *committer) {
int32_t code = 0;
int32_t lino = 0;
if (committer->dataWriter == NULL || tsdbSttFileWriterIsOpened(committer->sttWriter)) {
for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->tombIterMerger));) {
code = tsdbSttFileWriteTombRecord(committer->sttWriter, record);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbIterMergerNext(committer->tombIterMerger);
TSDB_CHECK_CODE(code, lino, _exit);
}
} else {
for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->tombIterMerger));) {
code = tsdbDataFileWriteTombRecord(committer->dataWriter, record);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbIterMergerNext(committer->tombIterMerger);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
_exit:
if (code) {
......@@ -474,42 +381,48 @@ _exit:
return code;
}
static int32_t tsdbCommitTombDataOpenIter(SCommitter2 *committer) {
static int32_t tsdbCommitOpenReader(SCommitter2 *committer) {
int32_t code = 0;
int32_t lino = 0;
STsdbIter *iter;
STsdbIterConfig config[1] = {0};
ASSERT(committer->sttReader == NULL);
if (committer->sttReader) {
const TSttSegReaderArray *readerArray;
if (committer->ctx->fset == NULL //
|| committer->sttTrigger > 1 //
|| TARRAY2_SIZE(committer->ctx->fset->lvlArr) == 0 //
) {
return 0;
}
tsdbSttFileReaderGetSegReader(committer->sttReader, &readerArray);
ASSERT(TARRAY2_SIZE(committer->ctx->fset->lvlArr) == 1);
SSttSegReader *segReader;
TARRAY2_FOREACH(readerArray, segReader) {
config->type = TSDB_ITER_TYPE_STT_TOMB;
config->sttReader = segReader;
SSttLvl *lvl = TARRAY2_FIRST(committer->ctx->fset->lvlArr);
code = tsdbIterOpen(config, &iter);
TSDB_CHECK_CODE(code, lino, _exit);
ASSERT(lvl->level == 0);
code = TARRAY2_APPEND(committer->iterArray, iter);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (TARRAY2_SIZE(lvl->fobjArr) == 0) {
return 0;
}
config->type = TSDB_ITER_TYPE_MEMT_TOMB;
config->memt = committer->tsdb->imem;
ASSERT(TARRAY2_SIZE(lvl->fobjArr) == 1);
code = tsdbIterOpen(config, &iter);
TSDB_CHECK_CODE(code, lino, _exit);
STFileObj *fobj = TARRAY2_FIRST(lvl->fobjArr);
code = TARRAY2_APPEND(committer->iterArray, iter);
SSttFileReaderConfig config = {
.tsdb = committer->tsdb,
.szPage = committer->szPage,
.file = fobj->f[0],
};
code = tsdbSttFileReaderOpen(fobj->fname, &config, &committer->sttReader);
TSDB_CHECK_CODE(code, lino, _exit);
// open iter
code = tsdbIterMergerOpen(committer->iterArray, &committer->iterMerger, true);
STFileOp op = {
.optype = TSDB_FOP_REMOVE,
.fid = fobj->f->fid,
.of = fobj->f[0],
};
code = TARRAY2_APPEND(committer->fopArray, op);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
......@@ -519,38 +432,70 @@ _exit:
return code;
}
static int32_t tsdbCommitTombDataCloseIter(SCommitter2 *committer) {
tsdbIterMergerClose(&committer->iterMerger);
TARRAY2_CLEAR(committer->iterArray, tsdbIterClose);
return 0;
}
static int32_t tsdbCommitCloseReader(SCommitter2 *committer) { return tsdbSttFileReaderClose(&committer->sttReader); }
static int32_t tsdbCommitTombData(SCommitter2 *committer) {
static int32_t tsdbCommitOpenIter(SCommitter2 *committer) {
int32_t code = 0;
int32_t lino = 0;
code = tsdbCommitTombDataOpenIter(committer);
ASSERT(TARRAY2_SIZE(committer->dataIterArray) == 0);
ASSERT(committer->dataIterMerger == NULL);
ASSERT(TARRAY2_SIZE(committer->tombIterArray) == 0);
ASSERT(committer->tombIterMerger == NULL);
STsdbIter *iter;
STsdbIterConfig config = {0};
// mem data iter
config.type = TSDB_ITER_TYPE_MEMT;
config.memt = committer->tsdb->imem;
config.from->ts = committer->ctx->minKey;
config.from->version = VERSION_MIN;
code = tsdbIterOpen(&config, &iter);
TSDB_CHECK_CODE(code, lino, _exit);
if (committer->dataWriter == NULL || tsdbSttFileWriterIsOpened(committer->sttWriter)) {
for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->iterMerger));) {
code = tsdbSttFileWriteTombRecord(committer->sttWriter, record);
code = TARRAY2_APPEND(committer->dataIterArray, iter);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbIterMergerNext(committer->iterMerger);
// mem tomb iter
config.type = TSDB_ITER_TYPE_MEMT_TOMB;
config.memt = committer->tsdb->imem;
code = tsdbIterOpen(&config, &iter);
TSDB_CHECK_CODE(code, lino, _exit);
}
} else {
for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->iterMerger));) {
code = tsdbDataFileWriteTombRecord(committer->dataWriter, record);
code = TARRAY2_APPEND(committer->tombIterArray, iter);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbIterMergerNext(committer->iterMerger);
// STT
if (committer->sttReader) {
// data iter
config.type = TSDB_ITER_TYPE_STT;
config.sttReader = committer->sttReader;
code = tsdbIterOpen(&config, &iter);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND(committer->dataIterArray, iter);
TSDB_CHECK_CODE(code, lino, _exit);
// tomb iter
config.type = TSDB_ITER_TYPE_STT_TOMB;
config.sttReader = committer->sttReader;
code = tsdbIterOpen(&config, &iter);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND(committer->tombIterArray, iter);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
code = tsdbCommitTombDataCloseIter(committer);
// open merger
code = tsdbIterMergerOpen(committer->dataIterArray, &committer->dataIterMerger, false);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbIterMergerOpen(committer->tombIterArray, &committer->tombIterMerger, true);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
......@@ -560,6 +505,14 @@ _exit:
return code;
}
static int32_t tsdbCommitCloseIter(SCommitter2 *committer) {
tsdbIterMergerClose(&committer->tombIterMerger);
tsdbIterMergerClose(&committer->dataIterMerger);
TARRAY2_CLEAR(committer->tombIterArray, tsdbIterClose);
TARRAY2_CLEAR(committer->dataIterArray, tsdbIterClose);
return 0;
}
static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
int32_t code = 0;
int32_t lino = 0;
......@@ -577,11 +530,17 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
committer->ctx->tbid->suid = 0;
committer->ctx->tbid->uid = 0;
ASSERT(TARRAY2_SIZE(committer->iterArray) == 0);
ASSERT(committer->iterMerger == NULL);
ASSERT(TARRAY2_SIZE(committer->dataIterArray) == 0);
ASSERT(committer->dataIterMerger == NULL);
ASSERT(committer->sttWriter == NULL);
ASSERT(committer->dataWriter == NULL);
code = tsdbCommitOpenReader(committer);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbCommitOpenIter(committer);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbCommitOpenWriter(committer);
TSDB_CHECK_CODE(code, lino, _exit);
......@@ -602,22 +561,15 @@ static int32_t tsdbCommitFileSetEnd(SCommitter2 *committer) {
int32_t code = 0;
int32_t lino = 0;
if (committer->sttReader) {
code = tsdbSttFileReaderClose(&committer->sttReader);
code = tsdbCommitCloseWriter(committer);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (committer->dataWriter) {
code = tsdbDataFileWriterClose(&committer->dataWriter, 0, committer->fopArray);
code = tsdbCommitCloseIter(committer);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbSttFileWriterClose(&committer->sttWriter, 0, committer->fopArray);
code = tsdbCommitCloseReader(committer);
TSDB_CHECK_CODE(code, lino, _exit);
tsdbIterMergerClose(&committer->iterMerger);
TARRAY2_CLEAR(committer->iterArray, tsdbIterClose);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
......@@ -713,8 +665,10 @@ static int32_t tsdbCloseCommitter(SCommitter2 *committer, int32_t eno) {
ASSERT(committer->dataWriter == NULL);
ASSERT(committer->sttWriter == NULL);
ASSERT(committer->iterMerger == NULL);
TARRAY2_DESTROY(committer->iterArray, NULL);
ASSERT(committer->dataIterMerger == NULL);
ASSERT(committer->tombIterMerger == NULL);
TARRAY2_DESTROY(committer->dataIterArray, NULL);
TARRAY2_DESTROY(committer->tombIterArray, NULL);
TARRAY2_DESTROY(committer->fopArray, NULL);
tsdbFSDestroyCopySnapshot(&committer->fsetArr);
......
......@@ -1522,6 +1522,8 @@ int32_t tsdbDataFileWriterOpen(const SDataFileWriterConfig *config, SDataFileWri
}
int32_t tsdbDataFileWriterClose(SDataFileWriter **writer, bool abort, TFileOpArray *opArr) {
if (writer[0] == NULL) return 0;
int32_t code = 0;
int32_t lino = 0;
......
......@@ -607,11 +607,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
if (TARRAY2_SIZE(fset->lvlArr) == 0) continue;
SSttLvl *lvl0 = TARRAY2_FIRST(fset->lvlArr);
if (lvl0->level != 0 || TARRAY2_SIZE(lvl0->fobjArr) == 0) continue;
STFileObj *fobj = TARRAY2_FIRST(lvl0->fobjArr);
if (fobj->f->stt->nseg < fs->tsdb->pVnode->config.sttTrigger) continue;
if (lvl0->level != 0 || TARRAY2_SIZE(lvl0->fobjArr) < fs->tsdb->pVnode->config.sttTrigger) continue;
code = vnodeScheduleTaskEx(1, tsdbMerge, fs->tsdb);
TSDB_CHECK_CODE(code, lino, _exit);
......
......@@ -134,11 +134,6 @@ static int32_t stt_to_json(const STFile *file, cJSON *json) {
return TSDB_CODE_OUT_OF_MEMORY;
}
/* nseg */
if (cJSON_AddNumberToObject(json, "nseg", file->stt->nseg) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return 0;
}
......@@ -160,14 +155,6 @@ static int32_t stt_from_json(const cJSON *json, STFile *file) {
return TSDB_CODE_FILE_CORRUPTED;
}
/* nseg */
item = cJSON_GetObjectItem(json, "nseg");
if (cJSON_IsNumber(item)) {
file->stt->nseg = item->valuedouble;
} else {
return TSDB_CODE_FILE_CORRUPTED;
}
return 0;
}
......@@ -290,7 +277,7 @@ bool tsdbIsSameTFile(const STFile *f1, const STFile *f2) {
bool tsdbIsTFileChanged(const STFile *f1, const STFile *f2) {
if (f1->size != f2->size) return true;
if (f1->type == TSDB_FTYPE_STT && f1->stt->nseg != f2->stt->nseg) return true;
// if (f1->type == TSDB_FTYPE_STT && f1->stt->nseg != f2->stt->nseg) return true;
return false;
}
......
......@@ -64,7 +64,6 @@ struct STFile {
union {
struct {
int32_t level;
int32_t nseg;
} stt[1];
};
};
......
......@@ -28,7 +28,7 @@ struct STsdbIter {
SRBTreeNode node[1];
union {
struct {
SSttSegReader *reader;
SSttFileReader *reader;
const TSttBlkArray *sttBlkArray;
int32_t sttBlkArrayIdx;
SBlockData blockData[1];
......@@ -51,7 +51,7 @@ struct STsdbIter {
STbDataIter tbIter[1];
} memtData[1];
struct {
SSttSegReader *reader;
SSttFileReader *reader;
const TTombBlkArray *tombBlkArray;
int32_t tombBlkArrayIdx;
STombBlock tombBlock[1];
......
......@@ -41,7 +41,7 @@ typedef enum {
typedef struct {
EIterType type;
union {
SSttSegReader *sttReader; // TSDB_ITER_TYPE_STT || TSDB_ITER_TYPE_STT_TOMB
SSttFileReader *sttReader; // TSDB_ITER_TYPE_STT || TSDB_ITER_TYPE_STT_TOMB
SDataFileReader *dataReader; // TSDB_ITER_TYPE_DATA || TSDB_ITER_TYPE_DATA_TOMB
struct {
SMemTable *memt; // TSDB_ITER_TYPE_MEMT_TOMB
......
......@@ -37,7 +37,7 @@ typedef struct {
bool toData;
int32_t level;
SSttLvl *lvl;
STFileObj *fobj;
// STFileObj *fobj;
TABLEID tbid[1];
int32_t blockDataIdx;
SBlockData blockData[2];
......@@ -305,8 +305,6 @@ static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) {
merger->ctx->toData = true;
merger->ctx->level = 0;
// TARRAY2_FOREACH(merger->ctx->fset->lvlArr, merger->ctx->lvl) {
for (int32_t i = 0;; ++i) {
if (i >= TARRAY2_SIZE(merger->ctx->fset->lvlArr)) {
merger->ctx->lvl = NULL;
......@@ -314,38 +312,38 @@ static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) {
}
merger->ctx->lvl = TARRAY2_GET(merger->ctx->fset->lvlArr, i);
if (merger->ctx->lvl->level != merger->ctx->level || TARRAY2_SIZE(merger->ctx->lvl->fobjArr) == 0) {
if (merger->ctx->lvl->level != merger->ctx->level ||
TARRAY2_SIZE(merger->ctx->lvl->fobjArr) + 1 < merger->sttTrigger) {
merger->ctx->toData = false;
merger->ctx->lvl = NULL;
break;
}
ASSERT(merger->ctx->lvl->level == 0 || TARRAY2_SIZE(merger->ctx->lvl->fobjArr) == 1);
merger->ctx->level++;
merger->ctx->fobj = TARRAY2_FIRST(merger->ctx->lvl->fobjArr);
if (merger->ctx->fobj->f->stt->nseg < merger->sttTrigger) {
merger->ctx->toData = false;
STFileObj *fobj;
int32_t numFile = 0;
TARRAY2_FOREACH(merger->ctx->lvl->fobjArr, fobj) {
if (numFile == merger->sttTrigger) {
break;
} else {
merger->ctx->level++;
}
// add remove operation
STFileOp op = {
.optype = TSDB_FOP_REMOVE,
.fid = merger->ctx->fset->fid,
.of = merger->ctx->fobj->f[0],
.of = fobj->f[0],
};
code = TARRAY2_APPEND(merger->fopArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
// open the reader
SSttFileReader *reader;
SSttFileReaderConfig config[1] = {{
SSttFileReaderConfig config = {
.tsdb = merger->tsdb,
.szPage = merger->szPage,
.file[0] = merger->ctx->fobj->f[0],
}};
code = tsdbSttFileReaderOpen(merger->ctx->fobj->fname, config, &reader);
.file[0] = fobj->f[0],
};
code = tsdbSttFileReaderOpen(fobj->fname, &config, &reader);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND(merger->sttReaderArr, reader);
......@@ -367,34 +365,29 @@ static int32_t tsdbMergeFileSetBeginOpenIter(SMerger *merger) {
SSttFileReader *sttReader;
TARRAY2_FOREACH(merger->sttReaderArr, sttReader) {
const TSttSegReaderArray *segReaderArr;
code = tsdbSttFileReaderGetSegReader(sttReader, &segReaderArr);
TSDB_CHECK_CODE(code, lino, _exit);
SSttSegReader *segReader;
TARRAY2_FOREACH(segReaderArr, segReader) {
STsdbIter *iter;
STsdbIterConfig config[1] = {{
.type = TSDB_ITER_TYPE_STT,
.sttReader = segReader,
}};
STsdbIterConfig config = {0};
// data iter
code = tsdbIterOpen(config, &iter);
config.type = TSDB_ITER_TYPE_STT;
config.sttReader = sttReader;
code = tsdbIterOpen(&config, &iter);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND(merger->dataIterArr, iter);
TSDB_CHECK_CODE(code, lino, _exit);
// tomb iter
config->type = TSDB_ITER_TYPE_STT_TOMB;
code = tsdbIterOpen(config, &iter);
config.type = TSDB_ITER_TYPE_STT_TOMB;
config.sttReader = sttReader;
code = tsdbIterOpen(&config, &iter);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND(merger->tombIterArr, iter);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
code = tsdbIterMergerOpen(merger->dataIterArr, &merger->dataIterMerger, false);
TSDB_CHECK_CODE(code, lino, _exit);
......@@ -414,26 +407,14 @@ static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) {
int32_t lino = 0;
int32_t vid = TD_VID(merger->tsdb->pVnode);
if (merger->ctx->lvl) {
// to existing level
SSttFileWriterConfig config[1] = {{
.tsdb = merger->tsdb,
.maxRow = merger->maxRow,
.szPage = merger->szPage,
.cmprAlg = merger->cmprAlg,
.compactVersion = merger->compactVersion,
.file = merger->ctx->fobj->f[0],
}};
code = tsdbSttFileWriterOpen(config, &merger->sttWriter);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
SDiskID did[1];
SDiskID did;
int32_t level = tsdbFidLevel(merger->ctx->fset->fid, &merger->tsdb->keepCfg, merger->ctx->now);
if (tfsAllocDisk(merger->tsdb->pVnode->pTfs, level, did) < 0) {
if (tfsAllocDisk(merger->tsdb->pVnode->pTfs, level, &did) < 0) {
code = TSDB_CODE_FS_NO_VALID_DISK;
TSDB_CHECK_CODE(code, lino, _exit);
}
{
// to new level
SSttFileWriterConfig config[1] = {{
.tsdb = merger->tsdb,
......@@ -444,13 +425,12 @@ static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) {
.file =
{
.type = TSDB_FTYPE_STT,
.did = did[0],
.did = did,
.fid = merger->ctx->fset->fid,
.cid = merger->cid,
.size = 0,
.stt = {{
.level = merger->ctx->level,
.nseg = 0,
}},
},
}};
......@@ -459,14 +439,6 @@ static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) {
}
if (merger->ctx->toData) {
SDiskID did;
int32_t level = tsdbFidLevel(merger->ctx->fset->fid, &merger->tsdb->keepCfg, merger->ctx->now);
if (tfsAllocDisk(merger->tsdb->pVnode->pTfs, level, &did) < 0) {
code = TSDB_CODE_FS_NO_VALID_DISK;
TSDB_CHECK_CODE(code, lino, _exit);
}
SDataFileWriterConfig config[1] = {{
.tsdb = merger->tsdb,
.cmprAlg = merger->cmprAlg,
......@@ -622,11 +594,11 @@ static int32_t tsdbDoMerge(SMerger *merger) {
STFileSet *fset;
TARRAY2_FOREACH(merger->fsetArr, fset) {
SSttLvl *lvl = TARRAY2_SIZE(fset->lvlArr) > 0 ? TARRAY2_FIRST(fset->lvlArr) : NULL;
if (!lvl || lvl->level != 0 || TARRAY2_SIZE(lvl->fobjArr) == 0) continue;
if (TARRAY2_SIZE(fset->lvlArr) == 0) continue;
SSttLvl *lvl = TARRAY2_FIRST(fset->lvlArr);
STFileObj *fobj = TARRAY2_FIRST(lvl->fobjArr);
if (fobj->f->stt->nseg < merger->sttTrigger) continue;
if (lvl->level != 0 || TARRAY2_SIZE(lvl->fobjArr) < merger->sttTrigger) continue;
if (!merger->ctx->opened) {
code = tsdbMergerOpen(merger);
......
......@@ -16,7 +16,6 @@
#include "tsdbSttFileRW.h"
typedef struct {
int64_t prevFooter;
SFDataPtr sttBlkPtr[1];
SFDataPtr statisBlkPtr[1];
SFDataPtr tombBlkPtr[1];
......@@ -26,13 +25,7 @@ typedef struct {
// SSttFReader ============================================================
struct SSttFileReader {
SSttFileReaderConfig config[1];
TSttSegReaderArray readerArray[1];
STsdbFD *fd;
uint8_t *bufArr[5];
};
struct SSttSegReader {
SSttFileReader *reader;
SSttFooter footer[1];
struct {
bool sttBlkLoaded;
......@@ -42,42 +35,10 @@ struct SSttSegReader {
TSttBlkArray sttBlkArray[1];
TStatisBlkArray statisBlkArray[1];
TTombBlkArray tombBlkArray[1];
uint8_t *bufArr[5];
};
// SSttFileReader
static int32_t tsdbSttSegReaderOpen(SSttFileReader *reader, int64_t offset, SSttSegReader **segReader) {
ASSERT(offset >= TSDB_FHDR_SIZE);
int32_t code = 0;
int32_t lino = 0;
segReader[0] = taosMemoryCalloc(1, sizeof(*segReader[0]));
if (!segReader[0]) return TSDB_CODE_OUT_OF_MEMORY;
segReader[0]->reader = reader;
code = tsdbReadFile(reader->fd, offset, (uint8_t *)(segReader[0]->footer), sizeof(SSttFooter));
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
taosMemoryFree(segReader[0]);
segReader[0] = NULL;
}
return code;
}
static int32_t tsdbSttSegReaderClose(SSttSegReader **reader) {
if (reader[0]) {
TARRAY2_DESTROY(reader[0]->tombBlkArray, NULL);
TARRAY2_DESTROY(reader[0]->statisBlkArray, NULL);
TARRAY2_DESTROY(reader[0]->sttBlkArray, NULL);
taosMemoryFree(reader[0]);
reader[0] = NULL;
}
return 0;
}
int32_t tsdbSttFileReaderOpen(const char *fname, const SSttFileReaderConfig *config, SSttFileReader **reader) {
int32_t code = 0;
int32_t lino = 0;
......@@ -101,22 +62,13 @@ int32_t tsdbSttFileReaderOpen(const char *fname, const SSttFileReaderConfig *con
TSDB_CHECK_CODE(code, lino, _exit);
}
// open each segment reader
int64_t size = config->file->size;
while (size > 0) {
SSttSegReader *reader1;
code = tsdbSttSegReaderOpen(reader[0], size - sizeof(SSttFooter), &reader1);
TSDB_CHECK_CODE(code, lino, _exit);
// // open each segment reader
int64_t offset = config->file->size - sizeof(SSttFooter);
ASSERT(offset >= TSDB_FHDR_SIZE);
code = TARRAY2_APPEND(reader[0]->readerArray, reader1);
code = tsdbReadFile(reader[0]->fd, offset, (uint8_t *)(reader[0]->footer), sizeof(SSttFooter));
TSDB_CHECK_CODE(code, lino, _exit);
size = reader1->footer->prevFooter;
}
ASSERT(TARRAY2_SIZE(reader[0]->readerArray) == config->file->stt->nseg);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(config->tsdb->pVnode), lino, code);
......@@ -131,20 +83,17 @@ int32_t tsdbSttFileReaderClose(SSttFileReader **reader) {
tFree(reader[0]->bufArr[i]);
}
tsdbCloseFile(&reader[0]->fd);
TARRAY2_DESTROY(reader[0]->readerArray, tsdbSttSegReaderClose);
TARRAY2_DESTROY(reader[0]->tombBlkArray, NULL);
TARRAY2_DESTROY(reader[0]->statisBlkArray, NULL);
TARRAY2_DESTROY(reader[0]->sttBlkArray, NULL);
taosMemoryFree(reader[0]);
reader[0] = NULL;
}
return 0;
}
int32_t tsdbSttFileReaderGetSegReader(SSttFileReader *reader, const TSttSegReaderArray **readerArray) {
readerArray[0] = reader->readerArray;
return 0;
}
// SSttFSegReader
int32_t tsdbSttFileReadStatisBlk(SSttSegReader *reader, const TStatisBlkArray **statisBlkArray) {
int32_t tsdbSttFileReadStatisBlk(SSttFileReader *reader, const TStatisBlkArray **statisBlkArray) {
if (!reader->ctx->statisBlkLoaded) {
if (reader->footer->statisBlkPtr->size > 0) {
ASSERT(reader->footer->statisBlkPtr->size % sizeof(SStatisBlk) == 0);
......@@ -153,8 +102,8 @@ int32_t tsdbSttFileReadStatisBlk(SSttSegReader *reader, const TStatisBlkArray **
void *data = taosMemoryMalloc(reader->footer->statisBlkPtr->size);
if (!data) return TSDB_CODE_OUT_OF_MEMORY;
int32_t code = tsdbReadFile(reader->reader->fd, reader->footer->statisBlkPtr->offset, data,
reader->footer->statisBlkPtr->size);
int32_t code =
tsdbReadFile(reader->fd, reader->footer->statisBlkPtr->offset, data, reader->footer->statisBlkPtr->size);
if (code) {
taosMemoryFree(data);
return code;
......@@ -172,7 +121,7 @@ int32_t tsdbSttFileReadStatisBlk(SSttSegReader *reader, const TStatisBlkArray **
return 0;
}
int32_t tsdbSttFileReadTombBlk(SSttSegReader *reader, const TTombBlkArray **tombBlkArray) {
int32_t tsdbSttFileReadTombBlk(SSttFileReader *reader, const TTombBlkArray **tombBlkArray) {
if (!reader->ctx->tombBlkLoaded) {
if (reader->footer->tombBlkPtr->size > 0) {
ASSERT(reader->footer->tombBlkPtr->size % sizeof(STombBlk) == 0);
......@@ -182,7 +131,7 @@ int32_t tsdbSttFileReadTombBlk(SSttSegReader *reader, const TTombBlkArray **tomb
if (!data) return TSDB_CODE_OUT_OF_MEMORY;
int32_t code =
tsdbReadFile(reader->reader->fd, reader->footer->tombBlkPtr->offset, data, reader->footer->tombBlkPtr->size);
tsdbReadFile(reader->fd, reader->footer->tombBlkPtr->offset, data, reader->footer->tombBlkPtr->size);
if (code) {
taosMemoryFree(data);
return code;
......@@ -200,7 +149,7 @@ int32_t tsdbSttFileReadTombBlk(SSttSegReader *reader, const TTombBlkArray **tomb
return 0;
}
int32_t tsdbSttFileReadSttBlk(SSttSegReader *reader, const TSttBlkArray **sttBlkArray) {
int32_t tsdbSttFileReadSttBlk(SSttFileReader *reader, const TSttBlkArray **sttBlkArray) {
if (!reader->ctx->sttBlkLoaded) {
if (reader->footer->sttBlkPtr->size > 0) {
ASSERT(reader->footer->sttBlkPtr->size % sizeof(SSttBlk) == 0);
......@@ -209,8 +158,7 @@ int32_t tsdbSttFileReadSttBlk(SSttSegReader *reader, const TSttBlkArray **sttBlk
void *data = taosMemoryMalloc(reader->footer->sttBlkPtr->size);
if (!data) return TSDB_CODE_OUT_OF_MEMORY;
int32_t code =
tsdbReadFile(reader->reader->fd, reader->footer->sttBlkPtr->offset, data, reader->footer->sttBlkPtr->size);
int32_t code = tsdbReadFile(reader->fd, reader->footer->sttBlkPtr->offset, data, reader->footer->sttBlkPtr->size);
if (code) {
taosMemoryFree(data);
return code;
......@@ -228,29 +176,27 @@ int32_t tsdbSttFileReadSttBlk(SSttSegReader *reader, const TSttBlkArray **sttBlk
return 0;
}
int32_t tsdbSttFileReadBlockData(SSttSegReader *reader, const SSttBlk *sttBlk, SBlockData *bData) {
int32_t tsdbSttFileReadBlockData(SSttFileReader *reader, const SSttBlk *sttBlk, SBlockData *bData) {
int32_t code = 0;
int32_t lino = 0;
code = tRealloc(&reader->reader->config->bufArr[0], sttBlk->bInfo.szBlock);
code = tRealloc(&reader->config->bufArr[0], sttBlk->bInfo.szBlock);
TSDB_CHECK_CODE(code, lino, _exit);
code =
tsdbReadFile(reader->reader->fd, sttBlk->bInfo.offset, reader->reader->config->bufArr[0], sttBlk->bInfo.szBlock);
code = tsdbReadFile(reader->fd, sttBlk->bInfo.offset, reader->config->bufArr[0], sttBlk->bInfo.szBlock);
TSDB_CHECK_CODE(code, lino, _exit);
code = tDecmprBlockData(reader->reader->config->bufArr[0], sttBlk->bInfo.szBlock, bData,
&reader->reader->config->bufArr[1]);
code = tDecmprBlockData(reader->config->bufArr[0], sttBlk->bInfo.szBlock, bData, &reader->config->bufArr[1]);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(reader->reader->config->tsdb->pVnode), lino, code);
TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
}
return code;
}
int32_t tsdbSttFileReadBlockDataByColumn(SSttSegReader *reader, const SSttBlk *sttBlk, SBlockData *bData,
int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk *sttBlk, SBlockData *bData,
STSchema *pTSchema, int16_t cids[], int32_t ncid) {
int32_t code = 0;
int32_t lino = 0;
......@@ -260,17 +206,17 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttSegReader *reader, const SSttBlk *s
TSDB_CHECK_CODE(code, lino, _exit);
// uid + version + tskey
code = tRealloc(&reader->reader->config->bufArr[0], sttBlk->bInfo.szKey);
code = tRealloc(&reader->config->bufArr[0], sttBlk->bInfo.szKey);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->reader->fd, sttBlk->bInfo.offset, reader->reader->config->bufArr[0], sttBlk->bInfo.szKey);
code = tsdbReadFile(reader->fd, sttBlk->bInfo.offset, reader->config->bufArr[0], sttBlk->bInfo.szKey);
TSDB_CHECK_CODE(code, lino, _exit);
// hdr
SDiskDataHdr hdr[1];
int32_t size = 0;
size += tGetDiskDataHdr(reader->reader->config->bufArr[0] + size, hdr);
size += tGetDiskDataHdr(reader->config->bufArr[0] + size, hdr);
ASSERT(hdr->delimiter == TSDB_FILE_DLMT);
......@@ -280,8 +226,8 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttSegReader *reader, const SSttBlk *s
// uid
if (hdr->uid == 0) {
ASSERT(hdr->szUid);
code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, hdr->szUid, TSDB_DATA_TYPE_BIGINT, hdr->cmprAlg,
(uint8_t **)&bData->aUid, sizeof(int64_t) * hdr->nRow, &reader->reader->config->bufArr[1]);
code = tsdbDecmprData(reader->config->bufArr[0] + size, hdr->szUid, TSDB_DATA_TYPE_BIGINT, hdr->cmprAlg,
(uint8_t **)&bData->aUid, sizeof(int64_t) * hdr->nRow, &reader->config->bufArr[1]);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
ASSERT(hdr->szUid == 0);
......@@ -289,14 +235,14 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttSegReader *reader, const SSttBlk *s
size += hdr->szUid;
// version
code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, hdr->szVer, TSDB_DATA_TYPE_BIGINT, hdr->cmprAlg,
(uint8_t **)&bData->aVersion, sizeof(int64_t) * hdr->nRow, &reader->reader->config->bufArr[1]);
code = tsdbDecmprData(reader->config->bufArr[0] + size, hdr->szVer, TSDB_DATA_TYPE_BIGINT, hdr->cmprAlg,
(uint8_t **)&bData->aVersion, sizeof(int64_t) * hdr->nRow, &reader->config->bufArr[1]);
TSDB_CHECK_CODE(code, lino, _exit);
size += hdr->szVer;
// ts
code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, hdr->szKey, TSDB_DATA_TYPE_TIMESTAMP, hdr->cmprAlg,
(uint8_t **)&bData->aTSKEY, sizeof(TSKEY) * hdr->nRow, &reader->reader->config->bufArr[1]);
code = tsdbDecmprData(reader->config->bufArr[0] + size, hdr->szKey, TSDB_DATA_TYPE_TIMESTAMP, hdr->cmprAlg,
(uint8_t **)&bData->aTSKEY, sizeof(TSKEY) * hdr->nRow, &reader->config->bufArr[1]);
TSDB_CHECK_CODE(code, lino, _exit);
size += hdr->szKey;
......@@ -305,11 +251,11 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttSegReader *reader, const SSttBlk *s
// other columns
if (bData->nColData > 0) {
if (hdr->szBlkCol > 0) {
code = tRealloc(&reader->reader->config->bufArr[0], hdr->szBlkCol);
code = tRealloc(&reader->config->bufArr[0], hdr->szBlkCol);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->reader->fd, sttBlk->bInfo.offset + sttBlk->bInfo.szKey,
reader->reader->config->bufArr[0], hdr->szBlkCol);
code = tsdbReadFile(reader->fd, sttBlk->bInfo.offset + sttBlk->bInfo.szKey, reader->config->bufArr[0],
hdr->szBlkCol);
TSDB_CHECK_CODE(code, lino, _exit);
}
......@@ -322,7 +268,7 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttSegReader *reader, const SSttBlk *s
while (blockCol && blockCol->cid < colData->cid) {
if (size < hdr->szBlkCol) {
size += tGetBlockCol(reader->reader->config->bufArr[0] + size, blockCol);
size += tGetBlockCol(reader->config->bufArr[0] + size, blockCol);
} else {
ASSERT(size == hdr->szBlkCol);
blockCol = NULL;
......@@ -346,16 +292,15 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttSegReader *reader, const SSttBlk *s
} else {
int32_t size1 = blockCol->szBitmap + blockCol->szOffset + blockCol->szValue;
code = tRealloc(&reader->reader->config->bufArr[1], size1);
code = tRealloc(&reader->config->bufArr[1], size1);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->reader->fd,
sttBlk->bInfo.offset + sttBlk->bInfo.szKey + hdr->szBlkCol + blockCol->offset,
reader->reader->config->bufArr[1], size1);
code = tsdbReadFile(reader->fd, sttBlk->bInfo.offset + sttBlk->bInfo.szKey + hdr->szBlkCol + blockCol->offset,
reader->config->bufArr[1], size1);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbDecmprColData(reader->reader->config->bufArr[1], blockCol, hdr->cmprAlg, hdr->nRow, colData,
&reader->reader->config->bufArr[2]);
code = tsdbDecmprColData(reader->config->bufArr[1], blockCol, hdr->cmprAlg, hdr->nRow, colData,
&reader->config->bufArr[2]);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
......@@ -364,30 +309,29 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttSegReader *reader, const SSttBlk *s
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(reader->reader->config->tsdb->pVnode), lino, code);
TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
}
return code;
}
int32_t tsdbSttFileReadTombBlock(SSttSegReader *reader, const STombBlk *tombBlk, STombBlock *tombBlock) {
int32_t tsdbSttFileReadTombBlock(SSttFileReader *reader, const STombBlk *tombBlk, STombBlock *tombBlock) {
int32_t code = 0;
int32_t lino = 0;
code = tRealloc(&reader->reader->config->bufArr[0], tombBlk->dp->size);
code = tRealloc(&reader->config->bufArr[0], tombBlk->dp->size);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->reader->fd, tombBlk->dp->offset, reader->reader->config->bufArr[0], tombBlk->dp->size);
code = tsdbReadFile(reader->fd, tombBlk->dp->offset, reader->config->bufArr[0], tombBlk->dp->size);
if (code) TSDB_CHECK_CODE(code, lino, _exit);
int64_t size = 0;
tTombBlockClear(tombBlock);
for (int32_t i = 0; i < ARRAY_SIZE(tombBlock->dataArr); ++i) {
code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, tombBlk->size[i], TSDB_DATA_TYPE_BIGINT,
tombBlk->cmprAlg, &reader->reader->config->bufArr[1], sizeof(int64_t) * tombBlk->numRec,
&reader->reader->config->bufArr[2]);
code = tsdbDecmprData(reader->config->bufArr[0] + size, tombBlk->size[i], TSDB_DATA_TYPE_BIGINT, tombBlk->cmprAlg,
&reader->config->bufArr[1], sizeof(int64_t) * tombBlk->numRec, &reader->config->bufArr[2]);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND_BATCH(&tombBlock->dataArr[i], reader->reader->config->bufArr[1], tombBlk->numRec);
code = TARRAY2_APPEND_BATCH(&tombBlock->dataArr[i], reader->config->bufArr[1], tombBlk->numRec);
TSDB_CHECK_CODE(code, lino, _exit);
size += tombBlk->size[i];
......@@ -396,31 +340,30 @@ int32_t tsdbSttFileReadTombBlock(SSttSegReader *reader, const STombBlk *tombBlk,
ASSERT(size == tombBlk->dp->size);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(reader->reader->config->tsdb->pVnode), lino, code);
TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
}
return code;
}
int32_t tsdbSttFileReadStatisBlock(SSttSegReader *reader, const SStatisBlk *statisBlk, STbStatisBlock *statisBlock) {
int32_t tsdbSttFileReadStatisBlock(SSttFileReader *reader, const SStatisBlk *statisBlk, STbStatisBlock *statisBlock) {
int32_t code = 0;
int32_t lino = 0;
code = tRealloc(&reader->reader->config->bufArr[0], statisBlk->dp->size);
code = tRealloc(&reader->config->bufArr[0], statisBlk->dp->size);
TSDB_CHECK_CODE(code, lino, _exit);
code =
tsdbReadFile(reader->reader->fd, statisBlk->dp->offset, reader->reader->config->bufArr[0], statisBlk->dp->size);
code = tsdbReadFile(reader->fd, statisBlk->dp->offset, reader->config->bufArr[0], statisBlk->dp->size);
TSDB_CHECK_CODE(code, lino, _exit);
int64_t size = 0;
tStatisBlockClear(statisBlock);
for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->dataArr); ++i) {
code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, statisBlk->size[i], TSDB_DATA_TYPE_BIGINT,
statisBlk->cmprAlg, &reader->reader->config->bufArr[1], sizeof(int64_t) * statisBlk->numRec,
&reader->reader->config->bufArr[2]);
code =
tsdbDecmprData(reader->config->bufArr[0] + size, statisBlk->size[i], TSDB_DATA_TYPE_BIGINT, statisBlk->cmprAlg,
&reader->config->bufArr[1], sizeof(int64_t) * statisBlk->numRec, &reader->config->bufArr[2]);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND_BATCH(statisBlock->dataArr + i, reader->reader->config->bufArr[1], statisBlk->numRec);
code = TARRAY2_APPEND_BATCH(statisBlock->dataArr + i, reader->config->bufArr[1], statisBlk->numRec);
TSDB_CHECK_CODE(code, lino, _exit);
size += statisBlk->size[i];
......@@ -430,7 +373,7 @@ int32_t tsdbSttFileReadStatisBlock(SSttSegReader *reader, const SStatisBlk *stat
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(reader->reader->config->tsdb->pVnode), lino, code);
TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
}
return code;
}
......@@ -696,7 +639,6 @@ _exit:
}
static int32_t tsdbSttFileDoWriteFooter(SSttFileWriter *writer) {
writer->footer->prevFooter = writer->config->file.size;
int32_t code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)writer->footer, sizeof(writer->footer));
if (code) return code;
writer->file->size += sizeof(writer->footer);
......@@ -708,32 +650,24 @@ static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) {
int32_t lino = 0;
// set
writer->file[0] = writer->config->file;
writer->file->stt->nseg++;
if (!writer->config->skmTb) writer->config->skmTb = writer->skmTb;
if (!writer->config->skmRow) writer->config->skmRow = writer->skmRow;
if (!writer->config->bufArr) writer->config->bufArr = writer->bufArr;
writer->file[0] = writer->config->file;
// open file
int32_t flag;
int32_t flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
char fname[TSDB_FILENAME_LEN];
if (writer->file->size > 0) {
flag = TD_FILE_READ | TD_FILE_WRITE;
} else {
flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
}
tsdbTFileName(writer->config->tsdb, writer->file, fname);
code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd);
TSDB_CHECK_CODE(code, lino, _exit);
if (writer->file->size == 0) {
uint8_t hdr[TSDB_FHDR_SIZE] = {0};
code = tsdbWriteFile(writer->fd, 0, hdr, sizeof(hdr));
TSDB_CHECK_CODE(code, lino, _exit);
writer->file->size += sizeof(hdr);
}
writer->ctx->opened = true;
......@@ -798,22 +732,12 @@ static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, TFileOpArray *o
tsdbCloseFile(&writer->fd);
ASSERT(writer->config->file.size < writer->file->size);
STFileOp op;
if (writer->config->file.size == 0) {
op = (STFileOp){
ASSERT(writer->file->size > 0);
STFileOp op = (STFileOp){
.optype = TSDB_FOP_CREATE,
.fid = writer->config->file.fid,
.nf = writer->file[0],
};
} else {
op = (STFileOp){
.optype = TSDB_FOP_MODIFY,
.fid = writer->config->file.fid,
.of = writer->config->file,
.nf = writer->file[0],
};
}
code = TARRAY2_APPEND(opArray, op);
TSDB_CHECK_CODE(code, lino, _exit);
......@@ -826,22 +750,17 @@ _exit:
}
static int32_t tsdbSttFWriterCloseAbort(SSttFileWriter *writer) {
if (writer->config->file.size) { // truncate the file to the original size
ASSERT(writer->config->file.size <= writer->file->size);
if (writer->config->file.size < writer->file->size) {
taosFtruncateFile(writer->fd->pFD, writer->config->file.size);
tsdbCloseFile(&writer->fd);
}
} else { // remove the file
char fname[TSDB_FILENAME_LEN];
tsdbTFileName(writer->config->tsdb, &writer->config->file, fname);
tsdbCloseFile(&writer->fd);
taosRemoveFile(fname);
}
return 0;
}
int32_t tsdbSttFileWriterOpen(const SSttFileWriterConfig *config, SSttFileWriter **writer) {
ASSERT(config->file.type == TSDB_FTYPE_STT);
ASSERT(config->file.size == 0);
writer[0] = taosMemoryCalloc(1, sizeof(*writer[0]));
if (writer[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
......
......@@ -30,25 +30,22 @@ typedef TARRAY2(STombBlk) TTombBlkArray;
// SSttFileReader ==========================================
typedef struct SSttFileReader SSttFileReader;
typedef struct SSttFileReaderConfig SSttFileReaderConfig;
typedef struct SSttSegReader SSttSegReader;
typedef TARRAY2(SSttFileReader *) TSttFileReaderArray;
typedef TARRAY2(SSttSegReader *) TSttSegReaderArray;
// SSttFileReader
int32_t tsdbSttFileReaderOpen(const char *fname, const SSttFileReaderConfig *config, SSttFileReader **reader);
int32_t tsdbSttFileReaderClose(SSttFileReader **reader);
int32_t tsdbSttFileReaderGetSegReader(SSttFileReader *reader, const TSttSegReaderArray **readerArray);
// SSttSegReader
int32_t tsdbSttFileReadSttBlk(SSttSegReader *reader, const TSttBlkArray **sttBlkArray);
int32_t tsdbSttFileReadStatisBlk(SSttSegReader *reader, const TStatisBlkArray **statisBlkArray);
int32_t tsdbSttFileReadTombBlk(SSttSegReader *reader, const TTombBlkArray **delBlkArray);
int32_t tsdbSttFileReadSttBlk(SSttFileReader *reader, const TSttBlkArray **sttBlkArray);
int32_t tsdbSttFileReadStatisBlk(SSttFileReader *reader, const TStatisBlkArray **statisBlkArray);
int32_t tsdbSttFileReadTombBlk(SSttFileReader *reader, const TTombBlkArray **delBlkArray);
int32_t tsdbSttFileReadBlockData(SSttSegReader *reader, const SSttBlk *sttBlk, SBlockData *bData);
int32_t tsdbSttFileReadBlockDataByColumn(SSttSegReader *reader, const SSttBlk *sttBlk, SBlockData *bData,
int32_t tsdbSttFileReadBlockData(SSttFileReader *reader, const SSttBlk *sttBlk, SBlockData *bData);
int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk *sttBlk, SBlockData *bData,
STSchema *pTSchema, int16_t cids[], int32_t ncid);
int32_t tsdbSttFileReadStatisBlock(SSttSegReader *reader, const SStatisBlk *statisBlk, STbStatisBlock *sData);
int32_t tsdbSttFileReadTombBlock(SSttSegReader *reader, const STombBlk *delBlk, STombBlock *dData);
int32_t tsdbSttFileReadStatisBlock(SSttFileReader *reader, const SStatisBlk *statisBlk, STbStatisBlock *sData);
int32_t tsdbSttFileReadTombBlock(SSttFileReader *reader, const STombBlk *delBlk, STombBlock *dData);
struct SSttFileReaderConfig {
STsdb *tsdb;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册