提交 341e3a02 编写于 作者: C Cary Xu

feat: rollup vnode refactor

上级 4674fe7d
......@@ -71,13 +71,19 @@ typedef enum {
TSDB_SMA_STAT_DROPPED = 2, // sma dropped
} ETsdbSmaStat; // bit operation
typedef enum {
TSDB_SMA_TYPE_BLOCK = 0, // Block-wise SMA
TSDB_SMA_TYPE_TIME_RANGE = 1, // Time-range-wise SMA
TSDB_SMA_TYPE_ROLLUP = 2, // Rollup SMA
} ETsdbSmaType;
typedef enum {
TSDB_RSMA_RETENTION_0 = 0,
TSDB_RSMA_RETENTION_1 = 1,
TSDB_RSMA_RETENTION_2 = 2,
TSDB_RSMA_RETENTION_MAX = 3
} ERSmaRetention;
extern char *qtypeStr[];
#define TSDB_PORT_HTTP 11
......
......@@ -112,7 +112,9 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg->tsdbCfg.keep2 = 3650;
pCfg->tsdbCfg.keep0 = 3650;
pCfg->tsdbCfg.keep1 = 3650;
pCfg->tsdbCfg.retentions = pCreate->pRetensions;
for (size_t i = 0; i < taosArrayGetSize(pCreate->pRetensions); ++i) {
memcpy(&pCfg->tsdbCfg.retentions[i], taosArrayGet(pCreate->pRetensions, i), sizeof(SRetention));
}
pCfg->walCfg.vgId = pCreate->vgId;
pCfg->hashBegin = pCreate->hashBegin;
pCfg->hashEnd = pCreate->hashEnd;
......
......@@ -37,7 +37,7 @@
#ifdef __cplusplus
extern "C" {
#endif
#define TSDB_VNODE_SMA_DEBUG // TODO: evaluate to remove the macro and the relative codes
// vnode
typedef struct SVnode SVnode;
typedef struct STsdbCfg STsdbCfg; // todo: remove
......@@ -146,9 +146,19 @@ struct STsdbCfg {
int32_t keep0;
int32_t keep1;
int32_t keep2;
SArray *retentions;
// TODO: save to tsdb cfg file
int8_t type; // ETsdbType
SRetention retentions[TSDB_RSMA_RETENTION_MAX];
};
typedef enum {
TSDB_TYPE_TSDB = 0, // TSDB
TSDB_TYPE_TSMA = 1, // TSMA
TSDB_TYPE_RSMA_L0 = 2, // RSMA Level 0
TSDB_TYPE_RSMA_L1 = 3, // RSMA Level 1
TSDB_TYPE_RSMA_L2 = 4, // RSMA Level 2
} ETsdbType;
struct SVnodeCfg {
int32_t vgId;
char dbname[TSDB_DB_NAME_LEN];
......
......@@ -54,10 +54,13 @@ typedef struct SVState SVState;
typedef struct SVBufPool SVBufPool;
typedef struct SQWorkerMgmt SQHandle;
#define VNODE_META_DIR "meta"
#define VNODE_TSDB_DIR "tsdb"
#define VNODE_TQ_DIR "tq"
#define VNODE_WAL_DIR "wal"
#define VNODE_META_DIR "meta"
#define VNODE_TSDB_DIR "tsdb"
#define VNODE_TQ_DIR "tq"
#define VNODE_WAL_DIR "wal"
#define VNODE_TSMA_DIR "tsma"
#define VNODE_RSMA1_DIR "rsma1"
#define VNODE_RSMA2_DIR "rsma2"
// vnd.h
void* vnodeBufPoolMalloc(SVBufPool* pPool, int size);
......@@ -87,7 +90,7 @@ int32_t metaCreateTSma(SMeta* pMeta, SSmaCfg* pCfg);
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
// tsdb
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb);
int tsdbOpen(SVnode* pVnode, int8_t type);
int tsdbClose(STsdb* pTsdb);
int tsdbBegin(STsdb* pTsdb);
int tsdbCommit(STsdb* pTsdb);
......@@ -160,6 +163,8 @@ struct SVnode {
SVBufPool* onRecycle;
SMeta* pMeta;
STsdb* pTsdb;
STsdb* pRSma1;
STsdb* pRSma2;
SWal* pWal;
STQ* pTq;
SSink* pSink;
......@@ -168,6 +173,11 @@ struct SVnode {
SQHandle* pQuery;
};
#define VND_TSDB(vnd) ((vnd)->pTsdb)
#define VND_RSMA0(vnd) ((vnd)->pTsdb)
#define VND_RSMA1(vnd) ((vnd)->pRSma1)
#define VND_RSMA2(vnd) ((vnd)->pRSma2)
struct STbUidStore {
tb_uid_t suid;
SArray* tbUids;
......@@ -176,7 +186,11 @@ struct STbUidStore {
#define TD_VID(PVNODE) (PVNODE)->config.vgId
// typedef struct STbDdlH STbDdlH;
static FORCE_INLINE bool tsdbIsRollup(SVnode* pVnode) {
SRetention* pRetention = &(pVnode->config.tsdbCfg.retentions[0]);
return (pRetention->freq > 0 && pRetention->keep > 0);
}
// sma
void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
......
......@@ -15,12 +15,34 @@
#include "tsdb.h"
int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb) {
static int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir);
int tsdbOpen(SVnode *pVnode, int8_t type) {
switch (type) {
case TSDB_TYPE_TSDB:
return tsdbOpenImpl(pVnode, type, &VND_TSDB(pVnode), VNODE_TSDB_DIR);
case TSDB_TYPE_TSMA:
ASSERT(0);
break;
case TSDB_TYPE_RSMA_L0:
return tsdbOpenImpl(pVnode, type, &VND_RSMA0(pVnode), VNODE_TSDB_DIR);
case TSDB_TYPE_RSMA_L1:
return tsdbOpenImpl(pVnode, type, &VND_RSMA1(pVnode), VNODE_RSMA1_DIR);
case TSDB_TYPE_RSMA_L2:
return tsdbOpenImpl(pVnode, type, &VND_RSMA2(pVnode), VNODE_RSMA2_DIR);
default:
ASSERT(0);
break;
}
return 0;
}
int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir) {
STsdb *pTsdb = NULL;
int slen = 0;
*ppTsdb = NULL;
slen = strlen(tfsGetPrimaryPath(pVnode->pTfs)) + strlen(pVnode->path) + strlen(VNODE_TSDB_DIR) + 3;
slen = strlen(tfsGetPrimaryPath(pVnode->pTfs)) + strlen(pVnode->path) + strlen(dir) + 3;
// create handle
pTsdb = (STsdb *)taosMemoryCalloc(1, sizeof(*pTsdb) + slen);
......@@ -31,7 +53,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb) {
pTsdb->path = (char *)&pTsdb[1];
sprintf(pTsdb->path, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP,
VNODE_TSDB_DIR);
dir);
pTsdb->pVnode = pVnode;
pTsdb->repoLocked = false;
tdbMutexInit(&pTsdb->mutex, NULL);
......@@ -46,7 +68,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb) {
goto _err;
}
tsdbDebug("vgId: %d tsdb is opened", TD_VID(pVnode));
tsdbDebug("vgId: %d tsdb is opened for %s", TD_VID(pVnode), pTsdb->path);
*ppTsdb = pTsdb;
return 0;
......
......@@ -103,10 +103,9 @@ typedef struct {
STSma *pSma; // cache schema
} SSmaStatItem;
#define RSMA_MAX_LEVEL 2
#define RSMA_TASK_INFO_HASH_SLOT 8
struct SRSmaInfo {
void *taskInfo[RSMA_MAX_LEVEL]; // qTaskInfo_t
void *taskInfo[TSDB_RSMA_RETENTION_2]; // qTaskInfo_t
};
struct SSmaStat {
......@@ -128,7 +127,7 @@ static FORCE_INLINE void tsdbFreeTaskHandle(qTaskInfo_t *taskHandle) {
}
static FORCE_INLINE void *tsdbFreeRSmaInfo(SRSmaInfo *pInfo) {
for (int32_t i = 0; i < RSMA_MAX_LEVEL; ++i) {
for (int32_t i = 0; i < TSDB_RSMA_RETENTION_MAX; ++i) {
if (pInfo->taskInfo[i]) {
tsdbFreeTaskHandle(pInfo->taskInfo[i]);
}
......
......@@ -66,6 +66,28 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
if (tjsonAddIntegerToObject(pJson, "keep0", pCfg->tsdbCfg.keep0) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1;
#ifdef TSDB_VNODE_SMA_DEBUG
if (pCfg->tsdbCfg.retentions[0].freq > 0) {
int32_t nRetention = 1;
if (pCfg->tsdbCfg.retentions[1].freq > 0) {
++nRetention;
if (pCfg->tsdbCfg.retentions[2].freq > 0) {
++nRetention;
}
}
SJson *pNodeRetentions = tjsonCreateArray();
tjsonAddItemToObject(pJson, "retentions", pNodeRetentions);
for (int32_t i = 0; i < nRetention; ++i) {
SJson *pNodeRetention = tjsonCreateObject();
const SRetention *pRetention = pCfg->tsdbCfg.retentions + i;
tjsonAddIntegerToObject(pNodeRetention, "freq", pRetention->freq);
tjsonAddIntegerToObject(pNodeRetention, "freqUnit", pRetention->freqUnit);
tjsonAddIntegerToObject(pNodeRetention, "keep", pRetention->keep);
tjsonAddIntegerToObject(pNodeRetention, "keepUnit", pRetention->keepUnit);
tjsonAddItemToArray(pNodeRetentions, pNodeRetention);
}
}
#endif
if (tjsonAddIntegerToObject(pJson, "wal.vgId", pCfg->walCfg.vgId) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod) < 0) return -1;
......@@ -113,6 +135,20 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
if (tjsonGetNumberValue(pJson, "keep0", pCfg->tsdbCfg.keep0) < 0) return -1;
if (tjsonGetNumberValue(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1;
if (tjsonGetNumberValue(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1;
#ifdef TSDB_VNODE_SMA_DEBUG
SJson *pNodeRetentions = tjsonGetObjectItem(pJson, "retentions");
int nRetention = tjsonGetArraySize(pNodeRetentions);
ASSERT(nRetention <= TSDB_RSMA_RETENTION_MAX);
for (int32_t i = 0; i < nRetention; ++i) {
SJson *pNodeRetention = tjsonGetArrayItem(pNodeRetentions, i);
ASSERT(pNodeRetention != NULL);
tjsonGetNumberValue(pNodeRetention, "freq", (pCfg->tsdbCfg.retentions)[i].freq);
tjsonGetNumberValue(pNodeRetention, "freqUnit", (pCfg->tsdbCfg.retentions)[i].freqUnit);
tjsonGetNumberValue(pNodeRetention, "keep", (pCfg->tsdbCfg.retentions)[i].keep);
tjsonGetNumberValue(pNodeRetention, "keepUnit", (pCfg->tsdbCfg.retentions)[i].keepUnit);
}
#endif
if (tjsonGetNumberValue(pJson, "wal.vgId", pCfg->walCfg.vgId) < 0) return -1;
if (tjsonGetNumberValue(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod) < 0) return -1;
if (tjsonGetNumberValue(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod) < 0) return -1;
......
......@@ -96,9 +96,26 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
}
// open tsdb
if (tsdbOpen(pVnode, &pVnode->pTsdb) < 0) {
vError("vgId: %d failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
if (tsdbIsRollup(pVnode)) {
if (tsdbOpen(pVnode, TSDB_TYPE_RSMA_L0) < 0) {
vError("vgId: %d failed to open vnode rsma0 since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
if (tsdbOpen(pVnode, TSDB_TYPE_RSMA_L1) < 0) {
vError("vgId: %d failed to open vnode rsma1 since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
if (tsdbOpen(pVnode, TSDB_TYPE_RSMA_L2) < 0) {
vError("vgId: %d failed to open vnode rsma2 since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
} else {
if (tsdbOpen(pVnode, TSDB_TYPE_TSDB) < 0) {
vError("vgId: %d failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
}
// open wal
......@@ -143,6 +160,8 @@ _err:
if (pVnode->pWal) walClose(pVnode->pWal);
if (pVnode->pTsdb) tsdbClose(pVnode->pTsdb);
if (pVnode->pMeta) metaClose(pVnode->pMeta);
tsdbClose(VND_RSMA1(pVnode));
tsdbClose(VND_RSMA2(pVnode));
tsem_destroy(&(pVnode->canCommit));
taosMemoryFree(pVnode);
return NULL;
......@@ -155,7 +174,9 @@ void vnodeClose(SVnode *pVnode) {
vnodeQueryClose(pVnode);
walClose(pVnode->pWal);
tqClose(pVnode->pTq);
tsdbClose(pVnode->pTsdb);
tsdbClose(VND_TSDB(pVnode));
tsdbClose(VND_RSMA1(pVnode));
tsdbClose(VND_RSMA2(pVnode));
metaClose(pVnode->pMeta);
vnodeCloseBufPool(pVnode);
// destroy handle
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册