提交 215613f4 编写于 作者: C Cary Xu

feat: set tsdb cfg for rollup sma

上级 ba7bd35a
...@@ -41,6 +41,7 @@ extern "C" { ...@@ -41,6 +41,7 @@ extern "C" {
// vnode // vnode
typedef struct SVnode SVnode; typedef struct SVnode SVnode;
typedef struct STsdbCfg STsdbCfg; // todo: remove typedef struct STsdbCfg STsdbCfg; // todo: remove
typedef struct STsdbKeepCfg STsdbKeepCfg;
typedef struct SVnodeCfg SVnodeCfg; typedef struct SVnodeCfg SVnodeCfg;
extern const SVnodeCfg vnodeCfgDefault; extern const SVnodeCfg vnodeCfgDefault;
...@@ -137,15 +138,21 @@ struct STsdbCfg { ...@@ -137,15 +138,21 @@ struct STsdbCfg {
int8_t update; int8_t update;
int8_t compression; int8_t compression;
int8_t slLevel; int8_t slLevel;
int32_t days;
int32_t minRows; int32_t minRows;
int32_t maxRows; int32_t maxRows;
int32_t days; // just for save config, don't use in tsdbRead/tsdbCommit/..., and use STsdbKeepCfg in STsdb instead
int32_t keep0; // just for save config, don't use in tsdbRead/tsdbCommit/..., and use STsdbKeepCfg in STsdb instead
int32_t keep1; // just for save config, don't use in tsdbRead/tsdbCommit/..., and use STsdbKeepCfg in STsdb instead
int32_t keep2; // just for save config, don't use in tsdbRead/tsdbCommit/..., and use STsdbKeepCfg in STsdb instead
SRetention retentions[TSDB_RETENTION_MAX];
};
struct STsdbKeepCfg {
int8_t precision; // precision always use with below cfg
int32_t days;
int32_t keep0; int32_t keep0;
int32_t keep1; int32_t keep1;
int32_t keep2; int32_t keep2;
// TODO: save to tsdb cfg file
int8_t type; // ETsdbType
SRetention retentions[TSDB_RETENTION_MAX];
}; };
typedef enum { typedef enum {
......
...@@ -70,9 +70,10 @@ struct SSmaEnvs { ...@@ -70,9 +70,10 @@ struct SSmaEnvs {
struct STsdb { struct STsdb {
char *path; char *path;
SVnode *pVnode; SVnode *pVnode;
TdThreadMutex mutex;
bool repoLocked; bool repoLocked;
int8_t level; // retention level int8_t level; // retention level
TdThreadMutex mutex; STsdbKeepCfg keepCfg;
STsdbMemTable *mem; STsdbMemTable *mem;
STsdbMemTable *imem; STsdbMemTable *imem;
SRtn rtn; SRtn rtn;
...@@ -185,6 +186,7 @@ struct STsdbFS { ...@@ -185,6 +186,7 @@ struct STsdbFS {
#define REPO_ID(r) TD_VID((r)->pVnode) #define REPO_ID(r) TD_VID((r)->pVnode)
#define REPO_CFG(r) (&(r)->pVnode->config.tsdbCfg) #define REPO_CFG(r) (&(r)->pVnode->config.tsdbCfg)
#define REPO_KEEP_CFG(r) (&(r)->keepCfg)
#define REPO_LEVEL(r) ((r)->level) #define REPO_LEVEL(r) ((r)->level)
#define REPO_FS(r) ((r)->fs) #define REPO_FS(r) ((r)->fs)
#define REPO_META(r) ((r)->pVnode->pMeta) #define REPO_META(r) ((r)->pVnode->pMeta)
...@@ -830,7 +832,7 @@ typedef struct { ...@@ -830,7 +832,7 @@ typedef struct {
#define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC #define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC
#define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC #define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC
STsdbFS *tsdbNewFS(const STsdbCfg *pCfg); STsdbFS *tsdbNewFS(const STsdbKeepCfg *pCfg);
void *tsdbFreeFS(STsdbFS *pfs); void *tsdbFreeFS(STsdbFS *pfs);
int tsdbOpenFS(STsdb *pRepo); int tsdbOpenFS(STsdb *pRepo);
void tsdbCloseFS(STsdb *pRepo); void tsdbCloseFS(STsdb *pRepo);
......
...@@ -210,7 +210,7 @@ int tsdbCommit(STsdb *pRepo) { ...@@ -210,7 +210,7 @@ int tsdbCommit(STsdb *pRepo) {
} }
void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) { void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) {
STsdbCfg *pCfg = REPO_CFG(pRepo); STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo);
TSKEY minKey, midKey, maxKey, now; TSKEY minKey, midKey, maxKey, now;
now = taosGetTimestamp(pCfg->precision); now = taosGetTimestamp(pCfg->precision);
...@@ -305,7 +305,7 @@ static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key) { ...@@ -305,7 +305,7 @@ static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key) {
static int tsdbNextCommitFid(SCommitH *pCommith) { static int tsdbNextCommitFid(SCommitH *pCommith) {
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg *pCfg = REPO_CFG(pRepo); STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo);
int fid = TSDB_IVLD_FID; int fid = TSDB_IVLD_FID;
for (int i = 0; i < pCommith->niters; i++) { for (int i = 0; i < pCommith->niters; i++) {
...@@ -338,7 +338,7 @@ static void tsdbDestroyCommitH(SCommitH *pCommith) { ...@@ -338,7 +338,7 @@ static void tsdbDestroyCommitH(SCommitH *pCommith) {
static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg *pCfg = REPO_CFG(pRepo); STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo);
ASSERT(pSet == NULL || pSet->fid == fid); ASSERT(pSet == NULL || pSet->fid == fid);
......
...@@ -191,7 +191,7 @@ static int tsdbAddDFileSetToStatus(SFSStatus *pStatus, const SDFileSet *pSet) { ...@@ -191,7 +191,7 @@ static int tsdbAddDFileSetToStatus(SFSStatus *pStatus, const SDFileSet *pSet) {
} }
// ================== STsdbFS // ================== STsdbFS
STsdbFS *tsdbNewFS(const STsdbCfg *pCfg) { STsdbFS *tsdbNewFS(const STsdbKeepCfg *pCfg) {
int keep = pCfg->keep2; int keep = pCfg->keep2;
int days = pCfg->days; int days = pCfg->days;
int maxFSet = TSDB_MAX_FSETS(keep, days); int maxFSet = TSDB_MAX_FSETS(keep, days);
......
...@@ -15,6 +15,8 @@ ...@@ -15,6 +15,8 @@
#include "tsdb.h" #include "tsdb.h"
static int tsdbSetKeepCfg(STsdbCfg *pCfg, STsdbKeepCfg *pKeepCfg, int8_t type);
static int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir, int8_t level); static int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir, int8_t level);
int tsdbOpen(SVnode *pVnode, int8_t type) { int tsdbOpen(SVnode *pVnode, int8_t type) {
...@@ -37,6 +39,43 @@ int tsdbOpen(SVnode *pVnode, int8_t type) { ...@@ -37,6 +39,43 @@ int tsdbOpen(SVnode *pVnode, int8_t type) {
return 0; return 0;
} }
static int tsdbSetKeepCfg(STsdbCfg *pCfg, STsdbKeepCfg *pKeepCfg, int8_t type) {
pKeepCfg->precision = pCfg->precision;
switch (type) {
case TSDB_TYPE_TSDB:
pKeepCfg->days = pCfg->days;
pKeepCfg->keep0 = pCfg->keep0;
pKeepCfg->keep1 = pCfg->keep1;
pKeepCfg->keep2 = pCfg->keep2;
break;
case TSDB_TYPE_TSMA:
ASSERT(0);
break;
case TSDB_TYPE_RSMA_L0:
pKeepCfg->days = pCfg->days;
pKeepCfg->keep0 = pCfg->keep0;
pKeepCfg->keep1 = pCfg->keep1;
pKeepCfg->keep2 = pCfg->keep2;
break;
case TSDB_TYPE_RSMA_L1:
pKeepCfg->days = pCfg->days;
pKeepCfg->keep0 = pCfg->keep0;
pKeepCfg->keep1 = pCfg->keep1;
pKeepCfg->keep2 = pCfg->keep2;
break;
case TSDB_TYPE_RSMA_L2:
pKeepCfg->days = pCfg->days;
pKeepCfg->keep0 = pCfg->keep0;
pKeepCfg->keep1 = pCfg->keep1;
pKeepCfg->keep2 = pCfg->keep2;
break;
default:
ASSERT(0);
break;
}
return 0;
}
/** /**
* @brief * @brief
* *
...@@ -68,7 +107,8 @@ int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir, i ...@@ -68,7 +107,8 @@ int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir, i
pTsdb->level = level; pTsdb->level = level;
pTsdb->repoLocked = false; pTsdb->repoLocked = false;
taosThreadMutexInit(&pTsdb->mutex, NULL); taosThreadMutexInit(&pTsdb->mutex, NULL);
pTsdb->fs = tsdbNewFS(REPO_CFG(pTsdb)); tsdbSetKeepCfg(REPO_CFG(pTsdb), REPO_KEEP_CFG(pTsdb), type);
pTsdb->fs = tsdbNewFS(REPO_KEEP_CFG(pTsdb));
// create dir (TODO: use tfsMkdir) // create dir (TODO: use tfsMkdir)
taosMkDir(pTsdb->path); taosMkDir(pTsdb->path);
......
...@@ -320,7 +320,7 @@ static bool emptyQueryTimewindow(STsdbReadHandle* pTsdbReadHandle) { ...@@ -320,7 +320,7 @@ static bool emptyQueryTimewindow(STsdbReadHandle* pTsdbReadHandle) {
// Update the query time window according to the data time to live(TTL) information, in order to avoid to return // Update the query time window according to the data time to live(TTL) information, in order to avoid to return
// the expired data to client, even it is queried already. // the expired data to client, even it is queried already.
static int64_t getEarliestValidTimestamp(STsdb* pTsdb) { static int64_t getEarliestValidTimestamp(STsdb* pTsdb) {
STsdbCfg* pCfg = REPO_CFG(pTsdb); STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdb);
int64_t now = taosGetTimestamp(pCfg->precision); int64_t now = taosGetTimestamp(pCfg->precision);
return now - (tsTickPerDay[pCfg->precision] * pCfg->keep2) + 1; // needs to add one tick return now - (tsTickPerDay[pCfg->precision] * pCfg->keep2) + 1; // needs to add one tick
...@@ -2425,7 +2425,7 @@ static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exi ...@@ -2425,7 +2425,7 @@ static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exi
int32_t numOfBlocks = 0; int32_t numOfBlocks = 0;
int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo); int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb); STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdbReadHandle->pTsdb);
STimeWindow win = TSWINDOW_INITIALIZER; STimeWindow win = TSWINDOW_INITIALIZER;
while (true) { while (true) {
...@@ -2531,7 +2531,7 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* ...@@ -2531,7 +2531,7 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo*
// find the start data block in file // find the start data block in file
pTsdbReadHandle->locateStart = true; pTsdbReadHandle->locateStart = true;
STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb); STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdbReadHandle->pTsdb);
int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision); int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision);
tsdbRLockFS(pFileHandle); tsdbRLockFS(pFileHandle);
...@@ -2632,7 +2632,7 @@ static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exis ...@@ -2632,7 +2632,7 @@ static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exis
// find the start data block in file // find the start data block in file
if (!pTsdbReadHandle->locateStart) { if (!pTsdbReadHandle->locateStart) {
pTsdbReadHandle->locateStart = true; pTsdbReadHandle->locateStart = true;
STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb); STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdbReadHandle->pTsdb);
int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision); int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision);
tsdbRLockFS(pFileHandle); tsdbRLockFS(pFileHandle);
......
...@@ -1013,7 +1013,7 @@ static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, int64_t indexUid, int32_t ...@@ -1013,7 +1013,7 @@ static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, int64_t indexUid, int32_t
* @return int32_t * @return int32_t
*/ */
static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel) { static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel) {
STsdbCfg *pCfg = REPO_CFG(pTsdb); STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pTsdb);
int32_t daysPerFile = pCfg->days; int32_t daysPerFile = pCfg->days;
if (storageLevel == SMA_STORAGE_LEVEL_TSDB) { if (storageLevel == SMA_STORAGE_LEVEL_TSDB) {
......
...@@ -60,7 +60,7 @@ static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) { ...@@ -60,7 +60,7 @@ static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
SSubmitBlk *pBlock = NULL; SSubmitBlk *pBlock = NULL;
SSubmitBlkIter blkIter = {0}; SSubmitBlkIter blkIter = {0};
STSRow *row = NULL; STSRow *row = NULL;
STsdbCfg *pCfg = REPO_CFG(pTsdb); STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pTsdb);
TSKEY now = taosGetTimestamp(pCfg->precision); TSKEY now = taosGetTimestamp(pCfg->precision);
TSKEY minKey = now - tsTickPerDay[pCfg->precision] * pCfg->keep2; TSKEY minKey = now - tsTickPerDay[pCfg->precision] * pCfg->keep2;
TSKEY maxKey = now + tsTickPerDay[pCfg->precision] * pCfg->days; TSKEY maxKey = now + tsTickPerDay[pCfg->precision] * pCfg->days;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册