From bbf367772976725144942c9b7367abe644c098af Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sun, 27 Sep 2020 10:27:53 +0800 Subject: [PATCH] TD-1548 --- src/common/inc/tdataformat.h | 74 +++++-- src/common/src/tdataformat.c | 69 ++++--- src/inc/taosdef.h | 1 + src/tsdb/inc/tsdbMain.h | 22 +- src/tsdb/src/tsdbMain.c | 2 +- src/tsdb/src/tsdbMemTable.c | 198 +++++++++++++----- src/tsdb/src/tsdbMeta.c | 11 +- src/tsdb/src/tsdbRWHelper.c | 385 ++++++++++++++++++++--------------- src/util/inc/tskiplist.h | 22 +- src/util/src/tskiplist.c | 87 +++----- 10 files changed, 518 insertions(+), 353 deletions(-) diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index aa39d5b0b2..8124075e6c 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -119,6 +119,33 @@ void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version); int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int16_t bytes); STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder); +// ----------------- Semantic timestamp key definition +typedef uint64_t TKEY; + +#define TKEY_INVALID UINT64_MAX +#define TKEY_NULL TKEY_INVALID +#define TKEY_NEGATIVE_FLAG (((TKEY)1) << (sizeof(TKEY) * 8 - 1)) +#define TKEY_DELETE_FLAG (((TKEY)1) << (sizeof(TKEY) * 8 - 2)) +#define TKEY_VALUE_FILTER (~(TKEY_NEGATIVE_FLAG | TKEY_DELETE_FLAG)) + +#define TKEY_IS_NEGATIVE(tkey) (((tkey)&TKEY_NEGATIVE_FLAG) != 0) +#define TKEY_IS_DELETED(tkey) (((tkey)&TKEY_DELETE_FLAG) != 0) +#define tdSetTKEYDeleted(tkey) ((tkey) | TKEY_DELETE_FLAG) +#define tdGetTKEY(key) (((TKEY)ABS(key)) | (TKEY_NEGATIVE_FLAG & (TKEY)(key))) +#define tdGetKey(tkey) (((TSKEY)((tkey)&TKEY_VALUE_FILTER)) * (TKEY_IS_NEGATIVE(tkey) ? -1 : 1)) + +static FORCE_INLINE int tkeyComparFn(const void *tkey1, const void *tkey2) { + TSKEY key1 = tdGetKey(*(TKEY *)tkey1); + TSKEY key2 = tdGetKey(*(TKEY *)tkey2); + + if (key1 < key2) { + return -1; + } else if (key1 > key2) { + return 1; + } else { + return 0; + } +} // ----------------- Data row structure /* A data row, the format is like below: @@ -129,6 +156,8 @@ STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder); * +----------+----------+---------------------------------+---------------------------------+ * | len | sversion | First part | Second part | * +----------+----------+---------------------------------+---------------------------------+ + * + * NOTE: timestamp in this row structure is TKEY instead of TSKEY */ typedef void *SDataRow; @@ -137,11 +166,13 @@ typedef void *SDataRow; #define dataRowLen(r) (*(uint16_t *)(r)) #define dataRowVersion(r) *(int16_t *)POINTER_SHIFT(r, sizeof(int16_t)) #define dataRowTuple(r) POINTER_SHIFT(r, TD_DATA_ROW_HEAD_SIZE) -#define dataRowKey(r) (*(TSKEY *)(dataRowTuple(r))) +#define dataRowTKey(r) (*(TKEY *)(dataRowTuple(r))) +#define dataRowKey(r) tdGetKey(dataRowTKey(r)) #define dataRowSetLen(r, l) (dataRowLen(r) = (l)) #define dataRowSetVersion(r, v) (dataRowVersion(r) = (v)) #define dataRowCpy(dst, r) memcpy((dst), (r), dataRowLen(r)) #define dataRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_DATA_ROW_HEAD_SIZE) +#define dataRowDeleted(r) TKEY_IS_DELETED(dataRowTKey(r)) SDataRow tdNewDataRowFromSchema(STSchema *pSchema); void tdFreeDataRow(SDataRow row); @@ -154,16 +185,18 @@ static FORCE_INLINE int tdAppendColVal(SDataRow row, void *value, int8_t type, i int32_t toffset = offset + TD_DATA_ROW_HEAD_SIZE; char * ptr = (char *)POINTER_SHIFT(row, dataRowLen(row)); - switch (type) { - case TSDB_DATA_TYPE_BINARY: - case TSDB_DATA_TYPE_NCHAR: - *(VarDataOffsetT *)POINTER_SHIFT(row, toffset) = dataRowLen(row); - memcpy(ptr, value, varDataTLen(value)); - dataRowLen(row) += varDataTLen(value); - break; - default: + if (IS_VAR_DATA_TYPE(type)) { + *(VarDataOffsetT *)POINTER_SHIFT(row, toffset) = dataRowLen(row); + memcpy(ptr, value, varDataTLen(value)); + dataRowLen(row) += varDataTLen(value); + } else { + if (offset == 0) { + ASSERT(type == TSDB_DATA_TYPE_TIMESTAMP); + TKEY tvalue = tdGetTKEY(*(TSKEY *)value); + memcpy(POINTER_SHIFT(row, toffset), &tvalue, TYPE_BYTES[type]); + } else { memcpy(POINTER_SHIFT(row, toffset), value, TYPE_BYTES[type]); - break; + } } return 0; @@ -171,12 +204,10 @@ static FORCE_INLINE int tdAppendColVal(SDataRow row, void *value, int8_t type, i // NOTE: offset here including the header size static FORCE_INLINE void *tdGetRowDataOfCol(SDataRow row, int8_t type, int32_t offset) { - switch (type) { - case TSDB_DATA_TYPE_BINARY: - case TSDB_DATA_TYPE_NCHAR: - return POINTER_SHIFT(row, *(VarDataOffsetT *)POINTER_SHIFT(row, offset)); - default: - return POINTER_SHIFT(row, offset); + if (IS_VAR_DATA_TYPE(type)) { + return POINTER_SHIFT(row, *(VarDataOffsetT *)POINTER_SHIFT(row, offset)); + } else { + return POINTER_SHIFT(row, offset); } } @@ -243,9 +274,14 @@ typedef struct { } SDataCols; #define keyCol(pCols) (&((pCols)->cols[0])) // Key column -#define dataColsKeyAt(pCols, idx) ((TSKEY *)(keyCol(pCols)->pData))[(idx)] -#define dataColsKeyFirst(pCols) dataColsKeyAt(pCols, 0) -#define dataColsKeyLast(pCols) ((pCols->numOfRows == 0) ? 0 : dataColsKeyAt(pCols, (pCols)->numOfRows - 1)) +#define dataColsTKeyAt(pCols, idx) ((TKEY *)(keyCol(pCols)->pData))[(idx)] +#define dataColsKeyAt(pCols, idx) tdGetKey(dataColsTKeyAt(pCols, idx)) +#define dataColsTKeyFirst(pCols) ((pCols)->numOfRows == 0) ? TKEY_INVALID : dataColsTKeyAt(pCols, 0) +#define dataColsKeyFirst(pCols) ((pCols)->numOfRows == 0) ? TSDB_DATA_BIGINT_NULL : dataColsKeyAt(pCols, 0) +#define dataColsTKeyLast(pCols) \ + (((pCols)->numOfRows == 0) ? TKEY_INVALID : dataColsTKeyAt(pCols, (pCols)->numOfRows - 1)) +#define dataColsKeyLast(pCols) \ + (((pCols)->numOfRows == 0) ? TSDB_DATA_BIGINT_NULL : dataColsKeyAt(pCols, (pCols)->numOfRows - 1)) SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows); void tdResetDataCols(SDataCols *pCols); diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 0c446b6d4f..b45cbd796d 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -423,30 +423,41 @@ void tdResetDataCols(SDataCols *pCols) { } void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols) { - ASSERT(dataColsKeyLast(pCols) < dataRowKey(row)); + ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < dataRowKey(row)); int rcol = 0; int dcol = 0; - while (dcol < pCols->numOfCols) { - SDataCol *pDataCol = &(pCols->cols[dcol]); - if (rcol >= schemaNCols(pSchema)) { - dataColSetNullAt(pDataCol, pCols->numOfRows); - dcol++; - continue; + if (dataRowDeleted(row)) { + for (; dcol < pCols->numOfCols; dcol++) { + SDataCol *pDataCol = &(pCols->cols[dcol]); + if (dcol == 0) { + dataColAppendVal(pDataCol, dataRowTuple(row), pCols->numOfRows, pCols->maxPoints); + } else { + dataColSetNullAt(pDataCol, pCols->numOfRows); + } } + } else { + while (dcol < pCols->numOfCols) { + SDataCol *pDataCol = &(pCols->cols[dcol]); + if (rcol >= schemaNCols(pSchema)) { + dataColSetNullAt(pDataCol, pCols->numOfRows); + dcol++; + continue; + } - STColumn *pRowCol = schemaColAt(pSchema, rcol); - if (pRowCol->colId == pDataCol->colId) { - void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset+TD_DATA_ROW_HEAD_SIZE); - dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); - dcol++; - rcol++; - } else if (pRowCol->colId < pDataCol->colId) { - rcol++; - } else { - dataColSetNullAt(pDataCol, pCols->numOfRows); - dcol++; + STColumn *pRowCol = schemaColAt(pSchema, rcol); + if (pRowCol->colId == pDataCol->colId) { + void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE); + dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); + dcol++; + rcol++; + } else if (pRowCol->colId < pDataCol->colId) { + rcol++; + } else { + dataColSetNullAt(pDataCol, pCols->numOfRows); + dcol++; + } } } pCols->numOfRows++; @@ -511,8 +522,12 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i while (target->numOfRows < tRows) { if (*iter1 >= limit1 && *iter2 >= limit2) break; - TSKEY key1 = (*iter1 >= limit1) ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1]; - TSKEY key2 = (*iter2 >= limit2) ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2]; + TSKEY key1 = (*iter1 >= limit1) ? INT64_MAX : dataColsKeyAt(src1, *iter1); + TKEY tkey1 = (*iter1 >= limit1) ? TKEY_NULL : dataColsTKeyAt(src1, *iter1); + TSKEY key2 = (*iter2 >= limit2) ? INT64_MAX : dataColsKeyAt(src2, *iter2); + TKEY tkey2 = (*iter2 >= limit2) ? TKEY_NULL : dataColsTKeyAt(src2, *iter2); + + ASSERT(tkey1 == TKEY_NULL || (!TKEY_IS_DELETED(tkey1))); if (key1 < key2) { for (int i = 0; i < src1->numOfCols; i++) { @@ -525,12 +540,14 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i target->numOfRows++; (*iter1)++; - } else { - for (int i = 0; i < src2->numOfCols; i++) { - ASSERT(target->cols[i].type == src2->cols[i].type); - if (src2->cols[i].len > 0) { - dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows, - target->maxPoints); + } else if (key1 >= key2) { + if ((key1 > key2) || (key1 == key2 && !TKEY_IS_DELETED(tkey2))) { + for (int i = 0; i < src2->numOfCols; i++) { + ASSERT(target->cols[i].type == src2->cols[i].type); + if (src2->cols[i].len > 0) { + dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows, + target->maxPoints); + } } } diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index e9aa6a7050..a6cf76c473 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -75,6 +75,7 @@ extern const int32_t TYPE_BYTES[11]; #define TSDB_DATA_SMALLINT_NULL 0x8000 #define TSDB_DATA_INT_NULL 0x80000000 #define TSDB_DATA_BIGINT_NULL 0x8000000000000000L +#define TSDB_DATA_TIMESTAMP_NULL TSDB_DATA_BIGINT_NULL #define TSDB_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN #define TSDB_DATA_DOUBLE_NULL 0x7FFFFF0000000000L // an NAN diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index b515752bc3..7d40d7f00a 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -320,6 +320,15 @@ typedef struct { void* compBuffer; // Buffer for temperary compress/decompress purpose } SRWHelper; +typedef struct { + int rowsInserted; + int rowsUpdated; + int rowsDeleteSucceed; + int rowsDeleteFailed; + int nOperations; + TSKEY keyFirst; + TSKEY keyLast; +} SMergeInfo; // ------------------ tsdbScan.c typedef struct { SFileGroup fGroup; @@ -422,7 +431,7 @@ void tsdbCloseBufPool(STsdbRepo* pRepo); SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo); // ------------------ tsdbMemTable.c -int tsdbInsertRowToMem(STsdbRepo* pRepo, SDataRow row, STable* pTable); +int tsdbUpdateRowInMem(STsdbRepo* pRepo, SDataRow row, STable* pTable); int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem); @@ -430,7 +439,7 @@ void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemTable* pMem, SMemTable* pIMem) void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes); int tsdbAsyncCommit(STsdbRepo* pRepo); int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols, - TSKEY* filterKeys, int nFilterKeys, bool keepDup); + TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo); static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) { if (pIter == NULL) return NULL; @@ -443,11 +452,18 @@ static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) { static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator* pIter) { SDataRow row = tsdbNextIterRow(pIter); - if (row == NULL) return -1; + if (row == NULL) return TSDB_DATA_TIMESTAMP_NULL; return dataRowKey(row); } +static FORCE_INLINE TKEY tsdbNextIterTKey(SSkipListIterator* pIter) { + SDataRow row = tsdbNextIterRow(pIter); + if (row == NULL) return TKEY_NULL; + + return dataRowTKey(row); +} + static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) { ASSERT(pRepo != NULL); if (pRepo->mem == NULL) return NULL; diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 494f17807e..a0ff1a6b95 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -765,7 +765,7 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY return -1; } - if (tsdbInsertRowToMem(pRepo, row, pTable) < 0) return -1; + if (tsdbUpdateRowInMem(pRepo, row, pTable) < 0) return -1; (*affectedrows)++; points++; diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index e139b29d5e..cd688d46a7 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -32,14 +32,31 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo); static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables); static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables); +static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SDataRow row); // ---------------- INTERNAL FUNCTIONS ---------------- -int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { +int tsdbUpdateRowInMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { STsdbCfg * pCfg = &pRepo->config; STsdbMeta * pMeta = pRepo->tsdbMeta; + TKEY tkey = dataRowTKey(row); TSKEY key = dataRowKey(row); SMemTable * pMemTable = pRepo->mem; STableData *pTableData = NULL; + bool isRowDelete = TKEY_IS_DELETED(tkey); + + if (isRowDelete) { + if (!pCfg->update) { + tsdbWarn("vgId:%d vnode is not allowed to update but try to delete a data row", REPO_ID(pRepo)); + terrno = TSDB_CODE_TDB_INVALID_ACTION; + return -1; + } + + if (key > TABLE_LASTKEY(pTable)) { + tsdbTrace("vgId:%d skip to delete row key %" PRId64 " which is larger than table lastKey %" PRId64, + REPO_ID(pRepo), key, TABLE_LASTKEY(pTable)); + return 0; + } + } void *pRow = tsdbAllocBytes(pRepo, dataRowLen(row)); if (pRow == NULL) { @@ -88,8 +105,10 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { if (tSkipListPut(pTableData->pData, pRow) == NULL) { tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row)); } else { + // TODO: may need to refact here int64_t deltaSize = SL_SIZE(pTableData->pData) - oldSize; - if (TABLE_LASTKEY(pTable) < key) TABLE_LASTKEY(pTable) = key; + if ((!isRowDelete) && (TABLE_LASTKEY(pTable) < key)) TABLE_LASTKEY(pTable) = key; + if (pMemTable->keyFirst > key) pMemTable->keyFirst = key; if (pMemTable->keyLast < key) pMemTable->keyLast = key; pMemTable->numOfRows += deltaSize; @@ -99,8 +118,9 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { pTableData->numOfRows += deltaSize; } - tsdbTrace("vgId:%d a row is inserted to table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo), - TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), key); + tsdbTrace("vgId:%d a row is %s table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo), + isRowDelete ? "deleted from" : "updated in", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), + key); return 0; } @@ -270,66 +290,120 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) { return 0; } +/** + * This is an important function to load data or try to load data from memory skiplist iterator. + * + * This function load memory data until: + * 1. iterator ends + * 2. data key exceeds maxKey + * 3. rowsIncreased = rowsInserted - rowsDeleteSucceed >= maxRowsToRead + * 4. operations in pCols not exceeds its max capacity if pCols is given + * + * The function try to move as mush as possible. + */ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols, - TSKEY *filterKeys, int nFilterKeys, bool keepDup) { - ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0); + TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo) { + ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0 && pMergeInfo != NULL); if (pIter == NULL) return 0; STSchema *pSchema = NULL; - int numOfRows = 0; - TSKEY keyNext = 0; + TSKEY rowKey = 0; + TSKEY fKey = 0; + bool isRowDel = false; int filterIter = 0; + SDataRow row = NULL; + + memset(pMergeInfo, 0, sizeof(*pMergeInfo)); + pMergeInfo->keyFirst = INT64_MAX; + pMergeInfo->keyLast = INT64_MIN; + + row = tsdbNextIterRow(pIter); + if (row == NULL || dataRowKey(row) > maxKey) { + rowKey = INT64_MAX; + } else { + rowKey = dataRowKey(row); + isRowDel = dataRowDeleted(row); + } + + if (nFilterKeys == 0 || filterIter >= nFilterKeys) { + fKey = INT64_MAX; + } else { + fKey = tdGetKey(filterKeys[filterIter]); + } + + while (true) { + if (fKey == INT64_MAX && rowKey == INT64_MAX) break; + + if (fKey < rowKey) { + pMergeInfo->keyFirst = MIN(pMergeInfo->keyFirst, fKey); + pMergeInfo->keyLast = MAX(pMergeInfo->keyLast, fKey); + + filterIter++; + if (filterIter >= nFilterKeys) { + fKey = INT64_MAX; + } else { + fKey = tdGetKey(filterKeys[filterIter]); + } + } else if (fKey > rowKey) { + if (isRowDel) { + pMergeInfo->rowsDeleteFailed++; + } else { + if (pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed >= maxRowsToRead) break; + if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break; + pMergeInfo->rowsInserted++; + pMergeInfo->nOperations++; + pMergeInfo->keyFirst = MIN(pMergeInfo->keyFirst, rowKey); + pMergeInfo->keyLast = MAX(pMergeInfo->keyLast, rowKey); + tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row); + } - if (nFilterKeys != 0) { // for filter purpose - ASSERT(filterKeys != NULL); - keyNext = tsdbNextIterKey(pIter); - if (keyNext < 0 || keyNext > maxKey) return numOfRows; - void *ptr = taosbsearch((void *)(&keyNext), (void *)filterKeys, nFilterKeys, sizeof(TSKEY), compTSKEY, TD_GE); - filterIter = (ptr == NULL) ? nFilterKeys : (int)((POINTER_DISTANCE(ptr, filterKeys) / sizeof(TSKEY))); - } - - do { - SDataRow row = tsdbNextIterRow(pIter); - if (row == NULL) break; - - keyNext = dataRowKey(row); - if (keyNext > maxKey) break; - - bool keyFiltered = false; - if (nFilterKeys != 0) { - while (true) { - if (filterIter >= nFilterKeys) break; - if (keyNext == filterKeys[filterIter]) { - keyFiltered = true; - filterIter++; - break; - } else if (keyNext < filterKeys[filterIter]) { - break; + tSkipListIterNext(pIter); + row = tsdbNextIterRow(pIter); + if (row == NULL || dataRowKey(row) > maxKey) { + rowKey = INT64_MAX; + } else { + rowKey = dataRowKey(row); + isRowDel = dataRowDeleted(row); + } + } else { + if (isRowDel) { + ASSERT(!keepDup); + if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break; + pMergeInfo->rowsDeleteSucceed++; + pMergeInfo->nOperations++; + tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row); + } else { + if (keepDup) { + if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break; + pMergeInfo->rowsUpdated++; + pMergeInfo->nOperations++; + pMergeInfo->keyFirst = MIN(pMergeInfo->keyFirst, rowKey); + pMergeInfo->keyLast = MAX(pMergeInfo->keyLast, rowKey); + tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row); } else { - filterIter++; + pMergeInfo->keyFirst = MIN(pMergeInfo->keyFirst, fKey); + pMergeInfo->keyLast = MAX(pMergeInfo->keyLast, fKey); } } - } - - if (!keyFiltered) { - if (numOfRows >= maxRowsToRead) break; - numOfRows++; - } - if (!keyFiltered || keepDup) { - if (pCols) { - if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) { - pSchema = tsdbGetTableSchemaImpl(pTable, false, false, dataRowVersion(row)); - if (pSchema == NULL) { - ASSERT(0); - } - } + tSkipListIterNext(pIter); + row = tsdbNextIterRow(pIter); + if (row == NULL || dataRowKey(row) > maxKey) { + rowKey = INT64_MAX; + } else { + rowKey = dataRowKey(row); + isRowDel = dataRowDeleted(row); + } - tdAppendDataRowToDataCol(row, pSchema, pCols); + filterIter++; + if (filterIter >= nFilterKeys) { + fKey = INT64_MAX; + } else { + fKey = tdGetKey(filterKeys[filterIter]); } - } - } while (tSkipListIterNext(pIter)); + } + } - return numOfRows; + return 0; } // ---------------- LOCAL FUNCTIONS ---------------- @@ -420,7 +494,7 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) { pTableData->pData = tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], - pCfg->update ? SL_UPDATE_DUP_KEY : SL_DISCARD_DUP_KEY, tsdbGetTsTupleKey); + tkeyComparFn, pCfg->update ? SL_UPDATE_DUP_KEY : SL_DISCARD_DUP_KEY, tsdbGetTsTupleKey); if (pTableData->pData == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; goto _err; @@ -562,7 +636,7 @@ static void tsdbEndCommit(STsdbRepo *pRepo) { static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) { for (int i = 0; i < nIters; i++) { TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter); - if (nextKey > 0 && (nextKey >= minKey && nextKey <= maxKey)) return 1; + if (nextKey != TSDB_DATA_TIMESTAMP_NULL && (nextKey >= minKey && nextKey <= maxKey)) return 1; } return 0; } @@ -758,5 +832,21 @@ static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) { taosTFree(tData); + return 0; +} + +static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SDataRow row) { + if (pCols) { + if (*ppSchema == NULL || schemaVersion(*ppSchema) != dataRowVersion(row)) { + *ppSchema = tsdbGetTableSchemaImpl(pTable, false, false, dataRowVersion(row)); + if (*ppSchema == NULL) { + ASSERT(false); + return -1; + } + } + + tdAppendDataRowToDataCol(row, *ppSchema, pCols); + } + return 0; } \ No newline at end of file diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 1914ff08ed..53a1b3f5bf 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -86,7 +86,8 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) { if (pTable != NULL) { tsdbError("vgId:%d table %s already exists, tid %d uid %" PRId64, REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable)); - return TSDB_CODE_TDB_TABLE_ALREADY_EXIST; + terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST; + goto _err; } if (pCfg->type == TSDB_CHILD_TABLE) { @@ -700,7 +701,7 @@ static STable *tsdbCreateTableFromCfg(STableCfg *pCfg, bool isSuper) { } pTable->tagVal = NULL; STColumn *pCol = schemaColAt(pTable->tagSchema, DEFAULT_TAG_INDEX_COLUMN); - pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), SL_ALLOW_DUP_KEY, getTagIndexKey); + pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), NULL, SL_ALLOW_DUP_KEY, getTagIndexKey); if (pTable->pIndex == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; goto _err; @@ -745,7 +746,7 @@ static STable *tsdbCreateTableFromCfg(STableCfg *pCfg, bool isSuper) { T_REF_INC(pTable); - tsdbTrace("table %s tid %d uid %" PRIu64 " is created", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), + tsdbDebug("table %s tid %d uid %" PRIu64 " is created", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable)); return pTable; @@ -1155,8 +1156,8 @@ static void *tsdbDecodeTable(void *buf, STable **pRTable) { if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) { buf = tdDecodeSchema(buf, &(pTable->tagSchema)); STColumn *pCol = schemaColAt(pTable->tagSchema, DEFAULT_TAG_INDEX_COLUMN); - pTable->pIndex = - tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), SL_ALLOW_DUP_KEY, getTagIndexKey); + pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), NULL, + SL_ALLOW_DUP_KEY, getTagIndexKey); if (pTable->pIndex == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; tsdbFreeTable(pTable); diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 6c6a738899..c6d4ae7a5e 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -27,6 +27,7 @@ #define TSDB_GET_COMPCOL_LEN(nCols) (sizeof(SCompData) + sizeof(SCompCol) * (nCols) + sizeof(TSCKSUM)) #define TSDB_KEY_COL_OFFSET 0 #define TSDB_GET_COMPBLOCK_IDX(h, b) (POINTER_DISTANCE(b, (h)->pCompInfo->blocks)/sizeof(SCompBlock)) +#define TSDB_IS_LAST_BLOCK(pb) ((pb)->last) static bool tsdbShouldCreateNewLast(SRWHelper *pHelper); static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, SCompBlock *pCompBlock, @@ -34,7 +35,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pD static int compareKeyBlock(const void *arg1, const void *arg2); static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize); static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx); -static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded); +static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, SMergeInfo *pMergeInfo); static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx); static void tsdbResetHelperFileImpl(SRWHelper *pHelper); static int tsdbInitHelperFile(SRWHelper *pHelper); @@ -61,8 +62,10 @@ static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pComp static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols, SCompBlock *pCompBlock); static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey, int *blkIdx); -static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, +static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update); +static bool tsdbCheckAddSubBlockCond(SRWHelper *pHelper, SCompBlock *pCompBlock, SMergeInfo *pMergeInfo, int maxOps); +static int tsdbDeleteSuperBlock(SRWHelper *pHelper, int blkIdx); // ---------------------- INTERNAL FUNCTIONS ---------------------- int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo) { @@ -279,7 +282,7 @@ int tsdbCommitTableData(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols while (true) { ASSERT(blkIdx <= (int)pIdx->numOfBlocks); TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); - if (keyFirst < 0 || keyFirst > maxKey) break; // iter over + if (keyFirst == TSDB_DATA_TIMESTAMP_NULL || keyFirst > maxKey) break; // iter over if (pIdx->len <= 0 || keyFirst > pIdx->maxKey) { if (tsdbProcessAppendCommit(pHelper, pCommitIter, pDataCols, maxKey) < 0) return -1; @@ -925,7 +928,7 @@ _err: return -1; } -static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded) { +static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, SMergeInfo *pMergeInfo) { ASSERT(pCompBlock->numOfSubBlocks == 0); SCompIdx *pIdx = &(pHelper->curCompIdx); @@ -958,9 +961,9 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId pSCompBlock->numOfSubBlocks++; ASSERT(pSCompBlock->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS); pSCompBlock->len += sizeof(SCompBlock); - pSCompBlock->numOfRows += rowsAdded; - pSCompBlock->keyFirst = MIN(pSCompBlock->keyFirst, pCompBlock->keyFirst); - pSCompBlock->keyLast = MAX(pSCompBlock->keyLast, pCompBlock->keyLast); + pSCompBlock->numOfRows = pSCompBlock->numOfRows + pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed; + pSCompBlock->keyFirst = pMergeInfo->keyFirst; + pSCompBlock->keyLast = pMergeInfo->keyLast; pIdx->len += sizeof(SCompBlock); } else { // Need to create two sub-blocks void *ptr = NULL; @@ -989,11 +992,11 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId ((SCompBlock *)ptr)[1] = *pCompBlock; pSCompBlock->numOfSubBlocks = 2; - pSCompBlock->numOfRows += rowsAdded; + pSCompBlock->numOfRows = pSCompBlock->numOfRows + pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed; pSCompBlock->offset = ((char *)ptr) - ((char *)pHelper->pCompInfo); pSCompBlock->len = sizeof(SCompBlock) * 2; - pSCompBlock->keyFirst = MIN(((SCompBlock *)ptr)[0].keyFirst, ((SCompBlock *)ptr)[1].keyFirst); - pSCompBlock->keyLast = MAX(((SCompBlock *)ptr)[0].keyLast, ((SCompBlock *)ptr)[1].keyLast); + pSCompBlock->keyFirst = pMergeInfo->keyFirst; + pSCompBlock->keyLast = pMergeInfo->keyLast; pIdx->len += (sizeof(SCompBlock) * 2); } @@ -1047,6 +1050,45 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int return 0; } +static int tsdbDeleteSuperBlock(SRWHelper *pHelper, int blkIdx) { + SCompIdx *pCompIdx = &(pHelper->curCompIdx); + + ASSERT(pCompIdx->numOfBlocks > 0 && blkIdx < pCompIdx->numOfBlocks); + + SCompBlock *pCompBlock= blockAtIdx(pHelper, blkIdx); + SCompBlock compBlock = *pCompBlock; + ASSERT(pCompBlock->numOfSubBlocks > 0 && pCompBlock->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS); + + if (pCompIdx->numOfBlocks == 1) { + memset(pCompIdx, 0, sizeof(*pCompIdx)); + } else { + int tsize = 0; + + if (compBlock.numOfSubBlocks > 1) { + tsize = pCompIdx->len - (compBlock.offset + sizeof(SCompBlock) * compBlock.numOfSubBlocks); + + ASSERT(tsize > 0); + memmove(POINTER_SHIFT(pHelper->pCompInfo, compBlock.offset), + POINTER_SHIFT(pHelper->pCompInfo, compBlock.offset + sizeof(SCompBlock) * compBlock.numOfSubBlocks), + tsize); + + pCompIdx->len = pCompIdx->len - sizeof(SCompBlock) * compBlock.numOfSubBlocks; + } + + tsize = pCompIdx->len - POINTER_DISTANCE(blockAtIdx(pHelper, blkIdx + 1), pHelper->pCompInfo); + ASSERT(tsize > 0); + memmove((void *)blockAtIdx(pHelper, blkIdx), (void *)blockAtIdx(pHelper, blkIdx + 1), tsize); + + pCompIdx->len -= sizeof(SCompBlock); + + pCompIdx->numOfBlocks--; + pCompIdx->hasLast = blockAtIdx(pHelper, pCompIdx->numOfBlocks - 1)->last; + pCompIdx->maxKey = blockAtIdx(pHelper, pCompIdx->numOfBlocks - 1)->keyLast; + } + + return 0; +} + static void tsdbResetHelperFileImpl(SRWHelper *pHelper) { pHelper->idxH.numOfIdx = 0; pHelper->idxH.curIdx = 0; @@ -1439,12 +1481,14 @@ static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx) { } static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) { - STsdbCfg * pCfg = &(pHelper->pRepo->config); - STable * pTable = pCommitIter->pTable; - SCompIdx * pIdx = &(pHelper->curCompIdx); - TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); - int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5; - SCompBlock compBlock = {0}; + STsdbCfg * pCfg = &(pHelper->pRepo->config); + STable * pTable = pCommitIter->pTable; + SCompIdx * pIdx = &(pHelper->curCompIdx); + TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); + int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5; + SCompBlock compBlock = {0}; + SMergeInfo mergeInfo = {0}; + SMergeInfo *pMergeInfo = &mergeInfo; ASSERT(pIdx->len <= 0 || keyFirst > pIdx->maxKey); if (pIdx->hasLast) { // append to with last block @@ -1452,39 +1496,47 @@ static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SCompBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1); ASSERT(pCompBlock->last && pCompBlock->numOfRows < pCfg->minRowsPerFileBlock); tdResetDataCols(pDataCols); - int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock - pCompBlock->numOfRows, - pDataCols, NULL, 0, pCfg->update); - ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows); - if (rowsRead + pCompBlock->numOfRows < pCfg->minRowsPerFileBlock && - pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) { - if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1; - if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1, rowsRead) < 0) return -1; - } else { - if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; - ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows); + tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock - pCompBlock->numOfRows, pDataCols, + NULL, 0, pCfg->update, pMergeInfo); - if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, pDataCols->numOfRows) < 0) return -1; - ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows + pDataCols->numOfRows); + ASSERT(pMergeInfo->rowsInserted == pMergeInfo->nOperations && pMergeInfo->nOperations == pDataCols->numOfRows); - if (tsdbWriteBlockToProperFile(pHelper, pHelper->pDataCols[0], &compBlock) < 0) return -1; - if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1; - } + if (pDataCols->numOfRows > 0) { + ASSERT((pMergeInfo->keyFirst == dataColsKeyFirst(pDataCols)) && (pMergeInfo->keyLast == dataColsKeyLast(pDataCols))); - if (pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false; + if (pDataCols->numOfRows + pCompBlock->numOfRows < pCfg->minRowsPerFileBlock && + pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) { + if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1; + if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1, pMergeInfo) < 0) return -1; + } else { + if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; + ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows); + + if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, pDataCols->numOfRows) < 0) return -1; + ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows + pDataCols->numOfRows); + + if (tsdbWriteBlockToProperFile(pHelper, pHelper->pDataCols[0], &compBlock) < 0) return -1; + if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1; + } + + if (pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false; + } } else { ASSERT(!pHelper->hasOldLastBlock); tdResetDataCols(pDataCols); - int rowsRead = - tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock, pDataCols, NULL, 0, pCfg->update); - ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows); + tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock, pDataCols, NULL, 0, pCfg->update, pMergeInfo); + ASSERT(pMergeInfo->rowsInserted == pMergeInfo->nOperations && pMergeInfo->nOperations == pDataCols->numOfRows); - if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1; - if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) return -1; + if (pDataCols->numOfRows > 0) { + ASSERT((pMergeInfo->keyFirst == dataColsKeyFirst(pDataCols)) && (pMergeInfo->keyLast == dataColsKeyLast(pDataCols))); + if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1; + if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) return -1; + } } #ifndef NDEBUG TSKEY keyNext = tsdbNextIterKey(pCommitIter->pIter); - ASSERT(keyNext < 0 || keyNext > pIdx->maxKey); + ASSERT(keyNext == TSDB_DATA_TIMESTAMP_NULL || keyNext > pIdx->maxKey); #endif return 0; @@ -1492,13 +1544,15 @@ static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey, int *blkIdx) { - STsdbCfg * pCfg = &(pHelper->pRepo->config); - STable * pTable = pCommitIter->pTable; - SCompIdx * pIdx = &(pHelper->curCompIdx); - SCompBlock compBlock = {0}; - TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); - int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5; - SDataCols *pDataCols0 = pHelper->pDataCols[0]; + STsdbCfg * pCfg = &(pHelper->pRepo->config); + STable * pTable = pCommitIter->pTable; + SCompIdx * pIdx = &(pHelper->curCompIdx); + SCompBlock compBlock = {0}; + TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); + int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5; + SDataCols * pDataCols0 = pHelper->pDataCols[0]; + SMergeInfo mergeInfo = {0}; + SMergeInfo *pMergeInfo = &mergeInfo; SSkipListIterator slIter = {0}; @@ -1509,120 +1563,82 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, ASSERT(pCompBlock != NULL); int tblkIdx = (int32_t)(TSDB_GET_COMPBLOCK_IDX(pHelper, pCompBlock)); - if (pCompBlock->last) { - ASSERT(pCompBlock->numOfRows < pCfg->minRowsPerFileBlock && tblkIdx == pIdx->numOfBlocks - 1); + ASSERT((!TSDB_IS_LAST_BLOCK(pCompBlock)) || (tblkIdx == pIdx->numOfBlocks - 1)); + + if ((!TSDB_IS_LAST_BLOCK(pCompBlock)) && keyFirst < pCompBlock->keyFirst) { + // Loop to write data until pCompBlock->keyFirst-1 + while (true) { + tdResetDataCols(pDataCols); + tsdbLoadDataFromCache(pTable, pCommitIter->pIter, pCompBlock->keyLast - 1, defaultRowsInBlock, pDataCols, NULL, 0, + pCfg->update, pMergeInfo); + ASSERT(pMergeInfo->rowsInserted == pMergeInfo->nOperations && pMergeInfo->nOperations == pDataCols->numOfRows); + if (pDataCols->numOfRows == 0) break; + + if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; + if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; + tblkIdx++; + } + ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) == TSDB_DATA_TIMESTAMP_NULL || + tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); + } else { int16_t colId = 0; - slIter = *(pCommitIter->pIter); if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1; - ASSERT(pDataCols0->numOfRows == pCompBlock->numOfRows); - int rows1 = defaultRowsInBlock - pCompBlock->numOfRows; - int rows2 = tsdbLoadDataFromCache(pTable, &slIter, maxKey, rows1, NULL, pDataCols0->cols[0].pData, - pDataCols0->numOfRows, pCfg->update); - if (!pCfg->update && rows2 == 0) { // all data filtered out + TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : (blockAtIdx(pHelper, tblkIdx + 1)->keyFirst - 1); + + slIter = *(pCommitIter->pIter); + tsdbLoadDataFromCache(pTable, &slIter, keyLimit, INT_MAX, NULL, pDataCols0->cols[0].pData, pDataCols0->numOfRows, + pCfg->update, pMergeInfo); + + if (pMergeInfo->nOperations == 0) { + // Do nothing + ASSERT(pMergeInfo->rowsDeleteFailed > 0); *(pCommitIter->pIter) = slIter; + tblkIdx++; + } else if (pCompBlock->numOfRows + pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed == 0) { + // Delete the block and do some stuff + ASSERT(pMergeInfo->keyFirst == INT64_MAX && pMergeInfo->keyFirst == INT64_MIN); + if (tsdbDeleteSuperBlock(pHelper, tblkIdx) < 0) return -1; + *pCommitIter->pIter = slIter; + if (pCompBlock->last && pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false; + } else if (tsdbCheckAddSubBlockCond(pHelper, pCompBlock, pMergeInfo, pDataCols->maxPoints)) { + // Append as a sub-block of the searched block + tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, INT_MAX, pDataCols, pDataCols0->cols[0].pData, + pDataCols0->numOfRows, pCfg->update, pMergeInfo); + ASSERT(memcmp(pCommitIter->pIter, &slIter, sizeof(slIter)) == 0); + if (tsdbWriteBlockToFile(pHelper, pCompBlock->last ? helperLastF(pHelper) : helperDataF(pHelper), pDataCols, + &compBlock, pCompBlock->last, false) < 0) { + return -1; + } + if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, pMergeInfo) < 0) { + return -1; + } + tblkIdx++; } else { - if (pCompBlock->numOfRows + rows2 < pCfg->minRowsPerFileBlock && - pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) { - tdResetDataCols(pDataCols); - int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, rows1, pDataCols, - pDataCols0->cols[0].pData, pDataCols0->numOfRows, pCfg->update); - ASSERT(rowsRead == rows2 && rowsRead <= pDataCols->numOfRows && pDataCols->numOfRows > 0); - if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1; - if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; - tblkIdx++; - } else { - if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; - int round = 0; - int dIter = 0; - while (true) { - int rowsRead = tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, maxKey, - defaultRowsInBlock, pCfg->update); - if (rowsRead == 0) break; + // load the block data, merge with the memory data + if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; + int round = 0; + int dIter = 0; + while (true) { + tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock, + pCfg->update); + if (pDataCols->numOfRows == 0) break; + if (tblkIdx == pIdx->numOfBlocks - 1) { if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1; - if (round == 0) { - if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; - } else { - if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; - } - - tblkIdx++; - round++; + } else { + if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; } - } - if (pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false; - } - } else { - TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : (pCompBlock[1].keyFirst - 1); - TSKEY blkKeyFirst = pCompBlock->keyFirst; - TSKEY blkKeyLast = pCompBlock->keyLast; - if (keyFirst < blkKeyFirst) { - while (true) { - tdResetDataCols(pDataCols); - int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, blkKeyFirst - 1, defaultRowsInBlock, pDataCols, - NULL, 0, pCfg->update); - if (rowsRead == 0) break; - - ASSERT(rowsRead == pDataCols->numOfRows); - if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; - if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; - tblkIdx++; - } - ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || - tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); - } else { - ASSERT(keyFirst <= blkKeyLast); - int16_t colId = 0; - if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1; - - slIter = *(pCommitIter->pIter); - int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows); - int rows2 = tsdbLoadDataFromCache(pTable, &slIter, blkKeyLast, INT_MAX, NULL, pDataCols0->cols[0].pData, - pDataCols0->numOfRows, pCfg->update); - - if (!pCfg->update && rows2 == 0) { // all filtered out - *(pCommitIter->pIter) = slIter; - ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || - tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); - } else { - int rows3 = tsdbLoadDataFromCache(pTable, &slIter, keyLimit, INT_MAX, NULL, NULL, 0, pCfg->update) + rows2; - - if (pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rows1 >= rows2) { - int rows = (rows1 >= rows3) ? rows3 : rows2; - tdResetDataCols(pDataCols); - int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols, - pDataCols0->cols[0].pData, pDataCols0->numOfRows, pCfg->update); - ASSERT(rowsRead == rows && rowsRead <= pDataCols->numOfRows); - if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, false) < 0) - return -1; - if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; - tblkIdx++; - ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || - tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); + if (round == 0) { + if (pCompBlock->last && pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false; + if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; } else { - if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; - int round = 0; - int dIter = 0; - while (true) { - int rowsRead = tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, - defaultRowsInBlock, pCfg->update); - if (rowsRead == 0) break; - - if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; - if (round == 0) { - if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; - } else { - if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; - } - - round++; - tblkIdx++; - } - ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || - tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); + if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; } + + round++; + tblkIdx++; } } } @@ -1631,9 +1647,8 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, return 0; } -static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, +static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update) { - int numOfRows = 0; TSKEY key1 = INT64_MAX; TSKEY key2 = INT64_MAX; STSchema *pSchema = NULL; @@ -1643,39 +1658,60 @@ static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIte while (true) { key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter); + bool isRowDel = false; SDataRow row = tsdbNextIterRow(pCommitIter->pIter); - key2 = (row == NULL || dataRowKey(row) > maxKey) ? INT64_MAX : dataRowKey(row); + if (row == NULL || dataRowKey(row) > maxKey) { + key2 = INT64_MAX; + } else { + key2 = dataRowKey(row); + isRowDel = dataRowDeleted(row); + } if (key1 == INT64_MAX && key2 == INT64_MAX) break; - if ((key1 < key2) || ((!update) && (key1 == key2))) { + if (key1 < key2) { for (int i = 0; i < pDataCols->numOfCols; i++) { dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, pTarget->maxPoints); } + pTarget->numOfRows++; (*iter)++; + } else if (key1 > key2) { + if (!isRowDel) { + if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) { + pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, dataRowVersion(row)); + ASSERT(pSchema != NULL); + } - if ((!update) && (key1 == key2)) { - tSkipListIterNext(pCommitIter->pIter); + tdAppendDataRowToDataCol(row, pSchema, pTarget); } + + tSkipListIterNext(pCommitIter->pIter); } else { - if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) { - pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, dataRowVersion(row)); - ASSERT(pSchema != NULL); - } + if (update) { + if (!isRowDel) { + if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) { + pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, dataRowVersion(row)); + ASSERT(pSchema != NULL); + } - tdAppendDataRowToDataCol(row, pSchema, pTarget); + tdAppendDataRowToDataCol(row, pSchema, pTarget); + } + } else { + ASSERT(!isRowDel); + + for (int i = 0; i < pDataCols->numOfCols; i++) { + dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, + pTarget->maxPoints); + } + + pTarget->numOfRows++; + } + (*iter)++; tSkipListIterNext(pCommitIter->pIter); - if (key1 == key2) (*iter)++; } - - numOfRows++; - if (numOfRows >= maxRows) break; - ASSERT(numOfRows == pTarget->numOfRows && numOfRows <= pTarget->maxPoints); } - - return numOfRows; } static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols, SCompBlock *pCompBlock) { @@ -1698,3 +1734,20 @@ static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols, return 0; } + +static bool tsdbCheckAddSubBlockCond(SRWHelper *pHelper, SCompBlock *pCompBlock, SMergeInfo *pMergeInfo, int maxOps) { + STsdbCfg *pCfg = &(pHelper->pRepo->config); + int mergeRows = pCompBlock->numOfRows + pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed; + + ASSERT(mergeRows > 0); + + if (pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && pMergeInfo->nOperations <= maxOps) { + if (pCompBlock->last) { + if (!TSDB_NLAST_FILE_OPENED(pHelper) && mergeRows < pCfg->minRowsPerFileBlock) return true; + } else { + if (mergeRows < pCfg->maxRowsPerFileBlock) return true; + } + } + + return false; +} \ No newline at end of file diff --git a/src/util/inc/tskiplist.h b/src/util/inc/tskiplist.h index 030a9d69c1..579483dbff 100644 --- a/src/util/inc/tskiplist.h +++ b/src/util/inc/tskiplist.h @@ -39,16 +39,11 @@ typedef char *(*__sl_key_fn_t)(const void *); typedef struct SSkipListNode { uint8_t level; - uint8_t flags; void * pData; struct SSkipListNode *forwards[]; } SSkipListNode; -#define SL_NODE_DELETED_FLAG (uint8_t)0x1 - #define SL_GET_NODE_DATA(n) (n)->pData -#define SL_IS_NODE_DELETED(n) ((n)->flags & SL_NODE_DELETED_FLAG) -#define SL_SET_NODE_DELETED(n) (n)->flags |= SL_NODE_DELETED_FLAG #define SL_NODE_GET_FORWARD_POINTER(n, l) (n)->forwards[(l)] #define SL_NODE_GET_BACKWARD_POINTER(n, l) (n)->forwards[(n)->level + (l)] @@ -109,8 +104,7 @@ typedef struct SSkipList { uint8_t flags; uint8_t type; // static info above uint8_t level; - uint32_t size; // semantic meaning of size - uint32_t tsize; // # of all skiplist nodes in this SL + uint32_t size; SSkipListNode * pHead; // point to the first element SSkipListNode * pTail; // point to the last element #if SKIP_LIST_RECORD_PERFORMANCE @@ -131,13 +125,13 @@ typedef struct SSkipListIterator { #define SL_GET_MIN_KEY(s) SL_GET_NODE_KEY(s, SL_NODE_GET_FORWARD_POINTER((s)->pHead, 0)) #define SL_GET_MAX_KEY(s) SL_GET_NODE_KEY((s), SL_NODE_GET_BACKWARD_POINTER((s)->pTail, 0)) #define SL_SIZE(s) (s)->size -#define SL_TSIZE(s) (s)->tsize -SSkipList * tSkipListCreate(uint8_t nMaxLevel, uint8_t keyType, uint16_t keyLen, uint8_t flags, __sl_key_fn_t fn); -void tSkipListDestroy(SSkipList *pSkipList); -SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData); -SArray * tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey); -void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel); +SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, __compar_fn_t comparFn, uint8_t flags, + __sl_key_fn_t fn); +void tSkipListDestroy(SSkipList *pSkipList); +SSkipListNode * tSkipListPut(SSkipList *pSkipList, void *pData); +SArray * tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey); +void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel); SSkipListIterator *tSkipListCreateIter(SSkipList *pSkipList); SSkipListIterator *tSkipListCreateIterFromVal(SSkipList *pSkipList, const char *val, int32_t type, int32_t order); bool tSkipListIterNext(SSkipListIterator *iter); @@ -145,8 +139,6 @@ SSkipListNode * tSkipListIterGet(SSkipListIterator *iter); void * tSkipListDestroyIter(SSkipListIterator *iter); uint32_t tSkipListRemove(SSkipList *pSkipList, SSkipListKey key); void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode); -SSkipListKey tSkipListGetMinKey(SSkipList *pSkipList); -SSkipListKey tSkipListGetMaxKey(SSkipList *pSkipList); #ifdef __cplusplus } diff --git a/src/util/src/tskiplist.c b/src/util/src/tskiplist.c index 603a9759b8..ea166e4c25 100644 --- a/src/util/src/tskiplist.c +++ b/src/util/src/tskiplist.c @@ -34,7 +34,8 @@ static FORCE_INLINE int tSkipListRLock(SSkipList *pSkipList); static FORCE_INLINE int tSkipListUnlock(SSkipList *pSkipList); static FORCE_INLINE int32_t getSkipListRandLevel(SSkipList *pSkipList); -SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, uint8_t flags, __sl_key_fn_t fn) { +SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, __compar_fn_t comparFn, uint8_t flags, + __sl_key_fn_t fn) { SSkipList *pSkipList = (SSkipList *)calloc(1, sizeof(SSkipList)); if (pSkipList == NULL) return NULL; @@ -47,7 +48,11 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, u pSkipList->len = keyLen; pSkipList->flags = flags; pSkipList->keyFn = fn; - pSkipList->comparFn = getKeyComparFunc(keyType); + if (comparFn == NULL) { + pSkipList->comparFn = getKeyComparFunc(keyType); + } else { + pSkipList->comparFn = comparFn; + } if (initForwardBackwardPtr(pSkipList) < 0) { tSkipListDestroy(pSkipList); @@ -115,10 +120,6 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData) { if (dupMode == SL_UPDATE_DUP_KEY) { pNode = SL_NODE_GET_FORWARD_POINTER(forward[0], 0); atomic_store_ptr(&(pNode->pData), pData); - if (SL_IS_NODE_DELETED(pNode)) { - pNode->flags &= (~(SL_NODE_DELETED_FLAG)); - pSkipList->size++; - } } } else { pNode = tSkipListNewNode(getSkipListRandLevel(pSkipList)); @@ -137,8 +138,6 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData) { uint32_t tSkipListRemove(SSkipList *pSkipList, SSkipListKey key) { uint32_t count = 0; - if (SL_DUP_MODE(pSkipList) == SL_DISCARD_DUP_KEY) return 0; - tSkipListWLock(pSkipList); SSkipListNode *pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC); @@ -177,9 +176,7 @@ SArray *tSkipListGet(SSkipList *pSkipList, SSkipListKey key) { if (pSkipList->comparFn(key, SL_GET_NODE_KEY(pSkipList, p)) != 0) { break; } - if (!SL_IS_NODE_DELETED(p)) { - taosArrayPush(sa, &p); - } + taosArrayPush(sa, &p); pNode = p; } @@ -227,19 +224,13 @@ bool tSkipListIterNext(SSkipListIterator *iter) { tSkipListRLock(pSkipList); if (iter->order == TSDB_ORDER_ASC) { - while (true) { - iter->cur = SL_NODE_GET_FORWARD_POINTER(iter->cur, 0); - iter->step++; - if (iter->cur == pSkipList->pTail) break; - if (!SL_IS_NODE_DELETED(iter->cur)) break; - } + if (iter->cur == pSkipList->pTail) return false; + iter->cur = SL_NODE_GET_FORWARD_POINTER(iter->cur, 0); + iter->step++; } else { - while (true) { - iter->cur = SL_NODE_GET_BACKWARD_POINTER(iter->cur, 0); - iter->step++; - if (iter->cur == pSkipList->pHead) break; - if (!SL_IS_NODE_DELETED(iter->cur)) break; - } + if (iter->cur == pSkipList->pHead) return false; + iter->cur = SL_NODE_GET_BACKWARD_POINTER(iter->cur, 0); + iter->step++; } tSkipListUnlock(pSkipList); @@ -305,28 +296,6 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) { } } -SSkipListKey tSkipListGetMinKey(SSkipList *pSkipList) { - if (pSkipList == NULL || SL_SIZE(pSkipList) == 0) return NULL; - - SSkipListNode *pNode = pSkipList->pHead; - while ((pNode = SL_NODE_GET_FORWARD_POINTER(pNode, 0)) != pSkipList->pTail) { - if (!SL_IS_NODE_DELETED(pNode)) return pSkipList->keyFn(pNode->pData); - } - - return NULL; -} - -SSkipListKey tSkipListGetMaxKey(SSkipList *pSkipList) { - if (pSkipList == NULL || SL_SIZE(pSkipList) == 0) return NULL; - - SSkipListNode *pNode = pSkipList->pTail; - while ((pNode = SL_NODE_GET_BACKWARD_POINTER(pNode, 0)) != pSkipList->pHead) { - if (!SL_IS_NODE_DELETED(pNode)) return pSkipList->keyFn(pNode->pData); - } - - return NULL; -} - static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSkipListNode *pNode) { for (int32_t i = 0; i < pNode->level; ++i) { if (i >= pSkipList->level) { @@ -349,7 +318,6 @@ static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSk if (pSkipList->level < pNode->level) pSkipList->level = pNode->level; pSkipList->size += 1; - pSkipList->tsize += 1; } static SSkipListIterator *doCreateSkipListIterator(SSkipList *pSkipList, int32_t order) { @@ -447,27 +415,18 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **forward, static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode) { int32_t level = pNode->level; uint8_t dupMode = SL_DUP_MODE(pSkipList); + ASSERT(dupMode != SL_DISCARD_DUP_KEY && dupMode != SL_UPDATE_DUP_KEY); - if (dupMode == SL_UPDATE_DUP_KEY) { - if (SL_IS_NODE_DELETED(pNode)) { - return; - } else { - SL_SET_NODE_DELETED(pNode); - pSkipList->size--; - } - } else { - for (int32_t j = level - 1; j >= 0; --j) { - SSkipListNode *prev = SL_NODE_GET_BACKWARD_POINTER(pNode, j); - SSkipListNode *next = SL_NODE_GET_FORWARD_POINTER(pNode, j); + for (int32_t j = level - 1; j >= 0; --j) { + SSkipListNode *prev = SL_NODE_GET_BACKWARD_POINTER(pNode, j); + SSkipListNode *next = SL_NODE_GET_FORWARD_POINTER(pNode, j); - SL_NODE_GET_FORWARD_POINTER(prev, j) = next; - SL_NODE_GET_BACKWARD_POINTER(next, j) = prev; - } - - tSkipListFreeNode(pNode); - pSkipList->size--; - pSkipList->tsize--; + SL_NODE_GET_FORWARD_POINTER(prev, j) = next; + SL_NODE_GET_BACKWARD_POINTER(next, j) = prev; } + + tSkipListFreeNode(pNode); + pSkipList->size--; } // Function must be called after calling tSkipListRemoveNodeImpl() function -- GitLab