diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index a01de1f4fe9e13a57e6a5f828fcd4ad97bd4a925..6a6a78cd4e198175f5f9cbf0a54757472f31827c 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -65,6 +65,8 @@ static void tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep); static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables); static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks); static int keyFGroupCompFunc(const void *key, const void *fgroup); +static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg); +static void * tsdbDecodeCfg(void *buf, STsdbCfg *pCfg); // Function declaration int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) { @@ -141,8 +143,10 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) { STsdbRepo *pRepo = (STsdbRepo *)repo; - tsdbAsyncCommit(pRepo); - if (pRepo->commit) pthread_join(pRepo->commitThread, NULL); + if (toCommit) { + tsdbAsyncCommit(pRepo); + if (pRepo->commit) pthread_join(pRepo->commitThread, NULL); + } tsdbCloseFileH(pRepo); tsdbCloseBufPool(pRepo); @@ -179,7 +183,7 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ // STsdbMeta *pMeta = pRepo->tsdbMeta; STsdbFileH *pFileH = pRepo->tsdbFileH; uint32_t magic = 0; - char *fname = NULL; + char * fname = NULL; struct stat fState; @@ -513,6 +517,8 @@ static int32_t tsdbUnsetRepoEnv(char *rootDir) { static int32_t tsdbSaveConfig(char *rootDir, STsdbCfg *pCfg) { int fd = -1; char *fname = NULL; + char buf[TSDB_FILE_HEAD_SIZE] = "\0"; + char *pBuf = buf; fname = tsdbGetCfgFname(rootDir); if (fname == NULL) { @@ -527,8 +533,13 @@ static int32_t tsdbSaveConfig(char *rootDir, STsdbCfg *pCfg) { goto _err; } - if (twrite(fd, (void *)pCfg, sizeof(STsdbCfg)) < sizeof(STsdbCfg)) { - tsdbError("vgId:%d failed to write %d bytes to file %s since %s", pCfg->tsdbId, sizeof(STsdbCfg), fname, + int tlen = tsdbEncodeCfg((void *)(&pBuf), pCfg); + ASSERT(tlen + sizeof(TSCKSUM) <= TSDB_FILE_HEAD_SIZE); + + taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE); + + if (twrite(fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { + tsdbError("vgId:%d failed to write %d bytes to file %s since %s", pCfg->tsdbId, TSDB_FILE_HEAD_SIZE, fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -553,6 +564,7 @@ _err: static int tsdbLoadConfig(char *rootDir, STsdbCfg *pCfg) { char *fname = NULL; int fd = -1; + char buf[TSDB_FILE_HEAD_SIZE] = "\0"; fname = tsdbGetCfgFname(rootDir); if (fname == NULL) { @@ -567,12 +579,20 @@ static int tsdbLoadConfig(char *rootDir, STsdbCfg *pCfg) { goto _err; } - if (tread(fd, (void *)pCfg, sizeof(*pCfg)) < sizeof(*pCfg)) { - tsdbError("failed to read %d bytes from file %s since %s", sizeof(*pCfg), fname, strerror(errno)); + if (tread(fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { + tsdbError("failed to read %d bytes from file %s since %s", TSDB_FILE_HEAD_SIZE, fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); goto _err; } + if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) { + tsdbError("file %s is corrupted", fname); + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + goto _err; + } + + tsdbDecodeCfg(buf, pCfg); + tfree(fname); close(fd); @@ -596,7 +616,6 @@ static char *tsdbGetCfgFname(char *rootDir) { return fname; } - static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) { STsdbRepo *pRepo = (STsdbRepo *)calloc(1, sizeof(STsdbRepo)); if (pRepo == NULL) { @@ -888,6 +907,42 @@ static int keyFGroupCompFunc(const void *key, const void *fgroup) { } } +static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg) { + int tlen = 0; + + tlen += taosEncodeVariantI32(buf, pCfg->tsdbId); + tlen += taosEncodeFixedI32(buf, pCfg->cacheBlockSize); + tlen += taosEncodeVariantI32(buf, pCfg->totalBlocks); + tlen += taosEncodeVariantI32(buf, pCfg->maxTables); + tlen += taosEncodeVariantI32(buf, pCfg->daysPerFile); + tlen += taosEncodeVariantI32(buf, pCfg->keep); + tlen += taosEncodeVariantI32(buf, pCfg->keep1); + tlen += taosEncodeVariantI32(buf, pCfg->keep2); + tlen += taosEncodeVariantI32(buf, pCfg->minRowsPerFileBlock); + tlen += taosEncodeVariantI32(buf, pCfg->maxRowsPerFileBlock); + tlen += taosEncodeFixedI8(buf, pCfg->precision); + tlen += taosEncodeFixedI8(buf, pCfg->compression); + + return tlen; +} + +static void *tsdbDecodeCfg(void *buf, STsdbCfg *pCfg) { + buf = taosDecodeVariantI32(buf, &(pCfg->tsdbId)); + buf = taosDecodeFixedI32(buf, &(pCfg->cacheBlockSize)); + buf = taosDecodeVariantI32(buf, &(pCfg->totalBlocks)); + buf = taosDecodeVariantI32(buf, &(pCfg->maxTables)); + buf = taosDecodeVariantI32(buf, &(pCfg->daysPerFile)); + buf = taosDecodeVariantI32(buf, &(pCfg->keep)); + buf = taosDecodeVariantI32(buf, &(pCfg->keep1)); + buf = taosDecodeVariantI32(buf, &(pCfg->keep2)); + buf = taosDecodeVariantI32(buf, &(pCfg->minRowsPerFileBlock)); + buf = taosDecodeVariantI32(buf, &(pCfg->maxRowsPerFileBlock)); + buf = taosDecodeFixedI8(buf, &(pCfg->precision)); + buf = taosDecodeFixedI8(buf, &(pCfg->compression)); + + return buf; +} + static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks) { // TODO // STsdbCache *pCache = pRepo->tsdbCache; @@ -915,7 +970,8 @@ static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks) { // pRepo->config.totalBlocks = totalBlocks; // tsdbUnLockRepo((TsdbRepoT *)pRepo); - // tsdbTrace("vgId:%d, tsdb total cache blocks changed from %d to %d", pRepo->config.tsdbId, oldNumOfBlocks, totalBlocks); + // tsdbTrace("vgId:%d, tsdb total cache blocks changed from %d to %d", pRepo->config.tsdbId, oldNumOfBlocks, + // totalBlocks); return 0; }