提交 c2c7ccf7 编写于 作者: C Cary Xu

enh: rsma info lifecyle mgmt

上级 963bc989
......@@ -115,10 +115,10 @@ struct SSmaStat {
#define RSMA_FS_LOCK(r) (&(r)->lock)
struct SRSmaInfoItem {
tmr_h tmrId;
int32_t maxDelay;
int8_t level;
int8_t triggerStat;
int32_t maxDelay;
tmr_h tmrId;
};
struct SRSmaInfo {
......@@ -173,8 +173,8 @@ int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat);
int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo);
int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo);
void *tdAcquireSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln);
int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln);
void *tdAcquireSmaRef(int32_t rsetId, int64_t refId);
int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId);
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType);
......@@ -233,7 +233,6 @@ void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t le
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
void tdRemoveRSmaInfoBySuid(SSma *pSma, int64_t suid);
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
......
......@@ -402,7 +402,7 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
// step 1: merge rsmaInfoHash and iRsmaInfoHash
// lock
taosWLockLatch(SMA_ENV_LOCK(pEnv));
#if 0
if (taosHashGetSize(RSMA_INFO_HASH(pRSmaStat)) <= 0) {
// just switch the hash pointer if rsmaInfoHash is empty
if (taosHashGetSize(RSMA_IMU_INFO_HASH(pRSmaStat)) > 0) {
......@@ -411,25 +411,47 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
RSMA_IMU_INFO_HASH(pRSmaStat) = infoHash;
}
} else {
void *pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), NULL);
while (pIter) {
tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
if (!taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t))) {
taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter));
smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma),
*pSuid);
} else {
// free the resources
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter;
tdFreeRSmaInfo(pSma, pRSmaInfo, false);
smaDebug("vgId:%d, rsma async post commit, free rsma info since already COW for table:%" PRIi64, SMA_VID(pSma),
*pSuid);
}
#endif
#if 1
void *pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), NULL);
while (pIter) {
tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
if (!taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t))) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter;
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
int32_t refVal = T_REF_VAL_GET(pRSmaInfo);
if (refVal == 0) {
tdFreeRSmaInfo(pSma, pRSmaInfo, true);
smaDebug(
"vgId:%d, rsma async post commit, free rsma info since already deleted and ref is 0 for "
"table:%" PRIi64,
SMA_VID(pSma), *pSuid);
} else {
smaDebug(
"vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for "
"table:%" PRIi64,
SMA_VID(pSma), refVal, *pSuid);
}
pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), pIter);
pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), pIter);
continue;
}
taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter));
smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma),
*pSuid);
} else {
// free the resources
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter;
tdFreeRSmaInfo(pSma, pRSmaInfo, false);
smaDebug("vgId:%d, rsma async post commit, free rsma info since already COW for table:%" PRIi64, SMA_VID(pSma),
*pSuid);
}
pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), pIter);
}
#endif
// }
taosHashCleanup(RSMA_IMU_INFO_HASH(pRSmaStat));
RSMA_IMU_INFO_HASH(pRSmaStat) = NULL;
......
......@@ -171,7 +171,7 @@ int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) {
int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
if (!pRSmaInfo) return 0;
int ref = T_REF_INC(pRSmaInfo);
smaDebug("vgId:%d, ref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref);
return 0;
......@@ -183,9 +183,6 @@ int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
int ref = T_REF_DEC(pRSmaInfo);
smaDebug("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref);
if (ref == 0) {
tdRemoveRSmaInfoBySuid(pSma, pRSmaInfo->suid);
}
return 0;
}
......
......@@ -32,11 +32,14 @@ static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *ui
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids);
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
int8_t idx);
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfo *pInfo, tb_uid_t suid, int8_t level);
static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid);
static int32_t tdRSmaFetchAndSubmitResult(qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, SRSmaStat *pStat,
int8_t blkType);
static void tdRSmaFetchTrigger(void *param, void *tmrId);
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfo *pInfo, tb_uid_t suid,
int8_t level);
static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid);
static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
static int32_t tdRSmaFetchAndSubmitResult(qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid,
SRSmaStat *pStat, int8_t blkType);
static void tdRSmaFetchTrigger(void *param, void *tmrId);
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile);
static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish);
......@@ -114,7 +117,7 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
if (pInfo) {
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = &pInfo->items[i];
if (isDeepFree && pItem->tmrId) {
smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pInfo->suid,
pItem->tmrId, i + 1);
......@@ -168,7 +171,7 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids)
return TSDB_CODE_SUCCESS;
}
pRSmaInfo = tdGetRSmaInfoBySuid(pSma, *suid);
pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, *suid);
if (!pRSmaInfo) {
smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64, SMA_VID(pSma), *suid);
......@@ -179,6 +182,7 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids)
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pRSmaInfo->taskInfo[i]) {
if ((qUpdateQualifiedTableId(pRSmaInfo->taskInfo[i], tbUids, true) < 0)) {
tdReleaseRSmaInfo(pSma, pRSmaInfo);
smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " level %d since %s", SMA_VID(pSma), *suid, i,
terrstr());
return TSDB_CODE_FAILED;
......@@ -189,6 +193,7 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids)
}
}
tdReleaseRSmaInfo(pSma, pRSmaInfo);
return TSDB_CODE_SUCCESS;
}
......@@ -417,7 +422,7 @@ int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) {
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
SRSmaInfo *pRSmaInfo = tdGetRSmaInfoBySuid(pSma, pReq->suid);
SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, pReq->suid);
if (!pRSmaInfo) {
smaWarn("vgId:%d, drop rsma for stable %s %" PRIi64 " failed no rsma in hash", TD_VID(pVnode), pReq->name,
......@@ -429,8 +434,10 @@ int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) {
RSMA_INFO_SET_DEL(pRSmaInfo);
tdUnRefRSmaInfo(pSma, pRSmaInfo);
// save to file
tdReleaseRSmaInfo(pSma, pRSmaInfo);
// save to file
// TODO
smaDebug("vgId:%d, drop rsma for table %" PRIi64 " succeed", TD_VID(pVnode), pReq->suid);
return TSDB_CODE_SUCCESS;
}
......@@ -575,10 +582,10 @@ static void tdDestroySDataBlockArray(SArray *pArray) {
/**
* @brief retention of rsma1/rsma2
*
* @param pSma
* @param now
* @return int32_t
*
* @param pSma
* @param now
* @return int32_t
*/
int32_t smaDoRetention(SSma *pSma, int64_t now) {
int32_t code = TSDB_CODE_SUCCESS;
......@@ -597,8 +604,8 @@ _end:
return code;
}
static int32_t tdRSmaFetchAndSubmitResult(qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, SRSmaStat *pStat,
int8_t blkType) {
static int32_t tdRSmaFetchAndSubmitResult(qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid,
SRSmaStat *pStat, int8_t blkType) {
SArray *pResult = NULL;
SSma *pSma = pStat->pSma;
......@@ -711,7 +718,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType
* @param suid
* @return SRSmaInfo*
*/
static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) {
static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) {
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = NULL;
SRSmaInfo *pRSmaInfo = NULL;
......@@ -725,14 +732,23 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) {
return NULL;
}
taosRLockLatch(SMA_ENV_LOCK(pEnv));
pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
if (pRSmaInfo && (pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return NULL;
}
tdRefRSmaInfo(pSma, pRSmaInfo);
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return pRSmaInfo;
}
if (RSMA_COMMIT_STAT(pStat) == 0) { // return NULL if not in committing stat
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return NULL;
}
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
// clone the SRSmaInfo from iRsmaInfoHash to rsmaInfoHash if in committing stat
SRSmaInfo *pCowRSmaInfo = NULL;
......@@ -742,7 +758,7 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) {
void *iRSmaInfo = taosHashGet(RSMA_IMU_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
if (iRSmaInfo) {
SRSmaInfo *pIRSmaInfo = *(SRSmaInfo **)iRSmaInfo;
if (pIRSmaInfo) {
if (pIRSmaInfo && !RSMA_INFO_IS_DEL(pIRSmaInfo)) {
if (tdCloneRSmaInfo(pSma, &pCowRSmaInfo, pIRSmaInfo) < 0) {
// unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
......@@ -762,68 +778,40 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) {
pCowRSmaInfo = *(SRSmaInfo **)pCowRSmaInfo;
ASSERT(!pCowRSmaInfo);
}
if(pCowRSmaInfo) {
tdRefRSmaInfo(pSma, pCowRSmaInfo);
}
// unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
return pCowRSmaInfo;
}
/**
* @brief During the drop procedure, only need to delete the object in rsmaInfoHash.
*
* @param pSma
* @param suid
* @return SRSmaInfo*
*/
void tdRemoveRSmaInfoBySuid(SSma *pSma, int64_t suid) {
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = NULL;
SRSmaInfo *pRSmaInfo = NULL;
if (!pEnv) {
return;
}
pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
if (!pStat || !RSMA_INFO_HASH(pStat)) {
return;
}
// unlock
taosWLockLatch(SMA_ENV_LOCK(pEnv));
pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
if (pRSmaInfo) {
if ((pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
tdFreeRSmaInfo(pSma, pRSmaInfo, true);
}
taosHashRemove(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
smaDebug("vgId:%d, remove from infoHash for table:%" PRIu64 " succeed", SMA_VID(pSma), suid);
static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
if (pInfo) {
tdUnRefRSmaInfo(pSma, pInfo);
}
// unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
}
static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) {
SRSmaInfo *pRSmaInfo = tdGetRSmaInfoBySuid(pSma, suid);
SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, suid);
if (!pRSmaInfo) {
smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
return TSDB_CODE_SUCCESS;
}
if (!RSMA_INFO_QTASK(pRSmaInfo,0)) {
if (!RSMA_INFO_QTASK(pRSmaInfo, 0)) {
tdReleaseRSmaInfo(pSma, pRSmaInfo);
smaDebug("vgId:%d, execute rsma, no rsma qTaskInfo for suid:%" PRIu64, SMA_VID(pSma), suid);
return TSDB_CODE_SUCCESS;
}
if (inputType == STREAM_INPUT__DATA_SUBMIT) {
tdRefRSmaInfo(pSma, pRSmaInfo);
tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo, suid, TSDB_RETENTION_L1);
tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo, suid, TSDB_RETENTION_L2);
tdUnRefRSmaInfo(pSma, pRSmaInfo);
}
tdReleaseRSmaInfo(pSma, pRSmaInfo);
return TSDB_CODE_SUCCESS;
}
......@@ -1034,7 +1022,7 @@ static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *
SRSmaInfo *pRSmaInfo = NULL;
void *qTaskInfo = NULL;
pRSmaInfo = tdGetRSmaInfoBySuid(pSma, pItem->suid);
pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, pItem->suid);
if (!pRSmaInfo) {
smaDebug("vgId:%d, no restore as no rsma info for table:%" PRIu64, SMA_VID(pSma), pItem->suid);
return TSDB_CODE_SUCCESS;
......@@ -1049,11 +1037,13 @@ static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *
}
if (!qTaskInfo) {
tdReleaseRSmaInfo(pSma, pRSmaInfo);
smaDebug("vgId:%d, no restore as NULL rsma qTaskInfo for table:%" PRIu64, SMA_VID(pSma), pItem->suid);
return TSDB_CODE_SUCCESS;
}
if (qDeserializeTaskStatus(qTaskInfo, pItem->qTaskInfo, pItem->len) < 0) {
tdReleaseRSmaInfo(pSma, pRSmaInfo);
smaError("vgId:%d, restore rsma task failed for table:%" PRIi64 " level %d since %s", SMA_VID(pSma), pItem->suid,
pItem->type, terrstr());
return TSDB_CODE_FAILED;
......@@ -1061,6 +1051,7 @@ static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *
smaDebug("vgId:%d, restore rsma task success for table:%" PRIi64 " level %d", SMA_VID(pSma), pItem->suid,
pItem->type);
tdReleaseRSmaInfo(pSma, pRSmaInfo);
return TSDB_CODE_SUCCESS;
}
......@@ -1239,6 +1230,12 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
while (infoHash) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
infoHash = taosHashIterate(pInfoHash, infoHash);
continue;
}
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pRSmaInfo, i);
if (!taskInfo) {
......@@ -1325,92 +1322,6 @@ _err:
return TSDB_CODE_FAILED;
}
#if 0
/**
* @brief trigger to get rsma result
*
* @param param
* @param tmrId
*/
static void tdRSmaFetchTrigger(void *param, void *tmrId) {
SRSmaInfoItem *pItem = param;
SSma *pSma = NULL;
SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__);
if (!pStat) {
smaDebug("rsma fetch task not start since already destroyed, rsetId rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId,
pItem->refId);
return;
}
pSma = pStat->pSma;
// if rsma trigger stat in paused, cancelled or finished, not start fetch task
int8_t rsmaTriggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pStat));
switch (rsmaTriggerStat) {
case TASK_TRIGGER_STAT_PAUSED:
case TASK_TRIGGER_STAT_CANCELLED: {
tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__);
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data since stat is %" PRIi8 ", rsetId rsetId:%" PRIi64
" refId:%d",
SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pItem->refId);
if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) {
taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay > 5000 ? 5000 : pItem->maxDelay, pItem, smaMgmt.tmrHandle,
&pItem->tmrId);
}
return;
}
default:
break;
}
SRSmaInfo *pRSmaInfo = tdGetRSmaInfoByItem(pItem);
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
goto _end;
}
int8_t fetchTriggerStat =
atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
switch (fetchTriggerStat) {
case TASK_TRIGGER_STAT_ACTIVE: {
smaDebug("vgId:%d, fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is active", SMA_VID(pSma),
pItem->level, pRSmaInfo->suid);
// sync procedure => async process
tdRefRSmaInfo(pSma, pRSmaInfo);
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
qSetMultiStreamInput(pItem->taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK);
tdRSmaFetchAndSubmitResult(pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat, STREAM_INPUT__DATA_BLOCK);
tdCleanupStreamInputDataBlock(pItem->taskInfo);
tdUnRefRSmaInfo(pSma, pRSmaInfo);
// atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
// taosTmrReset(tdRSmaFetchTrigger, 5000, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
} break;
case TASK_TRIGGER_STAT_PAUSED: {
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is paused",
SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
} break;
case TASK_TRIGGER_STAT_INACTIVE: {
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is inactive",
SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
} break;
case TASK_TRIGGER_STAT_INIT: {
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is init", SMA_VID(pSma),
pItem->level, pRSmaInfo->suid);
} break;
default: {
smaWarn("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is unknown",
SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
} break;
}
_end:
tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__);
}
#endif
/**
* @brief trigger to get rsma result
*
......@@ -1426,7 +1337,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
return;
}
SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaInfo->refId, __func__, __LINE__);
SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaInfo->refId);
if (!pStat) {
smaDebug("rsma fetch task not start since already destroyed, rsetId rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId,
......@@ -1441,7 +1352,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
switch (rsmaTriggerStat) {
case TASK_TRIGGER_STAT_PAUSED:
case TASK_TRIGGER_STAT_CANCELLED: {
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId, __func__, __LINE__);
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId);
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data since stat is %" PRIi8 ", rsetId rsetId:%" PRIi64
" refId:%d",
SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaInfo->refId);
......@@ -1463,7 +1374,6 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
pItem->level, pRSmaInfo->suid);
// sync procedure => async process
tdRefRSmaInfo(pSma, pRSmaInfo);
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
qTaskInfo_t taskInfo = pRSmaInfo->taskInfo[pItem->level - 1];
......@@ -1472,9 +1382,6 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
STREAM_INPUT__DATA_BLOCK);
tdCleanupStreamInputDataBlock(taskInfo);
tdUnRefRSmaInfo(pSma, pRSmaInfo);
// atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
// taosTmrReset(tdRSmaFetchTrigger, 5000, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
} break;
case TASK_TRIGGER_STAT_PAUSED: {
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is paused",
......@@ -1495,5 +1402,5 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
}
_end:
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId, __func__, __LINE__);
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId);
}
......@@ -287,22 +287,22 @@ int32_t tdRemoveTFile(STFile *pTFile) {
}
// smaXXXUtil ================
void *tdAcquireSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln) {
void *tdAcquireSmaRef(int32_t rsetId, int64_t refId) {
void *pResult = taosAcquireRef(rsetId, refId);
if (!pResult) {
smaWarn("%s:%d taosAcquireRef for rsetId:%" PRIi64 " refId:%d failed since %s", tags, ln, rsetId, refId, terrstr());
smaWarn("rsma acquire ref for rsetId:%" PRIi64 " refId:%d failed since %s", rsetId, refId, terrstr());
} else {
smaDebug("%s:%d taosAcquireRef for rsetId:%" PRIi64 " refId:%d success", tags, ln, rsetId, refId);
smaDebug("rsma acquire ref for rsetId:%" PRIi64 " refId:%d success", rsetId, refId);
}
return pResult;
}
int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln) {
int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId) {
if (taosReleaseRef(rsetId, refId) < 0) {
smaWarn("%s:%d taosReleaseRef for rsetId:%" PRIi64 " refId:%d failed since %s", tags, ln, rsetId, refId, terrstr());
smaWarn("rsma release ref for rsetId:%" PRIi64 " refId:%d failed since %s", rsetId, refId, terrstr());
return TSDB_CODE_FAILED;
}
smaDebug("%s:%d taosReleaseRef for rsetId:%" PRIi64 " refId:%d success", tags, ln, rsetId, refId);
smaDebug("rsma release ref for rsetId:%" PRIi64 " refId:%d success", rsetId, refId);
return TSDB_CODE_SUCCESS;
}
......@@ -313,7 +313,7 @@ static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t
char *pOutput = NULL;
int32_t len = 0;
if (qSerializeTaskStatus(srcTaskInfo, &pOutput, &len) < 0) {
if ((terrno = qSerializeTaskStatus(srcTaskInfo, &pOutput, &len)) < 0) {
smaError("vgId:%d, rsma clone, table %" PRIi64 " serialize qTaskInfo failed since %s", TD_VID(pVnode), suid,
terrstr());
goto _err;
......@@ -337,13 +337,15 @@ static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t
goto _err;
}
smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " succeed", TD_VID(pVnode), suid);
smaDebug("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " succeed", TD_VID(pVnode), suid);
taosMemoryFreeClear(pOutput);
return TSDB_CODE_SUCCESS;
_err:
taosMemoryFreeClear(pOutput);
tdFreeQTaskInfo(dstTaskInfo, TD_VID(pVnode), idx + 1);
smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " failed since %s", TD_VID(pVnode), suid,
terrstr());
return TSDB_CODE_FAILED;
}
......@@ -376,19 +378,21 @@ int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo **pDest, SRSmaInfo *pSrc) {
if (TABLE_IS_ROLLUP(mr.me.flags)) {
param = &mr.me.stbEntry.rsmaParam;
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
tdCloneQTaskInfo(pSma, pSrc->iTaskInfo[i], pSrc->taskInfo[i], param, pSrc->suid, i);
if (tdCloneQTaskInfo(pSma, pSrc->iTaskInfo[i], pSrc->taskInfo[i], param, pSrc->suid, i) < 0) {
goto _err;
}
}
smaDebug("vgId:%d, rsma clone env success for %" PRIi64, TD_VID(pVnode), pSrc->suid);
}
metaReaderClear(&mr);
*pDest = pSrc; // pointer copy
*pDest = pSrc; // pointer copy
return TSDB_CODE_SUCCESS;
_err:
*pDest = NULL;
metaReaderClear(&mr);
// tdFreeRSmaInfo(pSma, pDest, false);
smaError("vgId:%d, rsma clone env failed for %" PRIi64 " since %s", TD_VID(pVnode), pSrc->suid, terrstr());
return TSDB_CODE_FAILED;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册