diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 5d570821cb6608fb382398e146766eb1f8aff284..85b1509aca1205c876456174983fb4feff7a2d12 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -190,6 +190,7 @@ typedef struct SQInfo { int32_t code; // error code to returned to client int64_t owner; // if it is in execution void* tsdb; + int32_t ref; // tsdb mem/immem ref count int32_t vgId; STableGroupInfo tableGroupInfo; // table list SArray STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 256b8189f8fc69345b27fdf702fb705d22ac3c10..989e29737512f154deb0771bf69c8ca1a5583e7c 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -425,8 +425,8 @@ SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo); int tsdbInsertRowToMem(STsdbRepo* pRepo, SDataRow row, STable* pTable); int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); -int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem); -void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemTable* pMem, SMemTable* pIMem); +int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem, bool ref); +void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemTable* pMem, SMemTable* pIMem, bool unRef); void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes); int tsdbAsyncCommit(STsdbRepo* pRepo); int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols, @@ -576,4 +576,4 @@ void tsdbFreeScanHandle(STsdbScanHandle* pScanHandle); } #endif -#endif \ No newline at end of file +#endif diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 4cf8ddd4bd8df396352ad66b8499552018d5d322..a667c89f4253df0ec1b91174588af18ee020ab5d 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -171,30 +171,36 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { return 0; } -int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem) { +int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem, bool ref) { if (tsdbLockRepo(pRepo) < 0) return -1; + *pMem = pRepo->mem; + *pIMem = pRepo->imem; - *pMem = pRepo->mem; - *pIMem = pRepo->imem; - tsdbRefMemTable(pRepo, *pMem); - tsdbRefMemTable(pRepo, *pIMem); + if (ref) { + tsdbRefMemTable(pRepo, *pMem); + tsdbRefMemTable(pRepo, *pIMem); + } if (tsdbUnlockRepo(pRepo) < 0) return -1; - if (*pMem != NULL) taosRLockLatch(&((*pMem)->latch)); + if (*pMem != NULL) { + if (ref) taosRLockLatch(&((*pMem)->latch)); + } tsdbDebug("vgId:%d take memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), *pMem, *pIMem); return 0; } -void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem) { +void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem, bool unRef) { if (pMem != NULL) { - taosRUnLockLatch(&(pMem->latch)); - tsdbUnRefMemTable(pRepo, pMem); + if (unRef) { + taosRUnLockLatch(&(pMem->latch)); + tsdbUnRefMemTable(pRepo, pMem); + } } if (pIMem != NULL) { - tsdbUnRefMemTable(pRepo, pIMem); + if (unRef) tsdbUnRefMemTable(pRepo, pIMem); } } @@ -780,4 +786,4 @@ static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) { taosTFree(tData); return 0; -} \ No newline at end of file +} diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index f0a2694b608363518c76b63d2e1e674330fba2fa..b21e5273e1e2a53b1c4fcbce26f4da7f47286b2c 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -20,6 +20,7 @@ #include "exception.h" #include "../../query/inc/qAst.h" // todo move to common module +#include "../../query/inc/qExecutor.h" #include "tlosertree.h" #include "tsdb.h" #include "tsdbMain.h" @@ -203,8 +204,8 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab if (tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb) != 0) { goto out_of_memory; } - - tsdbTakeMemSnapshot(pQueryHandle->pTsdb, &pQueryHandle->mem, &pQueryHandle->imem); + SQInfo *pQInfo = (SQInfo *)(pQueryHandle->qinfo); + tsdbTakeMemSnapshot(pQueryHandle->pTsdb, &pQueryHandle->mem, &pQueryHandle->imem, pQInfo->ref++ == 0); size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList); assert(sizeOfGroup >= 1 && pCond != NULL && pCond->numOfCols > 0); @@ -1939,14 +1940,15 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { pSecQueryHandle->checkFiles = true; pSecQueryHandle->activeIndex = 0; pSecQueryHandle->outputCapacity = ((STsdbRepo*)pSecQueryHandle->pTsdb)->config.maxRowsPerFileBlock; + pSecQueryHandle->qinfo = pQueryHandle->qinfo; if (tsdbInitReadHelper(&pSecQueryHandle->rhelper, (STsdbRepo*) pSecQueryHandle->pTsdb) != 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; free(pSecQueryHandle); return false; } - - tsdbTakeMemSnapshot(pSecQueryHandle->pTsdb, &pSecQueryHandle->mem, &pSecQueryHandle->imem); + SQInfo *pQInfo = (SQInfo *)(pSecQueryHandle->qinfo); + tsdbTakeMemSnapshot(pSecQueryHandle->pTsdb, &pSecQueryHandle->mem, &pSecQueryHandle->imem, (pQInfo->ref++ == 0)); // allocate buffer in order to load data blocks from file int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle)); @@ -2707,7 +2709,8 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { taosTFree(pQueryHandle->statis); // todo check error - tsdbUnTakeMemSnapShot(pQueryHandle->pTsdb, pQueryHandle->mem, pQueryHandle->imem); + SQInfo *qInfo = (SQInfo *)(pQueryHandle->qinfo); + tsdbUnTakeMemSnapShot(pQueryHandle->pTsdb, pQueryHandle->mem, pQueryHandle->imem, --qInfo->ref == 0); tsdbDestroyHelper(&pQueryHandle->rhelper);