diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 6187146dd0fbf9b4d316495a0a912043ec82d256..03b2bc255be8778118661f452e225401aaa0b1cb 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -307,6 +307,7 @@ SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo); int tsdbInsertRowToMem(STsdbRepo* pRepo, SDataRow row, STable* pTable); int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); +int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem); // ------------------ tsdbFile.c #define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile)) diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 593454243dc24badd2115a0389ad4ffa1acc4102..57e796104952317cdc9ae715a7c9a790adf20e80 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -103,14 +103,14 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { } int tsdbRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { - ASSERT(IS_REPO_LOCKED(pRepo)); - ASSERT(pMemTable != NULL); + if (pMemTable == NULL) return 0; T_REF_INC(pMemTable); + return 0; } // Need to lock the repository int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { - ASSERT(pMemTable != NULL); + if (pMemTable == NULL) return 0; if (T_REF_DEC(pMemTable) == 0) { STsdbCfg * pCfg = &pRepo->config; @@ -143,6 +143,17 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { return 0; } +int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem) { + if (tsdbLockRepo(pRepo) < 0) return -1; + + *pMem = pRepo->mem; + *pIMem = pRepo->mem; + tsdbRefMemTable(pRepo, *pMem); + tsdbRefMemTable(pRepo, *pIMem); + + if (tsdbUnlockRepo(pRepo) < 0) return -1; +} + // ---------------- LOCAL FUNCTIONS ---------------- static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo) { ASSERT(pRepo != NULL); @@ -171,10 +182,16 @@ static void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { terrno = TAOS_SYSTEM_ERROR(errno); return NULL; } + + if (tsdbUnRefMemTable(pRepo, pRepo->imem) < 0) { + tsdbError("vgId:%d failed to unref memtable since %s", REPO_ID(pRepo), tstrerror(terrno)) + return NULL; + } } ASSERT(pRepo->commit == 0); SMemTable *pImem = pRepo->imem; + if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START); if (tsdbLockRepo(pRepo) < 0) return NULL; pRepo->imem = pRepo->mem; @@ -322,20 +339,12 @@ static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple(data); } static void *tsdbCommitData(void *arg) { STsdbRepo *pRepo = (STsdbRepo *)arg; STsdbMeta *pMeta = pRepo->tsdbMeta; - ASSERT(pRepo->imem != NULL); + SMemTable *pMem = pRepo->imem; ASSERT(pRepo->commit == 1); + ASSERT(pMem != NULL); - tsdbPrint("vgId:%d start to commit! keyFirst " PRId64 " keyLast " PRId64 " numOfRows " PRId64, REPO_ID(pRepo), - pRepo->imem->keyFirst, pRepo->imem->keyLast, pRepo->imem->numOfRows); - - // STsdbMeta * pMeta = pRepo->tsdbMeta; - // STsdbCache *pCache = pRepo->tsdbCache; - // STsdbCfg * pCfg = &(pRepo->config); - // SDataCols * pDataCols = NULL; - // SRWHelper whelper = {{0}}; - // if (pCache->imem == NULL) return NULL; - - tsdbPrint("vgId:%d, starting to commit....", pRepo->config.tsdbId); + tsdbPrint("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo), + pMem->keyFirst, pMem->keyLast, pMem->numOfRows); // Create the iterator to read from cache SSkipListIterator **iters = tsdbCreateTableIters(pRepo);