未验证 提交 b027a85b 编写于 作者: H Hongze Cheng 提交者: GitHub

Merge pull request #7717 from taosdata/hotfix/out-of-order-data

problem related to issue-7666
......@@ -448,6 +448,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
int dcol = 0;
while (dcol < pCols->numOfCols) {
bool setCol = 0;
SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= schemaNCols(pSchema)) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
......@@ -458,13 +459,14 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
STColumn *pRowCol = schemaColAt(pSchema, rcol);
if (pRowCol->colId == pDataCol->colId) {
void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE);
if(!isNull(value, pDataCol->type)) setCol = 1;
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
dcol++;
rcol++;
} else if (pRowCol->colId < pDataCol->colId) {
rcol++;
} else {
if(forceSetNull) {
if(forceSetNull || setCol) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
}
dcol++;
......@@ -482,6 +484,7 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
int nRowCols = kvRowNCols(row);
while (dcol < pCols->numOfCols) {
bool setCol = 0;
SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
......@@ -493,13 +496,14 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
if (colIdx->colId == pDataCol->colId) {
void *value = tdGetKvRowDataOfCol(row, colIdx->offset);
if(!isNull(value, pDataCol->type)) setCol = 1;
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
++dcol;
++rcol;
} else if (colIdx->colId < pDataCol->colId) {
++rcol;
} else {
if (forceSetNull) {
if (forceSetNull || setCol) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
}
++dcol;
......@@ -533,7 +537,7 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *
ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
for (int i = 0; i < rowsToMerge; i++) {
for (int j = 0; j < source->numOfCols; j++) {
if (source->cols[j].len > 0) {
if (source->cols[j].len > 0 || target->cols[j].len > 0) {
dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i + (*pOffset)), target->numOfRows,
target->maxPoints);
}
......@@ -577,7 +581,7 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
if (key1 < key2) {
for (int i = 0; i < src1->numOfCols; i++) {
ASSERT(target->cols[i].type == src1->cols[i].type);
if (src1->cols[i].len > 0) {
if (src1->cols[i].len > 0 || target->cols[i].len > 0) {
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
target->maxPoints);
}
......@@ -595,6 +599,8 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
} else if(!forceSetNull && key1 == key2 && src1->cols[i].len > 0) {
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
target->maxPoints);
} else if(target->cols[i].len > 0) {
dataColSetNullAt(&target->cols[i], target->numOfRows);
}
}
target->numOfRows++;
......
......@@ -1263,13 +1263,11 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
while (true) {
key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter);
bool isRowDel = false;
SMemRow row = tsdbNextIterRow(pCommitIter->pIter);
if (row == NULL || memRowKey(row) > maxKey) {
key2 = INT64_MAX;
} else {
key2 = memRowKey(row);
isRowDel = memRowDeleted(row);
}
if (key1 == INT64_MAX && key2 == INT64_MAX) break;
......@@ -1284,36 +1282,33 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
pTarget->numOfRows++;
(*iter)++;
} else if (key1 > key2) {
if (!isRowDel) {
if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row));
ASSERT(pSchema != NULL);
}
tdAppendMemRowToDataCol(row, pSchema, pTarget, true);
if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row));
ASSERT(pSchema != NULL);
}
tdAppendMemRowToDataCol(row, pSchema, pTarget, true);
tSkipListIterNext(pCommitIter->pIter);
} else {
if (update) {
if (!isRowDel) {
if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row));
ASSERT(pSchema != NULL);
}
tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE);
}
} else {
ASSERT(!isRowDel);
if (update != TD_ROW_OVERWRITE_UPDATE) {
//copy disk data
for (int i = 0; i < pDataCols->numOfCols; i++) {
//TODO: dataColAppendVal may fail
dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
pTarget->maxPoints);
}
pTarget->numOfRows++;
if(update == TD_ROW_DISCARD_UPDATE) pTarget->numOfRows++;
}
if (update != TD_ROW_DISCARD_UPDATE) {
//copy mem data
if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row));
ASSERT(pSchema != NULL);
}
tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE);
}
(*iter)++;
tSkipListIterNext(pCommitIter->pIter);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册