提交 4b86e0a3 编写于 作者: H hjxilinx

[td-225] add remove support for skiplist and super table index.

上级 557064d1
......@@ -502,13 +502,19 @@ static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) {
memcpy(SL_GET_NODE_DATA(pNode), &pTable, POINTER_BYTES);
tSkipListPut(list, pNode);
return 0;
}
static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) {
assert(pTable->type == TSDB_CHILD_TABLE);
// TODO
assert(pTable->type == TSDB_CHILD_TABLE && pTable != NULL);
STable* pSTable = tsdbGetTableByUid(pMeta, pTable->superUid);
assert(pSTable != NULL);
char* key = dataRowTuple(pTable->tagVal); // key
bool ret = tSkipListRemove(pSTable->pIndex, key);
assert(ret);
return 0;
}
......
......@@ -178,10 +178,9 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode);
*
* @param pSkipList
* @param pKey
* @param keyType
* @return
*/
SArray *tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey, int16_t keyType);
SArray *tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey);
/**
* get the size of skip list
......@@ -242,7 +241,7 @@ void *tSkipListDestroyIter(SSkipListIterator *iter);
* true: one node has been removed
* false: no node has been removed
*/
bool tSkipListRemove(SSkipList *pSkipList, SSkipListKey *pKey);
bool tSkipListRemove(SSkipList *pSkipList, SSkipListKey key);
/*
* remove the specified node in parameters
......
......@@ -74,6 +74,7 @@ static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSk
static SSkipListNode* tSkipListPushBack(SSkipList *pSkipList, SSkipListNode *pNode);
static SSkipListNode* tSkipListPushFront(SSkipList* pSkipList, SSkipListNode *pNode);
static SSkipListIterator* doCreateSkipListIterator(SSkipList *pSkipList, int32_t order);
static SSkipListNode* tSkipListDoGet(SSkipList *pSkipList, SSkipListKey key);
static bool initForwardBackwardPtr(SSkipList* pSkipList) {
uint32_t maxLevel = pSkipList->maxLevel;
......@@ -97,6 +98,7 @@ static bool initForwardBackwardPtr(SSkipList* pSkipList) {
return true;
}
SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint8_t keyLen, uint8_t dupKey, uint8_t lock,
uint8_t freeNode, __sl_key_fn_t fn) {
SSkipList *pSkipList = (SSkipList *)calloc(1, sizeof(SSkipList));
......@@ -139,25 +141,6 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint8_t keyLen, ui
return pSkipList;
}
// static void doRemove(SSkipList *pSkipList, SSkipListNode *pNode, SSkipListNode *forward[]) {
// int32_t level = pNode->level;
// for (int32_t j = level - 1; j >= 0; --j) {
// if ((forward[j]->pForward[j] != NULL) && (forward[j]->pForward[j]->pForward[j])) {
// forward[j]->pForward[j]->pForward[j]->pBackward[j] = forward[j];
// }
//
// if (forward[j]->pForward[j] != NULL) {
// forward[j]->pForward[j] = forward[j]->pForward[j]->pForward[j];
// }
// }
//
// pSkipList->state.nTotalMemSize -= (sizeof(SSkipListNode) + POINTER_BYTES * pNode->level * 2);
// removeNodeEachLevel(pSkipList, pNode->level);
//
// tfree(pNode);
// --pSkipList->size;
//}
void *tSkipListDestroy(SSkipList *pSkipList) {
if (pSkipList == NULL) {
return NULL;
......@@ -257,114 +240,180 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode) {
return pNode;
}
void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSkipListNode *pNode) {
DO_MEMSET_PTR_AREA(pNode);
for (int32_t i = 0; i < pNode->level; ++i) {
SSkipListNode *x = forward[i];
SL_GET_BACKWARD_POINTER(pNode, i) = x;
SSkipListNode *next = SL_GET_FORWARD_POINTER(x, i);
SL_GET_BACKWARD_POINTER(next, i) = pNode;
SL_GET_FORWARD_POINTER(pNode, i) = next;
SL_GET_FORWARD_POINTER(x, i) = pNode;
}
atomic_add_fetch_32(&pSkipList->size, 1);
if (pSkipList->lock) {
pthread_rwlock_unlock(pSkipList->lock);
}
SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey key) {
SArray* sa = taosArrayInit(1, POINTER_BYTES);
SSkipListNode* pNode = tSkipListDoGet(pSkipList, key);
taosArrayPush(sa, &pNode);
return sa;
}
SSkipListNode* tSkipListPushFront(SSkipList* pSkipList, SSkipListNode *pNode) {
SSkipListNode* forward[MAX_SKIP_LIST_LEVEL] = {0};
for(int32_t i = 0; i < pSkipList->level; ++i) {
forward[i] = pSkipList->pHead;
size_t tSkipListGetSize(const SSkipList* pSkipList) {
if (pSkipList == NULL) {
return 0;
}
tSkipListDoInsert(pSkipList, forward, pNode);
return pNode;
return pSkipList->size;
}
SSkipListNode* tSkipListPushBack(SSkipList *pSkipList, SSkipListNode *pNode) {
// do clear pointer area
DO_MEMSET_PTR_AREA(pNode);
for(int32_t i = 0; i < pNode->level; ++i) {
SSkipListNode* prev = SL_GET_BACKWARD_POINTER(pSkipList->pTail, i);
SL_GET_FORWARD_POINTER(prev, i) = pNode;
SL_GET_FORWARD_POINTER(pNode, i) = pSkipList->pTail;
SL_GET_BACKWARD_POINTER(pNode, i) = prev;
SL_GET_BACKWARD_POINTER(pSkipList->pTail, i) = pNode;
}
pSkipList->lastKey = SL_GET_NODE_KEY(pSkipList, pNode);
// static int32_t tSkipListEndParQuery(SSkipList *pSkipList, SSkipListNode *pStartNode, SSkipListKey *pEndKey,
// int32_t cond, SSkipListNode ***pRes) {
// pthread_rwlock_rdlock(&pSkipList->lock);
// SSkipListNode *p = pStartNode;
// int32_t numOfRes = 0;
//
// __compar_fn_t filterComparFn = getComparFunc(pSkipList, pEndKey->nType);
// while (p != NULL) {
// int32_t ret = filterComparFn(&p->key, pEndKey);
// if (ret > 0) {
// break;
// }
//
// if (ret < 0) {
// numOfRes++;
// p = p->pForward[0];
// } else if (ret == 0) {
// if (cond == TSDB_RELATION_LESS_EQUAL) {
// numOfRes++;
// p = p->pForward[0];
// } else {
// break;
// }
// }
// }
//
// (*pRes) = (SSkipListNode **)malloc(POINTER_BYTES * numOfRes);
// for (int32_t i = 0; i < numOfRes; ++i) {
// (*pRes)[i] = pStartNode;
// pStartNode = pStartNode->pForward[0];
// }
// pthread_rwlock_unlock(&pSkipList->lock);
//
// return numOfRes;
//}
//
///*
// * maybe return the copy of SSkipListNode would be better
// */
// int32_t tSkipListGets(SSkipList *pSkipList, SSkipListKey *pKey, SSkipListNode ***pRes) {
// (*pRes) = NULL;
//
// SSkipListNode *pNode = tSkipListGet(pSkipList, pKey);
// if (pNode == NULL) {
// return 0;
// }
//
// __compar_fn_t filterComparFn = getComparFunc(pSkipList, pKey->nType);
//
// // backward check if previous nodes are with the same value.
// SSkipListNode *pPrev = pNode->pBackward[0];
// while ((pPrev != &pSkipList->pHead) && filterComparFn(&pPrev->key, pKey) == 0) {
// pPrev = pPrev->pBackward[0];
// }
//
// return tSkipListEndParQuery(pSkipList, pPrev->pForward[0], &pNode->key, TSDB_RELATION_LESS_EQUAL, pRes);
//}
//
// static SSkipListNode *tSkipListParQuery(SSkipList *pSkipList, SSkipListKey *pKey, int32_t cond) {
// int32_t sLevel = pSkipList->level - 1;
// int32_t ret = -1;
//
// SSkipListNode *x = &pSkipList->pHead;
// __compar_fn_t filterComparFn = getComparFunc(pSkipList, pKey->nType);
//
// pthread_rwlock_rdlock(&pSkipList->lock);
//
// if (cond == TSDB_RELATION_GREATER_EQUAL || cond == TSDB_RELATION_GREATER) {
// for (int32_t i = sLevel; i >= 0; --i) {
// while (x->pForward[i] != NULL && (ret = filterComparFn(&x->pForward[i]->key, pKey)) < 0) {
// x = x->pForward[i];
// }
// }
//
// // backward check if previous nodes are with the same value.
// if (cond == TSDB_RELATION_GREATER_EQUAL && ret == 0) {
// SSkipListNode *pNode = x->pForward[0];
// while ((pNode->pBackward[0] != &pSkipList->pHead) && (filterComparFn(&pNode->pBackward[0]->key, pKey) == 0)) {
// pNode = pNode->pBackward[0];
// }
// pthread_rwlock_unlock(&pSkipList->lock);
// return pNode;
// }
//
// if (ret > 0 || cond == TSDB_RELATION_GREATER_EQUAL) {
// pthread_rwlock_unlock(&pSkipList->lock);
// return x->pForward[0];
// } else { // cond == TSDB_RELATION_GREATER && ret == 0
// SSkipListNode *pn = x->pForward[0];
// while (pn != NULL && filterComparFn(&pn->key, pKey) == 0) {
// pn = pn->pForward[0];
// }
// pthread_rwlock_unlock(&pSkipList->lock);
// return pn;
// }
// }
//
// pthread_rwlock_unlock(&pSkipList->lock);
// return NULL;
//}
//
//
// static bool removeSupport(SSkipList *pSkipList, SSkipListNode **forward, SSkipListKey *pKey) {
// __compar_fn_t filterComparFn = getComparFunc(pSkipList, pKey->nType);
//
// if (filterComparFn(&forward[0]->pForward[0]->key, pKey) == 0) {
// SSkipListNode *p = forward[0]->pForward[0];
// doRemove(pSkipList, p, forward);
// } else { // failed to find the node of specified value,abort
// return false;
// }
//
// // compress the minimum level of skip list
// while (pSkipList->level > 0 && SL_GET_FORWARD_POINTER(pSkipList->pHead, pSkipList->level - 1) == NULL) {
// pSkipList->level -= 1;
// }
//
// return true;
//}
atomic_add_fetch_32(&pSkipList->size, 1);
if (pSkipList->lock) {
pthread_rwlock_unlock(pSkipList->lock);
bool tSkipListRemove(SSkipList *pSkipList, SSkipListKey key) {
SSkipListNode* pNode = tSkipListDoGet(pSkipList, key);
if (pNode != NULL) {
tSkipListRemoveNode(pSkipList, pNode);
return true;
}
return pNode;
return false;
}
SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey, int16_t keyType) {
int32_t sLevel = pSkipList->level - 1;
// result list
SArray* sa = taosArrayInit(1, POINTER_BYTES);
SSkipListNode *pNode = pSkipList->pHead;
void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode) {
int32_t level = pNode->level;
if (pSkipList->lock) {
pthread_rwlock_rdlock(pSkipList->lock);
pthread_rwlock_wrlock(pSkipList->lock);
}
#if SKIP_LIST_RECORD_PERFORMANCE
pSkipList->state.queryCount++;
#endif
__compar_fn_t filterComparFn = getComparFunc(pSkipList->keyInfo.type, 0);
int32_t ret = -1;
for (int32_t i = sLevel; i >= 0; --i) {
SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, i);
while (p != pSkipList->pTail) {
char *key = SL_GET_NODE_KEY(pSkipList, p);
for (int32_t j = level - 1; j >= 0; --j) {
SSkipListNode* prev = SL_GET_BACKWARD_POINTER(pNode, j);
SSkipListNode* next = SL_GET_FORWARD_POINTER(pNode, j);
if ((ret = filterComparFn(key, pKey)) < 0) {
pNode = p;
p = SL_GET_FORWARD_POINTER(p, i);
} else {
break;
SL_GET_FORWARD_POINTER(prev, j) = next;
SL_GET_BACKWARD_POINTER(next, j) = prev;
}
if (pSkipList->keyInfo.freeNode) {
tfree(pNode);
}
// find the qualified key
if (ret == 0) {
SSkipListNode* pResult = SL_GET_FORWARD_POINTER(pNode, i);
taosArrayPush(sa, &pResult);
atomic_sub_fetch_32(&pSkipList->size, 1);
// skip list does not allowed duplicated key, abort further retrieve data
if (!pSkipList->keyInfo.dupKey) {
break;
}
}
// compress the minimum level of skip list
while (pSkipList->level > 0 && SL_GET_FORWARD_POINTER(pSkipList->pHead, pSkipList->level - 1) == NULL) {
pSkipList->level -= 1;
}
if (pSkipList->lock) {
pthread_rwlock_unlock(pSkipList->lock);
}
return sa;
}
size_t tSkipListGetSize(const SSkipList* pSkipList) {
if (pSkipList == NULL) {
return 0;
}
return pSkipList->size;
}
SSkipListIterator* tSkipListCreateIter(SSkipList *pSkipList) {
......@@ -475,138 +524,6 @@ void* tSkipListDestroyIter(SSkipListIterator* iter) {
return NULL;
}
// static int32_t tSkipListEndParQuery(SSkipList *pSkipList, SSkipListNode *pStartNode, SSkipListKey *pEndKey,
// int32_t cond, SSkipListNode ***pRes) {
// pthread_rwlock_rdlock(&pSkipList->lock);
// SSkipListNode *p = pStartNode;
// int32_t numOfRes = 0;
//
// __compar_fn_t filterComparFn = getComparFunc(pSkipList, pEndKey->nType);
// while (p != NULL) {
// int32_t ret = filterComparFn(&p->key, pEndKey);
// if (ret > 0) {
// break;
// }
//
// if (ret < 0) {
// numOfRes++;
// p = p->pForward[0];
// } else if (ret == 0) {
// if (cond == TSDB_RELATION_LESS_EQUAL) {
// numOfRes++;
// p = p->pForward[0];
// } else {
// break;
// }
// }
// }
//
// (*pRes) = (SSkipListNode **)malloc(POINTER_BYTES * numOfRes);
// for (int32_t i = 0; i < numOfRes; ++i) {
// (*pRes)[i] = pStartNode;
// pStartNode = pStartNode->pForward[0];
// }
// pthread_rwlock_unlock(&pSkipList->lock);
//
// return numOfRes;
//}
//
///*
// * maybe return the copy of SSkipListNode would be better
// */
// int32_t tSkipListGets(SSkipList *pSkipList, SSkipListKey *pKey, SSkipListNode ***pRes) {
// (*pRes) = NULL;
//
// SSkipListNode *pNode = tSkipListGet(pSkipList, pKey);
// if (pNode == NULL) {
// return 0;
// }
//
// __compar_fn_t filterComparFn = getComparFunc(pSkipList, pKey->nType);
//
// // backward check if previous nodes are with the same value.
// SSkipListNode *pPrev = pNode->pBackward[0];
// while ((pPrev != &pSkipList->pHead) && filterComparFn(&pPrev->key, pKey) == 0) {
// pPrev = pPrev->pBackward[0];
// }
//
// return tSkipListEndParQuery(pSkipList, pPrev->pForward[0], &pNode->key, TSDB_RELATION_LESS_EQUAL, pRes);
//}
//
// static SSkipListNode *tSkipListParQuery(SSkipList *pSkipList, SSkipListKey *pKey, int32_t cond) {
// int32_t sLevel = pSkipList->level - 1;
// int32_t ret = -1;
//
// SSkipListNode *x = &pSkipList->pHead;
// __compar_fn_t filterComparFn = getComparFunc(pSkipList, pKey->nType);
//
// pthread_rwlock_rdlock(&pSkipList->lock);
//
// if (cond == TSDB_RELATION_GREATER_EQUAL || cond == TSDB_RELATION_GREATER) {
// for (int32_t i = sLevel; i >= 0; --i) {
// while (x->pForward[i] != NULL && (ret = filterComparFn(&x->pForward[i]->key, pKey)) < 0) {
// x = x->pForward[i];
// }
// }
//
// // backward check if previous nodes are with the same value.
// if (cond == TSDB_RELATION_GREATER_EQUAL && ret == 0) {
// SSkipListNode *pNode = x->pForward[0];
// while ((pNode->pBackward[0] != &pSkipList->pHead) && (filterComparFn(&pNode->pBackward[0]->key, pKey) == 0)) {
// pNode = pNode->pBackward[0];
// }
// pthread_rwlock_unlock(&pSkipList->lock);
// return pNode;
// }
//
// if (ret > 0 || cond == TSDB_RELATION_GREATER_EQUAL) {
// pthread_rwlock_unlock(&pSkipList->lock);
// return x->pForward[0];
// } else { // cond == TSDB_RELATION_GREATER && ret == 0
// SSkipListNode *pn = x->pForward[0];
// while (pn != NULL && filterComparFn(&pn->key, pKey) == 0) {
// pn = pn->pForward[0];
// }
// pthread_rwlock_unlock(&pSkipList->lock);
// return pn;
// }
// }
//
// pthread_rwlock_unlock(&pSkipList->lock);
// return NULL;
//}
//
//
// static bool removeSupport(SSkipList *pSkipList, SSkipListNode **forward, SSkipListKey *pKey) {
// __compar_fn_t filterComparFn = getComparFunc(pSkipList, pKey->nType);
//
// if (filterComparFn(&forward[0]->pForward[0]->key, pKey) == 0) {
// SSkipListNode *p = forward[0]->pForward[0];
// doRemove(pSkipList, p, forward);
// } else { // failed to find the node of specified value,abort
// return false;
// }
//
// // compress the minimum level of skip list
// while (pSkipList->level > 0 && pSkipList->pHead.pForward[pSkipList->level - 1] == NULL) {
// pSkipList->level -= 1;
// }
//
// return true;
//}
//
// void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode) {
// SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
//
// pthread_rwlock_rdlock(&pSkipList->lock);
// for (int32_t i = 0; i < pNode->level; ++i) {
// forward[i] = pNode->pBackward[i];
// }
//
// removeSupport(pSkipList, forward, &pNode->key);
// pthread_rwlock_unlock(&pSkipList->lock);
//}
//
// bool tSkipListRemove(SSkipList *pSkipList, SSkipListKey *pKey) {
// SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
// __compar_fn_t filterComparFn = getComparFunc(pSkipList, pKey->nType);
......@@ -668,6 +585,105 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) {
}
}
void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSkipListNode *pNode) {
DO_MEMSET_PTR_AREA(pNode);
for (int32_t i = 0; i < pNode->level; ++i) {
SSkipListNode *x = forward[i];
SL_GET_BACKWARD_POINTER(pNode, i) = x;
SSkipListNode *next = SL_GET_FORWARD_POINTER(x, i);
SL_GET_BACKWARD_POINTER(next, i) = pNode;
SL_GET_FORWARD_POINTER(pNode, i) = next;
SL_GET_FORWARD_POINTER(x, i) = pNode;
}
atomic_add_fetch_32(&pSkipList->size, 1);
if (pSkipList->lock) {
pthread_rwlock_unlock(pSkipList->lock);
}
}
SSkipListNode* tSkipListPushFront(SSkipList* pSkipList, SSkipListNode *pNode) {
SSkipListNode* forward[MAX_SKIP_LIST_LEVEL] = {0};
for(int32_t i = 0; i < pSkipList->level; ++i) {
forward[i] = pSkipList->pHead;
}
tSkipListDoInsert(pSkipList, forward, pNode);
return pNode;
}
SSkipListNode* tSkipListPushBack(SSkipList *pSkipList, SSkipListNode *pNode) {
// do clear pointer area
DO_MEMSET_PTR_AREA(pNode);
for(int32_t i = 0; i < pNode->level; ++i) {
SSkipListNode* prev = SL_GET_BACKWARD_POINTER(pSkipList->pTail, i);
SL_GET_FORWARD_POINTER(prev, i) = pNode;
SL_GET_FORWARD_POINTER(pNode, i) = pSkipList->pTail;
SL_GET_BACKWARD_POINTER(pNode, i) = prev;
SL_GET_BACKWARD_POINTER(pSkipList->pTail, i) = pNode;
}
pSkipList->lastKey = SL_GET_NODE_KEY(pSkipList, pNode);
atomic_add_fetch_32(&pSkipList->size, 1);
if (pSkipList->lock) {
pthread_rwlock_unlock(pSkipList->lock);
}
return pNode;
}
SSkipListNode* tSkipListDoGet(SSkipList *pSkipList, SSkipListKey skey) {
SSkipListNode *pNode = pSkipList->pHead;
SSkipListNode *pRes = NULL;
if (pSkipList->lock) {
pthread_rwlock_rdlock(pSkipList->lock);
}
#if SKIP_LIST_RECORD_PERFORMANCE
pSkipList->state.queryCount++;
#endif
__compar_fn_t cmparFn = getComparFunc(pSkipList->keyInfo.type, 0);
int32_t ret = -1;
for (int32_t i = pSkipList->level - 1; i >= 0; --i) {
SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, i);
while (p != pSkipList->pTail) {
char *key = SL_GET_NODE_KEY(pSkipList, p);
if ((ret = cmparFn(key, skey)) < 0) {
pNode = p;
p = SL_GET_FORWARD_POINTER(p, i);
} else {
break;
}
}
// find the qualified key
if (ret == 0) {
pRes = SL_GET_FORWARD_POINTER(pNode, i);
break;
// skip list does not allowed duplicated key, abort further retrieve data
// if (!pSkipList->keyInfo.dupKey) {
// break;
// }
}
}
if (pSkipList->lock) {
pthread_rwlock_unlock(pSkipList->lock);
}
return pRes;
}
SSkipListIterator* doCreateSkipListIterator(SSkipList *pSkipList, int32_t order) {
SSkipListIterator* iter = calloc(1, sizeof(SSkipListIterator));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册