提交 ea87f4f9 编写于 作者: L Liu Jicong

[TD-6474]<hotfix>: fix coredump when merging empty block

上级 061fe86a
...@@ -448,6 +448,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols ...@@ -448,6 +448,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
int dcol = 0; int dcol = 0;
while (dcol < pCols->numOfCols) { while (dcol < pCols->numOfCols) {
bool setCol = 0;
SDataCol *pDataCol = &(pCols->cols[dcol]); SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= schemaNCols(pSchema)) { if (rcol >= schemaNCols(pSchema)) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
...@@ -458,13 +459,14 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols ...@@ -458,13 +459,14 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
STColumn *pRowCol = schemaColAt(pSchema, rcol); STColumn *pRowCol = schemaColAt(pSchema, rcol);
if (pRowCol->colId == pDataCol->colId) { if (pRowCol->colId == pDataCol->colId) {
void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE); void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE);
if(!isNull(value, pDataCol->type)) setCol = 1;
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
dcol++; dcol++;
rcol++; rcol++;
} else if (pRowCol->colId < pDataCol->colId) { } else if (pRowCol->colId < pDataCol->colId) {
rcol++; rcol++;
} else { } else {
if(forceSetNull) { if(forceSetNull || setCol) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
} }
dcol++; dcol++;
...@@ -482,6 +484,7 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo ...@@ -482,6 +484,7 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
int nRowCols = kvRowNCols(row); int nRowCols = kvRowNCols(row);
while (dcol < pCols->numOfCols) { while (dcol < pCols->numOfCols) {
bool setCol = 0;
SDataCol *pDataCol = &(pCols->cols[dcol]); SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) { if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
...@@ -493,13 +496,14 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo ...@@ -493,13 +496,14 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
if (colIdx->colId == pDataCol->colId) { if (colIdx->colId == pDataCol->colId) {
void *value = tdGetKvRowDataOfCol(row, colIdx->offset); void *value = tdGetKvRowDataOfCol(row, colIdx->offset);
if(!isNull(value, pDataCol->type)) setCol = 1;
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
++dcol; ++dcol;
++rcol; ++rcol;
} else if (colIdx->colId < pDataCol->colId) { } else if (colIdx->colId < pDataCol->colId) {
++rcol; ++rcol;
} else { } else {
if (forceSetNull) { if(forceSetNull || setCol) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
} }
++dcol; ++dcol;
...@@ -518,7 +522,6 @@ void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols, b ...@@ -518,7 +522,6 @@ void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols, b
} }
} }
//TODO: refactor this function to eliminate additional memory copy
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset, bool forceSetNull) { int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset, bool forceSetNull) {
ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows); ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
ASSERT(target->numOfCols == source->numOfCols); ASSERT(target->numOfCols == source->numOfCols);
...@@ -534,7 +537,7 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int * ...@@ -534,7 +537,7 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *
ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints); ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
for (int i = 0; i < rowsToMerge; i++) { for (int i = 0; i < rowsToMerge; i++) {
for (int j = 0; j < source->numOfCols; j++) { for (int j = 0; j < source->numOfCols; j++) {
if (source->cols[j].len > 0) { if (source->cols[j].len > 0 || target->cols[j].len > 0) {
dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i + (*pOffset)), target->numOfRows, dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i + (*pOffset)), target->numOfRows,
target->maxPoints); target->maxPoints);
} }
...@@ -578,7 +581,7 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i ...@@ -578,7 +581,7 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
if (key1 < key2) { if (key1 < key2) {
for (int i = 0; i < src1->numOfCols; i++) { for (int i = 0; i < src1->numOfCols; i++) {
ASSERT(target->cols[i].type == src1->cols[i].type); ASSERT(target->cols[i].type == src1->cols[i].type);
if (src1->cols[i].len > 0) { if (src1->cols[i].len > 0 || target->cols[i].len > 0) {
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows, dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
target->maxPoints); target->maxPoints);
} }
...@@ -596,6 +599,8 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i ...@@ -596,6 +599,8 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
} else if(!forceSetNull && key1 == key2 && src1->cols[i].len > 0) { } else if(!forceSetNull && key1 == key2 && src1->cols[i].len > 0) {
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows, dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
target->maxPoints); target->maxPoints);
} else if(target->cols[i].len > 0) {
dataColSetNullAt(&target->cols[i], target->numOfRows);
} }
} }
target->numOfRows++; target->numOfRows++;
......
...@@ -1418,13 +1418,11 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt ...@@ -1418,13 +1418,11 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
while (true) { while (true) {
key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter); key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter);
bool isRowDel = false;
SMemRow row = tsdbNextIterRow(pCommitIter->pIter); SMemRow row = tsdbNextIterRow(pCommitIter->pIter);
if (row == NULL || memRowKey(row) > maxKey) { if (row == NULL || memRowKey(row) > maxKey) {
key2 = INT64_MAX; key2 = INT64_MAX;
} else { } else {
key2 = memRowKey(row); key2 = memRowKey(row);
isRowDel = memRowDeleted(row);
} }
if (key1 == INT64_MAX && key2 == INT64_MAX) break; if (key1 == INT64_MAX && key2 == INT64_MAX) break;
...@@ -1439,36 +1437,33 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt ...@@ -1439,36 +1437,33 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
pTarget->numOfRows++; pTarget->numOfRows++;
(*iter)++; (*iter)++;
} else if (key1 > key2) { } else if (key1 > key2) {
if (!isRowDel) {
if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row)); pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row));
ASSERT(pSchema != NULL); ASSERT(pSchema != NULL);
} }
tdAppendMemRowToDataCol(row, pSchema, pTarget, true); tdAppendMemRowToDataCol(row, pSchema, pTarget, true);
}
tSkipListIterNext(pCommitIter->pIter); tSkipListIterNext(pCommitIter->pIter);
} else { } else {
if (update) { if (update != TD_ROW_OVERWRITE_UPDATE) {
if (!isRowDel) { //copy disk data
if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row));
ASSERT(pSchema != NULL);
}
tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE);
}
} else {
ASSERT(!isRowDel);
for (int i = 0; i < pDataCols->numOfCols; i++) { for (int i = 0; i < pDataCols->numOfCols; i++) {
//TODO: dataColAppendVal may fail //TODO: dataColAppendVal may fail
dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
pTarget->maxPoints); pTarget->maxPoints);
} }
pTarget->numOfRows++; if(update == TD_ROW_DISCARD_UPDATE) pTarget->numOfRows++;
}
if (update != TD_ROW_DISCARD_UPDATE) {
//copy mem data
if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row));
ASSERT(pSchema != NULL);
}
tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE);
} }
(*iter)++; (*iter)++;
tSkipListIterNext(pCommitIter->pIter); tSkipListIterNext(pCommitIter->pIter);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册