diff --git a/src/tsdb/inc/tsdbMemTable.h b/src/tsdb/inc/tsdbMemTable.h index 3b3f1dd1f6a9307bbe3954374b005a23a9f15ab0..bd64ed4a5238b3b8b60716d8732d59d27218c639 100644 --- a/src/tsdb/inc/tsdbMemTable.h +++ b/src/tsdb/inc/tsdbMemTable.h @@ -37,6 +37,7 @@ typedef struct { TSKEY keyLast; int64_t numOfRows; SSkipList* pData; + T_REF_DECLARE() } STableData; typedef struct { @@ -76,7 +77,7 @@ typedef struct { int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); -int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem); +int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem, SArray* pATable); void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemTable* pMem, SMemTable* pIMem); void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes); int tsdbAsyncCommit(STsdbRepo* pRepo); diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 73a127079920bcdf0cdc2d3e244998310c4d8dbc..6818f2ed14c278bd5d203d0570f52772c159dcde 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -124,17 +124,66 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { return 0; } -int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem) { +int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem, SArray *pATable) { + SMemTable *tmem; + + // Get snap object if (tsdbLockRepo(pRepo) < 0) return -1; - *pMem = pRepo->mem; + tmem = pRepo->mem; *pIMem = pRepo->imem; - tsdbRefMemTable(pRepo, *pMem); + tsdbRefMemTable(pRepo, tmem); tsdbRefMemTable(pRepo, *pIMem); if (tsdbUnlockRepo(pRepo) < 0) return -1; - if (*pMem != NULL) taosRLockLatch(&((*pMem)->latch)); + // Copy mem objects and ref needed STableData + if (tmem) { + taosRLockLatch(&(tmem->latch)); + + *pMem = (SMemTable *)calloc(1, sizeof(**pMem)); + if (*pMem == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + taosRUnLockLatch(&(tmem->latch)); + tsdbUnRefMemTable(pRepo, tmem); + tsdbUnRefMemTable(pRepo, *pIMem); + *pMem = NULL; + *pIMem = NULL; + return -1; + } + + (*pMem)->tData = (STableData **)calloc(tmem->maxTables, sizeof(STableData *)); + if ((*pMem)->tData == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + taosRUnLockLatch(&(tmem->latch)); + free(*pMem); + tsdbUnRefMemTable(pRepo, tmem); + tsdbUnRefMemTable(pRepo, *pIMem); + *pMem = NULL; + *pIMem = NULL; + return -1; + } + + (*pMem)->keyFirst = tmem->keyFirst; + (*pMem)->keyLast = tmem->keyLast; + (*pMem)->numOfRows = tmem->numOfRows; + (*pMem)->maxTables = tmem->maxTables; + + for (size_t i = 0; i < taosArrayGetSize(pATable); i++) { + STable * pTable = *(STable **)taosArrayGet(pATable, i); + int32_t tid = TABLE_TID(pTable); + STableData *pTableData = (tid < tmem->maxTables) ? tmem->tData[tid] : NULL; + + if ((pTableData == NULL) || (TABLE_UID(pTable) != pTableData->uid)) continue; + + (*pMem)->tData[tid] = tmem->tData[tid]; + T_REF_INC(tmem->tData[tid]); + } + + taosRUnLockLatch(&(tmem->latch)); + } + + tsdbUnRefMemTable(pRepo, tmem); tsdbDebug("vgId:%d take memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), *pMem, *pIMem); return 0; @@ -144,8 +193,14 @@ void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem) tsdbDebug("vgId:%d untake memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), pMem, pIMem); if (pMem != NULL) { - taosRUnLockLatch(&(pMem->latch)); - tsdbUnRefMemTable(pRepo, pMem); + for (size_t i = 0; i < pMem->maxTables; i++) { + STableData *pTableData = pMem->tData[i]; + if (pTableData) { + tsdbFreeTableData(pTableData); + } + } + free(pMem->tData); + free(pMem); } if (pIMem != NULL) { @@ -436,7 +491,7 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) { STableData *pTableData = (STableData *)calloc(1, sizeof(*pTableData)); if (pTableData == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; + return NULL; } pTableData->uid = TABLE_UID(pTable); @@ -449,20 +504,22 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) { tkeyComparFn, pCfg->update ? SL_UPDATE_DUP_KEY : SL_DISCARD_DUP_KEY, tsdbGetTsTupleKey); if (pTableData->pData == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; + free(pTableData); + return NULL; } - return pTableData; + T_REF_INC(pTableData); -_err: - tsdbFreeTableData(pTableData); - return NULL; + return pTableData; } static void tsdbFreeTableData(STableData *pTableData) { if (pTableData) { - tSkipListDestroy(pTableData->pData); - free(pTableData); + int32_t ref = T_REF_DEC(pTableData); + if (ref == 0) { + tSkipListDestroy(pTableData->pData); + free(pTableData); + } } } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 7162f74d3ec2620449066bc28f9acaee64067622..486ff49f097df971d1a1130181a1c14b79f9fc01 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -187,13 +187,15 @@ static SArray* getDefaultLoadColumns(STsdbQueryHandle* pQueryHandle, bool loadTS return pLocalIdList; } -static void tsdbMayTakeMemSnapshot(STsdbQueryHandle* pQueryHandle) { +static void tsdbMayTakeMemSnapshot(STsdbQueryHandle* pQueryHandle, SArray* psTable) { assert(pQueryHandle != NULL && pQueryHandle->pMemRef != NULL); SMemRef* pMemRef = pQueryHandle->pMemRef; if (pQueryHandle->pMemRef->ref++ == 0) { - tsdbTakeMemSnapshot(pQueryHandle->pTsdb, (SMemTable**)&(pMemRef->mem), (SMemTable**)&(pMemRef->imem)); + tsdbTakeMemSnapshot(pQueryHandle->pTsdb, (SMemTable**)&(pMemRef->mem), (SMemTable**)&(pMemRef->imem), psTable); } + + taosArrayDestroy(psTable); } static void tsdbMayUnTakeMemSnapshot(STsdbQueryHandle* pQueryHandle) { @@ -242,7 +244,7 @@ int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT* pHandle) { return rows; } -static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STableGroupInfo* pGroupList, STsdbMeta* pMeta) { +static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STableGroupInfo* pGroupList, STsdbMeta* pMeta, SArray** psTable) { size_t sizeOfGroup = taosArrayGetSize(pGroupList->pGroupList); assert(sizeOfGroup >= 1 && pMeta != NULL); @@ -252,6 +254,12 @@ static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STa return NULL; } + SArray* pTable = taosArrayInit(4, sizeof(STable*)); + if (pTable == NULL) { + taosArrayDestroy(pTableCheckInfo); + return NULL; + } + // todo apply the lastkey of table check to avoid to load header file for (int32_t i = 0; i < sizeOfGroup; ++i) { SArray* group = *(SArray**) taosArrayGet(pGroupList->pGroupList, i); @@ -277,31 +285,40 @@ static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STa assert(info.lastKey <= pQueryHandle->window.skey); } + taosArrayPush(pTable, &pKeyInfo->pTable); + taosArrayPush(pTableCheckInfo, &info); tsdbDebug("%p check table uid:%"PRId64", tid:%d from lastKey:%"PRId64" %p", pQueryHandle, info.tableId.uid, info.tableId.tid, info.lastKey, pQueryHandle->qinfo); } } + *psTable = pTable; + taosArraySort(pTableCheckInfo, tsdbCheckInfoCompar); return pTableCheckInfo; } -static SArray* createCheckInfoFromCheckInfo(SArray* pTableCheckInfo, TSKEY skey) { +static SArray* createCheckInfoFromCheckInfo(SArray* pTableCheckInfo, TSKEY skey, SArray** psTable) { size_t si = taosArrayGetSize(pTableCheckInfo); SArray* pNew = taosArrayInit(si, sizeof(STableCheckInfo)); if (pNew == NULL) { return NULL; } + SArray* pTable = taosArrayInit(si, sizeof(STable*)); + for (int32_t j = 0; j < si; ++j) { STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pTableCheckInfo, j); STableCheckInfo info = { .lastKey = skey, .pTableObj = pCheckInfo->pTableObj}; info.tableId = pCheckInfo->tableId; taosArrayPush(pNew, &info); + taosArrayPush(pTable, &pCheckInfo->pTableObj); } + *psTable = pTable; + // it is ordered already, no need to sort again. taosArraySort(pNew, tsdbCheckInfoCompar); return pNew; @@ -332,7 +349,7 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC goto out_of_memory; } - tsdbMayTakeMemSnapshot(pQueryHandle); + //tsdbMayTakeMemSnapshot(pQueryHandle); assert(pCond != NULL && pCond->numOfCols > 0 && pMemRef != NULL); if (ASCENDING_TRAVERSE(pCond->order)) { @@ -393,14 +410,18 @@ TsdbQueryHandleT* tsdbQueryTables(STsdbRepo* tsdb, STsdbQueryCond* pCond, STable STsdbMeta* pMeta = tsdbGetMeta(tsdb); assert(pMeta != NULL); + SArray* psTable = NULL; + // todo apply the lastkey of table check to avoid to load header file - pQueryHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pQueryHandle, groupList, pMeta); + pQueryHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pQueryHandle, groupList, pMeta, &psTable); if (pQueryHandle->pTableCheckInfo == NULL) { tsdbCleanupQueryHandle(pQueryHandle); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return NULL; } + tsdbMayTakeMemSnapshot(pQueryHandle, psTable); + tsdbDebug("%p total numOfTable:%" PRIzu " in query, %p", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo), pQueryHandle->qinfo); return (TsdbQueryHandleT) pQueryHandle; } @@ -2337,12 +2358,18 @@ static int32_t doGetExternalRow(STsdbQueryHandle* pQueryHandle, int16_t type, SM pSecQueryHandle = tsdbQueryTablesImpl(pQueryHandle->pTsdb, &cond, pQueryHandle->qinfo, pMemRef); tfree(cond.colList); - pSecQueryHandle->pTableCheckInfo = createCheckInfoFromCheckInfo(pQueryHandle->pTableCheckInfo, pSecQueryHandle->window.skey); + + SArray* psTable = NULL; + + pSecQueryHandle->pTableCheckInfo = createCheckInfoFromCheckInfo(pQueryHandle->pTableCheckInfo, pSecQueryHandle->window.skey, &psTable); if (pSecQueryHandle->pTableCheckInfo == NULL) { terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; goto out_of_memory; } + + tsdbMayTakeMemSnapshot(pSecQueryHandle, psTable); + if (!tsdbNextDataBlock((void*)pSecQueryHandle)) { // no result in current query, free the corresponding result rows structure if (type == TSDB_PREV_ROW) {