提交 85343a0e 编写于 作者: C Cary Xu

[TS-414]<fix>: pCol->len logic fix for partial update of columns

上级 26993d55
...@@ -339,7 +339,15 @@ static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } ...@@ -339,7 +339,15 @@ static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; }
int tdAllocMemForCol(SDataCol *pCol, int maxPoints); int tdAllocMemForCol(SDataCol *pCol, int maxPoints);
void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints); void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints);
int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints);
// value from timestamp should be TKEY here instead of TSKEY
int dataColAppendValEx(SDataCol *pCol, const void *value, int numOfRows, int maxPoints, bool isAppend);
// value from timestamp should be TKEY here instead of TSKEY
FORCE_INLINE int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) {
return dataColAppendValEx(pCol, value, numOfRows, maxPoints, true);
}
void dataColSetOffset(SDataCol *pCol, int nEle); void dataColSetOffset(SDataCol *pCol, int nEle);
bool isNEleNull(SDataCol *pCol, int nEle); bool isNEleNull(SDataCol *pCol, int nEle);
...@@ -670,7 +678,7 @@ static FORCE_INLINE char *memRowEnd(SMemRow row) { ...@@ -670,7 +678,7 @@ static FORCE_INLINE char *memRowEnd(SMemRow row) {
#define memRowDeleted(r) TKEY_IS_DELETED(memRowTKey(r)) #define memRowDeleted(r) TKEY_IS_DELETED(memRowTKey(r))
SMemRow tdMemRowDup(SMemRow row); SMemRow tdMemRowDup(SMemRow row);
void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull); void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull, bool isAppend);
// NOTE: offset here including the header size // NOTE: offset here including the header size
static FORCE_INLINE void *tdGetMemRowDataOfCol(void *row, int16_t colId, int8_t colType, uint16_t offset) { static FORCE_INLINE void *tdGetMemRowDataOfCol(void *row, int16_t colId, int8_t colType, uint16_t offset) {
......
...@@ -240,7 +240,7 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints) { ...@@ -240,7 +240,7 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints) {
pDataCol->len = 0; pDataCol->len = 0;
} }
// value from timestamp should be TKEY here instead of TSKEY // value from timestamp should be TKEY here instead of TSKEY
int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) { int dataColAppendValEx(SDataCol *pCol, const void *value, int numOfRows, int maxPoints, bool isAppend) {
ASSERT(pCol != NULL && value != NULL); ASSERT(pCol != NULL && value != NULL);
if (isAllRowsNull(pCol)) { if (isAllRowsNull(pCol)) {
...@@ -264,9 +264,15 @@ int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPo ...@@ -264,9 +264,15 @@ int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPo
// Update the length // Update the length
pCol->len += varDataTLen(value); pCol->len += varDataTLen(value);
} else { } else {
if (isAppend) { // append the value
ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows); ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes); memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes);
pCol->len += pCol->bytes; pCol->len += pCol->bytes;
} else {
// update the value of last row by increasing the pCol->len and keeping the original numOfRows
ASSERT((pCol->len == TYPE_BYTES[pCol->type] * (numOfRows + 1)) && (pCol->len > TYPE_BYTES[pCol->type]));
memcpy(POINTER_SHIFT(pCol->pData, pCol->len - TYPE_BYTES[pCol->type]), value, pCol->bytes);
}
} }
return 0; return 0;
} }
...@@ -441,7 +447,8 @@ void tdResetDataCols(SDataCols *pCols) { ...@@ -441,7 +447,8 @@ void tdResetDataCols(SDataCols *pCols) {
} }
} }
static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) { static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull,
bool isAppend) {
ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < dataRowKey(row)); ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < dataRowKey(row));
int rcol = 0; int rcol = 0;
...@@ -451,7 +458,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols ...@@ -451,7 +458,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
bool setCol = 0; bool setCol = 0;
SDataCol *pDataCol = &(pCols->cols[dcol]); SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= schemaNCols(pSchema)) { if (rcol >= schemaNCols(pSchema)) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); dataColAppendValEx(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints, isAppend);
dcol++; dcol++;
continue; continue;
} }
...@@ -460,14 +467,14 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols ...@@ -460,14 +467,14 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
if (pRowCol->colId == pDataCol->colId) { if (pRowCol->colId == pDataCol->colId) {
void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE); void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE);
if(!isNull(value, pDataCol->type)) setCol = 1; if(!isNull(value, pDataCol->type)) setCol = 1;
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); dataColAppendValEx(pDataCol, value, pCols->numOfRows, pCols->maxPoints, isAppend);
dcol++; dcol++;
rcol++; rcol++;
} else if (pRowCol->colId < pDataCol->colId) { } else if (pRowCol->colId < pDataCol->colId) {
rcol++; rcol++;
} else { } else {
if(forceSetNull || setCol) { if(forceSetNull || setCol) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); dataColAppendValEx(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints, isAppend);
} }
dcol++; dcol++;
} }
...@@ -475,7 +482,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols ...@@ -475,7 +482,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
pCols->numOfRows++; pCols->numOfRows++;
} }
static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) { static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull, bool isAppend) {
ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < kvRowKey(row)); ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < kvRowKey(row));
int rcol = 0; int rcol = 0;
...@@ -487,7 +494,7 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo ...@@ -487,7 +494,7 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
bool setCol = 0; bool setCol = 0;
SDataCol *pDataCol = &(pCols->cols[dcol]); SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) { if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); dataColAppendValEx(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints, isAppend);
++dcol; ++dcol;
continue; continue;
} }
...@@ -497,14 +504,14 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo ...@@ -497,14 +504,14 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
if (colIdx->colId == pDataCol->colId) { if (colIdx->colId == pDataCol->colId) {
void *value = tdGetKvRowDataOfCol(row, colIdx->offset); void *value = tdGetKvRowDataOfCol(row, colIdx->offset);
if(!isNull(value, pDataCol->type)) setCol = 1; if(!isNull(value, pDataCol->type)) setCol = 1;
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); dataColAppendValEx(pDataCol, value, pCols->numOfRows, pCols->maxPoints, isAppend);
++dcol; ++dcol;
++rcol; ++rcol;
} else if (colIdx->colId < pDataCol->colId) { } else if (colIdx->colId < pDataCol->colId) {
++rcol; ++rcol;
} else { } else {
if (forceSetNull || setCol) { if (forceSetNull || setCol) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); dataColAppendValEx(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints, isAppend);
} }
++dcol; ++dcol;
} }
...@@ -512,11 +519,11 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo ...@@ -512,11 +519,11 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
pCols->numOfRows++; pCols->numOfRows++;
} }
void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) { void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull, bool isAppend) {
if (isDataRow(row)) { if (isDataRow(row)) {
tdAppendDataRowToDataCol(memRowDataBody(row), pSchema, pCols, forceSetNull); tdAppendDataRowToDataCol(memRowDataBody(row), pSchema, pCols, forceSetNull, isAppend);
} else if (isKvRow(row)) { } else if (isKvRow(row)) {
tdAppendKvRowToDataCol(memRowKvBody(row), pSchema, pCols, forceSetNull); tdAppendKvRowToDataCol(memRowKvBody(row), pSchema, pCols, forceSetNull, isAppend);
} else { } else {
ASSERT(0); ASSERT(0);
} }
......
...@@ -1288,7 +1288,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt ...@@ -1288,7 +1288,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
ASSERT(pSchema != NULL); ASSERT(pSchema != NULL);
} }
tdAppendMemRowToDataCol(row, pSchema, pTarget, true); tdAppendMemRowToDataCol(row, pSchema, pTarget, true, true);
tSkipListIterNext(pCommitIter->pIter); tSkipListIterNext(pCommitIter->pIter);
} else { } else {
...@@ -1310,7 +1310,8 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt ...@@ -1310,7 +1310,8 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
ASSERT(pSchema != NULL); ASSERT(pSchema != NULL);
} }
tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE); tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE,
update != TD_ROW_PARTIAL_UPDATE);
} }
(*iter)++; (*iter)++;
tSkipListIterNext(pCommitIter->pIter); tSkipListIterNext(pCommitIter->pIter);
......
...@@ -589,7 +589,7 @@ static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema * ...@@ -589,7 +589,7 @@ static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema *
} }
} }
tdAppendMemRowToDataCol(row, *ppSchema, pCols, true); tdAppendMemRowToDataCol(row, *ppSchema, pCols, true, true);
} }
return 0; return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册