diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 47bd8a72b208fd09ef85511ccc6ef8e82e3f354a..fb6bab0cf29d50f196a18eeff990b1fc62dac9ea 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -325,7 +325,9 @@ typedef struct SDataCol { #define isAllRowsNull(pCol) ((pCol)->len == 0) static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } -void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints); +int tdAllocMemForCol(SDataCol *pCol, int maxPoints); + +void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints); void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints); void dataColSetOffset(SDataCol *pCol, int nEle); @@ -358,12 +360,10 @@ typedef struct { int maxRowSize; int maxCols; // max number of columns int maxPoints; // max number of points - int bufSize; int numOfRows; int numOfCols; // Total number of cols int sversion; // TODO: set sversion - void * buf; SDataCol *cols; } SDataCols; diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 8ef3d083c75c58381fc8a71f076e7e04e976d774..3f0ab7f93ead2bbba62e8c75035530614798bd28 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -22,6 +22,29 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2, int tRows, bool forceSetNull); +//TODO: change caller to use return val +int tdAllocMemForCol(SDataCol *pCol, int maxPoints) { + int spaceNeeded = pCol->bytes * maxPoints; + if(IS_VAR_DATA_TYPE(pCol->type)) { + spaceNeeded += sizeof(VarDataOffsetT) * maxPoints; + } + if(pCol->spaceSize < spaceNeeded) { + void* ptr = realloc(pCol->pData, spaceNeeded); + if(ptr == NULL) { + uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)pCol->spaceSize, + strerror(errno)); + return -1; + } else { + pCol->pData = ptr; + pCol->spaceSize = spaceNeeded; + } + } + if(IS_VAR_DATA_TYPE(pCol->type)) { + pCol->dataOff = POINTER_SHIFT(pCol->pData, pCol->bytes * maxPoints); + } + return 0; +} + /** * Duplicate the schema and return a new object */ @@ -207,24 +230,13 @@ SMemRow tdMemRowDup(SMemRow row) { return trow; } -void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) { +void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints) { pDataCol->type = colType(pCol); pDataCol->colId = colColId(pCol); pDataCol->bytes = colBytes(pCol); pDataCol->offset = colOffset(pCol) + TD_DATA_ROW_HEAD_SIZE; pDataCol->len = 0; - if (IS_VAR_DATA_TYPE(pDataCol->type)) { - pDataCol->dataOff = (VarDataOffsetT *)(*pBuf); - pDataCol->pData = POINTER_SHIFT(*pBuf, sizeof(VarDataOffsetT) * maxPoints); - pDataCol->spaceSize = pDataCol->bytes * maxPoints; - *pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize + sizeof(VarDataOffsetT) * maxPoints); - } else { - pDataCol->spaceSize = pDataCol->bytes * maxPoints; - pDataCol->dataOff = NULL; - pDataCol->pData = *pBuf; - *pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize); - } } // value from timestamp should be TKEY here instead of TSKEY void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) { @@ -239,6 +251,8 @@ void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxP if (numOfRows > 0) { // Find the first not null value, fill all previouse values as NULL dataColSetNEleNull(pCol, numOfRows, maxPoints); + } else { + tdAllocMemForCol(pCol, maxPoints); } } @@ -257,13 +271,14 @@ void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxP } bool isNEleNull(SDataCol *pCol, int nEle) { + if(isAllRowsNull(pCol)) return true; for (int i = 0; i < nEle; i++) { if (!isNull(tdGetColDataOfRow(pCol, i), pCol->type)) return false; } return true; } -FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) { +static FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) { if (IS_VAR_DATA_TYPE(pCol->type)) { pCol->dataOff[index] = pCol->len; char *ptr = POINTER_SHIFT(pCol->pData, pCol->len); @@ -276,6 +291,7 @@ FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) { } void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) { + tdAllocMemForCol(pCol, maxPoints); if (IS_VAR_DATA_TYPE(pCol->type)) { pCol->len = 0; @@ -319,56 +335,63 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) { tdFreeDataCols(pCols); return NULL; } + int i; + for(i = 0; i < maxCols; i++) { + pCols->cols[i].spaceSize = 0; + pCols->cols[i].len = 0; + pCols->cols[i].pData = NULL; + pCols->cols[i].dataOff = NULL; + } pCols->maxCols = maxCols; } pCols->maxRowSize = maxRowSize; - pCols->bufSize = maxRowSize * maxRows; - if (pCols->bufSize > 0) { - pCols->buf = malloc(pCols->bufSize); - if (pCols->buf == NULL) { - uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCol) * maxCols, - strerror(errno)); - tdFreeDataCols(pCols); - return NULL; - } - } return pCols; } int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { - if (schemaNCols(pSchema) > pCols->maxCols) { + int i; + int oldMaxCols = pCols->maxCols; + if (schemaNCols(pSchema) > oldMaxCols) { pCols->maxCols = schemaNCols(pSchema); pCols->cols = (SDataCol *)realloc(pCols->cols, sizeof(SDataCol) * pCols->maxCols); if (pCols->cols == NULL) return -1; + for(i = oldMaxCols; i < pCols->maxCols; i++) { + pCols->cols[i].pData = NULL; + pCols->cols[i].dataOff = NULL; + pCols->cols[i].spaceSize = 0; + } } if (schemaTLen(pSchema) > pCols->maxRowSize) { pCols->maxRowSize = schemaTLen(pSchema); - pCols->bufSize = schemaTLen(pSchema) * pCols->maxPoints; - pCols->buf = realloc(pCols->buf, pCols->bufSize); - if (pCols->buf == NULL) return -1; } tdResetDataCols(pCols); pCols->numOfCols = schemaNCols(pSchema); - void *ptr = pCols->buf; - for (int i = 0; i < schemaNCols(pSchema); i++) { - dataColInit(pCols->cols + i, schemaColAt(pSchema, i), &ptr, pCols->maxPoints); - ASSERT((char *)ptr - (char *)(pCols->buf) <= pCols->bufSize); + for (i = 0; i < schemaNCols(pSchema); i++) { + dataColInit(pCols->cols + i, schemaColAt(pSchema, i), pCols->maxPoints); } return 0; } SDataCols *tdFreeDataCols(SDataCols *pCols) { + int i; if (pCols) { - tfree(pCols->buf); - tfree(pCols->cols); + if(pCols->cols) { + int maxCols = pCols->maxCols; + for(i = 0; i < maxCols; i++) { + SDataCol *pCol = &pCols->cols[i]; + tfree(pCol->pData); + } + free(pCols->cols); + pCols->cols = NULL; + } free(pCols); } return NULL; @@ -388,21 +411,14 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { pRet->cols[i].bytes = pDataCols->cols[i].bytes; pRet->cols[i].offset = pDataCols->cols[i].offset; - pRet->cols[i].spaceSize = pDataCols->cols[i].spaceSize; - pRet->cols[i].pData = (void *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].pData) - (char *)(pDataCols->buf))); - - if (IS_VAR_DATA_TYPE(pRet->cols[i].type)) { - ASSERT(pDataCols->cols[i].dataOff != NULL); - pRet->cols[i].dataOff = - (int32_t *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].dataOff) - (char *)(pDataCols->buf))); - } - if (keepData) { - pRet->cols[i].len = pDataCols->cols[i].len; if (pDataCols->cols[i].len > 0) { + tdAllocMemForCol(&pRet->cols[i], pRet->maxPoints); + pRet->cols[i].len = pDataCols->cols[i].len; memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len); if (IS_VAR_DATA_TYPE(pRet->cols[i].type)) { - memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, sizeof(VarDataOffsetT) * pDataCols->maxPoints); + int dataOffSize = sizeof(VarDataOffsetT) * pDataCols->maxPoints; + memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, dataOffSize); } } } @@ -426,40 +442,27 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols int rcol = 0; int dcol = 0; - if (dataRowDeleted(row)) { - for (; dcol < pCols->numOfCols; dcol++) { - SDataCol *pDataCol = &(pCols->cols[dcol]); - if (dcol == 0) { - dataColAppendVal(pDataCol, dataRowTuple(row), pCols->numOfRows, pCols->maxPoints); - } else { - dataColSetNullAt(pDataCol, pCols->numOfRows); - } + while (dcol < pCols->numOfCols) { + SDataCol *pDataCol = &(pCols->cols[dcol]); + if (rcol >= schemaNCols(pSchema)) { + dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); + dcol++; + continue; } - } else { - while (dcol < pCols->numOfCols) { - SDataCol *pDataCol = &(pCols->cols[dcol]); - if (rcol >= schemaNCols(pSchema)) { - // dataColSetNullAt(pDataCol, pCols->numOfRows); - dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); - dcol++; - continue; - } - STColumn *pRowCol = schemaColAt(pSchema, rcol); - if (pRowCol->colId == pDataCol->colId) { - void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE); - dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); - dcol++; - rcol++; - } else if (pRowCol->colId < pDataCol->colId) { - rcol++; - } else { - if(forceSetNull) { - //dataColSetNullAt(pDataCol, pCols->numOfRows); - dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); - } - dcol++; + STColumn *pRowCol = schemaColAt(pSchema, rcol); + if (pRowCol->colId == pDataCol->colId) { + void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE); + dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); + dcol++; + rcol++; + } else if (pRowCol->colId < pDataCol->colId) { + rcol++; + } else { + if(forceSetNull) { + dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); } + dcol++; } } pCols->numOfRows++; @@ -471,43 +474,30 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo int rcol = 0; int dcol = 0; - if (kvRowDeleted(row)) { - for (; dcol < pCols->numOfCols; dcol++) { - SDataCol *pDataCol = &(pCols->cols[dcol]); - if (dcol == 0) { - dataColAppendVal(pDataCol, kvRowValues(row), pCols->numOfRows, pCols->maxPoints); - } else { - dataColSetNullAt(pDataCol, pCols->numOfRows); - } + int nRowCols = kvRowNCols(row); + + while (dcol < pCols->numOfCols) { + SDataCol *pDataCol = &(pCols->cols[dcol]); + if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) { + dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); + ++dcol; + continue; } - } else { - int nRowCols = kvRowNCols(row); - while (dcol < pCols->numOfCols) { - SDataCol *pDataCol = &(pCols->cols[dcol]); - if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) { - // dataColSetNullAt(pDataCol, pCols->numOfRows); - dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); - ++dcol; - continue; - } + SColIdx *colIdx = kvRowColIdxAt(row, rcol); - SColIdx *colIdx = kvRowColIdxAt(row, rcol); - - if (colIdx->colId == pDataCol->colId) { - void *value = tdGetKvRowDataOfCol(row, colIdx->offset); - dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); - ++dcol; - ++rcol; - } else if (colIdx->colId < pDataCol->colId) { - ++rcol; - } else { - if (forceSetNull) { - // dataColSetNullAt(pDataCol, pCols->numOfRows); - dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); - } - ++dcol; + if (colIdx->colId == pDataCol->colId) { + void *value = tdGetKvRowDataOfCol(row, colIdx->offset); + dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); + ++dcol; + ++rcol; + } else if (colIdx->colId < pDataCol->colId) { + ++rcol; + } else { + if (forceSetNull) { + dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); } + ++dcol; } } pCols->numOfRows++; diff --git a/src/tsdb/inc/tsdbRowMergeBuf.h b/src/tsdb/inc/tsdbRowMergeBuf.h index 302bf25750fc08367a2840fa9c483919c828fcb5..cefa9b27fbd2a6116e2d7180f102440af9595e93 100644 --- a/src/tsdb/inc/tsdbRowMergeBuf.h +++ b/src/tsdb/inc/tsdbRowMergeBuf.h @@ -29,7 +29,9 @@ typedef void* SMergeBuf; SDataRow tsdbMergeTwoRows(SMergeBuf *pBuf, SMemRow row1, SMemRow row2, STSchema *pSchema1, STSchema *pSchema2); static FORCE_INLINE int tsdbMergeBufMakeSureRoom(SMergeBuf *pBuf, STSchema* pSchema1, STSchema* pSchema2) { - return tsdbMakeRoom(pBuf, MAX(dataRowMaxBytesFromSchema(pSchema1), dataRowMaxBytesFromSchema(pSchema2))); + size_t len1 = dataRowMaxBytesFromSchema(pSchema1); + size_t len2 = dataRowMaxBytesFromSchema(pSchema2); + return tsdbMakeRoom(pBuf, MAX(len1, len2)); } static FORCE_INLINE void tsdbFreeMergeBuf(SMergeBuf buf) { diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index f233500ee9d036148657125abcd09eae53a88185..619b32b3d92cc27f93855e3c5ffe42d327bc4695 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -1035,6 +1035,8 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro } } } + pMeta->maxCols = maxCols; + pMeta->maxRowBytes = maxRowBytes; if (lock) tsdbUnlockRepoMeta(pRepo); tsdbDebug("vgId:%d table %s uid %" PRIu64 " is removed from meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_UID(pTable)); diff --git a/src/tsdb/src/tsdbReadImpl.c b/src/tsdb/src/tsdbReadImpl.c index 666a2d357144431093855479f30a4feefbb9ab3b..711c32535bb00bae55d7faafadc115a22c76a949 100644 --- a/src/tsdb/src/tsdbReadImpl.c +++ b/src/tsdb/src/tsdbReadImpl.c @@ -518,6 +518,8 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32 return -1; } + tdAllocMemForCol(pDataCol, maxPoints); + // Decode the data if (comp) { // Need to decompress