提交 8c6120db 编写于 作者: K kailixu

enh: rsma asserts process

上级 ea7c1338
...@@ -147,7 +147,8 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { ...@@ -147,7 +147,8 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
} }
} }
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
if (pRSmaStat->commitAppliedVer < 0) { if (ASSERTS(pRSmaStat->commitAppliedVer > 0, "commit applied version %" PRIi64 " <= 0",
pRSmaStat->commitAppliedVer)) {
code = TSDB_CODE_APP_ERROR; code = TSDB_CODE_APP_ERROR;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
......
...@@ -131,8 +131,7 @@ static int32_t tdNewSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv) { ...@@ -131,8 +131,7 @@ static int32_t tdNewSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv) {
(smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&SMA_TSMA_ENV(pSma), *ppEnv) (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&SMA_TSMA_ENV(pSma), *ppEnv)
: atomic_store_ptr(&SMA_RSMA_ENV(pSma), *ppEnv); : atomic_store_ptr(&SMA_RSMA_ENV(pSma), *ppEnv);
terrno = tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType, pSma); if ((terrno = tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType, pSma)) != TSDB_CODE_SUCCESS) {
if (terrno != TSDB_CODE_SUCCESS) {
tdFreeSmaEnv(pEnv); tdFreeSmaEnv(pEnv);
*ppEnv = NULL; *ppEnv = NULL;
(smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&SMA_TSMA_ENV(pSma), NULL) (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&SMA_TSMA_ENV(pSma), NULL)
...@@ -197,7 +196,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS ...@@ -197,7 +196,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
if (!pSmaStat) { if (ASSERTS(pSmaStat != NULL, "pSmaStat is NULL")) {
terrno = TSDB_CODE_RSMA_INVALID_ENV; terrno = TSDB_CODE_RSMA_INVALID_ENV;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
...@@ -257,6 +256,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS ...@@ -257,6 +256,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
} else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { } else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
// TODO // TODO
} else { } else {
ASSERTS(0, "unknown smaType:%" PRIi8, smaType);
code = TSDB_CODE_APP_ERROR; code = TSDB_CODE_APP_ERROR;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
...@@ -354,6 +354,7 @@ static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) { ...@@ -354,6 +354,7 @@ static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
smaDebug("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " succeed", vid, refId, smaMgmt.rsetId); smaDebug("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " succeed", vid, refId, smaMgmt.rsetId);
} }
} else { } else {
ASSERTS(0, "unknown smaType:%" PRIi8, smaType);
terrno = TSDB_CODE_APP_ERROR; terrno = TSDB_CODE_APP_ERROR;
smaError("%s failed at line %d since %s", __func__, __LINE__, terrstr()); smaError("%s failed at line %d since %s", __func__, __LINE__, terrstr());
return -1; return -1;
...@@ -374,7 +375,7 @@ int32_t tdLockSma(SSma *pSma) { ...@@ -374,7 +375,7 @@ int32_t tdLockSma(SSma *pSma) {
} }
int32_t tdUnLockSma(SSma *pSma) { int32_t tdUnLockSma(SSma *pSma) {
if (!SMA_LOCKED(pSma)) { if (ASSERTS(SMA_LOCKED(pSma), "pSma %p is not locked:%d", pSma, pSma->locked)) {
terrno = TSDB_CODE_APP_ERROR; terrno = TSDB_CODE_APP_ERROR;
smaError("vgId:%d, failed to unlock since %s", SMA_VID(pSma), tstrerror(terrno)); smaError("vgId:%d, failed to unlock since %s", SMA_VID(pSma), tstrerror(terrno));
return -1; return -1;
......
...@@ -88,7 +88,8 @@ static int32_t tsdbBinaryToFS(uint8_t *pData, int64_t nData, SRSmaFS *pFS) { ...@@ -88,7 +88,8 @@ static int32_t tsdbBinaryToFS(uint8_t *pData, int64_t nData, SRSmaFS *pFS) {
} }
} }
if(n + sizeof(TSCKSUM) != nData) { if (ASSERTS(n + sizeof(TSCKSUM) == nData, "n:%d + sizeof(TSCKSUM):%d != nData:%d", n, (int32_t)sizeof(TSCKSUM),
nData)) {
code = TSDB_CODE_FILE_CORRUPTED; code = TSDB_CODE_FILE_CORRUPTED;
goto _exit; goto _exit;
} }
......
...@@ -102,6 +102,7 @@ int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int ty ...@@ -102,6 +102,7 @@ int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int ty
pKeepCfg->precision = pCfg->precision; pKeepCfg->precision = pCfg->precision;
switch (type) { switch (type) {
case TSDB_TYPE_TSMA: case TSDB_TYPE_TSMA:
ASSERTS(0, "undefined smaType:%d", (int32_t)type);
terrno = TSDB_CODE_APP_ERROR; terrno = TSDB_CODE_APP_ERROR;
break; break;
case TSDB_TYPE_RSMA_L0: case TSDB_TYPE_RSMA_L0:
...@@ -114,6 +115,7 @@ int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int ty ...@@ -114,6 +115,7 @@ int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int ty
SMA_SET_KEEP_CFG(pVnode, 2); SMA_SET_KEEP_CFG(pVnode, 2);
break; break;
default: default:
ASSERTS(0, "unknown smaType:%d", (int32_t)type);
terrno = TSDB_CODE_APP_ERROR; terrno = TSDB_CODE_APP_ERROR;
break; break;
} }
......
...@@ -129,13 +129,17 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) { ...@@ -129,13 +129,17 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
} }
static FORCE_INLINE int32_t tdUidStoreInit(STbUidStore **pStore) { static FORCE_INLINE int32_t tdUidStoreInit(STbUidStore **pStore) {
if (ASSERTS(*pStore == NULL, "*pStore:%p != NULL", *pStore)) {
terrno = TSDB_CODE_APP_ERROR;
return TSDB_CODE_FAILED;
}
*pStore = taosMemoryCalloc(1, sizeof(STbUidStore));
if (*pStore == NULL) { if (*pStore == NULL) {
*pStore = taosMemoryCalloc(1, sizeof(STbUidStore)); terrno = TSDB_CODE_OUT_OF_MEMORY;
if (*pStore == NULL) { return TSDB_CODE_FAILED;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -298,7 +302,12 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat ...@@ -298,7 +302,12 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
pItem->maxDelay = TSDB_MAX_ROLLUP_MAX_DELAY; pItem->maxDelay = TSDB_MAX_ROLLUP_MAX_DELAY;
} }
pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2; // make sure: pItem->level > 0 pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2;
if (ASSERTS(pItem->level > 0, "pItem level:%" PRIi8 " should > 0", pItem->level)) {
terrno = TSDB_CODE_APP_ERROR;
return TSDB_CODE_FAILED;
}
SRSmaRef rsmaRef = {.refId = pStat->refId, .suid = pRSmaInfo->suid}; SRSmaRef rsmaRef = {.refId = pStat->refId, .suid = pRSmaInfo->suid};
taosHashPut(smaMgmt.refHash, &pItem, POINTER_BYTES, &rsmaRef, sizeof(rsmaRef)); taosHashPut(smaMgmt.refHash, &pItem, POINTER_BYTES, &rsmaRef, sizeof(rsmaRef));
...@@ -850,7 +859,7 @@ static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t ...@@ -850,7 +859,7 @@ static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t
.initTqReader = 1, .initTqReader = 1,
}; };
if (dstTaskInfo) { if (ASSERTS(!dstTaskInfo, "dstTaskInfo:%p is not NULL", dstTaskInfo)) {
code = TSDB_CODE_APP_ERROR; code = TSDB_CODE_APP_ERROR;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
...@@ -899,6 +908,7 @@ static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo) { ...@@ -899,6 +908,7 @@ static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo) {
code = terrno; code = terrno;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
if (mr.me.type != TSDB_SUPER_TABLE) { if (mr.me.type != TSDB_SUPER_TABLE) {
code = TSDB_CODE_RSMA_INVALID_SCHEMA; code = TSDB_CODE_RSMA_INVALID_SCHEMA;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
...@@ -907,6 +917,7 @@ static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo) { ...@@ -907,6 +917,7 @@ static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo) {
code = TSDB_CODE_RSMA_INVALID_SCHEMA; code = TSDB_CODE_RSMA_INVALID_SCHEMA;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
if (TABLE_IS_ROLLUP(mr.me.flags)) { if (TABLE_IS_ROLLUP(mr.me.flags)) {
param = &mr.me.stbEntry.rsmaParam; param = &mr.me.stbEntry.rsmaParam;
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
...@@ -924,8 +935,8 @@ static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo) { ...@@ -924,8 +935,8 @@ static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo) {
_exit: _exit:
if (code) { if (code) {
smaError("vgId:%d, %s failed at line %d since %s, suid:%" PRIi64 ", type:%" PRIi8 ", uid:%" PRIi64, SMA_VID(pSma), smaError("vgId:%d, %s failed at line %d since %s, suid:%" PRIi64 ", flags:%" PRIi8 ",type:%" PRIi8 ", uid:%" PRIi64,
__func__, lino, tstrerror(code), pInfo->suid, mr.me.type, mr.me.uid); SMA_VID(pSma), __func__, lino, tstrerror(code), pInfo->suid, mr.me.flags, mr.me.type, mr.me.uid);
} }
metaReaderClear(&mr); metaReaderClear(&mr);
return code; return code;
...@@ -973,10 +984,8 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { ...@@ -973,10 +984,8 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) {
} }
tdRefRSmaInfo(pSma, pRSmaInfo); tdRefRSmaInfo(pSma, pRSmaInfo);
taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
if (pRSmaInfo->suid != suid) { if (ASSERTS(pRSmaInfo->suid == suid, "suid:%" PRIi64 " != %" PRIi64, pRSmaInfo->suid, suid)) {
terrno = TSDB_CODE_APP_ERROR; terrno = TSDB_CODE_APP_ERROR;
smaError("vgId:%d, invalid rsma info in cache, suid:%" PRIi64 " != %" PRIi64, SMA_VID(pSma), pRSmaInfo->suid,
suid);
return NULL; return NULL;
} }
return pRSmaInfo; return pRSmaInfo;
...@@ -1597,7 +1606,8 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { ...@@ -1597,7 +1606,8 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
if (oldStat == 0 || if (oldStat == 0 ||
((oldStat == 2) && atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)) < TASK_TRIGGER_STAT_PAUSED)) { ((oldStat == 2) && atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)) < TASK_TRIGGER_STAT_PAUSED)) {
int32_t oldVal = atomic_fetch_add_32(&pRSmaStat->nFetchAll, 1); int32_t oldVal = atomic_fetch_add_32(&pRSmaStat->nFetchAll, 1);
if (oldVal < 0) {
if (ASSERTS(oldVal >= 0, "oldVal of nFetchAll: %d < 0", oldVal)) {
code = TSDB_CODE_APP_ERROR; code = TSDB_CODE_APP_ERROR;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
...@@ -1630,6 +1640,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { ...@@ -1630,6 +1640,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
} }
} }
} else { } else {
ASSERTS(0, "unknown rsma exec type:%d", (int32_t)type);
code = TSDB_CODE_APP_ERROR; code = TSDB_CODE_APP_ERROR;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
...@@ -1653,8 +1664,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { ...@@ -1653,8 +1664,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
_exit: _exit:
taosArrayDestroy(pSubmitArr); taosArrayDestroy(pSubmitArr);
if (code) { if (code) {
smaError("vgId:%d, %s failed at line %d since %s, type:%d", TD_VID(pVnode), __func__, lino, tstrerror(code), smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
(int32_t)type);
} }
return code; return code;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册