提交 89e3ce2d 编写于 作者: B Bomin Zhang

TD-325: skiplist support duplicated key

上级 8ffe4b76
......@@ -174,7 +174,7 @@ void tSkipListNewNodeInfo(SSkipList *pSkipList, int32_t *level, int32_t *headSiz
SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode);
/**
* get only *one* node of which key is equalled to pKey, even there are more than one nodes are of the same key
* get *all* nodes which key are equivalent to pKey
*
* @param pSkipList
* @param pKey
......@@ -234,14 +234,13 @@ SSkipListNode *tSkipListIterGet(SSkipListIterator *iter);
void *tSkipListDestroyIter(SSkipListIterator *iter);
/*
* remove only one node of the pKey value.
* If more than one node has the same value, any one will be removed
* remove nodes of the pKey value.
* If more than one node has the same value, all will be removed
*
* @Return
* true: one node has been removed
* false: no node has been removed
* the count of removed nodes
*/
bool tSkipListRemove(SSkipList *pSkipList, SSkipListKey key);
uint32_t tSkipListRemove(SSkipList *pSkipList, SSkipListKey key);
/*
* remove the specified node in parameters
......
......@@ -74,7 +74,47 @@ static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSk
static SSkipListNode* tSkipListPushBack(SSkipList *pSkipList, SSkipListNode *pNode);
static SSkipListNode* tSkipListPushFront(SSkipList* pSkipList, SSkipListNode *pNode);
static SSkipListIterator* doCreateSkipListIterator(SSkipList *pSkipList, int32_t order);
static SSkipListNode* tSkipListDoGet(SSkipList *pSkipList, SSkipListKey key);
// when order is TSDB_ORDER_ASC, return the last node with key less than val
// when order is TSDB_ORDER_DESC, return the first node with key large than val
static SSkipListNode* getPriorNode(SSkipList* pSkipList, const char* val, int32_t order) {
__compar_fn_t comparFn = pSkipList->comparFn;
SSkipListNode *pNode = NULL;
if (order == TSDB_ORDER_ASC) {
pNode = pSkipList->pHead;
for (int32_t i = pSkipList->level - 1; i >= 0; --i) {
SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, i);
while (p != pSkipList->pTail) {
char *key = SL_GET_NODE_KEY(pSkipList, p);
if (comparFn(key, val) < 0) {
pNode = p;
p = SL_GET_FORWARD_POINTER(p, i);
} else {
break;
}
}
}
} else {
pNode = pSkipList->pTail;
for (int32_t i = pSkipList->level - 1; i >= 0; --i) {
SSkipListNode *p = SL_GET_BACKWARD_POINTER(pNode, i);
while (p != pSkipList->pHead) {
char *key = SL_GET_NODE_KEY(pSkipList, p);
if (comparFn(key, val) > 0) {
pNode = p;
p = SL_GET_BACKWARD_POINTER(p, i);
} else {
break;
}
}
}
}
return pNode;
}
static bool initForwardBackwardPtr(SSkipList* pSkipList) {
uint32_t maxLevel = pSkipList->maxLevel;
......@@ -240,13 +280,37 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode) {
return pNode;
}
SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey key) {
SArray* sa = taosArrayInit(1, POINTER_BYTES);
SSkipListNode* pNode = tSkipListDoGet(pSkipList, key);
taosArrayPush(sa, &pNode);
if (pSkipList->lock) {
pthread_rwlock_wrlock(pSkipList->lock);
}
SSkipListNode* pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC);
while (1) {
SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, 0);
if (p == pSkipList->pTail) {
break;
}
if (pSkipList->comparFn(key, SL_GET_NODE_KEY(pSkipList, p)) != 0) {
break;
}
taosArrayPush(sa, &p);
pNode = p;
}
if (pSkipList->lock) {
pthread_rwlock_unlock(pSkipList->lock);
}
return sa;
}
size_t tSkipListGetSize(const SSkipList* pSkipList) {
if (pSkipList == NULL) {
return 0;
......@@ -375,14 +439,52 @@ size_t tSkipListGetSize(const SSkipList* pSkipList) {
// return true;
//}
bool tSkipListRemove(SSkipList *pSkipList, SSkipListKey key) {
SSkipListNode* pNode = tSkipListDoGet(pSkipList, key);
if (pNode != NULL) {
tSkipListRemoveNode(pSkipList, pNode);
return true;
uint32_t tSkipListRemove(SSkipList *pSkipList, SSkipListKey key) {
uint32_t count = 0;
if (pSkipList->lock) {
pthread_rwlock_wrlock(pSkipList->lock);
}
return false;
SSkipListNode* pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC);
while (1) {
SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, 0);
if (p == pSkipList->pTail) {
break;
}
if (pSkipList->comparFn(key, SL_GET_NODE_KEY(pSkipList, p)) != 0) {
break;
}
for (int32_t j = p->level - 1; j >= 0; --j) {
SSkipListNode* prev = SL_GET_BACKWARD_POINTER(p, j);
SSkipListNode* next = SL_GET_FORWARD_POINTER(p, j);
SL_GET_FORWARD_POINTER(prev, j) = next;
SL_GET_BACKWARD_POINTER(next, j) = prev;
}
if (pSkipList->keyInfo.freeNode) {
tfree(p);
}
++count;
}
// compress the minimum level of skip list
while (pSkipList->level > 0) {
if (SL_GET_FORWARD_POINTER(pSkipList->pHead, pSkipList->level - 1) != NULL) {
break;
}
pSkipList->level--;
}
pSkipList->size -= count;
if (pSkipList->lock) {
pthread_rwlock_unlock(pSkipList->lock);
}
return count;
}
void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode) {
......@@ -425,54 +527,25 @@ SSkipListIterator* tSkipListCreateIter(SSkipList *pSkipList) {
}
SSkipListIterator *tSkipListCreateIterFromVal(SSkipList* pSkipList, const char* val, int32_t type, int32_t order) {
if (pSkipList == NULL) {
return NULL;
}
assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
assert(pSkipList != NULL);
SSkipListIterator* iter = doCreateSkipListIterator(pSkipList, order);
if (val == NULL) {
return doCreateSkipListIterator(pSkipList, order);
} else {
SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
int32_t ret = -1;
__compar_fn_t filterComparFn = getKeyComparFunc(pSkipList->keyInfo.type);
SSkipListNode* pNode = pSkipList->pHead;
for (int32_t i = pSkipList->level - 1; i >= 0; --i) {
SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, i);
while (p != pSkipList->pTail) {
char *key = SL_GET_NODE_KEY(pSkipList, p);
if ((ret = filterComparFn(key, val)) < 0) {
pNode = p;
p = SL_GET_FORWARD_POINTER(p, i);
} else {
break;
}
}
forward[i] = pNode;
}
SSkipListIterator* iter = doCreateSkipListIterator(pSkipList, order);
// set the initial position
if (order == TSDB_ORDER_ASC) {
iter->cur = forward[0]; // greater equals than the value
} else {
iter->cur = SL_GET_FORWARD_POINTER(forward[0], 0);
if (ret == 0) {
assert(iter->cur != pSkipList->pTail);
iter->cur = SL_GET_FORWARD_POINTER(iter->cur, 0);
}
}
return iter;
}
if (pSkipList->lock) {
pthread_rwlock_rdlock(pSkipList->lock);
}
iter->cur = getPriorNode(pSkipList, val, order);
if (pSkipList->lock) {
pthread_rwlock_unlock(pSkipList->lock);
}
return iter;
}
bool tSkipListIterNext(SSkipListIterator *iter) {
......@@ -487,17 +560,9 @@ bool tSkipListIterNext(SSkipListIterator *iter) {
}
if (iter->order == TSDB_ORDER_ASC) { // ascending order iterate
if (iter->cur == NULL) {
iter->cur = SL_GET_FORWARD_POINTER(pSkipList->pHead, 0);
} else {
iter->cur = SL_GET_FORWARD_POINTER(iter->cur, 0);
}
iter->cur = SL_GET_FORWARD_POINTER(iter->cur, 0);
} else { // descending order iterate
if (iter->cur == NULL) {
iter->cur = SL_GET_BACKWARD_POINTER(pSkipList->pTail, 0);
} else {
iter->cur = SL_GET_BACKWARD_POINTER(iter->cur, 0);
}
iter->cur = SL_GET_BACKWARD_POINTER(iter->cur, 0);
}
if (pSkipList->lock) {
......@@ -638,57 +703,16 @@ SSkipListNode* tSkipListPushBack(SSkipList *pSkipList, SSkipListNode *pNode) {
return pNode;
}
SSkipListNode* tSkipListDoGet(SSkipList *pSkipList, SSkipListKey skey) {
SSkipListNode *pNode = pSkipList->pHead;
SSkipListNode *pRes = NULL;
if (pSkipList->lock) {
pthread_rwlock_rdlock(pSkipList->lock);
}
#if SKIP_LIST_RECORD_PERFORMANCE
pSkipList->state.queryCount++;
#endif
__compar_fn_t cmparFn = getComparFunc(pSkipList->keyInfo.type, 0);
int32_t ret = -1;
for (int32_t i = pSkipList->level - 1; i >= 0; --i) {
SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, i);
while (p != pSkipList->pTail) {
char *key = SL_GET_NODE_KEY(pSkipList, p);
if ((ret = cmparFn(key, skey)) < 0) {
pNode = p;
p = SL_GET_FORWARD_POINTER(p, i);
} else {
break;
}
}
// find the qualified key
if (ret == 0) {
pRes = SL_GET_FORWARD_POINTER(pNode, i);
break;
// skip list does not allowed duplicated key, abort further retrieve data
// if (!pSkipList->keyInfo.dupKey) {
// break;
// }
}
}
if (pSkipList->lock) {
pthread_rwlock_unlock(pSkipList->lock);
}
return pRes;
}
SSkipListIterator* doCreateSkipListIterator(SSkipList *pSkipList, int32_t order) {
SSkipListIterator* iter = calloc(1, sizeof(SSkipListIterator));
iter->pSkipList = pSkipList;
iter->order = order;
if(order == TSDB_ORDER_ASC) {
iter->cur = pSkipList->pHead;
} else {
iter->cur = pSkipList->pTail;
}
return iter;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册