提交 ed34f490 编写于 作者: C Cary Xu

enh: async commit api for rsma

上级 edfd4b51
...@@ -67,7 +67,9 @@ struct SRSmaStat { ...@@ -67,7 +67,9 @@ struct SRSmaStat {
int64_t submitVer; int64_t submitVer;
int64_t refId; // shared by fetch tasks int64_t refId; // shared by fetch tasks
int8_t triggerStat; // shared by fetch tasks int8_t triggerStat; // shared by fetch tasks
int8_t commitStat; // 0 not in committing, 1 in committing
SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo; SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo;
SHashObj *iRsmaInfoHash; // key: stbUid, value: SRSmaInfo; immutable rsmaInfoHash
}; };
struct SSmaStat { struct SSmaStat {
...@@ -78,12 +80,29 @@ struct SSmaStat { ...@@ -78,12 +80,29 @@ struct SSmaStat {
T_REF_DECLARE() T_REF_DECLARE()
}; };
#define SMA_TSMA_STAT(s) (&(s)->tsmaStat) #define SMA_TSMA_STAT(s) (&(s)->tsmaStat)
#define SMA_RSMA_STAT(s) (&(s)->rsmaStat) #define SMA_RSMA_STAT(s) (&(s)->rsmaStat)
#define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash) #define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash)
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat) #define RSMA_IMU_INFO_HASH(r) ((r)->iRsmaInfoHash)
#define RSMA_REF_ID(r) ((r)->refId) #define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
#define RSMA_SUBMIT_VER(r) ((r)->submitVer) #define RSMA_COMMIT_STAT(r) (&(r)->commitStat)
#define RSMA_REF_ID(r) ((r)->refId)
#define RSMA_SUBMIT_VER(r) ((r)->submitVer)
struct SRSmaInfoItem {
void *taskInfo; // qTaskInfo_t
int64_t refId;
tmr_h tmrId;
int32_t maxDelay;
int8_t level;
int8_t triggerStat;
};
struct SRSmaInfo {
STSchema *pTSchema;
int64_t suid;
SRSmaInfoItem items[TSDB_RETENTION_L2];
};
enum { enum {
TASK_TRIGGER_STAT_INIT = 0, TASK_TRIGGER_STAT_INIT = 0,
...@@ -94,6 +113,8 @@ enum { ...@@ -94,6 +113,8 @@ enum {
TASK_TRIGGER_STAT_DROPPED = 5, TASK_TRIGGER_STAT_DROPPED = 5,
}; };
#define RSMA_TASK_INFO_HASH_SLOT 8
void tdDestroySmaEnv(SSmaEnv *pSmaEnv); void tdDestroySmaEnv(SSmaEnv *pSmaEnv);
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv); void *tdFreeSmaEnv(SSmaEnv *pSmaEnv);
...@@ -183,7 +204,7 @@ static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) { ...@@ -183,7 +204,7 @@ static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) {
atomic_or_fetch_8(&pTStat->state, TSDB_SMA_STAT_DROPPED); atomic_or_fetch_8(&pTStat->state, TSDB_SMA_STAT_DROPPED);
} }
} }
void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType); static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType); void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
......
...@@ -164,10 +164,11 @@ void smaCleanUp(); ...@@ -164,10 +164,11 @@ void smaCleanUp();
int32_t smaOpen(SVnode* pVnode); int32_t smaOpen(SVnode* pVnode);
int32_t smaClose(SSma* pSma); int32_t smaClose(SSma* pSma);
int32_t smaBegin(SSma* pSma); int32_t smaBegin(SSma* pSma);
int32_t smaPreCommit(SSma* pSma); int32_t smaSyncPreCommit(SSma* pSma);
int32_t smaCommit(SSma* pSma); int32_t smaSyncCommit(SSma* pSma);
int32_t smaPostCommit(SSma* pSma); int32_t smaSyncPostCommit(SSma* pSma);
int32_t smaAsyncCommit(SSma* pSma); int32_t smaAsyncPreCommit(SSma* pSma);
int32_t smaAsyncPostCommit(SSma* pSma);
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg); int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
......
...@@ -18,7 +18,8 @@ ...@@ -18,7 +18,8 @@
static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma); static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaCommitImpl(SSma *pSma); static int32_t tdProcessRSmaCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaPostCommitImpl(SSma *pSma); static int32_t tdProcessRSmaPostCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma); static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma);
/** /**
* @brief Only applicable to Rollup SMA * @brief Only applicable to Rollup SMA
...@@ -26,7 +27,7 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma); ...@@ -26,7 +27,7 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma);
* @param pSma * @param pSma
* @return int32_t * @return int32_t
*/ */
int32_t smaPreCommit(SSma *pSma) { return tdProcessRSmaPreCommitImpl(pSma); } int32_t smaSyncPreCommit(SSma *pSma) { return tdProcessRSmaPreCommitImpl(pSma); }
/** /**
* @brief Only applicable to Rollup SMA * @brief Only applicable to Rollup SMA
...@@ -34,7 +35,7 @@ int32_t smaPreCommit(SSma *pSma) { return tdProcessRSmaPreCommitImpl(pSma); } ...@@ -34,7 +35,7 @@ int32_t smaPreCommit(SSma *pSma) { return tdProcessRSmaPreCommitImpl(pSma); }
* @param pSma * @param pSma
* @return int32_t * @return int32_t
*/ */
int32_t smaCommit(SSma *pSma) { return tdProcessRSmaCommitImpl(pSma); } int32_t smaSyncCommit(SSma *pSma) { return tdProcessRSmaCommitImpl(pSma); }
/** /**
* @brief Only applicable to Rollup SMA * @brief Only applicable to Rollup SMA
...@@ -42,7 +43,7 @@ int32_t smaCommit(SSma *pSma) { return tdProcessRSmaCommitImpl(pSma); } ...@@ -42,7 +43,7 @@ int32_t smaCommit(SSma *pSma) { return tdProcessRSmaCommitImpl(pSma); }
* @param pSma * @param pSma
* @return int32_t * @return int32_t
*/ */
int32_t smaPostCommit(SSma *pSma) { return tdProcessRSmaPostCommitImpl(pSma); } int32_t smaSyncPostCommit(SSma *pSma) { return tdProcessRSmaPostCommitImpl(pSma); }
/** /**
* @brief Only applicable to Rollup SMA * @brief Only applicable to Rollup SMA
...@@ -50,7 +51,15 @@ int32_t smaPostCommit(SSma *pSma) { return tdProcessRSmaPostCommitImpl(pSma); } ...@@ -50,7 +51,15 @@ int32_t smaPostCommit(SSma *pSma) { return tdProcessRSmaPostCommitImpl(pSma); }
* @param pSma * @param pSma
* @return int32_t * @return int32_t
*/ */
int32_t smaAsyncCommit(SSma *pSma) { return tdProcessRSmaAsyncCommitImpl(pSma); } int32_t smaAsyncPreCommit(SSma *pSma) { return tdProcessRSmaAsyncPreCommitImpl(pSma); }
/**
* @brief Only applicable to Rollup SMA
*
* @param pSma
* @return int32_t
*/
int32_t smaAsyncPostCommit(SSma *pSma) { return tdProcessRSmaAsyncPostCommitImpl(pSma); }
/** /**
* @brief set rsma trigger stat active * @brief set rsma trigger stat active
...@@ -71,18 +80,17 @@ int32_t smaBegin(SSma *pSma) { ...@@ -71,18 +80,17 @@ int32_t smaBegin(SSma *pSma) {
atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED, TASK_TRIGGER_STAT_ACTIVE); atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED, TASK_TRIGGER_STAT_ACTIVE);
switch (rsmaTriggerStat) { switch (rsmaTriggerStat) {
case TASK_TRIGGER_STAT_PAUSED: { case TASK_TRIGGER_STAT_PAUSED: {
smaDebug("vgId:%d rsma trigger stat from paused to active", SMA_VID(pSma)); smaDebug("vgId:%d, rsma trigger stat from paused to active", SMA_VID(pSma));
break; break;
} }
case TASK_TRIGGER_STAT_INIT: { case TASK_TRIGGER_STAT_INIT: {
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE); atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE);
smaDebug("vgId:%d rsma trigger stat from init to active", SMA_VID(pSma)); smaDebug("vgId:%d, rsma trigger stat from init to active", SMA_VID(pSma));
break; break;
} }
default: { default: {
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE); atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE);
smaWarn("vgId:%d rsma trigger stat %" PRIi8 " is unexpected", SMA_VID(pSma), rsmaTriggerStat); smaError("vgId:%d, rsma trigger stat %" PRIi8 " is unexpected", SMA_VID(pSma), rsmaTriggerStat);
ASSERT(0);
break; break;
} }
} }
...@@ -90,7 +98,7 @@ int32_t smaBegin(SSma *pSma) { ...@@ -90,7 +98,7 @@ int32_t smaBegin(SSma *pSma) {
} }
/** /**
* @brief pre-commit for rollup sma. * @brief pre-commit for rollup sma(sync commit).
* 1) set trigger stat of rsma timer TASK_TRIGGER_STAT_PAUSED. * 1) set trigger stat of rsma timer TASK_TRIGGER_STAT_PAUSED.
* 2) wait all triggered fetch tasks finished * 2) wait all triggered fetch tasks finished
* 3) perform persist task for qTaskInfo * 3) perform persist task for qTaskInfo
...@@ -107,8 +115,7 @@ static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma) { ...@@ -107,8 +115,7 @@ static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma) {
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
// step 1: set rsma stat paused
// step 1: set persistence task paused
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED); atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
// step 2: wait all triggered fetch tasks finished // step 2: wait all triggered fetch tasks finished
...@@ -236,12 +243,101 @@ static int32_t tdProcessRSmaPostCommitImpl(SSma *pSma) { ...@@ -236,12 +243,101 @@ static int32_t tdProcessRSmaPostCommitImpl(SSma *pSma) {
/** /**
* @brief Rsma async commit implementation * @brief Rsma async commit implementation
* 1) Wait all running fetch task finish to fetch and put submitMsg into level 2/3 wQueue(blocking level 1 write) * 1) set rsma stat TASK_TRIGGER_STAT_PAUSED
* 2) Wait all running fetch task finish to fetch and put submitMsg into level 2/3 wQueue(blocking level 1 write)
* 3)
* *
* @param pSma * @param pSma
* @return int32_t * @return int32_t
*/ */
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) { static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
// TODO SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
if (!pSmaEnv) {
return TSDB_CODE_SUCCESS;
}
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
// step 1: set rsma stat paused
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
// step 2: wait all triggered fetch tasks finished
int32_t nLoops = 0;
while (1) {
if (T_REF_VAL_GET(pStat) == 0) {
smaDebug("vgId:%d, rsma fetch tasks all finished", SMA_VID(pSma));
break;
} else {
smaDebug("vgId:%d, rsma fetch tasks not all finished yet", SMA_VID(pSma));
}
++nLoops;
if (nLoops > 1000) {
sched_yield();
nLoops = 0;
}
}
// step 3: swap rsmaInfoHash and iRsmaInfoHash
ASSERT(!RSMA_IMU_INFO_HASH(pRSmaStat));
ASSERT(RSMA_INFO_HASH(pRSmaStat));
RSMA_IMU_INFO_HASH(pRSmaStat) = RSMA_INFO_HASH(pRSmaStat);
RSMA_INFO_HASH(pRSmaStat) =
taosHashInit(RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
if (!RSMA_INFO_HASH(pRSmaStat)) {
smaError("vgId:%d, rsma async commit failed since %s", SMA_VID(pSma), terrstr());
return TSDB_CODE_FAILED;
}
return TSDB_CODE_SUCCESS;
}
/**
* @brief Migrate rsmaInfo from iRsmaInfo to rsmaInfo if rsmaInfoHash not empty.
*
* @param pSma
* @return int32_t
*/
static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
if (!pSmaEnv) {
return TSDB_CODE_SUCCESS;
}
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
// step 1: merge rsmaInfoHash and iRsmaInfoHash
if (taosHashGetSize(RSMA_INFO_HASH(pRSmaStat)) <= 0) {
// TODO:
}
void *pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), NULL);
while (pIter) {
tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
if (!taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t))) {
taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter));
smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), *pSuid);
} else {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter;
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = &pRSmaInfo->items[i];
if (pItem->taskInfo) {
tdFreeQTaskInfo(pItem->taskInfo, SMA_VID(pSma), i + 1);
}
}
smaDebug("vgId:%d, rsma async post commit, free rsma info since already COW for table:%" PRIi64, SMA_VID(pSma),
*pSuid);
}
pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), pIter);
}
taosHashCleanup(RSMA_IMU_INFO_HASH(pRSmaStat));
RSMA_IMU_INFO_HASH(pRSmaStat) = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
typedef struct SSmaStat SSmaStat; typedef struct SSmaStat SSmaStat;
#define RSMA_TASK_INFO_HASH_SLOT 8
#define SMA_MGMT_REF_NUM 10240 #define SMA_MGMT_REF_NUM 10240
extern SSmaMgmt smaMgmt; extern SSmaMgmt smaMgmt;
...@@ -311,7 +310,6 @@ int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) { ...@@ -311,7 +310,6 @@ int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
if (taosRemoveRef(smaMgmt.rsetId, RSMA_REF_ID(pRSmaStat)) < 0) { if (taosRemoveRef(smaMgmt.rsetId, RSMA_REF_ID(pRSmaStat)) < 0) {
smaError("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " failed since %s", SMA_VID(pRSmaStat->pSma), smaError("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " failed since %s", SMA_VID(pRSmaStat->pSma),
RSMA_REF_ID(pRSmaStat), smaMgmt.rsetId, terrstr()); RSMA_REF_ID(pRSmaStat), smaMgmt.rsetId, terrstr());
ASSERT(0);
} else { } else {
smaDebug("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " succeed", SMA_VID(pRSmaStat->pSma), smaDebug("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " succeed", SMA_VID(pRSmaStat->pSma),
RSMA_REF_ID(pRSmaStat), smaMgmt.rsetId); RSMA_REF_ID(pRSmaStat), smaMgmt.rsetId);
...@@ -361,7 +359,7 @@ int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) { ...@@ -361,7 +359,7 @@ int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) {
} }
break; break;
default: default:
smaError("vgId:%d undefined smaType:%", SMA_VID(pSma), smaType); smaError("vgId:%d, undefined smaType:%", SMA_VID(pSma), smaType);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
......
...@@ -48,20 +48,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables); ...@@ -48,20 +48,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables);
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed); static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed);
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma, int64_t committed); static int32_t tdRSmaRestoreTSDataReload(SSma *pSma, int64_t committed);
struct SRSmaInfoItem {
void *taskInfo; // qTaskInfo_t
int64_t refId;
tmr_h tmrId;
int32_t maxDelay;
int8_t level;
int8_t triggerStat;
};
struct SRSmaInfo {
STSchema *pTSchema;
int64_t suid;
SRSmaInfoItem items[TSDB_RETENTION_L2];
};
static SRSmaInfo *tdGetRSmaInfoByItem(SRSmaInfoItem *pItem) { static SRSmaInfo *tdGetRSmaInfoByItem(SRSmaInfoItem *pItem) {
// adapt accordingly if definition of SRSmaInfo update // adapt accordingly if definition of SRSmaInfo update
...@@ -102,7 +89,7 @@ static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) { ...@@ -102,7 +89,7 @@ static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) {
static FORCE_INLINE void tdRSmaQTaskInfoIterDestroy(SRSmaQTaskInfoIter *pIter) { taosMemoryFreeClear(pIter->pBuf); } static FORCE_INLINE void tdRSmaQTaskInfoIterDestroy(SRSmaQTaskInfoIter *pIter) { taosMemoryFreeClear(pIter->pBuf); }
static FORCE_INLINE void tdFreeTaskHandle(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) { void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) {
// Note: free/kill may in RC // Note: free/kill may in RC
qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle); qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) { if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) {
...@@ -123,7 +110,7 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { ...@@ -123,7 +110,7 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
i + 1); i + 1);
taosTmrStopA(&pItem->tmrId); taosTmrStopA(&pItem->tmrId);
} }
tdFreeTaskHandle(&pItem->taskInfo, SMA_VID(pSma), i + 1); tdFreeQTaskInfo(&pItem->taskInfo, SMA_VID(pSma), i + 1);
} else { } else {
smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", SMA_VID(pSma), smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", SMA_VID(pSma),
pInfo->suid, i + 1); pInfo->suid, i + 1);
...@@ -257,22 +244,21 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui ...@@ -257,22 +244,21 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo, static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
int8_t idx) { int8_t idx) {
SRetention *pRetention = SMA_RETENTION(pSma); if ((param->qmsgLen > 0) && param->qmsg[idx]) {
STsdbCfg *pTsdbCfg = SMA_TSDB_CFG(pSma); SRetention *pRetention = SMA_RETENTION(pSma);
STsdbCfg *pTsdbCfg = SMA_TSDB_CFG(pSma);
SReadHandle handle = {
.meta = pSma->pVnode->pMeta,
.vnode = pSma->pVnode,
.initTqReader = 1,
};
SReadHandle handle = {
.meta = pSma->pVnode->pMeta,
.vnode = pSma->pVnode,
.initTqReader = 1,
};
if (param->qmsg[idx]) {
SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]); SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]);
pItem->refId = RSMA_REF_ID(pStat); pItem->refId = RSMA_REF_ID(pStat);
pItem->taskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle); pItem->taskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle);
if (!pItem->taskInfo) { if (!pItem->taskInfo) {
terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE; terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE;
goto _err; return TSDB_CODE_FAILED;
} }
pItem->triggerStat = TASK_TRIGGER_STAT_INACTIVE; pItem->triggerStat = TASK_TRIGGER_STAT_INACTIVE;
if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) { if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) {
...@@ -286,13 +272,11 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat ...@@ -286,13 +272,11 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
pItem->maxDelay = TSDB_MAX_ROLLUP_MAX_DELAY; pItem->maxDelay = TSDB_MAX_ROLLUP_MAX_DELAY;
} }
pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2; pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2;
smaInfo("vgId:%d table:%" PRIi64 " level:%" PRIi8 " maxdelay:%" PRIi64 " watermark:%" PRIi64 smaInfo("vgId:%d, table:%" PRIi64 " level:%" PRIi8 " maxdelay:%" PRIi64 " watermark:%" PRIi64
", finally maxdelay:%" PRIi32, ", finally maxdelay:%" PRIi32,
SMA_VID(pSma), pRSmaInfo->suid, idx + 1, param->maxdelay[idx], param->watermark[idx], pItem->maxDelay); SMA_VID(pSma), pRSmaInfo->suid, idx + 1, param->maxdelay[idx], param->watermark[idx], pItem->maxDelay);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err:
return TSDB_CODE_FAILED;
} }
/** /**
...@@ -562,7 +546,9 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche ...@@ -562,7 +546,9 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche
SSDataBlock *output = NULL; SSDataBlock *output = NULL;
uint64_t ts; uint64_t ts;
if (qExecTask(pItem->taskInfo, &output, &ts) < 0) { if (qExecTask(pItem->taskInfo, &output, &ts) < 0) {
ASSERT(false); smaError("vgId:%d, qExecTask for rsma table %" PRIi64 "l evel %" PRIi8 " failed since %s", SMA_VID(pSma), suid,
pItem->level, terrstr());
goto _err;
} }
if (!output) { if (!output) {
break; break;
...@@ -572,7 +558,7 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche ...@@ -572,7 +558,7 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche
pResult = taosArrayInit(1, sizeof(SSDataBlock)); pResult = taosArrayInit(1, sizeof(SSDataBlock));
if (!pResult) { if (!pResult) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED; goto _err;
} }
} }
...@@ -891,7 +877,7 @@ int32_t tdProcessRSmaRestoreImpl(SSma *pSma) { ...@@ -891,7 +877,7 @@ int32_t tdProcessRSmaRestoreImpl(SSma *pSma) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err: _err:
smaError("vgId:%d failed to restore rsma task since %s", SMA_VID(pSma), terrstr()); smaError("vgId:%d, failed to restore rsma task since %s", SMA_VID(pSma), terrstr());
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -1217,6 +1203,10 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { ...@@ -1217,6 +1203,10 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data since stat is %" PRIi8 ", rsetId rsetId:%" PRIi64 smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data since stat is %" PRIi8 ", rsetId rsetId:%" PRIi64
" refId:%d", " refId:%d",
SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pItem->refId); SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pItem->refId);
if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) {
taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay > 5000 ? 5000 : pItem->maxDelay, pItem, smaMgmt.tmrHandle,
&pItem->tmrId);
}
return; return;
} }
default: default:
......
...@@ -233,7 +233,7 @@ int vnodeCommit(SVnode *pVnode) { ...@@ -233,7 +233,7 @@ int vnodeCommit(SVnode *pVnode) {
walBeginSnapshot(pVnode->pWal, pVnode->state.applied); walBeginSnapshot(pVnode->pWal, pVnode->state.applied);
// preCommit // preCommit
smaPreCommit(pVnode->pSma); smaSyncPreCommit(pVnode->pSma);
// commit each sub-system // commit each sub-system
if (metaCommit(pVnode->pMeta) < 0) { if (metaCommit(pVnode->pMeta) < 0) {
...@@ -276,7 +276,7 @@ int vnodeCommit(SVnode *pVnode) { ...@@ -276,7 +276,7 @@ int vnodeCommit(SVnode *pVnode) {
pVnode->state.committed = info.state.committed; pVnode->state.committed = info.state.committed;
// postCommit // postCommit
smaPostCommit(pVnode->pSma); smaSyncPostCommit(pVnode->pSma);
// apply the commit (TODO) // apply the commit (TODO)
walEndSnapshot(pVnode->pWal); walEndSnapshot(pVnode->pWal);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册