diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c index db78db2e287105a36cfe3c30e3e6d59735d64b62..8f5467b7d64fbf37eb91f85ae535d08fe39168de 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c @@ -155,7 +155,7 @@ static int32_t commit_delete_data(SCommitter *pCommitter) { int32_t code = 0; int32_t lino; - ASSERTS(0, "not implemented yet"); + // ASSERTS(0, "TODO: Not implemented yet"); int64_t nDel = 0; SMemTable *pMem = pCommitter->pTsdb->imem; @@ -284,7 +284,9 @@ static int32_t close_committer(SCommitter *pCommiter, int32_t eno) { int32_t code = 0; int32_t lino; - // code = tsdbFSBegin(pCommiter->pTsdb, pCommiter->aFileOp); + code = tsdbFileSystemEditBegin(pCommiter->pTsdb->pFS, // + pCommiter->aFileOp, // + TSDB_FS_EDIT_COMMIT); TSDB_CHECK_CODE(code, lino, _exit); _exit: @@ -329,57 +331,77 @@ int32_t tsdbCommitBegin(STsdb *pTsdb, SCommitInfo *pInfo) { _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", // + TD_VID(pTsdb->pVnode), // + __func__, // + lino, // + tstrerror(code)); } else { - tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, TD_VID(pTsdb->pVnode), __func__, pMem->nRow, + tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, // + TD_VID(pTsdb->pVnode), // + __func__, // + pMem->nRow, // pMem->nDel); } return code; } -// int32_t tsdbCommitCommit(STsdb *pTsdb) { -// int32_t code = 0; -// int32_t lino = 0; -// SMemTable *pMemTable = pTsdb->imem; - -// // lock -// taosThreadRwlockWrlock(&pTsdb->rwLock); - -// code = tsdbFSCommit(pTsdb); -// if (code) { -// taosThreadRwlockUnlock(&pTsdb->rwLock); -// TSDB_CHECK_CODE(code, lino, _exit); -// } - -// pTsdb->imem = NULL; - -// // unlock -// taosThreadRwlockUnlock(&pTsdb->rwLock); -// if (pMemTable) { -// tsdbUnrefMemTable(pMemTable, NULL, true); -// } - -// _exit: -// if (code) { -// tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); -// } else { -// tsdbInfo("vgId:%d, tsdb finish commit", TD_VID(pTsdb->pVnode)); -// } -// return code; -// } - -// int32_t tsdbCommitAbort(STsdb *pTsdb) { -// int32_t code = 0; -// int32_t lino = 0; - -// code = tsdbFSRollback(pTsdb); -// TSDB_CHECK_CODE(code, lino, _exit); - -// _exit: -// if (code) { -// tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); -// } else { -// tsdbInfo("vgId:%d, tsdb rollback commit", TD_VID(pTsdb->pVnode)); -// } -// return code; -// } \ No newline at end of file +int32_t tsdbCommitCommit(STsdb *pTsdb) { + int32_t code = 0; + int32_t lino = 0; + SMemTable *pMemTable = pTsdb->imem; + + // lock + taosThreadRwlockWrlock(&pTsdb->rwLock); + + code = tsdbFileSystemEditCommit(pTsdb->pFS, // + TSDB_FS_EDIT_COMMIT); + if (code) { + taosThreadRwlockUnlock(&pTsdb->rwLock); + TSDB_CHECK_CODE(code, lino, _exit); + } + + pTsdb->imem = NULL; + + // unlock + taosThreadRwlockUnlock(&pTsdb->rwLock); + if (pMemTable) { + tsdbUnrefMemTable(pMemTable, NULL, true); + } + +_exit: + if (code) { + tsdbError("vgId:%d, %s failed at line %d since %s", // + TD_VID(pTsdb->pVnode), // + __func__, // + lino, // + tstrerror(code)); + } else { + tsdbInfo("vgId:%d %s done", // + TD_VID(pTsdb->pVnode), __func__); + } + return code; +} + +int32_t tsdbCommitAbort(STsdb *pTsdb) { + int32_t code = 0; + int32_t lino = 0; + + code = tsdbFileSystemEditAbort(pTsdb->pFS, // + TSDB_FS_EDIT_COMMIT); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + tsdbError("vgId:%d, %s failed at line %d since %s", // + TD_VID(pTsdb->pVnode), // + __func__, // + lino, // + tstrerror(code)); + } else { + tsdbInfo("vgId:%d %s done", // + TD_VID(pTsdb->pVnode), // + __func__); + } + return code; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbFS.c b/source/dnode/vnode/src/tsdb/dev/tsdbFS.c index e909b78c8c04223e3d0e0fae5d16999176fceae0..a0af60c05270ba854e2b929c1506f4b7dc0d6dbc 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbFS.c @@ -26,7 +26,7 @@ static int32_t create_file_system(STsdb *pTsdb, struct STFileSystem **ppFS) { } ppFS[0]->pTsdb = pTsdb; - tsem_init(&ppFS[0]->can_edit, 0, 1); + tsem_init(&ppFS[0]->canEdit, 0, 1); return 0; } @@ -34,7 +34,7 @@ static int32_t create_file_system(STsdb *pTsdb, struct STFileSystem **ppFS) { static int32_t destroy_file_system(struct STFileSystem **ppFS) { if (ppFS[0]) { taosArrayDestroy(ppFS[0]->aFileSet); - tsem_destroy(&ppFS[0]->can_edit); + tsem_destroy(&ppFS[0]->canEdit); taosMemoryFree(ppFS[0]); ppFS[0] = NULL; } @@ -42,27 +42,193 @@ static int32_t destroy_file_system(struct STFileSystem **ppFS) { } static int32_t get_current_json(STsdb *pTsdb, char fname[]) { - snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", pTsdb->path, TD_DIRSEP, "current.json"); + if (pTsdb->pVnode->pTfs) { + snprintf(fname, // + TSDB_FILENAME_LEN, // + "%s%s%s%s%s", // + tfsGetPrimaryPath(pTsdb->pVnode->pTfs), // + TD_DIRSEP, // + pTsdb->path, // + TD_DIRSEP, // + "current.json"); + } else { + snprintf(fname, // + TSDB_FILENAME_LEN, // + "%s%s%s", // + pTsdb->path, // + TD_DIRSEP, // + "current.json"); + } return 0; } static int32_t get_current_temp(STsdb *pTsdb, char fname[], tsdb_fs_edit_t etype) { switch (etype) { case TSDB_FS_EDIT_COMMIT: - snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", pTsdb->path, TD_DIRSEP, "current.json.commit"); + if (pTsdb->pVnode->pTfs) { + snprintf(fname, // + TSDB_FILENAME_LEN, // + "%s%s%s%s%s", // + tfsGetPrimaryPath(pTsdb->pVnode->pTfs), // + TD_DIRSEP, // + pTsdb->path, // + TD_DIRSEP, // + "current.json.commit"); + } else { + snprintf(fname, // + TSDB_FILENAME_LEN, // + "%s%s%s", // + pTsdb->path, // + TD_DIRSEP, // + "current.json.commit"); + } break; default: - snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", pTsdb->path, TD_DIRSEP, "current.json.t"); + if (pTsdb->pVnode->pTfs) { + snprintf(fname, // + TSDB_FILENAME_LEN, // + "%s%s%s%s%s", // + tfsGetPrimaryPath(pTsdb->pVnode->pTfs), // + TD_DIRSEP, // + pTsdb->path, // + TD_DIRSEP, // + "current.json.t"); + } else { + snprintf(fname, // + TSDB_FILENAME_LEN, // + "%s%s%s", // + pTsdb->path, // + TD_DIRSEP, // + "current.json.t"); + } break; } return 0; } -static int32_t save_fs_to_file(struct STFileSystem *pFS, const char *fname) { - cJSON *pJson = NULL; +static int32_t fs_to_json_str(struct STFileSystem *pFS, char **ppData) { + int32_t code = 0; + int32_t lino; + + cJSON *pJson = cJSON_CreateObject(); + if (pJson == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + /* format version */ + TSDB_CHECK_NULL( // + cJSON_AddNumberToObject(pJson, // + "format", // + 1 /* TODO */), + code, // + lino, // + _exit, // + TSDB_CODE_OUT_OF_MEMORY); + + /* next edit id */ + TSDB_CHECK_NULL( // + cJSON_AddNumberToObject(pJson, // + "next edit id", // + pFS->nextEditId), + code, // + lino, // + _exit, // + TSDB_CODE_OUT_OF_MEMORY); + + /* file sets */ + cJSON *aFileSetJson; + TSDB_CHECK_NULL( // + aFileSetJson = cJSON_AddArrayToObject(pJson, "file sets"), // + code, // + lino, // + _exit, // + TSDB_CODE_OUT_OF_MEMORY); + + for (int32_t i = 0; i < taosArrayGetSize(pFS->aFileSet); i++) { + struct SFileSet *pFileSet = taosArrayGet(pFS->aFileSet, i); + + code = tsdbFileSetToJson(aFileSetJson, pFileSet); + TSDB_CHECK_CODE(code, lino, _exit); + } + + ppData[0] = cJSON_Print(pJson); + if (ppData[0] == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + +_exit: + cJSON_Delete(pJson); + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", // + TD_VID(pFS->pTsdb->pVnode), // + __func__, // + lino, // + tstrerror(code)); + } + return code; +} + +static int32_t fs_from_json_str(const char *pData, struct STFileSystem *pFS) { + int32_t code = 0; + int32_t lino; + ASSERTS(0, "TODO: Not implemented yet"); - return 0; + +_exit: + return code; +} + +static int32_t save_fs_to_file(struct STFileSystem *pFS, const char *fname) { + int32_t code = 0; + int32_t lino; + char *pData = NULL; + + // to json string + code = fs_to_json_str(pFS, &pData); + TSDB_CHECK_CODE(code, lino, _exit); + + TdFilePtr fd = taosOpenFile(fname, // + TD_FILE_WRITE // + | TD_FILE_CREATE // + | TD_FILE_TRUNC); + if (fd == NULL) { + code = TAOS_SYSTEM_ERROR(code); + TSDB_CHECK_CODE(code, lino, _exit); + } + + int64_t n = taosWriteFile(fd, pData, strlen(pData) + 1); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(code); + taosCloseFile(&fd); + TSDB_CHECK_CODE(code, lino, _exit); + } + + if (taosFsyncFile(fd) < 0) { + code = TAOS_SYSTEM_ERROR(code); + taosCloseFile(&fd); + TSDB_CHECK_CODE(code, lino, _exit); + } + + taosCloseFile(&fd); + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", // + TD_VID(pFS->pTsdb->pVnode), // + __func__, // + lino, // + tstrerror(code)); + } else { + tsdbDebug("vgId:%d %s success", // + TD_VID(pFS->pTsdb->pVnode), // + __func__); + } + if (pData) { + taosMemoryFree(pData); + } + return code; } static int32_t load_fs_from_file(const char *fname, struct STFileSystem *pFS) { @@ -231,7 +397,7 @@ int32_t tsdbFileSystemEditBegin(struct STFileSystem *pFS, const SArray *aFileOp, get_current_temp(pFS->pTsdb, fname, etype); - tsem_wait(&pFS->can_edit); + tsem_wait(&pFS->canEdit); code = write_fs_to_file(pFS, fname); TSDB_CHECK_CODE(code, lino, _exit); @@ -254,7 +420,7 @@ _exit: int32_t tsdbFileSystemEditCommit(struct STFileSystem *pFS, tsdb_fs_edit_t etype) { int32_t code = commit_edit(pFS, etype); - tsem_post(&pFS->can_edit); + tsem_post(&pFS->canEdit); if (code) { tsdbError("vgId:%d %s failed since %s", // TD_VID(pFS->pTsdb->pVnode), // @@ -279,6 +445,6 @@ int32_t tsdbFileSystemEditAbort(struct STFileSystem *pFS, tsdb_fs_edit_t etype) etype); } else { } - tsem_post(&pFS->can_edit); + tsem_post(&pFS->canEdit); return code; } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbFS.h b/source/dnode/vnode/src/tsdb/dev/tsdbFS.h index bedb6669a9c1fa315b81769f27b0f5e5452a2678..69fefd6fb01fd8b0e125e12920221965b86d1e1f 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbFS.h +++ b/source/dnode/vnode/src/tsdb/dev/tsdbFS.h @@ -25,8 +25,8 @@ extern "C" { /* Exposed Handle */ struct STFileSystem { STsdb *pTsdb; - tsem_t can_edit; - int64_t eidt_id; + tsem_t canEdit; + int64_t nextEditId; SArray *aFileSet; // SArray }; diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbFSet.c b/source/dnode/vnode/src/tsdb/dev/tsdbFSet.c index 39c65c8585426e635aac8fea5f7284d84873c33a..26eef30c57424ce07763a497300aad8109226800 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbFSet.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbFSet.c @@ -14,3 +14,12 @@ */ #include "dev.h" + +int32_t tsdbFileSetToJson(SJson *pJson, const struct SFileSet *pSet) { + int32_t code = 0; + + ASSERTS(0, "TODO: Not implemented yet"); + +_exit: + return code; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbFSet.h b/source/dnode/vnode/src/tsdb/dev/tsdbFSet.h index e0b95d795e8e406a4ebe82111cde920b120819b9..95844b77aa9771aeff770bc8fd13ed5a41c76bc7 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbFSet.h +++ b/source/dnode/vnode/src/tsdb/dev/tsdbFSet.h @@ -52,6 +52,8 @@ struct SFileSet { } lStt[TSDB_STT_FILE_LEVEL_MAX]; }; +int32_t tsdbFileSetToJson(SJson *pJson, const struct SFileSet *pSet); + #ifdef __cplusplus } #endif