未验证 提交 dbbd3206 编写于 作者: C Cary Xu 提交者: GitHub

Merge pull request #11386 from taosdata/feature/TD-11463-3.0

feat: update time series data
......@@ -57,6 +57,8 @@ typedef enum {
TD_ROW_PARTIAL_UPDATE = 2,
} TDUpdateConfig;
#define TD_SUPPORT_UPDATE(u) ((u) > 0)
typedef enum {
TSDB_STATIS_OK = 0, // statis part exist and load successfully
TSDB_STATIS_NONE = 1, // statis part not exist
......
......@@ -482,7 +482,7 @@ void tdResetDataCols(SDataCols *pCols);
int32_t tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData);
SDataCols *tdFreeDataCols(SDataCols *pCols);
int32_t tdMergeDataCols(SDataCols *target, SDataCols *source, int32_t rowsToMerge, int32_t *pOffset, bool forceSetNull);
int32_t tdMergeDataCols(SDataCols *target, SDataCols *source, int32_t rowsToMerge, int32_t *pOffset, bool forceSetNull, TDRowVerT maxVer);
// ----------------- K-V data row structure
/* |<-------------------------------------- len -------------------------------------------->|
......
......@@ -230,7 +230,7 @@ static FORCE_INLINE int32_t tdAppendColValToTpRow(SRowBuilder *pBuilder, TDRowVa
static FORCE_INLINE int32_t tdAppendColValToKvRow(SRowBuilder *pBuilder, TDRowValT valType, const void *val,
bool isCopyVarData, int8_t colType, int16_t colIdx, int32_t offset,
col_id_t colId);
int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool forceSetNull);
int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols);
/**
* @brief
......
......@@ -27,6 +27,7 @@ extern "C" {
typedef int32_t VarDataOffsetT;
typedef uint32_t TDRowLenT;
typedef uint8_t TDRowValT;
typedef uint64_t TDRowVerT;
typedef int16_t col_id_t;
typedef int8_t col_type_t;
typedef int32_t col_bytes_t;
......
......@@ -411,6 +411,7 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
#endif
pCols->numOfRows = 0;
pCols->bitmapMode = 0;
pCols->numOfCols = schemaNCols(pSchema);
for (i = 0; i < schemaNCols(pSchema); ++i) {
......
......@@ -385,7 +385,6 @@ int tdAppendValToDataCol(SDataCol *pCol, TDRowValT valType, const void *val, int
pCol->len += pCol->bytes;
}
#ifdef TD_SUPPORT_BITMAP
tdSetBitmapValType(pCol->pBitmap, numOfRows, valType, bitmapMode);
#endif
return 0;
......@@ -486,7 +485,7 @@ static int32_t tdAppendKvRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols
* @param pCols
* @param forceSetNull
*/
int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) {
int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols) {
if (TD_IS_TP_ROW(pRow)) {
return tdAppendTpRowToDataCol(pRow, pSchema, pCols);
} else if (TD_IS_KV_ROW(pRow)) {
......@@ -497,7 +496,7 @@ int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCol
return TSDB_CODE_SUCCESS;
}
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset, bool forceSetNull) {
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset, bool forceSetNull, TDRowVerT maxVer) {
ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
ASSERT(target->numOfCols == source->numOfCols);
int offset = 0;
......@@ -510,6 +509,7 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *
if ((target->numOfRows == 0) || (dataColsKeyLast(target) < dataColsKeyAtRow(source, *pOffset))) { // No overlap
ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
// TODO: filter the maxVer
for (int i = 0; i < rowsToMerge; i++) {
for (int j = 0; j < source->numOfCols; j++) {
if (source->cols[j].len > 0 || target->cols[j].len > 0) {
......@@ -555,9 +555,9 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
// TKEY tkey2 = (*iter2 >= limit2) ? TKEY_NULL : dataColsTKeyAt(src2, *iter2);
ASSERT(tkey1 == TKEY_NULL || (!TKEY_IS_DELETED(tkey1)));
// TODO: filter the maxVer
if (key1 < key2) {
for (int i = 0; i < src1->numOfCols; i++) {
for (int i = 0; i < src1->numOfCols; ++i) {
ASSERT(target->cols[i].type == src1->cols[i].type);
if (src1->cols[i].len > 0 || target->cols[i].len > 0) {
SCellVal sVal = {0};
......@@ -568,12 +568,12 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
}
}
target->numOfRows++;
(*iter1)++;
++target->numOfRows;
++(*iter1);
} else if (key1 >= key2) {
// if ((key1 > key2) || (key1 == key2 && !TKEY_IS_DELETED(tkey2))) {
if ((key1 > key2) || (key1 == key2)) {
for (int i = 0; i < src2->numOfCols; i++) {
// TODO: filter the maxVer
if ((key1 > key2) || ((key1 == key2) && !TKEY_IS_DELETED(key2))) {
for (int i = 0; i < src2->numOfCols; ++i) {
SCellVal sVal = {0};
ASSERT(target->cols[i].type == src2->cols[i].type);
if (tdGetColDataOfRow(&sVal, src2->cols + i, *iter2, src2->bitmapMode) < 0) {
......@@ -590,11 +590,11 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
dataColSetNullAt(&target->cols[i], target->numOfRows, true, target->bitmapMode);
}
}
target->numOfRows++;
++target->numOfRows;
}
(*iter2)++;
if (key1 == key2) (*iter1)++;
++(*iter2);
if (key1 == key2) ++(*iter1);
}
ASSERT(target->numOfRows <= target->maxPoints);
......
......@@ -1678,10 +1678,11 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
ASSERT(pSchema != NULL);
}
tdAppendSTSRowToDataCol(row, pSchema, pTarget, true);
tdAppendSTSRowToDataCol(row, pSchema, pTarget);
tSkipListIterNext(pCommitIter->pIter);
} else {
#if 0
if (update != TD_ROW_OVERWRITE_UPDATE) {
// copy disk data
for (int i = 0; i < pDataCols->numOfCols; ++i) {
......@@ -1706,6 +1707,43 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
}
++(*iter);
tSkipListIterNext(pCommitIter->pIter);
#endif
// copy disk data
for (int i = 0; i < pDataCols->numOfCols; ++i) {
SCellVal sVal = {0};
if (tdGetColDataOfRow(&sVal, pDataCols->cols + i, *iter, pDataCols->bitmapMode) < 0) {
TASSERT(0);
}
// TODO: tdAppendValToDataCol may fail
tdAppendValToDataCol(pTarget->cols + i, sVal.valType, sVal.val, pTarget->numOfRows, pTarget->maxPoints,
pTarget->bitmapMode);
}
if (TD_SUPPORT_UPDATE(update)) {
// copy mem data(Multi-Version)
if (pSchema == NULL || schemaVersion(pSchema) != TD_ROW_SVER(row)) {
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, TD_ROW_SVER(row));
ASSERT(pSchema != NULL);
}
// TODO: merge with Multi-Version
STSRow *curRow = row;
++(*iter);
tSkipListIterNext(pCommitIter->pIter);
STSRow *nextRow = tsdbNextIterRow(pCommitIter->pIter);
if (key2 < TD_ROW_KEY(nextRow)) {
tdAppendSTSRowToDataCol(row, pSchema, pTarget);
} else {
tdAppendSTSRowToDataCol(row, pSchema, pTarget);
}
// TODO: merge with Multi-Version
} else {
++pTarget->numOfRows;
++(*iter);
tSkipListIterNext(pCommitIter->pIter);
}
}
if (pTarget->numOfRows >= maxRows) break;
......
......@@ -475,7 +475,7 @@ static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema *
}
}
tdAppendSTSRowToDataCol(row, *ppSchema, pCols, true);
tdAppendSTSRowToDataCol(row, *ppSchema, pCols);
}
return 0;
......
......@@ -761,6 +761,7 @@ static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order,
TSKEY r2 = TD_ROW_KEY(rimem);
if (r1 == r2) {
#if 0
if(update == TD_ROW_DISCARD_UPDATE){
pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
tSkipListIterNext(pCheckInfo->iter);
......@@ -771,6 +772,13 @@ static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order,
} else {
pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
}
#endif
if (TD_SUPPORT_UPDATE(update)) {
pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
} else {
pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
tSkipListIterNext(pCheckInfo->iter);
}
return r1;
} else if (r1 < r2 && ASCENDING_TRAVERSE(order)) {
pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
......
......@@ -263,8 +263,9 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
for (int i = 1; i < pBlock->numOfSubBlocks; i++) {
iBlock++;
if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[1]) < 0) return -1;
// TODO: use the real maxVersion to replace the UINT64_MAX to support Multi-Version
if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL,
update != TD_ROW_PARTIAL_UPDATE) < 0)
update != TD_ROW_PARTIAL_UPDATE, UINT64_MAX) < 0)
return -1;
}
......@@ -293,8 +294,9 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo,
for (int i = 1; i < pBlock->numOfSubBlocks; i++) {
iBlock++;
if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[1], colIds, numOfColsIds) < 0) return -1;
// TODO: use the real maxVersion to replace the UINT64_MAX to support Multi-Version
if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL,
update != TD_ROW_PARTIAL_UPDATE) < 0)
update != TD_ROW_PARTIAL_UPDATE, UINT64_MAX) < 0)
return -1;
}
......@@ -304,6 +306,7 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo,
if (pDataCol->bitmap) {
ASSERT(pDataCol->colId != PRIMARYKEY_TIMESTAMP_COL_ID);
tdMergeBitmap(pDataCol->pBitmap, TD_BITMAP_BYTES(pReadh->pDCols[0]->numOfRows), pDataCol->pBitmap);
tdDataColsSetBitmapI(pReadh->pDCols[0]);
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册