diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d9d3c7e2977f7b88b759f9d4cc7bc216b91e06fc..644f423cb1961f5422ec695cbc83f76128095f4a 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1136,6 +1136,7 @@ typedef struct { int64_t numOfInsertSuccessReqs; int64_t numOfBatchInsertReqs; int64_t numOfBatchInsertSuccessReqs; + int32_t numOfCacheTables; } SVnodeLoad; typedef struct { diff --git a/include/util/tlrucache.h b/include/util/tlrucache.h index 19009342484bce74c5173e7bf61ab52d6ddae767..c9cf71c2fd99398bd3f5e55bd08127f4278ed1f4 100644 --- a/include/util/tlrucache.h +++ b/include/util/tlrucache.h @@ -55,6 +55,8 @@ void *taosLRUCacheValue(SLRUCache *cache, LRUHandle *handle); size_t taosLRUCacheGetUsage(SLRUCache *cache); size_t taosLRUCacheGetPinnedUsage(SLRUCache *cache); +int32_t taosLRUCacheGetElems(SLRUCache *cache); + void taosLRUCacheSetCapacity(SLRUCache *cache, size_t capacity); size_t taosLRUCacheGetCapacity(SLRUCache *cache); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 1d14829891c602587f90f7aa38d1eeefb4b7b875..906d16ce77f925a1c4025cd15a593c2c57592adb 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -198,9 +198,10 @@ int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32 void *tsdbCacherowsReaderClose(void *pReader); int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); -void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity); -size_t tsdbCacheGetCapacity(SVnode *pVnode); -size_t tsdbCacheGetUsage(SVnode *pVnode); +void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity); +size_t tsdbCacheGetCapacity(SVnode *pVnode); +size_t tsdbCacheGetUsage(SVnode *pVnode); +int32_t tsdbCacheGetElems(SVnode *pVnode); // tq typedef struct SMetaTableInfo { @@ -264,7 +265,7 @@ int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList); int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *tbUidList); int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); -int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char* id); +int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id); int32_t tqNextBlock(STqReader *pReader, SFetchRet *ret); int32_t tqReaderSetSubmitReq2(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index b2e1e8ab34f627e17d431e4b0624bfa3166fdd73..2dec38240e95852695a1f4b524991c08755b03eb 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -748,7 +748,7 @@ struct SDiskDataBuilder { int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo, - bool destroyLoadInfo, const char *idStr); + bool destroyLoadInfo, const char *idStr, bool strictTimeRange); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); bool tMergeTreeNext(SMergeTree *pMTree); TSDBROW tMergeTreeGetRow(SMergeTree *pMTree); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index bfac2c2549325e90a94d3922b07c33d8e342c42b..e68b3589fc3e6d356bb995b2630adcff9afb5ec9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -632,8 +632,8 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa } tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid, - &(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX}, - &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL); + &(STimeWindow){.skey = state->lastTs, .ekey = TSKEY_MAX}, + &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL, true); state->pMergeTree = &state->mergeTree; bool hasVal = tMergeTreeNext(&state->mergeTree); if (!hasVal) { @@ -1727,6 +1727,15 @@ size_t tsdbCacheGetUsage(SVnode *pVnode) { return usage; } +int32_t tsdbCacheGetElems(SVnode *pVnode) { + int32_t elems = 0; + if (pVnode->pTsdb != NULL) { + elems = taosLRUCacheGetElems(pVnode->pTsdb->lruCache); + } + + return elems; +} + static void getBICacheKey(int32_t fid, int64_t commitID, char *key, int *len) { struct { int32_t fid; diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index d9259507032894876ca3dad2e04c31da49054d09..1d78622a61a73a8514f697a36bb43bd2024f008d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -408,6 +408,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 if (hasNotNullRow) { pr->lastTs = minTs; + tsdbInfo("%p have cache read %d tables", pr, i + 1); } } @@ -418,6 +419,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes, pr->idstr); } + tsdbInfo("have cached %d tables", taosLRUCacheGetElems(lruCache)); } else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) { for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) { STableKeyInfo* pKeyInfo = &pr->pTableList[i]; diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index d9d60442ffbfd36dfa9b132b4d5f9c714558de7a..af4eb9626e05ddabc500fe9b410b8cd8d4a5cc3a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -31,7 +31,8 @@ struct SLDataIter { SSttBlockLoadInfo *pBlockLoadInfo; }; -SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols, int32_t numOfSttTrigger) { +SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols, + int32_t numOfSttTrigger) { SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(numOfSttTrigger, sizeof(SSttBlockLoadInfo)); if (pLoadInfo == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -162,7 +163,8 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) { pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk; pIter->iRow = (pIter->backward) ? pInfo->blockData[pInfo->currentLoadBlockIndex].nRow : -1; - tsdbDebug("last block index list:%d, %d, rowIndex:%d %s", pInfo->blockIndex[0], pInfo->blockIndex[1], pIter->iRow, idStr); + tsdbDebug("last block index list:%d, %d, rowIndex:%d %s", pInfo->blockIndex[0], pInfo->blockIndex[1], pIter->iRow, + idStr); return &pInfo->blockData[pInfo->currentLoadBlockIndex]; _exit: @@ -263,7 +265,7 @@ static int32_t binarySearchForStartRowIndex(uint64_t *uidList, int32_t num, uint int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo, - const char *idStr) { + const char *idStr, bool strictTimeRange) { int32_t code = TSDB_CODE_SUCCESS; *pIter = taosMemoryCalloc(1, sizeof(SLDataIter)); @@ -340,6 +342,16 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t if ((*pIter)->iSttBlk != -1) { (*pIter)->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, (*pIter)->iSttBlk); (*pIter)->iRow = ((*pIter)->backward) ? (*pIter)->pSttBlk->nRow : -1; + + if ((!backward) && ((strictTimeRange && (*pIter)->pSttBlk->minKey >= (*pIter)->timeWindow.ekey) || + (!strictTimeRange && (*pIter)->pSttBlk->minKey > (*pIter)->timeWindow.ekey))) { + (*pIter)->pSttBlk = NULL; + } + + if (backward && ((strictTimeRange && (*pIter)->pSttBlk->maxKey <= (*pIter)->timeWindow.skey) || + (!strictTimeRange && (*pIter)->pSttBlk->maxKey < (*pIter)->timeWindow.skey))) { + (*pIter)->pSttBlk = NULL; + } } return code; @@ -421,7 +433,7 @@ static void findNextValidRow(SLDataIter *pIter, const char *idStr) { pBlockData->aUid != NULL) { i = binarySearchForStartRowIndex((uint64_t *)pBlockData->aUid, pBlockData->nRow, pIter->uid, pIter->backward); if (i == -1) { - tsdbDebug("failed to find the data in pBlockData, uid:%"PRIu64" , %s", pIter->uid, idStr); + tsdbDebug("failed to find the data in pBlockData, uid:%" PRIu64 " , %s", pIter->uid, idStr); pIter->iRow = -1; return; } @@ -508,7 +520,7 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) { } // set start row index - pIter->iRow = pIter->backward? pBlockData->nRow-1:0; + pIter->iRow = pIter->backward ? pBlockData->nRow - 1 : 0; } } @@ -551,7 +563,7 @@ static FORCE_INLINE int32_t tLDataIterDescCmprFn(const SRBTreeNode *p1, const SR int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo, - bool destroyLoadInfo, const char *idStr) { + bool destroyLoadInfo, const char *idStr, bool strictTimeRange) { pMTree->backward = backward; pMTree->pIter = NULL; pMTree->pIterList = taosArrayInit(4, POINTER_BYTES); @@ -573,7 +585,7 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file struct SLDataIter *pIter = NULL; code = tLDataIterOpen(&pIter, pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange, - &pMTree->pLoadInfo[i], pMTree->idStr); + &pMTree->pLoadInfo[i], pMTree->idStr, strictTimeRange); if (code != TSDB_CODE_SUCCESS) { goto _end; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index fd19e552c1dce4f7c2e9b530895f09a3e4e90f47..2766745cbe03641200a9cdfa7789d7ebfc776eb3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -315,11 +315,11 @@ static int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) } if (pBuf->numOfTables > 0) { - STableBlockScanInfo **p = (STableBlockScanInfo**)taosArrayPop(pBuf->pData); + STableBlockScanInfo** p = (STableBlockScanInfo**)taosArrayPop(pBuf->pData); taosMemoryFree(*p); pBuf->numOfTables /= pBuf->numPerBucket; } - + int32_t num = (numOfTables - pBuf->numOfTables) / pBuf->numPerBucket; int32_t remainder = (numOfTables - pBuf->numOfTables) % pBuf->numPerBucket; if (pBuf->pData == NULL) { @@ -919,7 +919,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN pBlockNum->numOfBlocks += 1; } - if ((pScanInfo->pBlockList != NULL )&& (taosArrayGetSize(pScanInfo->pBlockList) > 0)) { + if ((pScanInfo->pBlockList != NULL) && (taosArrayGetSize(pScanInfo->pBlockList) > 0)) { numOfQTable += 1; } } @@ -1798,7 +1798,7 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc while (1) { bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree); - if (!hasVal) { // the next value will be the accessed key in stt + if (!hasVal) { // the next value will be the accessed key in stt pScanInfo->lastKeyInStt += step; return false; } @@ -2481,7 +2481,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan pScanInfo->uid, pReader->idStr); int32_t code = tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pFileReader, pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, - pLBlockReader->pInfo, false, pReader->idStr); + pLBlockReader->pInfo, false, pReader->idStr, false); if (code != TSDB_CODE_SUCCESS) { return false; } @@ -3512,7 +3512,7 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn CHECK_FILEBLOCK_STATE* state) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SBlockData* pBlockData = &pReader->status.fileBlockData; - bool asc = ASCENDING_TRAVERSE(pReader->order); + bool asc = ASCENDING_TRAVERSE(pReader->order); *state = CHECK_FILEBLOCK_QUIT; int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; @@ -3927,7 +3927,8 @@ int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t n if (code) { return code; } - pReader->status.uidList.tableUidList = (uint64_t*)taosMemoryRealloc(pReader->status.uidList.tableUidList, sizeof(uint64_t) * num); + pReader->status.uidList.tableUidList = + (uint64_t*)taosMemoryRealloc(pReader->status.uidList.tableUidList, sizeof(uint64_t) * num); } taosHashClear(pReader->status.pTableMap); diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index c017266839a872711b5262ee6671d17d3a3a76b6..d7e9c72a914dbc1da9c95a99d5aceeaa37b91fac 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -382,6 +382,7 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { pLoad->syncRestore = state.restored; pLoad->syncCanRead = state.canRead; pLoad->cacheUsage = tsdbCacheGetUsage(pVnode); + pLoad->numOfCacheTables = tsdbCacheGetElems(pVnode); pLoad->numOfTables = metaGetTbNum(pVnode->pMeta); pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta); pLoad->totalStorage = (int64_t)3 * 1073741824; diff --git a/source/util/src/tlrucache.c b/source/util/src/tlrucache.c index 264883be4e71c99f1f3f3a1fbc872bf0c522728e..f4172fbb4427463f8a6cf8f937eac1df7eae2ce6 100644 --- a/source/util/src/tlrucache.c +++ b/source/util/src/tlrucache.c @@ -580,6 +580,16 @@ static size_t taosLRUCacheShardGetUsage(SLRUCacheShard *shard) { return usage; } +static int32_t taosLRUCacheShardGetElems(SLRUCacheShard *shard) { + int32_t elems = 0; + + taosThreadMutexLock(&shard->mutex); + elems = shard->table.elems; + taosThreadMutexUnlock(&shard->mutex); + + return elems; +} + static size_t taosLRUCacheShardGetPinnedUsage(SLRUCacheShard *shard) { size_t usage = 0; @@ -755,6 +765,16 @@ size_t taosLRUCacheGetUsage(SLRUCache *cache) { return usage; } +int32_t taosLRUCacheGetElems(SLRUCache *cache) { + int32_t elems = 0; + + for (int i = 0; i < cache->numShards; ++i) { + elems += taosLRUCacheShardGetElems(&cache->shards[i]); + } + + return elems; +} + size_t taosLRUCacheGetPinnedUsage(SLRUCache *cache) { size_t usage = 0;