提交 4775b26e 编写于 作者: H Hongze Cheng

more code

上级 166a86db
......@@ -2509,7 +2509,7 @@ int32_t tColDataAddValueByBind(SColData *pColData, TAOS_MULTI_BIND *pBind) {
if (!(pBind->num == 1 && pBind->is_null && *pBind->is_null)) {
ASSERT(pColData->type == pBind->buffer_type);
}
if (IS_VAR_DATA_TYPE(pColData->type)) { // var-length data type
for (int32_t i = 0; i < pBind->num; ++i) {
if (pBind->is_null && pBind->is_null[i]) {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdbFS.h"
#include "tsdbSttFReaderWriter.h"
#ifndef _TD_TSDB_MERGE_H_
#define _TD_TSDB_MERGE_H_
#ifdef __cplusplus
extern "C" {
#endif
/* Exposed Handle */
/* Exposed APIs */
/* Exposed Structs */
#ifdef __cplusplus
}
#endif
#endif /*_TD_TSDB_MERGE_H_*/
\ No newline at end of file
......@@ -442,7 +442,9 @@ int32_t tsdbCommitCommit(STsdb *pTsdb) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
tsdbUnrefMemTable(pMemTable, NULL, true);
// check merge (TODO: remove here)
// TODO: make this call async
code = tsdbMerge(pTsdb);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
......
......@@ -13,103 +13,127 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "dev.h"
#include "inc/tsdbMerge.h"
typedef struct {
STsdb *pTsdb;
STFileSet *pSet;
bool launched;
} SMergeCtx;
SBlockData bData;
// reader
SSttFileReader *pSttFReader;
int32_t nSttFSegReader;
SSttFSegReader *aSttFSegReader;
// writer
SSttFileWriter *pSttFWriter;
SDataFileWriter *pDataFWriter;
SArray *aFileOp; // SArray<struct STFileOp>
typedef struct {
STsdb *tsdb;
SMergeCtx ctx;
TFileOpArray fopArr;
} SMerger;
static int32_t tsdbFileSystemShouldMerge(STsdb *pTsdb) {
ASSERTS(0, "TODO: not implemented yet");
// TODO
return 0;
}
static int32_t tsdbFileSetShouldMerge(struct STFileSet *pSet) {
ASSERTS(0, "TODO: not implemented yet");
// TODO
static int32_t tsdbMergerOpen(SMerger *merger) {
merger->ctx.launched = true;
TARRAY2_INIT(&merger->fopArr);
return 0;
}
static int32_t tsdbFileSetMerge(struct STFileSet *pFileSet) {
ASSERTS(0, "TODO: not implemented yet");
static int32_t tsdbMergerClose(SMerger *merger) {
// TODO
ASSERT(0);
return 0;
}
static int32_t tsdbOpenMerger(STsdb *pTsdb, SMerger *pMerger) {
pMerger->pTsdb = pTsdb;
// TODO
return 0;
}
static int32_t tsdbDestroyMerger(SMerger *pMerger) {
static int32_t tsdbMergeFileSet(SMerger *merger, STFileSet *fset) {
int32_t code = 0;
// TODO
return code;
}
int32_t lino = 0;
static int32_t tsdbCloseMerger(SMerger *pMerger) {
int32_t code = 0;
int32_t lino;
if (merger->ctx.launched == false) {
code = tsdbMergerOpen(merger);
TSDB_CHECK_CODE(code, lino, _exit);
}
STsdb *pTsdb = pMerger->pTsdb;
{ // prepare the merger file set
SSttLvl *lvl;
STFileObj *fobj;
TARRAY2_FOREACH(&fset->lvlArr, lvl) {
TARRAY2_FOREACH(&lvl->farr, fobj) {
if (fobj->f.stt.nseg >= merger->tsdb->pVnode->config.sttTrigger) {
STFileOp op = {
.fid = fset->fid,
.optype = TSDB_FOP_REMOVE,
.of = fobj->f,
};
code = TARRAY2_APPEND(&merger->fopArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
if (lvl->level == 0) {
continue;
} else {
// TODO
}
}
}
}
}
// code = tsdbFSEditBegin(pTsdb->pFS, pMerger->aFileOp, TSDB_FEDIT_MERGE);
// TSDB_CHECK_CODE(code, lino, _exit)
{
// do merge the file set
}
{ // end merge the file set
}
_exit:
if (code) {
tsdbFSEditAbort(pTsdb->pFS);
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(merger->tsdb->pVnode), __func__, lino, tstrerror(code));
} else {
tsdbFSEditCommit(pTsdb->pFS);
tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(merger->tsdb->pVnode), __func__, fset->fid);
}
tsdbDestroyMerger(pMerger);
return code;
return 0;
}
int32_t tsdbMerge(STsdb *pTsdb) {
int32_t tsdbMerge(STsdb *tsdb) {
int32_t code = 0;
int32_t lino;
if (!tsdbFileSystemShouldMerge(pTsdb)) {
goto _exit;
SVnode *vnode = tsdb->pVnode;
int32_t vid = TD_VID(vnode);
STFileSystem *fs = tsdb->pFS;
STFileSet *fset;
STFileObj *fobj;
int32_t sttTrigger = vnode->config.sttTrigger;
SMerger merger = {
.tsdb = tsdb,
.ctx =
{
.launched = false,
},
};
// loop to merge each file set
TARRAY2_FOREACH(&fs->cstate, fset) {
SSttLvl *lvl0 = tsdbTFileSetGetLvl(fset, 0);
if (lvl0 == NULL) {
continue;
}
ASSERT(TARRAY2_SIZE(&lvl0->farr) > 0);
fobj = TARRAY2_GET(&lvl0->farr, 0);
if (fobj->f.stt.nseg >= sttTrigger) {
code = tsdbMergeFileSet(&merger, fset);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
SMerger pMerger = {0};
code = tsdbOpenMerger(pTsdb, &pMerger);
TSDB_CHECK_CODE(code, lino, _exit);
// for (int32_t i = 0; i < taosArrayGetSize(pTsdb->pFS->cstate); i++) {
// struct STFileSet *pFileSet = taosArrayGet(pTsdb->pFS->cstate, i);
// if (!tsdbFileSetShouldMerge(pFileSet)) {
// continue;
// }
// code = tsdbFileSetMerge(pFileSet);
// TSDB_CHECK_CODE(code, lino, _exit);
// }
code = tsdbCloseMerger(&pMerger);
TSDB_CHECK_CODE(code, lino, _exit);
// end the merge
if (merger.ctx.launched) {
code = tsdbMergerClose(&merger);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
} else {
tsdbDebug("vgId:%d %s done, do merge: %d", vid, __func__, merger.ctx.launched);
}
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册