提交 68dfcfff 编写于 作者: C Cary Xu

refactor: rsma code optimization

上级 bc4e0b18
......@@ -245,7 +245,6 @@ struct SVnode {
struct STbUidStore {
tb_uid_t suid;
tb_uid_t uid; // TODO: just for debugging, remove when uid provided in SSDataBlock
SArray* tbUids;
SHashObj* uidHash;
};
......
......@@ -16,35 +16,12 @@
#include "sma.h"
#include "tstream.h"
static FORCE_INLINE int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
static FORCE_INLINE int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids);
static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *rsmaItem,
tb_uid_t suid, int8_t level);
#define SET_RSMA_INFO_ITEM_PARAMS(__idx, __level) \
if (param->qmsg[__idx]) { \
pRSmaInfo->items[__idx].pRsmaInfo = pRSmaInfo; \
pRSmaInfo->items[__idx].taskInfo = qCreateStreamExecTaskInfo(param->qmsg[0], &handle); \
if (!pRSmaInfo->items[__idx].taskInfo) { \
goto _err; \
} \
pRSmaInfo->items[__idx].triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE; \
if (param->maxdelay[__idx] < 1) { \
int64_t msInterval = \
convertTimeFromPrecisionToUnit(pRetention[__level].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND); \
pRSmaInfo->items[__idx].maxDelay = msInterval; \
} else { \
pRSmaInfo->items[__idx].maxDelay = param->maxdelay[__idx]; \
} \
if (pRSmaInfo->items[__idx].maxDelay > TSDB_MAX_ROLLUP_MAX_DELAY) { \
pRSmaInfo->items[__idx].maxDelay = TSDB_MAX_ROLLUP_MAX_DELAY; \
} \
pRSmaInfo->items[__idx].level = TSDB_RETENTION_L##__level; \
pRSmaInfo->items[__idx].tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA"); \
if (!pRSmaInfo->items[__idx].tmrHandle) { \
goto _err; \
} \
}
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids);
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaInfo *pRSmaInfo, SReadHandle *handle,
int8_t idx);
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *rsmaItem,
tb_uid_t suid, int8_t level);
struct SRSmaInfoItem {
SRSmaInfo *pRsmaInfo;
......@@ -81,7 +58,7 @@ static FORCE_INLINE void tdFreeTaskHandle(qTaskInfo_t *taskHandle) {
void *tdFreeRSmaInfo(SRSmaInfo *pInfo) {
if (pInfo) {
for (int32_t i = 0; i < TSDB_RETENTION_MAX; ++i) {
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = &pInfo->items[i];
if (pItem->taskInfo) {
tdFreeTaskHandle(pItem->taskInfo);
......@@ -213,6 +190,39 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
return TSDB_CODE_SUCCESS;
}
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaInfo *pRSmaInfo, SReadHandle *pReadHandle,
int8_t idx) {
SRetention *pRetention = SMA_RETENTION(pSma);
STsdbCfg *pTsdbCfg = SMA_TSDB_CFG(pSma);
if (param->qmsg[idx]) {
pRSmaInfo->items[idx].pRsmaInfo = pRSmaInfo;
pRSmaInfo->items[idx].taskInfo = qCreateStreamExecTaskInfo(param->qmsg[0], pReadHandle);
if (!pRSmaInfo->items[idx].taskInfo) {
goto _err;
}
pRSmaInfo->items[idx].triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE;
if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) {
int64_t msInterval =
convertTimeFromPrecisionToUnit(pRetention[idx + 1].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND);
pRSmaInfo->items[idx].maxDelay = msInterval;
} else {
pRSmaInfo->items[idx].maxDelay = param->maxdelay[idx];
}
if (pRSmaInfo->items[idx].maxDelay > TSDB_MAX_ROLLUP_MAX_DELAY) {
pRSmaInfo->items[idx].maxDelay = TSDB_MAX_ROLLUP_MAX_DELAY;
}
pRSmaInfo->items[idx].level = (idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2);
pRSmaInfo->items[idx].tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA");
if (!pRSmaInfo->items[idx].tmrHandle) {
goto _err;
}
}
return TSDB_CODE_SUCCESS;
_err:
return TSDB_CODE_FAILED;
}
/**
* @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam.
*
......@@ -282,14 +292,14 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
pRSmaInfo->pSma = pSma;
pRSmaInfo->suid = pReq->suid;
SRetention *pRetention = SMA_RETENTION(pSma);
STsdbCfg *pTsdbCfg = SMA_TSDB_CFG(pSma);
SET_RSMA_INFO_ITEM_PARAMS(0, 1);
SET_RSMA_INFO_ITEM_PARAMS(1, 2);
if (tdSetRSmaInfoItemParams(pSma, param, pRSmaInfo, &handle, 0) < 0) {
goto _err;
}
if (tdSetRSmaInfoItemParams(pSma, param, pRSmaInfo, &handle, 1) < 0) {
goto _err;
}
if (taosHashPut(SMA_STAT_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) !=
TSDB_CODE_SUCCESS) {
if (taosHashPut(SMA_STAT_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) < 0) {
goto _err;
} else {
smaDebug("vgId:%d, register rsma info succeed for suid:%" PRIi64, SMA_VID(pSma), pReq->suid);
......@@ -418,7 +428,6 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
if (!pBlock) break;
tdUidStorePut(pStore, msgIter.suid, NULL);
pStore->uid = msgIter.uid; // TODO: remove, just for debugging
}
if (terrno != TSDB_CODE_SUCCESS) return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册