提交 47b96101 编写于 作者: H Hongze Cheng

more code

上级 61e6608c
......@@ -43,11 +43,11 @@ static int32_t open_committer_writer(SCommitter *pCommitter) {
struct SSttFWriterConf conf = {
.pTsdb = pCommitter->pTsdb,
.pSkmTb = NULL,
.pSkmRow = NULL,
.maxRow = pCommitter->maxRow,
.szPage = pCommitter->pTsdb->pVnode->config.tsdbPageSize,
.cmprAlg = pCommitter->cmprAlg,
.pSkmTb = NULL,
.pSkmRow = NULL,
.aBuf = NULL,
};
......@@ -134,19 +134,28 @@ static int32_t commit_timeseries_data(SCommitter *pCommitter) {
break;
}
nRow++;
code = tsdbCommitWriteTSData(pCommitter, (TABLEID *)pTbData, pRow);
TSDB_CHECK_CODE(code, lino, _exit);
nRow++;
}
}
_exit:
if (code) {
tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code));
tsdbError( //
"vgId:%d %s failed at line %d since %s", //
TD_VID(pCommitter->pTsdb->pVnode), //
__func__, //
lino, //
tstrerror(code));
} else {
tsdbDebug("vgId:%d %s done, fid:%d nRow:%" PRId64, TD_VID(pCommitter->pTsdb->pVnode), __func__, pCommitter->fid,
nRow);
tsdbDebug( //
"vgId:%d %s done, fid:%d nRow:%" PRId64, //
TD_VID(pCommitter->pTsdb->pVnode), //
__func__, //
pCommitter->fid, //
nRow);
}
return code;
}
......@@ -155,7 +164,7 @@ static int32_t commit_delete_data(SCommitter *pCommitter) {
int32_t code = 0;
int32_t lino;
// ASSERTS(0, "TODO: Not implemented yet");
ASSERTS(0, "TODO: Not implemented yet");
int64_t nDel = 0;
SMemTable *pMem = pCommitter->pTsdb->imem;
......@@ -197,9 +206,14 @@ static int32_t start_commit_file_set(SCommitter *pCommitter) {
pCommitter->expLevel = tsdbFidLevel(pCommitter->fid, &pCommitter->pTsdb->keepCfg, taosGetTimestampSec());
pCommitter->nextKey = TSKEY_MAX;
tsdbDebug("vgId:%d %s done, fid:%d minKey:%" PRId64 " maxKey:%" PRId64 " expLevel:%d",
TD_VID(pCommitter->pTsdb->pVnode), __func__, pCommitter->fid, pCommitter->minKey, pCommitter->maxKey,
pCommitter->expLevel);
tsdbDebug( //
"vgId:%d %s done, fid:%d minKey:%" PRId64 " maxKey:%" PRId64 " expLevel:%d", //
TD_VID(pCommitter->pTsdb->pVnode), //
__func__, //
pCommitter->fid, //
pCommitter->minKey, //
pCommitter->maxKey, //
pCommitter->expLevel);
return 0;
}
......@@ -228,8 +242,10 @@ static int32_t commit_next_file_set(SCommitter *pCommitter) {
code = commit_timeseries_data(pCommitter);
TSDB_CHECK_CODE(code, lino, _exit);
/* TODO
code = commit_delete_data(pCommitter);
TSDB_CHECK_CODE(code, lino, _exit);
*/
// fset commit end
code = end_commit_file_set(pCommitter);
......@@ -255,14 +271,14 @@ static int32_t open_committer(STsdb *pTsdb, SCommitInfo *pInfo, SCommitter *pCom
pCommitter->minRow = pInfo->info.config.tsdbCfg.minRows;
pCommitter->maxRow = pInfo->info.config.tsdbCfg.maxRows;
pCommitter->cmprAlg = pInfo->info.config.tsdbCfg.compression;
pCommitter->sttTrigger = 0; // TODO
pCommitter->sttTrigger = 7; // TODO
pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem);
if (pCommitter->aTbDataP == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pCommitter->aFileOp = taosArrayInit(10, sizeof(struct SFileOp));
pCommitter->aFileOp = taosArrayInit(16, sizeof(struct SFileOp));
if (pCommitter->aFileOp == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
......@@ -273,9 +289,17 @@ static int32_t open_committer(STsdb *pTsdb, SCommitInfo *pInfo, SCommitter *pCom
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
tsdbError( //
"vgId:%d %s failed at line %d since %s", //
TD_VID(pTsdb->pVnode), //
__func__, //
lino, //
tstrerror(code));
} else {
tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
tsdbDebug( //
"vgId:%d %s done", //
TD_VID(pTsdb->pVnode), //
__func__);
}
return code;
}
......
......@@ -117,9 +117,9 @@ static int32_t fs_to_json_str(struct STFileSystem *pFS, char **ppData) {
}
/* format version */
TSDB_CHECK_NULL( //
cJSON_AddNumberToObject(pJson, //
"format", //
TSDB_CHECK_NULL( //
cJSON_AddNumberToObject(pJson, //
"version", //
1 /* TODO */),
code, //
lino, //
......@@ -127,9 +127,9 @@ static int32_t fs_to_json_str(struct STFileSystem *pFS, char **ppData) {
TSDB_CODE_OUT_OF_MEMORY);
/* next edit id */
TSDB_CHECK_NULL( //
cJSON_AddNumberToObject(pJson, //
"next edit id", //
TSDB_CHECK_NULL( //
cJSON_AddNumberToObject(pJson, //
"edit id", //
pFS->nextEditId),
code, //
lino, //
......
......@@ -394,7 +394,7 @@ _exit:
return code;
}
int32_t tsdbSttFWriterClose(struct SSttFWriter **ppWriter) {
int32_t tsdbSttFWriterClose(struct SSttFWriter **ppWriter, int8_t abort, struct SFileOp *op) {
int32_t vgId = TD_VID(ppWriter[0]->config.pTsdb->pVnode);
int32_t code = 0;
int32_t lino;
......
......@@ -23,13 +23,6 @@ extern "C" {
#endif
struct SSttFWriter;
struct SSttFWriterConf;
int32_t tsdbSttFWriterOpen(const struct SSttFWriterConf *pConf, struct SSttFWriter **ppWriter);
int32_t tsdbSttFWriterClose(struct SSttFWriter **ppWriter);
int32_t tsdbSttFWriteTSData(struct SSttFWriter *pWriter, TABLEID *tbid, TSDBROW *pRow);
int32_t tsdbSttFWriteDLData(struct SSttFWriter *pWriter, TABLEID *tbid, SDelData *pDelData);
struct SSttFWriterConf {
STsdb *pTsdb;
struct STFile file;
......@@ -41,6 +34,11 @@ struct SSttFWriterConf {
uint8_t **aBuf;
};
int32_t tsdbSttFWriterOpen(const struct SSttFWriterConf *pConf, struct SSttFWriter **ppWriter);
int32_t tsdbSttFWriterClose(struct SSttFWriter **ppWriter, int8_t abort, struct SFileOp *op);
int32_t tsdbSttFWriteTSData(struct SSttFWriter *pWriter, TABLEID *tbid, TSDBROW *pRow);
int32_t tsdbSttFWriteDLData(struct SSttFWriter *pWriter, TABLEID *tbid, SDelData *pDelData);
#ifdef __cplusplus
}
#endif
......
......@@ -19,6 +19,8 @@
#ifdef USE_DEV_CODE
extern int32_t tsdbPreCommit(STsdb *pTsdb);
extern int32_t tsdbCommitBegin(STsdb *pTsdb, SCommitInfo *pInfo);
extern int32_t tsdbCommitCommit(STsdb *pTsdb);
extern int32_t tsdbCommitAbort(STsdb *pTsdb);
#endif
#define VND_INFO_FNAME_TMP "vnode_tmp.json"
......@@ -455,7 +457,6 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
syncBeginSnapshot(pVnode->sync, pInfo->info.state.committed);
// commit each sub-system
#ifdef USE_DEV_CODE
code = tsdbCommitBegin(pVnode->pTsdb, pInfo);
#else
......@@ -479,7 +480,11 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
TSDB_CHECK_CODE(code, lino, _exit);
}
#ifdef USE_DEV_CODE
code = tsdbCommitCommit(pVnode->pTsdb);
#else
code = tsdbFinishCommit(pVnode->pTsdb);
#endif
TSDB_CHECK_CODE(code, lino, _exit);
if (VND_IS_RSMA(pVnode)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册