diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 637b02cd32ae8ad8e4077684609dcac23922d8a0..6b2ffe118ebf4a7ab085c46d3c56b2f3e9d1d261 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -161,6 +161,11 @@ _err: static void tsdbEndCommit(STsdbRepo *pRepo, int eno) { if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER, eno); + SMemTable *pIMem = pRepo->imem; + tsdbLockRepo(pRepo); + pRepo->imem = NULL; + tsdbUnlockRepo(pRepo); + tsdbUnRefMemTable(pRepo, pIMem); sem_post(&(pRepo->readyToCommit)); } diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 999e2deb413eb13cc7119ee9042a260366c5f9a3..bedefa55ed856402b2fdf92a3692a10397848ff3 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -17,6 +17,7 @@ #include "tsdbMain.h" #define TSDB_DATA_SKIPLIST_LEVEL 5 +#define TSDB_MAX_INSERT_BATCH 512 static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo); static void tsdbFreeMemTable(SMemTable *pMemTable); @@ -205,7 +206,7 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { int tsdbAsyncCommit(STsdbRepo *pRepo) { if (pRepo->mem == NULL) return 0; - SMemTable *pIMem = pRepo->imem; + ASSERT(pRepo->imem == NULL); sem_wait(&(pRepo->readyToCommit)); @@ -220,8 +221,6 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) { tsdbScheduleCommit(pRepo); if (tsdbUnlockRepo(pRepo) < 0) return -1; - if (tsdbUnRefMemTable(pRepo, pIMem) < 0) return -1; - return 0; } @@ -606,19 +605,13 @@ static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t * STable * pTable = NULL; SSubmitBlkIter blkIter = {0}; SDataRow row = NULL; - void ** rows = NULL; + void * rows[TSDB_MAX_INSERT_BATCH] = {0}; int rowCounter = 0; ASSERT(pBlock->tid < pMeta->maxTables); pTable = pMeta->tables[pBlock->tid]; ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid); - rows = (void **)calloc(pBlock->numOfRows, sizeof(void *)); - if (rows == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - tsdbInitSubmitBlkIter(pBlock, &blkIter); while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) { if (tsdbCopyRowToMem(pRepo, row, pTable, &(rows[rowCounter])) < 0) { @@ -632,9 +625,18 @@ static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t * if (rows[rowCounter] != NULL) { rowCounter++; } + + if (rowCounter == TSDB_MAX_INSERT_BATCH) { + if (tsdbInsertDataToTableImpl(pRepo, pTable, rows, rowCounter) < 0) { + goto _err; + } + + rowCounter = 0; + memset(rows, 0, sizeof(rows)); + } } - if (tsdbInsertDataToTableImpl(pRepo, pTable, rows, rowCounter) < 0) { + if (rowCounter > 0 && tsdbInsertDataToTableImpl(pRepo, pTable, rows, rowCounter) < 0) { goto _err; } @@ -642,11 +644,9 @@ static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t * pRepo->stat.pointsWritten += points * schemaNCols(pSchema); pRepo->stat.totalStorage += points * schemaVLen(pSchema); - free(rows); return 0; _err: - free(rows); return -1; }