提交 8626d3e6 编写于 作者: H Hongze Cheng

more code

上级 b479a0a0
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "trbtree.h"
#include "tsdbDef.h"
#ifndef _TSDB_ITER_H_
#define _TSDB_ITER_H_
#ifdef __cplusplus
extern "C" {
#endif
typedef struct SIterMerger SIterMerger;
typedef struct STsdbIter STsdbIter;
typedef TARRAY2(STsdbIter *) TTsdbIterArray;
// STsdbIter ===============
// SIterMerger ===============
int32_t tsdbIterMergerInit(const TTsdbIterArray *iterArray, SIterMerger **merger);
int32_t tsdbIterMergerClear(SIterMerger **merger);
int32_t tsdbIterMergerNext(SIterMerger *merger);
SRowInfo *tsdbIterMergerGet(SIterMerger *merger);
int32_t tsdbIterMergerSkipTableData(SIterMerger *merger, const TABLEID *tbid);
#ifdef __cplusplus
}
#endif
#endif /*_TSDB_ITER_H_*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "inc/tsdbIter.h"
// STsdbIter ================
struct STsdbIter {
struct {
bool noMoreData;
} ctx[1];
SRowInfo row[1];
SRBTreeNode node[1];
SBlockData bData[1];
int32_t iRow;
// TODO
};
int32_t tsdbIterNext(STsdbIter *iter) {
// TODO
return 0;
}
static int32_t tsdbIterSkipTableData(STsdbIter *iter) {
// TODO
return 0;
}
static int32_t tsdbIterCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) {
STsdbIter *iter1 = TCONTAINER_OF(n1, STsdbIter, node);
STsdbIter *iter2 = TCONTAINER_OF(n2, STsdbIter, node);
return tRowInfoCmprFn(&iter1->row, &iter2->row);
}
// SIterMerger ================
struct SIterMerger {
STsdbIter *iter;
SRBTree iterTree[1];
};
int32_t tsdbIterMergerInit(const TTsdbIterArray *iterArray, SIterMerger **merger) {
STsdbIter *iter;
merger[0] = taosMemoryCalloc(1, sizeof(*merger[0]));
if (!merger[0]) return TSDB_CODE_OUT_OF_MEMORY;
tRBTreeCreate(merger[0]->iterTree, tsdbIterCmprFn);
TARRAY2_FOREACH(iterArray, iter) {
if (iter->ctx->noMoreData) continue;
SRBTreeNode *node = tRBTreePut(merger[0]->iterTree, iter->node);
ASSERT(node);
}
return tsdbIterMergerNext(merger[0]);
}
int32_t tsdbIterMergerClear(SIterMerger **merger) {
taosMemoryFree(merger[0]);
return 0;
}
int32_t tsdbIterMergerNext(SIterMerger *merger) {
int32_t code;
int32_t c;
SRBTreeNode *node;
if (merger->iter) {
code = tsdbIterNext(merger->iter);
if (code) return code;
if (merger->iter->ctx->noMoreData) {
merger->iter = NULL;
} else if ((node = tRBTreeMin(merger->iterTree))) {
c = tsdbIterCmprFn(merger->iter->node, node);
ASSERT(c);
if (c > 0) {
node = tRBTreePut(merger->iterTree, merger->iter->node);
merger->iter = NULL;
ASSERT(node);
}
}
}
if (!merger->iter && (node = tRBTreeDropMin(merger->iterTree))) {
merger->iter = TCONTAINER_OF(node, STsdbIter, node);
}
return 0;
}
SRowInfo *tsdbIterMergerGet(SIterMerger *merger) { return merger->iter ? merger->iter->row : NULL; }
int32_t tsdbIterMergerSkipTableData(SIterMerger *merger, const TABLEID *tbid) {
int32_t code;
int32_t c;
SRBTreeNode *node;
while (merger->iter && tbid->suid == merger->iter->row->suid && tbid->uid == merger->iter->row->uid) {
int32_t code = tsdbIterSkipTableData(merger->iter);
if (code) return code;
if (merger->iter->ctx->noMoreData) {
merger->iter = NULL;
c = tsdbIterCmprFn(merger->iter->node, node);
ASSERT(c);
if (c > 0) {
node = tRBTreePut(merger->iterTree, merger->iter->node);
merger->iter = NULL;
ASSERT(node);
}
}
if (!merger->iter && (node = tRBTreeDropMin(merger->iterTree))) {
merger->iter = TCONTAINER_OF(node, STsdbIter, node);
}
}
return 0;
}
\ No newline at end of file
......@@ -16,18 +16,16 @@
#include "inc/tsdbMerge.h"
typedef struct {
bool launched;
STsdb *tsdb;
// context
struct {
bool opened;
bool toData;
int32_t level;
STFileSet *fset;
SRowInfo *pRowInfo;
SRowInfo *row;
SBlockData bData;
} SMergeCtx;
typedef struct {
STsdb *tsdb;
// context
SMergeCtx ctx;
} ctx[1];
// config
int32_t maxRow;
int32_t minRow;
......@@ -48,7 +46,7 @@ typedef struct {
} SMerger;
static int32_t tsdbMergerOpen(SMerger *merger) {
merger->ctx.launched = true;
merger->ctx->opened = true;
TARRAY2_INIT(&merger->fopArr);
return 0;
}
......@@ -84,20 +82,20 @@ static int32_t tsdbMergeNextRow(SMerger *merger) {
}
static int32_t tsdbMergeToDataWriteTSDataBlock(SMerger *merger) {
if (merger->ctx.bData.nRow == 0) return 0;
if (merger->ctx->bData.nRow == 0) return 0;
int32_t code = 0;
int32_t lino = 0;
int32_t vid = TD_VID(merger->tsdb->pVnode);
if (merger->ctx.bData.nRow >= merger->minRow) {
// code = tsdbDataFWriteTSDataBlock(merger->dataWriter, &merger->ctx.bData);
if (merger->ctx->bData.nRow >= merger->minRow) {
// code = tsdbDataFWriteTSDataBlock(merger->dataWriter, &merger->ctx->bData);
// TSDB_CHECK_CODE(code, lino, _exit);
} else {
code = tsdbSttFWriteTSDataBlock(merger->sttWriter, &merger->ctx.bData);
code = tsdbSttFWriteTSDataBlock(merger->sttWriter, &merger->ctx->bData);
TSDB_CHECK_CODE(code, lino, _exit);
}
tBlockDataReset(&merger->ctx.bData);
tBlockDataReset(&merger->ctx->bData);
_exit:
if (code) {
......@@ -114,28 +112,28 @@ static int32_t tsdbMergeToData(SMerger *merger) {
code = tsdbMergeNextRow(merger);
TSDB_CHECK_CODE(code, lino, _exit);
if (!merger->ctx.pRowInfo) {
if (!merger->ctx->row) {
code = tsdbMergeToDataWriteTSDataBlock(merger);
TSDB_CHECK_CODE(code, lino, _exit);
break;
}
if (!TABLE_SAME_SCHEMA(merger->ctx.bData.suid, merger->ctx.bData.suid, merger->ctx.pRowInfo->suid,
merger->ctx.pRowInfo->uid)) {
if (!TABLE_SAME_SCHEMA(merger->ctx->bData.suid, merger->ctx->bData.suid, merger->ctx->row->suid,
merger->ctx->row->uid)) {
code = tsdbMergeToDataWriteTSDataBlock(merger);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbUpdateSkmTb(merger->tsdb, (TABLEID *)merger->ctx.pRowInfo, &merger->skmTb);
code = tsdbUpdateSkmTb(merger->tsdb, (TABLEID *)merger->ctx->row, &merger->skmTb);
TSDB_CHECK_CODE(code, lino, _exit);
code = tBlockDataInit(&merger->ctx.bData, (TABLEID *)merger->ctx.pRowInfo, merger->skmTb.pTSchema, NULL, 0);
code = tBlockDataInit(&merger->ctx->bData, (TABLEID *)merger->ctx->row, merger->skmTb.pTSchema, NULL, 0);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tBlockDataAppendRow(&merger->ctx.bData, &merger->ctx.pRowInfo->row, NULL, merger->ctx.pRowInfo->uid);
code = tBlockDataAppendRow(&merger->ctx->bData, &merger->ctx->row->row, NULL, merger->ctx->row->uid);
TSDB_CHECK_CODE(code, lino, _exit);
if (merger->ctx.bData.nRow >= merger->maxRow) {
if (merger->ctx->bData.nRow >= merger->maxRow) {
code = tsdbMergeToDataWriteTSDataBlock(merger);
TSDB_CHECK_CODE(code, lino, _exit);
}
......@@ -157,9 +155,9 @@ static int32_t tsdbMergeToUpperLevel(SMerger *merger) {
code = tsdbMergeNextRow(merger);
TSDB_CHECK_CODE(code, lino, _exit);
if (!merger->ctx.pRowInfo) break;
if (!merger->ctx->row) break;
code = tsdbSttFWriteTSData(merger->sttWriter, merger->ctx.pRowInfo);
code = tsdbSttFWriteTSData(merger->sttWriter, merger->ctx->row);
TSDB_CHECK_CODE(code, lino, _exit);
}
......@@ -174,27 +172,27 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
int32_t code = 0;
int32_t lino = 0;
int32_t vid = TD_VID(merger->tsdb->pVnode);
STFileSet *fset = merger->ctx.fset;
STFileSet *fset = merger->ctx->fset;
// prepare the merger file set
SSttLvl *lvl;
STFileObj *fobj;
merger->ctx.toData = true;
merger->ctx.level = 0;
merger->ctx->toData = true;
merger->ctx->level = 0;
TARRAY2_FOREACH(&fset->lvlArr, lvl) {
if (lvl->level != merger->ctx.level) {
if (lvl->level != merger->ctx->level) {
lvl = NULL;
break;
}
fobj = TARRAY2_GET(&lvl->farr, 0);
if (fobj->f.stt->nseg < merger->tsdb->pVnode->config.sttTrigger) {
merger->ctx.toData = false;
merger->ctx->toData = false;
break;
} else {
ASSERT(lvl->level == 0 || TARRAY2_SIZE(&lvl->farr) == 1);
merger->ctx.level++;
merger->ctx->level++;
// open the reader
SSttFileReader *reader;
......@@ -249,7 +247,7 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
.fid = fset->fid,
.cid = merger->cid,
.size = 0,
.stt = {{.level = merger->ctx.level, .nseg = 0}},
.stt = {{.level = merger->ctx->level, .nseg = 0}},
},
};
code = tsdbSttFWriterOpen(&config, &merger->sttWriter);
......@@ -257,7 +255,7 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
}
// open data file writer
if (merger->ctx.toData) {
if (merger->ctx->toData) {
SDataFileWriterConfig config = {
.tsdb = merger->tsdb,
// TODO
......@@ -286,7 +284,7 @@ static int32_t tsdbMergeFileSetEnd(SMerger *merger) {
TSDB_CHECK_CODE(code, lino, _exit);
}
if (merger->ctx.toData) {
if (merger->ctx->toData) {
// code = tsdbDataFWriterClose();
// TSDB_CHECK_CODE(code, lino, _exit);
}
......@@ -301,18 +299,18 @@ static int32_t tsdbMergeFileSet(SMerger *merger, STFileSet *fset) {
int32_t code = 0;
int32_t lino = 0;
if (merger->ctx.launched == false) {
if (merger->ctx->opened == false) {
code = tsdbMergerOpen(merger);
TSDB_CHECK_CODE(code, lino, _exit);
}
merger->ctx.fset = fset;
merger->ctx->fset = fset;
code = tsdbMergeFileSetBegin(merger);
TSDB_CHECK_CODE(code, lino, _exit);
// do merge
if (merger->ctx.toData) {
if (merger->ctx->toData) {
code = tsdbMergeToData(merger);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
......@@ -343,13 +341,9 @@ int32_t tsdbMerge(STsdb *tsdb) {
STFileObj *fobj;
int32_t sttTrigger = vnode->config.sttTrigger;
SMerger merger = {
.tsdb = tsdb,
.ctx =
{
.launched = false,
},
};
SMerger merger[1];
merger->tsdb = tsdb;
merger->ctx->opened = false;
// loop to merge each file set
TARRAY2_FOREACH(&fs->cstate, fset) {
......@@ -363,21 +357,21 @@ int32_t tsdbMerge(STsdb *tsdb) {
fobj = TARRAY2_GET(&lvl0->farr, 0);
if (fobj->f.stt->nseg >= sttTrigger) {
code = tsdbMergeFileSet(&merger, fset);
code = tsdbMergeFileSet(merger, fset);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
// end the merge
if (merger.ctx.launched) {
code = tsdbMergerClose(&merger);
if (merger->ctx->opened) {
code = tsdbMergerClose(merger);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
} else if (merger.ctx.launched) {
TSDB_ERROR_LOG(vid, lino, code);
} else if (merger->ctx->opened) {
tsdbDebug("vgId:%d %s done", vid, __func__);
}
return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册