提交 f2f353c0 编写于 作者: H Hongze Cheng

finish refact

上级 c29b78d3
...@@ -81,9 +81,9 @@ typedef void TSDB_REPO_T; // use void to hide implementation details from outsi ...@@ -81,9 +81,9 @@ typedef void TSDB_REPO_T; // use void to hide implementation details from outsi
STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo); STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo);
// --------- TSDB REPOSITORY DEFINITION // --------- TSDB REPOSITORY DEFINITION
int tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg); int32_t tsdbCreateRepo(int repoid);
int32_t tsdbDropRepo(char *rootDir); int32_t tsdbDropRepo(int repoid);
TSDB_REPO_T *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH); TSDB_REPO_T *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH);
int tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit); int tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit);
int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg); int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg);
int tsdbGetState(TSDB_REPO_T *repo); int tsdbGetState(TSDB_REPO_T *repo);
...@@ -120,7 +120,6 @@ STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg); ...@@ -120,7 +120,6 @@ STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg);
int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg); int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg);
int tsdbDropTable(TSDB_REPO_T *pRepo, STableId tableId); int tsdbDropTable(TSDB_REPO_T *pRepo, STableId tableId);
int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg); int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg);
// TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid);
uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size); uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size);
......
...@@ -3,7 +3,6 @@ PROJECT(TDengine) ...@@ -3,7 +3,6 @@ PROJECT(TDengine)
INCLUDE_DIRECTORIES(inc) INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC) AUX_SOURCE_DIRECTORY(src SRC)
list(REMOVE_ITEM SRC "src/tsdbFS.c")
ADD_LIBRARY(tsdb ${SRC}) ADD_LIBRARY(tsdb ${SRC})
TARGET_LINK_LIBRARIES(tsdb tfs common tutil) TARGET_LINK_LIBRARIES(tsdb tfs common tutil)
......
...@@ -69,10 +69,10 @@ typedef struct { ...@@ -69,10 +69,10 @@ typedef struct {
#define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC #define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC
#define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC #define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC
STsdbFS *tsdbNewFS(int keep, int days); STsdbFS *tsdbNewFS(STsdbCfg *pCfg);
void * tsdbFreeFS(STsdbFS *pfs); void * tsdbFreeFS(STsdbFS *pfs);
int tsdbOpenFS(STsdbFS *pFs, int keep, int days); int tsdbOpenFS(STsdbRepo *pRepo);
void tsdbCloseFS(STsdbFS *pFs); void tsdbCloseFS(STsdbRepo *pRepo);
void tsdbStartFSTxn(STsdbFS *pfs, int64_t pointsAdd, int64_t storageAdd); void tsdbStartFSTxn(STsdbFS *pfs, int64_t pointsAdd, int64_t storageAdd);
int tsdbEndFSTxn(STsdbFS *pfs); int tsdbEndFSTxn(STsdbFS *pfs);
int tsdbEndFSTxnWithError(STsdbFS *pfs); int tsdbEndFSTxnWithError(STsdbFS *pfs);
......
...@@ -71,7 +71,6 @@ typedef struct STsdbRepo STsdbRepo; ...@@ -71,7 +71,6 @@ typedef struct STsdbRepo STsdbRepo;
struct STsdbRepo { struct STsdbRepo {
int8_t state; int8_t state;
char* rootDir;
STsdbCfg config; STsdbCfg config;
STsdbAppH appH; STsdbAppH appH;
STsdbStat stat; STsdbStat stat;
...@@ -92,12 +91,8 @@ struct STsdbRepo { ...@@ -92,12 +91,8 @@ struct STsdbRepo {
#define IS_REPO_LOCKED(r) (r)->repoLocked #define IS_REPO_LOCKED(r) (r)->repoLocked
#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg) #define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg)
char* tsdbGetMetaFileName(char* rootDir);
void tsdbGetDataFileName(char* rootDir, int vid, int fid, int type, char* fname);
int tsdbLockRepo(STsdbRepo* pRepo); int tsdbLockRepo(STsdbRepo* pRepo);
int tsdbUnlockRepo(STsdbRepo* pRepo); int tsdbUnlockRepo(STsdbRepo* pRepo);
char* tsdbGetDataDirName(char* rootDir);
int tsdbGetNextMaxTables(int tid);
STsdbMeta* tsdbGetMeta(TSDB_REPO_T* pRepo); STsdbMeta* tsdbGetMeta(TSDB_REPO_T* pRepo);
int tsdbCheckCommit(STsdbRepo* pRepo); int tsdbCheckCommit(STsdbRepo* pRepo);
...@@ -114,6 +109,18 @@ static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) { ...@@ -114,6 +109,18 @@ static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) {
return pBufBlock; return pBufBlock;
} }
static FORCE_INLINE int tsdbGetNextMaxTables(int tid) {
ASSERT(tid >= 1 && tid <= TSDB_MAX_TABLES);
int maxTables = TSDB_INIT_NTABLES;
while (true) {
maxTables = MIN(maxTables, TSDB_MAX_TABLES);
if (tid <= maxTables) break;
maxTables *= 2;
}
return maxTables + 1;
}
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -19,6 +19,11 @@ ...@@ -19,6 +19,11 @@
#define TSDB_FS_TEMP_FNAME "current.t" #define TSDB_FS_TEMP_FNAME "current.t"
#define TSDB_MAX_FSETS(keep, days) ((keep) / (days) + 3) #define TSDB_MAX_FSETS(keep, days) ((keep) / (days) + 3)
static int tsdbComparFidFSet(const void *arg1, const void *arg2);
static void tsdbResetFSStatus(SFSStatus *pStatus);
static int tsdbApplyFSTxn(STsdbFS *pfs);
static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo);
// ================== CURRENT file header info // ================== CURRENT file header info
static int tsdbEncodeFSHeader(void **buf, SFSHeader *pHeader) { static int tsdbEncodeFSHeader(void **buf, SFSHeader *pHeader) {
int tlen = 0; int tlen = 0;
...@@ -29,7 +34,7 @@ static int tsdbEncodeFSHeader(void **buf, SFSHeader *pHeader) { ...@@ -29,7 +34,7 @@ static int tsdbEncodeFSHeader(void **buf, SFSHeader *pHeader) {
return tlen; return tlen;
} }
static void *tsdbDecodeFSHeader(void *buf, SFSHeader *pHeader) { static UNUSED_FUNC void *tsdbDecodeFSHeader(void *buf, SFSHeader *pHeader) {
buf = taosDecodeFixedU32(buf, &(pHeader->version)); buf = taosDecodeFixedU32(buf, &(pHeader->version));
buf = taosDecodeFixedU32(buf, &(pHeader->len)); buf = taosDecodeFixedU32(buf, &(pHeader->len));
...@@ -47,7 +52,7 @@ static int tsdbEncodeFSMeta(void **buf, STsdbFSMeta *pMeta) { ...@@ -47,7 +52,7 @@ static int tsdbEncodeFSMeta(void **buf, STsdbFSMeta *pMeta) {
return tlen; return tlen;
} }
static void *tsdbDecodeFSMeta(void *buf, STsdbFSMeta *pMeta) { static UNUSED_FUNC void *tsdbDecodeFSMeta(void *buf, STsdbFSMeta *pMeta) {
buf = taosDecodeFixedU32(buf, &(pMeta->version)); buf = taosDecodeFixedU32(buf, &(pMeta->version));
buf = taosDecodeFixedI64(buf, &(pMeta->totalPoints)); buf = taosDecodeFixedI64(buf, &(pMeta->totalPoints));
buf = taosDecodeFixedI64(buf, &(pMeta->totalStorage)); buf = taosDecodeFixedI64(buf, &(pMeta->totalStorage));
...@@ -95,7 +100,7 @@ static int tsdbEncodeFSStatus(void **buf, SFSStatus *pStatus) { ...@@ -95,7 +100,7 @@ static int tsdbEncodeFSStatus(void **buf, SFSStatus *pStatus) {
return tlen; return tlen;
} }
static void *tsdbDecodeFSStatus(void *buf, SFSStatus *pStatus) { static UNUSED_FUNC void *tsdbDecodeFSStatus(void *buf, SFSStatus *pStatus) {
tsdbResetFSStatus(pStatus); tsdbResetFSStatus(pStatus);
pStatus->pmf = &(pStatus->mf); pStatus->pmf = &(pStatus->mf);
...@@ -113,7 +118,7 @@ static SFSStatus *tsdbNewFSStatus(int maxFSet) { ...@@ -113,7 +118,7 @@ static SFSStatus *tsdbNewFSStatus(int maxFSet) {
return NULL; return NULL;
} }
TSDB_FSET_SET_CLOSED(&(pStatus->mf)); TSDB_FILE_SET_CLOSED(&(pStatus->mf));
pStatus->df = taosArrayInit(maxFSet, sizeof(SDFileSet)); pStatus->df = taosArrayInit(maxFSet, sizeof(SDFileSet));
if (pStatus->df == NULL) { if (pStatus->df == NULL) {
...@@ -139,7 +144,7 @@ static void tsdbResetFSStatus(SFSStatus *pStatus) { ...@@ -139,7 +144,7 @@ static void tsdbResetFSStatus(SFSStatus *pStatus) {
return; return;
} }
TSDB_FSET_SET_CLOSED(&(pStatus->mf)); TSDB_FILE_SET_CLOSED(&(pStatus->mf));
pStatus->pmf = NULL; pStatus->pmf = NULL;
taosArrayClear(pStatus->df); taosArrayClear(pStatus->df);
...@@ -167,7 +172,9 @@ static int tsdbAddDFileSetToStatus(SFSStatus *pStatus, const SDFileSet *pSet) { ...@@ -167,7 +172,9 @@ static int tsdbAddDFileSetToStatus(SFSStatus *pStatus, const SDFileSet *pSet) {
// ================== STsdbFS // ================== STsdbFS
// TODO // TODO
STsdbFS *tsdbNewFS(int keep, int days) { STsdbFS *tsdbNewFS(STsdbCfg *pCfg) {
int keep = pCfg->keep;
int days = pCfg->daysPerFile;
int maxFSet = TSDB_MAX_FSETS(keep, days); int maxFSet = TSDB_MAX_FSETS(keep, days);
STsdbFS *pfs; STsdbFS *pfs;
...@@ -220,14 +227,13 @@ void *tsdbFreeFS(STsdbFS *pfs) { ...@@ -220,14 +227,13 @@ void *tsdbFreeFS(STsdbFS *pfs) {
} }
// TODO // TODO
int tsdbOpenFS(STsdbFS *pFs, int keep, int days) { int tsdbOpenFS(STsdbRepo *pRepo) {
// TODO // TODO
return 0; return 0;
} }
// TODO // TODO
void tsdbCloseFS(STsdbFS *pFs) { void tsdbCloseFS(STsdbRepo *pRepo) {
// TODO // TODO
} }
...@@ -298,7 +304,7 @@ static int tsdbApplyFSTxn(STsdbFS *pfs) { ...@@ -298,7 +304,7 @@ static int tsdbApplyFSTxn(STsdbFS *pfs) {
ASSERT(taosArrayGetSize(pfs->nstatus->df) == 0); ASSERT(taosArrayGetSize(pfs->nstatus->df) == 0);
fsheader.len = 0; fsheader.len = 0;
} else { } else {
fsheader.len = tsdbEncodeFSHeader(NULL, pfs->nstatus) + sizeof(TSCKSUM); fsheader.len = tsdbEncodeFSHeader(NULL, &fsheader) + sizeof(TSCKSUM);
} }
// Encode header part and write // Encode header part and write
...@@ -325,7 +331,7 @@ static int tsdbApplyFSTxn(STsdbFS *pfs) { ...@@ -325,7 +331,7 @@ static int tsdbApplyFSTxn(STsdbFS *pfs) {
ptr = pBuf; ptr = pBuf;
tsdbEncodeFSStatus(&ptr, pfs->nstatus); tsdbEncodeFSStatus(&ptr, pfs->nstatus);
taosCalcChecksumAppend(0, (uint8_t *)pBuf, fsheader.len) taosCalcChecksumAppend(0, (uint8_t *)pBuf, fsheader.len);
if (taosWrite(fd, pBuf, fsheader.len) < fsheader.len) { if (taosWrite(fd, pBuf, fsheader.len) < fsheader.len) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
...@@ -455,7 +461,7 @@ void tsdbFSIterSeek(SFSIter *pIter, int fid) { ...@@ -455,7 +461,7 @@ void tsdbFSIterSeek(SFSIter *pIter, int fid) {
flags = TD_LE; flags = TD_LE;
} }
void *ptr = taosbsearch(&fid, pfs->cstatus->df->pData, size, sizeof(SDFileSet), , flags); void *ptr = taosbsearch(&fid, pfs->cstatus->df->pData, size, sizeof(SDFileSet), tsdbComparFidFSet, flags);
if (ptr == NULL) { if (ptr == NULL) {
pIter->index = -1; pIter->index = -1;
pIter->fid = TSDB_IVLD_FID; pIter->fid = TSDB_IVLD_FID;
...@@ -503,4 +509,17 @@ SDFileSet *tsdbFSIterNext(SFSIter *pIter) { ...@@ -503,4 +509,17 @@ SDFileSet *tsdbFSIterNext(SFSIter *pIter) {
} }
return pSet; return pSet;
}
static int tsdbComparFidFSet(const void *arg1, const void *arg2) {
int fid = *(int *)arg1;
SDFileSet *pSet = (SDFileSet *)arg2;
if (fid < pSet->fid) {
return -1;
} else if (fid == pSet->fid) {
return 0;
} else {
return 1;
}
} }
\ No newline at end of file
...@@ -16,111 +16,100 @@ ...@@ -16,111 +16,100 @@
// no test file errors here // no test file errors here
#include "tsdbint.h" #include "tsdbint.h"
#define TSDB_CFG_FILE_NAME "config"
#define TSDB_DATA_DIR_NAME "data"
#define TSDB_META_FILE_NAME "meta"
#define TSDB_META_FILE_INDEX 10000000
#define IS_VALID_PRECISION(precision) \ #define IS_VALID_PRECISION(precision) \
(((precision) >= TSDB_TIME_PRECISION_MILLI) && ((precision) <= TSDB_TIME_PRECISION_NANO)) (((precision) >= TSDB_TIME_PRECISION_MILLI) && ((precision) <= TSDB_TIME_PRECISION_NANO))
#define TSDB_DEFAULT_COMPRESSION TWO_STAGE_COMP #define TSDB_DEFAULT_COMPRESSION TWO_STAGE_COMP
#define IS_VALID_COMPRESSION(compression) (((compression) >= NO_COMPRESSION) && ((compression) <= TWO_STAGE_COMP)) #define IS_VALID_COMPRESSION(compression) (((compression) >= NO_COMPRESSION) && ((compression) <= TWO_STAGE_COMP))
static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg); static void tsdbGetDataDir(int repoid, char dirName[]);
static int32_t tsdbSetRepoEnv(char *rootDir, STsdbCfg *pCfg); static void tsdbGetRootDir(int repoid, char dirName[]);
static int32_t tsdbUnsetRepoEnv(char *rootDir); static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg);
static int32_t tsdbSaveConfig(char *rootDir, STsdbCfg *pCfg); static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH);
static int tsdbLoadConfig(char *rootDir, STsdbCfg *pCfg); static void tsdbFreeRepo(STsdbRepo *pRepo);
static char * tsdbGetCfgFname(char *rootDir); static void tsdbStartStream(STsdbRepo *pRepo);
static STsdbRepo * tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg); static void tsdbStopStream(STsdbRepo *pRepo);
static void tsdbFreeRepo(STsdbRepo *pRepo); static int tsdbRestoreInfo(STsdbRepo *pRepo);
static int tsdbRestoreInfo(STsdbRepo *pRepo);
static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression);
static int tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep);
static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks);
static int keyFGroupCompFunc(const void *key, const void *fgroup);
static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg);
static void * tsdbDecodeCfg(void *buf, STsdbCfg *pCfg);
static void tsdbStartStream(STsdbRepo *pRepo);
static void tsdbStopStream(STsdbRepo *pRepo);
// Function declaration // Function declaration
int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) { int32_t tsdbCreateRepo(int repoid) {
char tsdbDir[TSDB_FILENAME_LEN] = "\0"; char tsdbDir[TSDB_FILENAME_LEN] = "\0";
char dataDir[TSDB_FILENAME_LEN] = "\0";
snprintf(tsdbDir, TSDB_FILENAME_LEN, "%s/%s", TFS_PRIMARY_PATH(), rootDir); tsdbGetRootDir(repoid, tsdbDir);
DIR *dir = opendir(tsdbDir); if (tfsMkdir(tsdbDir) < 0) {
if (dir) { goto _err;
tsdbDebug("repository %s already exists", rootDir);
closedir(dir);
return 0;
} else {
if (ENOENT != errno) {
tsdbError("failed to open directory %s since %s", rootDir, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
} }
if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) return -1; tsdbGetDataDir(repoid, dataDir);
if (tfsMkdir(dataDir) < 0) {
if (tsdbSetRepoEnv(rootDir, pCfg) < 0) return -1; goto _err;
}
tsdbDebug(
"vgId:%d tsdb env create succeed! cacheBlockSize %d totalBlocks %d daysPerFile %d keep "
"%d minRowsPerFileBlock %d maxRowsPerFileBlock %d precision %d compression %d update %d cacheLastRow %d",
pCfg->tsdbId, pCfg->cacheBlockSize, pCfg->totalBlocks, pCfg->daysPerFile, pCfg->keep, pCfg->minRowsPerFileBlock,
pCfg->maxRowsPerFileBlock, pCfg->precision, pCfg->compression, pCfg->update, pCfg->cacheLastRow);
return 0; return 0;
_err:
tsdbError("vgId:%d failed to create TSDB repository since %s", repoid, tstrerror(terrno));
return -1;
} }
int32_t tsdbDropRepo(char *rootDir) { return tsdbUnsetRepoEnv(rootDir); } int32_t tsdbDropRepo(int repoid) {
char tsdbDir[TSDB_FILENAME_LEN] = "\0";
tsdbGetRootDir(repoid, tsdbDir);
return tfsRmdir(tsdbDir);
}
TSDB_REPO_T *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH) { TSDB_REPO_T *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
STsdbCfg config = {0}; STsdbRepo *pRepo;
STsdbRepo *pRepo = NULL; STsdbCfg config = *pCfg;
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
if (tsdbLoadConfig(rootDir, &config) < 0) { // Check and set default configurations
tsdbError("failed to open repo in rootDir %s since %s", rootDir, tstrerror(terrno)); if (tsdbCheckAndSetDefaultCfg(&config) < 0) {
tsdbError("vgId:%d failed to open TSDB repository since %s", config.tsdbId, tstrerror(terrno));
return NULL; return NULL;
} }
pRepo = tsdbNewRepo(rootDir, pAppH, &config); // Create new TSDB object
if (pRepo == NULL) { if ((pRepo = tsdbNewRepo(&config, pAppH)) == NULL) {
tsdbError("failed to open repo in rootDir %s since %s", rootDir, tstrerror(terrno)); tsdbError("vgId:%d failed to open TSDB repository while creating TSDB object since %s", config.tsdbId,
tstrerror(terrno));
return NULL; return NULL;
} }
// Open meta
if (tsdbOpenMeta(pRepo) < 0) { if (tsdbOpenMeta(pRepo) < 0) {
tsdbError("vgId:%d failed to open meta since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to open TSDB repository while opening Meta since %s", config.tsdbId, tstrerror(terrno));
goto _err; tsdbCloseRepo(pRepo, false);
return NULL;
} }
if (tsdbOpenBufPool(pRepo) < 0) { if (tsdbOpenBufPool(pRepo) < 0) {
tsdbError("vgId:%d failed to open buffer pool since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to open TSDB repository while opening buffer pool since %s", config.tsdbId,
goto _err; tstrerror(terrno));
tsdbCloseRepo(pRepo, false);
return NULL;
} }
if (tsdbOpenFS(pRepo) < 0) { if (tsdbOpenFS(pRepo) < 0) {
tsdbError("vgId:%d failed to open file handle since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to open TSDB repository while opening FS since %s", config.tsdbId, tstrerror(terrno));
goto _err; tsdbCloseRepo(pRepo, false);
return NULL;
} }
// TODO: Restore information from data
if (tsdbRestoreInfo(pRepo) < 0) { if (tsdbRestoreInfo(pRepo) < 0) {
tsdbError("vgId:%d failed to restore info from file since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to open TSDB repository while restore info since %s", config.tsdbId, tstrerror(terrno));
goto _err; tsdbCloseRepo(pRepo, false);
return NULL;
} }
tsdbStartStream(pRepo); tsdbStartStream(pRepo);
tsdbDebug("vgId:%d open tsdb repository succeed!", REPO_ID(pRepo)); tsdbDebug("vgId:%d, TSDB repository opened", REPO_ID(pRepo));
return (TSDB_REPO_T *)pRepo; return (TSDB_REPO_T *)pRepo;
_err:
tsdbCloseRepo(pRepo, false);
return NULL;
} }
// Note: all working thread and query thread must stopped when calling this function // Note: all working thread and query thread must stopped when calling this function
...@@ -135,16 +124,14 @@ int tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) { ...@@ -135,16 +124,14 @@ int tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) {
tsdbStopStream(pRepo); tsdbStopStream(pRepo);
if (toCommit) { if (toCommit) {
tsdbAsyncCommit(pRepo); tsdbSyncCommit(repo);
sem_wait(&(pRepo->readyToCommit));
terrno = pRepo->code;
} }
tsdbUnRefMemTable(pRepo, pRepo->mem); tsdbUnRefMemTable(pRepo, pRepo->mem);
tsdbUnRefMemTable(pRepo, pRepo->imem); tsdbUnRefMemTable(pRepo, pRepo->imem);
pRepo->mem = NULL; pRepo->mem = NULL;
pRepo->imem = NULL; pRepo->imem = NULL;
tsdbCloseFileH(pRepo, !toCommit); tsdbCloseFS(pRepo);
tsdbCloseBufPool(pRepo); tsdbCloseBufPool(pRepo);
tsdbCloseMeta(pRepo); tsdbCloseMeta(pRepo);
tsdbFreeRepo(pRepo); tsdbFreeRepo(pRepo);
...@@ -157,7 +144,119 @@ int tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) { ...@@ -157,7 +144,119 @@ int tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) {
} }
} }
STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo) {
ASSERT(repo != NULL);
return &((STsdbRepo *)repo)->config;
}
int tsdbLockRepo(STsdbRepo *pRepo) {
int code = pthread_mutex_lock(&pRepo->mutex);
if (code != 0) {
tsdbError("vgId:%d failed to lock tsdb since %s", REPO_ID(pRepo), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
pRepo->repoLocked = true;
return 0;
}
int tsdbUnlockRepo(STsdbRepo *pRepo) {
ASSERT(IS_REPO_LOCKED(pRepo));
pRepo->repoLocked = false;
int code = pthread_mutex_unlock(&pRepo->mutex);
if (code != 0) {
tsdbError("vgId:%d failed to unlock tsdb since %s", REPO_ID(pRepo), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
int tsdbCheckCommit(STsdbRepo *pRepo) {
ASSERT(pRepo->mem != NULL);
STsdbCfg *pCfg = &(pRepo->config);
STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
ASSERT(pBufBlock != NULL);
if ((pRepo->mem->extraBuffList != NULL) ||
((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < TSDB_BUFFER_RESERVE))) {
// trigger commit
if (tsdbAsyncCommit(pRepo) < 0) return -1;
}
return 0;
}
STsdbMeta *tsdbGetMeta(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbMeta; }
STsdbRepoInfo *tsdbGetStatus(TSDB_REPO_T *pRepo) { return NULL; }
int tsdbGetState(TSDB_REPO_T *repo) { return ((STsdbRepo *)repo)->state; }
void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage) {
ASSERT(repo != NULL);
STsdbRepo *pRepo = repo;
*totalPoints = pRepo->stat.pointsWritten;
*totalStorage = pRepo->stat.totalStorage;
*compStorage = pRepo->stat.compStorage;
}
int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg) {
// TODO: think about multithread cases
return 0;
#if 0
STsdbRepo *pRepo = (STsdbRepo *)repo;
STsdbCfg config = pRepo->config;
STsdbCfg * pRCfg = &pRepo->config;
if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) return -1;
ASSERT(pRCfg->tsdbId == pCfg->tsdbId);
ASSERT(pRCfg->cacheBlockSize == pCfg->cacheBlockSize);
ASSERT(pRCfg->daysPerFile == pCfg->daysPerFile);
ASSERT(pRCfg->minRowsPerFileBlock == pCfg->minRowsPerFileBlock);
ASSERT(pRCfg->maxRowsPerFileBlock == pCfg->maxRowsPerFileBlock);
ASSERT(pRCfg->precision == pCfg->precision);
bool configChanged = false;
if (pRCfg->compression != pCfg->compression) {
tsdbAlterCompression(pRepo, pCfg->compression);
config.compression = pCfg->compression;
configChanged = true;
}
if (pRCfg->keep != pCfg->keep) {
if (tsdbAlterKeep(pRepo, pCfg->keep) < 0) {
tsdbError("vgId:%d failed to configure repo when alter keep since %s", REPO_ID(pRepo), tstrerror(terrno));
config.keep = pCfg->keep;
return -1;
}
configChanged = true;
}
if (pRCfg->totalBlocks != pCfg->totalBlocks) {
tsdbAlterCacheTotalBlocks(pRepo, pCfg->totalBlocks);
config.totalBlocks = pCfg->totalBlocks;
configChanged = true;
}
if (pRCfg->cacheLastRow != pCfg->cacheLastRow) {
config.cacheLastRow = pCfg->cacheLastRow;
configChanged = true;
}
if (configChanged) {
if (tsdbSaveConfig(pRepo->rootDir, &config) < 0) {
tsdbError("vgId:%d failed to configure repository while save config since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
}
return 0;
#endif
}
uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size) { uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size) {
// TODO
return 0;
#if 0
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
// STsdbMeta *pMeta = pRepo->tsdbMeta; // STsdbMeta *pMeta = pRepo->tsdbMeta;
STsdbFileH *pFileH = pRepo->tsdbFileH; STsdbFileH *pFileH = pRepo->tsdbFileH;
...@@ -233,166 +332,33 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ ...@@ -233,166 +332,33 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
tfree(fname); tfree(fname);
return magic; return magic;
#endif
} }
STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo) { static void tsdbGetRootDir(int repoid, char dirName[]) {
ASSERT(repo != NULL); snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb", repoid);
return &((STsdbRepo *)repo)->config;
}
int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg) {
// TODO: think about multithread cases
STsdbRepo *pRepo = (STsdbRepo *)repo;
STsdbCfg config = pRepo->config;
STsdbCfg * pRCfg = &pRepo->config;
if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) return -1;
ASSERT(pRCfg->tsdbId == pCfg->tsdbId);
ASSERT(pRCfg->cacheBlockSize == pCfg->cacheBlockSize);
ASSERT(pRCfg->daysPerFile == pCfg->daysPerFile);
ASSERT(pRCfg->minRowsPerFileBlock == pCfg->minRowsPerFileBlock);
ASSERT(pRCfg->maxRowsPerFileBlock == pCfg->maxRowsPerFileBlock);
ASSERT(pRCfg->precision == pCfg->precision);
bool configChanged = false;
if (pRCfg->compression != pCfg->compression) {
tsdbAlterCompression(pRepo, pCfg->compression);
config.compression = pCfg->compression;
configChanged = true;
}
if (pRCfg->keep != pCfg->keep) {
if (tsdbAlterKeep(pRepo, pCfg->keep) < 0) {
tsdbError("vgId:%d failed to configure repo when alter keep since %s", REPO_ID(pRepo), tstrerror(terrno));
config.keep = pCfg->keep;
return -1;
}
configChanged = true;
}
if (pRCfg->totalBlocks != pCfg->totalBlocks) {
tsdbAlterCacheTotalBlocks(pRepo, pCfg->totalBlocks);
config.totalBlocks = pCfg->totalBlocks;
configChanged = true;
}
if (pRCfg->cacheLastRow != pCfg->cacheLastRow) {
config.cacheLastRow = pCfg->cacheLastRow;
configChanged = true;
}
if (configChanged) {
if (tsdbSaveConfig(pRepo->rootDir, &config) < 0) {
tsdbError("vgId:%d failed to configure repository while save config since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
}
return 0;
}
void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage) {
ASSERT(repo != NULL);
STsdbRepo *pRepo = repo;
*totalPoints = pRepo->stat.pointsWritten;
*totalStorage = pRepo->stat.totalStorage;
*compStorage = pRepo->stat.compStorage;
}
int tsdbGetState(TSDB_REPO_T *repo) {
return ((STsdbRepo *)repo)->state;
}
// ----------------- INTERNAL FUNCTIONS -----------------
char *tsdbGetMetaFileName(char *rootDir) {
int tlen = (int)(strlen(TFS_PRIMARY_PATH()) + strlen(rootDir) + strlen(TSDB_META_FILE_NAME) + 3);
char *fname = calloc(1, tlen);
if (fname == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
snprintf(fname, tlen, "%s/%s/%s", TFS_PRIMARY_PATH(), rootDir, TSDB_META_FILE_NAME);
return fname;
}
void tsdbGetDataFileName(char *rootDir, int vid, int fid, int type, char *fname) {
snprintf(fname, TSDB_FILENAME_LEN, "%s/%s/v%df%d%s", rootDir, TSDB_DATA_DIR_NAME, vid, fid, tsdbFileSuffix[type]);
} }
int tsdbLockRepo(STsdbRepo *pRepo) { static void tsdbGetDataDir(int repoid, char dirName[]) {
int code = pthread_mutex_lock(&pRepo->mutex); snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data", repoid);
if (code != 0) {
tsdbError("vgId:%d failed to lock tsdb since %s", REPO_ID(pRepo), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
pRepo->repoLocked = true;
return 0;
} }
int tsdbUnlockRepo(STsdbRepo *pRepo) { static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
ASSERT(IS_REPO_LOCKED(pRepo)); // Check tsdbId
pRepo->repoLocked = false; if (pCfg->tsdbId < 0) {
int code = pthread_mutex_unlock(&pRepo->mutex); tsdbError("vgId:%d invalid vgroup ID", pCfg->tsdbId);
if (code != 0) { terrno = TSDB_CODE_TDB_INVALID_CONFIG;
tsdbError("vgId:%d failed to unlock tsdb since %s", REPO_ID(pRepo), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(code);
return -1; return -1;
} }
return 0;
}
char *tsdbGetDataDirName(char *rootDir) {
int tlen = (int)(strlen(rootDir) + strlen(TSDB_DATA_DIR_NAME) + 2);
char *fname = calloc(1, tlen);
if (fname == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
snprintf(fname, tlen, "%s/%s", rootDir, TSDB_DATA_DIR_NAME);
return fname;
}
int tsdbGetNextMaxTables(int tid) {
ASSERT(tid >= 1 && tid <= TSDB_MAX_TABLES);
int maxTables = TSDB_INIT_NTABLES;
while (true) {
maxTables = MIN(maxTables, TSDB_MAX_TABLES);
if (tid <= maxTables) break;
maxTables *= 2;
}
return maxTables + 1;
}
int tsdbCheckCommit(STsdbRepo *pRepo) {
ASSERT(pRepo->mem != NULL);
STsdbCfg *pCfg = &(pRepo->config);
STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
ASSERT(pBufBlock != NULL);
if ((pRepo->mem->extraBuffList != NULL) ||
((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < TSDB_BUFFER_RESERVE))) {
// trigger commit
if (tsdbAsyncCommit(pRepo) < 0) return -1;
}
return 0;
}
STsdbMeta * tsdbGetMeta(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbMeta; }
// STsdbFileH * tsdbGetFile(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbFileH; }
STsdbRepoInfo *tsdbGetStatus(TSDB_REPO_T *pRepo) { return NULL; }
// ----------------- LOCAL FUNCTIONS -----------------
static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
// Check precision // Check precision
if (pCfg->precision == -1) { if (pCfg->precision == -1) {
pCfg->precision = TSDB_DEFAULT_PRECISION; pCfg->precision = TSDB_DEFAULT_PRECISION;
} else { } else {
if (!IS_VALID_PRECISION(pCfg->precision)) { if (!IS_VALID_PRECISION(pCfg->precision)) {
tsdbError("vgId:%d invalid precision configuration %d", pCfg->tsdbId, pCfg->precision); tsdbError("vgId:%d invalid precision configuration %d", pCfg->tsdbId, pCfg->precision);
goto _err; terrno = TSDB_CODE_TDB_INVALID_CONFIG;
return -1;
} }
} }
...@@ -402,16 +368,11 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { ...@@ -402,16 +368,11 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
} else { } else {
if (!IS_VALID_COMPRESSION(pCfg->compression)) { if (!IS_VALID_COMPRESSION(pCfg->compression)) {
tsdbError("vgId:%d invalid compression configuration %d", pCfg->tsdbId, pCfg->precision); tsdbError("vgId:%d invalid compression configuration %d", pCfg->tsdbId, pCfg->precision);
goto _err; terrno = TSDB_CODE_TDB_INVALID_CONFIG;
return -1;
} }
} }
// Check tsdbId
if (pCfg->tsdbId < 0) {
tsdbError("vgId:%d invalid vgroup ID", pCfg->tsdbId);
goto _err;
}
// Check daysPerFile // Check daysPerFile
if (pCfg->daysPerFile == -1) { if (pCfg->daysPerFile == -1) {
pCfg->daysPerFile = TSDB_DEFAULT_DAYS_PER_FILE; pCfg->daysPerFile = TSDB_DEFAULT_DAYS_PER_FILE;
...@@ -421,7 +382,8 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { ...@@ -421,7 +382,8 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
"vgId:%d invalid daysPerFile configuration! daysPerFile %d TSDB_MIN_DAYS_PER_FILE %d TSDB_MAX_DAYS_PER_FILE " "vgId:%d invalid daysPerFile configuration! daysPerFile %d TSDB_MIN_DAYS_PER_FILE %d TSDB_MAX_DAYS_PER_FILE "
"%d", "%d",
pCfg->tsdbId, pCfg->daysPerFile, TSDB_MIN_DAYS_PER_FILE, TSDB_MAX_DAYS_PER_FILE); pCfg->tsdbId, pCfg->daysPerFile, TSDB_MIN_DAYS_PER_FILE, TSDB_MAX_DAYS_PER_FILE);
goto _err; terrno = TSDB_CODE_TDB_INVALID_CONFIG;
return -1;
} }
} }
...@@ -434,7 +396,8 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { ...@@ -434,7 +396,8 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
"vgId:%d invalid minRowsPerFileBlock configuration! minRowsPerFileBlock %d TSDB_MIN_MIN_ROW_FBLOCK %d " "vgId:%d invalid minRowsPerFileBlock configuration! minRowsPerFileBlock %d TSDB_MIN_MIN_ROW_FBLOCK %d "
"TSDB_MAX_MIN_ROW_FBLOCK %d", "TSDB_MAX_MIN_ROW_FBLOCK %d",
pCfg->tsdbId, pCfg->minRowsPerFileBlock, TSDB_MIN_MIN_ROW_FBLOCK, TSDB_MAX_MIN_ROW_FBLOCK); pCfg->tsdbId, pCfg->minRowsPerFileBlock, TSDB_MIN_MIN_ROW_FBLOCK, TSDB_MAX_MIN_ROW_FBLOCK);
goto _err; terrno = TSDB_CODE_TDB_INVALID_CONFIG;
return -1;
} }
} }
...@@ -446,14 +409,16 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { ...@@ -446,14 +409,16 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
"vgId:%d invalid maxRowsPerFileBlock configuration! maxRowsPerFileBlock %d TSDB_MIN_MAX_ROW_FBLOCK %d " "vgId:%d invalid maxRowsPerFileBlock configuration! maxRowsPerFileBlock %d TSDB_MIN_MAX_ROW_FBLOCK %d "
"TSDB_MAX_MAX_ROW_FBLOCK %d", "TSDB_MAX_MAX_ROW_FBLOCK %d",
pCfg->tsdbId, pCfg->maxRowsPerFileBlock, TSDB_MIN_MIN_ROW_FBLOCK, TSDB_MAX_MIN_ROW_FBLOCK); pCfg->tsdbId, pCfg->maxRowsPerFileBlock, TSDB_MIN_MIN_ROW_FBLOCK, TSDB_MAX_MIN_ROW_FBLOCK);
goto _err; terrno = TSDB_CODE_TDB_INVALID_CONFIG;
return -1;
} }
} }
if (pCfg->minRowsPerFileBlock > pCfg->maxRowsPerFileBlock) { if (pCfg->minRowsPerFileBlock > pCfg->maxRowsPerFileBlock) {
tsdbError("vgId:%d invalid configuration! minRowsPerFileBlock %d maxRowsPerFileBlock %d", pCfg->tsdbId, tsdbError("vgId:%d invalid configuration! minRowsPerFileBlock %d maxRowsPerFileBlock %d", pCfg->tsdbId,
pCfg->minRowsPerFileBlock, pCfg->maxRowsPerFileBlock); pCfg->minRowsPerFileBlock, pCfg->maxRowsPerFileBlock);
goto _err; terrno = TSDB_CODE_TDB_INVALID_CONFIG;
return -1;
} }
// Check keep // Check keep
...@@ -465,238 +430,119 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { ...@@ -465,238 +430,119 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
"vgId:%d invalid keep configuration! keep %d TSDB_MIN_KEEP %d " "vgId:%d invalid keep configuration! keep %d TSDB_MIN_KEEP %d "
"TSDB_MAX_KEEP %d", "TSDB_MAX_KEEP %d",
pCfg->tsdbId, pCfg->keep, TSDB_MIN_KEEP, TSDB_MAX_KEEP); pCfg->tsdbId, pCfg->keep, TSDB_MIN_KEEP, TSDB_MAX_KEEP);
goto _err; terrno = TSDB_CODE_TDB_INVALID_CONFIG;
return -1;
} }
} }
// update check if (pCfg->keep1 == 0) {
if (pCfg->update != 0) pCfg->update = 1; pCfg->keep1 = pCfg->keep;
// update cacheLastRow
if (pCfg->cacheLastRow != 0) pCfg->cacheLastRow = 1;
return 0;
_err:
terrno = TSDB_CODE_TDB_INVALID_CONFIG;
return -1;
}
static int32_t tsdbSetRepoEnv(char *rootDir, STsdbCfg *pCfg) {
if (tfsMkdir(rootDir) < 0) {
tsdbError("vgId:%d failed to create rootDir %s since %s", pCfg->tsdbId, rootDir, tstrerror(terrno));
return -1;
}
if (tsdbSaveConfig(rootDir, pCfg) < 0) {
tsdbError("vgId:%d failed to set TSDB environment since %s", pCfg->tsdbId, tstrerror(terrno));
return -1;
} }
char *dirName = tsdbGetDataDirName(rootDir); if (pCfg->keep2 == 0) {
if (dirName == NULL) return -1; pCfg->keep2 = pCfg->keep;
if (tfsMkdir(dirName) < 0) {
tsdbError("vgId:%d failed to create directory %s since %s", pCfg->tsdbId, dirName, strerror(errno));
free(dirName);
return -1;
}
free(dirName);
char *fname = tsdbGetMetaFileName(rootDir);
if (fname == NULL) return -1;
if (tdCreateKVStore(fname) < 0) {
tsdbError("vgId:%d failed to open KV store since %s", pCfg->tsdbId, tstrerror(terrno));
free(fname);
return -1;
} }
free(fname); // update check
return 0; if (pCfg->update != 0) pCfg->update = 1;
}
static int32_t tsdbUnsetRepoEnv(char *rootDir) {
tfsRmdir(rootDir);
tsdbDebug("repository %s is removed", rootDir);
return 0;
}
static int32_t tsdbSaveConfig(char *rootDir, STsdbCfg *pCfg) {
int fd = -1;
char *fname = NULL;
char buf[TSDB_FILE_HEAD_SIZE] = "\0";
char *pBuf = buf;
fname = tsdbGetCfgFname(rootDir);
if (fname == NULL) {
tsdbError("vgId:%d failed to save configuration since %s", pCfg->tsdbId, tstrerror(terrno));
goto _err;
}
fd = open(fname, O_WRONLY | O_CREAT, 0755);
if (fd < 0) {
tsdbError("vgId:%d failed to open file %s since %s", pCfg->tsdbId, fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
int tlen = tsdbEncodeCfg((void *)(&pBuf), pCfg);
ASSERT((tlen + sizeof(TSCKSUM) <= TSDB_FILE_HEAD_SIZE) && (POINTER_DISTANCE(pBuf, buf) == tlen));
taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE);
if (taosWrite(fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", pCfg->tsdbId, TSDB_FILE_HEAD_SIZE, fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (fsync(fd) < 0) {
tsdbError("vgId:%d failed to fsync file %s since %s", pCfg->tsdbId, fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
free(fname);
close(fd);
return 0;
_err:
tfree(fname);
if (fd >= 0) close(fd);
return -1;
}
static int tsdbLoadConfig(char *rootDir, STsdbCfg *pCfg) {
char *fname = NULL;
int fd = -1;
char buf[TSDB_FILE_HEAD_SIZE] = "\0";
fname = tsdbGetCfgFname(rootDir);
if (fname == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
fd = open(fname, O_RDONLY);
if (fd < 0) {
tsdbError("failed to open file %s since %s", fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosRead(fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
tsdbError("failed to read %d bytes from file %s since %s", TSDB_FILE_HEAD_SIZE, fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) {
tsdbError("file %s is corrupted", fname);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
goto _err;
}
tsdbDecodeCfg(buf, pCfg);
tfree(fname); // update cacheLastRow
close(fd); if (pCfg->cacheLastRow != 0) pCfg->cacheLastRow = 1;
return 0; return 0;
_err:
tfree(fname);
if (fd >= 0) close(fd);
return -1;
} }
static char *tsdbGetCfgFname(char *rootDir) { static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
int tlen = (int)(strlen(TFS_PRIMARY_PATH()) + strlen(rootDir) + strlen(TSDB_CFG_FILE_NAME) + 3); STsdbRepo *pRepo = (STsdbRepo *)calloc(1, sizeof(*pRepo));
char *fname = calloc(1, tlen);
if (fname == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
snprintf(fname, tlen, "%s/%s/%s", TFS_PRIMARY_PATH(), rootDir, TSDB_CFG_FILE_NAME);
return fname;
}
static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) {
STsdbRepo *pRepo = (STsdbRepo *)calloc(1, sizeof(STsdbRepo));
if (pRepo == NULL) { if (pRepo == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err; return NULL;
} }
pRepo->state = TSDB_STATE_OK; pRepo->state = TSDB_STATE_OK;
pRepo->code = TSDB_CODE_SUCCESS; pRepo->code = TSDB_CODE_SUCCESS;
pRepo->config = *pCfg;
if (pAppH) {
pRepo->appH = *pAppH;
}
pRepo->repoLocked = false;
int code = pthread_mutex_init(&pRepo->mutex, NULL); int code = pthread_mutex_init(&(pRepo->mutex), NULL);
if (code != 0) { if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code); terrno = TAOS_SYSTEM_ERROR(code);
goto _err; tsdbFreeRepo(pRepo);
return NULL;
} }
code = sem_init(&(pRepo->readyToCommit), 0, 1); code = sem_init(&(pRepo->readyToCommit), 0, 1);
if (code != 0) { if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code); terrno = TAOS_SYSTEM_ERROR(code);
goto _err; tsdbFreeRepo(pRepo);
} return NULL;
pRepo->repoLocked = false;
pRepo->rootDir = strdup(rootDir);
if (pRepo->rootDir == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
} }
pRepo->config = *pCfg;
if (pAppH) pRepo->appH = *pAppH;
pRepo->tsdbMeta = tsdbNewMeta(pCfg); pRepo->tsdbMeta = tsdbNewMeta(pCfg);
if (pRepo->tsdbMeta == NULL) { if (pRepo->tsdbMeta == NULL) {
tsdbError("vgId:%d failed to create meta since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to create meta since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err; tsdbFreeRepo(pRepo);
return NULL;
} }
pRepo->pPool = tsdbNewBufPool(pCfg); pRepo->pPool = tsdbNewBufPool(pCfg);
if (pRepo->pPool == NULL) { if (pRepo->pPool == NULL) {
tsdbError("vgId:%d failed to create buffer pool since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to create buffer pool since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err; tsdbFreeRepo(pRepo);
return NULL;
} }
pRepo->tsdbFileH = tsdbNewFileH(pCfg); pRepo->fs = tsdbNewFS(pCfg);
if (pRepo->tsdbFileH == NULL) { if (pRepo->fs == NULL) {
tsdbError("vgId:%d failed to create file handle since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to TSDB file system since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err; tsdbFreeRepo(pRepo);
return NULL;
} }
return pRepo; return pRepo;
_err:
tsdbFreeRepo(pRepo);
return NULL;
} }
static void tsdbFreeRepo(STsdbRepo *pRepo) { static void tsdbFreeRepo(STsdbRepo *pRepo) {
if (pRepo) { if (pRepo) {
tsdbFreeFileH(pRepo->tsdbFileH); tsdbFreeFS(pRepo->fs);
tsdbFreeBufPool(pRepo->pPool); tsdbFreeBufPool(pRepo->pPool);
tsdbFreeMeta(pRepo->tsdbMeta); tsdbFreeMeta(pRepo->tsdbMeta);
// tsdbFreeMemTable(pRepo->mem);
// tsdbFreeMemTable(pRepo->imem);
tfree(pRepo->rootDir);
sem_destroy(&(pRepo->readyToCommit)); sem_destroy(&(pRepo->readyToCommit));
pthread_mutex_destroy(&pRepo->mutex); pthread_mutex_destroy(&pRepo->mutex);
free(pRepo); free(pRepo);
} }
} }
static int tsdbRestoreInfo(STsdbRepo *pRepo) { // TODO static void tsdbStartStream(STsdbRepo *pRepo) {
STsdbMeta *pMeta = pRepo->tsdbMeta;
for (int i = 0; i < pMeta->maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data, pTable->sql,
tsdbGetTableSchemaImpl(pTable, false, false, -1));
}
}
}
static void tsdbStopStream(STsdbRepo *pRepo) {
STsdbMeta *pMeta = pRepo->tsdbMeta;
for (int i = 0; i < pMeta->maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
(*pRepo->appH.cqDropFunc)(pTable->cqhandle);
}
}
}
static int tsdbRestoreInfo(STsdbRepo *pRepo) {
// TODO: add restore meta
return 0;
#if 0
STsdbMeta * pMeta = pRepo->tsdbMeta; STsdbMeta * pMeta = pRepo->tsdbMeta;
STsdbFileH *pFileH = pRepo->tsdbFileH; STsdbFileH *pFileH = pRepo->tsdbFileH;
SFileGroup *pFGroup = NULL; SFileGroup *pFGroup = NULL;
...@@ -754,171 +600,5 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo) { // TODO ...@@ -754,171 +600,5 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo) { // TODO
_err: _err:
tsdbDestroyHelper(&rhelper); tsdbDestroyHelper(&rhelper);
return -1; return -1;
}
static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression) {
int8_t ocompression = pRepo->config.compression;
pRepo->config.compression = compression;
tsdbDebug("vgId:%d tsdb compression is changed from %d to %d", REPO_ID(pRepo), ocompression, compression);
}
static int tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep) {
STsdbCfg * pCfg = &pRepo->config;
STsdbFileH *pFileH = pRepo->tsdbFileH;
int okeep = pCfg->keep;
SFileGroup *pFGroup = NULL;
ASSERT(pCfg->keep != keep);
int maxFiles = TSDB_MAX_FILE(keep, pCfg->daysPerFile);
if (maxFiles != pFileH->maxFGroups) {
pthread_rwlock_wrlock(&(pFileH->fhlock));
pCfg->keep = keep;
pFGroup = (SFileGroup *)calloc(maxFiles, sizeof(SFileGroup));
if (pFGroup == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
pthread_rwlock_unlock(&(pFileH->fhlock));
return -1;
}
int mfid = (int)(TSDB_KEY_FILEID(taosGetTimestamp(pCfg->precision), pCfg->daysPerFile, pCfg->precision) -
TSDB_MAX_FILE(keep, pCfg->daysPerFile));
int i = 0;
for (; i < pFileH->nFGroups; i++) {
if (pFileH->pFGroup[i].fileId >= mfid) break;
tsdbRemoveFileGroup(pRepo, &(pFileH->pFGroup[i]));
}
for (int j = 0; i < pFileH->nFGroups; i++, j++) {
pFGroup[j] = pFileH->pFGroup[i];
}
free(pFileH->pFGroup);
pFileH->pFGroup = pFGroup;
pthread_rwlock_unlock(&(pFileH->fhlock));
}
tsdbDebug("vgId:%d keep is changed from %d to %d", REPO_ID(pRepo), okeep, keep);
return 0;
}
static int keyFGroupCompFunc(const void *key, const void *fgroup) {
int fid = *(int *)key;
SFileGroup *pFGroup = (SFileGroup *)fgroup;
if (fid == pFGroup->fileId) {
return 0;
} else {
return fid > pFGroup->fileId ? 1 : -1;
}
}
static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg) {
int tlen = 0;
tlen += taosEncodeVariantI32(buf, pCfg->tsdbId);
tlen += taosEncodeFixedI32(buf, pCfg->cacheBlockSize);
tlen += taosEncodeVariantI32(buf, pCfg->totalBlocks);
tlen += taosEncodeVariantI32(buf, pCfg->daysPerFile);
tlen += taosEncodeVariantI32(buf, pCfg->keep);
tlen += taosEncodeVariantI32(buf, pCfg->keep1);
tlen += taosEncodeVariantI32(buf, pCfg->keep2);
tlen += taosEncodeVariantI32(buf, pCfg->minRowsPerFileBlock);
tlen += taosEncodeVariantI32(buf, pCfg->maxRowsPerFileBlock);
tlen += taosEncodeFixedI8(buf, pCfg->precision);
tlen += taosEncodeFixedI8(buf, pCfg->compression);
tlen += taosEncodeFixedI8(buf, pCfg->update);
tlen += taosEncodeFixedI8(buf, pCfg->cacheLastRow);
return tlen;
}
static void *tsdbDecodeCfg(void *buf, STsdbCfg *pCfg) {
buf = taosDecodeVariantI32(buf, &(pCfg->tsdbId));
buf = taosDecodeFixedI32(buf, &(pCfg->cacheBlockSize));
buf = taosDecodeVariantI32(buf, &(pCfg->totalBlocks));
buf = taosDecodeVariantI32(buf, &(pCfg->daysPerFile));
buf = taosDecodeVariantI32(buf, &(pCfg->keep));
buf = taosDecodeVariantI32(buf, &(pCfg->keep1));
buf = taosDecodeVariantI32(buf, &(pCfg->keep2));
buf = taosDecodeVariantI32(buf, &(pCfg->minRowsPerFileBlock));
buf = taosDecodeVariantI32(buf, &(pCfg->maxRowsPerFileBlock));
buf = taosDecodeFixedI8(buf, &(pCfg->precision));
buf = taosDecodeFixedI8(buf, &(pCfg->compression));
buf = taosDecodeFixedI8(buf, &(pCfg->update));
buf = taosDecodeFixedI8(buf, &(pCfg->cacheLastRow));
return buf;
}
static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks) {
// TODO
// STsdbCache *pCache = pRepo->tsdbCache;
// int oldNumOfBlocks = pCache->totalCacheBlocks;
// tsdbLockRepo((TsdbRepoT *)pRepo);
// ASSERT(pCache->totalCacheBlocks != totalBlocks);
// if (pCache->totalCacheBlocks < totalBlocks) {
// ASSERT(pCache->totalCacheBlocks == pCache->pool.numOfCacheBlocks);
// int blocksToAdd = pCache->totalCacheBlocks - totalBlocks;
// pCache->totalCacheBlocks = totalBlocks;
// for (int i = 0; i < blocksToAdd; i++) {
// if (tsdbAddCacheBlockToPool(pCache) < 0) {
// tsdbUnLockRepo((TsdbRepoT *)pRepo);
// tsdbError("tsdbId:%d, failed to add cache block to cache pool", pRepo->config.tsdbId);
// return -1;
// }
// }
// } else {
// pCache->totalCacheBlocks = totalBlocks;
// tsdbAdjustCacheBlocks(pCache);
// }
// pRepo->config.totalBlocks = totalBlocks;
// tsdbUnLockRepo((TsdbRepoT *)pRepo);
// tsdbDebug("vgId:%d, tsdb total cache blocks changed from %d to %d", pRepo->config.tsdbId, oldNumOfBlocks,
// totalBlocks);
return 0;
}
#if 0
TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
STable *pTable = tsdbGetTableByUid(pRepo->tsdbMeta, uid);
if (pTable == NULL) return -1;
return TSDB_GET_TABLE_LAST_KEY(pTable);
}
#endif #endif
}
static void tsdbStartStream(STsdbRepo *pRepo) { \ No newline at end of file
STsdbMeta *pMeta = pRepo->tsdbMeta;
for (int i = 0; i < pMeta->maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data, pTable->sql,
tsdbGetTableSchemaImpl(pTable, false, false, -1));
}
}
}
static void tsdbStopStream(STsdbRepo *pRepo) {
STsdbMeta *pMeta = pRepo->tsdbMeta;
for (int i = 0; i < pMeta->maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
(*pRepo->appH.cqDropFunc)(pTable->cqhandle);
}
}
}
...@@ -463,6 +463,8 @@ void tsdbFreeMeta(STsdbMeta *pMeta) { ...@@ -463,6 +463,8 @@ void tsdbFreeMeta(STsdbMeta *pMeta) {
} }
int tsdbOpenMeta(STsdbRepo *pRepo) { int tsdbOpenMeta(STsdbRepo *pRepo) {
return 0;
#if 0
char * fname = NULL; char * fname = NULL;
STsdbMeta *pMeta = pRepo->tsdbMeta; STsdbMeta *pMeta = pRepo->tsdbMeta;
ASSERT(pMeta != NULL); ASSERT(pMeta != NULL);
...@@ -486,6 +488,7 @@ int tsdbOpenMeta(STsdbRepo *pRepo) { ...@@ -486,6 +488,7 @@ int tsdbOpenMeta(STsdbRepo *pRepo) {
_err: _err:
tfree(fname); tfree(fname);
return -1; return -1;
#endif
} }
int tsdbCloseMeta(STsdbRepo *pRepo) { int tsdbCloseMeta(STsdbRepo *pRepo) {
......
...@@ -62,24 +62,24 @@ int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg) { ...@@ -62,24 +62,24 @@ int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg) {
return code; return code;
} }
STsdbCfg tsdbCfg = {0}; // STsdbCfg tsdbCfg = {0};
tsdbCfg.tsdbId = pVnodeCfg->cfg.vgId; // tsdbCfg.tsdbId = pVnodeCfg->cfg.vgId;
tsdbCfg.cacheBlockSize = pVnodeCfg->cfg.cacheBlockSize; // tsdbCfg.cacheBlockSize = pVnodeCfg->cfg.cacheBlockSize;
tsdbCfg.totalBlocks = pVnodeCfg->cfg.totalBlocks; // tsdbCfg.totalBlocks = pVnodeCfg->cfg.totalBlocks;
tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile; // tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile;
tsdbCfg.keep = pVnodeCfg->cfg.daysToKeep; // tsdbCfg.keep = pVnodeCfg->cfg.daysToKeep;
tsdbCfg.keep1 = pVnodeCfg->cfg.daysToKeep1; // tsdbCfg.keep1 = pVnodeCfg->cfg.daysToKeep1;
tsdbCfg.keep2 = pVnodeCfg->cfg.daysToKeep2; // tsdbCfg.keep2 = pVnodeCfg->cfg.daysToKeep2;
tsdbCfg.minRowsPerFileBlock = pVnodeCfg->cfg.minRowsPerFileBlock; // tsdbCfg.minRowsPerFileBlock = pVnodeCfg->cfg.minRowsPerFileBlock;
tsdbCfg.maxRowsPerFileBlock = pVnodeCfg->cfg.maxRowsPerFileBlock; // tsdbCfg.maxRowsPerFileBlock = pVnodeCfg->cfg.maxRowsPerFileBlock;
tsdbCfg.precision = pVnodeCfg->cfg.precision; // tsdbCfg.precision = pVnodeCfg->cfg.precision;
tsdbCfg.compression = pVnodeCfg->cfg.compression; // tsdbCfg.compression = pVnodeCfg->cfg.compression;
tsdbCfg.update = pVnodeCfg->cfg.update; // tsdbCfg.update = pVnodeCfg->cfg.update;
tsdbCfg.cacheLastRow = pVnodeCfg->cfg.cacheLastRow; // tsdbCfg.cacheLastRow = pVnodeCfg->cfg.cacheLastRow;
char tsdbDir[TSDB_FILENAME_LEN] = {0}; // char tsdbDir[TSDB_FILENAME_LEN] = {0};
sprintf(tsdbDir, "vnode/vnode%d/tsdb", pVnodeCfg->cfg.vgId); // sprintf(tsdbDir, "vnode/vnode%d/tsdb", pVnodeCfg->cfg.vgId);
if (tsdbCreateRepo(tsdbDir, &tsdbCfg) < 0) { if (tsdbCreateRepo(pVnodeCfg->cfg.vgId) < 0) {
vError("vgId:%d, failed to create tsdb in vnode, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(terrno)); vError("vgId:%d, failed to create tsdb in vnode, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(terrno));
return TSDB_CODE_VND_INIT_FAILED; return TSDB_CODE_VND_INIT_FAILED;
} }
...@@ -234,10 +234,9 @@ int32_t vnodeOpen(int32_t vgId) { ...@@ -234,10 +234,9 @@ int32_t vnodeOpen(int32_t vgId) {
appH.cqH = pVnode->cq; appH.cqH = pVnode->cq;
appH.cqCreateFunc = cqCreate; appH.cqCreateFunc = cqCreate;
appH.cqDropFunc = cqDrop; appH.cqDropFunc = cqDrop;
sprintf(temp, "vnode/vnode%d/tsdb", vgId);
terrno = 0; terrno = 0;
pVnode->tsdb = tsdbOpenRepo(temp, &appH); pVnode->tsdb = tsdbOpenRepo(&(pVnode->tsdbCfg), &appH);
if (pVnode->tsdb == NULL) { if (pVnode->tsdb == NULL) {
vnodeCleanUp(pVnode); vnodeCleanUp(pVnode);
return terrno; return terrno;
...@@ -456,9 +455,6 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) { ...@@ -456,9 +455,6 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
} }
int32_t vnodeReset(SVnodeObj *pVnode) { int32_t vnodeReset(SVnodeObj *pVnode) {
char rootDir[128] = "\0";
sprintf(rootDir, "vnode/vnode%d/tsdb", pVnode->vgId);
if (!vnodeSetResetStatus(pVnode)) { if (!vnodeSetResetStatus(pVnode)) {
return -1; return -1;
} }
...@@ -481,7 +477,7 @@ int32_t vnodeReset(SVnodeObj *pVnode) { ...@@ -481,7 +477,7 @@ int32_t vnodeReset(SVnodeObj *pVnode) {
appH.cqH = pVnode->cq; appH.cqH = pVnode->cq;
appH.cqCreateFunc = cqCreate; appH.cqCreateFunc = cqCreate;
appH.cqDropFunc = cqDrop; appH.cqDropFunc = cqDrop;
pVnode->tsdb = tsdbOpenRepo(rootDir, &appH); pVnode->tsdb = tsdbOpenRepo(&(pVnode->tsdbCfg), &appH);
vnodeSetReadyStatus(pVnode); vnodeSetReadyStatus(pVnode);
vnodeRelease(pVnode); vnodeRelease(pVnode);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册