From 9de8ca6725960dc5078196db8f78e9a4e6e3e8fa Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 3 Aug 2021 15:29:00 +0800 Subject: [PATCH] [TD-5694]: alloc mem for datacols dynamically --- src/common/inc/tdataformat.h | 6 +- src/common/src/tdataformat.c | 180 ++++++++++++++------------------- src/tsdb/inc/tsdbRowMergeBuf.h | 4 +- src/tsdb/src/tsdbMeta.c | 2 + src/tsdb/src/tsdbReadImpl.c | 9 ++ 5 files changed, 95 insertions(+), 106 deletions(-) diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 47bd8a72b2..99c612c86c 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -325,7 +325,7 @@ typedef struct SDataCol { #define isAllRowsNull(pCol) ((pCol)->len == 0) static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } -void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints); +void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints); void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints); void dataColSetOffset(SDataCol *pCol, int nEle); @@ -358,12 +358,12 @@ typedef struct { int maxRowSize; int maxCols; // max number of columns int maxPoints; // max number of points - int bufSize; + //int bufSize; int numOfRows; int numOfCols; // Total number of cols int sversion; // TODO: set sversion - void * buf; + //void * buf; SDataCol *cols; } SDataCols; diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 8ef3d083c7..ad928211a1 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -207,24 +207,16 @@ SMemRow tdMemRowDup(SMemRow row) { return trow; } -void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) { +void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints) { pDataCol->type = colType(pCol); pDataCol->colId = colColId(pCol); pDataCol->bytes = colBytes(pCol); pDataCol->offset = colOffset(pCol) + TD_DATA_ROW_HEAD_SIZE; pDataCol->len = 0; - if (IS_VAR_DATA_TYPE(pDataCol->type)) { - pDataCol->dataOff = (VarDataOffsetT *)(*pBuf); - pDataCol->pData = POINTER_SHIFT(*pBuf, sizeof(VarDataOffsetT) * maxPoints); - pDataCol->spaceSize = pDataCol->bytes * maxPoints; - *pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize + sizeof(VarDataOffsetT) * maxPoints); - } else { - pDataCol->spaceSize = pDataCol->bytes * maxPoints; - pDataCol->dataOff = NULL; - pDataCol->pData = *pBuf; - *pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize); - } + pDataCol->spaceSize = pDataCol->bytes * maxPoints; + pDataCol->pData = NULL; + pDataCol->dataOff = NULL; } // value from timestamp should be TKEY here instead of TSKEY void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) { @@ -239,6 +231,15 @@ void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxP if (numOfRows > 0) { // Find the first not null value, fill all previouse values as NULL dataColSetNEleNull(pCol, numOfRows, maxPoints); + } else { + if(pCol->pData == NULL) { + pCol->pData = malloc(maxPoints * pCol->bytes); + ASSERT(pCol->pData != NULL); + if(IS_VAR_DATA_TYPE(pCol->type)) { + pCol->dataOff = malloc(maxPoints * sizeof(VarDataOffsetT)); + ASSERT(pCol->dataOff != NULL); + } + } } } @@ -263,7 +264,7 @@ bool isNEleNull(SDataCol *pCol, int nEle) { return true; } -FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) { +static FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) { if (IS_VAR_DATA_TYPE(pCol->type)) { pCol->dataOff[index] = pCol->len; char *ptr = POINTER_SHIFT(pCol->pData, pCol->len); @@ -277,6 +278,15 @@ FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) { void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) { + if(pCol->pData == NULL) { + pCol->pData = malloc(maxPoints * pCol->bytes); + ASSERT(pCol->pData != NULL); + if(IS_VAR_DATA_TYPE(pCol->type)) { + pCol->dataOff = malloc(maxPoints * sizeof(VarDataOffsetT)); + ASSERT(pCol->dataOff != NULL); + } + } + if (IS_VAR_DATA_TYPE(pCol->type)) { pCol->len = 0; for (int i = 0; i < nEle; i++) { @@ -324,17 +334,7 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) { } pCols->maxRowSize = maxRowSize; - pCols->bufSize = maxRowSize * maxRows; - if (pCols->bufSize > 0) { - pCols->buf = malloc(pCols->bufSize); - if (pCols->buf == NULL) { - uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCol) * maxCols, - strerror(errno)); - tdFreeDataCols(pCols); - return NULL; - } - } return pCols; } @@ -348,27 +348,31 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { if (schemaTLen(pSchema) > pCols->maxRowSize) { pCols->maxRowSize = schemaTLen(pSchema); - pCols->bufSize = schemaTLen(pSchema) * pCols->maxPoints; - pCols->buf = realloc(pCols->buf, pCols->bufSize); - if (pCols->buf == NULL) return -1; } tdResetDataCols(pCols); pCols->numOfCols = schemaNCols(pSchema); - void *ptr = pCols->buf; for (int i = 0; i < schemaNCols(pSchema); i++) { - dataColInit(pCols->cols + i, schemaColAt(pSchema, i), &ptr, pCols->maxPoints); - ASSERT((char *)ptr - (char *)(pCols->buf) <= pCols->bufSize); + dataColInit(pCols->cols + i, schemaColAt(pSchema, i), pCols->maxPoints); } return 0; } SDataCols *tdFreeDataCols(SDataCols *pCols) { + int i; if (pCols) { - tfree(pCols->buf); - tfree(pCols->cols); + if(pCols->cols) { + int maxCols = pCols->maxCols; + for(i = 0; i < maxCols; i++) { + SDataCol *pCol = &pCols->cols[i]; + tfree(pCol->pData); + tfree(pCol->dataOff); + } + free(pCols->cols); + pCols->cols = NULL; + } free(pCols); } return NULL; @@ -389,19 +393,17 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { pRet->cols[i].offset = pDataCols->cols[i].offset; pRet->cols[i].spaceSize = pDataCols->cols[i].spaceSize; - pRet->cols[i].pData = (void *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].pData) - (char *)(pDataCols->buf))); - - if (IS_VAR_DATA_TYPE(pRet->cols[i].type)) { - ASSERT(pDataCols->cols[i].dataOff != NULL); - pRet->cols[i].dataOff = - (int32_t *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].dataOff) - (char *)(pDataCols->buf))); - } + pRet->cols[i].len = 0; + pRet->cols[i].dataOff = NULL; + pRet->cols[i].pData = NULL; if (keepData) { pRet->cols[i].len = pDataCols->cols[i].len; if (pDataCols->cols[i].len > 0) { + pRet->cols[i].pData = malloc(pDataCols->cols[i].bytes * pDataCols->maxPoints); memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len); if (IS_VAR_DATA_TYPE(pRet->cols[i].type)) { + pRet->cols[i].dataOff = malloc(sizeof(VarDataOffsetT) * pDataCols->maxPoints); memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, sizeof(VarDataOffsetT) * pDataCols->maxPoints); } } @@ -426,40 +428,27 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols int rcol = 0; int dcol = 0; - if (dataRowDeleted(row)) { - for (; dcol < pCols->numOfCols; dcol++) { - SDataCol *pDataCol = &(pCols->cols[dcol]); - if (dcol == 0) { - dataColAppendVal(pDataCol, dataRowTuple(row), pCols->numOfRows, pCols->maxPoints); - } else { - dataColSetNullAt(pDataCol, pCols->numOfRows); - } + while (dcol < pCols->numOfCols) { + SDataCol *pDataCol = &(pCols->cols[dcol]); + if (rcol >= schemaNCols(pSchema)) { + dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); + dcol++; + continue; } - } else { - while (dcol < pCols->numOfCols) { - SDataCol *pDataCol = &(pCols->cols[dcol]); - if (rcol >= schemaNCols(pSchema)) { - // dataColSetNullAt(pDataCol, pCols->numOfRows); - dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); - dcol++; - continue; - } - STColumn *pRowCol = schemaColAt(pSchema, rcol); - if (pRowCol->colId == pDataCol->colId) { - void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE); - dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); - dcol++; - rcol++; - } else if (pRowCol->colId < pDataCol->colId) { - rcol++; - } else { - if(forceSetNull) { - //dataColSetNullAt(pDataCol, pCols->numOfRows); - dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); - } - dcol++; + STColumn *pRowCol = schemaColAt(pSchema, rcol); + if (pRowCol->colId == pDataCol->colId) { + void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE); + dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); + dcol++; + rcol++; + } else if (pRowCol->colId < pDataCol->colId) { + rcol++; + } else { + if(forceSetNull) { + dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); } + dcol++; } } pCols->numOfRows++; @@ -471,43 +460,30 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo int rcol = 0; int dcol = 0; - if (kvRowDeleted(row)) { - for (; dcol < pCols->numOfCols; dcol++) { - SDataCol *pDataCol = &(pCols->cols[dcol]); - if (dcol == 0) { - dataColAppendVal(pDataCol, kvRowValues(row), pCols->numOfRows, pCols->maxPoints); - } else { - dataColSetNullAt(pDataCol, pCols->numOfRows); - } + int nRowCols = kvRowNCols(row); + + while (dcol < pCols->numOfCols) { + SDataCol *pDataCol = &(pCols->cols[dcol]); + if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) { + dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); + ++dcol; + continue; } - } else { - int nRowCols = kvRowNCols(row); - while (dcol < pCols->numOfCols) { - SDataCol *pDataCol = &(pCols->cols[dcol]); - if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) { - // dataColSetNullAt(pDataCol, pCols->numOfRows); - dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); - ++dcol; - continue; - } + SColIdx *colIdx = kvRowColIdxAt(row, rcol); - SColIdx *colIdx = kvRowColIdxAt(row, rcol); - - if (colIdx->colId == pDataCol->colId) { - void *value = tdGetKvRowDataOfCol(row, colIdx->offset); - dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); - ++dcol; - ++rcol; - } else if (colIdx->colId < pDataCol->colId) { - ++rcol; - } else { - if (forceSetNull) { - // dataColSetNullAt(pDataCol, pCols->numOfRows); - dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); - } - ++dcol; + if (colIdx->colId == pDataCol->colId) { + void *value = tdGetKvRowDataOfCol(row, colIdx->offset); + dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); + ++dcol; + ++rcol; + } else if (colIdx->colId < pDataCol->colId) { + ++rcol; + } else { + if (forceSetNull) { + dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); } + ++dcol; } } pCols->numOfRows++; diff --git a/src/tsdb/inc/tsdbRowMergeBuf.h b/src/tsdb/inc/tsdbRowMergeBuf.h index 302bf25750..cefa9b27fb 100644 --- a/src/tsdb/inc/tsdbRowMergeBuf.h +++ b/src/tsdb/inc/tsdbRowMergeBuf.h @@ -29,7 +29,9 @@ typedef void* SMergeBuf; SDataRow tsdbMergeTwoRows(SMergeBuf *pBuf, SMemRow row1, SMemRow row2, STSchema *pSchema1, STSchema *pSchema2); static FORCE_INLINE int tsdbMergeBufMakeSureRoom(SMergeBuf *pBuf, STSchema* pSchema1, STSchema* pSchema2) { - return tsdbMakeRoom(pBuf, MAX(dataRowMaxBytesFromSchema(pSchema1), dataRowMaxBytesFromSchema(pSchema2))); + size_t len1 = dataRowMaxBytesFromSchema(pSchema1); + size_t len2 = dataRowMaxBytesFromSchema(pSchema2); + return tsdbMakeRoom(pBuf, MAX(len1, len2)); } static FORCE_INLINE void tsdbFreeMergeBuf(SMergeBuf buf) { diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index f233500ee9..619b32b3d9 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -1035,6 +1035,8 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro } } } + pMeta->maxCols = maxCols; + pMeta->maxRowBytes = maxRowBytes; if (lock) tsdbUnlockRepoMeta(pRepo); tsdbDebug("vgId:%d table %s uid %" PRIu64 " is removed from meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_UID(pTable)); diff --git a/src/tsdb/src/tsdbReadImpl.c b/src/tsdb/src/tsdbReadImpl.c index 666a2d3571..a16c3ffe6a 100644 --- a/src/tsdb/src/tsdbReadImpl.c +++ b/src/tsdb/src/tsdbReadImpl.c @@ -518,6 +518,15 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32 return -1; } + if(pDataCol->pData == NULL) { + pDataCol->pData = malloc(maxPoints * pDataCol->bytes); + ASSERT(pDataCol->pData != NULL); + if(IS_VAR_DATA_TYPE(pDataCol->type)) { + pDataCol->dataOff = malloc(maxPoints * sizeof(VarDataOffsetT)); + ASSERT(pDataCol->dataOff != NULL); + } + } + // Decode the data if (comp) { // Need to decompress -- GitLab