diff --git a/src/util/inc/tskiplist.h b/src/util/inc/tskiplist.h index 759ecbb7edd264c981aa999c26eb0b9dbbd658e2..686e5ab3132807bb55a770359842cbf635e6c5cf 100644 --- a/src/util/inc/tskiplist.h +++ b/src/util/inc/tskiplist.h @@ -174,7 +174,7 @@ void tSkipListNewNodeInfo(SSkipList *pSkipList, int32_t *level, int32_t *headSiz SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode); /** - * get only *one* node of which key is equalled to pKey, even there are more than one nodes are of the same key + * get *all* nodes which key are equivalent to pKey * * @param pSkipList * @param pKey @@ -234,14 +234,13 @@ SSkipListNode *tSkipListIterGet(SSkipListIterator *iter); void *tSkipListDestroyIter(SSkipListIterator *iter); /* - * remove only one node of the pKey value. - * If more than one node has the same value, any one will be removed + * remove nodes of the pKey value. + * If more than one node has the same value, all will be removed * * @Return - * true: one node has been removed - * false: no node has been removed + * the count of removed nodes */ -bool tSkipListRemove(SSkipList *pSkipList, SSkipListKey key); +uint32_t tSkipListRemove(SSkipList *pSkipList, SSkipListKey key); /* * remove the specified node in parameters diff --git a/src/util/src/tskiplist.c b/src/util/src/tskiplist.c index 25ea49b60d3786f956e0af679752e765e6462da1..b72db6a8d81a9379ad2fb3b4721ecf96c992580e 100644 --- a/src/util/src/tskiplist.c +++ b/src/util/src/tskiplist.c @@ -74,7 +74,47 @@ static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSk static SSkipListNode* tSkipListPushBack(SSkipList *pSkipList, SSkipListNode *pNode); static SSkipListNode* tSkipListPushFront(SSkipList* pSkipList, SSkipListNode *pNode); static SSkipListIterator* doCreateSkipListIterator(SSkipList *pSkipList, int32_t order); -static SSkipListNode* tSkipListDoGet(SSkipList *pSkipList, SSkipListKey key); + + +// when order is TSDB_ORDER_ASC, return the last node with key less than val +// when order is TSDB_ORDER_DESC, return the first node with key large than val +static SSkipListNode* getPriorNode(SSkipList* pSkipList, const char* val, int32_t order) { + __compar_fn_t comparFn = pSkipList->comparFn; + SSkipListNode *pNode = NULL; + + if (order == TSDB_ORDER_ASC) { + pNode = pSkipList->pHead; + for (int32_t i = pSkipList->level - 1; i >= 0; --i) { + SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, i); + while (p != pSkipList->pTail) { + char *key = SL_GET_NODE_KEY(pSkipList, p); + if (comparFn(key, val) < 0) { + pNode = p; + p = SL_GET_FORWARD_POINTER(p, i); + } else { + break; + } + } + } + } else { + pNode = pSkipList->pTail; + for (int32_t i = pSkipList->level - 1; i >= 0; --i) { + SSkipListNode *p = SL_GET_BACKWARD_POINTER(pNode, i); + while (p != pSkipList->pHead) { + char *key = SL_GET_NODE_KEY(pSkipList, p); + if (comparFn(key, val) > 0) { + pNode = p; + p = SL_GET_BACKWARD_POINTER(p, i); + } else { + break; + } + } + } + } + + return pNode; +} + static bool initForwardBackwardPtr(SSkipList* pSkipList) { uint32_t maxLevel = pSkipList->maxLevel; @@ -110,7 +150,11 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint8_t keyLen, ui maxLevel = MAX_SKIP_LIST_LEVEL; } - pSkipList->keyInfo = (SSkipListKeyInfo){.type = keyType, .len = keyLen, .dupKey = dupKey, .freeNode = freeNode}; + pSkipList->keyInfo.type = keyType; + pSkipList->keyInfo.len = keyLen; + pSkipList->keyInfo.dupKey = dupKey; + pSkipList->keyInfo.freeNode = freeNode; + pSkipList->keyFn = fn; pSkipList->comparFn = getKeyComparFunc(keyType); pSkipList->maxLevel = maxLevel; @@ -240,13 +284,37 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode) { return pNode; } + + SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey key) { SArray* sa = taosArrayInit(1, POINTER_BYTES); - SSkipListNode* pNode = tSkipListDoGet(pSkipList, key); - taosArrayPush(sa, &pNode); + + if (pSkipList->lock) { + pthread_rwlock_wrlock(pSkipList->lock); + } + + SSkipListNode* pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC); + while (1) { + SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, 0); + if (p == pSkipList->pTail) { + break; + } + if (pSkipList->comparFn(key, SL_GET_NODE_KEY(pSkipList, p)) != 0) { + break; + } + taosArrayPush(sa, &p); + pNode = p; + } + + if (pSkipList->lock) { + pthread_rwlock_unlock(pSkipList->lock); + } + return sa; } + + size_t tSkipListGetSize(const SSkipList* pSkipList) { if (pSkipList == NULL) { return 0; @@ -375,14 +443,52 @@ size_t tSkipListGetSize(const SSkipList* pSkipList) { // return true; //} -bool tSkipListRemove(SSkipList *pSkipList, SSkipListKey key) { - SSkipListNode* pNode = tSkipListDoGet(pSkipList, key); - if (pNode != NULL) { - tSkipListRemoveNode(pSkipList, pNode); - return true; +uint32_t tSkipListRemove(SSkipList *pSkipList, SSkipListKey key) { + uint32_t count = 0; + + if (pSkipList->lock) { + pthread_rwlock_wrlock(pSkipList->lock); } - - return false; + + SSkipListNode* pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC); + while (1) { + SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, 0); + if (p == pSkipList->pTail) { + break; + } + if (pSkipList->comparFn(key, SL_GET_NODE_KEY(pSkipList, p)) != 0) { + break; + } + + for (int32_t j = p->level - 1; j >= 0; --j) { + SSkipListNode* prev = SL_GET_BACKWARD_POINTER(p, j); + SSkipListNode* next = SL_GET_FORWARD_POINTER(p, j); + SL_GET_FORWARD_POINTER(prev, j) = next; + SL_GET_BACKWARD_POINTER(next, j) = prev; + } + + if (pSkipList->keyInfo.freeNode) { + tfree(p); + } + + ++count; + } + + // compress the minimum level of skip list + while (pSkipList->level > 0) { + if (SL_GET_FORWARD_POINTER(pSkipList->pHead, pSkipList->level - 1) != NULL) { + break; + } + pSkipList->level--; + } + + pSkipList->size -= count; + + if (pSkipList->lock) { + pthread_rwlock_unlock(pSkipList->lock); + } + + return count; } void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode) { @@ -425,54 +531,25 @@ SSkipListIterator* tSkipListCreateIter(SSkipList *pSkipList) { } SSkipListIterator *tSkipListCreateIterFromVal(SSkipList* pSkipList, const char* val, int32_t type, int32_t order) { - if (pSkipList == NULL) { - return NULL; - } - assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC); - + assert(pSkipList != NULL); + + SSkipListIterator* iter = doCreateSkipListIterator(pSkipList, order); if (val == NULL) { - return doCreateSkipListIterator(pSkipList, order); - } else { - - SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; - - int32_t ret = -1; - __compar_fn_t filterComparFn = getKeyComparFunc(pSkipList->keyInfo.type); - SSkipListNode* pNode = pSkipList->pHead; - - for (int32_t i = pSkipList->level - 1; i >= 0; --i) { - SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, i); - while (p != pSkipList->pTail) { - char *key = SL_GET_NODE_KEY(pSkipList, p); - - if ((ret = filterComparFn(key, val)) < 0) { - pNode = p; - p = SL_GET_FORWARD_POINTER(p, i); - } else { - break; - } - } - - forward[i] = pNode; - } - - SSkipListIterator* iter = doCreateSkipListIterator(pSkipList, order); - - // set the initial position - if (order == TSDB_ORDER_ASC) { - iter->cur = forward[0]; // greater equals than the value - } else { - iter->cur = SL_GET_FORWARD_POINTER(forward[0], 0); - - if (ret == 0) { - assert(iter->cur != pSkipList->pTail); - iter->cur = SL_GET_FORWARD_POINTER(iter->cur, 0); - } - } - return iter; } + + if (pSkipList->lock) { + pthread_rwlock_rdlock(pSkipList->lock); + } + + iter->cur = getPriorNode(pSkipList, val, order); + + if (pSkipList->lock) { + pthread_rwlock_unlock(pSkipList->lock); + } + + return iter; } bool tSkipListIterNext(SSkipListIterator *iter) { @@ -487,17 +564,9 @@ bool tSkipListIterNext(SSkipListIterator *iter) { } if (iter->order == TSDB_ORDER_ASC) { // ascending order iterate - if (iter->cur == NULL) { - iter->cur = SL_GET_FORWARD_POINTER(pSkipList->pHead, 0); - } else { - iter->cur = SL_GET_FORWARD_POINTER(iter->cur, 0); - } + iter->cur = SL_GET_FORWARD_POINTER(iter->cur, 0); } else { // descending order iterate - if (iter->cur == NULL) { - iter->cur = SL_GET_BACKWARD_POINTER(pSkipList->pTail, 0); - } else { - iter->cur = SL_GET_BACKWARD_POINTER(iter->cur, 0); - } + iter->cur = SL_GET_BACKWARD_POINTER(iter->cur, 0); } if (pSkipList->lock) { @@ -638,57 +707,16 @@ SSkipListNode* tSkipListPushBack(SSkipList *pSkipList, SSkipListNode *pNode) { return pNode; } -SSkipListNode* tSkipListDoGet(SSkipList *pSkipList, SSkipListKey skey) { - SSkipListNode *pNode = pSkipList->pHead; - SSkipListNode *pRes = NULL; - - if (pSkipList->lock) { - pthread_rwlock_rdlock(pSkipList->lock); - } - -#if SKIP_LIST_RECORD_PERFORMANCE - pSkipList->state.queryCount++; -#endif - - __compar_fn_t cmparFn = getComparFunc(pSkipList->keyInfo.type, 0); - - int32_t ret = -1; - for (int32_t i = pSkipList->level - 1; i >= 0; --i) { - SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, i); - while (p != pSkipList->pTail) { - char *key = SL_GET_NODE_KEY(pSkipList, p); - - if ((ret = cmparFn(key, skey)) < 0) { - pNode = p; - p = SL_GET_FORWARD_POINTER(p, i); - } else { - break; - } - } - - // find the qualified key - if (ret == 0) { - pRes = SL_GET_FORWARD_POINTER(pNode, i); - break; - // skip list does not allowed duplicated key, abort further retrieve data -// if (!pSkipList->keyInfo.dupKey) { -// break; -// } - } - } - - if (pSkipList->lock) { - pthread_rwlock_unlock(pSkipList->lock); - } - - return pRes; -} - SSkipListIterator* doCreateSkipListIterator(SSkipList *pSkipList, int32_t order) { SSkipListIterator* iter = calloc(1, sizeof(SSkipListIterator)); iter->pSkipList = pSkipList; iter->order = order; + if(order == TSDB_ORDER_ASC) { + iter->cur = pSkipList->pHead; + } else { + iter->cur = pSkipList->pTail; + } return iter; } \ No newline at end of file diff --git a/src/util/tests/skiplistTest.cpp b/src/util/tests/skiplistTest.cpp index 3713e71a0169acf6ec08f79cb2c65f4e0e9b7df9..70445a3d651cbd6acdf00a6f3999e5ea6b0a2c1d 100644 --- a/src/util/tests/skiplistTest.cpp +++ b/src/util/tests/skiplistTest.cpp @@ -281,34 +281,55 @@ void skiplistPerformanceTest() { // todo not support duplicated key yet void duplicatedKeyTest() { -#if 0 - SSkipListKey key; - key.nType = TSDB_DATA_TYPE_INT; - - SSkipListNode **pNodes = NULL; + SSkipList *pSkipList = tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_INT, sizeof(int), true, false, true, getkey); - SSkipList *pSkipList = tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_INT, sizeof(int)); - - for (int32_t i = 0; i < 10000; ++i) { + for (int32_t i = 0; i < 200; ++i) { for (int32_t j = 0; j < 5; ++j) { - key.i64Key = i; - tSkipListPut(pSkipList, "", &key, 1); + int32_t level, size; + tSkipListNewNodeInfo(pSkipList, &level, &size); + SSkipListNode* d = (SSkipListNode*)calloc(1, size + sizeof(int32_t)); + d->level = level; + int32_t* key = (int32_t*)SL_GET_NODE_KEY(pSkipList, d); + key[0] = i; + tSkipListPut(pSkipList, d); } } - tSkipListPrint(pSkipList, 1); - for (int32_t i = 0; i < 100; ++i) { - key.i64Key = rand() % 1000; - int32_t size = tSkipListGets(pSkipList, &key, &pNodes); - - assert(size == 5); + SSkipListKey key; + SArray* nodes = tSkipListGet(pSkipList, (char*)(&i)); + assert( taosArrayGetSize(nodes) == 5 ); + taosArrayDestroy(nodes); + } - tfree(pNodes); + int32_t key = 101; + uint32_t num = tSkipListRemove(pSkipList, (char*)(&key)); + assert(num == 5); + + SArray* nodes = tSkipListGet(pSkipList, (char*)(&key)); + assert( taosArrayGetSize(nodes) == 0 ); + taosArrayDestroy(nodes); + + key = 102; + SSkipListIterator* iter = tSkipListCreateIterFromVal(pSkipList, (char*)(&key), TSDB_DATA_TYPE_INT, TSDB_ORDER_ASC); + for(int i = 0; i < 6; i++) { + assert(tSkipListIterNext(iter) == true); + SSkipListNode* node = tSkipListIterGet(iter); + int32_t* val = (int32_t*)SL_GET_NODE_KEY(pSkipList, node); + assert((i < 5) == ((*val) == key)); } + tSkipListDestroyIter(iter); + + iter = tSkipListCreateIterFromVal(pSkipList, (char*)(&key), TSDB_DATA_TYPE_INT, TSDB_ORDER_DESC); + for(int i = 0; i < 6; i++) { + assert(tSkipListIterNext(iter) == true); + SSkipListNode* node = tSkipListIterGet(iter); + int32_t* val = (int32_t*)SL_GET_NODE_KEY(pSkipList, node); + assert((i < 5) == ((*val) == key)); + } + tSkipListDestroyIter(iter); tSkipListDestroy(pSkipList); -#endif } } // namespace