From 4b86e0a39c54d86891c8a53a46c11aa345611d71 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Fri, 8 May 2020 16:23:07 +0800 Subject: [PATCH] [td-225] add remove support for skiplist and super table index. --- src/tsdb/src/tsdbMeta.c | 12 +- src/util/inc/tskiplist.h | 5 +- src/util/src/tskiplist.c | 496 ++++++++++++++++++++------------------- 3 files changed, 267 insertions(+), 246 deletions(-) diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 63ccb538ed..7b849135e0 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -502,13 +502,19 @@ static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) { memcpy(SL_GET_NODE_DATA(pNode), &pTable, POINTER_BYTES); tSkipListPut(list, pNode); - return 0; } static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) { - assert(pTable->type == TSDB_CHILD_TABLE); - // TODO + assert(pTable->type == TSDB_CHILD_TABLE && pTable != NULL); + + STable* pSTable = tsdbGetTableByUid(pMeta, pTable->superUid); + assert(pSTable != NULL); + + char* key = dataRowTuple(pTable->tagVal); // key + bool ret = tSkipListRemove(pSTable->pIndex, key); + + assert(ret); return 0; } diff --git a/src/util/inc/tskiplist.h b/src/util/inc/tskiplist.h index 4634c14881..759ecbb7ed 100644 --- a/src/util/inc/tskiplist.h +++ b/src/util/inc/tskiplist.h @@ -178,10 +178,9 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode); * * @param pSkipList * @param pKey - * @param keyType * @return */ -SArray *tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey, int16_t keyType); +SArray *tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey); /** * get the size of skip list @@ -242,7 +241,7 @@ void *tSkipListDestroyIter(SSkipListIterator *iter); * true: one node has been removed * false: no node has been removed */ -bool tSkipListRemove(SSkipList *pSkipList, SSkipListKey *pKey); +bool tSkipListRemove(SSkipList *pSkipList, SSkipListKey key); /* * remove the specified node in parameters diff --git a/src/util/src/tskiplist.c b/src/util/src/tskiplist.c index 4149449e6c..25ea49b60d 100644 --- a/src/util/src/tskiplist.c +++ b/src/util/src/tskiplist.c @@ -74,6 +74,7 @@ static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSk static SSkipListNode* tSkipListPushBack(SSkipList *pSkipList, SSkipListNode *pNode); static SSkipListNode* tSkipListPushFront(SSkipList* pSkipList, SSkipListNode *pNode); static SSkipListIterator* doCreateSkipListIterator(SSkipList *pSkipList, int32_t order); +static SSkipListNode* tSkipListDoGet(SSkipList *pSkipList, SSkipListKey key); static bool initForwardBackwardPtr(SSkipList* pSkipList) { uint32_t maxLevel = pSkipList->maxLevel; @@ -97,6 +98,7 @@ static bool initForwardBackwardPtr(SSkipList* pSkipList) { return true; } + SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint8_t keyLen, uint8_t dupKey, uint8_t lock, uint8_t freeNode, __sl_key_fn_t fn) { SSkipList *pSkipList = (SSkipList *)calloc(1, sizeof(SSkipList)); @@ -139,25 +141,6 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint8_t keyLen, ui return pSkipList; } -// static void doRemove(SSkipList *pSkipList, SSkipListNode *pNode, SSkipListNode *forward[]) { -// int32_t level = pNode->level; -// for (int32_t j = level - 1; j >= 0; --j) { -// if ((forward[j]->pForward[j] != NULL) && (forward[j]->pForward[j]->pForward[j])) { -// forward[j]->pForward[j]->pForward[j]->pBackward[j] = forward[j]; -// } -// -// if (forward[j]->pForward[j] != NULL) { -// forward[j]->pForward[j] = forward[j]->pForward[j]->pForward[j]; -// } -// } -// -// pSkipList->state.nTotalMemSize -= (sizeof(SSkipListNode) + POINTER_BYTES * pNode->level * 2); -// removeNodeEachLevel(pSkipList, pNode->level); -// -// tfree(pNode); -// --pSkipList->size; -//} - void *tSkipListDestroy(SSkipList *pSkipList) { if (pSkipList == NULL) { return NULL; @@ -257,105 +240,10 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode) { return pNode; } -void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSkipListNode *pNode) { - DO_MEMSET_PTR_AREA(pNode); - - for (int32_t i = 0; i < pNode->level; ++i) { - SSkipListNode *x = forward[i]; - SL_GET_BACKWARD_POINTER(pNode, i) = x; - - SSkipListNode *next = SL_GET_FORWARD_POINTER(x, i); - SL_GET_BACKWARD_POINTER(next, i) = pNode; - - SL_GET_FORWARD_POINTER(pNode, i) = next; - SL_GET_FORWARD_POINTER(x, i) = pNode; - } - - atomic_add_fetch_32(&pSkipList->size, 1); - if (pSkipList->lock) { - pthread_rwlock_unlock(pSkipList->lock); - } -} - -SSkipListNode* tSkipListPushFront(SSkipList* pSkipList, SSkipListNode *pNode) { - SSkipListNode* forward[MAX_SKIP_LIST_LEVEL] = {0}; - for(int32_t i = 0; i < pSkipList->level; ++i) { - forward[i] = pSkipList->pHead; - } - - tSkipListDoInsert(pSkipList, forward, pNode); - return pNode; -} - -SSkipListNode* tSkipListPushBack(SSkipList *pSkipList, SSkipListNode *pNode) { - // do clear pointer area - DO_MEMSET_PTR_AREA(pNode); - - for(int32_t i = 0; i < pNode->level; ++i) { - SSkipListNode* prev = SL_GET_BACKWARD_POINTER(pSkipList->pTail, i); - SL_GET_FORWARD_POINTER(prev, i) = pNode; - SL_GET_FORWARD_POINTER(pNode, i) = pSkipList->pTail; - - SL_GET_BACKWARD_POINTER(pNode, i) = prev; - SL_GET_BACKWARD_POINTER(pSkipList->pTail, i) = pNode; - } - - pSkipList->lastKey = SL_GET_NODE_KEY(pSkipList, pNode); - - atomic_add_fetch_32(&pSkipList->size, 1); - if (pSkipList->lock) { - pthread_rwlock_unlock(pSkipList->lock); - } - - return pNode; -} - -SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey, int16_t keyType) { - int32_t sLevel = pSkipList->level - 1; - - // result list +SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey key) { SArray* sa = taosArrayInit(1, POINTER_BYTES); - SSkipListNode *pNode = pSkipList->pHead; - - if (pSkipList->lock) { - pthread_rwlock_rdlock(pSkipList->lock); - } - -#if SKIP_LIST_RECORD_PERFORMANCE - pSkipList->state.queryCount++; -#endif - - __compar_fn_t filterComparFn = getComparFunc(pSkipList->keyInfo.type, 0); - int32_t ret = -1; - for (int32_t i = sLevel; i >= 0; --i) { - SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, i); - while (p != pSkipList->pTail) { - char *key = SL_GET_NODE_KEY(pSkipList, p); - - if ((ret = filterComparFn(key, pKey)) < 0) { - pNode = p; - p = SL_GET_FORWARD_POINTER(p, i); - } else { - break; - } - } - - // find the qualified key - if (ret == 0) { - SSkipListNode* pResult = SL_GET_FORWARD_POINTER(pNode, i); - taosArrayPush(sa, &pResult); - - // skip list does not allowed duplicated key, abort further retrieve data - if (!pSkipList->keyInfo.dupKey) { - break; - } - } - } - - if (pSkipList->lock) { - pthread_rwlock_unlock(pSkipList->lock); - } - + SSkipListNode* pNode = tSkipListDoGet(pSkipList, key); + taosArrayPush(sa, &pNode); return sa; } @@ -367,114 +255,6 @@ size_t tSkipListGetSize(const SSkipList* pSkipList) { return pSkipList->size; } -SSkipListIterator* tSkipListCreateIter(SSkipList *pSkipList) { - if (pSkipList == NULL) { - return NULL; - } - - return doCreateSkipListIterator(pSkipList, TSDB_ORDER_ASC); -} - -SSkipListIterator *tSkipListCreateIterFromVal(SSkipList* pSkipList, const char* val, int32_t type, int32_t order) { - if (pSkipList == NULL) { - return NULL; - } - - assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC); - - if (val == NULL) { - return doCreateSkipListIterator(pSkipList, order); - } else { - - SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; - - int32_t ret = -1; - __compar_fn_t filterComparFn = getKeyComparFunc(pSkipList->keyInfo.type); - SSkipListNode* pNode = pSkipList->pHead; - - for (int32_t i = pSkipList->level - 1; i >= 0; --i) { - SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, i); - while (p != pSkipList->pTail) { - char *key = SL_GET_NODE_KEY(pSkipList, p); - - if ((ret = filterComparFn(key, val)) < 0) { - pNode = p; - p = SL_GET_FORWARD_POINTER(p, i); - } else { - break; - } - } - - forward[i] = pNode; - } - - SSkipListIterator* iter = doCreateSkipListIterator(pSkipList, order); - - // set the initial position - if (order == TSDB_ORDER_ASC) { - iter->cur = forward[0]; // greater equals than the value - } else { - iter->cur = SL_GET_FORWARD_POINTER(forward[0], 0); - - if (ret == 0) { - assert(iter->cur != pSkipList->pTail); - iter->cur = SL_GET_FORWARD_POINTER(iter->cur, 0); - } - } - - return iter; - } -} - -bool tSkipListIterNext(SSkipListIterator *iter) { - if (iter->pSkipList == NULL) { - return false; - } - - SSkipList *pSkipList = iter->pSkipList; - - if (pSkipList->lock) { - pthread_rwlock_rdlock(pSkipList->lock); - } - - if (iter->order == TSDB_ORDER_ASC) { // ascending order iterate - if (iter->cur == NULL) { - iter->cur = SL_GET_FORWARD_POINTER(pSkipList->pHead, 0); - } else { - iter->cur = SL_GET_FORWARD_POINTER(iter->cur, 0); - } - } else { // descending order iterate - if (iter->cur == NULL) { - iter->cur = SL_GET_BACKWARD_POINTER(pSkipList->pTail, 0); - } else { - iter->cur = SL_GET_BACKWARD_POINTER(iter->cur, 0); - } - } - - if (pSkipList->lock) { - pthread_rwlock_unlock(pSkipList->lock); - } - - return (iter->order == TSDB_ORDER_ASC)? (iter->cur != pSkipList->pTail) : (iter->cur != pSkipList->pHead); -} - -SSkipListNode *tSkipListIterGet(SSkipListIterator *iter) { - if (iter == NULL || iter->cur == iter->pSkipList->pTail || iter->cur == iter->pSkipList->pHead) { - return NULL; - } else { - return iter->cur; - } -} - -void* tSkipListDestroyIter(SSkipListIterator* iter) { - if (iter == NULL) { - return NULL; - } - - tfree(iter); - return NULL; -} - // static int32_t tSkipListEndParQuery(SSkipList *pSkipList, SSkipListNode *pStartNode, SSkipListKey *pEndKey, // int32_t cond, SSkipListNode ***pRes) { // pthread_rwlock_rdlock(&pSkipList->lock); @@ -588,25 +368,162 @@ void* tSkipListDestroyIter(SSkipListIterator* iter) { // } // // // compress the minimum level of skip list -// while (pSkipList->level > 0 && pSkipList->pHead.pForward[pSkipList->level - 1] == NULL) { +// while (pSkipList->level > 0 && SL_GET_FORWARD_POINTER(pSkipList->pHead, pSkipList->level - 1) == NULL) { // pSkipList->level -= 1; // } // // return true; //} -// -// void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode) { -// SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; -// -// pthread_rwlock_rdlock(&pSkipList->lock); -// for (int32_t i = 0; i < pNode->level; ++i) { -// forward[i] = pNode->pBackward[i]; -// } -// -// removeSupport(pSkipList, forward, &pNode->key); -// pthread_rwlock_unlock(&pSkipList->lock); -//} -// + +bool tSkipListRemove(SSkipList *pSkipList, SSkipListKey key) { + SSkipListNode* pNode = tSkipListDoGet(pSkipList, key); + if (pNode != NULL) { + tSkipListRemoveNode(pSkipList, pNode); + return true; + } + + return false; +} + +void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode) { + int32_t level = pNode->level; + + if (pSkipList->lock) { + pthread_rwlock_wrlock(pSkipList->lock); + } + + for (int32_t j = level - 1; j >= 0; --j) { + SSkipListNode* prev = SL_GET_BACKWARD_POINTER(pNode, j); + SSkipListNode* next = SL_GET_FORWARD_POINTER(pNode, j); + + SL_GET_FORWARD_POINTER(prev, j) = next; + SL_GET_BACKWARD_POINTER(next, j) = prev; + } + + if (pSkipList->keyInfo.freeNode) { + tfree(pNode); + } + + atomic_sub_fetch_32(&pSkipList->size, 1); + + // compress the minimum level of skip list + while (pSkipList->level > 0 && SL_GET_FORWARD_POINTER(pSkipList->pHead, pSkipList->level - 1) == NULL) { + pSkipList->level -= 1; + } + + if (pSkipList->lock) { + pthread_rwlock_unlock(pSkipList->lock); + } +} + +SSkipListIterator* tSkipListCreateIter(SSkipList *pSkipList) { + if (pSkipList == NULL) { + return NULL; + } + + return doCreateSkipListIterator(pSkipList, TSDB_ORDER_ASC); +} + +SSkipListIterator *tSkipListCreateIterFromVal(SSkipList* pSkipList, const char* val, int32_t type, int32_t order) { + if (pSkipList == NULL) { + return NULL; + } + + assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC); + + if (val == NULL) { + return doCreateSkipListIterator(pSkipList, order); + } else { + + SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; + + int32_t ret = -1; + __compar_fn_t filterComparFn = getKeyComparFunc(pSkipList->keyInfo.type); + SSkipListNode* pNode = pSkipList->pHead; + + for (int32_t i = pSkipList->level - 1; i >= 0; --i) { + SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, i); + while (p != pSkipList->pTail) { + char *key = SL_GET_NODE_KEY(pSkipList, p); + + if ((ret = filterComparFn(key, val)) < 0) { + pNode = p; + p = SL_GET_FORWARD_POINTER(p, i); + } else { + break; + } + } + + forward[i] = pNode; + } + + SSkipListIterator* iter = doCreateSkipListIterator(pSkipList, order); + + // set the initial position + if (order == TSDB_ORDER_ASC) { + iter->cur = forward[0]; // greater equals than the value + } else { + iter->cur = SL_GET_FORWARD_POINTER(forward[0], 0); + + if (ret == 0) { + assert(iter->cur != pSkipList->pTail); + iter->cur = SL_GET_FORWARD_POINTER(iter->cur, 0); + } + } + + return iter; + } +} + +bool tSkipListIterNext(SSkipListIterator *iter) { + if (iter->pSkipList == NULL) { + return false; + } + + SSkipList *pSkipList = iter->pSkipList; + + if (pSkipList->lock) { + pthread_rwlock_rdlock(pSkipList->lock); + } + + if (iter->order == TSDB_ORDER_ASC) { // ascending order iterate + if (iter->cur == NULL) { + iter->cur = SL_GET_FORWARD_POINTER(pSkipList->pHead, 0); + } else { + iter->cur = SL_GET_FORWARD_POINTER(iter->cur, 0); + } + } else { // descending order iterate + if (iter->cur == NULL) { + iter->cur = SL_GET_BACKWARD_POINTER(pSkipList->pTail, 0); + } else { + iter->cur = SL_GET_BACKWARD_POINTER(iter->cur, 0); + } + } + + if (pSkipList->lock) { + pthread_rwlock_unlock(pSkipList->lock); + } + + return (iter->order == TSDB_ORDER_ASC)? (iter->cur != pSkipList->pTail) : (iter->cur != pSkipList->pHead); +} + +SSkipListNode *tSkipListIterGet(SSkipListIterator *iter) { + if (iter == NULL || iter->cur == iter->pSkipList->pTail || iter->cur == iter->pSkipList->pHead) { + return NULL; + } else { + return iter->cur; + } +} + +void* tSkipListDestroyIter(SSkipListIterator* iter) { + if (iter == NULL) { + return NULL; + } + + tfree(iter); + return NULL; +} + // bool tSkipListRemove(SSkipList *pSkipList, SSkipListKey *pKey) { // SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; // __compar_fn_t filterComparFn = getComparFunc(pSkipList, pKey->nType); @@ -668,6 +585,105 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) { } } +void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSkipListNode *pNode) { + DO_MEMSET_PTR_AREA(pNode); + + for (int32_t i = 0; i < pNode->level; ++i) { + SSkipListNode *x = forward[i]; + SL_GET_BACKWARD_POINTER(pNode, i) = x; + + SSkipListNode *next = SL_GET_FORWARD_POINTER(x, i); + SL_GET_BACKWARD_POINTER(next, i) = pNode; + + SL_GET_FORWARD_POINTER(pNode, i) = next; + SL_GET_FORWARD_POINTER(x, i) = pNode; + } + + atomic_add_fetch_32(&pSkipList->size, 1); + if (pSkipList->lock) { + pthread_rwlock_unlock(pSkipList->lock); + } +} + +SSkipListNode* tSkipListPushFront(SSkipList* pSkipList, SSkipListNode *pNode) { + SSkipListNode* forward[MAX_SKIP_LIST_LEVEL] = {0}; + for(int32_t i = 0; i < pSkipList->level; ++i) { + forward[i] = pSkipList->pHead; + } + + tSkipListDoInsert(pSkipList, forward, pNode); + return pNode; +} + +SSkipListNode* tSkipListPushBack(SSkipList *pSkipList, SSkipListNode *pNode) { + // do clear pointer area + DO_MEMSET_PTR_AREA(pNode); + + for(int32_t i = 0; i < pNode->level; ++i) { + SSkipListNode* prev = SL_GET_BACKWARD_POINTER(pSkipList->pTail, i); + SL_GET_FORWARD_POINTER(prev, i) = pNode; + SL_GET_FORWARD_POINTER(pNode, i) = pSkipList->pTail; + + SL_GET_BACKWARD_POINTER(pNode, i) = prev; + SL_GET_BACKWARD_POINTER(pSkipList->pTail, i) = pNode; + } + + pSkipList->lastKey = SL_GET_NODE_KEY(pSkipList, pNode); + + atomic_add_fetch_32(&pSkipList->size, 1); + if (pSkipList->lock) { + pthread_rwlock_unlock(pSkipList->lock); + } + + return pNode; +} + +SSkipListNode* tSkipListDoGet(SSkipList *pSkipList, SSkipListKey skey) { + SSkipListNode *pNode = pSkipList->pHead; + SSkipListNode *pRes = NULL; + + if (pSkipList->lock) { + pthread_rwlock_rdlock(pSkipList->lock); + } + +#if SKIP_LIST_RECORD_PERFORMANCE + pSkipList->state.queryCount++; +#endif + + __compar_fn_t cmparFn = getComparFunc(pSkipList->keyInfo.type, 0); + + int32_t ret = -1; + for (int32_t i = pSkipList->level - 1; i >= 0; --i) { + SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, i); + while (p != pSkipList->pTail) { + char *key = SL_GET_NODE_KEY(pSkipList, p); + + if ((ret = cmparFn(key, skey)) < 0) { + pNode = p; + p = SL_GET_FORWARD_POINTER(p, i); + } else { + break; + } + } + + // find the qualified key + if (ret == 0) { + pRes = SL_GET_FORWARD_POINTER(pNode, i); + break; + // skip list does not allowed duplicated key, abort further retrieve data +// if (!pSkipList->keyInfo.dupKey) { +// break; +// } + } + } + + if (pSkipList->lock) { + pthread_rwlock_unlock(pSkipList->lock); + } + + return pRes; +} + SSkipListIterator* doCreateSkipListIterator(SSkipList *pSkipList, int32_t order) { SSkipListIterator* iter = calloc(1, sizeof(SSkipListIterator)); @@ -675,4 +691,4 @@ SSkipListIterator* doCreateSkipListIterator(SSkipList *pSkipList, int32_t order) iter->order = order; return iter; -} +} \ No newline at end of file -- GitLab