From 4cc44ee79153d8e31d9b8f96004135d0bc787ecb Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sun, 11 Sep 2022 19:34:10 +0800 Subject: [PATCH] enh: sma optimization for query and reboot --- source/dnode/vnode/src/inc/sma.h | 30 +- source/dnode/vnode/src/sma/smaCommit.c | 50 ++-- source/dnode/vnode/src/sma/smaEnv.c | 35 +-- source/dnode/vnode/src/sma/smaRollup.c | 38 ++- source/dnode/vnode/src/sma/smaTimeRange.c | 3 - source/dnode/vnode/src/tsdb/tsdbRead.c | 342 ++++------------------ 6 files changed, 129 insertions(+), 369 deletions(-) diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 165f5da4b6..a8883babce 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -113,8 +113,9 @@ struct SRSmaStat { volatile int64_t nBufItems; // number of items in queue buffer SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo) volatile int32_t nFetchAll; // active number of fetch all - int8_t triggerStat; // shared by fetch tasks - int8_t commitStat; // 0 not in committing, 1 in committing + volatile int8_t triggerStat; // shared by fetch tasks + volatile int8_t commitStat; // 0 not in committing, 1 in committing + volatile int8_t delFlag; // 0 no deleted SRSmaInfo, 1 has deleted SRSmaInfo SRSmaFS fs; // for recovery/snapshot r/w SHashObj *infoHash; // key: suid, value: SRSmaInfo tsem_t notEmpty; // has items in queue buffer @@ -196,16 +197,21 @@ typedef enum { int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType); void tdDestroySmaEnv(SSmaEnv *pSmaEnv); void *tdFreeSmaEnv(SSmaEnv *pSmaEnv); -int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat); -int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat); int32_t tdLockSma(SSma *pSma); int32_t tdUnLockSma(SSma *pSma); void *tdAcquireSmaRef(int32_t rsetId, int64_t refId); int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId); +static FORCE_INLINE void tdRefSmaStat(SSma *pSma, SSmaStat *pStat) { + int32_t ref = T_REF_INC(pStat); + smaDebug("vgId:%d, ref sma stat:%p, val:%d", SMA_VID(pSma), pStat, ref); +} +static FORCE_INLINE void tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) { + int32_t ref = T_REF_DEC(pStat); + smaDebug("vgId:%d, unref sma stat:%p, val:%d", SMA_VID(pSma), pStat, ref); +} + // rsma -int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo); -int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo); void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree); int32_t tdRSmaFSOpen(SSma *pSma, int64_t version); void tdRSmaFSClose(SRSmaFS *fs); @@ -218,9 +224,17 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type); int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer); +void tdRSmaQTaskInfoGetFileName(int32_t vid, int64_t version, char *outputName); +void tdRSmaQTaskInfoGetFullName(int32_t vid, int64_t version, const char *path, char *outputName); -void tdRSmaQTaskInfoGetFileName(int32_t vid, int64_t version, char *outputName); -void tdRSmaQTaskInfoGetFullName(int32_t vid, int64_t version, const char *path, char *outputName); +static FORCE_INLINE void tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) { + int32_t ref = T_REF_INC(pRSmaInfo); + smaDebug("vgId:%d, ref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref); +} +static FORCE_INLINE void tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) { + int32_t ref = T_REF_DEC(pRSmaInfo); + smaDebug("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref); +} // smaFileUtil ================ diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index f08cefdb04..d0e9b623bb 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -314,12 +314,12 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) { return TSDB_CODE_FAILED; } - smaInfo("vgId:%d, rsma commit, operator state commited, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); + smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); #if 0 // consuming task of qTaskInfo clone // step 4: swap queue/qall and iQueue/iQall // lock - // taosWLockLatch(SMA_ENV_LOCK(pEnv)); + taosWLockLatch(SMA_ENV_LOCK(pEnv)); ASSERT(RSMA_INFO_HASH(pRSmaStat)); @@ -335,7 +335,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { } // unlock - // taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); + taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); #endif return TSDB_CODE_SUCCESS; @@ -380,25 +380,26 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { // step 1: merge qTaskInfo and iQTaskInfo // lock - // taosWLockLatch(SMA_ENV_LOCK(pEnv)); - - void *pIter = NULL; - while ((pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter))) { - tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); - SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter; - if (RSMA_INFO_IS_DEL(pRSmaInfo)) { - int32_t refVal = T_REF_VAL_GET(pRSmaInfo); - if (refVal == 0) { - taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(*pSuid)); - } else { - smaDebug( - "vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for " - "table:%" PRIi64, - SMA_VID(pSma), refVal, *pSuid); + if (1 == atomic_val_compare_exchange_8(&pRSmaStat->delFlag, 1, 0)) { + taosWLockLatch(SMA_ENV_LOCK(pEnv)); + + void *pIter = NULL; + while ((pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter))) { + tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); + SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter; + if (RSMA_INFO_IS_DEL(pRSmaInfo)) { + int32_t refVal = T_REF_VAL_GET(pRSmaInfo); + if (refVal == 0) { + taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(*pSuid)); + } else { + smaDebug( + "vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for " + "table:%" PRIi64, + SMA_VID(pSma), refVal, *pSuid); + } + + continue; } - - continue; - } #if 0 if (pRSmaInfo->taskInfo[0]) { if (pRSmaInfo->iTaskInfo[0]) { @@ -413,10 +414,11 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter)); smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), *pSuid); #endif - } + } - // unlock - // taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); + // unlock + taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); + } tdUpdateQTaskInfoFiles(pSma, pRSmaStat); diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index 2595a629ba..b870ea1b62 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -177,39 +177,6 @@ void *tdFreeSmaEnv(SSmaEnv *pSmaEnv) { return NULL; } -int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat) { - if (!pStat) return 0; - - int ref = T_REF_INC(pStat); - smaDebug("vgId:%d, ref sma stat:%p, val:%d", SMA_VID(pSma), pStat, ref); - return 0; -} - -int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) { - if (!pStat) return 0; - - int ref = T_REF_DEC(pStat); - smaDebug("vgId:%d, unref sma stat:%p, val:%d", SMA_VID(pSma), pStat, ref); - return 0; -} - -int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) { - if (!pRSmaInfo) return 0; - - int ref = T_REF_INC(pRSmaInfo); - smaDebug("vgId:%d, ref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref); - return 0; -} - -int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) { - if (!pRSmaInfo) return 0; - - int ref = T_REF_DEC(pRSmaInfo); - smaDebug("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref); - - return 0; -} - static void tRSmaInfoHashFreeNode(void *data) { SRSmaInfo *pRSmaInfo = NULL; SRSmaInfoItem *pItem = NULL; @@ -492,6 +459,8 @@ static int32_t tdRsmaStopExecutor(const SSma *pSma) { taosThreadJoin(pthread[i], NULL); } } + + smaInfo("vgId:%d, rsma executor stopped, number:%d", SMA_VID(pSma), tsNumOfVnodeRsmaThreads); } return 0; } \ No newline at end of file diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 920d04bf6c..d5260b8374 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -302,7 +302,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat return TSDB_CODE_FAILED; } SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]); - pItem->triggerStat = TASK_TRIGGER_STAT_INACTIVE; + pItem->triggerStat = TASK_TRIGGER_STAT_ACTIVE; // fetch the data when reboot if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) { int64_t msInterval = convertTimeFromPrecisionToUnit(pRetention[idx + 1].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND); @@ -320,7 +320,9 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat SRSmaRef rsmaRef = {.refId = pStat->refId, .suid = pRSmaInfo->suid}; taosHashPut(smaMgmt.refHash, &pItem, POINTER_BYTES, &rsmaRef, sizeof(rsmaRef)); - taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId); + pItem->fetchLevel = pItem->level; + taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId); + smaInfo("vgId:%d, item:%p table:%" PRIi64 " level:%" PRIi8 " maxdelay:%" PRIi64 " watermark:%" PRIi64 ", finally maxdelay:%" PRIi32, @@ -470,6 +472,7 @@ int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) { } // set del flag for data in mem + atomic_store_8(&pRSmaStat->delFlag, 1); RSMA_INFO_SET_DEL(pRSmaInfo); tdUnRefRSmaInfo(pSma, pRSmaInfo); @@ -939,25 +942,25 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { return NULL; } - // taosRLockLatch(SMA_ENV_LOCK(pEnv)); + taosRLockLatch(SMA_ENV_LOCK(pEnv)); pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); if (pRSmaInfo && (pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { if (RSMA_INFO_IS_DEL(pRSmaInfo)) { - // taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); + taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); return NULL; } if (!pRSmaInfo->taskInfo[0]) { if (tdRSmaInfoClone(pSma, pRSmaInfo) < 0) { - // taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); + taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); return NULL; } } tdRefRSmaInfo(pSma, pRSmaInfo); - // taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); + taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); ASSERT(pRSmaInfo->suid == suid); return pRSmaInfo; } - // taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); + taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); return NULL; } @@ -1734,7 +1737,7 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) { } tdCleanupStreamInputDataBlock(taskInfo); - smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 " maxDelay:%d, fetch finished", + smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 " maxDelay:%d, fetch finished", SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay); } else { smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 @@ -1829,6 +1832,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { bool occupied = (batchMax <= 1); if (batchMax > 1) { batchMax = 100 / batchMax; + batchMax = MAX(batchMax, 4); } while (occupied || (++batchCnt < batchMax)) { // greedy mode taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock @@ -1838,13 +1842,15 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { smaDebug("vgId:%d, batchSize:%d, execType:%" PRIi8, SMA_VID(pSma), qallItemSize, type); } - int8_t oldStat = atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 2); - if (oldStat == 0 || - ((oldStat == 2) && atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)) < TASK_TRIGGER_STAT_PAUSED)) { - atomic_fetch_add_32(&pRSmaStat->nFetchAll, 1); - tdRSmaFetchAllResult(pSma, pInfo); - if (0 == atomic_sub_fetch_32(&pRSmaStat->nFetchAll, 1)) { - atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0); + if (RSMA_INFO_ITEM(pInfo, 0)->fetchLevel || RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) { + int8_t oldStat = atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 2); + if (oldStat == 0 || + ((oldStat == 2) && atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)) < TASK_TRIGGER_STAT_PAUSED)) { + atomic_fetch_add_32(&pRSmaStat->nFetchAll, 1); + tdRSmaFetchAllResult(pSma, pInfo); + if (0 == atomic_sub_fetch_32(&pRSmaStat->nFetchAll, 1)) { + atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0); + } } } @@ -1917,7 +1923,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { tsem_wait(&pRSmaStat->notEmpty); if ((pEnv->flag & SMA_ENV_FLG_CLOSE) && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) { - smaInfo("vgId:%d, exec task end, flag:%" PRIi8 ", nBufItems:%" PRIi64, SMA_VID(pSma), pEnv->flag, + smaDebug("vgId:%d, exec task end, flag:%" PRIi8 ", nBufItems:%" PRIi64, SMA_VID(pSma), pEnv->flag, atomic_load_64(&pRSmaStat->nBufItems)); break; } diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index 173391b769..e2cb51f586 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -178,7 +178,6 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char return TSDB_CODE_FAILED; } - tdRefSmaStat(pSma, pStat); pTsmaStat = SMA_STAT_TSMA(pStat); if (!pTsmaStat->pTSma) { @@ -230,9 +229,7 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char goto _err; } - tdUnRefSmaStat(pSma, pStat); return TSDB_CODE_SUCCESS; _err: - tdUnRefSmaStat(pSma, pStat); return TSDB_CODE_FAILED; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 41c06bc500..58f9004842 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -192,9 +192,6 @@ static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader); static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader); static int32_t doBuildDataBlock(STsdbReader* pReader); -static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader); - -static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) { SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; @@ -685,153 +682,12 @@ static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) { static SDataBlk* getCurrentBlock(SDataBlockIter* pBlockIter) { return &pBlockIter->block; } -int32_t binarySearchForTs(char* pValue, int num, TSKEY key, int order) { - int32_t midPos = -1; - int32_t numOfRows; - - ASSERT(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC); - - TSKEY* keyList = (TSKEY*)pValue; - int32_t firstPos = 0; - int32_t lastPos = num - 1; - - if (order == TSDB_ORDER_DESC) { - // find the first position which is smaller than the key - while (1) { - if (key >= keyList[firstPos]) return firstPos; - if (key == keyList[lastPos]) return lastPos; - - if (key < keyList[lastPos]) { - lastPos += 1; - if (lastPos >= num) { - return -1; - } else { - return lastPos; - } - } - - numOfRows = lastPos - firstPos + 1; - midPos = (numOfRows >> 1) + firstPos; - - if (key < keyList[midPos]) { - firstPos = midPos + 1; - } else if (key > keyList[midPos]) { - lastPos = midPos - 1; - } else { - break; - } - } - - } else { - // find the first position which is bigger than the key - while (1) { - if (key <= keyList[firstPos]) return firstPos; - if (key == keyList[lastPos]) return lastPos; - - if (key > keyList[lastPos]) { - lastPos = lastPos + 1; - if (lastPos >= num) - return -1; - else - return lastPos; - } - - numOfRows = lastPos - firstPos + 1; - midPos = (numOfRows >> 1u) + firstPos; - - if (key < keyList[midPos]) { - lastPos = midPos - 1; - } else if (key > keyList[midPos]) { - firstPos = midPos + 1; - } else { - break; - } - } - } - - return midPos; -} - -static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int order) { - // start end position - int s, e; - s = pos; - - // check - assert(pos >=0 && pos < num); - assert(num > 0); - - if (order == TSDB_ORDER_ASC) { - // find the first position which is smaller than the key - e = num - 1; - if (key < keyList[pos]) - return -1; - while (1) { - // check can return - if (key >= keyList[e]) - return e; - if (key <= keyList[s]) - return s; - if (e - s <= 1) - return s; - - // change start or end position - int mid = s + (e - s + 1)/2; - if (keyList[mid] > key) - e = mid; - else if(keyList[mid] < key) - s = mid; - else - return mid; - } - } else { // DESC - // find the first position which is bigger than the key - e = 0; - if (key > keyList[pos]) - return -1; - while (1) { - // check can return - if (key <= keyList[e]) - return e; - if (key >= keyList[s]) - return s; - if (s - e <= 1) - return s; - - // change start or end position - int mid = s - (s - e + 1)/2; - if (keyList[mid] < key) - e = mid; - else if(keyList[mid] > key) - s = mid; - else - return mid; - } - } -} - -int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SDataBlk* pBlock, int32_t pos) { - // NOTE: reverse the order to find the end position in data block - int32_t endPos = -1; - bool asc = ASCENDING_TRAVERSE(pReader->order); - - if (asc && pReader->window.ekey >= pBlock->maxKey.ts) { - endPos = pBlock->nRow - 1; - } else if (!asc && pReader->window.skey <= pBlock->minKey.ts) { - endPos = 0; - } else { - endPos = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.ekey, pReader->order); - } - - return endPos; -} - static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) { SReaderStatus* pStatus = &pReader->status; SDataBlockIter* pBlockIter = &pStatus->blockIter; SBlockData* pBlockData = &pStatus->fileBlockData; - SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); + SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); SDataBlk* pBlock = getCurrentBlock(pBlockIter); SSDataBlock* pResBlock = pReader->pResBlock; int32_t numOfOutputCols = blockDataGetNumOfCols(pResBlock); @@ -844,42 +700,23 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn bool asc = ASCENDING_TRAVERSE(pReader->order); int32_t step = asc ? 1 : -1; - if (asc && pReader->window.skey <= pBlock->minKey.ts) { - pDumpInfo->rowIndex = 0; - } else if (!asc && pReader->window.ekey >= pBlock->maxKey.ts) { - pDumpInfo->rowIndex = pBlock->nRow - 1; - } else { - int32_t pos = asc? pBlock->nRow-1:0; - int32_t order = (pReader->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; - pDumpInfo->rowIndex = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.skey, order); - } - - // time window check - int32_t endIndex = getEndPosInDataBlock(pReader, pBlockData, pBlock, pDumpInfo->rowIndex); - if (endIndex == -1) { - setBlockAllDumped(pDumpInfo, pReader->window.ekey, pReader->order); - return TSDB_CODE_SUCCESS; - } + int32_t rowIndex = 0; + int32_t remain = asc ? (pBlockData->nRow - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex + 1); - endIndex += step; - int32_t remain = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex); - if (remain > pReader->capacity) { // output buffer check + int32_t endIndex = 0; + if (remain <= pReader->capacity) { + endIndex = pBlockData->nRow; + } else { + endIndex = pDumpInfo->rowIndex + step * pReader->capacity; remain = pReader->capacity; } - int32_t rowIndex = 0; - - int32_t i = 0; + int32_t i = 0; SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i); if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) { - if (asc) { - memcpy(pColData->pData, &pBlockData->aTSKEY[pDumpInfo->rowIndex], remain * sizeof(int64_t)); - } else { - for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step) { - colDataAppendInt64(pColData, rowIndex++, &pBlockData->aTSKEY[j]); - } + for (int32_t j = pDumpInfo->rowIndex; j < endIndex && j >= 0; j += step) { + colDataAppend(pColData, rowIndex++, (const char*)&pBlockData->aTSKEY[j], false); } - i += 1; } @@ -893,32 +730,13 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn if (pData->cid < pColData->info.colId) { colIndex += 1; } else if (pData->cid == pColData->info.colId) { - if (pData->flag == HAS_NONE || pData->flag == HAS_NULL) { - colDataAppendNNULL(pColData, 0, remain); - } else { - if (IS_NUMERIC_TYPE(pColData->info.type) && asc) { - uint8_t* p = pData->pData + tDataTypes[pData->type].bytes * pDumpInfo->rowIndex; - memcpy(pColData->pData, p, remain * tDataTypes[pData->type].bytes); - - // null value exists, check one-by-one - if (pData->flag != HAS_VALUE) { - for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step, rowIndex++) { - uint8_t v = GET_BIT2(pData->pBitMap, j); - if (v == 0 || v == 1) { - colDataSetNull_f(pColData->nullbitmap, rowIndex); - } - } - } - } else { - for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step) { - tColDataGetValue(pData, j, &cv); - doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo); - } - } + for (int32_t j = pDumpInfo->rowIndex; j < endIndex && j >= 0; j += step) { + tColDataGetValue(pData, j, &cv); + doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo); } - colIndex += 1; i += 1; + ASSERT(rowIndex == remain); } else { // the specified column does not exist in file block, fill with null data colDataAppendNNULL(pColData, 0, remain); i += 1; @@ -934,13 +752,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn pResBlock->info.rows = remain; pDumpInfo->rowIndex += step * remain; - if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pBlock->nRow) { - int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex]; - setBlockAllDumped(pDumpInfo, ts, pReader->order); - } else { - int64_t k = asc? pBlock->maxKey.ts:pBlock->minKey.ts; - setBlockAllDumped(pDumpInfo, k, pReader->order); - } + setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order); double elapsedTime = (taosGetTimestampUs() - st) / 1000.0; pReader->cost.blockLoadTime += elapsedTime; @@ -948,7 +760,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1; tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s", - pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, unDumpedRows, + pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, unDumpedRows, pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr); return TSDB_CODE_SUCCESS; @@ -1336,31 +1148,26 @@ static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SDa } } -typedef struct { - bool overlapWithNeighborBlock; - bool hasDupTs; - bool overlapWithDelInfo; - bool overlapWithLastBlock; - bool overlapWithKeyInBuf; - bool partiallyRequired; - bool moreThanCapcity; -} SDataBlockToLoadInfo; - -static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock, - STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader, - STsdbReader* pReader) { +// 1. the version of all rows should be less than the endVersion +// 2. current block should not overlap with next neighbor block +// 3. current timestamp should not be overlap with each other +// 4. output buffer should be large enough to hold all rows in current block +// 5. delete info should not overlap with current block data +static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SDataBlk* pBlock, + STableBlockScanInfo* pScanInfo, TSDBKEY key, SLastBlockReader* pLastBlockReader) { int32_t neighborIndex = 0; - SDataBlk* pNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pScanInfo, &neighborIndex, pReader->order); + SDataBlk* pNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &neighborIndex, pReader->order); // overlap with neighbor + bool overlapWithNeighbor = false; if (pNeighbor) { - pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock(pBlock, pNeighbor, pReader->order); + overlapWithNeighbor = overlapWithNeighborBlock(pBlock, pNeighbor, pReader->order); taosMemoryFree(pNeighbor); } // has duplicated ts of different version in this block - pInfo->hasDupTs = (pBlock->nSubBlock == 1) ? pBlock->hasDup : true; - pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order); + bool hasDup = (pBlock->nSubBlock == 1) ? pBlock->hasDup : true; + bool overlapWithDel = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order); // todo here we need to each key in the last files to identify if it is really overlapped with last block // todo @@ -1372,48 +1179,25 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* } #endif - pInfo->moreThanCapcity = pBlock->nRow > pReader->capacity; - pInfo->partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock); - pInfo->overlapWithKeyInBuf = keyOverlapFileBlock(keyInBuf, pBlock, &pReader->verRange); -} - -// 1. the version of all rows should be less than the endVersion -// 2. current block should not overlap with next neighbor block -// 3. current timestamp should not be overlap with each other -// 4. output buffer should be large enough to hold all rows in current block -// 5. delete info should not overlap with current block data -// 6. current block should not contain the duplicated ts -static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock, - STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) { - SDataBlockToLoadInfo info = {0}; - getBlockToLoadInfo(&info, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader); + bool moreThanOutputCapacity = pBlock->nRow > pReader->capacity; + bool partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock); + bool overlapWithKey = keyOverlapFileBlock(key, pBlock, &pReader->verRange); - bool loadDataBlock = - (info.overlapWithNeighborBlock || info.hasDupTs || info.partiallyRequired || info.overlapWithKeyInBuf || - info.moreThanCapcity || info.overlapWithDelInfo || info.overlapWithLastBlock); + bool loadDataBlock = (overlapWithNeighbor || hasDup || partiallyRequired || overlapWithKey || + moreThanOutputCapacity || overlapWithDel || overlapWithlastBlock); // log the reason why load the datablock for profile if (loadDataBlock) { tsdbDebug("%p uid:%" PRIu64 " need to load the datablock, overlapwithneighborblock:%d, hasDup:%d, partiallyRequired:%d, " "overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s", - pReader, pBlockInfo->uid, info.overlapWithNeighborBlock, info.hasDupTs, info.partiallyRequired, - info.overlapWithKeyInBuf, info.moreThanCapcity, info.overlapWithDelInfo, info.overlapWithLastBlock, - pReader->idStr); + pReader, pFBlock->uid, overlapWithNeighbor, hasDup, partiallyRequired, overlapWithKey, + moreThanOutputCapacity, overlapWithDel, overlapWithlastBlock, pReader->idStr); } return loadDataBlock; } -static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock, - STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) { - SDataBlockToLoadInfo info = {0}; - getBlockToLoadInfo(&info, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader); - bool isCleanFileBlock = !(info.overlapWithNeighborBlock || info.hasDupTs || info.overlapWithKeyInBuf || - info.overlapWithDelInfo || info.overlapWithLastBlock); - return isCleanFileBlock; -} - static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) { if (!(pBlockScanInfo->iiter.hasVal || pBlockScanInfo->iter.hasVal)) { return TSDB_CODE_SUCCESS; @@ -2134,6 +1918,8 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum return true; } +static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } + static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { // the last block reader has been initialized for this table. if (pLBlockReader->uid == pScanInfo->uid) { @@ -2175,11 +1961,12 @@ static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLas int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, STsdbReader* pReader) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; + + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); + if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) { return TSDB_CODE_SUCCESS; } else { - TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - STSRow* pTSRow = NULL; SRowMerger merge = {0}; @@ -2202,19 +1989,13 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - int64_t key = (pBlockData->nRow > 0) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN; + int64_t key = (pBlockData->nRow > 0) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN; + TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); + TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader); + if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal) { return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader); } else { - TSDBROW *pRow = NULL, *piRow = NULL; - if (pBlockScanInfo->iter.hasVal) { - pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); - } - - if (pBlockScanInfo->iiter.hasVal) { - piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader); - } - // imem + file + last block if (pBlockScanInfo->iiter.hasVal) { return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pLastBlockReader); @@ -2234,29 +2015,20 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { SSDataBlock* pResBlock = pReader->pResBlock; SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); - SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; - - int64_t st = taosGetTimestampUs(); STableBlockScanInfo* pBlockScanInfo = NULL; if (pBlockInfo != NULL) { pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); - SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); - TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader); - - // it is a clean block, load it directly - if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader)) { - copyBlockDataToSDataBlock(pReader, pBlockScanInfo); - goto _end; - } - } else { // file blocks not exist + } else { pBlockScanInfo = pReader->status.pTableIter; } + SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SBlockData* pBlockData = &pReader->status.fileBlockData; int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; + int64_t st = taosGetTimestampUs(); while (1) { // todo check the validate of row in file block @@ -2299,7 +2071,6 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { } } - _end: pResBlock->info.uid = pBlockScanInfo->uid; blockDataUpdateTsWindow(pResBlock, 0); @@ -2403,7 +2174,7 @@ _err: return code; } -TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { +static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { TSDBKEY key = {.ts = TSKEY_INITIAL_VAL}; TSDBROW* pRow = getValidMemRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader); if (pRow != NULL) { @@ -2599,12 +2370,12 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { } initLastBlockReader(pLastBlockReader, pScanInfo, pReader); - TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader); + TSDBKEY key = getCurrentKeyInBuf(pScanInfo, pReader); if (pBlockInfo == NULL) { // build data block from last data file ASSERT(pBlockIter->numOfBlocks == 0); code = buildComposedDataBlock(pReader); - } else if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) { + } else if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, key, pLastBlockReader)) { tBlockDataReset(&pStatus->fileBlockData); code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pScanInfo->uid, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { @@ -2618,7 +2389,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { // build composed data block code = buildComposedDataBlock(pReader); - } else if (bufferDataInFileBlockGap(pReader->order, keyInBuf, pBlock)) { + } else if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) { // data in memory that are earlier than current file block // todo rows in buffer should be less than the file block in asc, greater than file block in desc int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlock->minKey.ts : pBlock->maxKey.ts; @@ -2815,6 +2586,7 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret if (VND_IS_RSMA(pVnode)) { int8_t level = 0; int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision); + int64_t offset = TSDB_TICK_PER_SECOND(pVnode->config.tsdbCfg.precision); for (int8_t i = 0; i < TSDB_RETENTION_MAX; ++i) { SRetention* pRetention = retentions + level; @@ -2824,7 +2596,7 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret } break; } - if ((now - pRetention->keep) <= winSKey) { + if ((now - pRetention->keep) <= (winSKey + offset)) { break; } ++level; @@ -3349,11 +3121,11 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S } SColVal cv = {0}; - int32_t numOfInputCols = pBlockData->aIdx->size; - int32_t numOfOutputCols = pResBlock->pDataBlock->size; + int32_t numOfInputCols = taosArrayGetSize(pBlockData->aIdx); + int32_t numOfOutputCols = blockDataGetNumOfCols(pResBlock); while (i < numOfOutputCols && j < numOfInputCols) { - SColumnInfoData* pCol = TARRAY_GET_ELEM(pResBlock->pDataBlock, i); + SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i); SColData* pData = tBlockDataGetColDataByIdx(pBlockData, j); if (pData->cid < pCol->info.colId) { -- GitLab