diff --git a/src/query/src/qAst.c b/src/query/src/qAst.c index 634f014d975430c8da3e4de406ef7c92482930b6..1e60c71afd2b779060f2eccaf8e002bc2cafe865 100644 --- a/src/query/src/qAst.c +++ b/src/query/src/qAst.c @@ -411,9 +411,9 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr } if (cond.start != NULL) { - iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->keyInfo.type, TSDB_ORDER_ASC); + iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->type, TSDB_ORDER_ASC); } else { - iter = tSkipListCreateIterFromVal(pSkipList, (char*)(cond.end ? cond.end->v: NULL), pSkipList->keyInfo.type, TSDB_ORDER_DESC); + iter = tSkipListCreateIterFromVal(pSkipList, (char*)(cond.end ? cond.end->v: NULL), pSkipList->type, TSDB_ORDER_DESC); } if (cond.start != NULL) { @@ -468,7 +468,7 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr tSkipListDestroyIter(iter); comp = true; - iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->keyInfo.type, TSDB_ORDER_DESC); + iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->type, TSDB_ORDER_DESC); while(tSkipListIterNext(iter)) { SSkipListNode* pNode = tSkipListIterGet(iter); comp = comp && (pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v) == 0); diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index f6a7f1b35cf7992b3bb9ccdab5892418c8174eb1..0b7cc4dca4c0733e7420603315bb532bad869402 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -37,38 +37,25 @@ static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables); int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { STsdbCfg * pCfg = &pRepo->config; STsdbMeta * pMeta = pRepo->tsdbMeta; - int32_t level = 0; - int32_t headSize = 0; TSKEY key = dataRowKey(row); SMemTable * pMemTable = pRepo->mem; STableData *pTableData = NULL; - SSkipList * pSList = NULL; + // SSkipList * pSList = NULL; - if (pMemTable != NULL && TABLE_TID(pTable) < pMemTable->maxTables && pMemTable->tData[TABLE_TID(pTable)] != NULL && - pMemTable->tData[TABLE_TID(pTable)]->uid == TABLE_UID(pTable)) { - pTableData = pMemTable->tData[TABLE_TID(pTable)]; - pSList = pTableData->pData; - } - - tSkipListNewNodeInfo(pSList, &level, &headSize); - - SSkipListNode *pNode = (SSkipListNode *)malloc(headSize + sizeof(SDataRow *)); - if (pNode == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } + // if (pMemTable != NULL && TABLE_TID(pTable) < pMemTable->maxTables && pMemTable->tData[TABLE_TID(pTable)] != NULL && + // pMemTable->tData[TABLE_TID(pTable)]->uid == TABLE_UID(pTable)) { + // pTableData = pMemTable->tData[TABLE_TID(pTable)]; + // pSList = pTableData->pData; + // } void *pRow = tsdbAllocBytes(pRepo, dataRowLen(row)); if (pRow == NULL) { tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %d bytes since %s", REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), dataRowLen(row), tstrerror(terrno)); - free(pNode); return -1; } - pNode->level = level; dataRowCpy(pRow, row); - *(SDataRow *)SL_GET_NODE_DATA(pNode) = pRow; // Operations above may change pRepo->mem, retake those values ASSERT(pRepo->mem != NULL); @@ -77,7 +64,6 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { if (TABLE_TID(pTable) >= pMemTable->maxTables) { if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) { tsdbFreeBytes(pRepo, pRow, dataRowLen(row)); - free(pNode); return -1; } } @@ -97,7 +83,6 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { " to table %s while create new table data object since %s", REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), tstrerror(terrno)); tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row)); - free(pNode); return -1; } @@ -106,20 +91,19 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable)); - if (tSkipListPut(pTableData->pData, pNode) == NULL) { + int64_t oldSize = SL_GET_SIZE(pTableData->pData); + if (tSkipListPut(pTableData->pData, (void *)(&pRow), sizeof(void *)) == NULL) { tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row)); - free(pNode); } else { + int64_t deltaSize = SL_GET_SIZE(pTableData->pData) - oldSize; if (TABLE_LASTKEY(pTable) < key) TABLE_LASTKEY(pTable) = key; if (pMemTable->keyFirst > key) pMemTable->keyFirst = key; if (pMemTable->keyLast < key) pMemTable->keyLast = key; - pMemTable->numOfRows++; + pMemTable->numOfRows += deltaSize; if (pTableData->keyFirst > key) pTableData->keyFirst = key; if (pTableData->keyLast < key) pTableData->keyLast = key; - pTableData->numOfRows++; - - ASSERT(pTableData->numOfRows == tSkipListGetSize(pTableData->pData)); + pTableData->numOfRows += deltaSize; } tsdbTrace("vgId:%d a row is inserted to table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo), @@ -439,7 +423,7 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) { pTableData->numOfRows = 0; pTableData->pData = tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, - TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, 1, tsdbGetTsTupleKey); + TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], /*SL_DISCARD_DUP_KEY*/ SL_APPEND_DUP_KEY, tsdbGetTsTupleKey); if (pTableData->pData == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; goto _err; diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index a84bb69777f512a6c5d947dd48f8f2865b867200..dede109928d81369406507ad367629943b7b0730 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -690,7 +690,7 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) { } pTable->tagVal = NULL; STColumn *pCol = schemaColAt(pTable->tagSchema, DEFAULT_TAG_INDEX_COLUMN); - pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), 1, 0, 1, getTagIndexKey); + pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), SL_ALLOW_DUP_KEY, getTagIndexKey); if (pTable->pIndex == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; goto _err; @@ -892,23 +892,8 @@ static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable, bool refSuper pTable->pSuper = pSTable; - int32_t level = 0; - int32_t headSize = 0; + tSkipListPut(pSTable->pIndex, (void *)(&pTable), sizeof(STable *)); - tSkipListNewNodeInfo(pSTable->pIndex, &level, &headSize); - - // NOTE: do not allocate the space for key, since in each skip list node, only keep the pointer to pTable, not the - // actual key value, and the key value will be retrieved during query through the pTable and getTagIndexKey function - SSkipListNode *pNode = calloc(1, headSize + sizeof(STable *)); - if (pNode == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - pNode->level = level; - - memcpy(SL_GET_NODE_DATA(pNode), &pTable, sizeof(STable *)); - - tSkipListPut(pSTable->pIndex, pNode); if (refSuper) T_REF_INC(pSTable); return 0; } @@ -1165,7 +1150,7 @@ static void *tsdbDecodeTable(void *buf, STable **pRTable) { buf = tdDecodeSchema(buf, &(pTable->tagSchema)); STColumn *pCol = schemaColAt(pTable->tagSchema, DEFAULT_TAG_INDEX_COLUMN); pTable->pIndex = - tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), 1, 0, 1, getTagIndexKey); + tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), SL_ALLOW_DUP_KEY, getTagIndexKey); if (pTable->pIndex == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; tsdbFreeTable(pTable); @@ -1191,7 +1176,7 @@ static int tsdbGetTableEncodeSize(int8_t act, STable *pTable) { tlen = sizeof(SListNode) + sizeof(SActObj) + sizeof(SActCont) + tsdbEncodeTable(NULL, pTable) + sizeof(TSCKSUM); } else { if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) { - tlen = (int)((sizeof(SListNode) + sizeof(SActObj)) * (tSkipListGetSize(pTable->pIndex) + 1)); + tlen = (int)((sizeof(SListNode) + sizeof(SActObj)) * (SL_GET_SIZE(pTable->pIndex) + 1)); } else { tlen = sizeof(SListNode) + sizeof(SActObj); } diff --git a/src/util/CMakeLists.txt b/src/util/CMakeLists.txt index e63a085cc8d9e81d68e770df64f6c0d6c43e1807..211c5ab32035ab1a115749bc623dd37444a869a6 100644 --- a/src/util/CMakeLists.txt +++ b/src/util/CMakeLists.txt @@ -7,7 +7,7 @@ TARGET_LINK_LIBRARIES(tutil pthread osdetail lz4) IF (TD_LINUX) TARGET_LINK_LIBRARIES(tutil m rt) - ADD_SUBDIRECTORY(tests) + # ADD_SUBDIRECTORY(tests) FIND_PATH(ICONV_INCLUDE_EXIST iconv.h /usr/include/ /usr/local/include/) IF (ICONV_INCLUDE_EXIST) diff --git a/src/util/inc/tskiplist.h b/src/util/inc/tskiplist.h index 4ba620dce03c51cd49faeb0ed113d64fe2d24959..f1e09501e9356baa77bcf43dd78372a1fdb86d10 100644 --- a/src/util/inc/tskiplist.h +++ b/src/util/inc/tskiplist.h @@ -27,6 +27,14 @@ extern "C" { #define MAX_SKIP_LIST_LEVEL 15 #define SKIP_LIST_RECORD_PERFORMANCE 0 +// For key property setting +#define SL_ALLOW_DUP_KEY (uint8_t)0x0 // Allow duplicate key exists +#define SL_DISCARD_DUP_KEY (uint8_t)0x1 // Discard duplicate key +#define SL_UPDATA_DUP_KEY (uint8_t)0x2 // Update duplicate key by remove/insert +#define SL_APPEND_DUP_KEY (uint8_t)0x3 // Update duplicate key by append +// For thread safety setting +#define SL_THREAD_SAFE (uint8_t)0x4 + typedef char *SSkipListKey; typedef char *(*__sl_key_fn_t)(const void *); @@ -41,6 +49,9 @@ typedef struct SSkipListNode { uint8_t level; } SSkipListNode; +#define SL_IS_THREAD_SAFE(flags) ((flags)&SL_THREAD_SAFE) +#define SL_DUP_MODE(flags) ((flags) & ((((uint8_t)1) << 2) - 1)) + #define SL_NODE_HEADER_SIZE(_l) (sizeof(SSkipListNode) + ((_l) << 1u) * POINTER_BYTES) #define SL_GET_FORWARD_POINTER(n, _l) ((SSkipListNode **)((char *)(n) + sizeof(SSkipListNode)))[(_l)] @@ -54,6 +65,8 @@ typedef struct SSkipListNode { #define SL_GET_SL_MAX_KEY(s) (SL_GET_NODE_KEY((s), SL_GET_BACKWARD_POINTER((s)->pTail, 0))) #define SL_GET_NODE_LEVEL(n) *(uint8_t *)((n)) +#define SL_GET_SIZE(s) (s)->size +#define SL_GET_TSIZE(s) (s)->tsize /* * @version 0.3 @@ -113,13 +126,16 @@ typedef struct SSkipListKeyInfo { typedef struct SSkipList { __compar_fn_t comparFn; __sl_key_fn_t keyFn; - uint32_t size; + pthread_rwlock_t *lock; + uint16_t len; uint8_t maxLevel; + uint8_t flags; + uint8_t type; // static info above uint8_t level; - SSkipListKeyInfo keyInfo; - pthread_rwlock_t *lock; - SSkipListNode * pHead; // point to the first element - SSkipListNode * pTail; // point to the last element + uint32_t size; // not including duplicate keys + uint32_t tsize; // including duplicate keys + SSkipListNode * pHead; // point to the first element + SSkipListNode * pTail; // point to the last element #if SKIP_LIST_RECORD_PERFORMANCE tSkipListState state; // skiplist state #endif @@ -145,8 +161,7 @@ typedef struct SSkipListIterator { * @param dupKey allow the duplicated key in the skip list * @return */ -SSkipList *tSkipListCreate(uint8_t nMaxLevel, uint8_t keyType, uint8_t keyLen, uint8_t dupKey, uint8_t threadsafe, - uint8_t freeNode, __sl_key_fn_t fn); +SSkipList *tSkipListCreate(uint8_t nMaxLevel, uint8_t keyType, uint16_t keyLen, uint8_t flags, __sl_key_fn_t fn); /** * @@ -164,30 +179,34 @@ void *tSkipListDestroy(SSkipList *pSkipList); void tSkipListNewNodeInfo(SSkipList *pSkipList, int32_t *level, int32_t *headSize); /** - * put the skip list node into the skip list. + * put the data into the skiplist * If failed, NULL will be returned, otherwise, the pNode will be returned. * * @param pSkipList - * @param pNode + * @param pData + * @param dataLen * @return */ -SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode); +SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData, int dataLen); /** - * get *all* nodes which key are equivalent to pKey + * put the skip list node into the skip list. + * If failed, NULL will be returned, otherwise, the pNode will be returned. * * @param pSkipList - * @param pKey + * @param pNode * @return */ -SArray *tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey); +SSkipListNode *tSkipListPutNode(SSkipList *pSkipList, SSkipListNode *pNode); /** - * get the size of skip list + * get *all* nodes which key are equivalent to pKey + * * @param pSkipList + * @param pKey * @return */ -size_t tSkipListGetSize(const SSkipList *pSkipList); +SArray *tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey); /** * display skip list of the given level, for debug purpose only diff --git a/src/util/src/tskiplist.c b/src/util/src/tskiplist.c index 303c2440bf9b59282b8347604d58d3368dd312c0..73f6bf0502102ebc0c37314914c31a89543482fe 100644 --- a/src/util/src/tskiplist.c +++ b/src/util/src/tskiplist.c @@ -13,318 +13,649 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +#include "tskiplist.h" #include "os.h" +#include "tcompare.h" #include "tulog.h" -#include "tskiplist.h" #include "tutil.h" -#include "tcompare.h" -UNUSED_FUNC static FORCE_INLINE void recordNodeEachLevel(SSkipList *pSkipList, int32_t level) { // record link count in each level -#if SKIP_LIST_RECORD_PERFORMANCE - for (int32_t i = 0; i < level; ++i) { - pSkipList->state.nLevelNodeCnt[i]++; +#define DO_MEMSET_PTR_AREA(n) \ + do { \ + int32_t _l = (n)->level; \ + memset(pNode, 0, SL_NODE_HEADER_SIZE(_l)); \ + (n)->level = _l; \ + } while (0) + +static int initForwardBackwardPtr(SSkipList *pSkipList); +static SSkipListNode *getPriorNode(SSkipList *pSkipList, const char *val, int32_t order); +static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode); +static void tSkipListCorrectLevel(SSkipList *pSkipList); +static SSkipListIterator *doCreateSkipListIterator(SSkipList *pSkipList, int32_t order); + +static FORCE_INLINE int tSkipListWLock(SSkipList *pSkipList); +static FORCE_INLINE int tSkipListRLock(SSkipList *pSkipList); +static FORCE_INLINE int tSkipListUnlock(SSkipList *pSkipList); +static FORCE_INLINE int32_t getSkipListRandLevel(SSkipList *pSkipList); +static FORCE_INLINE SSkipListNode *tSkipListPutNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode); + +SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, uint8_t flags, __sl_key_fn_t fn) { + SSkipList *pSkipList = (SSkipList *)calloc(1, sizeof(SSkipList)); + if (pSkipList == NULL) { + return NULL; + } + + if (maxLevel > MAX_SKIP_LIST_LEVEL) { + maxLevel = MAX_SKIP_LIST_LEVEL; + } + + pSkipList->maxLevel = maxLevel; + pSkipList->type = keyType; + pSkipList->len = keyLen; + pSkipList->flags = flags; + pSkipList->keyFn = fn; + pSkipList->comparFn = getKeyComparFunc(keyType); + + // pSkipList->level = 1; // TODO: check if 1 is valid + if (initForwardBackwardPtr(pSkipList) < 0) { + taosTFree(pSkipList); + return NULL; } + + if (SL_IS_THREAD_SAFE(flags)) { + pSkipList->lock = (pthread_rwlock_t *)calloc(1, sizeof(pthread_rwlock_t)); + + if (pthread_rwlock_init(pSkipList->lock, NULL) != 0) { + taosTFree(pSkipList->pHead); + taosTFree(pSkipList); + + return NULL; + } + } + + srand((uint32_t)time(NULL)); + +#if SKIP_LIST_RECORD_PERFORMANCE + pSkipList->state.nTotalMemSize += sizeof(SSkipList); #endif + + return pSkipList; } -UNUSED_FUNC static FORCE_INLINE void removeNodeEachLevel(SSkipList *pSkipList, int32_t level) { -#if SKIP_LIST_RECORD_PERFORMANCE - for (int32_t i = 0; i < level; ++i) { - pSkipList->state.nLevelNodeCnt[i]--; +void *tSkipListDestroy(SSkipList *pSkipList) { + if (pSkipList == NULL) return NULL; + + tSkipListWLock(pSkipList); + + SSkipListNode *pNode = SL_GET_FORWARD_POINTER(pSkipList->pHead, 0); + + while (pNode != pSkipList->pTail) { + SSkipListNode *pTemp = pNode; + pNode = SL_GET_FORWARD_POINTER(pNode, 0); + taosTFree(pTemp); } -#endif + + tSkipListUnlock(pSkipList); + if (pSkipList->lock != NULL) { + pthread_rwlock_destroy(pSkipList->lock); + taosTFree(pSkipList->lock); + } + + taosTFree(pSkipList->pHead); + taosTFree(pSkipList); + return NULL; } -static FORCE_INLINE int32_t getSkipListNodeRandomHeight(SSkipList *pSkipList) { - const uint32_t factor = 4; +void tSkipListNewNodeInfo(SSkipList *pSkipList, int32_t *level, int32_t *headSize) { + if (pSkipList == NULL) { + *level = 1; + *headSize = SL_NODE_HEADER_SIZE(*level); + return; + } - int32_t n = 1; - while ((rand() % factor) == 0 && n <= pSkipList->maxLevel) { - n++; + *level = getSkipListRandLevel(pSkipList); + *headSize = SL_NODE_HEADER_SIZE(*level); +} + +SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData, int dataLen) { + if (pSkipList == NULL || pData == NULL) return NULL; + + tSkipListWLock(pSkipList); + + int32_t level = getSkipListRandLevel(pSkipList); + + SSkipListNode *pNode = (SSkipListNode *)malloc(SL_NODE_HEADER_SIZE(level) + dataLen); + if (pNode == NULL) { + tSkipListUnlock(pSkipList); + return NULL; } - return n; + pNode->level = level; + memcpy(SL_GET_NODE_DATA(pNode), pData, dataLen); + + if (tSkipListPutNodeImpl(pSkipList, pNode) == NULL) { + tSkipListUnlock(pSkipList); + taosTFree(pNode); + return NULL; + } + + tSkipListUnlock(pSkipList); + + return pNode; } -static FORCE_INLINE int32_t getSkipListRandLevel(SSkipList *pSkipList) { - int32_t level = getSkipListNodeRandomHeight(pSkipList); - if (pSkipList->size == 0) { - level = 1; - pSkipList->level = 1; - } else { - if (level > pSkipList->level) { - if (pSkipList->level < pSkipList->maxLevel) { - level = (++pSkipList->level); - } else { - level = pSkipList->level; - } +SSkipListNode *tSkipListPutNode(SSkipList *pSkipList, SSkipListNode *pNode) { + SSkipListNode *pRetNode = NULL; + + if (pSkipList == NULL || pNode == NULL) return NULL; + + tSkipListWLock(pSkipList); + pRetNode = tSkipListPutNodeImpl(pSkipList, pNode); + tSkipListUnlock(pSkipList); + + return pRetNode; +} + +SArray *tSkipListGet(SSkipList *pSkipList, SSkipListKey key) { + SArray *sa = taosArrayInit(1, POINTER_BYTES); + + tSkipListRLock(pSkipList); + + SSkipListNode *pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC); + while (1) { + SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, 0); + if (p == pSkipList->pTail) { + break; + } + if (pSkipList->comparFn(key, SL_GET_NODE_KEY(pSkipList, p)) != 0) { + break; } + taosArrayPush(sa, &p); + pNode = p; } - - assert(level <= pSkipList->maxLevel); - return level; + + tSkipListUnlock(pSkipList); + + return sa; } -#define DO_MEMSET_PTR_AREA(n) do {\ -int32_t _l = (n)->level;\ -memset(pNode, 0, SL_NODE_HEADER_SIZE(_l));\ -(n)->level = _l;\ -} while(0) +uint32_t tSkipListRemove(SSkipList *pSkipList, SSkipListKey key) { + uint32_t count = 0; -static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSkipListNode *pNode); -static SSkipListNode* tSkipListPushBack(SSkipList *pSkipList, SSkipListNode *pNode); -static SSkipListNode* tSkipListPushFront(SSkipList* pSkipList, SSkipListNode *pNode); -static SSkipListIterator* doCreateSkipListIterator(SSkipList *pSkipList, int32_t order); + tSkipListWLock(pSkipList); + SSkipListNode *pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC); + while (1) { + SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, 0); + if (p == pSkipList->pTail) { + break; + } + if (pSkipList->comparFn(key, SL_GET_NODE_KEY(pSkipList, p)) != 0) { + break; + } -// when order is TSDB_ORDER_ASC, return the last node with key less than val -// when order is TSDB_ORDER_DESC, return the first node with key large than val -static SSkipListNode* getPriorNode(SSkipList* pSkipList, const char* val, int32_t order) { - __compar_fn_t comparFn = pSkipList->comparFn; - SSkipListNode *pNode = NULL; + tSkipListRemoveNodeImpl(pSkipList, p); - if (order == TSDB_ORDER_ASC) { - pNode = pSkipList->pHead; - for (int32_t i = pSkipList->level - 1; i >= 0; --i) { - SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, i); - while (p != pSkipList->pTail) { - char *key = SL_GET_NODE_KEY(pSkipList, p); - if (comparFn(key, val) < 0) { - pNode = p; - p = SL_GET_FORWARD_POINTER(p, i); - } else { - break; + ++count; + } + + tSkipListCorrectLevel(pSkipList); + + tSkipListUnlock(pSkipList); + + return count; +} + +void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode) { + tSkipListWLock(pSkipList); + tSkipListRemoveNodeImpl(pSkipList, pNode); + tSkipListCorrectLevel(pSkipList); + tSkipListUnlock(pSkipList); +} + +SSkipListIterator *tSkipListCreateIter(SSkipList *pSkipList) { + if (pSkipList == NULL) return NULL; + + return doCreateSkipListIterator(pSkipList, TSDB_ORDER_ASC); +} + +SSkipListIterator *tSkipListCreateIterFromVal(SSkipList *pSkipList, const char *val, int32_t type, int32_t order) { + assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC); + assert(pSkipList != NULL); + + SSkipListIterator *iter = doCreateSkipListIterator(pSkipList, order); + if (val == NULL) { + return iter; + } + + tSkipListRLock(pSkipList); + + iter->cur = getPriorNode(pSkipList, val, order); + + tSkipListUnlock(pSkipList); + + return iter; +} + +bool tSkipListIterNext(SSkipListIterator *iter) { + if (iter->pSkipList == NULL) return false; + + SSkipList *pSkipList = iter->pSkipList; + uint8_t dupMod = SL_DUP_MODE(pSkipList->flags); + + tSkipListRLock(pSkipList); + + if (iter->order == TSDB_ORDER_ASC) { // ascending order iterate + if (dupMod == SL_APPEND_DUP_KEY) { + if (iter->cur == pSkipList->pHead) { + iter->cur = SL_GET_FORWARD_POINTER(iter->cur, 0); + iter->step++; + } else { + while (true) { + iter->step++; + SSkipListNode *pNode = iter->cur; + iter->cur = SL_GET_FORWARD_POINTER(pNode, 0); + + if (iter->cur == pSkipList->pTail) break; + if (pSkipList->comparFn(SL_GET_NODE_KEY(pSkipList, pNode), SL_GET_NODE_KEY(pSkipList, iter->cur)) == 0) { + continue; + } else { + break; + } } } + + if (iter->cur != pSkipList->pTail) { + while (true) { + SSkipListNode *pNode = SL_GET_FORWARD_POINTER(iter->cur, 0); + if (pNode == pSkipList->pTail) break; + if (pSkipList->comparFn(SL_GET_NODE_KEY(pSkipList, pNode), SL_GET_NODE_KEY(pSkipList, iter->cur)) == 0) { + iter->step++; + iter->cur = pNode; + } else { + break; + } + } + } + } else { + iter->cur = SL_GET_FORWARD_POINTER(iter->cur, 0); + iter->step++; } - } else { - pNode = pSkipList->pTail; - for (int32_t i = pSkipList->level - 1; i >= 0; --i) { - SSkipListNode *p = SL_GET_BACKWARD_POINTER(pNode, i); - while (p != pSkipList->pHead) { - char *key = SL_GET_NODE_KEY(pSkipList, p); - if (comparFn(key, val) > 0) { - pNode = p; - p = SL_GET_BACKWARD_POINTER(p, i); - } else { - break; + } else { // descending order iterate + if (dupMod == SL_APPEND_DUP_KEY) { + if (iter->cur == pSkipList->pTail) { + iter->cur = SL_GET_BACKWARD_POINTER(iter->cur, 0); + iter->step++; + } else { + while (true) { + SSkipListNode *pNode = iter->cur; + iter->cur = SL_GET_BACKWARD_POINTER(pNode, 0); + + if (iter->cur == pSkipList->pHead) break; + if (pSkipList->comparFn(SL_GET_NODE_KEY(pSkipList, pNode), SL_GET_NODE_KEY(pSkipList, iter->cur)) == 0) { + iter->cur = pNode; + continue; + } else { + break; + } } } + } else { + iter->cur = SL_GET_BACKWARD_POINTER(iter->cur, 0); + iter->step++; } } - return pNode; -} + tSkipListUnlock(pSkipList); + return (iter->order == TSDB_ORDER_ASC) ? (iter->cur != pSkipList->pTail) : (iter->cur != pSkipList->pHead); +} -static bool initForwardBackwardPtr(SSkipList* pSkipList) { - uint32_t maxLevel = pSkipList->maxLevel; - - // head info - pSkipList->pHead = (SSkipListNode *)calloc(1, SL_NODE_HEADER_SIZE(maxLevel) * 2); - if (pSkipList->pHead == NULL) { - return false; - } - - pSkipList->pHead->level = pSkipList->maxLevel; - - // tail info - pSkipList->pTail = (SSkipListNode*) ((char*) pSkipList->pHead + SL_NODE_HEADER_SIZE(maxLevel)); - pSkipList->pTail->level = pSkipList->maxLevel; - - for (uint32_t i = 0; i < maxLevel; ++i) { - SL_GET_FORWARD_POINTER(pSkipList->pHead, i) = pSkipList->pTail; - SL_GET_BACKWARD_POINTER(pSkipList->pTail, i) = pSkipList->pHead; +SSkipListNode *tSkipListIterGet(SSkipListIterator *iter) { + if (iter == NULL || iter->cur == iter->pSkipList->pTail || iter->cur == iter->pSkipList->pHead) { + return NULL; + } else { + return iter->cur; } - - return true; } -SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint8_t keyLen, uint8_t dupKey, uint8_t lock, - uint8_t freeNode, __sl_key_fn_t fn) { - SSkipList *pSkipList = (SSkipList *)calloc(1, sizeof(SSkipList)); - if (pSkipList == NULL) { +void *tSkipListDestroyIter(SSkipListIterator *iter) { + if (iter == NULL) { return NULL; } - if (maxLevel > MAX_SKIP_LIST_LEVEL) { - maxLevel = MAX_SKIP_LIST_LEVEL; - } + taosTFree(iter); + return NULL; +} - pSkipList->keyInfo.type = keyType; - pSkipList->keyInfo.len = keyLen; - pSkipList->keyInfo.dupKey = dupKey; - pSkipList->keyInfo.freeNode = freeNode; - - pSkipList->keyFn = fn; - pSkipList->comparFn = getKeyComparFunc(keyType); - pSkipList->maxLevel = maxLevel; - pSkipList->level = 1; - - if (!initForwardBackwardPtr(pSkipList)) { - taosTFree(pSkipList); - return NULL; +void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) { + if (pSkipList == NULL || pSkipList->level < nlevel || nlevel <= 0) { + return; } - - if (lock) { - pSkipList->lock = calloc(1, sizeof(pthread_rwlock_t)); - if (pthread_rwlock_init(pSkipList->lock, NULL) != 0) { - taosTFree(pSkipList->pHead); - taosTFree(pSkipList); - - return NULL; + SSkipListNode *p = SL_GET_FORWARD_POINTER(pSkipList->pHead, nlevel - 1); + + int32_t id = 1; + char * prev = NULL; + + while (p != pSkipList->pTail) { + char *key = SL_GET_NODE_KEY(pSkipList, p); + if (prev != NULL) { + assert(pSkipList->comparFn(prev, key) < 0); + } + + switch (pSkipList->type) { + case TSDB_DATA_TYPE_INT: + fprintf(stdout, "%d: %d\n", id++, *(int32_t *)key); + break; + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_BIGINT: + fprintf(stdout, "%d: %" PRId64 " \n", id++, *(int64_t *)key); + break; + case TSDB_DATA_TYPE_BINARY: + fprintf(stdout, "%d: %s \n", id++, key); + break; + case TSDB_DATA_TYPE_DOUBLE: + fprintf(stdout, "%d: %lf \n", id++, *(double *)key); + break; + default: + fprintf(stdout, "\n"); } + + prev = SL_GET_NODE_KEY(pSkipList, p); + + p = SL_GET_FORWARD_POINTER(p, nlevel - 1); } +} - srand((uint32_t)time(NULL)); +static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSkipListNode *pNode, bool hasDupKey) { + uint8_t dupMode = SL_DUP_MODE(pSkipList->flags); -#if SKIP_LIST_RECORD_PERFORMANCE - pSkipList->state.nTotalMemSize += sizeof(SSkipList); -#endif + // FIXME: this may cause the level of skiplist change + if (dupMode == SL_UPDATA_DUP_KEY && hasDupKey) { + tSkipListRemoveNodeImpl(pSkipList, forward[0]); + tSkipListCorrectLevel(pSkipList); + } - return pSkipList; + DO_MEMSET_PTR_AREA(pNode); + + for (int32_t i = 0; i < pNode->level; ++i) { + SSkipListNode *x = forward[i]; + SL_GET_BACKWARD_POINTER(pNode, i) = x; + + SSkipListNode *next = SL_GET_FORWARD_POINTER(x, i); + SL_GET_BACKWARD_POINTER(next, i) = pNode; + + SL_GET_FORWARD_POINTER(pNode, i) = next; + SL_GET_FORWARD_POINTER(x, i) = pNode; + } + + if (!(dupMode == SL_APPEND_DUP_KEY && hasDupKey)) { + pSkipList->size += 1; + } + pSkipList->tsize += 1; } -void *tSkipListDestroy(SSkipList *pSkipList) { - if (pSkipList == NULL) { - return NULL; +static SSkipListIterator *doCreateSkipListIterator(SSkipList *pSkipList, int32_t order) { + SSkipListIterator *iter = calloc(1, sizeof(SSkipListIterator)); + + iter->pSkipList = pSkipList; + iter->order = order; + if (order == TSDB_ORDER_ASC) { + iter->cur = pSkipList->pHead; + } else { + iter->cur = pSkipList->pTail; } - if (pSkipList->lock) { - pthread_rwlock_wrlock(pSkipList->lock); + return iter; +} + +static FORCE_INLINE int tSkipListWLock(SSkipList *pSkipList) { + if (SL_IS_THREAD_SAFE(pSkipList->flags)) { + return pthread_rwlock_wrlock(pSkipList->lock); } + return 0; +} - if (pSkipList->keyInfo.freeNode) { - SSkipListNode *pNode = SL_GET_FORWARD_POINTER(pSkipList->pHead, 0); +static FORCE_INLINE int tSkipListRLock(SSkipList *pSkipList) { + if (SL_IS_THREAD_SAFE(pSkipList->flags)) { + return pthread_rwlock_rdlock(pSkipList->lock); + } + return 0; +} - while (pNode != pSkipList->pTail) { - SSkipListNode *pTemp = pNode; - pNode = SL_GET_FORWARD_POINTER(pNode, 0); - taosTFree(pTemp); - } +static FORCE_INLINE int tSkipListUnlock(SSkipList *pSkipList) { + if (SL_IS_THREAD_SAFE(pSkipList->flags)) { + return pthread_rwlock_unlock(pSkipList->lock); } + return 0; +} - if (pSkipList->lock) { - pthread_rwlock_unlock(pSkipList->lock); - pthread_rwlock_destroy(pSkipList->lock); +static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **forward, SSkipListNode *pNode) { + int compare = 1; + bool hasDupKey = false; + char * pNodeKey = SL_GET_NODE_KEY(pSkipList, pNode); + uint8_t dupMode = SL_DUP_MODE(pSkipList->flags); - taosTFree(pSkipList->lock); + if (pSkipList->size == 0) { + for (int i = 0; i < pNode->level; i++) { + forward[i] = pSkipList->pHead; + } + } else { + char *pKey = NULL; + + // Compare min key + pKey = SL_GET_SL_MIN_KEY(pSkipList); + compare = pSkipList->comparFn(pNodeKey, pKey); + if ((dupMode == SL_APPEND_DUP_KEY && compare < 0) || (dupMode != SL_APPEND_DUP_KEY && compare <= 0)) { + for (int i = 0; i < pNode->level; i++) { + forward[i] = pSkipList->pHead; + } + return (compare == 0); + } + + // Compare max key + pKey = SL_GET_SL_MAX_KEY(pSkipList); + compare = pSkipList->comparFn(pNodeKey, pKey); + if ((dupMode == SL_DISCARD_DUP_KEY && compare > 0) || (dupMode != SL_DISCARD_DUP_KEY && compare >= 0)) { + for (int i = 0; i < pNode->level; i++) { + forward[i] = SL_GET_BACKWARD_POINTER(pSkipList->pTail, i); + } + + return (compare == 0); + } + + SSkipListNode *px = pSkipList->pHead; + for (int i = pSkipList->level - 1; i >= 0; --i) { + SSkipListNode *p = SL_GET_FORWARD_POINTER(px, i); + while (p != pSkipList->pTail) { + pKey = SL_GET_NODE_KEY(pSkipList, p); + + compare = pSkipList->comparFn(pKey, pNodeKey); + if (compare == 0 && hasDupKey == false) hasDupKey = true; + if ((dupMode == SL_APPEND_DUP_KEY && compare > 0) || (dupMode != SL_APPEND_DUP_KEY && compare >= 0)) { + break; + } else { + px = p; + p = SL_GET_FORWARD_POINTER(px, i); + } + } + + forward[i] = px; + } } - taosTFree(pSkipList->pHead); - taosTFree(pSkipList); - return NULL; + return hasDupKey; } -void tSkipListNewNodeInfo(SSkipList *pSkipList, int32_t *level, int32_t *headSize) { - if (pSkipList == NULL) { - *level = 1; - *headSize = SL_NODE_HEADER_SIZE(*level); - return; +static bool tSkipListIsNodeDup(SSkipList *pSkipList, SSkipListNode *pNode) { + SSkipListNode *pPrevNode = SL_GET_BACKWARD_POINTER(pNode, 0); + SSkipListNode *pNextNode = SL_GET_FORWARD_POINTER(pNode, 0); + char * pNodeKey = SL_GET_NODE_KEY(pSkipList, pNode); + char * pPrevNodeKey = (pPrevNode == pSkipList->pHead) ? NULL : SL_GET_NODE_KEY(pSkipList, pPrevNode); + char * pNextNodeKey = (pNextNode == pSkipList->pTail) ? NULL : SL_GET_NODE_KEY(pSkipList, pNextNode); + + return ((pPrevNodeKey != NULL && pSkipList->comparFn(pNodeKey, pPrevNodeKey) == 0) || + (pNextNodeKey != NULL && pSkipList->comparFn(pNodeKey, pNextNodeKey) == 0)); +} + +static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode) { + int32_t level = pNode->level; + uint8_t dupMode = SL_DUP_MODE(pSkipList->flags); + + bool sizeReduce = !(dupMode == SL_APPEND_DUP_KEY && tSkipListIsNodeDup(pSkipList, pNode)); + + for (int32_t j = level - 1; j >= 0; --j) { + SSkipListNode *prev = SL_GET_BACKWARD_POINTER(pNode, j); + SSkipListNode *next = SL_GET_FORWARD_POINTER(pNode, j); + + SL_GET_FORWARD_POINTER(prev, j) = next; + SL_GET_BACKWARD_POINTER(next, j) = prev; } - *level = getSkipListRandLevel(pSkipList); - *headSize = SL_NODE_HEADER_SIZE(*level); + taosTFree(pNode); + + if (sizeReduce) pSkipList->size--; + pSkipList->tsize--; } -SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode) { - if (pSkipList == NULL || pNode == NULL) { - return NULL; +// Function must be called after calling tSkipListRemoveNodeImpl() function +static void tSkipListCorrectLevel(SSkipList *pSkipList) { + while (pSkipList->level > 0 && SL_GET_FORWARD_POINTER(pSkipList->pHead, pSkipList->level - 1) == pSkipList->pTail) { + pSkipList->level -= 1; } +} - if (pSkipList->lock) { - pthread_rwlock_wrlock(pSkipList->lock); +UNUSED_FUNC static FORCE_INLINE void recordNodeEachLevel(SSkipList *pSkipList, + int32_t level) { // record link count in each level +#if SKIP_LIST_RECORD_PERFORMANCE + for (int32_t i = 0; i < level; ++i) { + pSkipList->state.nLevelNodeCnt[i]++; } - - // if the new key is greater than the maximum key of skip list, push back this node at the end of skip list - char *newDatakey = SL_GET_NODE_KEY(pSkipList, pNode); - if (pSkipList->size == 0 || pSkipList->comparFn(SL_GET_SL_MAX_KEY(pSkipList), newDatakey) < 0) { - return tSkipListPushBack(pSkipList, pNode); +#endif +} + +UNUSED_FUNC static FORCE_INLINE void removeNodeEachLevel(SSkipList *pSkipList, int32_t level) { +#if SKIP_LIST_RECORD_PERFORMANCE + for (int32_t i = 0; i < level; ++i) { + pSkipList->state.nLevelNodeCnt[i]--; } - - // if the new key is less than the minimum key of skip list, push front this node at the front of skip list - assert(pSkipList->size > 0); - char* minKey = SL_GET_SL_MIN_KEY(pSkipList); - if (pSkipList->comparFn(newDatakey, minKey) < 0) { - return tSkipListPushFront(pSkipList, pNode); +#endif +} + +static FORCE_INLINE int32_t getSkipListNodeRandomHeight(SSkipList *pSkipList) { + const uint32_t factor = 4; + + int32_t n = 1; + while ((rand() % factor) == 0 && n <= pSkipList->maxLevel) { + n++; } - - // find the appropriated position to insert data - SSkipListNode *px = pSkipList->pHead; - SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; - int32_t ret = -1; - for (int32_t i = pSkipList->level - 1; i >= 0; --i) { - SSkipListNode *p = SL_GET_FORWARD_POINTER(px, i); - while (p != pSkipList->pTail) { - char *key = SL_GET_NODE_KEY(pSkipList, p); - - // if the forward element is less than the specified key, forward one step - ret = pSkipList->comparFn(key, newDatakey); - if (ret < 0) { - px = p; - p = SL_GET_FORWARD_POINTER(px, i); + return n; +} + +static FORCE_INLINE int32_t getSkipListRandLevel(SSkipList *pSkipList) { + int32_t level = 0; + if (pSkipList->size == 0) { + level = 1; + } else { + level = getSkipListNodeRandomHeight(pSkipList); + if (level > pSkipList->level) { + if (pSkipList->level < pSkipList->maxLevel) { + level = pSkipList->level + 1; } else { - break; + level = pSkipList->level; } } - - forward[i] = px; } - // if the skip list does not allowed identical key inserted, the new data will be discarded. - if (pSkipList->keyInfo.dupKey == 0 && ret == 0) { - if (pSkipList->lock) { - pthread_rwlock_unlock(pSkipList->lock); - } + assert(level <= pSkipList->maxLevel); + return level; +} - return NULL; +// when order is TSDB_ORDER_ASC, return the last node with key less than val +// when order is TSDB_ORDER_DESC, return the first node with key large than val +static SSkipListNode *getPriorNode(SSkipList *pSkipList, const char *val, int32_t order) { + __compar_fn_t comparFn = pSkipList->comparFn; + SSkipListNode *pNode = NULL; + + if (order == TSDB_ORDER_ASC) { + pNode = pSkipList->pHead; + for (int32_t i = pSkipList->level - 1; i >= 0; --i) { + SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, i); + while (p != pSkipList->pTail) { + char *key = SL_GET_NODE_KEY(pSkipList, p); + if (comparFn(key, val) < 0) { + pNode = p; + p = SL_GET_FORWARD_POINTER(p, i); + } else { + break; + } + } + } + } else { + pNode = pSkipList->pTail; + for (int32_t i = pSkipList->level - 1; i >= 0; --i) { + SSkipListNode *p = SL_GET_BACKWARD_POINTER(pNode, i); + while (p != pSkipList->pHead) { + char *key = SL_GET_NODE_KEY(pSkipList, p); + if (comparFn(key, val) > 0) { + pNode = p; + p = SL_GET_BACKWARD_POINTER(p, i); + } else { + break; + } + } + } } - - tSkipListDoInsert(pSkipList, forward, pNode); + return pNode; } +static int initForwardBackwardPtr(SSkipList *pSkipList) { + uint32_t maxLevel = pSkipList->maxLevel; - -SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey key) { - SArray* sa = taosArrayInit(1, POINTER_BYTES); - - if (pSkipList->lock) { - pthread_rwlock_wrlock(pSkipList->lock); + // head info + pSkipList->pHead = (SSkipListNode *)malloc(SL_NODE_HEADER_SIZE(maxLevel) * 2); + if (pSkipList->pHead == NULL) { + return -1; } - SSkipListNode* pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC); - while (1) { - SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, 0); - if (p == pSkipList->pTail) { - break; - } - if (pSkipList->comparFn(key, SL_GET_NODE_KEY(pSkipList, p)) != 0) { - break; - } - taosArrayPush(sa, &p); - pNode = p; - } + pSkipList->pHead->level = maxLevel; - if (pSkipList->lock) { - pthread_rwlock_unlock(pSkipList->lock); + // tail info + pSkipList->pTail = (SSkipListNode *)POINTER_SHIFT(pSkipList->pHead, SL_NODE_HEADER_SIZE(maxLevel)); + pSkipList->pTail->level = maxLevel; + + for (uint32_t i = 0; i < maxLevel; ++i) { + SL_GET_FORWARD_POINTER(pSkipList->pHead, i) = pSkipList->pTail; + SL_GET_BACKWARD_POINTER(pSkipList->pTail, i) = pSkipList->pHead; } - return sa; + return 0; } +static FORCE_INLINE SSkipListNode *tSkipListPutNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode) { + SSkipListNode *pRetNode = NULL; + SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; + int hasDupKey = tSkipListGetPosToPut(pSkipList, forward, pNode); + if (SL_DUP_MODE(pSkipList->flags) == SL_DISCARD_DUP_KEY && hasDupKey) { + pRetNode = NULL; + } else { + pRetNode = pNode; + tSkipListDoInsert(pSkipList, forward, pNode, hasDupKey); -size_t tSkipListGetSize(const SSkipList* pSkipList) { - if (pSkipList == NULL) { - return 0; + if (pNode->level > pSkipList->level) pSkipList->level = pNode->level; } - - return pSkipList->size; + + return pRetNode; } +// static void tSkipListSeek(SSkipList *pSkipList, char *key, int order) { +// // TODO +// } + // static int32_t tSkipListEndParQuery(SSkipList *pSkipList, SSkipListNode *pStartNode, SSkipListKey *pEndKey, // int32_t cond, SSkipListNode ***pRes) { // pthread_rwlock_rdlock(&pSkipList->lock); @@ -444,158 +775,7 @@ size_t tSkipListGetSize(const SSkipList* pSkipList) { // // return true; //} - -uint32_t tSkipListRemove(SSkipList *pSkipList, SSkipListKey key) { - uint32_t count = 0; - - if (pSkipList->lock) { - pthread_rwlock_wrlock(pSkipList->lock); - } - - SSkipListNode* pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC); - while (1) { - SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, 0); - if (p == pSkipList->pTail) { - break; - } - if (pSkipList->comparFn(key, SL_GET_NODE_KEY(pSkipList, p)) != 0) { - break; - } - - for (int32_t j = p->level - 1; j >= 0; --j) { - SSkipListNode* prev = SL_GET_BACKWARD_POINTER(p, j); - SSkipListNode* next = SL_GET_FORWARD_POINTER(p, j); - SL_GET_FORWARD_POINTER(prev, j) = next; - SL_GET_BACKWARD_POINTER(next, j) = prev; - } - - if (pSkipList->keyInfo.freeNode) { - taosTFree(p); - } - - ++count; - } - - // compress the minimum level of skip list - while (pSkipList->level > 0) { - if (SL_GET_FORWARD_POINTER(pSkipList->pHead, pSkipList->level - 1) != NULL) { - break; - } - pSkipList->level--; - } - - pSkipList->size -= count; - - if (pSkipList->lock) { - pthread_rwlock_unlock(pSkipList->lock); - } - - return count; -} - -void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode) { - int32_t level = pNode->level; - - if (pSkipList->lock) { - pthread_rwlock_wrlock(pSkipList->lock); - } - - for (int32_t j = level - 1; j >= 0; --j) { - SSkipListNode* prev = SL_GET_BACKWARD_POINTER(pNode, j); - SSkipListNode* next = SL_GET_FORWARD_POINTER(pNode, j); - - SL_GET_FORWARD_POINTER(prev, j) = next; - SL_GET_BACKWARD_POINTER(next, j) = prev; - } - - if (pSkipList->keyInfo.freeNode) { - taosTFree(pNode); - } - - atomic_sub_fetch_32(&pSkipList->size, 1); - - // compress the minimum level of skip list - while (pSkipList->level > 0 && SL_GET_FORWARD_POINTER(pSkipList->pHead, pSkipList->level - 1) == NULL) { - pSkipList->level -= 1; - } - - if (pSkipList->lock) { - pthread_rwlock_unlock(pSkipList->lock); - } -} - -SSkipListIterator* tSkipListCreateIter(SSkipList *pSkipList) { - if (pSkipList == NULL) { - return NULL; - } - - return doCreateSkipListIterator(pSkipList, TSDB_ORDER_ASC); -} - -SSkipListIterator *tSkipListCreateIterFromVal(SSkipList* pSkipList, const char* val, int32_t type, int32_t order) { - assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC); - assert(pSkipList != NULL); - - SSkipListIterator* iter = doCreateSkipListIterator(pSkipList, order); - if (val == NULL) { - return iter; - } - - if (pSkipList->lock) { - pthread_rwlock_rdlock(pSkipList->lock); - } - - iter->cur = getPriorNode(pSkipList, val, order); - - if (pSkipList->lock) { - pthread_rwlock_unlock(pSkipList->lock); - } - - return iter; -} - -bool tSkipListIterNext(SSkipListIterator *iter) { - if (iter->pSkipList == NULL) { - return false; - } - - SSkipList *pSkipList = iter->pSkipList; - - if (pSkipList->lock) { - pthread_rwlock_rdlock(pSkipList->lock); - } - - if (iter->order == TSDB_ORDER_ASC) { // ascending order iterate - iter->cur = SL_GET_FORWARD_POINTER(iter->cur, 0); - } else { // descending order iterate - iter->cur = SL_GET_BACKWARD_POINTER(iter->cur, 0); - } - - if (pSkipList->lock) { - pthread_rwlock_unlock(pSkipList->lock); - } - - iter->step += 1; - return (iter->order == TSDB_ORDER_ASC)? (iter->cur != pSkipList->pTail) : (iter->cur != pSkipList->pHead); -} - -SSkipListNode *tSkipListIterGet(SSkipListIterator *iter) { - if (iter == NULL || iter->cur == iter->pSkipList->pTail || iter->cur == iter->pSkipList->pHead) { - return NULL; - } else { - return iter->cur; - } -} - -void* tSkipListDestroyIter(SSkipListIterator* iter) { - if (iter == NULL) { - return NULL; - } - - taosTFree(iter); - return NULL; -} - +// // bool tSkipListRemove(SSkipList *pSkipList, SSkipListKey *pKey) { // SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; // __compar_fn_t filterComparFn = getComparFunc(pSkipList, pKey->nType); @@ -614,110 +794,4 @@ void* tSkipListDestroyIter(SSkipListIterator* iter) { // pthread_rwlock_unlock(&pSkipList->lock); // // return ret; -//} - -void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) { - if (pSkipList == NULL || pSkipList->level < nlevel || nlevel <= 0) { - return; - } - - SSkipListNode *p = SL_GET_FORWARD_POINTER(pSkipList->pHead, nlevel - 1); - - int32_t id = 1; - char* prev = NULL; - - while (p != pSkipList->pTail) { - char *key = SL_GET_NODE_KEY(pSkipList, p); - if (prev != NULL) { - assert(pSkipList->comparFn(prev, key) < 0); - } - - switch (pSkipList->keyInfo.type) { - case TSDB_DATA_TYPE_INT: - fprintf(stdout, "%d: %d\n", id++, *(int32_t *)key); - break; - case TSDB_DATA_TYPE_SMALLINT: - case TSDB_DATA_TYPE_TINYINT: - case TSDB_DATA_TYPE_BIGINT: - fprintf(stdout, "%d: %" PRId64 " \n", id++, *(int64_t *)key); - break; - case TSDB_DATA_TYPE_BINARY: - fprintf(stdout, "%d: %s \n", id++, key); - break; - case TSDB_DATA_TYPE_DOUBLE: - fprintf(stdout, "%d: %lf \n", id++, *(double *)key); - break; - default: - fprintf(stdout, "\n"); - } - - prev = SL_GET_NODE_KEY(pSkipList, p); - - p = SL_GET_FORWARD_POINTER(p, nlevel - 1); - } -} - -void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSkipListNode *pNode) { - DO_MEMSET_PTR_AREA(pNode); - - for (int32_t i = 0; i < pNode->level; ++i) { - SSkipListNode *x = forward[i]; - SL_GET_BACKWARD_POINTER(pNode, i) = x; - - SSkipListNode *next = SL_GET_FORWARD_POINTER(x, i); - SL_GET_BACKWARD_POINTER(next, i) = pNode; - - SL_GET_FORWARD_POINTER(pNode, i) = next; - SL_GET_FORWARD_POINTER(x, i) = pNode; - } - - atomic_add_fetch_32(&pSkipList->size, 1); - if (pSkipList->lock) { - pthread_rwlock_unlock(pSkipList->lock); - } -} - -SSkipListNode* tSkipListPushFront(SSkipList* pSkipList, SSkipListNode *pNode) { - SSkipListNode* forward[MAX_SKIP_LIST_LEVEL] = {0}; - for(int32_t i = 0; i < pSkipList->level; ++i) { - forward[i] = pSkipList->pHead; - } - - tSkipListDoInsert(pSkipList, forward, pNode); - return pNode; -} - -SSkipListNode* tSkipListPushBack(SSkipList *pSkipList, SSkipListNode *pNode) { - // do clear pointer area - DO_MEMSET_PTR_AREA(pNode); - - for(int32_t i = 0; i < pNode->level; ++i) { - SSkipListNode* prev = SL_GET_BACKWARD_POINTER(pSkipList->pTail, i); - SL_GET_FORWARD_POINTER(prev, i) = pNode; - SL_GET_FORWARD_POINTER(pNode, i) = pSkipList->pTail; - - SL_GET_BACKWARD_POINTER(pNode, i) = prev; - SL_GET_BACKWARD_POINTER(pSkipList->pTail, i) = pNode; - } - - atomic_add_fetch_32(&pSkipList->size, 1); - if (pSkipList->lock) { - pthread_rwlock_unlock(pSkipList->lock); - } - - return pNode; -} - -SSkipListIterator* doCreateSkipListIterator(SSkipList *pSkipList, int32_t order) { - SSkipListIterator* iter = calloc(1, sizeof(SSkipListIterator)); - - iter->pSkipList = pSkipList; - iter->order = order; - if(order == TSDB_ORDER_ASC) { - iter->cur = pSkipList->pHead; - } else { - iter->cur = pSkipList->pTail; - } - - return iter; -} \ No newline at end of file +//} \ No newline at end of file diff --git a/src/util/tests/skiplistTest.cpp b/src/util/tests/skiplistTest.cpp index 77174f69fda1a49975c44717d8993996274ca4c9..cd1fd1f7e1a0cba76ca1d8f355288afa907f1728 100644 --- a/src/util/tests/skiplistTest.cpp +++ b/src/util/tests/skiplistTest.cpp @@ -247,7 +247,7 @@ void skiplistPerformanceTest() { printf("total:%" PRIu64 " ms, avg:%f\n", e - s, (e - s) / (double)size); printf("max level of skiplist:%d, actually level:%d\n ", pSkipList->maxLevel, pSkipList->level); - assert(tSkipListGetSize(pSkipList) == size); + assert(SL_GET_SIZE(pSkipList) == size); // printf("the level of skiplist is:\n"); // @@ -273,7 +273,7 @@ void skiplistPerformanceTest() { int64_t et = taosGetTimestampMs(); printf("delete %d data from skiplist, elapased time:%" PRIu64 "ms\n", 10000, et - st); - assert(tSkipListGetSize(pSkipList) == size); + assert(SL_GET_SIZE(pSkipList) == size); tSkipListDestroy(pSkipList); taosTFree(total);