From 32be8a71ee0c61f6fd52545fdc747c85ef23cf68 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Wed, 17 Aug 2022 20:14:06 +0800 Subject: [PATCH] enh: rsma batch process --- source/dnode/vnode/src/inc/sma.h | 29 ++- source/dnode/vnode/src/sma/smaCommit.c | 91 ++++---- source/dnode/vnode/src/sma/smaEnv.c | 18 +- source/dnode/vnode/src/sma/smaRollup.c | 302 +++++++++++++++---------- 4 files changed, 259 insertions(+), 181 deletions(-) diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index c36207e495..26adc8d5e5 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -96,10 +96,10 @@ struct SRSmaStat { int8_t commitStat; // 0 not in committing, 1 in committing int8_t execStat; // 0 not in exec , 1 in exec SArray *aTaskFile; // qTaskFiles committed recently(for recovery/snapshot r/w) - SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo; + SHashObj *infoHash; // key: suid, value: SRSmaInfo + SHashObj *fetchHash; // key: suid, value: L1 or L2 or L1|L2 }; - struct SSmaStat { union { STSmaStat tsmaStat; // time-range-wise sma @@ -108,13 +108,14 @@ struct SSmaStat { T_REF_DECLARE() }; -#define SMA_STAT_TSMA(s) (&(s)->tsmaStat) -#define SMA_STAT_RSMA(s) (&(s)->rsmaStat) -#define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash) -#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat) -#define RSMA_COMMIT_STAT(r) (&(r)->commitStat) -#define RSMA_REF_ID(r) ((r)->refId) -#define RSMA_FS_LOCK(r) (&(r)->lock) +#define SMA_STAT_TSMA(s) (&(s)->tsmaStat) +#define SMA_STAT_RSMA(s) (&(s)->rsmaStat) +#define RSMA_INFO_HASH(r) ((r)->infoHash) +#define RSMA_FETCH_HASH(r) ((r)->fetchHash) +#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat) +#define RSMA_COMMIT_STAT(r) (&(r)->commitStat) +#define RSMA_REF_ID(r) ((r)->refId) +#define RSMA_FS_LOCK(r) (&(r)->lock) struct SRSmaInfoItem { int8_t level; @@ -142,7 +143,7 @@ struct SRSmaInfo { #define RSMA_INFO_IS_DEL(r) ((r)->delFlag == 1) #define RSMA_INFO_SET_DEL(r) ((r)->delFlag = 1) #define 
RSMA_INFO_QTASK(r, i) ((r)->taskInfo[i]) -#define RSMA_INFO_IQTASK(r, i) ((r)->iTaskInfo[i]) +#define RSMA_INFO_IQTASK(r, i) ((r)->iTaskInfo[i]) #define RSMA_INFO_ITEM(r, i) (&(r)->items[i]) enum { @@ -167,6 +168,12 @@ enum { RSMA_RESTORE_SYNC = 2, }; +typedef enum { + RSMA_EXEC_OVERFLOW = 1, // triggered by queue buf overflow + RSMA_EXEC_TIMEOUT = 2, // triggered by timer + RSMA_EXEC_COMMIT = 3, // triggered by commit +} ERsmaExecType; + void tdDestroySmaEnv(SSmaEnv *pSmaEnv); void *tdFreeSmaEnv(SSmaEnv *pSmaEnv); @@ -240,7 +247,7 @@ static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType); void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType); void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree); int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); -int32_t tdRSmaProcessExecImpl(SSma *pSma, int8_t type); +int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type); int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); int32_t tdProcessRSmaRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer); diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 101fca3346..25777f90ab 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -121,7 +121,7 @@ static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma) { return TSDB_CODE_SUCCESS; } - SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); + SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat); // step 1: set rsma stat paused @@ -333,7 +333,34 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { } } - // step 3: swap queue/qall and iQueue/iQal + /** + * @brief step 3: consume the SubmitReq in buffer + * 1) This is high cost task and should not put in asyncPreCommit originally. + * 2) But, if put in asyncCommit, would trigger taskInfo cloning frequently. 
+ */ + nLoops = 0; + smaInfo("vgId:%d, start to wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); + + int8_t old; + while (1) { + old = atomic_val_compare_exchange_8(&pRSmaStat->execStat, 0, 1); + if (old == 0) break; + if (++nLoops > 1000) { + sched_yield(); + nLoops = 0; + smaDebug("vgId:%d, wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); + } + } + + smaInfo("vgId:%d, end to wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); + + if (tdRSmaProcessExecImpl(pSma, RSMA_EXEC_COMMIT) < 0) { + atomic_store_8(&pRSmaStat->execStat, 0); + return TSDB_CODE_FAILED; + } + + + // step 4: swap queue/qall and iQueue/iQall // lock taosWLockLatch(SMA_ENV_LOCK(pEnv)); @@ -351,11 +378,12 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { } atomic_store_64(&pRSmaStat->qBufSize, 0); - + atomic_store_8(&pRSmaStat->execStat, 0); // unlock taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); - // step 4: others + + // step 5: others pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; return TSDB_CODE_SUCCESS; @@ -375,36 +403,17 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) { SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv); - // step 1: consume the SubmitReq in buffer - int32_t nLoops = 0; - smaDebug("vgId:%d start to wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); - while (pRSmaStat->execStat == 1) { - taosMsleep(15); - if ((++nLoops & 63) == 0) { - smaWarn("vgId:%d 1s waited for rsma exec stat = 0, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); - sched_yield(); - } - } - pRSmaStat->execStat = 1; - smaDebug("vgId:%d end to wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); - - if (tdRSmaProcessExecImpl(pSma, 1) < 0) { - pRSmaStat->execStat = 0; - return TSDB_CODE_FAILED; - } - // step 2: perform persist task for qTaskInfo operator + // perform persist task for qTaskInfo operator if 
(tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) { - pRSmaStat->execStat = 0; return TSDB_CODE_FAILED; } - pRSmaStat->execStat = 0; return TSDB_CODE_SUCCESS; } /** - * @brief Migrate rsmaInfo from iRsmaInfo to rsmaInfo if rsmaInfoHash not empty. + * @brief Migrate rsmaInfo from iRsmaInfo to rsmaInfo if rsma infoHash not empty. * * @param pSma * @return int32_t @@ -424,13 +433,13 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL); while (pIter) { - tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); + tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter; if (RSMA_INFO_IS_DEL(pRSmaInfo)) { int32_t refVal = T_REF_VAL_GET(pRSmaInfo); if (refVal == 0) { - if(!rsmaDeleted) { - if((rsmaDeleted = taosArrayInit(1, sizeof(tb_uid_t)))){ + if (!rsmaDeleted) { + if ((rsmaDeleted = taosArrayInit(1, sizeof(tb_uid_t)))) { taosArrayPush(rsmaDeleted, pSuid); } } @@ -461,22 +470,20 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter); } - if (taosArrayGetSize(rsmaDeleted) > 0) { - for (int32_t i = 0; i < taosArrayGetSize(rsmaDeleted); ++i) { - tb_uid_t *pSuid = taosArrayGet(rsmaDeleted, i); - void *pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t)); - if ((pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { - tdFreeRSmaInfo(pSma, pRSmaInfo, true); - smaDebug( - "vgId:%d, rsma async post commit, free rsma info since already deleted and ref is 0 for " - "table:%" PRIi64, - SMA_VID(pSma), *pSuid); - } - taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t)); + for (int32_t i = 0; i < taosArrayGetSize(rsmaDeleted); ++i) { + tb_uid_t *pSuid = taosArrayGet(rsmaDeleted, i); + void *pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t)); + if ((pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { + tdFreeRSmaInfo(pSma, pRSmaInfo, true); + 
smaDebug( + "vgId:%d, rsma async post commit, free rsma info since already deleted and ref is 0 for " + "table:%" PRIi64, + SMA_VID(pSma), *pSuid); } - // remove suid in files - taosArrayDestroy(rsmaDeleted); + taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t)); } + taosArrayDestroy(rsmaDeleted); + // TODO: remove suid in files? // unlock taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index 73f8060559..f51aad22bd 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -171,7 +171,7 @@ int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) { int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) { if (!pRSmaInfo) return 0; - + int ref = T_REF_INC(pRSmaInfo); smaDebug("vgId:%d, ref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref); return 0; @@ -228,7 +228,12 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS RSMA_INFO_HASH(pRSmaStat) = taosHashInit( RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); if (!RSMA_INFO_HASH(pRSmaStat)) { - taosMemoryFreeClear(*pSmaStat); + return TSDB_CODE_FAILED; + } + + RSMA_FETCH_HASH(pRSmaStat) = taosHashInit( + RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); + if (!RSMA_FETCH_HASH(pRSmaStat)) { return TSDB_CODE_FAILED; } } else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { @@ -274,7 +279,10 @@ static void tdDestroyRSmaStat(void *pRSmaStat) { } taosHashCleanup(RSMA_INFO_HASH(pStat)); - // step 3: wait all triggered fetch tasks finished + // step 3: destroy the rsma fetch hash + taosHashCleanup(RSMA_FETCH_HASH(pStat)); + + // step 4: wait all triggered fetch tasks finished int32_t nLoops = 0; while (1) { if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) { @@ -289,8 +297,8 @@ static void tdDestroyRSmaStat(void *pRSmaStat) { nLoops = 0; } } - - // step 4: free pStat + + // step 5: 
free pStat taosMemoryFreeClear(pStat); } } diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index de4b7dd808..1e2a8b35a4 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -17,7 +17,8 @@ #define RSMA_QTASKINFO_BUFSIZE (32768) #define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid -#define RSMA_QTASKEXEC_BUFSIZ (1048576) +#define RSMA_QTASKEXEC_BUFSIZE (1048576) +#define RSMA_SUBMIT_BATCH_SIZE (1024) SSmaMgmt smaMgmt = { .inited = 0, @@ -35,9 +36,11 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUi static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo, int8_t idx); static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo, - int8_t type, int8_t level); + ERsmaExecType type, int8_t level); static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid); static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); +static void tdFreeRSmaSubmitItems(SArray *pItems); +static int32_t tdRSmaConsumeAndFetch(SSma *pSma, int64_t suid, int8_t level, SArray *pSubmitArr); static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid); static void tdRSmaFetchTrigger(void *param, void *tmrId); @@ -559,6 +562,14 @@ void *tdUidStoreFree(STbUidStore *pStore) { return NULL; } +/** + * @brief The SubmitReq for rsma L2/L3 is inserted by tsdbInsertData method directly while not by WriteQ, as the queue + * would be freed when close Vnode, thus lock should be used if with race condition. 
+ * @param pTsdb + * @param version + * @param pReq + * @return int32_t + */ static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) { if (!pReq) { terrno = TSDB_CODE_INVALID_PTR; @@ -566,7 +577,7 @@ static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) { } SSubmitReq *pSubmitReq = (SSubmitReq *)pReq; - + // TODO: spin lock for race condition if (tsdbInsertData(pTsdb, version, pSubmitReq, NULL) < 0) { return TSDB_CODE_FAILED; } @@ -696,7 +707,7 @@ _err: } /** - * @brief Copy msg to rsmaQueueBuffer + * @brief Copy msg to rsmaQueueBuffer for batch process * * @param pSma * @param pMsg @@ -722,17 +733,17 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inpu int64_t bufSize = atomic_add_fetch_64(&pRSmaStat->qBufSize, pReq->header.contLen); // smoothing consume - int32_t n = bufSize / RSMA_QTASKEXEC_BUFSIZ; + int32_t n = bufSize / RSMA_QTASKEXEC_BUFSIZE; if (n > 1) { if (n > 10) { n = 10; } taosMsleep(n << 4); if (n > 2) { - smaWarn("vgId:%d pInfo->queue itemSize:%d, memSize:%" PRIi64 ", sleep %d ms", SMA_VID(pSma), + smaWarn("vgId:%d, pInfo->queue itemSize:%d, memSize:%" PRIi64 ", sleep %d ms", SMA_VID(pSma), taosQueueItemSize(pInfo->queue), taosQueueMemorySize(pInfo->queue), n << 4); } else { - smaDebug("vgId:%d pInfo->queue itemSize:%d, memSize:%" PRIi64 ", sleep %d ms", SMA_VID(pSma), + smaDebug("vgId:%d, pInfo->queue itemSize:%d, memSize:%" PRIi64 ", sleep %d ms", SMA_VID(pSma), taosQueueItemSize(pInfo->queue), taosQueueMemorySize(pInfo->queue), n << 4); } } @@ -751,7 +762,7 @@ static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) { if (pBlock == NULL) break; tInitSubmitBlkIter(&msgIter, pBlock, &blkIter); while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) { - smaDebug("vgId:%d numOfRows:%d, suid:%" PRIi64 ", uid:%" PRIi64 ", version:%" PRIi64 ", ts:%" PRIi64, + smaDebug("vgId:%d, numOfRows:%d, suid:%" PRIi64 ", uid:%" PRIi64 ", version:%" PRIi64 ", ts:%" PRIi64, SMA_VID(pSma), 
msgIter.numOfRows, msgIter.suid, msgIter.uid, pReq->version, row->ts); } } @@ -771,10 +782,10 @@ static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) { * @return int32_t */ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo, - int8_t type, int8_t level) { + ERsmaExecType type, int8_t level) { int32_t idx = level - 1; - void *qTaskInfo = (type == 0) ? RSMA_INFO_QTASK(pInfo, idx) : RSMA_INFO_IQTASK(pInfo, idx); + void *qTaskInfo = (type == RSMA_EXEC_COMMIT) ? RSMA_INFO_IQTASK(pInfo, idx) : RSMA_INFO_QTASK(pInfo, idx); if (!qTaskInfo) { smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, pInfo->suid); @@ -791,7 +802,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, #if 0 for (int32_t i = 0; i < msgSize; ++i) { SSubmitReq *pReq = *(SSubmitReq **)((char *)pMsg + i * sizeof(void *)); - smaDebug("vgId:%d [%d][%d] version %" PRIi64, SMA_VID(pSma), msgSize, i, pReq->version); + smaDebug("vgId:%d, [%d][%d] version %" PRIi64, SMA_VID(pSma), msgSize, i, pReq->version); tdRsmaPrintSubmitReq(pSma, pReq); } #endif @@ -802,11 +813,6 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx); tdRSmaFetchAndSubmitResult(pSma, qTaskInfo, pItem, pInfo->pTSchema, pInfo->suid); - atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); - - if (smaMgmt.tmrHandle) { - taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId); - } return TSDB_CODE_SUCCESS; } @@ -854,13 +860,7 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { } taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); - if (RSMA_COMMIT_STAT(pStat) == 0) { // return NULL if not in committing stat - return NULL; - } - - // unlock - taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); - return pRSmaInfo; + return NULL; } static FORCE_INLINE void 
tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { @@ -890,6 +890,16 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t inputTyp tdReleaseRSmaInfo(pSma, pRSmaInfo); return TSDB_CODE_FAILED; } + if (smaMgmt.tmrHandle) { + SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, 0); + if (pItem->level > 0) { + atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); + } + pItem = RSMA_INFO_ITEM(pRSmaInfo, 1); + if (pItem->level > 0) { + atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); + } + } } else { ASSERT(0); } @@ -898,51 +908,23 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t inputTyp return TSDB_CODE_SUCCESS; } -#if 0 -/** - * @brief sync mode - * - * @param pSma - * @param pMsg - * @param inputType - * @param suid - * @return int32_t - */ -static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) { - SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, suid); - if (!pRSmaInfo) { - smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid); - return TSDB_CODE_SUCCESS; - } - - if (inputType == STREAM_INPUT__DATA_SUBMIT) { - tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo, suid, TSDB_RETENTION_L1); - tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo, suid, TSDB_RETENTION_L2); - } - - tdReleaseRSmaInfo(pSma, pRSmaInfo); - return TSDB_CODE_SUCCESS; -} -#endif - static int32_t tdRSmaExecCheck(SSma *pSma) { - SRSmaStat *pRsmaStat = SMA_RSMA_STAT(pSma); - int64_t bufSize = atomic_load_64(&pRsmaStat->qBufSize); + SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); + int64_t bufSize = atomic_load_64(&pRSmaStat->qBufSize); - if ((pRsmaStat->execStat == 1) || (bufSize < RSMA_QTASKEXEC_BUFSIZ)) { - if (bufSize > RSMA_QTASKEXEC_BUFSIZ) { - smaDebug("vgId:%d bufSize is %d but has no chance to exec as qTaskInfo occupied by another task", SMA_VID(pSma), - bufSize); - } else { - smaDebug("vgId:%d bufSize is %d but has no chance to exec as less than %d", 
SMA_VID(pSma), bufSize, - RSMA_QTASKEXEC_BUFSIZ); - } + if (bufSize < RSMA_QTASKEXEC_BUFSIZE) { + smaDebug("vgId:%d, bufSize is %d but has no chance to exec as less than %d", SMA_VID(pSma), bufSize, + RSMA_QTASKEXEC_BUFSIZE); return TSDB_CODE_SUCCESS; } - smaDebug("vgId:%d bufSize is %d and has chance to exec as qTaskInfo is free now", SMA_VID(pSma), bufSize); + if (atomic_val_compare_exchange_8(&pRSmaStat->execStat, 0, 1) == 1) { + smaDebug("vgId:%d, bufSize is %d but has no chance to exec as qTaskInfo occupied by another task", SMA_VID(pSma), + bufSize); + return TSDB_CODE_SUCCESS; + } - pRsmaStat->execStat = 1; + smaDebug("vgId:%d, bufSize is %d and has chance to exec as qTaskInfo is free now", SMA_VID(pSma), bufSize); SRSmaExecMsg fetchMsg; int32_t contLen = sizeof(SMsgHead); @@ -967,7 +949,7 @@ static int32_t tdRSmaExecCheck(SSma *pSma) { return TSDB_CODE_SUCCESS; _err: - pRsmaStat->execStat = 0; + atomic_store_8(&pRSmaStat->execStat, 0); return TSDB_CODE_FAILED; } @@ -1592,7 +1574,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { } _end: - // taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId); + taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId); tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); } @@ -1656,13 +1638,11 @@ _err: * @return int32_t */ int32_t smaProcessFetch(SSma *pSma, void *pMsg) { - SRpcMsg *pRpcMsg = (SRpcMsg *)pMsg; - SRSmaFetchMsg req = {0}; - SDecoder decoder = {0}; - void *pBuf = NULL; - SRSmaInfo *pInfo = NULL; - SRSmaInfoItem *pItem = NULL; - + SRpcMsg *pRpcMsg = (SRpcMsg *)pMsg; + SRSmaFetchMsg req = {0}; + SDecoder decoder = {0}; + void *pBuf = NULL; + SRSmaStat *pRSmaStat = NULL; if (!pRpcMsg || pRpcMsg->contLen < sizeof(SMsgHead)) { terrno = TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP; goto _err; @@ -1676,38 +1656,33 @@ int32_t smaProcessFetch(SSma *pSma, void *pMsg) { goto _err; } -#if 0 - pInfo = tdAcquireRSmaInfoBySuid(pSma, req.suid); - if 
(!pInfo) { - if (terrno == TSDB_CODE_SUCCESS) { - terrno = TSDB_CODE_RSMA_EMPTY_INFO; - } - smaWarn("vgId:%d, failed to process rsma fetch msg for suid:%" PRIi64 " level:%" PRIi8 " since %s", SMA_VID(pSma), - req.suid, req.level, terrstr()); - goto _err; - } + pRSmaStat = SMA_RSMA_STAT(pSma); - pItem = RSMA_INFO_ITEM(pInfo, req.level - 1); - - SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; - qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pInfo, req.level - 1); - if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { - goto _err; - } - if (tdRSmaFetchAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid) < 0) { - goto _err; + if (atomic_val_compare_exchange_8(&pRSmaStat->execStat, 0, 1) == 0) { + SArray *pSubmitArr = NULL; + if (!(pSubmitArr = taosArrayInit(RSMA_SUBMIT_BATCH_SIZE, POINTER_BYTES))) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + atomic_store_8(&pRSmaStat->execStat, 0); + goto _err; + } + tdRSmaConsumeAndFetch(pSma, req.suid, req.level, pSubmitArr); + atomic_store_8(&pRSmaStat->execStat, 0); + taosArrayDestroy(pSubmitArr); + } else { + int8_t level = req.level; + int8_t *val = taosHashGet(RSMA_FETCH_HASH(pRSmaStat), &req.suid, sizeof(req.suid)); + if (val) { + level |= (*val); + } + ASSERT(level >= 1 && level <= 3); + taosHashPut(RSMA_FETCH_HASH(pRSmaStat), &req.suid, sizeof(req.suid), &level, sizeof(level)); } - tdCleanupStreamInputDataBlock(taskInfo); - - tdReleaseRSmaInfo(pSma, pInfo); -#endif tDecoderClear(&decoder); smaDebug("vgId:%d, success to process rsma fetch msg for suid:%" PRIi64 " level:%" PRIi8, SMA_VID(pSma), req.suid, req.level); return TSDB_CODE_SUCCESS; _err: - // tdReleaseRSmaInfo(pSma, pInfo); tDecoderClear(&decoder); smaError("vgId:%d, failed to process rsma fetch msg since %s", SMA_VID(pSma), terrstr()); return TSDB_CODE_FAILED; @@ -1719,28 +1694,101 @@ static void tdFreeRSmaSubmitItems(SArray *pItems) { } } +static int32_t tdRSmaConsumeAndFetch(SSma *pSma, int64_t suid, int8_t level, 
SArray *pSubmitArr) { + SRSmaInfo *pInfo = tdAcquireRSmaInfoBySuid(pSma, suid); + if (!pInfo) { + return TSDB_CODE_SUCCESS; + } + + // step 1: consume submit req + int64_t qMemSize = 0; + if ((qMemSize = taosQueueMemorySize(pInfo->queue)) > 0) { + taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock + + SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); + atomic_fetch_sub_64(&pRSmaStat->qBufSize, qMemSize); + + taosArrayClear(pSubmitArr); + + while (1) { + void *msg = NULL; + taosGetQitem(pInfo->qall, (void **)&msg); + if (msg) { + if (taosArrayPush(pSubmitArr, &msg) < 0) { + tdFreeRSmaSubmitItems(pSubmitArr); + goto _err; + } + } else { + break; + } + } + + int32_t size = taosArrayGetSize(pSubmitArr); + if (size > 0) { + for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { + if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, RSMA_EXEC_TIMEOUT, i) < + 0) { + tdFreeRSmaSubmitItems(pSubmitArr); + goto _err; + } + } + + tdFreeRSmaSubmitItems(pSubmitArr); + } + } + + // step 2: fetch rsma result + SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; + for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) { + if (level & i) { + qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pInfo, i - 1); + if (!taskInfo) { + continue; + } + if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { + goto _err; + } + SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1); + if (tdRSmaFetchAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, suid) < 0) { + tdCleanupStreamInputDataBlock(taskInfo); + goto _err; + } + + tdCleanupStreamInputDataBlock(taskInfo); + } + } + +_end: + tdReleaseRSmaInfo(pSma, pInfo); + return TSDB_CODE_SUCCESS; +_err: + tdReleaseRSmaInfo(pSma, pInfo); + return TSDB_CODE_FAILED; +} + /** * @brief * * @param pSma - * @param type 0 triggered when buffer overflow, 1 triggered by commit + * @param type * @return int32_t */ -int32_t tdRSmaProcessExecImpl(SSma *pSma, int8_t type) { +int32_t 
tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); SHashObj *infoHash = NULL; SArray *pSubmitQArr = NULL; SArray *pSubmitArr = NULL; + bool isFetchAll = false; if (!pRSmaStat || !(infoHash = RSMA_INFO_HASH(pRSmaStat))) { terrno = TSDB_CODE_RSMA_INVALID_STAT; goto _err; } - if (type == 0) { + if (type == RSMA_EXEC_OVERFLOW) { taosRLockLatch(SMA_ENV_LOCK(pEnv)); - if (atomic_load_64(&pRSmaStat->qBufSize) < RSMA_QTASKEXEC_BUFSIZ) { + if (atomic_load_64(&pRSmaStat->qBufSize) < RSMA_QTASKEXEC_BUFSIZE) { taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); return TSDB_CODE_SUCCESS; } @@ -1752,19 +1800,19 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, int8_t type) { goto _err; } - if (!(pSubmitArr = taosArrayInit(1024, POINTER_BYTES))) { + if (!(pSubmitArr = taosArrayInit(RSMA_SUBMIT_BATCH_SIZE, POINTER_BYTES))) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } + // step 1: rsma exec - consume data in buffer queue for all suids SRSmaExecQItem qItem = {0}; - taosWLockLatch(SMA_ENV_LOCK(pEnv)); - void *pIter = taosHashIterate(infoHash, NULL); - if (type == 0) { + void *pIter = taosHashIterate(infoHash, NULL); // infoHash has r/w lock + if (type == RSMA_EXEC_OVERFLOW) { while (pIter) { SRSmaInfo *pInfo = *(SRSmaInfo **)pIter; if (taosQueueItemSize(pInfo->queue)) { - taosReadAllQitems(pInfo->queue, pInfo->qall); + taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock qItem.qall = &pInfo->qall; qItem.pRSmaInfo = pIter; taosArrayPush(pSubmitQArr, &qItem); @@ -1772,7 +1820,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, int8_t type) { ASSERT(taosQueueItemSize(pInfo->queue) == 0); pIter = taosHashIterate(infoHash, pIter); } - } else if (type == 1) { + } else if (type == RSMA_EXEC_COMMIT) { while (pIter) { SRSmaInfo *pInfo = *(SRSmaInfo **)pIter; if (taosQueueItemSize(pInfo->iQueue)) { @@ -1788,7 +1836,6 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, int8_t type) { ASSERT(0); } 
atomic_store_64(&pRSmaStat->qBufSize, 0); - taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); int32_t qSize = taosArrayGetSize(pSubmitQArr); for (int32_t i = 0; i < qSize; ++i) { @@ -1808,22 +1855,31 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, int8_t type) { int32_t size = taosArrayGetSize(pSubmitArr); if (size > 0) { - if (type == 0 || type == 1) { - SRSmaInfo *pInfo = *(SRSmaInfo **)pItem->pRSmaInfo; - for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { - if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, type, i) < 0) { - tdFreeRSmaSubmitItems(pSubmitArr); - goto _err; - } + SRSmaInfo *pInfo = *(SRSmaInfo **)pItem->pRSmaInfo; + for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { + if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, type, i) < 0) { + tdFreeRSmaSubmitItems(pSubmitArr); + goto _err; } - } else { - ASSERT(0); } tdFreeRSmaSubmitItems(pSubmitArr); taosArrayClear(pSubmitArr); } } + // step 2: rsma fetch - consume data in buffer queue for suids triggered by timer + if (taosHashGetSize(RSMA_FETCH_HASH(pRSmaStat)) <= 0) { + goto _end; + } + pIter = taosHashIterate(RSMA_FETCH_HASH(pRSmaStat), NULL); + if (pIter) { + tdRSmaConsumeAndFetch(pSma, *(int64_t *)taosHashGetKey(pIter, NULL), *(int8_t *)pIter, pSubmitArr); + while ((pIter = taosHashIterate(RSMA_FETCH_HASH(pRSmaStat), pIter))) { + tdRSmaConsumeAndFetch(pSma, *(int64_t *)taosHashGetKey(pIter, NULL), *(int8_t *)pIter, pSubmitArr); + } + } + +_end: taosArrayDestroy(pSubmitArr); taosArrayDestroy(pSubmitQArr); return TSDB_CODE_SUCCESS; @@ -1842,23 +1898,23 @@ _err: */ int32_t smaProcessExec(SSma *pSma, void *pMsg) { SRpcMsg *pRpcMsg = (SRpcMsg *)pMsg; - SRSmaStat *pRsmaStat = SMA_RSMA_STAT(pSma); + SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); if (!pRpcMsg || pRpcMsg->contLen < sizeof(SMsgHead)) { terrno = TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP; goto _err; } - smaDebug("vgId:%d, begin to process rsma exec msg by thread:%p", SMA_VID(pSma), (void 
*)taosGetSelfPthreadId()); - if (tdRSmaProcessExecImpl(pSma, 0) < 0) { + smaDebug("vgId:%d, begin to process rsma exec msg by TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); + if (tdRSmaProcessExecImpl(pSma, RSMA_EXEC_OVERFLOW) < 0) { goto _err; } - pRsmaStat->execStat = 0; - smaDebug("vgId:%d, success to process rsma exec msg by thead:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); + atomic_store_8(&pRSmaStat->execStat, 0); + smaDebug("vgId:%d, success to process rsma exec msg by TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); return TSDB_CODE_SUCCESS; _err: - pRsmaStat->execStat = 0; - smaError("vgId:%d, failed to process rsma fetch msg by thread:%p since %s", SMA_VID(pSma), - (void *)taosGetSelfPthreadId(), terrstr()); + atomic_store_8(&pRSmaStat->execStat, 0); + smaError("vgId:%d, failed to process rsma exec msg by TID:%p since %s", SMA_VID(pSma), (void *)taosGetSelfPthreadId(), + terrstr()); return TSDB_CODE_FAILED; } -- GitLab