提交 9552ac28 编写于 作者: K kailixu

enh: coverity scan for sma

上级 9ee43c27
......@@ -749,6 +749,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RSMA_FS_REF TAOS_DEF_ERROR_CODE(0, 0x3161)
#define TSDB_CODE_RSMA_FS_SYNC TAOS_DEF_ERROR_CODE(0, 0x3162)
#define TSDB_CODE_RSMA_FS_UPDATE TAOS_DEF_ERROR_CODE(0, 0x3163)
#define TSDB_CODE_RSMA_RESULT TAOS_DEF_ERROR_CODE(0, 0x3164)
//index
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
......
......@@ -30,6 +30,8 @@ SSmaMgmt smaMgmt = {
typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem;
extern int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now);
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
static void tdUidStoreDestory(STbUidStore *pStore);
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd);
......@@ -44,7 +46,7 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo);
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
int64_t suid);
static void tdRSmaFetchTrigger(void *param, void *tmrId);
static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo);
// static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo);
static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables);
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer);
......@@ -64,10 +66,7 @@ static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t l
if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) {
smaDebug("vgId:%d, free qTaskInfo_t %p of level %d", vgId, otaskHandle, level);
qDestroyTask(otaskHandle);
} else {
smaDebug("vgId:%d, not free qTaskInfo_t %p of level %d", vgId, otaskHandle, level);
}
// TODO: clear files related to qTaskInfo?
}
/**
......@@ -95,16 +94,10 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
if (isDeepFree && pInfo->taskInfo[i]) {
tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1);
} else {
smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", SMA_VID(pSma),
pInfo->suid, i + 1);
}
#if 0
if (pInfo->iTaskInfo[i]) {
tdRSmaQTaskInfoFree(&pInfo->iTaskInfo[i], SMA_VID(pSma), i + 1);
} else {
smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty iTaskInfo",
SMA_VID(pSma), pInfo->suid, i + 1);
}
#endif
}
......@@ -140,11 +133,6 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
}
static FORCE_INLINE int32_t tdUidStoreInit(STbUidStore **pStore) {
if (ASSERTS(*pStore == NULL, "*pStore:%p != NULL", *pStore)) {
terrno = TSDB_CODE_APP_ERROR;
return TSDB_CODE_FAILED;
}
*pStore = taosMemoryCalloc(1, sizeof(STbUidStore));
if (*pStore == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -314,11 +302,6 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2;
if (ASSERTS(pItem->level > 0, "pItem level:%" PRIi8 " should > 0", pItem->level)) {
terrno = TSDB_CODE_APP_ERROR;
return TSDB_CODE_FAILED;
}
SRSmaRef rsmaRef = {.refId = pStat->refId, .suid = pRSmaInfo->suid};
taosHashPut(smaMgmt.refHash, &pItem, POINTER_BYTES, &rsmaRef, sizeof(rsmaRef));
......@@ -380,13 +363,13 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
pRSmaInfo->pTSchema = pTSchema;
pRSmaInfo->suid = suid;
T_REF_INIT_VAL(pRSmaInfo, 1);
if (!(pRSmaInfo->queue = taosOpenQueue())) {
goto _err;
}
if (!(pRSmaInfo->qall = taosAllocateQall())) {
if (!(pRSmaInfo->queue = taosOpenQueue()) || !(pRSmaInfo->qall = taosAllocateQall()) ||
tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0) < 0 ||
tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 1) < 0) {
goto _err;
}
#if 0
if (!(pRSmaInfo->iQueue = taosOpenQueue())) {
goto _err;
......@@ -395,13 +378,6 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
goto _err;
}
#endif
if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0) < 0) {
goto _err;
}
if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 1) < 0) {
goto _err;
}
if (taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) < 0) {
goto _err;
......@@ -577,15 +553,12 @@ void *tdUidStoreFree(STbUidStore *pStore) {
* @return int32_t
*/
static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) {
if (!pReq) {
terrno = TSDB_CODE_INVALID_PTR;
return TSDB_CODE_FAILED;
}
SSubmitReq2 *pSubmitReq = (SSubmitReq2 *)pReq;
// spin lock for race condition during insert data
if (tsdbInsertData(pTsdb, version, pSubmitReq, NULL) < 0) {
return TSDB_CODE_FAILED;
if (pReq) {
SSubmitReq2 *pSubmitReq = (SSubmitReq2 *)pReq;
// spin lock for race condition during insert data
if (tsdbInsertData(pTsdb, version, pSubmitReq, NULL) < 0) {
return TSDB_CODE_FAILED;
}
}
return TSDB_CODE_SUCCESS;
......@@ -607,7 +580,6 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq2 *pMsg, STbUidStore *pStore) {
return 0;
}
#if 0
/**
* @brief retention of rsma1/rsma2
*
......@@ -631,48 +603,40 @@ int32_t smaDoRetention(SSma *pSma, int64_t now) {
_end:
return code;
}
#endif
#if 0
static void tdBlockDataDestroy(SArray *pBlockArr) {
for (int32_t i = 0; i < taosArrayGetSize(pBlockArr); ++i) {
blockDataDestroy(taosArrayGetP(pBlockArr, i));
}
taosArrayDestroy(pBlockArr);
}
#endif
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
int64_t suid) {
int32_t code = 0;
int32_t lino = 0;
SSDataBlock *output = NULL;
SArray *pResList = taosArrayInit(1, POINTER_BYTES);
if (pResList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
while (1) {
uint64_t ts;
bool hasMore = false;
int32_t code = qExecTaskOpt(taskInfo, pResList, &ts, &hasMore, NULL);
if (code < 0) {
if (code == TSDB_CODE_QRY_IN_EXEC) {
break;
} else {
smaError("vgId:%d, qExecTask for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), suid,
pItem->level, terrstr(code));
goto _err;
}
code = qExecTaskOpt(taskInfo, pResList, &ts, &hasMore, NULL);
if (code == TSDB_CODE_QRY_IN_EXEC) {
code = 0;
break;
}
TSDB_CHECK_CODE(code, lino, _exit);
if (taosArrayGetSize(pResList) == 0) {
if (terrno == 0) {
// smaDebug("vgId:%d, no rsma level %" PRIi8 " data fetched yet", SMA_VID(pSma), pItem->level);
} else {
smaDebug("vgId:%d, no rsma level %" PRIi8 " data fetched since %s", SMA_VID(pSma), pItem->level, terrstr());
goto _err;
}
break;
} else {
smaDebug("vgId:%d, rsma level %" PRIi8 " data fetched", SMA_VID(pSma), pItem->level);
}
#if 0
char flag[10] = {0};
......@@ -680,28 +644,24 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
blockDebugShowDataBlocks(pResList, flag);
#endif
for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) {
SSDataBlock *output = taosArrayGetP(pResList, i);
smaDebug("result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%d", output->info.id.uid,
output->info.id.groupId, output->info.rows);
output = taosArrayGetP(pResList, i);
smaDebug("vgId:%d, result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%d", SMA_VID(pSma),
output->info.id.uid, output->info.id.groupId, output->info.rows);
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
SSubmitReq2 *pReq = NULL;
// TODO: the schema update should be handled later(TD-17965)
if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, output->info.id.groupId, SMA_VID(pSma), suid) < 0) {
smaError("vgId:%d, build submit req for rsma table suid:%" PRIu64 ", uid:%" PRIu64 ", level %" PRIi8
" failed since %s",
SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, terrstr());
goto _err;
code = terrno ? terrno : TSDB_CODE_RSMA_RESULT;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
code = terrno ? terrno : TSDB_CODE_RSMA_RESULT;
tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq);
smaError("vgId:%d, process submit req for rsma suid:%" PRIu64 ", uid:%" PRIu64 " level %" PRIi8
" failed since %s",
SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, terrstr());
goto _err;
TSDB_CHECK_CODE(code, lino, _exit);
}
smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level %" PRIi8 " ver %" PRIi64,
......@@ -713,15 +673,18 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
}
}
}
taosArrayDestroy(pResList);
qCleanExecTaskBlockBuf(taskInfo);
return TSDB_CODE_SUCCESS;
_err:
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s, suid:%" PRIi64 ", level:%" PRIi8 ", uid:%" PRIi64
", ver:%" PRIi64,
SMA_VID(pSma), __func__, lino, tstrerror(code), suid, pItem->level, output ? output->info.id.uid : -1,
output ? output->info.version : -1);
} else {
smaDebug("vgId:%d, %s succeed, suid:%" PRIi64 ", level:%" PRIi8, SMA_VID(pSma), __func__, suid, pItem->level);
}
taosArrayDestroy(pResList);
qCleanExecTaskBlockBuf(taskInfo);
return TSDB_CODE_FAILED;
return code;
}
/**
......@@ -910,6 +873,7 @@ _exit:
* @param pInfo
* @return int32_t
*/
#if 0
static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo) {
int32_t code = 0;
int32_t lino = 0;
......@@ -961,6 +925,7 @@ _exit:
metaReaderClear(&mr);
return code;
}
#endif
/**
* @brief During async commit, the SRSmaInfo object would be COW from iRSmaInfoHash and write lock should be applied.
......@@ -996,12 +961,14 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) {
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return NULL;
}
#if 0
if (!pRSmaInfo->taskInfo[0]) {
if ((terrno = tdRSmaInfoClone(pSma, pRSmaInfo)) < 0) {
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return NULL;
}
}
#endif
tdRefRSmaInfo(pSma, pRSmaInfo);
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
if (ASSERTS(pRSmaInfo->suid == suid, "suid:%" PRIi64 " != %" PRIi64, pRSmaInfo->suid, suid)) {
......
......@@ -82,6 +82,9 @@ static int32_t vnodeRetentionTask(void *param) {
code = tsdbDoRetention(pInfo->pVnode->pTsdb, pInfo->now);
TSDB_CHECK_CODE(code, lino, _exit);
code = smaDoRetention(pInfo->pVnode->pSma, pInfo->now);
TSDB_CHECK_CODE(code, lino, _exit);
// commit info
vnodeCommitInfo(dir);
......
......@@ -619,6 +619,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_STREAM_STATE_COMMIT, "Rsma stream state c
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_REF, "Rsma fs ref error")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_SYNC, "Rsma fs sync error")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_UPDATE, "Rsma fs update error")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_RESULT, "Rsma result error")
//index
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")
......
......@@ -18,8 +18,7 @@ if $rows != 2 then
endi
print =============== create child table
sql create table ct1 using stb tags("BeiJing", "ChaoYang");
sql create table ct_1 using stb1 tags("BeiJing", "ChaoYang");
sql create table ct1 using stb tags("BeiJing", "ChaoYang") ct_1 using stb1 tags("BeiJing", "ChaoYang");
sql show tables
if $rows != 2 then
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册