未验证 提交 f24703f8 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #19308 from taosdata/enh/TD-21211-3.0

enh: sma assert/log optimization
......@@ -116,68 +116,6 @@ _exit:
return code;
}
// SQTaskFile ======================================================
/**
* @brief At most time, there is only one qtaskinfo file committed latest in aTaskFile. Sometimes, there would be
* multiple qtaskinfo files supporting snapshot replication.
*
* @param pSma
* @param pStat
* @return int32_t
*/
static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) {
#if 0
SVnode *pVnode = pSma->pVnode;
SRSmaFS *pFS = RSMA_FS(pStat);
int64_t committed = pStat->commitAppliedVer;
int64_t fsMaxVer = -1;
char qTaskInfoFullName[TSDB_FILENAME_LEN];
taosWLockLatch(RSMA_FS_LOCK(pStat));
for (int32_t i = 0; i < taosArrayGetSize(pFS->aQTaskInf);) {
SQTaskFile *pTaskF = taosArrayGet(pFS->aQTaskInf, i);
int32_t oldVal = atomic_fetch_sub_32(&pTaskF->nRef, 1);
if ((oldVal <= 1) && (pTaskF->version < committed)) {
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->version, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
if (taosRemoveFile(qTaskInfoFullName) < 0) {
smaWarn("vgId:%d, cleanup qinf, committed %" PRIi64 ", failed to remove %s since %s", TD_VID(pVnode), committed,
qTaskInfoFullName, tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else {
smaDebug("vgId:%d, cleanup qinf, committed %" PRIi64 ", success to remove %s", TD_VID(pVnode), committed,
qTaskInfoFullName);
}
taosArrayRemove(pFS->aQTaskInf, i);
continue;
}
++i;
}
if (taosArrayGetSize(pFS->aQTaskInf) > 0) {
fsMaxVer = ((SQTaskFile *)taosArrayGetLast(pFS->aQTaskInf))->version;
}
if (fsMaxVer < committed) {
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), committed, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
if (taosCheckExistFile(qTaskInfoFullName)) {
SQTaskFile qFile = {.nRef = 1, .padding = 0, .version = committed, .size = 0};
if (!taosArrayPush(pFS->aQTaskInf, &qFile)) {
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
}
} else {
smaDebug("vgId:%d, update qinf, no need as committed %" PRIi64 " not larger than fsMaxVer %" PRIi64, TD_VID(pVnode),
committed, fsMaxVer);
}
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
#endif
return TSDB_CODE_SUCCESS;
}
/**
* @brief Rsma async commit implementation(only do some necessary light weighted task)
* 1) set rsma stat TASK_TRIGGER_STAT_PAUSED
......@@ -187,7 +125,8 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) {
* @return int32_t
*/
static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
int32_t code = 0;
int32_t code = 0;
int32_t lino = 0;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
if (!pEnv) {
......@@ -208,7 +147,11 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
}
}
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
// ASSERT(pRSmaStat->commitAppliedVer > 0);
if (ASSERTS(pRSmaStat->commitAppliedVer >= 0, "commit applied version %" PRIi64 " < 0",
pRSmaStat->commitAppliedVer)) {
code = TSDB_CODE_APP_ERROR;
TSDB_CHECK_CODE(code, lino, _exit);
}
// step 2: wait for all triggered fetch tasks to finish
nLoops = 0;
......@@ -242,9 +185,9 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
}
}
smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
if ((code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat))) != 0) {
return code;
}
code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat));
TSDB_CHECK_CODE(code, lino, _exit);
smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
#if 0 // consuming task of qTaskInfo clone
......@@ -252,8 +195,6 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
// lock
taosWLockLatch(SMA_ENV_LOCK(pEnv));
ASSERT(RSMA_INFO_HASH(pRSmaStat));
void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL);
while (pIter) {
......@@ -271,9 +212,19 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
// all rsma results are written completely
STsdb *pTsdb = NULL;
if ((pTsdb = VND_RSMA1(pSma->pVnode))) tsdbPrepareCommit(pTsdb);
if ((pTsdb = VND_RSMA2(pSma->pVnode))) tsdbPrepareCommit(pTsdb);
if ((pTsdb = VND_RSMA1(pSma->pVnode))) {
code = tsdbPrepareCommit(pTsdb);
TSDB_CHECK_CODE(code, lino, _exit);
}
if ((pTsdb = VND_RSMA2(pSma->pVnode))) {
code = tsdbPrepareCommit(pTsdb);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code));
}
return code;
}
......@@ -360,8 +311,6 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
}
tdUpdateQTaskInfoFiles(pSma, pRSmaStat);
atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0);
return TSDB_CODE_SUCCESS;
......
......@@ -131,7 +131,7 @@ static int32_t tdNewSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv) {
(smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&SMA_TSMA_ENV(pSma), *ppEnv)
: atomic_store_ptr(&SMA_RSMA_ENV(pSma), *ppEnv);
if (tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType, pSma) != TSDB_CODE_SUCCESS) {
if ((terrno = tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType, pSma)) != TSDB_CODE_SUCCESS) {
tdFreeSmaEnv(pEnv);
*ppEnv = NULL;
(smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&SMA_TSMA_ENV(pSma), NULL)
......@@ -193,10 +193,16 @@ static void tRSmaInfoHashFreeNode(void *data) {
}
static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma) {
ASSERT(pSmaStat != NULL);
int32_t code = 0;
int32_t lino = 0;
if (ASSERTS(pSmaStat != NULL, "pSmaStat is NULL")) {
terrno = TSDB_CODE_RSMA_INVALID_ENV;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (*pSmaStat) { // no lock
return TSDB_CODE_SUCCESS;
return code; // success, return directly
}
/**
......@@ -207,8 +213,8 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
if (!(*pSmaStat)) {
*pSmaStat = (SSmaStat *)taosMemoryCalloc(1, sizeof(SSmaStat) + sizeof(TdThread) * tsNumOfVnodeRsmaThreads);
if (!(*pSmaStat)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (smaType == TSDB_SMA_TYPE_ROLLUP) {
......@@ -224,7 +230,8 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
if (refId < 0) {
smaError("vgId:%d, taosAddRef refId:%" PRIi64 " to rsetId rsetId:%d max:%d failed since:%s", SMA_VID(pSma),
refId, smaMgmt.rsetId, SMA_MGMT_REF_NUM, tstrerror(terrno));
return TSDB_CODE_FAILED;
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
} else {
smaDebug("vgId:%d, taosAddRef refId:%" PRIi64 " to rsetId rsetId:%d max:%d succeed", SMA_VID(pSma), refId,
smaMgmt.rsetId, SMA_MGMT_REF_NUM);
......@@ -235,22 +242,30 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
RSMA_INFO_HASH(pRSmaStat) = taosHashInit(
RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
if (!RSMA_INFO_HASH(pRSmaStat)) {
return TSDB_CODE_FAILED;
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
taosHashSetFreeFp(RSMA_INFO_HASH(pRSmaStat), tRSmaInfoHashFreeNode);
if (tdRsmaStartExecutor(pSma) < 0) {
return TSDB_CODE_FAILED;
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
taosInitRWLatch(RSMA_FS_LOCK(pRSmaStat));
} else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
// TODO
} else {
ASSERT(0);
ASSERTS(0, "unknown smaType:%" PRIi8, smaType);
code = TSDB_CODE_APP_ERROR;
TSDB_CHECK_CODE(code, lino, _exit);
}
}
return TSDB_CODE_SUCCESS;
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code));
}
return code;
}
static void tdDestroyTSmaStat(STSmaStat *pStat) {
......@@ -339,7 +354,10 @@ static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
smaDebug("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " succeed", vid, refId, smaMgmt.rsetId);
}
} else {
ASSERT(0);
ASSERTS(0, "unknown smaType:%" PRIi8, smaType);
terrno = TSDB_CODE_APP_ERROR;
smaError("%s failed at line %d since %s", __func__, __LINE__, terrstr());
return -1;
}
}
return 0;
......@@ -348,7 +366,7 @@ static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
int32_t tdLockSma(SSma *pSma) {
int code = taosThreadMutexLock(&pSma->mutex);
if (code != 0) {
smaError("vgId:%d, failed to lock td since %s", SMA_VID(pSma), strerror(errno));
smaError("vgId:%d, failed to lock since %s", SMA_VID(pSma), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
......@@ -357,12 +375,17 @@ int32_t tdLockSma(SSma *pSma) {
}
int32_t tdUnLockSma(SSma *pSma) {
ASSERT(SMA_LOCKED(pSma));
if (ASSERTS(SMA_LOCKED(pSma), "pSma %p is not locked:%d", pSma, pSma->locked)) {
terrno = TSDB_CODE_APP_ERROR;
smaError("vgId:%d, failed to unlock since %s", SMA_VID(pSma), tstrerror(terrno));
return -1;
}
pSma->locked = false;
int code = taosThreadMutexUnlock(&pSma->mutex);
if (code != 0) {
smaError("vgId:%d, failed to unlock td since %s", SMA_VID(pSma), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(code);
smaError("vgId:%d, failed to unlock since %s", SMA_VID(pSma), strerror(errno));
return -1;
}
return 0;
......
......@@ -77,7 +77,7 @@ static int32_t tsdbBinaryToFS(uint8_t *pData, int64_t nData, SRSmaFS *pFS) {
int32_t nt = tdRSmaGetQTaskF(pData + n, &qTaskF);
if (nt < 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_FILE_CORRUPTED;
goto _exit;
}
......@@ -88,7 +88,11 @@ static int32_t tsdbBinaryToFS(uint8_t *pData, int64_t nData, SRSmaFS *pFS) {
}
}
ASSERT(n + sizeof(TSCKSUM) == nData);
if (ASSERTS(n + sizeof(TSCKSUM) == nData, "n:%d + sizeof(TSCKSUM):%d != nData:%d", n, (int32_t)sizeof(TSCKSUM),
nData)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _exit;
}
_exit:
return code;
......@@ -545,87 +549,6 @@ int32_t tdRSmaFSUpsertQTaskFile(SSma *pSma, SRSmaFS *pFS, SQTaskFile *qTaskFile,
_exit:
return code;
}
#if 0
int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version) {
SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf;
SQTaskFile qTaskF = {.level = level, .suid = suid, .version = version};
SQTaskFile *pTaskF = NULL;
int32_t oldVal = 0;
taosRLockLatch(RSMA_FS_LOCK(pStat));
if (suid > 0 && level > 0) {
ASSERT(version > 0);
if ((pTaskF = taosArraySearch(aQTaskInf, &qTaskF, tdQTaskInfCmprFn1, TD_EQ))) {
oldVal = atomic_fetch_add_32(&pTaskF->nRef, 1);
ASSERT(oldVal > 0);
}
} else {
// ref all
int32_t size = taosArrayGetSize(aQTaskInf);
for (int32_t i = 0; i < size; ++i) {
pTaskF = TARRAY_GET_ELEM(aQTaskInf, i);
oldVal = atomic_fetch_add_32(&pTaskF->nRef, 1);
ASSERT(oldVal > 0);
}
}
taosRUnLockLatch(RSMA_FS_LOCK(pStat));
return oldVal;
}
void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version) {
SVnode *pVnode = pSma->pVnode;
SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf;
char qTaskFullName[TSDB_FILENAME_LEN];
SQTaskFile qTaskF = {.level = level, .suid = suid, .version = version};
SQTaskFile *pTaskF = NULL;
int32_t idx = -1;
taosWLockLatch(RSMA_FS_LOCK(pStat));
if (suid > 0 && level > 0) {
ASSERT(version > 0);
if ((idx = taosArraySearchIdx(aQTaskInf, &qTaskF, tdQTaskInfCmprFn1, TD_EQ)) >= 0) {
ASSERT(idx < taosArrayGetSize(aQTaskInf));
pTaskF = taosArrayGet(aQTaskInf, idx);
if (atomic_sub_fetch_32(&pTaskF->nRef, 1) <= 0) {
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, level, pTaskF->version,
tfsGetPrimaryPath(pVnode->pTfs), qTaskFullName);
if (taosRemoveFile(qTaskFullName) < 0) {
smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), qTaskFullName,
tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else {
smaDebug("vgId:%d, success to remove %s", TD_VID(pVnode), qTaskFullName);
}
taosArrayRemove(aQTaskInf, idx);
}
}
} else {
for (int32_t i = 0; i < taosArrayGetSize(aQTaskInf);) {
pTaskF = TARRAY_GET_ELEM(aQTaskInf, i);
int32_t nRef = INT32_MAX;
if (pTaskF->version == version) {
nRef = atomic_sub_fetch_32(&pTaskF->nRef, 1);
} else if (pTaskF->version < version) {
nRef = atomic_load_32(&pTaskF->nRef);
}
if (nRef <= 0) {
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, pTaskF->version,
tfsGetPrimaryPath(pVnode->pTfs), qTaskFullName);
if (taosRemoveFile(qTaskFullName) < 0) {
smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), qTaskFullName,
tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else {
smaDebug("vgId:%d, success to remove %s", TD_VID(pVnode), qTaskFullName);
}
taosArrayRemove(aQTaskInf, i);
continue;
}
++i;
}
}
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
}
#endif
int32_t tdRSmaFSRef(SSma *pSma, SRSmaFS *pFS) {
int32_t code = 0;
......
......@@ -34,13 +34,16 @@ static int32_t rsmaRestore(SSma *pSma);
SRetention *r = (SRetention *)VND_RETENTIONS(v) + l; \
if (!RETENTION_VALID(r)) { \
if (l == 0) { \
goto _err; \
code = TSDB_CODE_INVALID_PARA; \
TSDB_CHECK_CODE(code, lino, _exit); \
} \
break; \
} \
smaSetKeepCfg(v, &keepCfg, pCfg, TSDB_TYPE_RSMA_L##l); \
code = smaSetKeepCfg(v, &keepCfg, pCfg, TSDB_TYPE_RSMA_L##l); \
TSDB_CHECK_CODE(code, lino, _exit); \
if (tsdbOpen(v, &SMA_RSMA_TSDB##l(pSma), VNODE_RSMA##l##_DIR, &keepCfg, rollback) < 0) { \
goto _err; \
code = terrno; \
TSDB_CHECK_CODE(code, lino, _exit); \
} \
} while (0)
......@@ -68,12 +71,10 @@ static int32_t smaEvalDays(SVnode *pVnode, SRetention *r, int8_t level, int8_t p
days = keepDuration;
}
if (level == TSDB_RETENTION_L0) {
goto end;
if (level < TSDB_RETENTION_L1 || level > TSDB_RETENTION_L2) {
goto _exit;
}
ASSERT(level >= TSDB_RETENTION_L1 && level <= TSDB_RETENTION_L2);
freqDuration = convertTimeFromPrecisionToUnit((r + level)->freq, precision, TIME_UNIT_MINUTE);
keepDuration = convertTimeFromPrecisionToUnit((r + level)->keep, precision, TIME_UNIT_MINUTE);
......@@ -91,16 +92,18 @@ static int32_t smaEvalDays(SVnode *pVnode, SRetention *r, int8_t level, int8_t p
if (days < freqDuration) {
days = freqDuration;
}
end:
_exit:
smaInfo("vgId:%d, evaluated duration for level %d is %d, raw val:%d", TD_VID(pVnode), level + 1, days, duration);
return days;
}
int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int type) {
terrno = 0;
pKeepCfg->precision = pCfg->precision;
switch (type) {
case TSDB_TYPE_TSMA:
ASSERT(0);
ASSERTS(0, "undefined smaType:%d", (int32_t)type);
terrno = TSDB_CODE_APP_ERROR;
break;
case TSDB_TYPE_RSMA_L0:
SMA_SET_KEEP_CFG(pVnode, 0);
......@@ -112,19 +115,22 @@ int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int ty
SMA_SET_KEEP_CFG(pVnode, 2);
break;
default:
ASSERT(0);
ASSERTS(0, "unknown smaType:%d", (int32_t)type);
terrno = TSDB_CODE_APP_ERROR;
break;
}
return 0;
return terrno;
}
int32_t smaOpen(SVnode *pVnode, int8_t rollback) {
int32_t code = 0;
int32_t lino = 0;
STsdbCfg *pCfg = &pVnode->config.tsdbCfg;
SSma *pSma = taosMemoryCalloc(1, sizeof(SSma));
if (!pSma) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pVnode->pSma = pSma;
......@@ -143,21 +149,24 @@ int32_t smaOpen(SVnode *pVnode, int8_t rollback) {
} else if (i == TSDB_RETENTION_L2) {
SMA_OPEN_RSMA_IMPL(pVnode, 2);
} else {
terrno = TSDB_CODE_APP_ERROR;
smaError("vgId:%d, sma open failed since %s, level:%d", TD_VID(pVnode), terrstr(), i);
goto _err;
code = TSDB_CODE_APP_ERROR;
smaError("vgId:%d, sma open failed since %s, level:%d", TD_VID(pVnode), tstrerror(code), i);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
// restore the rsma
if (tdRSmaRestore(pSma, RSMA_RESTORE_REBOOT, pVnode->state.committed, rollback) < 0) {
goto _err;
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
}
return 0;
_err:
return -1;
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
}
return code;
}
int32_t smaClose(SSma *pSma) {
......
......@@ -129,12 +129,17 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
}
static FORCE_INLINE int32_t tdUidStoreInit(STbUidStore **pStore) {
ASSERT(*pStore == NULL);
if (ASSERTS(*pStore == NULL, "*pStore:%p != NULL", *pStore)) {
terrno = TSDB_CODE_APP_ERROR;
return TSDB_CODE_FAILED;
}
*pStore = taosMemoryCalloc(1, sizeof(STbUidStore));
if (*pStore == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
return TSDB_CODE_SUCCESS;
}
......@@ -163,15 +168,14 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids,
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pRSmaInfo->taskInfo[i]) {
if (((terrno = qUpdateQualifiedTableId(pRSmaInfo->taskInfo[i], tbUids, isAdd)) < 0)) {
if ((terrno = qUpdateQualifiedTableId(pRSmaInfo->taskInfo[i], tbUids, isAdd)) < 0) {
tdReleaseRSmaInfo(pSma, pRSmaInfo);
smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " level %d since %s", SMA_VID(pSma), *suid, i,
terrstr());
return TSDB_CODE_FAILED;
} else {
smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 " uid:%" PRIi64 " level %d",
SMA_VID(pSma), pRSmaInfo->taskInfo[0], *suid, *(int64_t *)taosArrayGet(tbUids, 0), i);
}
smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 " uid:%" PRIi64 " level %d",
SMA_VID(pSma), pRSmaInfo->taskInfo[i], *suid, *(int64_t *)taosArrayGet(tbUids, 0), i);
}
}
......@@ -232,8 +236,6 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
return TSDB_CODE_SUCCESS;
}
ASSERT(ppStore != NULL);
if (!(*ppStore)) {
if (tdUidStoreInit(ppStore) < 0) {
return TSDB_CODE_FAILED;
......@@ -300,12 +302,15 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
}
pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2;
ASSERT(pItem->level > 0);
if (ASSERTS(pItem->level > 0, "pItem level:%" PRIi8 " should > 0", pItem->level)) {
terrno = TSDB_CODE_APP_ERROR;
return TSDB_CODE_FAILED;
}
SRSmaRef rsmaRef = {.refId = pStat->refId, .suid = pRSmaInfo->suid};
taosHashPut(smaMgmt.refHash, &pItem, POINTER_BYTES, &rsmaRef, sizeof(rsmaRef));
pItem->fetchLevel = pItem->level;
taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
smaInfo("vgId:%d, item:%p table:%" PRIi64 " level:%" PRIi8 " maxdelay:%" PRIi64 " watermark:%" PRIi64
......@@ -831,50 +836,51 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize,
static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t srcTaskInfo, SRSmaParam *param,
tb_uid_t suid, int8_t idx) {
int32_t code = 0;
int32_t lino = 0;
SVnode *pVnode = pSma->pVnode;
char *pOutput = NULL;
int32_t len = 0;
if (!srcTaskInfo) {
terrno = TSDB_CODE_INVALID_PTR;
code = TSDB_CODE_INVALID_PTR;
smaWarn("vgId:%d, rsma clone, table %" PRIi64 ", no need since srcTaskInfo is NULL", TD_VID(pVnode), suid);
return TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _exit);
}
if ((terrno = qSerializeTaskStatus(srcTaskInfo, &pOutput, &len)) < 0) {
smaError("vgId:%d, rsma clone, table %" PRIi64 " serialize qTaskInfo failed since %s", TD_VID(pVnode), suid,
terrstr());
goto _err;
}
code = qSerializeTaskStatus(srcTaskInfo, &pOutput, &len);
TSDB_CHECK_CODE(code, lino, _exit);
SReadHandle handle = {
.meta = pVnode->pMeta,
.vnode = pVnode,
.initTqReader = 1,
};
ASSERT(!dstTaskInfo);
if (ASSERTS(!dstTaskInfo, "dstTaskInfo:%p is not NULL", dstTaskInfo)) {
code = TSDB_CODE_APP_ERROR;
TSDB_CHECK_CODE(code, lino, _exit);
}
dstTaskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle);
if (!dstTaskInfo) {
terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE;
goto _err;
code = TSDB_CODE_RSMA_QTASKINFO_CREATE;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (qDeserializeTaskStatus(dstTaskInfo, pOutput, len) < 0) {
smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " failed since %s", TD_VID(pVnode), suid,
terrstr());
goto _err;
}
code = qDeserializeTaskStatus(dstTaskInfo, pOutput, len);
TSDB_CHECK_CODE(code, lino, _exit);
smaDebug("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " succeed", TD_VID(pVnode), suid);
_exit:
taosMemoryFreeClear(pOutput);
return TSDB_CODE_SUCCESS;
_err:
taosMemoryFreeClear(pOutput);
tdRSmaQTaskInfoFree(dstTaskInfo, TD_VID(pVnode), idx + 1);
smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " failed since %s", TD_VID(pVnode), suid,
terrstr());
return TSDB_CODE_FAILED;
if (code) {
tdRSmaQTaskInfoFree(dstTaskInfo, TD_VID(pVnode), idx + 1);
smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " failed since %s", TD_VID(pVnode), suid,
terrstr());
}
return code;
}
/**
......@@ -885,43 +891,53 @@ _err:
* @return int32_t
*/
static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo) {
int32_t code = 0;
int32_t lino = 0;
SRSmaParam *param = NULL;
SMetaReader mr = {0};
if (!pInfo) {
return TSDB_CODE_SUCCESS;
}
SMetaReader mr = {0};
metaReaderInit(&mr, SMA_META(pSma), 0);
smaDebug("vgId:%d, rsma clone qTaskInfo for suid:%" PRIi64, SMA_VID(pSma), pInfo->suid);
if (metaGetTableEntryByUidCache(&mr, pInfo->suid) < 0) {
smaError("vgId:%d, rsma clone, failed to get table meta for %" PRIi64 " since %s", SMA_VID(pSma), pInfo->suid,
terrstr());
goto _err;
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
ASSERT(mr.me.type == TSDB_SUPER_TABLE);
ASSERT(mr.me.uid == pInfo->suid);
if (mr.me.type != TSDB_SUPER_TABLE) {
code = TSDB_CODE_RSMA_INVALID_SCHEMA;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (mr.me.uid != pInfo->suid) {
code = TSDB_CODE_RSMA_INVALID_SCHEMA;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (TABLE_IS_ROLLUP(mr.me.flags)) {
param = &mr.me.stbEntry.rsmaParam;
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (!pInfo->iTaskInfo[i]) {
continue;
}
if (tdCloneQTaskInfo(pSma, pInfo->taskInfo[i], pInfo->iTaskInfo[i], param, pInfo->suid, i) < 0) {
goto _err;
}
code = tdCloneQTaskInfo(pSma, pInfo->taskInfo[i], pInfo->iTaskInfo[i], param, pInfo->suid, i);
TSDB_CHECK_CODE(code, lino, _exit);
}
smaDebug("vgId:%d, rsma clone env success for %" PRIi64, SMA_VID(pSma), pInfo->suid);
} else {
terrno = TSDB_CODE_RSMA_INVALID_SCHEMA;
goto _err;
code = TSDB_CODE_RSMA_INVALID_SCHEMA;
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s, suid:%" PRIi64 ", flags:%" PRIi8 ",type:%" PRIi8 ", uid:%" PRIi64,
SMA_VID(pSma), __func__, lino, tstrerror(code), pInfo->suid, mr.me.flags, mr.me.type, mr.me.uid);
}
metaReaderClear(&mr);
return TSDB_CODE_SUCCESS;
_err:
metaReaderClear(&mr);
smaError("vgId:%d, rsma clone env failed for %" PRIi64 " since %s", SMA_VID(pSma), pInfo->suid, terrstr());
return TSDB_CODE_FAILED;
return code;
}
/**
......@@ -932,10 +948,14 @@ _err:
* @return SRSmaInfo*
*/
static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) {
int32_t code = 0;
int32_t lino = 0;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = NULL;
SRSmaInfo *pRSmaInfo = NULL;
terrno = 0;
if (!pEnv) {
terrno = TSDB_CODE_RSMA_INVALID_ENV;
return NULL;
......@@ -955,14 +975,17 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) {
return NULL;
}
if (!pRSmaInfo->taskInfo[0]) {
if (tdRSmaInfoClone(pSma, pRSmaInfo) < 0) {
if ((terrno = tdRSmaInfoClone(pSma, pRSmaInfo)) < 0) {
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return NULL;
}
}
tdRefRSmaInfo(pSma, pRSmaInfo);
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
ASSERT(pRSmaInfo->suid == suid);
if (ASSERTS(pRSmaInfo->suid == suid, "suid:%" PRIi64 " != %" PRIi64, pRSmaInfo->suid, suid)) {
terrno = TSDB_CODE_APP_ERROR;
return NULL;
}
return pRSmaInfo;
}
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
......@@ -1010,7 +1033,11 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg,
}
}
} else {
ASSERT(0);
terrno = TSDB_CODE_APP_ERROR;
tdReleaseRSmaInfo(pSma, pRSmaInfo);
smaError("vgId:%d, execute rsma, failed for suid:%" PRIu64 " since %s, type:%d", SMA_VID(pSma), suid,
tstrerror(terrno), inputType);
return TSDB_CODE_FAILED;
}
tdReleaseRSmaInfo(pSma, pRSmaInfo);
......@@ -1063,19 +1090,22 @@ _err:
* @return int32_t
*/
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
int32_t code = 0;
int32_t lino = 0;
SVnode *pVnode = pSma->pVnode;
SArray *suidList = NULL;
STbUidStore uidStore = {0};
SMetaReader mr = {0};
tb_uid_t suid = 0;
if (!(suidList = taosArrayInit(1, sizeof(tb_uid_t)))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (vnodeGetStbIdList(pSma->pVnode, 0, suidList) < 0) {
smaError("vgId:%d, failed to restore rsma env since get stb id list error: %s", TD_VID(pVnode), terrstr());
goto _err;
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t arrSize = taosArrayGetSize(suidList);
......@@ -1092,19 +1122,26 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
int64_t nRsmaTables = 0;
metaReaderInit(&mr, SMA_META(pSma), 0);
if (!(uidStore.tbUids = taosArrayInit(1024, sizeof(tb_uid_t)))) {
goto _err;
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
for (int64_t i = 0; i < arrSize; ++i) {
tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i);
suid = *(tb_uid_t *)taosArrayGet(suidList, i);
smaDebug("vgId:%d, rsma restore, suid is %" PRIi64, TD_VID(pVnode), suid);
if (metaGetTableEntryByUidCache(&mr, suid) < 0) {
smaError("vgId:%d, rsma restore, failed to get table meta for %" PRIi64 " since %s", TD_VID(pVnode), suid,
terrstr());
goto _err;
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
tDecoderClear(&mr.coder);
ASSERT(mr.me.type == TSDB_SUPER_TABLE);
ASSERT(mr.me.uid == suid);
if (mr.me.type != TSDB_SUPER_TABLE) {
code = TSDB_CODE_RSMA_INVALID_SCHEMA;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (mr.me.uid != suid) {
code = TSDB_CODE_RSMA_INVALID_SCHEMA;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (TABLE_IS_ROLLUP(mr.me.flags)) {
++nRsmaTables;
SRSmaParam *param = &mr.me.stbEntry.rsmaParam;
......@@ -1114,22 +1151,20 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
TD_VID(pVnode), suid, i, param->maxdelay[i], param->watermark[i], param->qmsgLen[i]);
}
if (tdRSmaProcessCreateImpl(pSma, &mr.me.stbEntry.rsmaParam, suid, mr.me.name) < 0) {
smaError("vgId:%d, rsma restore env failed for %" PRIi64 " since %s", TD_VID(pVnode), suid, terrstr());
goto _err;
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
// reload all ctbUids for suid
uidStore.suid = suid;
if (vnodeGetCtbIdList(pVnode, suid, uidStore.tbUids) < 0) {
smaError("vgId:%d, rsma restore, get ctb idlist failed for %" PRIi64 " since %s", TD_VID(pVnode), suid,
terrstr());
goto _err;
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (tdUpdateTbUidList(pVnode->pSma, &uidStore, true) < 0) {
smaError("vgId:%d, rsma restore, update tb uid list failed for %" PRIi64 " since %s", TD_VID(pVnode), suid,
terrstr());
goto _err;
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
taosArrayClear(uidStore.tbUids);
......@@ -1138,21 +1173,18 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
}
}
metaReaderClear(&mr);
taosArrayDestroy(suidList);
tdUidStoreDestory(&uidStore);
if (nTables) {
*nTables = nRsmaTables;
}
return TSDB_CODE_SUCCESS;
_err:
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s, suid:%" PRIi64 ", type:%" PRIi8 ", uid:%" PRIi64, TD_VID(pVnode),
__func__, lino, tstrerror(code), suid, mr.me.type, mr.me.uid);
}
metaReaderClear(&mr);
taosArrayDestroy(suidList);
tdUidStoreDestory(&uidStore);
return TSDB_CODE_FAILED;
return code;
}
/**
......@@ -1254,7 +1286,6 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
}
ASSERT(size > 0);
int64_t offset = 0;
if (taosFSendFile(pOutFD, pInFD, &offset, size) < 0) {
......@@ -1374,10 +1405,11 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
// async process
pItem->fetchLevel = pItem->level;
#if 0
// debugging codes
SRSmaInfo *qInfo = tdAcquireRSmaInfoBySuid(pSma, pRSmaInfo->suid);
SRSmaInfoItem *qItem = RSMA_INFO_ITEM(qInfo, pItem->level - 1);
ASSERT(qItem->level == pItem->level);
ASSERT(qItem->fetchLevel == pItem->fetchLevel);
make sure(qItem->level == pItem->level);
make sure(qItem->fetchLevel == pItem->fetchLevel);
#endif
if (atomic_load_8(&pRSmaInfo->assigned) == 0) {
tsem_post(&(pStat->notEmpty));
......@@ -1524,6 +1556,8 @@ _err:
*/
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
int32_t code = 0;
int32_t lino = 0;
SVnode *pVnode = pSma->pVnode;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
......@@ -1532,14 +1566,14 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
bool isFetchAll = false;
if (!pRSmaStat || !(infoHash = RSMA_INFO_HASH(pRSmaStat))) {
terrno = TSDB_CODE_RSMA_INVALID_STAT;
goto _err;
code = TSDB_CODE_RSMA_INVALID_STAT;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (!(pSubmitArr =
taosArrayInit(TMIN(RSMA_SUBMIT_BATCH_SIZE, atomic_load_64(&pRSmaStat->nBufItems)), sizeof(SPackedData)))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
while (true) {
......@@ -1570,7 +1604,11 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
if (oldStat == 0 ||
((oldStat == 2) && atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)) < TASK_TRIGGER_STAT_PAUSED)) {
int32_t oldVal = atomic_fetch_add_32(&pRSmaStat->nFetchAll, 1);
ASSERT(oldVal >= 0);
if (ASSERTS(oldVal >= 0, "oldVal of nFetchAll: %d < 0", oldVal)) {
code = TSDB_CODE_APP_ERROR;
TSDB_CHECK_CODE(code, lino, _exit);
}
int8_t curStat = atomic_load_8(RSMA_COMMIT_STAT(pRSmaStat));
if (curStat == 1) {
......@@ -1600,7 +1638,9 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
}
}
} else {
ASSERT(0);
ASSERTS(0, "unknown rsma exec type:%d", (int32_t)type);
code = TSDB_CODE_APP_ERROR;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (atomic_load_64(&pRSmaStat->nBufItems) <= 0) {
......@@ -1619,10 +1659,10 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
} // end of while(true)
_end:
taosArrayDestroy(pSubmitArr);
return TSDB_CODE_SUCCESS;
_err:
_exit:
taosArrayDestroy(pSubmitArr);
return TSDB_CODE_FAILED;
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
}
return code;
}
......@@ -284,7 +284,9 @@ int32_t tfsMkdir(STfs *pTfs, const char *rname) {
}
int32_t tfsRmdir(STfs *pTfs, const char *rname) {
ASSERT(rname[0] != 0);
if (rname[0] == 0) {
return 0;
}
char aname[TMPNAME_LEN] = "\0";
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册