提交 4cc44ee7 编写于 作者: C Cary Xu

enh: sma optimization for query and reboot

上级 5c29d146
......@@ -113,8 +113,9 @@ struct SRSmaStat {
volatile int64_t nBufItems; // number of items in queue buffer
SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo)
volatile int32_t nFetchAll; // active number of fetch all
int8_t triggerStat; // shared by fetch tasks
int8_t commitStat; // 0 not in committing, 1 in committing
volatile int8_t triggerStat; // shared by fetch tasks
volatile int8_t commitStat; // 0 not in committing, 1 in committing
volatile int8_t delFlag; // 0 no deleted SRSmaInfo, 1 has deleted SRSmaInfo
SRSmaFS fs; // for recovery/snapshot r/w
SHashObj *infoHash; // key: suid, value: SRSmaInfo
tsem_t notEmpty; // has items in queue buffer
......@@ -196,16 +197,21 @@ typedef enum {
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType);
void tdDestroySmaEnv(SSmaEnv *pSmaEnv);
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv);
int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat);
int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat);
int32_t tdLockSma(SSma *pSma);
int32_t tdUnLockSma(SSma *pSma);
void *tdAcquireSmaRef(int32_t rsetId, int64_t refId);
int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId);
/**
 * Applies T_REF_INC to the given SMA stat object and traces the value it
 * yields through smaDebug.
 *
 * @param pSma   SMA handle; only used to obtain the vgId for the log line.
 * @param pStat  stat object handed to T_REF_INC. There is no NULL guard in
 *               this function, so the caller must pass a valid pointer.
 */
static FORCE_INLINE void tdRefSmaStat(SSma *pSma, SSmaStat *pStat) {
  // Keep the T_REF_INC call in its own statement rather than inside the
  // smaDebug argument list, so it runs regardless of how the log macro
  // treats its arguments.
  int32_t refVal = T_REF_INC(pStat);
  smaDebug("vgId:%d, ref sma stat:%p, val:%d", SMA_VID(pSma), pStat, refVal);
}
/**
 * Applies T_REF_DEC to the given SMA stat object and traces the value it
 * yields through smaDebug.
 *
 * @param pSma   SMA handle; only used to obtain the vgId for the log line.
 * @param pStat  stat object handed to T_REF_DEC. There is no NULL guard in
 *               this function, so the caller must pass a valid pointer.
 */
static FORCE_INLINE void tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) {
  // Keep the T_REF_DEC call in its own statement rather than inside the
  // smaDebug argument list, so it runs regardless of how the log macro
  // treats its arguments.
  int32_t refVal = T_REF_DEC(pStat);
  smaDebug("vgId:%d, unref sma stat:%p, val:%d", SMA_VID(pSma), pStat, refVal);
}
// rsma
int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo);
int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo);
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
int32_t tdRSmaFSOpen(SSma *pSma, int64_t version);
void tdRSmaFSClose(SRSmaFS *fs);
......@@ -218,9 +224,17 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type);
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer);
void tdRSmaQTaskInfoGetFileName(int32_t vid, int64_t version, char *outputName);
void tdRSmaQTaskInfoGetFullName(int32_t vid, int64_t version, const char *path, char *outputName);
void tdRSmaQTaskInfoGetFileName(int32_t vid, int64_t version, char *outputName);
void tdRSmaQTaskInfoGetFullName(int32_t vid, int64_t version, const char *path, char *outputName);
/**
 * Applies T_REF_INC to the given rsma info object and traces the value it
 * yields through smaDebug.
 *
 * @param pSma       SMA handle; only used to obtain the vgId for the log line.
 * @param pRSmaInfo  rsma info handed to T_REF_INC. There is no NULL guard in
 *                   this function, so the caller must pass a valid pointer.
 */
static FORCE_INLINE void tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
  // Keep the T_REF_INC call in its own statement rather than inside the
  // smaDebug argument list, so it runs regardless of how the log macro
  // treats its arguments.
  int32_t refVal = T_REF_INC(pRSmaInfo);
  smaDebug("vgId:%d, ref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, refVal);
}
/**
 * Applies T_REF_DEC to the given rsma info object and traces the value it
 * yields through smaDebug.
 *
 * @param pSma       SMA handle; only used to obtain the vgId for the log line.
 * @param pRSmaInfo  rsma info handed to T_REF_DEC. There is no NULL guard in
 *                   this function, so the caller must pass a valid pointer.
 */
static FORCE_INLINE void tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
  // Keep the T_REF_DEC call in its own statement rather than inside the
  // smaDebug argument list, so it runs regardless of how the log macro
  // treats its arguments.
  int32_t refVal = T_REF_DEC(pRSmaInfo);
  smaDebug("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, refVal);
}
// smaFileUtil ================
......
......@@ -314,12 +314,12 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) {
return TSDB_CODE_FAILED;
}
smaInfo("vgId:%d, rsma commit, operator state commited, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
#if 0 // consuming task of qTaskInfo clone
// step 4: swap queue/qall and iQueue/iQall
// lock
// taosWLockLatch(SMA_ENV_LOCK(pEnv));
taosWLockLatch(SMA_ENV_LOCK(pEnv));
ASSERT(RSMA_INFO_HASH(pRSmaStat));
......@@ -335,7 +335,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
}
// unlock
// taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
#endif
return TSDB_CODE_SUCCESS;
......@@ -380,25 +380,26 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
// step 1: merge qTaskInfo and iQTaskInfo
// lock
// taosWLockLatch(SMA_ENV_LOCK(pEnv));
void *pIter = NULL;
while ((pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter))) {
tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter;
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
int32_t refVal = T_REF_VAL_GET(pRSmaInfo);
if (refVal == 0) {
taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(*pSuid));
} else {
smaDebug(
"vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for "
"table:%" PRIi64,
SMA_VID(pSma), refVal, *pSuid);
if (1 == atomic_val_compare_exchange_8(&pRSmaStat->delFlag, 1, 0)) {
taosWLockLatch(SMA_ENV_LOCK(pEnv));
void *pIter = NULL;
while ((pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter))) {
tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter;
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
int32_t refVal = T_REF_VAL_GET(pRSmaInfo);
if (refVal == 0) {
taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(*pSuid));
} else {
smaDebug(
"vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for "
"table:%" PRIi64,
SMA_VID(pSma), refVal, *pSuid);
}
continue;
}
continue;
}
#if 0
if (pRSmaInfo->taskInfo[0]) {
if (pRSmaInfo->iTaskInfo[0]) {
......@@ -413,10 +414,11 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter));
smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), *pSuid);
#endif
}
}
// unlock
// taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
// unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
}
tdUpdateQTaskInfoFiles(pSma, pRSmaStat);
......
......@@ -177,39 +177,6 @@ void *tdFreeSmaEnv(SSmaEnv *pSmaEnv) {
return NULL;
}
/**
 * Applies T_REF_INC to the SMA stat object when one is supplied and traces
 * the value it yields through smaDebug. A NULL pStat is ignored.
 *
 * @param pSma   SMA handle; only used to obtain the vgId for the log line.
 * @param pStat  stat object handed to T_REF_INC; may be NULL.
 * @return always 0.
 */
int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat) {
  if (pStat) {
    // The T_REF_INC call stays in its own statement, outside the log macro.
    int refVal = T_REF_INC(pStat);
    smaDebug("vgId:%d, ref sma stat:%p, val:%d", SMA_VID(pSma), pStat, refVal);
  }
  return 0;
}
/**
 * Applies T_REF_DEC to the SMA stat object when one is supplied and traces
 * the value it yields through smaDebug. A NULL pStat is ignored.
 *
 * @param pSma   SMA handle; only used to obtain the vgId for the log line.
 * @param pStat  stat object handed to T_REF_DEC; may be NULL.
 * @return always 0.
 */
int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) {
  if (pStat) {
    // The T_REF_DEC call stays in its own statement, outside the log macro.
    int refVal = T_REF_DEC(pStat);
    smaDebug("vgId:%d, unref sma stat:%p, val:%d", SMA_VID(pSma), pStat, refVal);
  }
  return 0;
}
/**
 * Applies T_REF_INC to the rsma info object when one is supplied and traces
 * the value it yields through smaDebug. A NULL pRSmaInfo is ignored.
 *
 * @param pSma       SMA handle; only used to obtain the vgId for the log line.
 * @param pRSmaInfo  rsma info handed to T_REF_INC; may be NULL.
 * @return always 0.
 */
int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
  if (pRSmaInfo) {
    // The T_REF_INC call stays in its own statement, outside the log macro.
    int refVal = T_REF_INC(pRSmaInfo);
    smaDebug("vgId:%d, ref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, refVal);
  }
  return 0;
}
/**
 * Applies T_REF_DEC to the rsma info object when one is supplied and traces
 * the value it yields through smaDebug. A NULL pRSmaInfo is ignored.
 *
 * @param pSma       SMA handle; only used to obtain the vgId for the log line.
 * @param pRSmaInfo  rsma info handed to T_REF_DEC; may be NULL.
 * @return always 0.
 */
int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
  if (pRSmaInfo) {
    // The T_REF_DEC call stays in its own statement, outside the log macro.
    int refVal = T_REF_DEC(pRSmaInfo);
    smaDebug("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, refVal);
  }
  return 0;
}
static void tRSmaInfoHashFreeNode(void *data) {
SRSmaInfo *pRSmaInfo = NULL;
SRSmaInfoItem *pItem = NULL;
......@@ -492,6 +459,8 @@ static int32_t tdRsmaStopExecutor(const SSma *pSma) {
taosThreadJoin(pthread[i], NULL);
}
}
smaInfo("vgId:%d, rsma executor stopped, number:%d", SMA_VID(pSma), tsNumOfVnodeRsmaThreads);
}
return 0;
}
\ No newline at end of file
......@@ -302,7 +302,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
return TSDB_CODE_FAILED;
}
SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]);
pItem->triggerStat = TASK_TRIGGER_STAT_INACTIVE;
pItem->triggerStat = TASK_TRIGGER_STAT_ACTIVE; // fetch the data when reboot
if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) {
int64_t msInterval =
convertTimeFromPrecisionToUnit(pRetention[idx + 1].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND);
......@@ -320,7 +320,9 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
SRSmaRef rsmaRef = {.refId = pStat->refId, .suid = pRSmaInfo->suid};
taosHashPut(smaMgmt.refHash, &pItem, POINTER_BYTES, &rsmaRef, sizeof(rsmaRef));
taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
pItem->fetchLevel = pItem->level;
taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
smaInfo("vgId:%d, item:%p table:%" PRIi64 " level:%" PRIi8 " maxdelay:%" PRIi64 " watermark:%" PRIi64
", finally maxdelay:%" PRIi32,
......@@ -470,6 +472,7 @@ int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) {
}
// set del flag for data in mem
atomic_store_8(&pRSmaStat->delFlag, 1);
RSMA_INFO_SET_DEL(pRSmaInfo);
tdUnRefRSmaInfo(pSma, pRSmaInfo);
......@@ -939,25 +942,25 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) {
return NULL;
}
// taosRLockLatch(SMA_ENV_LOCK(pEnv));
taosRLockLatch(SMA_ENV_LOCK(pEnv));
pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
if (pRSmaInfo && (pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
// taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return NULL;
}
if (!pRSmaInfo->taskInfo[0]) {
if (tdRSmaInfoClone(pSma, pRSmaInfo) < 0) {
// taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return NULL;
}
}
tdRefRSmaInfo(pSma, pRSmaInfo);
// taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
ASSERT(pRSmaInfo->suid == suid);
return pRSmaInfo;
}
// taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return NULL;
}
......@@ -1734,7 +1737,7 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) {
}
tdCleanupStreamInputDataBlock(taskInfo);
smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 " maxDelay:%d, fetch finished",
smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 " maxDelay:%d, fetch finished",
SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay);
} else {
smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8
......@@ -1829,6 +1832,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
bool occupied = (batchMax <= 1);
if (batchMax > 1) {
batchMax = 100 / batchMax;
batchMax = MAX(batchMax, 4);
}
while (occupied || (++batchCnt < batchMax)) { // greedy mode
taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock
......@@ -1838,13 +1842,15 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
smaDebug("vgId:%d, batchSize:%d, execType:%" PRIi8, SMA_VID(pSma), qallItemSize, type);
}
int8_t oldStat = atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 2);
if (oldStat == 0 ||
((oldStat == 2) && atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)) < TASK_TRIGGER_STAT_PAUSED)) {
atomic_fetch_add_32(&pRSmaStat->nFetchAll, 1);
tdRSmaFetchAllResult(pSma, pInfo);
if (0 == atomic_sub_fetch_32(&pRSmaStat->nFetchAll, 1)) {
atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0);
if (RSMA_INFO_ITEM(pInfo, 0)->fetchLevel || RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) {
int8_t oldStat = atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 2);
if (oldStat == 0 ||
((oldStat == 2) && atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)) < TASK_TRIGGER_STAT_PAUSED)) {
atomic_fetch_add_32(&pRSmaStat->nFetchAll, 1);
tdRSmaFetchAllResult(pSma, pInfo);
if (0 == atomic_sub_fetch_32(&pRSmaStat->nFetchAll, 1)) {
atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0);
}
}
}
......@@ -1917,7 +1923,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
tsem_wait(&pRSmaStat->notEmpty);
if ((pEnv->flag & SMA_ENV_FLG_CLOSE) && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) {
smaInfo("vgId:%d, exec task end, flag:%" PRIi8 ", nBufItems:%" PRIi64, SMA_VID(pSma), pEnv->flag,
smaDebug("vgId:%d, exec task end, flag:%" PRIi8 ", nBufItems:%" PRIi64, SMA_VID(pSma), pEnv->flag,
atomic_load_64(&pRSmaStat->nBufItems));
break;
}
......
......@@ -178,7 +178,6 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char
return TSDB_CODE_FAILED;
}
tdRefSmaStat(pSma, pStat);
pTsmaStat = SMA_STAT_TSMA(pStat);
if (!pTsmaStat->pTSma) {
......@@ -230,9 +229,7 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char
goto _err;
}
tdUnRefSmaStat(pSma, pStat);
return TSDB_CODE_SUCCESS;
_err:
tdUnRefSmaStat(pSma, pStat);
return TSDB_CODE_FAILED;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册