提交 27a74a56 编写于 作者: C Cary Xu

feat: merge rows in mem/file during commit for update

上级 bcdf9c19
......@@ -466,7 +466,7 @@ int tdAppendValToDataCol(SDataCol *pCol, TDRowValT valType, const void *val, int
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes);
pCol->len += pCol->bytes;
}
} else {
} else if (!tdValTypeIsNone(valType)) {
if (IS_VAR_DATA_TYPE(pCol->type)) {
// keep the last offset
// discard the last var data
......@@ -483,7 +483,9 @@ int tdAppendValToDataCol(SDataCol *pCol, TDRowValT valType, const void *val, int
}
#ifdef TD_SUPPORT_BITMAP
tdSetBitmapValType(pCol->pBitmap, numOfRows, valType, bitmapMode);
if (!isMerge || !tdValTypeIsNone(valType)) {
tdSetBitmapValType(pCol->pBitmap, numOfRows, valType, bitmapMode);
}
#endif
return 0;
}
......@@ -533,7 +535,9 @@ static int32_t tdAppendTpRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols
++dcol;
}
}
#if 0
++pCols->numOfRows;
#endif
return TSDB_CODE_SUCCESS;
}
......@@ -584,7 +588,9 @@ static int32_t tdAppendKvRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols
++dcol;
}
}
#if 0
++pCols->numOfRows;
#endif
return TSDB_CODE_SUCCESS;
}
......@@ -698,7 +704,7 @@ static void tdAppendValToDataCols(SDataCols *target, SDataCols *src, int iter, b
tdAppendValToDataCol(&(target->cols[i]), sVal.valType, sVal.val, target->numOfRows, target->maxPoints,
target->bitmapMode, isMerge);
} else {
// Keep the origi value for None
// Keep the origin value for None
}
} else {
tdAppendValToDataCol(&(target->cols[i]), sVal.valType, sVal.val, target->numOfRows, target->maxPoints,
......
......@@ -1330,13 +1330,15 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
TSKEY maxKey, int maxRows, int8_t update) {
TSKEY key1 = INT64_MAX;
TSKEY key2 = INT64_MAX;
TSKEY lastKey = TSKEY_INITIAL_VAL;
STSchema *pSchema = NULL;
ASSERT(maxRows > 0 && dataColsKeyLast(pDataCols) <= maxKey);
tdResetDataCols(pTarget);
pTarget->bitmapMode = pDataCols->bitmapMode;
// TODO: filter Multi-Version
// TODO: support delete function
while (true) {
key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter);
STSRow *row = tsdbNextIterRow(pCommitIter->pIter);
......@@ -1349,6 +1351,9 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
if (key1 == INT64_MAX && key2 == INT64_MAX) break;
if (key1 < key2) {
if (lastKey != TSKEY_INITIAL_VAL) {
++pTarget->numOfRows;
}
for (int i = 0; i < pDataCols->numOfCols; ++i) {
// TODO: dataColAppendVal may fail
SCellVal sVal = {0};
......@@ -1359,7 +1364,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
pTarget->bitmapMode, false);
}
++pTarget->numOfRows;
lastKey = key1;
++(*iter);
} else if (key1 > key2) {
if (pSchema == NULL || schemaVersion(pSchema) != TD_ROW_SVER(row)) {
......@@ -1367,7 +1372,17 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
ASSERT(pSchema != NULL);
}
tdAppendSTSRowToDataCol(row, pSchema, pTarget, false);
if (key2 == lastKey) {
if (TD_SUPPORT_UPDATE(update)) {
tdAppendSTSRowToDataCol(row, pSchema, pTarget, true);
}
} else {
if (lastKey != TSKEY_INITIAL_VAL) {
++pTarget->numOfRows;
}
tdAppendSTSRowToDataCol(row, pSchema, pTarget, false);
lastKey = key2;
}
tSkipListIterNext(pCommitIter->pIter);
} else {
......@@ -1397,6 +1412,12 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
++(*iter);
tSkipListIterNext(pCommitIter->pIter);
#endif
if(lastKey != key1) {
lastKey = key1;
++pTarget->numOfRows;
}
// copy disk data
for (int i = 0; i < pDataCols->numOfCols; ++i) {
SCellVal sVal = {0};
......@@ -1416,26 +1437,17 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
}
// TODO: merge with Multi-Version
STSRow *curRow = row;
++(*iter);
tSkipListIterNext(pCommitIter->pIter);
STSRow *nextRow = tsdbNextIterRow(pCommitIter->pIter);
if (key2 < TD_ROW_KEY(nextRow)) {
tdAppendSTSRowToDataCol(row, pSchema, pTarget, false);
} else {
tdAppendSTSRowToDataCol(row, pSchema, pTarget, false);
}
// TODO: merge with Multi-Version
} else {
++pTarget->numOfRows;
++(*iter);
tSkipListIterNext(pCommitIter->pIter);
tdAppendSTSRowToDataCol(row, pSchema, pTarget, true);
}
++(*iter);
tSkipListIterNext(pCommitIter->pIter);
}
if (pTarget->numOfRows >= maxRows) break;
if (pTarget->numOfRows >= (maxRows - 1)) break;
}
if (lastKey != TSKEY_INITIAL_VAL) {
++pTarget->numOfRows;
}
}
......
......@@ -20,7 +20,7 @@ static void tsdbFreeTbData(STbData *pTbData);
static char *tsdbGetTsTupleKey(const void *data);
static int tsdbTbDataComp(const void *arg1, const void *arg2);
static char *tsdbTbDataGetUid(const void *arg);
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row);
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row, bool merge);
int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable) {
STsdbMemTable *pMemTable;
......@@ -82,15 +82,19 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo) {
ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0);
if (pIter == NULL) return 0;
STSchema *pSchema = NULL;
TSKEY rowKey = 0;
TSKEY fKey = 0;
STSchema *pSchema = NULL;
TSKEY rowKey = 0;
TSKEY fKey = 0;
// only fetch lastKey from mem data as file data not used in this function actually
TSKEY lastKey = TSKEY_INITIAL_VAL;
bool isRowDel = false;
int filterIter = 0;
STSRow *row = NULL;
SMergeInfo mInfo;
// TODO: support Multi-Version(the rows with the same TS keys in memory can't be merged if its version refered by
// query handle)
if (pMergeInfo == NULL) pMergeInfo = &mInfo;
memset(pMergeInfo, 0, sizeof(*pMergeInfo));
......@@ -190,21 +194,66 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
}
}
#endif
} else { // fkey >= rowKey
#if 1
} else if (fKey > rowKey) {
if (isRowDel) {
// TODO: support delete function
pMergeInfo->rowsDeleteFailed++;
} else {
if (pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed >= maxRowsToRead) break;
if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
if (lastKey != rowKey) {
pMergeInfo->rowsInserted++;
pMergeInfo->nOperations++;
pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey);
pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey);
if (pCols) {
if (lastKey != TSKEY_INITIAL_VAL) {
++pCols->numOfRows;
}
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, false);
}
lastKey = rowKey;
} else {
if (keepDup) {
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, true);
} else {
// discard
}
}
}
tSkipListIterNext(pIter);
row = tsdbNextIterRow(pIter);
if (row == NULL || TD_ROW_KEY(row) > maxKey) {
rowKey = INT64_MAX;
isRowDel = false;
} else {
rowKey = TD_ROW_KEY(row);
isRowDel = TD_ROW_IS_DELETED(row);
}
} else { // fkey == rowKey
if (isRowDel) { // TODO: support delete function(How to stands for delete in file? rowVersion = -1?)
ASSERT(!keepDup);
if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
pMergeInfo->rowsDeleteSucceed++;
pMergeInfo->nOperations++;
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row);
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, false);
} else {
if (keepDup) {
if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
pMergeInfo->rowsUpdated++;
pMergeInfo->nOperations++;
pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey);
pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey);
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row);
if (lastKey != rowKey) {
pMergeInfo->rowsUpdated++;
pMergeInfo->nOperations++;
pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey);
pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey);
lastKey = rowKey;
++pCols->numOfRows;
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, false);
} else {
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, true);
}
} else {
pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, fKey);
pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, fKey);
......@@ -228,6 +277,10 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
fKey = tdGetKey(filterKeys[filterIter]);
}
}
#endif
}
if (lastKey != TSKEY_INITIAL_VAL) {
++pCols->numOfRows;
}
return 0;
......@@ -301,8 +354,8 @@ static STbData *tsdbNewTbData(tb_uid_t uid) {
pTbData->pData = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), tkeyComparFn, SL_DISCARD_DUP_KEY,
tsdbGetTsTupleKey);
#endif
pTbData->pData = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), tkeyComparFn, SL_ALLOW_DUP_KEY,
tsdbGetTsTupleKey);
pTbData->pData =
tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), tkeyComparFn, SL_ALLOW_DUP_KEY, tsdbGetTsTupleKey);
if (pTbData->pData == NULL) {
taosMemoryFree(pTbData);
return NULL;
......@@ -337,7 +390,7 @@ static char *tsdbTbDataGetUid(const void *arg) {
STbData *pTbData = (STbData *)arg;
return (char *)(&(pTbData->uid));
}
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row) {
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row, bool merge) {
if (pCols) {
if (*ppSchema == NULL || schemaVersion(*ppSchema) != TD_ROW_SVER(row)) {
*ppSchema = tsdbGetTableSchemaImpl(pTable, false, false, TD_ROW_SVER(row));
......@@ -347,7 +400,7 @@ static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema *
}
}
tdAppendSTSRowToDataCol(row, *ppSchema, pCols, false);
tdAppendSTSRowToDataCol(row, *ppSchema, pCols, merge);
}
return 0;
......
......@@ -102,6 +102,7 @@ print ====> $data60 $data61 $data62 $data63 $data64 $data65
print ====> $data70 $data71 $data72 $data73 $data74 $data75
print ====> $data80 $data81 $data82 $data83 $data84 $data85
print ====> $data90 $data91 $data92 $data93 $data94 $data95
print ====> rows = $rows and rowNum = $rowNum for ct1
if $rows != $rowNum then
return -1
endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册