From 9b0bfc895952a238f942a5658d38044efe60694f Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 29 Jul 2020 17:37:49 +0800 Subject: [PATCH] make SDataCols buffer flexible --- src/common/inc/tdataformat.h | 15 ++++++++------- src/common/src/tdataformat.c | 29 +++++++++++++++++++++++++---- src/tsdb/inc/tsdbMain.h | 2 +- src/tsdb/src/tsdbMain.c | 2 +- src/tsdb/src/tsdbMemTable.c | 7 +++++-- src/tsdb/src/tsdbRWHelper.c | 15 ++++++++++++--- 6 files changed, 52 insertions(+), 18 deletions(-) diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 2ed4b81204..68875341ba 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -235,11 +235,11 @@ typedef struct { int maxPoints; // max number of points int bufSize; - int numOfRows; - int numOfCols; // Total number of cols - int sversion; // TODO: set sversion - void * buf; - SDataCol cols[]; + int numOfRows; + int numOfCols; // Total number of cols + int sversion; // TODO: set sversion + void * buf; + SDataCol *cols; } SDataCols; #define keyCol(pCols) (&((pCols)->cols[0])) // Key column @@ -249,13 +249,14 @@ typedef struct { SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows); void tdResetDataCols(SDataCols *pCols); -void tdInitDataCols(SDataCols *pCols, STSchema *pSchema); +int tdInitDataCols(SDataCols *pCols, STSchema *pSchema); SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData); void tdFreeDataCols(SDataCols *pCols); void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols); void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!! int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge); -void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2, int tRows); +void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, + int limit2, int tRows); // ----------------- K-V data row structure /* diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 12ea4ad78d..adfce5580e 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -310,9 +310,15 @@ void dataColSetOffset(SDataCol *pCol, int nEle) { } SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) { - SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols) + sizeof(SDataCol) * maxCols); + SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols)); if (pCols == NULL) return NULL; + pCols->cols = (SDataCol *)calloc(maxCols, sizeof(SDataCol)); + if (pCols->cols == NULL) { + tdFreeDataCols(pCols); + return NULL; + } + pCols->maxRowSize = maxRowSize; pCols->maxCols = maxCols; pCols->maxPoints = maxRows; @@ -320,15 +326,27 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) { pCols->buf = malloc(pCols->bufSize); if (pCols->buf == NULL) { - free(pCols); + tdFreeDataCols(pCols); return NULL; } return pCols; } -void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { - // assert(schemaNCols(pSchema) <= pCols->numOfCols); +int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { + if (schemaNCols(pSchema) > pCols->maxCols) { + pCols->maxCols = schemaNCols(pSchema); + pCols->cols = (SDataCol *)realloc(pCols->cols, sizeof(SDataCol) * pCols->maxCols); + if (pCols->cols == NULL) return -1; + } + + if (schemaTLen(pSchema) > pCols->maxRowSize) { + pCols->maxRowSize = schemaTLen(pSchema); + pCols->bufSize = schemaTLen(pSchema) * pCols->maxPoints; + pCols->buf = realloc(pCols->buf, pCols->bufSize); + if (pCols->buf == NULL) return -1; + } + tdResetDataCols(pCols); pCols->numOfCols = schemaNCols(pSchema); @@ -337,11 +355,14 @@ void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { dataColInit(pCols->cols + i, schemaColAt(pSchema, i), &ptr, pCols->maxPoints); ASSERT((char *)ptr - (char *)(pCols->buf) <= pCols->bufSize); } + + return 0; } void tdFreeDataCols(SDataCols *pCols) { if (pCols) { tfree(pCols->buf); + tfree(pCols->cols); free(pCols); } } diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 3e92c01765..6a54cb2399 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -483,7 +483,7 @@ void tsdbDestroyHelper(SRWHelper* pHelper); void tsdbResetHelper(SRWHelper* pHelper); int tsdbSetAndOpenHelperFile(SRWHelper* pHelper, SFileGroup* pGroup); int tsdbCloseHelperFile(SRWHelper* pHelper, bool hasError); -void tsdbSetHelperTable(SRWHelper* pHelper, STable* pTable, STsdbRepo* pRepo); +int tsdbSetHelperTable(SRWHelper* pHelper, STable* pTable, STsdbRepo* pRepo); int tsdbCommitTableData(SRWHelper* pHelper, SCommitIter* pCommitIter, SDataCols* pDataCols, TSKEY maxKey); int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper); int tsdbWriteCompInfo(SRWHelper* pHelper); diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 3a453249a9..7f674f09e5 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -788,7 +788,7 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo) { for (int i = 1; i < pMeta->maxTables; i++) { STable *pTable = pMeta->tables[i]; if (pTable == NULL) continue; - tsdbSetHelperTable(&rhelper, pTable, pRepo); + if (tsdbSetHelperTable(&rhelper, pTable, pRepo) < 0) goto _err; SCompIdx *pIdx = &(rhelper.curCompIdx); if (pIdx->offset > 0 && pTable->lastKey < pIdx->maxKey) pTable->lastKey = pIdx->maxKey; diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index a65d956eb0..506f3c11c1 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -610,10 +610,13 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe taosRLockLatch(&(pIter->pTable->latch)); - tsdbSetHelperTable(pHelper, pIter->pTable, pRepo); + if (tsdbSetHelperTable(pHelper, pIter->pTable, pRepo) < 0) goto _err; if (pIter->pIter != NULL) { - tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1)); + if (tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1)) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto _err; + } if (tsdbCommitTableData(pHelper, pIter, pDataCols, maxKey) < 0) { taosRUnLockLatch(&(pIter->pTable->latch)); diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 9456996071..8d3908666e 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -262,7 +262,7 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { return 0; } -void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) { +int tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) { ASSERT(helperHasState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN | TSDB_HELPER_IDX_LOAD)); // Clear members and state used by previous table @@ -273,8 +273,15 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) { pHelper->tableInfo.uid = pTable->tableId.uid; STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); - tdInitDataCols(pHelper->pDataCols[0], pSchema); - tdInitDataCols(pHelper->pDataCols[1], pSchema); + if (tdInitDataCols(pHelper->pDataCols[0], pSchema) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + if (tdInitDataCols(pHelper->pDataCols[1], pSchema) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } if (pHelper->idxH.numOfIdx > 0) { while (true) { @@ -309,6 +316,8 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) { helperSetState(pHelper, TSDB_HELPER_TABLE_SET); ASSERT(pHelper->state == ((TSDB_HELPER_TABLE_SET << 1) - 1)); + + return 0; } int tsdbCommitTableData(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) { -- GitLab