提交 bf9b2ef8 编写于 作者: C Cary Xu

enh: rsma fetch trigger

上级 67faf7cd
...@@ -2657,6 +2657,31 @@ typedef struct { ...@@ -2657,6 +2657,31 @@ typedef struct {
SEpSet epSet; SEpSet epSet;
} SVgEpSet; } SVgEpSet;
typedef struct {
int64_t refId;
uint64_t handle;
} SRSmaFetchMsg;
static FORCE_INLINE int32_t tEncodeSRSmaFetchMsg(SEncoder* pCoder, const SRSmaFetchMsg* pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI64(pCoder, pReq->refId) < 0) return -1;
if (tEncodeU64(pCoder, pReq->handle) < 0) return -1;
tEndEncode(pCoder);
return 0;
}
static FORCE_INLINE int32_t tDecodeSRSmaFetchMsg(SDecoder* pCoder, SRSmaFetchMsg* pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->refId) < 0) return -1;
if (tDecodeU64(pCoder, &pReq->handle) < 0) return -1;
tEndDecode(pCoder);
return 0;
}
typedef struct { typedef struct {
int8_t version; // for compatibility(default 0) int8_t version; // for compatibility(default 0)
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
......
...@@ -308,12 +308,12 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) { ...@@ -308,12 +308,12 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) {
* @return int32_t * @return int32_t
*/ */
static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
if (!pSmaEnv) { if (!pEnv) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
// step 1: set rsma stat // step 1: set rsma stat
...@@ -337,18 +337,26 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { ...@@ -337,18 +337,26 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
} }
// step 3: swap rsmaInfoHash and iRsmaInfoHash // step 3: swap rsmaInfoHash and iRsmaInfoHash
ASSERT(!RSMA_IMU_INFO_HASH(pRSmaStat)); // lock
taosWLockLatch(SMA_ENV_LOCK(pEnv));
ASSERT(RSMA_INFO_HASH(pRSmaStat)); ASSERT(RSMA_INFO_HASH(pRSmaStat));
ASSERT(!RSMA_IMU_INFO_HASH(pRSmaStat));
RSMA_IMU_INFO_HASH(pRSmaStat) = RSMA_INFO_HASH(pRSmaStat); RSMA_IMU_INFO_HASH(pRSmaStat) = RSMA_INFO_HASH(pRSmaStat);
RSMA_INFO_HASH(pRSmaStat) = RSMA_INFO_HASH(pRSmaStat) =
taosHashInit(RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); taosHashInit(RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
if (!RSMA_INFO_HASH(pRSmaStat)) { if (!RSMA_INFO_HASH(pRSmaStat)) {
// unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
smaError("vgId:%d, rsma async commit failed since %s", SMA_VID(pSma), terrstr()); smaError("vgId:%d, rsma async commit failed since %s", SMA_VID(pSma), terrstr());
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
// unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
// step 4: others // step 4: others
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
...@@ -383,44 +391,51 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) { ...@@ -383,44 +391,51 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) {
* @return int32_t * @return int32_t
*/ */
static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
if (!pSmaEnv) { if (!pEnv) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
// step 1: merge rsmaInfoHash and iRsmaInfoHash // step 1: merge rsmaInfoHash and iRsmaInfoHash
taosWLockLatch(SMA_ENV_LOCK(pSmaEnv)); // lock
taosWLockLatch(SMA_ENV_LOCK(pEnv));
if (taosHashGetSize(RSMA_INFO_HASH(pRSmaStat)) <= 0) { if (taosHashGetSize(RSMA_INFO_HASH(pRSmaStat)) <= 0) {
// TODO: optimization - just switch the hash pointer if rsmaInfoHash is empty // just switch the hash pointer if rsmaInfoHash is empty
} if (taosHashGetSize(RSMA_IMU_INFO_HASH(pRSmaStat)) > 0) {
SHashObj *infoHash = RSMA_INFO_HASH(pRSmaStat);
void *pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), NULL); RSMA_INFO_HASH(pRSmaStat) = RSMA_IMU_INFO_HASH(pRSmaStat);
while (pIter) { RSMA_IMU_INFO_HASH(pRSmaStat) = infoHash;
tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
if (!taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t))) {
taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter));
smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma),
*pSuid);
} else {
// free the resources
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter;
tdFreeRSmaInfo(pSma, pRSmaInfo, false);
smaDebug("vgId:%d, rsma async post commit, free rsma info since already COW for table:%" PRIi64, SMA_VID(pSma),
*pSuid);
} }
} else {
void *pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), NULL);
while (pIter) {
tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
if (!taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t))) {
taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter));
smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma),
*pSuid);
} else {
// free the resources
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter;
tdFreeRSmaInfo(pSma, pRSmaInfo, false);
smaDebug("vgId:%d, rsma async post commit, free rsma info since already COW for table:%" PRIi64, SMA_VID(pSma),
*pSuid);
}
pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), pIter); pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), pIter);
}
} }
taosHashCleanup(RSMA_IMU_INFO_HASH(pRSmaStat)); taosHashCleanup(RSMA_IMU_INFO_HASH(pRSmaStat));
RSMA_IMU_INFO_HASH(pRSmaStat) = NULL; RSMA_IMU_INFO_HASH(pRSmaStat) = NULL;
taosWUnLockLatch(SMA_ENV_LOCK(pSmaEnv)); // unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
// step 2: cleanup outdated qtaskinfo files // step 2: cleanup outdated qtaskinfo files
tdCleanupQTaskInfoFiles(pSma, pRSmaStat); tdCleanupQTaskInfoFiles(pSma, pRSmaStat);
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
typedef struct SSmaStat SSmaStat; typedef struct SSmaStat SSmaStat;
#define SMA_MGMT_REF_NUM 10240 #define SMA_MGMT_REF_NUM 10240
extern SSmaMgmt smaMgmt; extern SSmaMgmt smaMgmt;
......
...@@ -38,6 +38,7 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid); ...@@ -38,6 +38,7 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid);
static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, SRSmaStat *pStat, static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, SRSmaStat *pStat,
int8_t blkType); int8_t blkType);
static void tdRSmaFetchTrigger(void *param, void *tmrId); static void tdRSmaFetchTrigger(void *param, void *tmrId);
static void tdRSmaFetchTrigger2(void *param, void *tmrId);
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile); static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile);
static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish); static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish);
...@@ -567,6 +568,30 @@ static void tdDestroySDataBlockArray(SArray *pArray) { ...@@ -567,6 +568,30 @@ static void tdDestroySDataBlockArray(SArray *pArray) {
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
} }
/**
* @brief retention of rsma1/rsma2
*
* @param pSma
* @param now
* @return int32_t
*/
int32_t smaDoRetention(SSma *pSma, int64_t now) {
int32_t code = TSDB_CODE_SUCCESS;
if (VND_IS_RSMA(pSma->pVnode)) {
return code;
}
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pSma->pRSmaTsdb[i]) {
code = tsdbDoRetention(pSma->pRSmaTsdb[i], now);
if (code) goto _end;
}
}
_end:
return code;
}
static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, SRSmaStat *pStat, static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, SRSmaStat *pStat,
int8_t blkType) { int8_t blkType) {
SArray *pResult = NULL; SArray *pResult = NULL;
...@@ -697,7 +722,7 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) { ...@@ -697,7 +722,7 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) {
return pRSmaInfo; return pRSmaInfo;
} }
if (RSMA_COMMIT_STAT(pStat) == 0) { if (RSMA_COMMIT_STAT(pStat) == 0) { // return NULL if not in committing stat
return NULL; return NULL;
} }
...@@ -705,23 +730,29 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) { ...@@ -705,23 +730,29 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) {
SRSmaInfo *pCowRSmaInfo = NULL; SRSmaInfo *pCowRSmaInfo = NULL;
// lock // lock
taosWLockLatch(SMA_ENV_LOCK(pEnv)); taosWLockLatch(SMA_ENV_LOCK(pEnv));
if (!taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t))) { // 2-phase lock if (!(pCowRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)))) { // 2-phase lock
void *iRSmaInfo = taosHashGet(RSMA_IMU_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); void *iRSmaInfo = taosHashGet(RSMA_IMU_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
if (iRSmaInfo) { if (iRSmaInfo) {
SRSmaInfo *pIRSmaInfo = *(SRSmaInfo **)iRSmaInfo; SRSmaInfo *pIRSmaInfo = *(SRSmaInfo **)iRSmaInfo;
if (pIRSmaInfo) { if (pIRSmaInfo) {
if (tdCloneRSmaInfo(pSma, pCowRSmaInfo, pIRSmaInfo) < 0) { if (tdCloneRSmaInfo(pSma, pCowRSmaInfo, pIRSmaInfo) < 0) {
// unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
smaError("vgId:%d, clone rsma info failed for suid:%" PRIu64 " since %s", SMA_VID(pSma), suid, terrstr()); smaError("vgId:%d, clone rsma info failed for suid:%" PRIu64 " since %s", SMA_VID(pSma), suid, terrstr());
return NULL; return NULL;
} }
smaDebug("vgId:%d, clone rsma info succeed for suid:%" PRIu64, SMA_VID(pSma), suid); smaDebug("vgId:%d, clone rsma info succeed for suid:%" PRIu64, SMA_VID(pSma), suid);
if (taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pCowRSmaInfo, sizeof(pCowRSmaInfo)) < 0) { if (taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pCowRSmaInfo, sizeof(pCowRSmaInfo)) < 0) {
// unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
smaError("vgId:%d, clone rsma info failed for suid:%" PRIu64 " since %s", SMA_VID(pSma), suid, terrstr());
return NULL; return NULL;
} }
} }
} }
} else {
pCowRSmaInfo = *(SRSmaInfo **)pCowRSmaInfo;
ASSERT(!pCowRSmaInfo);
} }
// unlock // unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
...@@ -1365,19 +1396,86 @@ _end: ...@@ -1365,19 +1396,86 @@ _end:
tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__); tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__);
} }
int32_t smaDoRetention(SSma *pSma, int64_t now) { /**
int32_t code = TSDB_CODE_SUCCESS; * @brief trigger to get rsma result
if (VND_IS_RSMA(pSma->pVnode)) { *
return code; * @param param
* @param tmrId
*/
static void tdRSmaFetchTrigger2(void *param, void *tmrId) {
SRSmaInfoItem *pItem = param;
SSma *pSma = NULL;
SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__);
if (!pStat) {
smaDebug("rsma fetch task not start since already destroyed, rsetId rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId,
pItem->refId);
return;
} }
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { pSma = pStat->pSma;
if (pSma->pRSmaTsdb[i]) {
code = tsdbDoRetention(pSma->pRSmaTsdb[i], now); // if rsma trigger stat in paused, cancelled or finished, not start fetch task
if (code) goto _end; int8_t rsmaTriggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pStat));
switch (rsmaTriggerStat) {
case TASK_TRIGGER_STAT_PAUSED:
case TASK_TRIGGER_STAT_CANCELLED: {
tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__);
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data since stat is %" PRIi8 ", rsetId rsetId:%" PRIi64
" refId:%d",
SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pItem->refId);
if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) {
taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay > 5000 ? 5000 : pItem->maxDelay, pItem, smaMgmt.tmrHandle,
&pItem->tmrId);
}
return;
} }
default:
break;
}
SRSmaInfo *pRSmaInfo = tdGetRSmaInfoByItem(pItem);
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
goto _end;
}
int8_t fetchTriggerStat =
atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
switch (fetchTriggerStat) {
case TASK_TRIGGER_STAT_ACTIVE: {
smaDebug("vgId:%d, fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is active", SMA_VID(pSma),
pItem->level, pRSmaInfo->suid);
// sync procedure => async process
tdRefRSmaInfo(pSma, pRSmaInfo);
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
qSetMultiStreamInput(pItem->taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK);
tdRSmaFetchAndSubmitResult(pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat, STREAM_INPUT__DATA_BLOCK);
tdCleanupStreamInputDataBlock(pItem->taskInfo);
tdUnRefRSmaInfo(pSma, pRSmaInfo);
// atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
// taosTmrReset(tdRSmaFetchTrigger, 5000, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
} break;
case TASK_TRIGGER_STAT_PAUSED: {
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is paused",
SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
} break;
case TASK_TRIGGER_STAT_INACTIVE: {
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is inactive",
SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
} break;
case TASK_TRIGGER_STAT_INIT: {
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is init", SMA_VID(pSma),
pItem->level, pRSmaInfo->suid);
} break;
default: {
smaWarn("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is unknown",
SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
} break;
} }
_end: _end:
return code; tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__);
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册