未验证 提交 e18b010d 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #20466 from taosdata/enh/tsdb_optimize

current from binary to json
......@@ -25,7 +25,7 @@ extern "C" {
#define tjsonGetNumberValue(pJson, pName, val, code) \
do { \
uint64_t _tmp = 0; \
int64_t _tmp = 0; \
code = tjsonGetBigIntValue(pJson, pName, &_tmp); \
val = _tmp; \
} while (0)
......
......@@ -29,7 +29,7 @@ extern "C" {
int32_t strdequote(char *src);
size_t strtrim(char *src);
char *strnchr(const char *haystack, char needle, int32_t len, bool skipquote);
TdUcs4* wcsnchr(const TdUcs4* haystack, TdUcs4 needle, size_t len);
TdUcs4 *wcsnchr(const TdUcs4 *haystack, TdUcs4 needle, size_t len);
char **strsplit(char *src, const char *delim, int32_t *num);
char *strtolower(char *dst, const char *src);
......@@ -37,11 +37,11 @@ char *strntolower(char *dst, const char *src, int32_t n);
char *strntolower_s(char *dst, const char *src, int32_t n);
int64_t strnatoi(char *num, int32_t len);
size_t tstrncspn(const char *str, size_t ssize, const char *reject, size_t rsize);
size_t twcsncspn(const TdUcs4 *wcs, size_t size, const TdUcs4 *reject, size_t rsize);
size_t tstrncspn(const char *str, size_t ssize, const char *reject, size_t rsize);
size_t twcsncspn(const TdUcs4 *wcs, size_t size, const TdUcs4 *reject, size_t rsize);
char *strbetween(char *string, char *begin, char *end);
char *paGetToken(char *src, char **token, int32_t *tokenLen);
char *strbetween(char *string, char *begin, char *end);
char *paGetToken(char *src, char **token, int32_t *tokenLen);
int32_t taosByteArrayToHexStr(char bytes[], int32_t len, char hexstr[]);
int32_t taosHexStrToByteArray(char hexstr[], char bytes[]);
......@@ -92,12 +92,26 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen,
}
}
#define TSDB_CHECK(condition, CODE, LINO, LABEL, ERRNO) \
if (!(condition)) { \
(CODE) = (ERRNO); \
(LINO) = __LINE__; \
goto LABEL; \
}
#define TSDB_CHECK_CODE(CODE, LINO, LABEL) \
if (CODE) { \
LINO = __LINE__; \
if ((CODE)) { \
(LINO) = __LINE__; \
goto LABEL; \
}
#define TSDB_CHECK_NULL(ptr, CODE, LINO, LABEL, ERRNO) \
if ((ptr) == NULL) { \
(CODE) = (ERRNO); \
(LINO) = __LINE__; \
goto LABEL; \
}
#ifdef __cplusplus
}
#endif
......
......@@ -88,6 +88,98 @@ _exit:
return code;
}
extern int32_t tsdbDelFileToJson(const SDelFile *pDelFile, cJSON *pJson);
extern int32_t tsdbJsonToDelFile(const cJSON *pJson, SDelFile *pDelFile);
extern int32_t tsdbDFileSetToJson(const SDFileSet *pSet, cJSON *pJson);
extern int32_t tsdbJsonToDFileSet(const cJSON *pJson, SDFileSet *pDelFile);
static int32_t tsdbFSToJsonStr(STsdbFS *pFS, char **ppStr) {
int32_t code = 0;
int32_t lino = 0;
cJSON *pJson;
ppStr[0] = NULL;
pJson = cJSON_CreateObject();
TSDB_CHECK_NULL(pJson, code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
// format version
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "format", 1), code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
// SDelFile
if (pFS->pDelFile) {
code = tsdbDelFileToJson(pFS->pDelFile, cJSON_AddObjectToObject(pJson, "del"));
TSDB_CHECK_CODE(code, lino, _exit);
}
// aDFileSet
cJSON *aSetJson = cJSON_AddArrayToObject(pJson, "file set");
TSDB_CHECK_NULL(aSetJson, code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
for (int32_t iSet = 0; iSet < taosArrayGetSize(pFS->aDFileSet); iSet++) {
cJSON *pSetJson = cJSON_CreateObject();
TSDB_CHECK_NULL(pSetJson, code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
cJSON_AddItemToArray(aSetJson, pSetJson);
code = tsdbDFileSetToJson(taosArrayGet(pFS->aDFileSet, iSet), pSetJson);
TSDB_CHECK_CODE(code, lino, _exit);
}
// print
ppStr[0] = cJSON_Print(pJson);
TSDB_CHECK_NULL(ppStr[0], code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
_exit:
cJSON_Delete(pJson);
if (code) tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return code;
}
static int32_t tsdbJsonStrToFS(const char *pStr, STsdbFS *pFS) {
int32_t code = 0;
int32_t lino;
cJSON *pJson = cJSON_Parse(pStr);
TSDB_CHECK(pJson, code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
const cJSON *pItem;
// format version
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "format")), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
// SDelFile
if (cJSON_IsObject(pItem = cJSON_GetObjectItem(pJson, "del"))) {
pFS->pDelFile = (SDelFile *)taosMemoryCalloc(1, sizeof(SDelFile));
TSDB_CHECK_NULL(pFS->pDelFile, code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
code = tsdbJsonToDelFile(pItem, pFS->pDelFile);
TSDB_CHECK_CODE(code, lino, _exit);
pFS->pDelFile->nRef = 1;
} else {
pFS->pDelFile = NULL;
}
// aDFileSet
taosArrayClear(pFS->aDFileSet);
const cJSON *pSetJson;
TSDB_CHECK(cJSON_IsArray(pItem = cJSON_GetObjectItem(pJson, "file set")), code, lino, _exit,
TSDB_CODE_FILE_CORRUPTED);
cJSON_ArrayForEach(pSetJson, pItem) {
SDFileSet *pSet = (SDFileSet *)taosArrayReserve(pFS->aDFileSet, 1);
TSDB_CHECK_NULL(pSet, code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
code = tsdbJsonToDFileSet(pSetJson, pSet);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
cJSON_Delete(pJson);
if (code) tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return code;
}
static int32_t tsdbSaveFSToFile(STsdbFS *pFS, const char *fname) {
int32_t code = 0;
int32_t lino = 0;
......@@ -132,6 +224,84 @@ _exit:
return code;
}
static int32_t tsdbSaveFSToJsonFile(STsdbFS *pFS, const char *fname) {
int32_t code;
int32_t lino;
char *pData;
code = tsdbFSToJsonStr(pFS, &pData);
if (code) return code;
TdFilePtr pFD = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
if (pFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t n = taosWriteFile(pFD, pData, strlen(pData) + 1);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (taosFsyncFile(pFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosCloseFile(&pFD);
_exit:
taosMemoryFree(pData);
if (code) {
tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static int32_t tsdbLoadFSFromJsonFile(const char *fname, STsdbFS *pFS) {
int32_t code = 0;
int32_t lino = 0;
char *pData = NULL;
TdFilePtr pFD = taosOpenFile(fname, TD_FILE_READ);
if (pFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t size;
if (taosFStatFile(pFD, &size, NULL) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
if ((pData = taosMemoryMalloc(size)) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (taosReadFile(pFD, pData, size) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code = tsdbJsonStrToFS(pData, pFS), lino, _exit);
_exit:
if (pData) taosMemoryFree(pData);
if (code) tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return code;
}
int32_t tsdbFSCreate(STsdbFS *pFS) {
int32_t code = 0;
......@@ -269,7 +439,8 @@ int32_t tDFileSetCmprFn(const void *p1, const void *p2) {
return 0;
}
static void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t) {
static void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t, char *current_json,
char *current_json_t) {
SVnode *pVnode = pTsdb->pVnode;
if (pVnode->pTfs) {
if (current) {
......@@ -280,6 +451,14 @@ static void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t) {
snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT.t", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP,
pTsdb->path, TD_DIRSEP);
}
if (current_json) {
snprintf(current_json, TSDB_FILENAME_LEN - 1, "%s%s%s%scurrent.json", tfsGetPrimaryPath(pTsdb->pVnode->pTfs),
TD_DIRSEP, pTsdb->path, TD_DIRSEP);
}
if (current_json_t) {
snprintf(current_json_t, TSDB_FILENAME_LEN - 1, "%s%s%s%scurrent.json.t", tfsGetPrimaryPath(pTsdb->pVnode->pTfs),
TD_DIRSEP, pTsdb->path, TD_DIRSEP);
}
} else {
if (current) {
snprintf(current, TSDB_FILENAME_LEN - 1, "%s%sCURRENT", pTsdb->path, TD_DIRSEP);
......@@ -287,6 +466,12 @@ static void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t) {
if (current_t) {
snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%sCURRENT.t", pTsdb->path, TD_DIRSEP);
}
if (current_json) {
snprintf(current_json, TSDB_FILENAME_LEN - 1, "%s%scurrent.json", pTsdb->path, TD_DIRSEP);
}
if (current_json_t) {
snprintf(current_json_t, TSDB_FILENAME_LEN - 1, "%s%scurrent.json.t", pTsdb->path, TD_DIRSEP);
}
}
}
......@@ -702,20 +887,15 @@ _exit:
return code;
}
// EXPOSED APIS ====================================================================================
int32_t tsdbFSCommit(STsdb *pTsdb) {
static int32_t tsdbFSCommitImpl(STsdb *pTsdb, const char *fname, const char *tfname, bool isJson) {
int32_t code = 0;
int32_t lino = 0;
STsdbFS fs = {0};
char current[TSDB_FILENAME_LEN] = {0};
char current_t[TSDB_FILENAME_LEN] = {0};
tsdbGetCurrentFName(pTsdb, current, current_t);
if (!taosCheckExistFile(current_t)) goto _exit;
if (!taosCheckExistFile(tfname)) goto _exit;
// rename the file
if (taosRenameFile(current_t, current) < 0) {
if (taosRenameFile(tfname, fname) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
......@@ -724,7 +904,11 @@ int32_t tsdbFSCommit(STsdb *pTsdb) {
code = tsdbFSCreate(&fs);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbLoadFSFromFile(current, &fs);
if (isJson) {
code = tsdbLoadFSFromJsonFile(fname, &fs);
} else {
code = tsdbLoadFSFromFile(fname, &fs);
}
TSDB_CHECK_CODE(code, lino, _exit);
// apply file change
......@@ -739,18 +923,19 @@ _exit:
return code;
}
// EXPOSED APIS ====================================================================================
int32_t tsdbFSCommit(STsdb *pTsdb) {
char current_json[TSDB_FILENAME_LEN] = {0};
char current_json_t[TSDB_FILENAME_LEN] = {0};
tsdbGetCurrentFName(pTsdb, NULL, NULL, current_json, current_json_t);
return tsdbFSCommitImpl(pTsdb, current_json, current_json_t, true);
}
int32_t tsdbFSRollback(STsdb *pTsdb) {
int32_t code = 0;
int32_t lino = 0;
char current_t[TSDB_FILENAME_LEN] = {0};
tsdbGetCurrentFName(pTsdb, NULL, current_t);
(void)taosRemoveFile(current_t);
_exit:
if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(errno));
}
char current_json_t[TSDB_FILENAME_LEN] = {0};
tsdbGetCurrentFName(pTsdb, NULL, NULL, NULL, current_json_t);
(void)taosRemoveFile(current_json_t);
return code;
}
......@@ -766,13 +951,33 @@ int32_t tsdbFSOpen(STsdb *pTsdb, int8_t rollback) {
// open impl
char current[TSDB_FILENAME_LEN] = {0};
char current_t[TSDB_FILENAME_LEN] = {0};
tsdbGetCurrentFName(pTsdb, current, current_t);
char current_json[TSDB_FILENAME_LEN] = {0};
char current_json_t[TSDB_FILENAME_LEN] = {0};
tsdbGetCurrentFName(pTsdb, current, current_t, current_json, current_json_t);
if (taosCheckExistFile(current)) {
// CURRENT file exists
code = tsdbLoadFSFromFile(current, &pTsdb->fs);
TSDB_CHECK_CODE(code, lino, _exit);
if (taosCheckExistFile(current_t)) {
if (rollback) {
(void)taosRemoveFile(current_t);
} else {
code = tsdbFSCommitImpl(pTsdb, current, current_t, false);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
code = tsdbSaveFSToJsonFile(&pTsdb->fs, current_json);
TSDB_CHECK_CODE(code, lino, _exit);
(void)taosRemoveFile(current);
} else if (taosCheckExistFile(current_json)) {
// current.json exists
code = tsdbLoadFSFromJsonFile(current_json, &pTsdb->fs);
TSDB_CHECK_CODE(code, lino, _exit);
if (taosCheckExistFile(current_json_t)) {
if (rollback) {
code = tsdbFSRollback(pTsdb);
TSDB_CHECK_CODE(code, lino, _exit);
......@@ -782,11 +987,10 @@ int32_t tsdbFSOpen(STsdb *pTsdb, int8_t rollback) {
}
}
} else {
// empty one
code = tsdbSaveFSToFile(&pTsdb->fs, current);
TSDB_CHECK_CODE(code, lino, _exit);
// empty TSDB
ASSERT(!rollback);
code = tsdbSaveFSToJsonFile(&pTsdb->fs, current_json);
TSDB_CHECK_CODE(code, lino, _exit);
}
// scan and fix FS
......@@ -1024,12 +1228,12 @@ _exit:
int32_t tsdbFSPrepareCommit(STsdb *pTsdb, STsdbFS *pFSNew) {
int32_t code = 0;
int32_t lino = 0;
char tfname[TSDB_FILENAME_LEN];
char current_json_t[TSDB_FILENAME_LEN];
tsdbGetCurrentFName(pTsdb, NULL, tfname);
tsdbGetCurrentFName(pTsdb, NULL, NULL, NULL, current_json_t);
// gnrt CURRENT.t
code = tsdbSaveFSToFile(pFSNew, tfname);
// generate current.json
code = tsdbSaveFSToJsonFile(pFSNew, current_json_t);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
......
......@@ -92,11 +92,11 @@ static int32_t tGetSmaFile(uint8_t *p, SSmaFile *pSmaFile) {
}
// EXPOSED APIS ==================================================
static char* getFileNamePrefix(STsdb *pTsdb, SDiskID did, int32_t fid, uint64_t commitId, char fname[]) {
const char* p1 = tfsGetDiskPath(pTsdb->pVnode->pTfs, did);
int32_t len = strlen(p1);
static char *getFileNamePrefix(STsdb *pTsdb, SDiskID did, int32_t fid, uint64_t commitId, char fname[]) {
const char *p1 = tfsGetDiskPath(pTsdb->pVnode->pTfs, did);
int32_t len = strlen(p1);
char* p = memcpy(fname, p1, len);
char *p = memcpy(fname, p1, len);
p += len;
*(p++) = TD_DIRSEP[0];
......@@ -121,25 +121,25 @@ static char* getFileNamePrefix(STsdb *pTsdb, SDiskID did, int32_t fid, uint64_t
}
void tsdbHeadFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SHeadFile *pHeadF, char fname[]) {
char* p = getFileNamePrefix(pTsdb, did, fid, pHeadF->commitID, fname);
char *p = getFileNamePrefix(pTsdb, did, fid, pHeadF->commitID, fname);
memcpy(p, ".head", 5);
p[5] = 0;
}
void tsdbDataFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SDataFile *pDataF, char fname[]) {
char* p = getFileNamePrefix(pTsdb, did, fid, pDataF->commitID, fname);
char *p = getFileNamePrefix(pTsdb, did, fid, pDataF->commitID, fname);
memcpy(p, ".data", 5);
p[5] = 0;
}
void tsdbSttFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSttFile *pSttF, char fname[]) {
char* p = getFileNamePrefix(pTsdb, did, fid, pSttF->commitID, fname);
char *p = getFileNamePrefix(pTsdb, did, fid, pSttF->commitID, fname);
memcpy(p, ".stt", 4);
p[4] = 0;
}
void tsdbSmaFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSmaFile *pSmaF, char fname[]) {
char* p = getFileNamePrefix(pTsdb, did, fid, pSmaF->commitID, fname);
char *p = getFileNamePrefix(pTsdb, did, fid, pSmaF->commitID, fname);
memcpy(p, ".sma", 4);
p[4] = 0;
}
......@@ -280,6 +280,272 @@ int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet) {
return n;
}
static int32_t tDiskIdToJson(const SDiskID *pDiskId, cJSON *pJson) {
int32_t code = 0;
int32_t lino;
if (pJson == NULL) return TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "level", pDiskId->level), code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "id", pDiskId->id), code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
_exit:
return code;
}
static int32_t tJsonToDiskId(const cJSON *pJson, SDiskID *pDiskId) {
int32_t code = 0;
int32_t lino;
const cJSON *pItem;
// level
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "level")), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
pDiskId->level = (int32_t)pItem->valuedouble;
// id
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "id")), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
pDiskId->id = (int32_t)pItem->valuedouble;
_exit:
return code;
}
static int32_t tHeadFileToJson(const SHeadFile *pHeadF, cJSON *pJson) {
int32_t code = 0;
int32_t lino;
if (pJson == NULL) return TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "commit id", pHeadF->commitID), code, lino, _exit,
TSDB_CODE_OUT_OF_MEMORY);
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "size", pHeadF->size), code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "offset", pHeadF->offset), code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
_exit:
return code;
}
static int32_t tJsonToHeadFile(const cJSON *pJson, SHeadFile *pHeadF) {
int32_t code = 0;
int32_t lino;
const cJSON *pItem;
// commit id
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "commit id")), code, lino, _exit,
TSDB_CODE_FILE_CORRUPTED);
pHeadF->commitID = (int64_t)pItem->valuedouble;
// size
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "size")), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
pHeadF->size = (int64_t)pItem->valuedouble;
// offset
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "offset")), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
pHeadF->offset = (int64_t)pItem->valuedouble;
_exit:
return code;
}
static int32_t tDataFileToJson(const SDataFile *pDataF, cJSON *pJson) {
int32_t code = 0;
int32_t lino;
if (pJson == NULL) return TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "commit id", pDataF->commitID), code, lino, _exit,
TSDB_CODE_OUT_OF_MEMORY);
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "size", pDataF->size), code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
_exit:
return code;
}
static int32_t tJsonToDataFile(const cJSON *pJson, SDataFile *pDataF) {
int32_t code = 0;
int32_t lino;
const cJSON *pItem;
// commit id
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "commit id")), code, lino, _exit,
TSDB_CODE_FILE_CORRUPTED);
pDataF->commitID = (int64_t)pItem->valuedouble;
// size
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "size")), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
pDataF->size = (int64_t)pItem->valuedouble;
_exit:
return code;
}
static int32_t tSmaFileToJson(const SSmaFile *pSmaF, cJSON *pJson) {
int32_t code = 0;
int32_t lino;
if (pJson == NULL) return TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "commit id", pSmaF->commitID), code, lino, _exit,
TSDB_CODE_OUT_OF_MEMORY);
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "size", pSmaF->size), code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
_exit:
return code;
}
static int32_t tJsonToSmaFile(const cJSON *pJson, SSmaFile *pSmaF) {
int32_t code = 0;
int32_t lino;
// commit id
const cJSON *pItem;
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "commit id")), code, lino, _exit,
TSDB_CODE_FILE_CORRUPTED);
pSmaF->commitID = (int64_t)pItem->valuedouble;
// size
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "size")), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
pSmaF->size = (int64_t)pItem->valuedouble;
_exit:
return code;
}
static int32_t tSttFileToJson(const SSttFile *pSttF, cJSON *pJson) {
int32_t code = 0;
int32_t lino;
if (pJson == NULL) return TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "commit id", pSttF->commitID), code, lino, _exit,
TSDB_CODE_OUT_OF_MEMORY);
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "size", pSttF->size), code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "offset", pSttF->offset), code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
_exit:
return code;
}
static int32_t tJsonToSttFile(const cJSON *pJson, SSttFile *pSttF) {
int32_t code = 0;
int32_t lino;
const cJSON *pItem;
// commit id
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "commit id")), code, lino, _exit,
TSDB_CODE_FILE_CORRUPTED);
pSttF->commitID = (int64_t)pItem->valuedouble;
// size
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "size")), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
pSttF->size = (int64_t)pItem->valuedouble;
// offset
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "offset")), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
pSttF->offset = (int64_t)pItem->valuedouble;
_exit:
return code;
}
int32_t tsdbDFileSetToJson(const SDFileSet *pSet, cJSON *pJson) {
int32_t code = 0;
int32_t lino;
if (pJson == NULL) return TSDB_CODE_OUT_OF_MEMORY;
code = tDiskIdToJson(&pSet->diskId, cJSON_AddObjectToObject(pJson, "disk id"));
TSDB_CHECK_CODE(code, lino, _exit);
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "fid", pSet->fid), code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
// head
code = tHeadFileToJson(pSet->pHeadF, cJSON_AddObjectToObject(pJson, "head"));
TSDB_CHECK_CODE(code, lino, _exit);
// data
code = tDataFileToJson(pSet->pDataF, cJSON_AddObjectToObject(pJson, "data"));
TSDB_CHECK_CODE(code, lino, _exit);
// sma
code = tSmaFileToJson(pSet->pSmaF, cJSON_AddObjectToObject(pJson, "sma"));
TSDB_CHECK_CODE(code, lino, _exit);
// stt array
cJSON *aSttJson = cJSON_AddArrayToObject(pJson, "stt");
TSDB_CHECK_NULL(aSttJson, code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
cJSON *pSttJson = cJSON_CreateObject();
TSDB_CHECK_NULL(pSttJson, code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
cJSON_AddItemToArray(aSttJson, pSttJson);
code = tSttFileToJson(pSet->aSttF[iStt], pSttJson);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
return code;
}
int32_t tsdbJsonToDFileSet(const cJSON *pJson, SDFileSet *pSet) {
int32_t code = 0;
int32_t lino;
const cJSON *pItem;
// disk id
TSDB_CHECK(cJSON_IsObject(pItem = cJSON_GetObjectItem(pJson, "disk id")), code, lino, _exit,
TSDB_CODE_FILE_CORRUPTED);
code = tJsonToDiskId(pItem, &pSet->diskId);
TSDB_CHECK_CODE(code, lino, _exit);
// fid
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "fid")), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
pSet->fid = (int32_t)pItem->valuedouble;
// head
TSDB_CHECK(cJSON_IsObject(pItem = cJSON_GetObjectItem(pJson, "head")), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
TSDB_CHECK_NULL(pSet->pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile)), code, lino, _exit,
TSDB_CODE_OUT_OF_MEMORY);
TSDB_CHECK_CODE(code = tJsonToHeadFile(pItem, pSet->pHeadF), lino, _exit);
pSet->pHeadF->nRef = 1;
// data
TSDB_CHECK(cJSON_IsObject(pItem = cJSON_GetObjectItem(pJson, "data")), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
TSDB_CHECK_NULL(pSet->pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile)), code, lino, _exit,
TSDB_CODE_OUT_OF_MEMORY);
TSDB_CHECK_CODE(code = tJsonToDataFile(pItem, pSet->pDataF), lino, _exit);
pSet->pDataF->nRef = 1;
// sma
TSDB_CHECK(cJSON_IsObject(pItem = cJSON_GetObjectItem(pJson, "sma")), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
TSDB_CHECK_NULL(pSet->pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile)), code, lino, _exit,
TSDB_CODE_OUT_OF_MEMORY);
TSDB_CHECK_CODE(code = tJsonToSmaFile(pItem, pSet->pSmaF), lino, _exit);
pSet->pSmaF->nRef = 1;
// stt array
const cJSON *element;
pSet->nSttF = 0;
TSDB_CHECK(cJSON_IsArray(pItem = cJSON_GetObjectItem(pJson, "stt")), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
cJSON_ArrayForEach(element, pItem) {
TSDB_CHECK(cJSON_IsObject(element), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
pSet->aSttF[pSet->nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
TSDB_CHECK_NULL(pSet->aSttF[pSet->nSttF], code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
TSDB_CHECK_CODE(code = tJsonToSttFile(element, pSet->aSttF[pSet->nSttF]), lino, _exit);
pSet->aSttF[pSet->nSttF]->nRef = 1;
pSet->nSttF++;
}
_exit:
if (code) tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return code;
}
// SDelFile ===============================================
void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]) {
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sv%dver%" PRId64 "%s", tfsGetPrimaryPath(pTsdb->pVnode->pTfs),
......@@ -305,3 +571,42 @@ int32_t tGetDelFile(uint8_t *p, SDelFile *pDelFile) {
return n;
}
int32_t tsdbDelFileToJson(const SDelFile *pDelFile, cJSON *pJson) {
if (pJson == NULL) return TSDB_CODE_OUT_OF_MEMORY;
int32_t code = 0;
int32_t lino;
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "commit id", pDelFile->commitID), code, lino, _exit,
TSDB_CODE_OUT_OF_MEMORY);
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "size", pDelFile->size), code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "offset", pDelFile->offset), code, lino, _exit,
TSDB_CODE_OUT_OF_MEMORY);
_exit:
return code;
}
int32_t tsdbJsonToDelFile(const cJSON *pJson, SDelFile *pDelFile) {
int32_t code = 0;
int32_t lino;
const cJSON *pItem;
// commit id
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "commit id")), code, lino, _exit,
TSDB_CODE_FILE_CORRUPTED);
pDelFile->commitID = cJSON_GetNumberValue(pItem);
// size
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "size")), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
pDelFile->size = cJSON_GetNumberValue(pItem);
// offset
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "offset")), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
pDelFile->offset = cJSON_GetNumberValue(pItem);
_exit:
return code;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册