提交 023281ae 编写于 作者: H Hongze Cheng

more code

上级 faadf5d3
......@@ -178,7 +178,6 @@ option(
"If use dev code"
ON
)
if (${USE_DEV_CODE})
add_definitions(-DUSE_DEV_CODE)
endif(USE_DEV_CODE)
\ No newline at end of file
......@@ -26,26 +26,28 @@ extern "C" {
typedef struct STFileSystem STFileSystem;
typedef enum {
TSDB_FS_EDIT_COMMIT = 1, //
TSDB_FS_EDIT_MERGE
} tsdb_fs_edit_t;
TSDB_FEDIT_COMMIT = 1, //
TSDB_FEDIT_MERGE
} EFEditT;
/* Exposed APIs */
// open/close
int32_t tsdbOpenFileSystem(STsdb *pTsdb, STFileSystem **ppFS, int8_t rollback);
int32_t tsdbCloseFileSystem(STFileSystem **ppFS);
// txn
int32_t tsdbFileSystemEditBegin(STFileSystem *pFS, const SArray *aFileOp, tsdb_fs_edit_t etype);
int32_t tsdbFileSystemEditCommit(STFileSystem *pFS, tsdb_fs_edit_t etype);
int32_t tsdbFileSystemEditAbort(STFileSystem *pFS, tsdb_fs_edit_t etype);
int32_t tsdbFSEditBegin(STFileSystem *pFS, const SArray *aFileOp, EFEditT etype);
int32_t tsdbFSEditCommit(STFileSystem *pFS, EFEditT etype);
int32_t tsdbFSEditAbort(STFileSystem *pFS, EFEditT etype);
/* Exposed Structs */
struct STFileSystem {
STsdb *pTsdb;
int32_t state;
tsem_t canEdit;
int64_t nextEditId;
int64_t neid;
SArray *cstate; // current state, SArray<STFileSet>
EFEditT etype;
int64_t eid;
SArray *nstate; // next state, SArray<STFileSet>
};
......
......@@ -357,11 +357,11 @@ static int32_t close_committer(SCommitter *pCommiter, int32_t eno) {
int32_t lino;
if (eno == 0) {
TSDB_CHECK_CODE( //
code = tsdbFileSystemEditBegin( //
pCommiter->pTsdb->pFS, //
pCommiter->aFileOp, //
TSDB_FS_EDIT_COMMIT),
TSDB_CHECK_CODE( //
code = tsdbFSEditBegin( //
pCommiter->pTsdb->pFS, //
pCommiter->aFileOp, //
TSDB_FEDIT_COMMIT),
lino, //
_exit);
} else {
......@@ -448,8 +448,8 @@ int32_t tsdbCommitCommit(STsdb *pTsdb) {
// lock
taosThreadRwlockWrlock(&pTsdb->rwLock);
code = tsdbFileSystemEditCommit(pTsdb->pFS, //
TSDB_FS_EDIT_COMMIT);
code = tsdbFSEditCommit(pTsdb->pFS, //
TSDB_FEDIT_COMMIT);
if (code) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
TSDB_CHECK_CODE(code, lino, _exit);
......@@ -481,8 +481,8 @@ int32_t tsdbCommitAbort(STsdb *pTsdb) {
int32_t code = 0;
int32_t lino = 0;
code = tsdbFileSystemEditAbort(pTsdb->pFS, //
TSDB_FS_EDIT_COMMIT);
code = tsdbFSEditAbort(pTsdb->pFS, //
TSDB_FEDIT_COMMIT);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
......
......@@ -15,8 +15,8 @@
#include "inc/tsdbFS.h"
#define TSDB_FS_EDIT_MIN TSDB_FS_EDIT_COMMIT
#define TSDB_FS_EDIT_MAX (TSDB_FS_EDIT_MERGE + 1)
#define TSDB_FS_EDIT_MIN TSDB_FEDIT_COMMIT
#define TSDB_FS_EDIT_MAX (TSDB_FEDIT_MERGE + 1)
enum {
TSDB_FS_STATE_NONE = 0,
......@@ -33,8 +33,8 @@ typedef enum {
static const char *gCurrentFname[] = {
[TSDB_FCURRENT] = "current.json",
[TSDB_FCURRENT_C] = "current.json.0",
[TSDB_FCURRENT_M] = "current.json.1",
[TSDB_FCURRENT_C] = "current.c.json",
[TSDB_FCURRENT_M] = "current.m.json",
};
static int32_t create_fs(STsdb *pTsdb, STFileSystem **ppFS) {
......@@ -55,7 +55,7 @@ static int32_t create_fs(STsdb *pTsdb, STFileSystem **ppFS) {
ppFS[0]->pTsdb = pTsdb;
ppFS[0]->state = TSDB_FS_STATE_NONE;
tsem_init(&ppFS[0]->canEdit, 0, 1);
ppFS[0]->nextEditId = 0;
ppFS[0]->neid = 0;
return 0;
}
......@@ -194,17 +194,17 @@ static int32_t save_fs(int64_t eid, SArray *aTFileSet, const char *fname) {
}
for (int32_t i = 0; i < taosArrayGetSize(aTFileSet); i++) {
STFileSet *pFileSet = (STFileSet *)taosArrayGet(aTFileSet, i);
cJSON *item;
cJSON *tjson = cJSON_CreateObject();
if (tjson == NULL) {
if ((item = cJSON_CreateObject()) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit)
}
code = tsdbFileSetToJson(pFileSet, tjson);
code = tsdbFileSetToJson(pFileSet, item);
TSDB_CHECK_CODE(code, lino, _exit);
cJSON_AddItemToArray(ajson, tjson);
cJSON_AddItemToArray(ajson, item);
}
code = save_json(json, fname);
......@@ -237,8 +237,7 @@ static int32_t load_fs(const char *fname, SArray *aTFileSet, int64_t *eid) {
if (cJSON_IsNumber(item)) {
ASSERT(item->valuedouble == 1);
} else {
code = TSDB_CODE_FILE_CORRUPTED;
TSDB_CHECK_CODE(code, lino, _exit)
TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit)
}
/* eid */
......@@ -246,8 +245,7 @@ static int32_t load_fs(const char *fname, SArray *aTFileSet, int64_t *eid) {
if (cJSON_IsNumber(item)) {
eid[0] = item->valuedouble;
} else {
code = TSDB_CODE_FILE_CORRUPTED;
TSDB_CHECK_CODE(code, lino, _exit)
TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit)
}
/* fset */
......@@ -255,18 +253,16 @@ static int32_t load_fs(const char *fname, SArray *aTFileSet, int64_t *eid) {
if (cJSON_IsArray(item)) {
const cJSON *titem;
cJSON_ArrayForEach(titem, item) {
STFileSet *pFileSet = taosArrayReserve(aTFileSet, 1);
if (pFileSet == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
STFileSet *pFileSet;
if ((pFileSet = taosArrayReserve(aTFileSet, 1)) == NULL) {
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
}
code = tsdbFileSetFromJson(titem, pFileSet);
TSDB_CHECK_CODE(code, lino, _exit)
}
} else {
code = TSDB_CODE_FILE_CORRUPTED;
TSDB_CHECK_CODE(code, lino, _exit)
TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit)
}
_exit:
......@@ -277,44 +273,55 @@ _exit:
return code;
}
static int32_t commit_edit(STFileSystem *pFS, tsdb_fs_edit_t etype) {
int32_t code;
char ofname[TSDB_FILENAME_LEN];
char nfname[TSDB_FILENAME_LEN];
current_fname(pFS->pTsdb, nfname, TSDB_FCURRENT);
current_fname(pFS->pTsdb, ofname, etype == TSDB_FS_EDIT_COMMIT ? TSDB_FCURRENT_C : TSDB_FCURRENT_M);
static int32_t commit_edit(STFileSystem *pFS) {
char current[TSDB_FILENAME_LEN];
char current_t[TSDB_FILENAME_LEN];
code = taosRenameFile(ofname, nfname);
if (code) {
code = TAOS_SYSTEM_ERROR(code);
return code;
current_fname(pFS->pTsdb, current, TSDB_FCURRENT);
if (pFS->etype == TSDB_FEDIT_COMMIT) {
current_fname(pFS->pTsdb, current, TSDB_FCURRENT_C);
} else if (pFS->etype == TSDB_FEDIT_MERGE) {
current_fname(pFS->pTsdb, current, TSDB_FCURRENT_M);
} else {
ASSERT(0);
}
ASSERTS(0, "TODO: Do changes to pFS");
int32_t code;
int32_t lino;
if ((code = taosRenameFile(current_t, current))) {
TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(code), lino, _exit)
}
return 0;
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pFS->pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
static int32_t abort_edit(STFileSystem *pFS, tsdb_fs_edit_t etype) {
int32_t code;
char fname[TSDB_FILENAME_LEN];
static int32_t abort_edit(STFileSystem *pFS) {
char fname[TSDB_FILENAME_LEN];
current_fname(pFS->pTsdb, fname, etype == TSDB_FS_EDIT_COMMIT ? TSDB_FCURRENT_C : TSDB_FCURRENT_M);
if (pFS->etype == TSDB_FEDIT_COMMIT) {
current_fname(pFS->pTsdb, fname, TSDB_FCURRENT_C);
} else if (pFS->etype == TSDB_FEDIT_MERGE) {
current_fname(pFS->pTsdb, fname, TSDB_FCURRENT_M);
} else {
ASSERT(0);
}
code = taosRemoveFile(fname);
int32_t code = taosRemoveFile(fname);
if (code) code = TAOS_SYSTEM_ERROR(code);
return code;
}
static int32_t scan_file_system(STFileSystem *pFS) {
// ASSERTS(0, "TODO: Not implemented yet");
// TODO
return 0;
}
static int32_t scan_and_schedule_merge(STFileSystem *pFS) {
// ASSERTS(0, "TODO: Not implemented yet");
// TODO
return 0;
}
......@@ -323,12 +330,12 @@ static int32_t update_fs_if_needed(STFileSystem *pFS) {
return 0;
}
static int32_t open_fs(STFileSystem *pFS, int8_t rollback) {
static int32_t open_fs(STFileSystem *fs, int8_t rollback) {
int32_t code = 0;
int32_t lino = 0;
STsdb *pTsdb = pFS->pTsdb;
STsdb *pTsdb = fs->pTsdb;
code = update_fs_if_needed(pFS);
code = update_fs_if_needed(fs);
TSDB_CHECK_CODE(code, lino, _exit)
char fCurrent[TSDB_FILENAME_LEN];
......@@ -340,33 +347,37 @@ static int32_t open_fs(STFileSystem *pFS, int8_t rollback) {
current_fname(pTsdb, mCurrent, TSDB_FCURRENT_M);
if (taosCheckExistFile(fCurrent)) { // current.json exists
code = load_fs(fCurrent, pFS->cstate, &pFS->nextEditId);
code = load_fs(fCurrent, fs->cstate, &fs->neid);
TSDB_CHECK_CODE(code, lino, _exit);
// check current.json.commit existence
if (taosCheckExistFile(cCurrent)) {
// current.c.json exists
fs->etype = TSDB_FEDIT_COMMIT;
if (rollback) {
code = commit_edit(pFS, TSDB_FS_EDIT_COMMIT);
code = abort_edit(fs);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
code = abort_edit(pFS, TSDB_FS_EDIT_COMMIT);
code = load_fs(cCurrent, fs->nstate, &fs->eid);
TSDB_CHECK_CODE(code, lino, _exit)
code = commit_edit(fs);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
// check current.json.t existence
if (taosCheckExistFile(mCurrent)) {
code = abort_edit(pFS, TSDB_FS_EDIT_MERGE);
} else if (taosCheckExistFile(mCurrent)) {
// current.m.json exists
fs->etype = TSDB_FEDIT_MERGE;
code = abort_edit(fs);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = scan_file_system(pFS);
code = scan_file_system(fs);
TSDB_CHECK_CODE(code, lino, _exit);
code = scan_and_schedule_merge(pFS);
code = scan_and_schedule_merge(fs);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
code = save_fs(0, pFS->nstate, fCurrent);
code = save_fs(0, fs->nstate, fCurrent);
TSDB_CHECK_CODE(code, lino, _exit);
}
......@@ -479,12 +490,12 @@ int32_t tsdbCloseFileSystem(STFileSystem **ppFS) {
return 0;
}
int32_t tsdbFileSystemEditBegin(STFileSystem *pFS, const SArray *aFileOp, tsdb_fs_edit_t etype) {
int32_t tsdbFSEditBegin(STFileSystem *pFS, const SArray *aFileOp, EFEditT etype) {
int32_t code = 0;
int32_t lino;
char fname[TSDB_FILENAME_LEN];
// current_fname(pFS->pTsdb, fname, etype == TSDB_FS_EDIT_COMMIT ? TSDB_FCURRENT_C : TSDB_FCURRENT_M);
// current_fname(pFS->pTsdb, fname, etype == TSDB_FEDIT_COMMIT ? TSDB_FCURRENT_C : TSDB_FCURRENT_M);
// tsem_wait(&pFS->canEdit);
......@@ -514,8 +525,8 @@ _exit:
return code;
}
int32_t tsdbFileSystemEditCommit(STFileSystem *pFS, tsdb_fs_edit_t etype) {
int32_t code = commit_edit(pFS, etype);
int32_t tsdbFSEditCommit(STFileSystem *pFS, EFEditT etype) {
int32_t code = commit_edit(pFS);
tsem_post(&pFS->canEdit);
if (code) {
tsdbError("vgId:%d %s failed since %s", //
......@@ -531,8 +542,8 @@ int32_t tsdbFileSystemEditCommit(STFileSystem *pFS, tsdb_fs_edit_t etype) {
return code;
}
int32_t tsdbFileSystemEditAbort(STFileSystem *pFS, tsdb_fs_edit_t etype) {
int32_t code = abort_edit(pFS, etype);
int32_t tsdbFSEditAbort(STFileSystem *pFS, EFEditT etype) {
int32_t code = abort_edit(pFS);
if (code) {
tsdbError("vgId:%d %s failed since %s, etype:%d", //
TD_VID(pFS->pTsdb->pVnode), //
......
......@@ -69,14 +69,14 @@ static int32_t tsdbCloseMerger(SMerger *pMerger) {
STsdb *pTsdb = pMerger->pTsdb;
code = tsdbFileSystemEditBegin(pTsdb->pFS, pMerger->aFileOp, TSDB_FS_EDIT_MERGE);
code = tsdbFSEditBegin(pTsdb->pFS, pMerger->aFileOp, TSDB_FEDIT_MERGE);
TSDB_CHECK_CODE(code, lino, _exit)
_exit:
if (code) {
tsdbFileSystemEditAbort(pTsdb->pFS, TSDB_FS_EDIT_MERGE);
tsdbFSEditAbort(pTsdb->pFS, TSDB_FEDIT_MERGE);
} else {
tsdbFileSystemEditCommit(pTsdb->pFS, TSDB_FS_EDIT_MERGE);
tsdbFSEditCommit(pTsdb->pFS, TSDB_FEDIT_MERGE);
}
tsdbDestroyMerger(pMerger);
return code;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册