未验证 提交 56376f82 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #4687 from taosdata/feature/TD-2546

feature<TD-2546>: solve insert time spike problem
...@@ -161,6 +161,11 @@ _err: ...@@ -161,6 +161,11 @@ _err:
static void tsdbEndCommit(STsdbRepo *pRepo, int eno) { static void tsdbEndCommit(STsdbRepo *pRepo, int eno) {
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER, eno); if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER, eno);
SMemTable *pIMem = pRepo->imem;
tsdbLockRepo(pRepo);
pRepo->imem = NULL;
tsdbUnlockRepo(pRepo);
tsdbUnRefMemTable(pRepo, pIMem);
sem_post(&(pRepo->readyToCommit)); sem_post(&(pRepo->readyToCommit));
} }
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "tsdbMain.h" #include "tsdbMain.h"
#define TSDB_DATA_SKIPLIST_LEVEL 5 #define TSDB_DATA_SKIPLIST_LEVEL 5
#define TSDB_MAX_INSERT_BATCH 512
static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo); static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo);
static void tsdbFreeMemTable(SMemTable *pMemTable); static void tsdbFreeMemTable(SMemTable *pMemTable);
...@@ -205,7 +206,7 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { ...@@ -205,7 +206,7 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
int tsdbAsyncCommit(STsdbRepo *pRepo) { int tsdbAsyncCommit(STsdbRepo *pRepo) {
if (pRepo->mem == NULL) return 0; if (pRepo->mem == NULL) return 0;
SMemTable *pIMem = pRepo->imem; ASSERT(pRepo->imem == NULL);
sem_wait(&(pRepo->readyToCommit)); sem_wait(&(pRepo->readyToCommit));
...@@ -220,8 +221,6 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) { ...@@ -220,8 +221,6 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) {
tsdbScheduleCommit(pRepo); tsdbScheduleCommit(pRepo);
if (tsdbUnlockRepo(pRepo) < 0) return -1; if (tsdbUnlockRepo(pRepo) < 0) return -1;
if (tsdbUnRefMemTable(pRepo, pIMem) < 0) return -1;
return 0; return 0;
} }
...@@ -606,19 +605,13 @@ static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t * ...@@ -606,19 +605,13 @@ static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *
STable * pTable = NULL; STable * pTable = NULL;
SSubmitBlkIter blkIter = {0}; SSubmitBlkIter blkIter = {0};
SDataRow row = NULL; SDataRow row = NULL;
void ** rows = NULL; void * rows[TSDB_MAX_INSERT_BATCH] = {0};
int rowCounter = 0; int rowCounter = 0;
ASSERT(pBlock->tid < pMeta->maxTables); ASSERT(pBlock->tid < pMeta->maxTables);
pTable = pMeta->tables[pBlock->tid]; pTable = pMeta->tables[pBlock->tid];
ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid); ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid);
rows = (void **)calloc(pBlock->numOfRows, sizeof(void *));
if (rows == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
tsdbInitSubmitBlkIter(pBlock, &blkIter); tsdbInitSubmitBlkIter(pBlock, &blkIter);
while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) { while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) {
if (tsdbCopyRowToMem(pRepo, row, pTable, &(rows[rowCounter])) < 0) { if (tsdbCopyRowToMem(pRepo, row, pTable, &(rows[rowCounter])) < 0) {
...@@ -632,9 +625,18 @@ static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t * ...@@ -632,9 +625,18 @@ static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *
if (rows[rowCounter] != NULL) { if (rows[rowCounter] != NULL) {
rowCounter++; rowCounter++;
} }
if (rowCounter == TSDB_MAX_INSERT_BATCH) {
if (tsdbInsertDataToTableImpl(pRepo, pTable, rows, rowCounter) < 0) {
goto _err;
}
rowCounter = 0;
memset(rows, 0, sizeof(rows));
}
} }
if (tsdbInsertDataToTableImpl(pRepo, pTable, rows, rowCounter) < 0) { if (rowCounter > 0 && tsdbInsertDataToTableImpl(pRepo, pTable, rows, rowCounter) < 0) {
goto _err; goto _err;
} }
...@@ -642,11 +644,9 @@ static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t * ...@@ -642,11 +644,9 @@ static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *
pRepo->stat.pointsWritten += points * schemaNCols(pSchema); pRepo->stat.pointsWritten += points * schemaNCols(pSchema);
pRepo->stat.totalStorage += points * schemaVLen(pSchema); pRepo->stat.totalStorage += points * schemaVLen(pSchema);
free(rows);
return 0; return 0;
_err: _err:
free(rows);
return -1; return -1;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册