提交 57437312 编写于 作者: C Cary Xu

other: rsma code optimization

上级 54562e24
...@@ -148,6 +148,7 @@ struct SRSmaInfoItem { ...@@ -148,6 +148,7 @@ struct SRSmaInfoItem {
}; };
struct SRSmaInfo { struct SRSmaInfo {
SSma *pSma;
STSchema *pTSchema; STSchema *pTSchema;
int64_t suid; int64_t suid;
int64_t lastRecv; // ms int64_t lastRecv; // ms
......
...@@ -364,7 +364,6 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { ...@@ -364,7 +364,6 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
} }
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SRSmaInfoItem *pItem = NULL;
// step 1: merge qTaskInfo and iQTaskInfo // step 1: merge qTaskInfo and iQTaskInfo
// lock // lock
......
...@@ -220,7 +220,7 @@ static void tRSmaInfoHashFreeNode(void *data) { ...@@ -220,7 +220,7 @@ static void tRSmaInfoHashFreeNode(void *data) {
if ((pItem = RSMA_INFO_ITEM((SRSmaInfo *)pRSmaInfo, 1)) && pItem->level) { if ((pItem = RSMA_INFO_ITEM((SRSmaInfo *)pRSmaInfo, 1)) && pItem->level) {
taosHashRemove(smaMgmt.refHash, &pItem, POINTER_BYTES); taosHashRemove(smaMgmt.refHash, &pItem, POINTER_BYTES);
} }
tdFreeRSmaInfo(NULL, pRSmaInfo, true); tdFreeRSmaInfo(pRSmaInfo->pSma, pRSmaInfo, true);
} }
} }
......
...@@ -121,27 +121,27 @@ static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t l ...@@ -121,27 +121,27 @@ static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t l
*/ */
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) { void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
if (pInfo) { if (pInfo) {
int32_t vid = pSma ? SMA_VID(pSma) : -1;
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = &pInfo->items[i]; SRSmaInfoItem *pItem = &pInfo->items[i];
if (isDeepFree && pItem->tmrId) { if (isDeepFree && pItem->tmrId) {
smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", vid, pItem->tmrId, pInfo->suid, i + 1); smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pItem->tmrId,
pInfo->suid, i + 1);
taosTmrStopA(&pItem->tmrId); taosTmrStopA(&pItem->tmrId);
} }
if (isDeepFree && pInfo->taskInfo[i]) { if (isDeepFree && pInfo->taskInfo[i]) {
tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], vid, i + 1); tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1);
} else { } else {
smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", vid, smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", SMA_VID(pSma),
pInfo->suid, i + 1); pInfo->suid, i + 1);
} }
if (pInfo->iTaskInfo[i]) { if (pInfo->iTaskInfo[i]) {
tdRSmaQTaskInfoFree(&pInfo->iTaskInfo[i], vid, i + 1); tdRSmaQTaskInfoFree(&pInfo->iTaskInfo[i], SMA_VID(pSma), i + 1);
} else { } else {
smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty iTaskInfo", vid, smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty iTaskInfo",
pInfo->suid, i + 1); SMA_VID(pSma), pInfo->suid, i + 1);
} }
} }
if (isDeepFree) { if (isDeepFree) {
...@@ -377,7 +377,10 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con ...@@ -377,7 +377,10 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
goto _err; goto _err;
} }
pRSmaInfo->pSma = pSma;
pRSmaInfo->pTSchema = pTSchema; pRSmaInfo->pTSchema = pTSchema;
pRSmaInfo->suid = suid;
T_REF_INIT_VAL(pRSmaInfo, 1);
if (!(pRSmaInfo->queue = taosOpenQueue())) { if (!(pRSmaInfo->queue = taosOpenQueue())) {
goto _err; goto _err;
} }
...@@ -391,8 +394,6 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con ...@@ -391,8 +394,6 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
if (!(pRSmaInfo->iQall = taosAllocateQall())) { if (!(pRSmaInfo->iQall = taosAllocateQall())) {
goto _err; goto _err;
} }
pRSmaInfo->suid = suid;
T_REF_INIT_VAL(pRSmaInfo, 1);
if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0) < 0) { if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0) < 0) {
goto _err; goto _err;
...@@ -1586,8 +1587,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { ...@@ -1586,8 +1587,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
if (!(pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaRef->refId))) { if (!(pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaRef->refId))) {
smaDebug("rsma fetch task not start since rsma stat already destroyed, rsetId:%" PRIi64 " refId:%d)", smaDebug("rsma fetch task not start since rsma stat already destroyed, rsetId:%" PRIi64 " refId:%d)",
smaMgmt.rsetId, pRSmaRef->refId); // pRSmaRef freed in taosHashRemove smaMgmt.rsetId, pRSmaRef->refId); // pRSmaRef freed in taosHashRemove
taosHashRemove(smaMgmt.refHash, &param, POINTER_BYTES); taosHashRemove(smaMgmt.refHash, &param, POINTER_BYTES);
return; return;
} }
...@@ -1636,7 +1637,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { ...@@ -1636,7 +1637,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
switch (fetchTriggerStat) { switch (fetchTriggerStat) {
case TASK_TRIGGER_STAT_ACTIVE: { case TASK_TRIGGER_STAT_ACTIVE: {
smaDebug("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64 " since stat is active", smaDebug("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64 " since stat is active",
SMA_VID(pSma), pItem->level, pRSmaInfo->suid); SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
// async process // async process
pItem->fetchLevel = pItem->level; pItem->fetchLevel = pItem->level;
#if 0 #if 0
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册