diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 2ed4b81204050ffe2c67b6f51929829d7403b557..68875341bae20bacab048412a81cd23256cb2f83 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -235,11 +235,11 @@ typedef struct { int maxPoints; // max number of points int bufSize; - int numOfRows; - int numOfCols; // Total number of cols - int sversion; // TODO: set sversion - void * buf; - SDataCol cols[]; + int numOfRows; + int numOfCols; // Total number of cols + int sversion; // TODO: set sversion + void * buf; + SDataCol *cols; } SDataCols; #define keyCol(pCols) (&((pCols)->cols[0])) // Key column @@ -249,13 +249,14 @@ typedef struct { SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows); void tdResetDataCols(SDataCols *pCols); -void tdInitDataCols(SDataCols *pCols, STSchema *pSchema); +int tdInitDataCols(SDataCols *pCols, STSchema *pSchema); SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData); void tdFreeDataCols(SDataCols *pCols); void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols); void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!! int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge); -void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2, int tRows); +void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, + int limit2, int tRows); // ----------------- K-V data row structure /* diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 12ea4ad78d18a5f2b09daabb71093c153d30aca7..adfce5580e9eecb2e0168038d004e5f6cc934716 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -310,9 +310,15 @@ void dataColSetOffset(SDataCol *pCol, int nEle) { } SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) { - SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols) + sizeof(SDataCol) * maxCols); + SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols)); if (pCols == NULL) return NULL; + pCols->cols = (SDataCol *)calloc(maxCols, sizeof(SDataCol)); + if (pCols->cols == NULL) { + tdFreeDataCols(pCols); + return NULL; + } + pCols->maxRowSize = maxRowSize; pCols->maxCols = maxCols; pCols->maxPoints = maxRows; @@ -320,15 +326,27 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) { pCols->buf = malloc(pCols->bufSize); if (pCols->buf == NULL) { - free(pCols); + tdFreeDataCols(pCols); return NULL; } return pCols; } -void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { - // assert(schemaNCols(pSchema) <= pCols->numOfCols); +int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { + if (schemaNCols(pSchema) > pCols->maxCols) { + pCols->maxCols = schemaNCols(pSchema); + pCols->cols = (SDataCol *)realloc(pCols->cols, sizeof(SDataCol) * pCols->maxCols); + if (pCols->cols == NULL) return -1; + } + + if (schemaTLen(pSchema) > pCols->maxRowSize) { + pCols->maxRowSize = schemaTLen(pSchema); + pCols->bufSize = schemaTLen(pSchema) * pCols->maxPoints; + pCols->buf = realloc(pCols->buf, pCols->bufSize); + if (pCols->buf == NULL) return -1; + } + tdResetDataCols(pCols); pCols->numOfCols = schemaNCols(pSchema); @@ -337,11 +355,14 @@ void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { dataColInit(pCols->cols + i, schemaColAt(pSchema, i), &ptr, pCols->maxPoints); ASSERT((char *)ptr - (char *)(pCols->buf) <= pCols->bufSize); } + + return 0; } void tdFreeDataCols(SDataCols *pCols) { if (pCols) { tfree(pCols->buf); + tfree(pCols->cols); free(pCols); } } diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 3e92c017651fb19e59eb6b144b052f721a296c65..6a54cb23998ff48ffeb2acb552e6ce5f7a6cbeac 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -483,7 +483,7 @@ void tsdbDestroyHelper(SRWHelper* pHelper); void tsdbResetHelper(SRWHelper* pHelper); int tsdbSetAndOpenHelperFile(SRWHelper* pHelper, SFileGroup* pGroup); int tsdbCloseHelperFile(SRWHelper* pHelper, bool hasError); -void tsdbSetHelperTable(SRWHelper* pHelper, STable* pTable, STsdbRepo* pRepo); +int tsdbSetHelperTable(SRWHelper* pHelper, STable* pTable, STsdbRepo* pRepo); int tsdbCommitTableData(SRWHelper* pHelper, SCommitIter* pCommitIter, SDataCols* pDataCols, TSKEY maxKey); int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper); int tsdbWriteCompInfo(SRWHelper* pHelper); diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 3a453249a99ec854205982a1e1536c9283b8ccbd..7f674f09e5a9158dd35ce0de222300f538614ed4 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -788,7 +788,7 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo) { for (int i = 1; i < pMeta->maxTables; i++) { STable *pTable = pMeta->tables[i]; if (pTable == NULL) continue; - tsdbSetHelperTable(&rhelper, pTable, pRepo); + if (tsdbSetHelperTable(&rhelper, pTable, pRepo) < 0) goto _err; SCompIdx *pIdx = &(rhelper.curCompIdx); if (pIdx->offset > 0 && pTable->lastKey < pIdx->maxKey) pTable->lastKey = pIdx->maxKey; diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index a65d956eb07d17cd928641de373222fcf6f1b22f..506f3c11c104bed4a84641f7d95538a62bdbfec7 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -610,10 +610,13 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe taosRLockLatch(&(pIter->pTable->latch)); - tsdbSetHelperTable(pHelper, pIter->pTable, pRepo); + if (tsdbSetHelperTable(pHelper, pIter->pTable, pRepo) < 0) goto _err; if (pIter->pIter != NULL) { - tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1)); + if (tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1)) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto _err; + } if (tsdbCommitTableData(pHelper, pIter, pDataCols, maxKey) < 0) { taosRUnLockLatch(&(pIter->pTable->latch)); diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 94569960711d37a94def06df4f94433b82ba27b0..8d3908666ed0cd9a517fabec3b7e7ea20a0885c7 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -262,7 +262,7 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { return 0; } -void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) { +int tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) { ASSERT(helperHasState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN | TSDB_HELPER_IDX_LOAD)); // Clear members and state used by previous table @@ -273,8 +273,15 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) { pHelper->tableInfo.uid = pTable->tableId.uid; STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); - tdInitDataCols(pHelper->pDataCols[0], pSchema); - tdInitDataCols(pHelper->pDataCols[1], pSchema); + if (tdInitDataCols(pHelper->pDataCols[0], pSchema) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + if (tdInitDataCols(pHelper->pDataCols[1], pSchema) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } if (pHelper->idxH.numOfIdx > 0) { while (true) { @@ -309,6 +316,8 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) { helperSetState(pHelper, TSDB_HELPER_TABLE_SET); ASSERT(pHelper->state == ((TSDB_HELPER_TABLE_SET << 1) - 1)); + + return 0; } int tsdbCommitTableData(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) {