未验证 提交 48dc335c 编写于 作者: H huili 提交者: GitHub

Merge pull request #4186 from taosdata/feature/TD-2061

Feature/td 2061
...@@ -24,8 +24,8 @@ static SSkipListNode * getPriorNode(SSkipList *pSkipList, const char *val, in ...@@ -24,8 +24,8 @@ static SSkipListNode * getPriorNode(SSkipList *pSkipList, const char *val, in
static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode); static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode);
static void tSkipListCorrectLevel(SSkipList *pSkipList); static void tSkipListCorrectLevel(SSkipList *pSkipList);
static SSkipListIterator *doCreateSkipListIterator(SSkipList *pSkipList, int32_t order); static SSkipListIterator *doCreateSkipListIterator(SSkipList *pSkipList, int32_t order);
static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSkipListNode *pNode); static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **backward, SSkipListNode *pNode);
static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **forward, void *pData); static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, void *pData);
static SSkipListNode * tSkipListNewNode(uint8_t level); static SSkipListNode * tSkipListNewNode(uint8_t level);
#define tSkipListFreeNode(n) tfree((n)) #define tSkipListFreeNode(n) tfree((n))
...@@ -108,17 +108,17 @@ void tSkipListDestroy(SSkipList *pSkipList) { ...@@ -108,17 +108,17 @@ void tSkipListDestroy(SSkipList *pSkipList) {
SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData) { SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData) {
if (pSkipList == NULL || pData == NULL) return NULL; if (pSkipList == NULL || pData == NULL) return NULL;
SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; SSkipListNode *backward[MAX_SKIP_LIST_LEVEL] = {0};
uint8_t dupMode = SL_DUP_MODE(pSkipList); uint8_t dupMode = SL_DUP_MODE(pSkipList);
SSkipListNode *pNode = NULL; SSkipListNode *pNode = NULL;
tSkipListWLock(pSkipList); tSkipListWLock(pSkipList);
bool hasDup = tSkipListGetPosToPut(pSkipList, forward, pData); bool hasDup = tSkipListGetPosToPut(pSkipList, backward, pData);
if (hasDup && (dupMode == SL_DISCARD_DUP_KEY || dupMode == SL_UPDATE_DUP_KEY)) { if (hasDup && (dupMode == SL_DISCARD_DUP_KEY || dupMode == SL_UPDATE_DUP_KEY)) {
if (dupMode == SL_UPDATE_DUP_KEY) { if (dupMode == SL_UPDATE_DUP_KEY) {
pNode = SL_NODE_GET_FORWARD_POINTER(forward[0], 0); pNode = SL_NODE_GET_BACKWARD_POINTER(backward[0], 0);
atomic_store_ptr(&(pNode->pData), pData); atomic_store_ptr(&(pNode->pData), pData);
} }
} else { } else {
...@@ -126,7 +126,7 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData) { ...@@ -126,7 +126,7 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData) {
if (pNode != NULL) { if (pNode != NULL) {
pNode->pData = pData; pNode->pData = pData;
tSkipListDoInsert(pSkipList, forward, pNode); tSkipListDoInsert(pSkipList, backward, pNode);
} }
} }
...@@ -310,7 +310,7 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) { ...@@ -310,7 +310,7 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) {
} }
} }
static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSkipListNode *pNode) { static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **backward, SSkipListNode *pNode) {
for (int32_t i = 0; i < pNode->level; ++i) { for (int32_t i = 0; i < pNode->level; ++i) {
if (i >= pSkipList->level) { if (i >= pSkipList->level) {
SL_NODE_GET_FORWARD_POINTER(pNode, i) = pSkipList->pTail; SL_NODE_GET_FORWARD_POINTER(pNode, i) = pSkipList->pTail;
...@@ -318,14 +318,14 @@ static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSk ...@@ -318,14 +318,14 @@ static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSk
SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, i) = pNode; SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, i) = pNode;
SL_NODE_GET_BACKWARD_POINTER(pSkipList->pTail, i) = pNode; SL_NODE_GET_BACKWARD_POINTER(pSkipList->pTail, i) = pNode;
} else { } else {
SSkipListNode *x = forward[i]; SSkipListNode *x = backward[i];
SL_NODE_GET_BACKWARD_POINTER(pNode, i) = x; SL_NODE_GET_FORWARD_POINTER(pNode, i) = x;
SSkipListNode *next = SL_NODE_GET_FORWARD_POINTER(x, i); SSkipListNode *prev = SL_NODE_GET_BACKWARD_POINTER(x, i);
SL_NODE_GET_BACKWARD_POINTER(next, i) = pNode; SL_NODE_GET_FORWARD_POINTER(prev, i) = pNode;
SL_NODE_GET_FORWARD_POINTER(pNode, i) = next; SL_NODE_GET_BACKWARD_POINTER(x, i) = pNode;
SL_NODE_GET_FORWARD_POINTER(x, i) = pNode; SL_NODE_GET_BACKWARD_POINTER(pNode, i) = prev;
} }
} }
...@@ -371,57 +371,57 @@ static FORCE_INLINE int tSkipListUnlock(SSkipList *pSkipList) { ...@@ -371,57 +371,57 @@ static FORCE_INLINE int tSkipListUnlock(SSkipList *pSkipList) {
return 0; return 0;
} }
static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **forward, void *pData) { static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, void *pData) {
int compare = 0; int compare = 0;
bool hasDupKey = false; bool hasDupKey = false;
char * pDataKey = pSkipList->keyFn(pData); char * pDataKey = pSkipList->keyFn(pData);
if (pSkipList->size == 0) { if (pSkipList->size == 0) {
for (int i = 0; i < pSkipList->level; i++) { for (int i = 0; i < pSkipList->level; i++) {
forward[i] = pSkipList->pHead; backward[i] = pSkipList->pTail;
} }
} else { } else {
char *pKey = NULL; char *pKey = NULL;
// Compare min key // Compare max key
pKey = SL_GET_MIN_KEY(pSkipList); pKey = SL_GET_MAX_KEY(pSkipList);
compare = pSkipList->comparFn(pDataKey, pKey); compare = pSkipList->comparFn(pDataKey, pKey);
if (compare <= 0) { if (compare >= 0) {
for (int i = 0; i < pSkipList->level; i++) { for (int i = 0; i < pSkipList->level; i++) {
forward[i] = pSkipList->pHead; backward[i] = pSkipList->pTail;
} }
return (compare == 0); return (compare == 0);
} }
// Compare max key // Compare min key
pKey = SL_GET_MAX_KEY(pSkipList); pKey = SL_GET_MIN_KEY(pSkipList);
compare = pSkipList->comparFn(pDataKey, pKey); compare = pSkipList->comparFn(pDataKey, pKey);
if (compare > 0) { if (compare < 0) {
for (int i = 0; i < pSkipList->level; i++) { for (int i = 0; i < pSkipList->level; i++) {
forward[i] = SL_NODE_GET_BACKWARD_POINTER(pSkipList->pTail, i); backward[i] = SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, i);
} }
return (compare == 0); return (compare == 0);
} }
SSkipListNode *px = pSkipList->pHead; SSkipListNode *px = pSkipList->pTail;
for (int i = pSkipList->level - 1; i >= 0; --i) { for (int i = pSkipList->level - 1; i >= 0; --i) {
SSkipListNode *p = SL_NODE_GET_FORWARD_POINTER(px, i); SSkipListNode *p = SL_NODE_GET_BACKWARD_POINTER(px, i);
while (p != pSkipList->pTail) { while (p != pSkipList->pHead) {
pKey = SL_GET_NODE_KEY(pSkipList, p); pKey = SL_GET_NODE_KEY(pSkipList, p);
compare = pSkipList->comparFn(pKey, pDataKey); compare = pSkipList->comparFn(pKey, pDataKey);
if (compare >= 0) { if (compare <= 0) {
if (compare == 0 && !hasDupKey) hasDupKey = true; if (compare == 0 && !hasDupKey) hasDupKey = true;
break; break;
} else { } else {
px = p; px = p;
p = SL_NODE_GET_FORWARD_POINTER(px, i); p = SL_NODE_GET_BACKWARD_POINTER(px, i);
} }
} }
forward[i] = px; backward[i] = px;
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册