未验证 提交 5caefa1d 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #20721 from taosdata/revert-20466-enh/tsdb_optimize

Revert "current from binary to json"
...@@ -25,7 +25,7 @@ extern "C" { ...@@ -25,7 +25,7 @@ extern "C" {
#define tjsonGetNumberValue(pJson, pName, val, code) \ #define tjsonGetNumberValue(pJson, pName, val, code) \
do { \ do { \
int64_t _tmp = 0; \ uint64_t _tmp = 0; \
code = tjsonGetBigIntValue(pJson, pName, &_tmp); \ code = tjsonGetBigIntValue(pJson, pName, &_tmp); \
val = _tmp; \ val = _tmp; \
} while (0) } while (0)
......
...@@ -29,7 +29,7 @@ extern "C" { ...@@ -29,7 +29,7 @@ extern "C" {
int32_t strdequote(char *src); int32_t strdequote(char *src);
size_t strtrim(char *src); size_t strtrim(char *src);
char *strnchr(const char *haystack, char needle, int32_t len, bool skipquote); char *strnchr(const char *haystack, char needle, int32_t len, bool skipquote);
TdUcs4 *wcsnchr(const TdUcs4 *haystack, TdUcs4 needle, size_t len); TdUcs4* wcsnchr(const TdUcs4* haystack, TdUcs4 needle, size_t len);
char **strsplit(char *src, const char *delim, int32_t *num); char **strsplit(char *src, const char *delim, int32_t *num);
char *strtolower(char *dst, const char *src); char *strtolower(char *dst, const char *src);
...@@ -92,23 +92,9 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen, ...@@ -92,23 +92,9 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen,
} }
} }
#define TSDB_CHECK(condition, CODE, LINO, LABEL, ERRNO) \
if (!(condition)) { \
(CODE) = (ERRNO); \
(LINO) = __LINE__; \
goto LABEL; \
}
#define TSDB_CHECK_CODE(CODE, LINO, LABEL) \ #define TSDB_CHECK_CODE(CODE, LINO, LABEL) \
if ((CODE)) { \ if (CODE) { \
(LINO) = __LINE__; \ LINO = __LINE__; \
goto LABEL; \
}
#define TSDB_CHECK_NULL(ptr, CODE, LINO, LABEL, ERRNO) \
if ((ptr) == NULL) { \
(CODE) = (ERRNO); \
(LINO) = __LINE__; \
goto LABEL; \ goto LABEL; \
} }
......
...@@ -88,98 +88,6 @@ _exit: ...@@ -88,98 +88,6 @@ _exit:
return code; return code;
} }
extern int32_t tsdbDelFileToJson(const SDelFile *pDelFile, cJSON *pJson);
extern int32_t tsdbJsonToDelFile(const cJSON *pJson, SDelFile *pDelFile);
extern int32_t tsdbDFileSetToJson(const SDFileSet *pSet, cJSON *pJson);
extern int32_t tsdbJsonToDFileSet(const cJSON *pJson, SDFileSet *pDelFile);
static int32_t tsdbFSToJsonStr(STsdbFS *pFS, char **ppStr) {
int32_t code = 0;
int32_t lino = 0;
cJSON *pJson;
ppStr[0] = NULL;
pJson = cJSON_CreateObject();
TSDB_CHECK_NULL(pJson, code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
// format version
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "format", 1), code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
// SDelFile
if (pFS->pDelFile) {
code = tsdbDelFileToJson(pFS->pDelFile, cJSON_AddObjectToObject(pJson, "del"));
TSDB_CHECK_CODE(code, lino, _exit);
}
// aDFileSet
cJSON *aSetJson = cJSON_AddArrayToObject(pJson, "file set");
TSDB_CHECK_NULL(aSetJson, code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
for (int32_t iSet = 0; iSet < taosArrayGetSize(pFS->aDFileSet); iSet++) {
cJSON *pSetJson = cJSON_CreateObject();
TSDB_CHECK_NULL(pSetJson, code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
cJSON_AddItemToArray(aSetJson, pSetJson);
code = tsdbDFileSetToJson(taosArrayGet(pFS->aDFileSet, iSet), pSetJson);
TSDB_CHECK_CODE(code, lino, _exit);
}
// print
ppStr[0] = cJSON_Print(pJson);
TSDB_CHECK_NULL(ppStr[0], code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
_exit:
cJSON_Delete(pJson);
if (code) tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return code;
}
static int32_t tsdbJsonStrToFS(const char *pStr, STsdbFS *pFS) {
int32_t code = 0;
int32_t lino;
cJSON *pJson = cJSON_Parse(pStr);
TSDB_CHECK(pJson, code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
const cJSON *pItem;
// format version
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "format")), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
// SDelFile
if (cJSON_IsObject(pItem = cJSON_GetObjectItem(pJson, "del"))) {
pFS->pDelFile = (SDelFile *)taosMemoryCalloc(1, sizeof(SDelFile));
TSDB_CHECK_NULL(pFS->pDelFile, code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
code = tsdbJsonToDelFile(pItem, pFS->pDelFile);
TSDB_CHECK_CODE(code, lino, _exit);
pFS->pDelFile->nRef = 1;
} else {
pFS->pDelFile = NULL;
}
// aDFileSet
taosArrayClear(pFS->aDFileSet);
const cJSON *pSetJson;
TSDB_CHECK(cJSON_IsArray(pItem = cJSON_GetObjectItem(pJson, "file set")), code, lino, _exit,
TSDB_CODE_FILE_CORRUPTED);
cJSON_ArrayForEach(pSetJson, pItem) {
SDFileSet *pSet = (SDFileSet *)taosArrayReserve(pFS->aDFileSet, 1);
TSDB_CHECK_NULL(pSet, code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
code = tsdbJsonToDFileSet(pSetJson, pSet);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
cJSON_Delete(pJson);
if (code) tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return code;
}
static int32_t tsdbSaveFSToFile(STsdbFS *pFS, const char *fname) { static int32_t tsdbSaveFSToFile(STsdbFS *pFS, const char *fname) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
...@@ -224,84 +132,6 @@ _exit: ...@@ -224,84 +132,6 @@ _exit:
return code; return code;
} }
static int32_t tsdbSaveFSToJsonFile(STsdbFS *pFS, const char *fname) {
int32_t code;
int32_t lino;
char *pData;
code = tsdbFSToJsonStr(pFS, &pData);
if (code) return code;
TdFilePtr pFD = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
if (pFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t n = taosWriteFile(pFD, pData, strlen(pData) + 1);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (taosFsyncFile(pFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosCloseFile(&pFD);
_exit:
taosMemoryFree(pData);
if (code) {
tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static int32_t tsdbLoadFSFromJsonFile(const char *fname, STsdbFS *pFS) {
int32_t code = 0;
int32_t lino = 0;
char *pData = NULL;
TdFilePtr pFD = taosOpenFile(fname, TD_FILE_READ);
if (pFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t size;
if (taosFStatFile(pFD, &size, NULL) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
if ((pData = taosMemoryMalloc(size)) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (taosReadFile(pFD, pData, size) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code = tsdbJsonStrToFS(pData, pFS), lino, _exit);
_exit:
if (pData) taosMemoryFree(pData);
if (code) tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return code;
}
int32_t tsdbFSCreate(STsdbFS *pFS) { int32_t tsdbFSCreate(STsdbFS *pFS) {
int32_t code = 0; int32_t code = 0;
...@@ -439,8 +269,7 @@ int32_t tDFileSetCmprFn(const void *p1, const void *p2) { ...@@ -439,8 +269,7 @@ int32_t tDFileSetCmprFn(const void *p1, const void *p2) {
return 0; return 0;
} }
static void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t, char *current_json, static void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t) {
char *current_json_t) {
SVnode *pVnode = pTsdb->pVnode; SVnode *pVnode = pTsdb->pVnode;
if (pVnode->pTfs) { if (pVnode->pTfs) {
if (current) { if (current) {
...@@ -451,14 +280,6 @@ static void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t, ch ...@@ -451,14 +280,6 @@ static void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t, ch
snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT.t", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP, snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT.t", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP,
pTsdb->path, TD_DIRSEP); pTsdb->path, TD_DIRSEP);
} }
if (current_json) {
snprintf(current_json, TSDB_FILENAME_LEN - 1, "%s%s%s%scurrent.json", tfsGetPrimaryPath(pTsdb->pVnode->pTfs),
TD_DIRSEP, pTsdb->path, TD_DIRSEP);
}
if (current_json_t) {
snprintf(current_json_t, TSDB_FILENAME_LEN - 1, "%s%s%s%scurrent.json.t", tfsGetPrimaryPath(pTsdb->pVnode->pTfs),
TD_DIRSEP, pTsdb->path, TD_DIRSEP);
}
} else { } else {
if (current) { if (current) {
snprintf(current, TSDB_FILENAME_LEN - 1, "%s%sCURRENT", pTsdb->path, TD_DIRSEP); snprintf(current, TSDB_FILENAME_LEN - 1, "%s%sCURRENT", pTsdb->path, TD_DIRSEP);
...@@ -466,12 +287,6 @@ static void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t, ch ...@@ -466,12 +287,6 @@ static void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t, ch
if (current_t) { if (current_t) {
snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%sCURRENT.t", pTsdb->path, TD_DIRSEP); snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%sCURRENT.t", pTsdb->path, TD_DIRSEP);
} }
if (current_json) {
snprintf(current_json, TSDB_FILENAME_LEN - 1, "%s%scurrent.json", pTsdb->path, TD_DIRSEP);
}
if (current_json_t) {
snprintf(current_json_t, TSDB_FILENAME_LEN - 1, "%s%scurrent.json.t", pTsdb->path, TD_DIRSEP);
}
} }
} }
...@@ -887,15 +702,20 @@ _exit: ...@@ -887,15 +702,20 @@ _exit:
return code; return code;
} }
static int32_t tsdbFSCommitImpl(STsdb *pTsdb, const char *fname, const char *tfname, bool isJson) { // EXPOSED APIS ====================================================================================
int32_t tsdbFSCommit(STsdb *pTsdb) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
STsdbFS fs = {0}; STsdbFS fs = {0};
if (!taosCheckExistFile(tfname)) goto _exit; char current[TSDB_FILENAME_LEN] = {0};
char current_t[TSDB_FILENAME_LEN] = {0};
tsdbGetCurrentFName(pTsdb, current, current_t);
if (!taosCheckExistFile(current_t)) goto _exit;
// rename the file // rename the file
if (taosRenameFile(tfname, fname) < 0) { if (taosRenameFile(current_t, current) < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
...@@ -904,11 +724,7 @@ static int32_t tsdbFSCommitImpl(STsdb *pTsdb, const char *fname, const char *tfn ...@@ -904,11 +724,7 @@ static int32_t tsdbFSCommitImpl(STsdb *pTsdb, const char *fname, const char *tfn
code = tsdbFSCreate(&fs); code = tsdbFSCreate(&fs);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
if (isJson) { code = tsdbLoadFSFromFile(current, &fs);
code = tsdbLoadFSFromJsonFile(fname, &fs);
} else {
code = tsdbLoadFSFromFile(fname, &fs);
}
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
// apply file change // apply file change
...@@ -923,19 +739,18 @@ _exit: ...@@ -923,19 +739,18 @@ _exit:
return code; return code;
} }
// EXPOSED APIS ====================================================================================
int32_t tsdbFSCommit(STsdb *pTsdb) {
char current_json[TSDB_FILENAME_LEN] = {0};
char current_json_t[TSDB_FILENAME_LEN] = {0};
tsdbGetCurrentFName(pTsdb, NULL, NULL, current_json, current_json_t);
return tsdbFSCommitImpl(pTsdb, current_json, current_json_t, true);
}
int32_t tsdbFSRollback(STsdb *pTsdb) { int32_t tsdbFSRollback(STsdb *pTsdb) {
int32_t code = 0; int32_t code = 0;
char current_json_t[TSDB_FILENAME_LEN] = {0}; int32_t lino = 0;
tsdbGetCurrentFName(pTsdb, NULL, NULL, NULL, current_json_t);
(void)taosRemoveFile(current_json_t); char current_t[TSDB_FILENAME_LEN] = {0};
tsdbGetCurrentFName(pTsdb, NULL, current_t);
(void)taosRemoveFile(current_t);
_exit:
if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(errno));
}
return code; return code;
} }
...@@ -951,33 +766,13 @@ int32_t tsdbFSOpen(STsdb *pTsdb, int8_t rollback) { ...@@ -951,33 +766,13 @@ int32_t tsdbFSOpen(STsdb *pTsdb, int8_t rollback) {
// open impl // open impl
char current[TSDB_FILENAME_LEN] = {0}; char current[TSDB_FILENAME_LEN] = {0};
char current_t[TSDB_FILENAME_LEN] = {0}; char current_t[TSDB_FILENAME_LEN] = {0};
char current_json[TSDB_FILENAME_LEN] = {0}; tsdbGetCurrentFName(pTsdb, current, current_t);
char current_json_t[TSDB_FILENAME_LEN] = {0};
tsdbGetCurrentFName(pTsdb, current, current_t, current_json, current_json_t);
if (taosCheckExistFile(current)) { if (taosCheckExistFile(current)) {
// CURRENT file exists
code = tsdbLoadFSFromFile(current, &pTsdb->fs); code = tsdbLoadFSFromFile(current, &pTsdb->fs);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
if (taosCheckExistFile(current_t)) { if (taosCheckExistFile(current_t)) {
if (rollback) {
(void)taosRemoveFile(current_t);
} else {
code = tsdbFSCommitImpl(pTsdb, current, current_t, false);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
code = tsdbSaveFSToJsonFile(&pTsdb->fs, current_json);
TSDB_CHECK_CODE(code, lino, _exit);
(void)taosRemoveFile(current);
} else if (taosCheckExistFile(current_json)) {
// current.json exists
code = tsdbLoadFSFromJsonFile(current_json, &pTsdb->fs);
TSDB_CHECK_CODE(code, lino, _exit);
if (taosCheckExistFile(current_json_t)) {
if (rollback) { if (rollback) {
code = tsdbFSRollback(pTsdb); code = tsdbFSRollback(pTsdb);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
...@@ -987,10 +782,11 @@ int32_t tsdbFSOpen(STsdb *pTsdb, int8_t rollback) { ...@@ -987,10 +782,11 @@ int32_t tsdbFSOpen(STsdb *pTsdb, int8_t rollback) {
} }
} }
} else { } else {
// empty TSDB // empty one
ASSERT(!rollback); code = tsdbSaveFSToFile(&pTsdb->fs, current);
code = tsdbSaveFSToJsonFile(&pTsdb->fs, current_json);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
ASSERT(!rollback);
} }
// scan and fix FS // scan and fix FS
...@@ -1228,12 +1024,12 @@ _exit: ...@@ -1228,12 +1024,12 @@ _exit:
int32_t tsdbFSPrepareCommit(STsdb *pTsdb, STsdbFS *pFSNew) { int32_t tsdbFSPrepareCommit(STsdb *pTsdb, STsdbFS *pFSNew) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
char current_json_t[TSDB_FILENAME_LEN]; char tfname[TSDB_FILENAME_LEN];
tsdbGetCurrentFName(pTsdb, NULL, NULL, NULL, current_json_t); tsdbGetCurrentFName(pTsdb, NULL, tfname);
// generate current.json // gnrt CURRENT.t
code = tsdbSaveFSToJsonFile(pFSNew, current_json_t); code = tsdbSaveFSToFile(pFSNew, tfname);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
......
...@@ -92,11 +92,11 @@ static int32_t tGetSmaFile(uint8_t *p, SSmaFile *pSmaFile) { ...@@ -92,11 +92,11 @@ static int32_t tGetSmaFile(uint8_t *p, SSmaFile *pSmaFile) {
} }
// EXPOSED APIS ================================================== // EXPOSED APIS ==================================================
static char *getFileNamePrefix(STsdb *pTsdb, SDiskID did, int32_t fid, uint64_t commitId, char fname[]) { static char* getFileNamePrefix(STsdb *pTsdb, SDiskID did, int32_t fid, uint64_t commitId, char fname[]) {
const char *p1 = tfsGetDiskPath(pTsdb->pVnode->pTfs, did); const char* p1 = tfsGetDiskPath(pTsdb->pVnode->pTfs, did);
int32_t len = strlen(p1); int32_t len = strlen(p1);
char *p = memcpy(fname, p1, len); char* p = memcpy(fname, p1, len);
p += len; p += len;
*(p++) = TD_DIRSEP[0]; *(p++) = TD_DIRSEP[0];
...@@ -121,25 +121,25 @@ static char *getFileNamePrefix(STsdb *pTsdb, SDiskID did, int32_t fid, uint64_t ...@@ -121,25 +121,25 @@ static char *getFileNamePrefix(STsdb *pTsdb, SDiskID did, int32_t fid, uint64_t
} }
void tsdbHeadFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SHeadFile *pHeadF, char fname[]) { void tsdbHeadFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SHeadFile *pHeadF, char fname[]) {
char *p = getFileNamePrefix(pTsdb, did, fid, pHeadF->commitID, fname); char* p = getFileNamePrefix(pTsdb, did, fid, pHeadF->commitID, fname);
memcpy(p, ".head", 5); memcpy(p, ".head", 5);
p[5] = 0; p[5] = 0;
} }
void tsdbDataFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SDataFile *pDataF, char fname[]) { void tsdbDataFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SDataFile *pDataF, char fname[]) {
char *p = getFileNamePrefix(pTsdb, did, fid, pDataF->commitID, fname); char* p = getFileNamePrefix(pTsdb, did, fid, pDataF->commitID, fname);
memcpy(p, ".data", 5); memcpy(p, ".data", 5);
p[5] = 0; p[5] = 0;
} }
void tsdbSttFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSttFile *pSttF, char fname[]) { void tsdbSttFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSttFile *pSttF, char fname[]) {
char *p = getFileNamePrefix(pTsdb, did, fid, pSttF->commitID, fname); char* p = getFileNamePrefix(pTsdb, did, fid, pSttF->commitID, fname);
memcpy(p, ".stt", 4); memcpy(p, ".stt", 4);
p[4] = 0; p[4] = 0;
} }
void tsdbSmaFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSmaFile *pSmaF, char fname[]) { void tsdbSmaFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSmaFile *pSmaF, char fname[]) {
char *p = getFileNamePrefix(pTsdb, did, fid, pSmaF->commitID, fname); char* p = getFileNamePrefix(pTsdb, did, fid, pSmaF->commitID, fname);
memcpy(p, ".sma", 4); memcpy(p, ".sma", 4);
p[4] = 0; p[4] = 0;
} }
...@@ -280,272 +280,6 @@ int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet) { ...@@ -280,272 +280,6 @@ int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet) {
return n; return n;
} }
static int32_t tDiskIdToJson(const SDiskID *pDiskId, cJSON *pJson) {
int32_t code = 0;
int32_t lino;
if (pJson == NULL) return TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "level", pDiskId->level), code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "id", pDiskId->id), code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
_exit:
return code;
}
static int32_t tJsonToDiskId(const cJSON *pJson, SDiskID *pDiskId) {
int32_t code = 0;
int32_t lino;
const cJSON *pItem;
// level
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "level")), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
pDiskId->level = (int32_t)pItem->valuedouble;
// id
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "id")), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
pDiskId->id = (int32_t)pItem->valuedouble;
_exit:
return code;
}
static int32_t tHeadFileToJson(const SHeadFile *pHeadF, cJSON *pJson) {
int32_t code = 0;
int32_t lino;
if (pJson == NULL) return TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "commit id", pHeadF->commitID), code, lino, _exit,
TSDB_CODE_OUT_OF_MEMORY);
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "size", pHeadF->size), code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "offset", pHeadF->offset), code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
_exit:
return code;
}
static int32_t tJsonToHeadFile(const cJSON *pJson, SHeadFile *pHeadF) {
int32_t code = 0;
int32_t lino;
const cJSON *pItem;
// commit id
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "commit id")), code, lino, _exit,
TSDB_CODE_FILE_CORRUPTED);
pHeadF->commitID = (int64_t)pItem->valuedouble;
// size
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "size")), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
pHeadF->size = (int64_t)pItem->valuedouble;
// offset
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "offset")), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
pHeadF->offset = (int64_t)pItem->valuedouble;
_exit:
return code;
}
static int32_t tDataFileToJson(const SDataFile *pDataF, cJSON *pJson) {
int32_t code = 0;
int32_t lino;
if (pJson == NULL) return TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "commit id", pDataF->commitID), code, lino, _exit,
TSDB_CODE_OUT_OF_MEMORY);
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "size", pDataF->size), code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
_exit:
return code;
}
static int32_t tJsonToDataFile(const cJSON *pJson, SDataFile *pDataF) {
int32_t code = 0;
int32_t lino;
const cJSON *pItem;
// commit id
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "commit id")), code, lino, _exit,
TSDB_CODE_FILE_CORRUPTED);
pDataF->commitID = (int64_t)pItem->valuedouble;
// size
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "size")), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
pDataF->size = (int64_t)pItem->valuedouble;
_exit:
return code;
}
static int32_t tSmaFileToJson(const SSmaFile *pSmaF, cJSON *pJson) {
int32_t code = 0;
int32_t lino;
if (pJson == NULL) return TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "commit id", pSmaF->commitID), code, lino, _exit,
TSDB_CODE_OUT_OF_MEMORY);
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "size", pSmaF->size), code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
_exit:
return code;
}
static int32_t tJsonToSmaFile(const cJSON *pJson, SSmaFile *pSmaF) {
int32_t code = 0;
int32_t lino;
// commit id
const cJSON *pItem;
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "commit id")), code, lino, _exit,
TSDB_CODE_FILE_CORRUPTED);
pSmaF->commitID = (int64_t)pItem->valuedouble;
// size
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "size")), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
pSmaF->size = (int64_t)pItem->valuedouble;
_exit:
return code;
}
static int32_t tSttFileToJson(const SSttFile *pSttF, cJSON *pJson) {
int32_t code = 0;
int32_t lino;
if (pJson == NULL) return TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "commit id", pSttF->commitID), code, lino, _exit,
TSDB_CODE_OUT_OF_MEMORY);
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "size", pSttF->size), code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "offset", pSttF->offset), code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
_exit:
return code;
}
static int32_t tJsonToSttFile(const cJSON *pJson, SSttFile *pSttF) {
int32_t code = 0;
int32_t lino;
const cJSON *pItem;
// commit id
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "commit id")), code, lino, _exit,
TSDB_CODE_FILE_CORRUPTED);
pSttF->commitID = (int64_t)pItem->valuedouble;
// size
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "size")), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
pSttF->size = (int64_t)pItem->valuedouble;
// offset
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "offset")), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
pSttF->offset = (int64_t)pItem->valuedouble;
_exit:
return code;
}
int32_t tsdbDFileSetToJson(const SDFileSet *pSet, cJSON *pJson) {
int32_t code = 0;
int32_t lino;
if (pJson == NULL) return TSDB_CODE_OUT_OF_MEMORY;
code = tDiskIdToJson(&pSet->diskId, cJSON_AddObjectToObject(pJson, "disk id"));
TSDB_CHECK_CODE(code, lino, _exit);
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "fid", pSet->fid), code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
// head
code = tHeadFileToJson(pSet->pHeadF, cJSON_AddObjectToObject(pJson, "head"));
TSDB_CHECK_CODE(code, lino, _exit);
// data
code = tDataFileToJson(pSet->pDataF, cJSON_AddObjectToObject(pJson, "data"));
TSDB_CHECK_CODE(code, lino, _exit);
// sma
code = tSmaFileToJson(pSet->pSmaF, cJSON_AddObjectToObject(pJson, "sma"));
TSDB_CHECK_CODE(code, lino, _exit);
// stt array
cJSON *aSttJson = cJSON_AddArrayToObject(pJson, "stt");
TSDB_CHECK_NULL(aSttJson, code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
cJSON *pSttJson = cJSON_CreateObject();
TSDB_CHECK_NULL(pSttJson, code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
cJSON_AddItemToArray(aSttJson, pSttJson);
code = tSttFileToJson(pSet->aSttF[iStt], pSttJson);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
return code;
}
int32_t tsdbJsonToDFileSet(const cJSON *pJson, SDFileSet *pSet) {
int32_t code = 0;
int32_t lino;
const cJSON *pItem;
// disk id
TSDB_CHECK(cJSON_IsObject(pItem = cJSON_GetObjectItem(pJson, "disk id")), code, lino, _exit,
TSDB_CODE_FILE_CORRUPTED);
code = tJsonToDiskId(pItem, &pSet->diskId);
TSDB_CHECK_CODE(code, lino, _exit);
// fid
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "fid")), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
pSet->fid = (int32_t)pItem->valuedouble;
// head
TSDB_CHECK(cJSON_IsObject(pItem = cJSON_GetObjectItem(pJson, "head")), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
TSDB_CHECK_NULL(pSet->pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile)), code, lino, _exit,
TSDB_CODE_OUT_OF_MEMORY);
TSDB_CHECK_CODE(code = tJsonToHeadFile(pItem, pSet->pHeadF), lino, _exit);
pSet->pHeadF->nRef = 1;
// data
TSDB_CHECK(cJSON_IsObject(pItem = cJSON_GetObjectItem(pJson, "data")), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
TSDB_CHECK_NULL(pSet->pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile)), code, lino, _exit,
TSDB_CODE_OUT_OF_MEMORY);
TSDB_CHECK_CODE(code = tJsonToDataFile(pItem, pSet->pDataF), lino, _exit);
pSet->pDataF->nRef = 1;
// sma
TSDB_CHECK(cJSON_IsObject(pItem = cJSON_GetObjectItem(pJson, "sma")), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
TSDB_CHECK_NULL(pSet->pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile)), code, lino, _exit,
TSDB_CODE_OUT_OF_MEMORY);
TSDB_CHECK_CODE(code = tJsonToSmaFile(pItem, pSet->pSmaF), lino, _exit);
pSet->pSmaF->nRef = 1;
// stt array
const cJSON *element;
pSet->nSttF = 0;
TSDB_CHECK(cJSON_IsArray(pItem = cJSON_GetObjectItem(pJson, "stt")), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
cJSON_ArrayForEach(element, pItem) {
TSDB_CHECK(cJSON_IsObject(element), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
pSet->aSttF[pSet->nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
TSDB_CHECK_NULL(pSet->aSttF[pSet->nSttF], code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
TSDB_CHECK_CODE(code = tJsonToSttFile(element, pSet->aSttF[pSet->nSttF]), lino, _exit);
pSet->aSttF[pSet->nSttF]->nRef = 1;
pSet->nSttF++;
}
_exit:
if (code) tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return code;
}
// SDelFile =============================================== // SDelFile ===============================================
void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]) { void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]) {
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sv%dver%" PRId64 "%s", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sv%dver%" PRId64 "%s", tfsGetPrimaryPath(pTsdb->pVnode->pTfs),
...@@ -571,42 +305,3 @@ int32_t tGetDelFile(uint8_t *p, SDelFile *pDelFile) { ...@@ -571,42 +305,3 @@ int32_t tGetDelFile(uint8_t *p, SDelFile *pDelFile) {
return n; return n;
} }
int32_t tsdbDelFileToJson(const SDelFile *pDelFile, cJSON *pJson) {
if (pJson == NULL) return TSDB_CODE_OUT_OF_MEMORY;
int32_t code = 0;
int32_t lino;
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "commit id", pDelFile->commitID), code, lino, _exit,
TSDB_CODE_OUT_OF_MEMORY);
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "size", pDelFile->size), code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
TSDB_CHECK_NULL(cJSON_AddNumberToObject(pJson, "offset", pDelFile->offset), code, lino, _exit,
TSDB_CODE_OUT_OF_MEMORY);
_exit:
return code;
}
int32_t tsdbJsonToDelFile(const cJSON *pJson, SDelFile *pDelFile) {
int32_t code = 0;
int32_t lino;
const cJSON *pItem;
// commit id
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "commit id")), code, lino, _exit,
TSDB_CODE_FILE_CORRUPTED);
pDelFile->commitID = cJSON_GetNumberValue(pItem);
// size
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "size")), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
pDelFile->size = cJSON_GetNumberValue(pItem);
// offset
TSDB_CHECK(cJSON_IsNumber(pItem = cJSON_GetObjectItem(pJson, "offset")), code, lino, _exit, TSDB_CODE_FILE_CORRUPTED);
pDelFile->offset = cJSON_GetNumberValue(pItem);
_exit:
return code;
}
\ No newline at end of file
...@@ -28,7 +28,7 @@ class TDTestCase: ...@@ -28,7 +28,7 @@ class TDTestCase:
self.perf_param = ['apps','connections','consumers','queries','transactions'] self.perf_param = ['apps','connections','consumers','queries','transactions']
self.perf_param_list = ['apps','connections','consumers','queries','trans'] self.perf_param_list = ['apps','connections','consumers','queries','trans']
self.dbname = "db" self.dbname = "db"
self.vgroups = 10 self.vgroups = 4
self.stbname = f'`{tdCom.getLongName(5)}`' self.stbname = f'`{tdCom.getLongName(5)}`'
self.tbname = f'`{tdCom.getLongName(3)}`' self.tbname = f'`{tdCom.getLongName(3)}`'
self.db_param = { self.db_param = {
......
...@@ -292,7 +292,7 @@ class TDTestCase: ...@@ -292,7 +292,7 @@ class TDTestCase:
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare() tdSql.prepare()
# time.sleep(2) # time.sleep(2)
vgroups = "30" vgroups = "4"
sql = "create database db3 vgroups " + vgroups sql = "create database db3 vgroups " + vgroups
tdSql.query(sql) tdSql.query(sql)
sql = "create table db3.stb (ts timestamp, f int) tags (t int)" sql = "create table db3.stb (ts timestamp, f int) tags (t int)"
......
...@@ -181,7 +181,7 @@ class TDTestCase: ...@@ -181,7 +181,7 @@ class TDTestCase:
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare() tdSql.prepare()
# time.sleep(2) # time.sleep(2)
vgroups = "30" vgroups = "4"
sql = "create database db3 vgroups " + vgroups sql = "create database db3 vgroups " + vgroups
tdSql.query(sql) tdSql.query(sql)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册