未验证 提交 cc18aa74 编写于 作者: C Cary Xu 提交者: GitHub

Merge pull request #14721 from taosdata/feature/TD-11274-3.0

enh: rsma optimization for info item and tmr handle
......@@ -49,6 +49,7 @@ struct SSmaEnv {
typedef struct {
int8_t inited;
int32_t rsetId;
void *tmrHandle; // shared by all fetch tasks
} SSmaMgmt;
#define SMA_ENV_LOCK(env) ((env)->lock)
......@@ -65,7 +66,6 @@ struct SRSmaStat {
SSma *pSma;
int64_t submitVer;
int64_t refId; // shared by fetch tasks
void *tmrHandle; // shared by fetch tasks
int8_t triggerStat; // shared by fetch tasks
int8_t runningStat; // for persistence task
SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo;
......@@ -82,7 +82,6 @@ struct SSmaStat {
#define SMA_TSMA_STAT(s) (&(s)->tsmaStat)
#define SMA_RSMA_STAT(s) (&(s)->rsmaStat)
#define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash)
#define RSMA_TMR_HANDLE(r) ((r)->tmrHandle)
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
#define RSMA_RUNNING_STAT(r) (&(r)->runningStat)
#define RSMA_REF_ID(r) ((r)->refId)
......@@ -189,7 +188,7 @@ static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) {
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
void *tdFreeRSmaInfo(SRSmaInfo *pInfo);
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat);
int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
......
......@@ -49,16 +49,26 @@ int32_t smaInit() {
}
if (old == 0) {
// init tref rset
smaMgmt.rsetId = taosOpenRef(SMA_MGMT_REF_NUM, tdDestroyRSmaStat);
if (smaMgmt.rsetId < 0) {
atomic_store_8(&smaMgmt.inited, 0);
smaError("failed to init sma rset since %s", terrstr());
return TSDB_CODE_FAILED;
}
// init fetch timer handle
smaMgmt.tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA");
if (!smaMgmt.tmrHandle) {
taosCloseRef(smaMgmt.rsetId);
atomic_store_8(&smaMgmt.inited, 0);
smaError("failed to init sma tmr hanle since %s", terrstr());
return TSDB_CODE_FAILED;
}
smaInfo("sma rset is initialized, rsetId:%d", smaMgmt.rsetId);
atomic_store_8(&smaMgmt.inited, 1);
smaInfo("sma mgmt env is initialized, rsetId:%d, tmrHandle:%p", smaMgmt.rsetId, smaMgmt.tmrHandle);
}
return TSDB_CODE_SUCCESS;
......@@ -81,8 +91,9 @@ void smaCleanUp() {
}
if (old == 1) {
smaInfo("sma rset is cleaned up, resetId:%d", smaMgmt.rsetId);
taosCloseRef(smaMgmt.rsetId);
taosTmrCleanUp(smaMgmt.tmrHandle);
smaInfo("sma mgmt env is cleaned up, rsetId:%d, tmrHandle:%p", smaMgmt.rsetId, smaMgmt.tmrHandle);
atomic_store_8(&smaMgmt.inited, 0);
}
}
......@@ -203,20 +214,11 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
}
pRSmaStat->refId = refId;
// init timer
RSMA_TMR_HANDLE(pRSmaStat) = taosTmrInit(10000, 100, 10000, "RSMA");
if (!RSMA_TMR_HANDLE(pRSmaStat)) {
taosMemoryFreeClear(*pSmaStat);
return TSDB_CODE_FAILED;
}
// init hash
RSMA_INFO_HASH(pRSmaStat) = taosHashInit(
RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
if (!RSMA_INFO_HASH(pRSmaStat)) {
if (RSMA_TMR_HANDLE(pRSmaStat)) {
taosTmrCleanUp(RSMA_TMR_HANDLE(pRSmaStat));
}
taosMemoryFreeClear(*pSmaStat);
return TSDB_CODE_FAILED;
}
......@@ -277,7 +279,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
void *infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), NULL);
while (infoHash) {
SRSmaInfo *pSmaInfo = *(SRSmaInfo **)infoHash;
tdFreeRSmaInfo(pSmaInfo);
tdFreeRSmaInfo(pSma, pSmaInfo);
infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), infoHash);
}
}
......@@ -298,11 +300,6 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
nLoops = 0;
}
}
// step 6: cleanup the timer handle
if (RSMA_TMR_HANDLE(pStat)) {
taosTmrCleanUp(RSMA_TMR_HANDLE(pStat));
}
}
}
......
......@@ -27,16 +27,19 @@ SSmaMgmt smaMgmt = {
typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem;
typedef struct SRSmaQTaskInfoIter SRSmaQTaskInfoIter;
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids);
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaInfo *pRSmaInfo, SReadHandle *handle,
int8_t idx);
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *rsmaItem,
tb_uid_t suid, int8_t level);
static void tdRSmaFetchTrigger(void *param, void *tmrId);
static void tdRSmaPersistTrigger(void *param, void *tmrId);
static void *tdRSmaPersistExec(void *param);
static void tdRSmaQTaskInfoGetFName(int32_t vid, int64_t version, char *outputName);
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids);
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
SReadHandle *handle, int8_t idx);
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *rsmaItem,
STSchema *pTSchema, tb_uid_t suid, int8_t level);
static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid);
static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, SRSmaStat *pStat,
int8_t blkType);
static void tdRSmaFetchTrigger(void *param, void *tmrId);
static void tdRSmaPersistTrigger(void *param, void *tmrId);
static void *tdRSmaPersistExec(void *param);
static void tdRSmaQTaskInfoGetFName(int32_t vid, int64_t version, char *outputName);
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile);
static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish);
......@@ -48,25 +51,26 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed);
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma, int64_t committed);
struct SRSmaInfoItem {
SRSmaInfo *pRsmaInfo;
int64_t refId;
void *taskInfo; // qTaskInfo_t
tmr_h tmrId;
int8_t level;
int8_t tmrInitFlag;
int8_t triggerStat;
int32_t maxDelay;
void *taskInfo; // qTaskInfo_t
int64_t refId;
tmr_h tmrId;
int32_t maxDelay;
int8_t level;
int8_t triggerStat;
};
struct SRSmaInfo {
STSchema *pTSchema;
SRSmaStat *pStat;
int64_t suid;
SRSmaInfoItem items[TSDB_RETENTION_L2];
};
#define RSMA_INFO_SMA(r) ((r)->pStat->pSma)
#define RSMA_INFO_STAT(r) ((r)->pStat)
static SRSmaInfo *tdGetRSmaInfoByItem(SRSmaInfoItem *pItem) {
// adapt accordingly if definition of SRSmaInfo update
int32_t rsmaInfoHeadLen = sizeof(int64_t) + sizeof(STSchema *);
ASSERT(pItem->level == 1 || pItem->level == 2);
return (SRSmaInfo *)POINTER_SHIFT(pItem, -sizeof(SRSmaInfoItem) * (pItem->level - 1) - rsmaInfoHeadLen);
}
struct SRSmaQTaskInfoItem {
int32_t len;
......@@ -108,9 +112,8 @@ static FORCE_INLINE void tdFreeTaskHandle(qTaskInfo_t *taskHandle, int32_t vgId,
}
}
void *tdFreeRSmaInfo(SRSmaInfo *pInfo) {
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
if (pInfo) {
SSma *pSma = RSMA_INFO_SMA(pInfo);
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = &pInfo->items[i];
if (pItem->taskInfo) {
......@@ -143,8 +146,6 @@ static FORCE_INLINE int32_t tdUidStoreInit(STbUidStore **pStore) {
}
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids) {
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SRSmaInfo *pRSmaInfo = NULL;
if (!suid || !tbUids) {
......@@ -153,8 +154,9 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids)
return TSDB_CODE_FAILED;
}
pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), suid, sizeof(tb_uid_t));
if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
pRSmaInfo = tdGetRSmaInfoBySuid(pSma, *suid);
if (!pRSmaInfo) {
smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64, SMA_VID(pSma), *suid);
terrno = TSDB_CODE_RSMA_INVALID_STAT;
return TSDB_CODE_FAILED;
......@@ -252,15 +254,14 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
return TSDB_CODE_SUCCESS;
}
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaInfo *pRSmaInfo, SReadHandle *pReadHandle,
int8_t idx) {
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
SReadHandle *pReadHandle, int8_t idx) {
SRetention *pRetention = SMA_RETENTION(pSma);
STsdbCfg *pTsdbCfg = SMA_TSDB_CFG(pSma);
if (param->qmsg[idx]) {
SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]);
pItem->refId = RSMA_REF_ID(pRSmaInfo->pStat);
pItem->pRsmaInfo = pRSmaInfo;
pItem->refId = RSMA_REF_ID(pStat);
pItem->taskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], pReadHandle);
if (!pItem->taskInfo) {
terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE;
......@@ -348,14 +349,13 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
goto _err;
}
pRSmaInfo->pTSchema = pTSchema;
pRSmaInfo->pStat = pStat;
pRSmaInfo->suid = suid;
if (tdSetRSmaInfoItemParams(pSma, param, pRSmaInfo, &handle, 0) < 0) {
if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, &handle, 0) < 0) {
goto _err;
}
if (tdSetRSmaInfoItemParams(pSma, param, pRSmaInfo, &handle, 1) < 0) {
if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, &handle, 1) < 0) {
goto _err;
}
......@@ -367,7 +367,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
return TSDB_CODE_SUCCESS;
_err:
tdFreeRSmaInfo(pRSmaInfo);
tdFreeRSmaInfo(pSma, pRSmaInfo);
taosMemoryFree(pReader);
return TSDB_CODE_FAILED;
}
......@@ -538,10 +538,10 @@ int64_t tdRSmaGetMaxSubmitVer(SSma *pSma, int8_t level) {
return atomic_load_64(&pRSmaStat->submitVer);
}
static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) {
SArray *pResult = NULL;
SRSmaInfo *pRSmaInfo = pItem->pRsmaInfo;
SSma *pSma = RSMA_INFO_SMA(pRSmaInfo);
static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, SRSmaStat *pStat,
int8_t blkType) {
SArray *pResult = NULL;
SSma *pSma = pStat->pSma;
while (1) {
SSDataBlock *output = NULL;
......@@ -573,16 +573,16 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType)
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb1 : pSma->pRSmaTsdb2);
SSubmitReq *pReq = NULL;
// TODO: the schema update should be handled
if (buildSubmitReqFromDataBlock(&pReq, pResult, pRSmaInfo->pTSchema, SMA_VID(pSma), pRSmaInfo->suid) < 0) {
if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) < 0) {
smaError("vgId:%d, build submit req for rsma table %" PRIi64 "l evel %" PRIi8 " failed since %s", SMA_VID(pSma),
pRSmaInfo->suid, pItem->level, terrstr());
suid, pItem->level, terrstr());
goto _err;
}
if (pReq && tdProcessSubmitReq(sinkTsdb, atomic_add_fetch_64(&pRSmaInfo->pStat->submitVer, 1), pReq) < 0) {
if (pReq && tdProcessSubmitReq(sinkTsdb, atomic_add_fetch_64(&pStat->submitVer, 1), pReq) < 0) {
taosMemoryFreeClear(pReq);
smaError("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma),
pRSmaInfo->suid, pItem->level, terrstr());
suid, pItem->level, terrstr());
goto _err;
}
......@@ -600,84 +600,16 @@ _err:
return TSDB_CODE_FAILED;
}
/**
* @brief trigger to get rsma result
*
* @param param
* @param tmrId
*/
static void tdRSmaFetchTrigger(void *param, void *tmrId) {
SRSmaInfoItem *pItem = param;
SSma *pSma = NULL;
SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__);
if (!pStat) {
smaDebug("rsma fetch task not start since already destroyed, rsetId rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId,
pItem->refId);
return;
}
pSma = RSMA_INFO_SMA(pItem->pRsmaInfo);
// if rsma trigger stat in paused, cancelled or finished, not start fetch task
int8_t rsmaTriggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pStat));
switch (rsmaTriggerStat) {
case TASK_TRIGGER_STAT_PAUSED:
case TASK_TRIGGER_STAT_CANCELLED:
case TASK_TRIGGER_STAT_FINISHED: {
tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__);
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is %" PRIi8
", rsetId rsetId:%" PRIi64 " refId:%d",
SMA_VID(pSma), pItem->level, pItem->pRsmaInfo->suid, rsmaTriggerStat, smaMgmt.rsetId, pItem->refId);
return;
}
default:
break;
}
int8_t fetchTriggerStat =
atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
switch (fetchTriggerStat) {
case TASK_TRIGGER_STAT_ACTIVE: {
smaDebug("vgId:%d, fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is active", SMA_VID(pSma),
pItem->level, pItem->pRsmaInfo->suid);
tdRefSmaStat(pSma, (SSmaStat *)pStat);
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
qSetStreamInput(pItem->taskInfo, &dataBlock, STREAM_INPUT__DATA_BLOCK, false);
tdFetchAndSubmitRSmaResult(pItem, STREAM_INPUT__DATA_BLOCK);
tdUnRefSmaStat(pSma, (SSmaStat *)pStat);
} break;
case TASK_TRIGGER_STAT_PAUSED: {
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is paused",
SMA_VID(pSma), pItem->level, pItem->pRsmaInfo->suid);
} break;
case TASK_TRIGGER_STAT_INACTIVE: {
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is inactive",
SMA_VID(pSma), pItem->level, pItem->pRsmaInfo->suid);
} break;
case TASK_TRIGGER_STAT_INIT: {
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is init", SMA_VID(pSma),
pItem->level, pItem->pRsmaInfo->suid);
} break;
default: {
smaWarn("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is unknown",
SMA_VID(pSma), pItem->level, pItem->pRsmaInfo->suid);
} break;
}
_end:
tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__);
}
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *pItem, tb_uid_t suid,
int8_t level) {
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *pItem,
STSchema *pTSchema, tb_uid_t suid, int8_t level) {
if (!pItem || !pItem->taskInfo) {
smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, suid);
return TSDB_CODE_SUCCESS;
}
if (!pTSchema) {
smaWarn("vgId:%d, no schema to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, suid);
return TSDB_CODE_FAILED;
}
smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level,
pItem->taskInfo, suid);
......@@ -687,14 +619,14 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType
return TSDB_CODE_FAILED;
}
tdFetchAndSubmitRSmaResult(pItem, STREAM_INPUT__DATA_SUBMIT);
atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = SMA_RSMA_STAT(pEnv->pStat);
if (pStat->tmrHandle) {
taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, pStat->tmrHandle, &pItem->tmrId);
tdRSmaFetchAndSubmitResult(pItem, pTSchema, suid, pStat, STREAM_INPUT__DATA_SUBMIT);
atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
if (smaMgmt.tmrHandle) {
taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
} else {
ASSERT(0);
}
......@@ -702,19 +634,29 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType
return TSDB_CODE_SUCCESS;
}
static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) {
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) {
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = NULL;
if (!pEnv) {
// only applicable when rsma env exists
return TSDB_CODE_SUCCESS;
return NULL;
}
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SRSmaInfo *pRSmaInfo = NULL;
pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
if (!pStat || !RSMA_INFO_HASH(pStat)) {
return NULL;
}
SRSmaInfo *pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
return NULL;
}
return pRSmaInfo;
}
static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) {
SRSmaInfo *pRSmaInfo = tdGetRSmaInfoBySuid(pSma, suid);
if (!pRSmaInfo) {
smaDebug("vgId:%d, return as no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
return TSDB_CODE_SUCCESS;
}
......@@ -725,8 +667,8 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb
}
if (inputType == STREAM_INPUT__DATA_SUBMIT) {
tdExecuteRSmaImpl(pSma, pMsg, inputType, &pRSmaInfo->items[0], suid, TSDB_RETENTION_L1);
tdExecuteRSmaImpl(pSma, pMsg, inputType, &pRSmaInfo->items[1], suid, TSDB_RETENTION_L2);
tdExecuteRSmaImpl(pSma, pMsg, inputType, &pRSmaInfo->items[0], pRSmaInfo->pTSchema, suid, TSDB_RETENTION_L1);
tdExecuteRSmaImpl(pSma, pMsg, inputType, &pRSmaInfo->items[1], pRSmaInfo->pTSchema, suid, TSDB_RETENTION_L2);
}
return TSDB_CODE_SUCCESS;
......@@ -939,13 +881,11 @@ _err:
}
static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *pItem) {
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT((SSmaEnv *)pSma->pRSmaEnv);
SRSmaInfo *pRSmaInfo = NULL;
void *qTaskInfo = NULL;
pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &pItem->suid, sizeof(pItem->suid));
if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
pRSmaInfo = tdGetRSmaInfoBySuid(pSma, pItem->suid);
if (!pRSmaInfo) {
smaDebug("vgId:%d, no restore as no rsma info for table:%" PRIu64, SMA_VID(pSma), pItem->suid);
return TSDB_CODE_SUCCESS;
}
......@@ -1350,3 +1290,79 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) {
}
taosReleaseRef(smaMgmt.rsetId, rsmaStat->refId);
}
/**
* @brief trigger to get rsma result
*
* @param param
* @param tmrId
*/
static void tdRSmaFetchTrigger(void *param, void *tmrId) {
SRSmaInfoItem *pItem = param;
SSma *pSma = NULL;
SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__);
if (!pStat) {
smaDebug("rsma fetch task not start since already destroyed, rsetId rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId,
pItem->refId);
return;
}
pSma = pStat->pSma;
// if rsma trigger stat in paused, cancelled or finished, not start fetch task
int8_t rsmaTriggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pStat));
switch (rsmaTriggerStat) {
case TASK_TRIGGER_STAT_PAUSED:
case TASK_TRIGGER_STAT_CANCELLED:
case TASK_TRIGGER_STAT_FINISHED: {
tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__);
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data since stat is %" PRIi8 ", rsetId rsetId:%" PRIi64
" refId:%d",
SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pItem->refId);
return;
}
default:
break;
}
SRSmaInfo *pRSmaInfo = tdGetRSmaInfoByItem(pItem);
ASSERT(pRSmaInfo->suid > 0);
int8_t fetchTriggerStat =
atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
switch (fetchTriggerStat) {
case TASK_TRIGGER_STAT_ACTIVE: {
smaDebug("vgId:%d, fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is active", SMA_VID(pSma),
pItem->level, pRSmaInfo->suid);
tdRefSmaStat(pSma, (SSmaStat *)pStat);
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
qSetStreamInput(pItem->taskInfo, &dataBlock, STREAM_INPUT__DATA_BLOCK, false);
tdRSmaFetchAndSubmitResult(pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat, STREAM_INPUT__DATA_BLOCK);
tdUnRefSmaStat(pSma, (SSmaStat *)pStat);
} break;
case TASK_TRIGGER_STAT_PAUSED: {
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is paused",
SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
} break;
case TASK_TRIGGER_STAT_INACTIVE: {
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is inactive",
SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
} break;
case TASK_TRIGGER_STAT_INIT: {
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is init", SMA_VID(pSma),
pItem->level, pRSmaInfo->suid);
} break;
default: {
smaWarn("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is unknown",
SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
} break;
}
_end:
tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册