提交 9de8ca67 编写于 作者: L Liu Jicong

[TD-5694]<hot-fix>: alloc mem for datacols dynamically

上级 dd5a627b
...@@ -325,7 +325,7 @@ typedef struct SDataCol { ...@@ -325,7 +325,7 @@ typedef struct SDataCol {
#define isAllRowsNull(pCol) ((pCol)->len == 0) #define isAllRowsNull(pCol) ((pCol)->len == 0)
static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; }
void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints); void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints);
void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints); void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints);
void dataColSetOffset(SDataCol *pCol, int nEle); void dataColSetOffset(SDataCol *pCol, int nEle);
...@@ -358,12 +358,12 @@ typedef struct { ...@@ -358,12 +358,12 @@ typedef struct {
int maxRowSize; int maxRowSize;
int maxCols; // max number of columns int maxCols; // max number of columns
int maxPoints; // max number of points int maxPoints; // max number of points
int bufSize; //int bufSize;
int numOfRows; int numOfRows;
int numOfCols; // Total number of cols int numOfCols; // Total number of cols
int sversion; // TODO: set sversion int sversion; // TODO: set sversion
void * buf; //void * buf;
SDataCol *cols; SDataCol *cols;
} SDataCols; } SDataCols;
......
...@@ -207,24 +207,16 @@ SMemRow tdMemRowDup(SMemRow row) { ...@@ -207,24 +207,16 @@ SMemRow tdMemRowDup(SMemRow row) {
return trow; return trow;
} }
void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) { void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints) {
pDataCol->type = colType(pCol); pDataCol->type = colType(pCol);
pDataCol->colId = colColId(pCol); pDataCol->colId = colColId(pCol);
pDataCol->bytes = colBytes(pCol); pDataCol->bytes = colBytes(pCol);
pDataCol->offset = colOffset(pCol) + TD_DATA_ROW_HEAD_SIZE; pDataCol->offset = colOffset(pCol) + TD_DATA_ROW_HEAD_SIZE;
pDataCol->len = 0; pDataCol->len = 0;
if (IS_VAR_DATA_TYPE(pDataCol->type)) { pDataCol->spaceSize = pDataCol->bytes * maxPoints;
pDataCol->dataOff = (VarDataOffsetT *)(*pBuf); pDataCol->pData = NULL;
pDataCol->pData = POINTER_SHIFT(*pBuf, sizeof(VarDataOffsetT) * maxPoints); pDataCol->dataOff = NULL;
pDataCol->spaceSize = pDataCol->bytes * maxPoints;
*pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize + sizeof(VarDataOffsetT) * maxPoints);
} else {
pDataCol->spaceSize = pDataCol->bytes * maxPoints;
pDataCol->dataOff = NULL;
pDataCol->pData = *pBuf;
*pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize);
}
} }
// value from timestamp should be TKEY here instead of TSKEY // value from timestamp should be TKEY here instead of TSKEY
void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) { void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) {
...@@ -239,6 +231,15 @@ void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxP ...@@ -239,6 +231,15 @@ void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxP
if (numOfRows > 0) { if (numOfRows > 0) {
// Find the first not null value, fill all previouse values as NULL // Find the first not null value, fill all previouse values as NULL
dataColSetNEleNull(pCol, numOfRows, maxPoints); dataColSetNEleNull(pCol, numOfRows, maxPoints);
} else {
if(pCol->pData == NULL) {
pCol->pData = malloc(maxPoints * pCol->bytes);
ASSERT(pCol->pData != NULL);
if(IS_VAR_DATA_TYPE(pCol->type)) {
pCol->dataOff = malloc(maxPoints * sizeof(VarDataOffsetT));
ASSERT(pCol->dataOff != NULL);
}
}
} }
} }
...@@ -263,7 +264,7 @@ bool isNEleNull(SDataCol *pCol, int nEle) { ...@@ -263,7 +264,7 @@ bool isNEleNull(SDataCol *pCol, int nEle) {
return true; return true;
} }
FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) { static FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) {
if (IS_VAR_DATA_TYPE(pCol->type)) { if (IS_VAR_DATA_TYPE(pCol->type)) {
pCol->dataOff[index] = pCol->len; pCol->dataOff[index] = pCol->len;
char *ptr = POINTER_SHIFT(pCol->pData, pCol->len); char *ptr = POINTER_SHIFT(pCol->pData, pCol->len);
...@@ -277,6 +278,15 @@ FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) { ...@@ -277,6 +278,15 @@ FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) {
void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) { void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) {
if(pCol->pData == NULL) {
pCol->pData = malloc(maxPoints * pCol->bytes);
ASSERT(pCol->pData != NULL);
if(IS_VAR_DATA_TYPE(pCol->type)) {
pCol->dataOff = malloc(maxPoints * sizeof(VarDataOffsetT));
ASSERT(pCol->dataOff != NULL);
}
}
if (IS_VAR_DATA_TYPE(pCol->type)) { if (IS_VAR_DATA_TYPE(pCol->type)) {
pCol->len = 0; pCol->len = 0;
for (int i = 0; i < nEle; i++) { for (int i = 0; i < nEle; i++) {
...@@ -324,17 +334,7 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) { ...@@ -324,17 +334,7 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
} }
pCols->maxRowSize = maxRowSize; pCols->maxRowSize = maxRowSize;
pCols->bufSize = maxRowSize * maxRows;
if (pCols->bufSize > 0) {
pCols->buf = malloc(pCols->bufSize);
if (pCols->buf == NULL) {
uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCol) * maxCols,
strerror(errno));
tdFreeDataCols(pCols);
return NULL;
}
}
return pCols; return pCols;
} }
...@@ -348,27 +348,31 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { ...@@ -348,27 +348,31 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
if (schemaTLen(pSchema) > pCols->maxRowSize) { if (schemaTLen(pSchema) > pCols->maxRowSize) {
pCols->maxRowSize = schemaTLen(pSchema); pCols->maxRowSize = schemaTLen(pSchema);
pCols->bufSize = schemaTLen(pSchema) * pCols->maxPoints;
pCols->buf = realloc(pCols->buf, pCols->bufSize);
if (pCols->buf == NULL) return -1;
} }
tdResetDataCols(pCols); tdResetDataCols(pCols);
pCols->numOfCols = schemaNCols(pSchema); pCols->numOfCols = schemaNCols(pSchema);
void *ptr = pCols->buf;
for (int i = 0; i < schemaNCols(pSchema); i++) { for (int i = 0; i < schemaNCols(pSchema); i++) {
dataColInit(pCols->cols + i, schemaColAt(pSchema, i), &ptr, pCols->maxPoints); dataColInit(pCols->cols + i, schemaColAt(pSchema, i), pCols->maxPoints);
ASSERT((char *)ptr - (char *)(pCols->buf) <= pCols->bufSize);
} }
return 0; return 0;
} }
SDataCols *tdFreeDataCols(SDataCols *pCols) { SDataCols *tdFreeDataCols(SDataCols *pCols) {
int i;
if (pCols) { if (pCols) {
tfree(pCols->buf); if(pCols->cols) {
tfree(pCols->cols); int maxCols = pCols->maxCols;
for(i = 0; i < maxCols; i++) {
SDataCol *pCol = &pCols->cols[i];
tfree(pCol->pData);
tfree(pCol->dataOff);
}
free(pCols->cols);
pCols->cols = NULL;
}
free(pCols); free(pCols);
} }
return NULL; return NULL;
...@@ -389,19 +393,17 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { ...@@ -389,19 +393,17 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
pRet->cols[i].offset = pDataCols->cols[i].offset; pRet->cols[i].offset = pDataCols->cols[i].offset;
pRet->cols[i].spaceSize = pDataCols->cols[i].spaceSize; pRet->cols[i].spaceSize = pDataCols->cols[i].spaceSize;
pRet->cols[i].pData = (void *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].pData) - (char *)(pDataCols->buf))); pRet->cols[i].len = 0;
pRet->cols[i].dataOff = NULL;
if (IS_VAR_DATA_TYPE(pRet->cols[i].type)) { pRet->cols[i].pData = NULL;
ASSERT(pDataCols->cols[i].dataOff != NULL);
pRet->cols[i].dataOff =
(int32_t *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].dataOff) - (char *)(pDataCols->buf)));
}
if (keepData) { if (keepData) {
pRet->cols[i].len = pDataCols->cols[i].len; pRet->cols[i].len = pDataCols->cols[i].len;
if (pDataCols->cols[i].len > 0) { if (pDataCols->cols[i].len > 0) {
pRet->cols[i].pData = malloc(pDataCols->cols[i].bytes * pDataCols->maxPoints);
memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len); memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len);
if (IS_VAR_DATA_TYPE(pRet->cols[i].type)) { if (IS_VAR_DATA_TYPE(pRet->cols[i].type)) {
pRet->cols[i].dataOff = malloc(sizeof(VarDataOffsetT) * pDataCols->maxPoints);
memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, sizeof(VarDataOffsetT) * pDataCols->maxPoints); memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, sizeof(VarDataOffsetT) * pDataCols->maxPoints);
} }
} }
...@@ -426,40 +428,27 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols ...@@ -426,40 +428,27 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
int rcol = 0; int rcol = 0;
int dcol = 0; int dcol = 0;
if (dataRowDeleted(row)) { while (dcol < pCols->numOfCols) {
for (; dcol < pCols->numOfCols; dcol++) { SDataCol *pDataCol = &(pCols->cols[dcol]);
SDataCol *pDataCol = &(pCols->cols[dcol]); if (rcol >= schemaNCols(pSchema)) {
if (dcol == 0) { dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
dataColAppendVal(pDataCol, dataRowTuple(row), pCols->numOfRows, pCols->maxPoints); dcol++;
} else { continue;
dataColSetNullAt(pDataCol, pCols->numOfRows);
}
} }
} else {
while (dcol < pCols->numOfCols) {
SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= schemaNCols(pSchema)) {
// dataColSetNullAt(pDataCol, pCols->numOfRows);
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
dcol++;
continue;
}
STColumn *pRowCol = schemaColAt(pSchema, rcol); STColumn *pRowCol = schemaColAt(pSchema, rcol);
if (pRowCol->colId == pDataCol->colId) { if (pRowCol->colId == pDataCol->colId) {
void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE); void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE);
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
dcol++; dcol++;
rcol++; rcol++;
} else if (pRowCol->colId < pDataCol->colId) { } else if (pRowCol->colId < pDataCol->colId) {
rcol++; rcol++;
} else { } else {
if(forceSetNull) { if(forceSetNull) {
//dataColSetNullAt(pDataCol, pCols->numOfRows); dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
}
dcol++;
} }
dcol++;
} }
} }
pCols->numOfRows++; pCols->numOfRows++;
...@@ -471,43 +460,30 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo ...@@ -471,43 +460,30 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
int rcol = 0; int rcol = 0;
int dcol = 0; int dcol = 0;
if (kvRowDeleted(row)) { int nRowCols = kvRowNCols(row);
for (; dcol < pCols->numOfCols; dcol++) {
SDataCol *pDataCol = &(pCols->cols[dcol]); while (dcol < pCols->numOfCols) {
if (dcol == 0) { SDataCol *pDataCol = &(pCols->cols[dcol]);
dataColAppendVal(pDataCol, kvRowValues(row), pCols->numOfRows, pCols->maxPoints); if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) {
} else { dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
dataColSetNullAt(pDataCol, pCols->numOfRows); ++dcol;
} continue;
} }
} else {
int nRowCols = kvRowNCols(row);
while (dcol < pCols->numOfCols) { SColIdx *colIdx = kvRowColIdxAt(row, rcol);
SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) {
// dataColSetNullAt(pDataCol, pCols->numOfRows);
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
++dcol;
continue;
}
SColIdx *colIdx = kvRowColIdxAt(row, rcol); if (colIdx->colId == pDataCol->colId) {
void *value = tdGetKvRowDataOfCol(row, colIdx->offset);
if (colIdx->colId == pDataCol->colId) { dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
void *value = tdGetKvRowDataOfCol(row, colIdx->offset); ++dcol;
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); ++rcol;
++dcol; } else if (colIdx->colId < pDataCol->colId) {
++rcol; ++rcol;
} else if (colIdx->colId < pDataCol->colId) { } else {
++rcol; if (forceSetNull) {
} else { dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
if (forceSetNull) {
// dataColSetNullAt(pDataCol, pCols->numOfRows);
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
}
++dcol;
} }
++dcol;
} }
} }
pCols->numOfRows++; pCols->numOfRows++;
......
...@@ -29,7 +29,9 @@ typedef void* SMergeBuf; ...@@ -29,7 +29,9 @@ typedef void* SMergeBuf;
SDataRow tsdbMergeTwoRows(SMergeBuf *pBuf, SMemRow row1, SMemRow row2, STSchema *pSchema1, STSchema *pSchema2); SDataRow tsdbMergeTwoRows(SMergeBuf *pBuf, SMemRow row1, SMemRow row2, STSchema *pSchema1, STSchema *pSchema2);
static FORCE_INLINE int tsdbMergeBufMakeSureRoom(SMergeBuf *pBuf, STSchema* pSchema1, STSchema* pSchema2) { static FORCE_INLINE int tsdbMergeBufMakeSureRoom(SMergeBuf *pBuf, STSchema* pSchema1, STSchema* pSchema2) {
return tsdbMakeRoom(pBuf, MAX(dataRowMaxBytesFromSchema(pSchema1), dataRowMaxBytesFromSchema(pSchema2))); size_t len1 = dataRowMaxBytesFromSchema(pSchema1);
size_t len2 = dataRowMaxBytesFromSchema(pSchema2);
return tsdbMakeRoom(pBuf, MAX(len1, len2));
} }
static FORCE_INLINE void tsdbFreeMergeBuf(SMergeBuf buf) { static FORCE_INLINE void tsdbFreeMergeBuf(SMergeBuf buf) {
......
...@@ -1035,6 +1035,8 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro ...@@ -1035,6 +1035,8 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro
} }
} }
} }
pMeta->maxCols = maxCols;
pMeta->maxRowBytes = maxRowBytes;
if (lock) tsdbUnlockRepoMeta(pRepo); if (lock) tsdbUnlockRepoMeta(pRepo);
tsdbDebug("vgId:%d table %s uid %" PRIu64 " is removed from meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_UID(pTable)); tsdbDebug("vgId:%d table %s uid %" PRIu64 " is removed from meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_UID(pTable));
......
...@@ -518,6 +518,15 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32 ...@@ -518,6 +518,15 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32
return -1; return -1;
} }
if(pDataCol->pData == NULL) {
pDataCol->pData = malloc(maxPoints * pDataCol->bytes);
ASSERT(pDataCol->pData != NULL);
if(IS_VAR_DATA_TYPE(pDataCol->type)) {
pDataCol->dataOff = malloc(maxPoints * sizeof(VarDataOffsetT));
ASSERT(pDataCol->dataOff != NULL);
}
}
// Decode the data // Decode the data
if (comp) { if (comp) {
// Need to decompress // Need to decompress
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册