From 4a83966f5c7cce0fcaf86dbe3e1410fdf8b6e338 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 22 Sep 2020 10:02:23 +0800 Subject: [PATCH] TD-1194 --- src/tsdb/inc/tsdbMain.h | 2 +- src/tsdb/src/tsdbMemTable.c | 10 +- src/tsdb/src/tsdbMeta.c | 6 +- src/tsdb/src/tsdbRead.c | 18 +- src/util/inc/tskiplist.h | 190 ++++------------- src/util/src/tskiplist.c | 393 ++++++++++++++---------------------- 6 files changed, 205 insertions(+), 414 deletions(-) diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 95b7010038..b515752bc3 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -438,7 +438,7 @@ static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) { SSkipListNode* node = tSkipListIterGet(pIter); if (node == NULL) return NULL; - return *(SDataRow *)SL_GET_NODE_DATA(node); + return (SDataRow)SL_GET_NODE_DATA(node); } static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator* pIter) { diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index c58460e18f..9c76a09ca3 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -91,11 +91,11 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable)); - int64_t oldSize = SL_GET_SIZE(pTableData->pData); - if (tSkipListPut(pTableData->pData, (void *)(&pRow), sizeof(void *)) == NULL) { + int64_t oldSize = SL_SIZE(pTableData->pData); + if (tSkipListPut(pTableData->pData, pRow) == NULL) { tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row)); } else { - int64_t deltaSize = SL_GET_SIZE(pTableData->pData) - oldSize; + int64_t deltaSize = SL_SIZE(pTableData->pData) - oldSize; if (TABLE_LASTKEY(pTable) < key) TABLE_LASTKEY(pTable) = key; if (pMemTable->keyFirst > key) pMemTable->keyFirst = key; if (pMemTable->keyLast < key) pMemTable->keyLast = key; @@ -427,7 +427,7 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) { pTableData->pData = tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], - pCfg->update ? SL_APPEND_DUP_KEY : SL_DISCARD_DUP_KEY, tsdbGetTsTupleKey); + pCfg->update ? SL_UPDATE_DUP_KEY : SL_DISCARD_DUP_KEY, tsdbGetTsTupleKey); if (pTableData->pData == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; goto _err; @@ -447,7 +447,7 @@ static void tsdbFreeTableData(STableData *pTableData) { } } -static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple(*(SDataRow *)data); } +static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple((SDataRow)data); } static void *tsdbCommitData(void *arg) { STsdbRepo * pRepo = (STsdbRepo *)arg; diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 76f9d51b87..391cb55985 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -643,7 +643,7 @@ static void tsdbOrgMeta(void *pHandle) { } static char *getTagIndexKey(const void *pData) { - STable *pTable = *(STable **)pData; + STable *pTable = (STable *)pData; STSchema *pSchema = tsdbGetTableTagSchema(pTable); STColumn *pCol = schemaColAt(pSchema, DEFAULT_TAG_INDEX_COLUMN); @@ -900,7 +900,7 @@ static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable, bool refSuper pTable->pSuper = pSTable; - tSkipListPut(pSTable->pIndex, (void *)(&pTable), sizeof(STable *)); + tSkipListPut(pSTable->pIndex, (void *)pTable); if (refSuper) T_REF_INC(pSTable); return 0; @@ -1182,7 +1182,7 @@ static int tsdbGetTableEncodeSize(int8_t act, STable *pTable) { tlen = sizeof(SListNode) + sizeof(SActObj) + sizeof(SActCont) + tsdbEncodeTable(NULL, pTable) + sizeof(TSCKSUM); } else { if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) { - tlen = (int)((sizeof(SListNode) + sizeof(SActObj)) * (SL_GET_SIZE(pTable->pIndex) + 1)); + tlen = (int)((sizeof(SListNode) + sizeof(SActObj)) * (SL_SIZE(pTable->pIndex) + 1)); } else { tlen = sizeof(SListNode) + sizeof(SActObj); } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 320f1c3765..f390208af1 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -386,7 +386,7 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); assert(node != NULL); - SDataRow row = *(SDataRow *)SL_GET_NODE_DATA(node); + SDataRow row = (SDataRow)SL_GET_NODE_DATA(node); TSKEY key = dataRowKey(row); // first timestamp in buffer tsdbDebug("%p uid:%" PRId64 ", tid:%d check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", %p", @@ -408,7 +408,7 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); assert(node != NULL); - SDataRow row = *(SDataRow *)SL_GET_NODE_DATA(node); + SDataRow row = (SDataRow)SL_GET_NODE_DATA(node); TSKEY key = dataRowKey(row); // first timestamp in buffer tsdbDebug("%p uid:%" PRId64 ", tid:%d check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", %p", @@ -438,14 +438,14 @@ static SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order if (pCheckInfo->iter) { SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); if (node != NULL) { - rmem = *(SDataRow *)SL_GET_NODE_DATA(node); + rmem = (SDataRow)SL_GET_NODE_DATA(node); } } if (pCheckInfo->iiter) { SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); if (node != NULL) { - rimem = *(SDataRow *)SL_GET_NODE_DATA(node); + rimem = (SDataRow)SL_GET_NODE_DATA(node); } } @@ -1358,8 +1358,8 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* * copy them all to result buffer, since it may be overlapped with file data block. */ if (node == NULL || - ((dataRowKey(*(SDataRow *)SL_GET_NODE_DATA(node)) > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) || - ((dataRowKey(*(SDataRow *)SL_GET_NODE_DATA(node)) < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))) { + ((dataRowKey((SDataRow)SL_GET_NODE_DATA(node)) > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) || + ((dataRowKey((SDataRow)SL_GET_NODE_DATA(node)) < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))) { // no data in cache or data in cache is greater than the ekey of time window, load data from file block if (cur->win.skey == TSKEY_INITIAL_VAL) { cur->win.skey = tsArray[pos]; @@ -1864,9 +1864,9 @@ static int32_t getAllTableList(STable* pSuperTable, SArray* list) { while (tSkipListIterNext(iter)) { SSkipListNode* pNode = tSkipListIterGet(iter); - STable** pTable = (STable**) SL_GET_NODE_DATA((SSkipListNode*) pNode); + STable* pTable = (STable*) SL_GET_NODE_DATA((SSkipListNode*) pNode); - STableKeyInfo info = {.pTable = *pTable, .lastKey = TSKEY_INITIAL_VAL}; + STableKeyInfo info = {.pTable = pTable, .lastKey = TSKEY_INITIAL_VAL}; taosArrayPush(list, &info); } @@ -2434,7 +2434,7 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC static bool indexedNodeFilterFp(const void* pNode, void* param) { tQueryInfo* pInfo = (tQueryInfo*) param; - STable* pTable = *(STable**)(SL_GET_NODE_DATA((SSkipListNode*)pNode)); + STable* pTable = (STable*)(SL_GET_NODE_DATA((SSkipListNode*)pNode)); char* val = NULL; diff --git a/src/util/inc/tskiplist.h b/src/util/inc/tskiplist.h index f1e09501e9..1eccac6646 100644 --- a/src/util/inc/tskiplist.h +++ b/src/util/inc/tskiplist.h @@ -30,43 +30,27 @@ extern "C" { // For key property setting #define SL_ALLOW_DUP_KEY (uint8_t)0x0 // Allow duplicate key exists #define SL_DISCARD_DUP_KEY (uint8_t)0x1 // Discard duplicate key -#define SL_UPDATA_DUP_KEY (uint8_t)0x2 // Update duplicate key by remove/insert -#define SL_APPEND_DUP_KEY (uint8_t)0x3 // Update duplicate key by append +#define SL_UPDATE_DUP_KEY (uint8_t)0x2 // Update duplicate key by remove/insert // For thread safety setting #define SL_THREAD_SAFE (uint8_t)0x4 typedef char *SSkipListKey; typedef char *(*__sl_key_fn_t)(const void *); -/** - * the skip list node is located in a consecutive memory area, - * the format of skip list node is as follows: - * +------------+-----------------------+------------------------+-----+------+ - * | node level | forward pointer array | backward pointer array | key | data | - * +------------+-----------------------+------------------------+-----+------+ - */ typedef struct SSkipListNode { - uint8_t level; + uint8_t level; + uint8_t flags; + void * pData; + struct SSkipListNode *forwards[]; } SSkipListNode; -#define SL_IS_THREAD_SAFE(flags) ((flags)&SL_THREAD_SAFE) -#define SL_DUP_MODE(flags) ((flags) & ((((uint8_t)1) << 2) - 1)) - -#define SL_NODE_HEADER_SIZE(_l) (sizeof(SSkipListNode) + ((_l) << 1u) * POINTER_BYTES) - -#define SL_GET_FORWARD_POINTER(n, _l) ((SSkipListNode **)((char *)(n) + sizeof(SSkipListNode)))[(_l)] -#define SL_GET_BACKWARD_POINTER(n, _l) \ - ((SSkipListNode **)((char *)(n) + sizeof(SSkipListNode) + ((n)->level) * POINTER_BYTES))[(_l)] +#define SL_NODE_DELETED_FLAG (uint8_t)0x1 -#define SL_GET_NODE_DATA(n) ((char *)(n) + SL_NODE_HEADER_SIZE((n)->level)) -#define SL_GET_NODE_KEY(s, n) ((s)->keyFn(SL_GET_NODE_DATA(n))) - -#define SL_GET_SL_MIN_KEY(s) (SL_GET_NODE_KEY((s), SL_GET_FORWARD_POINTER((s)->pHead, 0))) -#define SL_GET_SL_MAX_KEY(s) (SL_GET_NODE_KEY((s), SL_GET_BACKWARD_POINTER((s)->pTail, 0))) - -#define SL_GET_NODE_LEVEL(n) *(uint8_t *)((n)) -#define SL_GET_SIZE(s) (s)->size -#define SL_GET_TSIZE(s) (s)->tsize +#define SL_GET_NODE_DATA(n) (n)->pData +#define SL_IS_NODE_DELETED(n) ((n)->flags & SL_NODE_DELETED_FLAG) +#define SL_SET_NODE_DELETED(n) (n)->flags |= SL_NODE_DELETED_FLAG +#define SL_NODE_GET_FORWARD_POINTER(n, l) (n)->forwards[(l)] +#define SL_NODE_GET_BACKWARD_POINTER(n, l) (n)->forwards[(n)->level + (l)] /* * @version 0.3 @@ -116,13 +100,6 @@ typedef struct tSkipListState { uint64_t nTotalElapsedTimeForInsert; } tSkipListState; -typedef struct SSkipListKeyInfo { - uint8_t dupKey : 2; // if allow duplicated key in the skip list - uint8_t type : 4; // key type - uint8_t freeNode:2; // free node when destroy the skiplist - uint8_t len; // maximum key length, used in case of string key -} SSkipListKeyInfo; - typedef struct SSkipList { __compar_fn_t comparFn; __sl_key_fn_t keyFn; @@ -130,10 +107,10 @@ typedef struct SSkipList { uint16_t len; uint8_t maxLevel; uint8_t flags; - uint8_t type; // static info above + uint8_t type; // static info above uint8_t level; - uint32_t size; // not including duplicate keys - uint32_t tsize; // including duplicate keys + uint32_t size; // semantic meaning of size + uint32_t tsize; // # of all skiplist nodes in this SL SSkipListNode * pHead; // point to the first element SSkipListNode * pTail; // point to the last element #if SKIP_LIST_RECORD_PERFORMANCE @@ -141,130 +118,33 @@ typedef struct SSkipList { #endif } SSkipList; -/* - * iterate the skiplist - * this will cause the multi-thread problem, when the skiplist is destroyed, the iterate may - * continue iterating the skiplist, so add the reference count for skiplist - * TODO add the ref for skip list when one iterator is created - */ typedef struct SSkipListIterator { SSkipList * pSkipList; SSkipListNode *cur; - int32_t step; // the number of nodes that have been checked already - int32_t order; // order of the iterator + int32_t step; // the number of nodes that have been checked already + int32_t order; // order of the iterator } SSkipListIterator; -/** - * - * @param nMaxLevel maximum skip list level - * @param keyType type of key - * @param dupKey allow the duplicated key in the skip list - * @return - */ -SSkipList *tSkipListCreate(uint8_t nMaxLevel, uint8_t keyType, uint16_t keyLen, uint8_t flags, __sl_key_fn_t fn); - -/** - * - * @param pSkipList - * @return NULL will always be returned - */ -void *tSkipListDestroy(SSkipList *pSkipList); - -/** - * - * @param pSkipList - * @param level - * @param headSize - */ -void tSkipListNewNodeInfo(SSkipList *pSkipList, int32_t *level, int32_t *headSize); - -/** - * put the data into the skiplist - * If failed, NULL will be returned, otherwise, the pNode will be returned. - * - * @param pSkipList - * @param pData - * @param dataLen - * @return - */ -SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData, int dataLen); - -/** - * put the skip list node into the skip list. - * If failed, NULL will be returned, otherwise, the pNode will be returned. - * - * @param pSkipList - * @param pNode - * @return - */ -SSkipListNode *tSkipListPutNode(SSkipList *pSkipList, SSkipListNode *pNode); - -/** - * get *all* nodes which key are equivalent to pKey - * - * @param pSkipList - * @param pKey - * @return - */ -SArray *tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey); - -/** - * display skip list of the given level, for debug purpose only - * @param pSkipList - * @param nlevel - */ -void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel); - -/** - * create skiplist iterator - * @param pSkipList - * @return - */ +#define SL_IS_THREAD_SAFE(s) (((s)->flags) & SL_THREAD_SAFE) +#define SL_DUP_MODE(s) (((s)->flags) & ((((uint8_t)1) << 2) - 1)) +#define SL_GET_NODE_KEY(s, n) ((s)->keyFn((n)->pData)) +#define SL_GET_MIN_KEY(s) SL_GET_NODE_KEY(s, SL_NODE_GET_FORWARD_POINTER((s)->pHead, 0)) +#define SL_GET_MAX_KEY(s) SL_GET_NODE_KEY((s), SL_NODE_GET_BACKWARD_POINTER((s)->pTail, 0)) +#define SL_SIZE(s) (s)->size +#define SL_TSIZE(s) (s)->tsize + +SSkipList * tSkipListCreate(uint8_t nMaxLevel, uint8_t keyType, uint16_t keyLen, uint8_t flags, __sl_key_fn_t fn); +void tSkipListDestroy(SSkipList *pSkipList); +SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData); +SArray * tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey); +void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel); SSkipListIterator *tSkipListCreateIter(SSkipList *pSkipList); - -/** - * create skip list iterator from the given node and specified the order - * @param pSkipList - * @param pNode start position, instead of the first node in skip list - * @param order traverse order of the iterator - * @return - */ -SSkipListIterator *tSkipListCreateIterFromVal(SSkipList* pSkipList, const char* val, int32_t type, int32_t order); - -/** - * forward the skip list iterator - * @param iter - * @return - */ -bool tSkipListIterNext(SSkipListIterator *iter); - -/** - * get the element of skip list node - * @param iter - * @return - */ -SSkipListNode *tSkipListIterGet(SSkipListIterator *iter); - -/** - * destroy the skip list node - * @param iter - * @return - */ -void *tSkipListDestroyIter(SSkipListIterator *iter); - -/* - * remove nodes of the pKey value. - * If more than one node has the same value, all will be removed - * - * @Return - * the count of removed nodes - */ -uint32_t tSkipListRemove(SSkipList *pSkipList, SSkipListKey key); - -/* - * remove the specified node in parameters - */ -void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode); +SSkipListIterator *tSkipListCreateIterFromVal(SSkipList *pSkipList, const char *val, int32_t type, int32_t order); +bool tSkipListIterNext(SSkipListIterator *iter); +SSkipListNode * tSkipListIterGet(SSkipListIterator *iter); +void * tSkipListDestroyIter(SSkipListIterator *iter); +uint32_t tSkipListRemove(SSkipList *pSkipList, SSkipListKey key); +void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode); #ifdef __cplusplus } diff --git a/src/util/src/tskiplist.c b/src/util/src/tskiplist.c index c3e8ee41bd..3da137140f 100644 --- a/src/util/src/tskiplist.c +++ b/src/util/src/tskiplist.c @@ -19,30 +19,26 @@ #include "tulog.h" #include "tutil.h" -#define DO_MEMSET_PTR_AREA(n) \ - do { \ - int32_t _l = (n)->level; \ - memset(pNode, 0, SL_NODE_HEADER_SIZE(_l)); \ - (n)->level = _l; \ - } while (0) - -static int initForwardBackwardPtr(SSkipList *pSkipList); -static SSkipListNode *getPriorNode(SSkipList *pSkipList, const char *val, int32_t order); -static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode); -static void tSkipListCorrectLevel(SSkipList *pSkipList); +#define DO_MEMSET_PTR_AREA(n) memset((n)->forwards, 0, ((n)->level * 2)) + +static int initForwardBackwardPtr(SSkipList *pSkipList); +static SSkipListNode * getPriorNode(SSkipList *pSkipList, const char *val, int32_t order); +static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode); +static void tSkipListCorrectLevel(SSkipList *pSkipList); static SSkipListIterator *doCreateSkipListIterator(SSkipList *pSkipList, int32_t order); +static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSkipListNode *pNode); +static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **forward, void *pData); +static SSkipListNode * tSkipListNewNode(uint8_t level); +#define tSkipListFreeNode(n) taosTFree((n)) static FORCE_INLINE int tSkipListWLock(SSkipList *pSkipList); static FORCE_INLINE int tSkipListRLock(SSkipList *pSkipList); static FORCE_INLINE int tSkipListUnlock(SSkipList *pSkipList); static FORCE_INLINE int32_t getSkipListRandLevel(SSkipList *pSkipList); -static FORCE_INLINE SSkipListNode *tSkipListPutNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode); SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, uint8_t flags, __sl_key_fn_t fn) { SSkipList *pSkipList = (SSkipList *)calloc(1, sizeof(SSkipList)); - if (pSkipList == NULL) { - return NULL; - } + if (pSkipList == NULL) return NULL; if (maxLevel > MAX_SKIP_LIST_LEVEL) { maxLevel = MAX_SKIP_LIST_LEVEL; @@ -55,19 +51,20 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, u pSkipList->keyFn = fn; pSkipList->comparFn = getKeyComparFunc(keyType); - // pSkipList->level = 1; // TODO: check if 1 is valid if (initForwardBackwardPtr(pSkipList) < 0) { - taosTFree(pSkipList); + tSkipListDestroy(pSkipList); return NULL; } - if (SL_IS_THREAD_SAFE(flags)) { + if (SL_IS_THREAD_SAFE(pSkipList)) { pSkipList->lock = (pthread_rwlock_t *)calloc(1, sizeof(pthread_rwlock_t)); + if (pSkipList->lock == NULL) { + tSkipListDestroy(pSkipList); + return NULL; + } if (pthread_rwlock_init(pSkipList->lock, NULL) != 0) { - taosTFree(pSkipList->pHead); - taosTFree(pSkipList); - + tSkipListDestroy(pSkipList); return NULL; } } @@ -81,17 +78,17 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, u return pSkipList; } -void *tSkipListDestroy(SSkipList *pSkipList) { - if (pSkipList == NULL) return NULL; +void tSkipListDestroy(SSkipList *pSkipList) { + if (pSkipList == NULL) return; tSkipListWLock(pSkipList); - SSkipListNode *pNode = SL_GET_FORWARD_POINTER(pSkipList->pHead, 0); + SSkipListNode *pNode = SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, 0); while (pNode != pSkipList->pTail) { SSkipListNode *pTemp = pNode; - pNode = SL_GET_FORWARD_POINTER(pNode, 0); - taosTFree(pTemp); + pNode = SL_NODE_GET_FORWARD_POINTER(pNode, 0); + tSkipListFreeNode(pTemp); } tSkipListUnlock(pSkipList); @@ -100,42 +97,35 @@ void *tSkipListDestroy(SSkipList *pSkipList) { taosTFree(pSkipList->lock); } - taosTFree(pSkipList->pHead); + tSkipListFreeNode(pSkipList->pHead); + tSkipListFreeNode(pSkipList->pTail); taosTFree(pSkipList); - return NULL; -} - -void tSkipListNewNodeInfo(SSkipList *pSkipList, int32_t *level, int32_t *headSize) { - if (pSkipList == NULL) { - *level = 1; - *headSize = SL_NODE_HEADER_SIZE(*level); - return; - } - - *level = getSkipListRandLevel(pSkipList); - *headSize = SL_NODE_HEADER_SIZE(*level); } -SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData, int dataLen) { +SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData) { if (pSkipList == NULL || pData == NULL) return NULL; + SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; + uint8_t dupMode = SL_DUP_MODE(pSkipList); + SSkipListNode *pNode = NULL; + tSkipListWLock(pSkipList); - int32_t level = getSkipListRandLevel(pSkipList); + bool hasDup = tSkipListGetPosToPut(pSkipList, forward, pData); - SSkipListNode *pNode = (SSkipListNode *)malloc(SL_NODE_HEADER_SIZE(level) + dataLen); - if (pNode == NULL) { - tSkipListUnlock(pSkipList); - return NULL; - } - - pNode->level = level; - memcpy(SL_GET_NODE_DATA(pNode), pData, dataLen); + if (hasDup && (dupMode == SL_DISCARD_DUP_KEY || dupMode == SL_UPDATE_DUP_KEY)) { + if (dupMode == SL_UPDATE_DUP_KEY) { + pNode = SL_NODE_GET_FORWARD_POINTER(forward[0], 0); + atomic_store_ptr(&(pNode->pData), pData); + pNode->flags &= (~(SL_NODE_DELETED_FLAG)); + } + } else { + pNode = tSkipListNewNode(getSkipListRandLevel(pSkipList)); + if (pNode != NULL) { + pNode->pData = pData; - if (tSkipListPutNodeImpl(pSkipList, pNode) == NULL) { - tSkipListUnlock(pSkipList); - taosTFree(pNode); - return NULL; + tSkipListDoInsert(pSkipList, forward, pNode); + } } tSkipListUnlock(pSkipList); @@ -143,66 +133,56 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData, int dataLen) { return pNode; } -SSkipListNode *tSkipListPutNode(SSkipList *pSkipList, SSkipListNode *pNode) { - SSkipListNode *pRetNode = NULL; - - if (pSkipList == NULL || pNode == NULL) return NULL; +uint32_t tSkipListRemove(SSkipList *pSkipList, SSkipListKey key) { + uint32_t count = 0; tSkipListWLock(pSkipList); - pRetNode = tSkipListPutNodeImpl(pSkipList, pNode); - tSkipListUnlock(pSkipList); - - return pRetNode; -} - -SArray *tSkipListGet(SSkipList *pSkipList, SSkipListKey key) { - SArray *sa = taosArrayInit(1, POINTER_BYTES); - - tSkipListRLock(pSkipList); SSkipListNode *pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC); while (1) { - SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, 0); + SSkipListNode *p = SL_NODE_GET_FORWARD_POINTER(pNode, 0); if (p == pSkipList->pTail) { break; } if (pSkipList->comparFn(key, SL_GET_NODE_KEY(pSkipList, p)) != 0) { break; } - taosArrayPush(sa, &p); - pNode = p; + + tSkipListRemoveNodeImpl(pSkipList, p); + + ++count; } + tSkipListCorrectLevel(pSkipList); + tSkipListUnlock(pSkipList); - return sa; + return count; } -uint32_t tSkipListRemove(SSkipList *pSkipList, SSkipListKey key) { - uint32_t count = 0; +SArray *tSkipListGet(SSkipList *pSkipList, SSkipListKey key) { + SArray *sa = taosArrayInit(1, POINTER_BYTES); - tSkipListWLock(pSkipList); + tSkipListRLock(pSkipList); SSkipListNode *pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC); while (1) { - SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, 0); + SSkipListNode *p = SL_NODE_GET_FORWARD_POINTER(pNode, 0); if (p == pSkipList->pTail) { break; } if (pSkipList->comparFn(key, SL_GET_NODE_KEY(pSkipList, p)) != 0) { break; } - - tSkipListRemoveNodeImpl(pSkipList, p); - - ++count; + if (!SL_IS_NODE_DELETED(p)) { + taosArrayPush(sa, &p); + } + pNode = p; } - tSkipListCorrectLevel(pSkipList); - tSkipListUnlock(pSkipList); - return count; + return sa; } void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode) { @@ -240,68 +220,22 @@ bool tSkipListIterNext(SSkipListIterator *iter) { if (iter->pSkipList == NULL) return false; SSkipList *pSkipList = iter->pSkipList; - uint8_t dupMod = SL_DUP_MODE(pSkipList->flags); tSkipListRLock(pSkipList); - if (iter->order == TSDB_ORDER_ASC) { // ascending order iterate - if (dupMod == SL_APPEND_DUP_KEY) { - if (iter->cur == pSkipList->pHead) { - iter->cur = SL_GET_FORWARD_POINTER(iter->cur, 0); - iter->step++; - } else { - while (true) { - iter->step++; - SSkipListNode *pNode = iter->cur; - iter->cur = SL_GET_FORWARD_POINTER(pNode, 0); - - if (iter->cur == pSkipList->pTail) break; - if (pSkipList->comparFn(SL_GET_NODE_KEY(pSkipList, pNode), SL_GET_NODE_KEY(pSkipList, iter->cur)) == 0) { - continue; - } else { - break; - } - } - } - - if (iter->cur != pSkipList->pTail) { - while (true) { - SSkipListNode *pNode = SL_GET_FORWARD_POINTER(iter->cur, 0); - if (pNode == pSkipList->pTail) break; - if (pSkipList->comparFn(SL_GET_NODE_KEY(pSkipList, pNode), SL_GET_NODE_KEY(pSkipList, iter->cur)) == 0) { - iter->step++; - iter->cur = pNode; - } else { - break; - } - } - } - } else { - iter->cur = SL_GET_FORWARD_POINTER(iter->cur, 0); + if (iter->order == TSDB_ORDER_ASC) { + while (true) { + iter->cur = SL_NODE_GET_FORWARD_POINTER(iter->cur, 0); iter->step++; + if (iter->cur == pSkipList->pTail) break; + if (!SL_IS_NODE_DELETED(iter->cur)) break; } - } else { // descending order iterate - if (dupMod == SL_APPEND_DUP_KEY) { - if (iter->cur == pSkipList->pTail) { - iter->cur = SL_GET_BACKWARD_POINTER(iter->cur, 0); - iter->step++; - } else { - while (true) { - SSkipListNode *pNode = iter->cur; - iter->cur = SL_GET_BACKWARD_POINTER(pNode, 0); - - if (iter->cur == pSkipList->pHead) break; - if (pSkipList->comparFn(SL_GET_NODE_KEY(pSkipList, pNode), SL_GET_NODE_KEY(pSkipList, iter->cur)) == 0) { - iter->cur = pNode; - continue; - } else { - break; - } - } - } - } else { - iter->cur = SL_GET_BACKWARD_POINTER(iter->cur, 0); + } else { + while (true) { + iter->cur = SL_NODE_GET_BACKWARD_POINTER(iter->cur, 0); iter->step++; + if (iter->cur == pSkipList->pHead) break; + if (!SL_IS_NODE_DELETED(iter->cur)) break; } } @@ -332,7 +266,7 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) { return; } - SSkipListNode *p = SL_GET_FORWARD_POINTER(pSkipList->pHead, nlevel - 1); + SSkipListNode *p = SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, nlevel - 1); int32_t id = 1; char * prev = NULL; @@ -364,35 +298,34 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) { prev = SL_GET_NODE_KEY(pSkipList, p); - p = SL_GET_FORWARD_POINTER(p, nlevel - 1); + p = SL_NODE_GET_FORWARD_POINTER(p, nlevel - 1); } } -static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSkipListNode *pNode, bool hasDupKey) { - uint8_t dupMode = SL_DUP_MODE(pSkipList->flags); - - // FIXME: this may cause the level of skiplist change - if (dupMode == SL_UPDATA_DUP_KEY && hasDupKey) { - tSkipListRemoveNodeImpl(pSkipList, forward[0]); - tSkipListCorrectLevel(pSkipList); - } - +static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSkipListNode *pNode) { DO_MEMSET_PTR_AREA(pNode); for (int32_t i = 0; i < pNode->level; ++i) { - SSkipListNode *x = forward[i]; - SL_GET_BACKWARD_POINTER(pNode, i) = x; + if (i >= pSkipList->level) { + SL_NODE_GET_FORWARD_POINTER(pNode, i) = pSkipList->pTail; + SL_NODE_GET_BACKWARD_POINTER(pNode, i) = pSkipList->pHead; + SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, i) = pNode; + SL_NODE_GET_BACKWARD_POINTER(pSkipList->pTail, i) = pNode; + } else { + SSkipListNode *x = forward[i]; + SL_NODE_GET_BACKWARD_POINTER(pNode, i) = x; - SSkipListNode *next = SL_GET_FORWARD_POINTER(x, i); - SL_GET_BACKWARD_POINTER(next, i) = pNode; + SSkipListNode *next = SL_NODE_GET_FORWARD_POINTER(x, i); + SL_NODE_GET_BACKWARD_POINTER(next, i) = pNode; - SL_GET_FORWARD_POINTER(pNode, i) = next; - SL_GET_FORWARD_POINTER(x, i) = pNode; + SL_NODE_GET_FORWARD_POINTER(pNode, i) = next; + SL_NODE_GET_FORWARD_POINTER(x, i) = pNode; + } } - if (!(dupMode == SL_APPEND_DUP_KEY && hasDupKey)) { - pSkipList->size += 1; - } + if (pSkipList->level < pNode->level) pSkipList->level = pNode->level; + + pSkipList->size += 1; pSkipList->tsize += 1; } @@ -411,78 +344,73 @@ static SSkipListIterator *doCreateSkipListIterator(SSkipList *pSkipList, int32_t } static FORCE_INLINE int tSkipListWLock(SSkipList *pSkipList) { - if (SL_IS_THREAD_SAFE(pSkipList->flags)) { + if (pSkipList->lock) { return pthread_rwlock_wrlock(pSkipList->lock); } return 0; } static FORCE_INLINE int tSkipListRLock(SSkipList *pSkipList) { - if (SL_IS_THREAD_SAFE(pSkipList->flags)) { + if (pSkipList->lock) { return pthread_rwlock_rdlock(pSkipList->lock); } return 0; } static FORCE_INLINE int tSkipListUnlock(SSkipList *pSkipList) { - if (SL_IS_THREAD_SAFE(pSkipList->flags)) { + if (pSkipList->lock) { return pthread_rwlock_unlock(pSkipList->lock); } return 0; } -static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **forward, SSkipListNode *pNode) { +static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **forward, void *pData) { int compare = 1; bool hasDupKey = false; - char * pNodeKey = SL_GET_NODE_KEY(pSkipList, pNode); - uint8_t dupMode = SL_DUP_MODE(pSkipList->flags); + char * pDataKey = pSkipList->keyFn(pData); if (pSkipList->size == 0) { - for (int i = 0; i < pNode->level; i++) { + for (int i = 0; i < pSkipList->level; i++) { forward[i] = pSkipList->pHead; } } else { char *pKey = NULL; // Compare min key - pKey = SL_GET_SL_MIN_KEY(pSkipList); - compare = pSkipList->comparFn(pNodeKey, pKey); - if ((dupMode == SL_APPEND_DUP_KEY && compare < 0) || (dupMode != SL_APPEND_DUP_KEY && compare <= 0)) { - for (int i = 0; i < pNode->level; i++) { + pKey = SL_GET_MIN_KEY(pSkipList); + compare = pSkipList->comparFn(pDataKey, pKey); + if (compare <= 0) { + for (int i = 0; i < pSkipList->level; i++) { forward[i] = pSkipList->pHead; } + return (compare == 0); } // Compare max key - pKey = SL_GET_SL_MAX_KEY(pSkipList); - compare = pSkipList->comparFn(pNodeKey, pKey); - if ((dupMode == SL_DISCARD_DUP_KEY && compare > 0) || (dupMode != SL_DISCARD_DUP_KEY && compare >= 0)) { - for (int i = 0; i < pNode->level; i++) { - forward[i] = SL_GET_BACKWARD_POINTER(pSkipList->pTail, i); + pKey = SL_GET_MAX_KEY(pSkipList); + compare = pSkipList->comparFn(pDataKey, pKey); + if (compare > 0) { + for (int i = 0; i < pSkipList->level; i++) { + forward[i] = SL_NODE_GET_BACKWARD_POINTER(pSkipList->pTail, i); } return (compare == 0); } SSkipListNode *px = pSkipList->pHead; - for (int i = pNode->level - 1; i >= 0; --i) { - if (i >= pSkipList->level) { - forward[i] = pSkipList->pHead; - continue; - } - - SSkipListNode *p = SL_GET_FORWARD_POINTER(px, i); + for (int i = pSkipList->level - 1; i >= 0; --i) { + SSkipListNode *p = SL_NODE_GET_FORWARD_POINTER(px, i); while (p != pSkipList->pTail) { pKey = SL_GET_NODE_KEY(pSkipList, p); - compare = pSkipList->comparFn(pKey, pNodeKey); - if (compare == 0 && hasDupKey == false) hasDupKey = true; - if ((dupMode == SL_APPEND_DUP_KEY && compare > 0) || (dupMode != SL_APPEND_DUP_KEY && compare >= 0)) { + compare = pSkipList->comparFn(pKey, pDataKey); + if (compare >= 0) { + if (compare == 0 && !hasDupKey) hasDupKey = true; break; } else { px = p; - p = SL_GET_FORWARD_POINTER(px, i); + p = SL_NODE_GET_FORWARD_POINTER(px, i); } } @@ -493,40 +421,35 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **forward, return hasDupKey; } -static bool tSkipListIsNodeDup(SSkipList *pSkipList, SSkipListNode *pNode) { - SSkipListNode *pPrevNode = SL_GET_BACKWARD_POINTER(pNode, 0); - SSkipListNode *pNextNode = SL_GET_FORWARD_POINTER(pNode, 0); - char * pNodeKey = SL_GET_NODE_KEY(pSkipList, pNode); - char * pPrevNodeKey = (pPrevNode == pSkipList->pHead) ? NULL : SL_GET_NODE_KEY(pSkipList, pPrevNode); - char * pNextNodeKey = (pNextNode == pSkipList->pTail) ? NULL : SL_GET_NODE_KEY(pSkipList, pNextNode); - - return ((pPrevNodeKey != NULL && pSkipList->comparFn(pNodeKey, pPrevNodeKey) == 0) || - (pNextNodeKey != NULL && pSkipList->comparFn(pNodeKey, pNextNodeKey) == 0)); -} - static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode) { int32_t level = pNode->level; - uint8_t dupMode = SL_DUP_MODE(pSkipList->flags); + uint8_t dupMode = SL_DUP_MODE(pSkipList); - bool sizeReduce = !(dupMode == SL_APPEND_DUP_KEY && tSkipListIsNodeDup(pSkipList, pNode)); + if (dupMode == SL_UPDATE_DUP_KEY) { + if (SL_IS_NODE_DELETED(pNode)) { + return; + } else { + SL_SET_NODE_DELETED(pNode); + pSkipList->size--; + } + } else { + for (int32_t j = level - 1; j >= 0; --j) { + SSkipListNode *prev = SL_NODE_GET_BACKWARD_POINTER(pNode, j); + SSkipListNode *next = SL_NODE_GET_FORWARD_POINTER(pNode, j); - for (int32_t j = level - 1; j >= 0; --j) { - SSkipListNode *prev = SL_GET_BACKWARD_POINTER(pNode, j); - SSkipListNode *next = SL_GET_FORWARD_POINTER(pNode, j); + SL_NODE_GET_FORWARD_POINTER(prev, j) = next; + SL_NODE_GET_BACKWARD_POINTER(next, j) = prev; + } - SL_GET_FORWARD_POINTER(prev, j) = next; - SL_GET_BACKWARD_POINTER(next, j) = prev; + tSkipListFreeNode(pNode); + pSkipList->size--; + pSkipList->tsize--; } - - taosTFree(pNode); - - if (sizeReduce) pSkipList->size--; - pSkipList->tsize--; } // Function must be called after calling tSkipListRemoveNodeImpl() function static void tSkipListCorrectLevel(SSkipList *pSkipList) { - while (pSkipList->level > 0 && SL_GET_FORWARD_POINTER(pSkipList->pHead, pSkipList->level - 1) == pSkipList->pTail) { + while (pSkipList->level > 0 && SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, pSkipList->level - 1) == pSkipList->pTail) { pSkipList->level -= 1; } } @@ -587,12 +510,12 @@ static SSkipListNode *getPriorNode(SSkipList *pSkipList, const char *val, int32_ if (order == TSDB_ORDER_ASC) { pNode = pSkipList->pHead; for (int32_t i = pSkipList->level - 1; i >= 0; --i) { - SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, i); + SSkipListNode *p = SL_NODE_GET_FORWARD_POINTER(pNode, i); while (p != pSkipList->pTail) { char *key = SL_GET_NODE_KEY(pSkipList, p); if (comparFn(key, val) < 0) { pNode = p; - p = SL_GET_FORWARD_POINTER(p, i); + p = SL_NODE_GET_FORWARD_POINTER(p, i); } else { break; } @@ -601,12 +524,12 @@ static SSkipListNode *getPriorNode(SSkipList *pSkipList, const char *val, int32_ } else { pNode = pSkipList->pTail; for (int32_t i = pSkipList->level - 1; i >= 0; --i) { - SSkipListNode *p = SL_GET_BACKWARD_POINTER(pNode, i); + SSkipListNode *p = SL_NODE_GET_BACKWARD_POINTER(pNode, i); while (p != pSkipList->pHead) { char *key = SL_GET_NODE_KEY(pSkipList, p); if (comparFn(key, val) > 0) { pNode = p; - p = SL_GET_BACKWARD_POINTER(p, i); + p = SL_NODE_GET_BACKWARD_POINTER(p, i); } else { break; } @@ -621,46 +544,34 @@ static int initForwardBackwardPtr(SSkipList *pSkipList) { uint32_t maxLevel = pSkipList->maxLevel; // head info - pSkipList->pHead = (SSkipListNode *)malloc(SL_NODE_HEADER_SIZE(maxLevel) * 2); - if (pSkipList->pHead == NULL) { - return -1; - } - - pSkipList->pHead->level = maxLevel; + pSkipList->pHead = tSkipListNewNode(maxLevel); + if (pSkipList->pHead == NULL) return -1; // tail info - pSkipList->pTail = (SSkipListNode *)POINTER_SHIFT(pSkipList->pHead, SL_NODE_HEADER_SIZE(maxLevel)); - pSkipList->pTail->level = maxLevel; + pSkipList->pTail = tSkipListNewNode(maxLevel); + if (pSkipList->pTail == NULL) { + tSkipListFreeNode(pSkipList->pHead); + return -1; + } for (uint32_t i = 0; i < maxLevel; ++i) { - SL_GET_FORWARD_POINTER(pSkipList->pHead, i) = pSkipList->pTail; - SL_GET_BACKWARD_POINTER(pSkipList->pTail, i) = pSkipList->pHead; + SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, i) = pSkipList->pTail; + SL_NODE_GET_BACKWARD_POINTER(pSkipList->pTail, i) = pSkipList->pHead; } return 0; } -static FORCE_INLINE SSkipListNode *tSkipListPutNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode) { - SSkipListNode *pRetNode = NULL; - SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; +static SSkipListNode *tSkipListNewNode(uint8_t level) { + int32_t tsize = sizeof(SSkipListNode) + sizeof(SSkipListNode *) * level * 2; - int hasDupKey = tSkipListGetPosToPut(pSkipList, forward, pNode); - if (SL_DUP_MODE(pSkipList->flags) == SL_DISCARD_DUP_KEY && hasDupKey) { - pRetNode = NULL; - } else { - pRetNode = pNode; - tSkipListDoInsert(pSkipList, forward, pNode, hasDupKey); - - if (pNode->level > pSkipList->level) pSkipList->level = pNode->level; - } + SSkipListNode *pNode = (SSkipListNode *)calloc(1, tsize); + if (pNode == NULL) return NULL; - return pRetNode; + pNode->level = level; + return pNode; } -// static void tSkipListSeek(SSkipList *pSkipList, char *key, int order) { -// // TODO -// } - // static int32_t tSkipListEndParQuery(SSkipList *pSkipList, SSkipListNode *pStartNode, SSkipListKey *pEndKey, // int32_t cond, SSkipListNode ***pRes) { // pthread_rwlock_rdlock(&pSkipList->lock); @@ -774,7 +685,7 @@ static FORCE_INLINE SSkipListNode *tSkipListPutNodeImpl(SSkipList *pSkipList, SS // } // // // compress the minimum level of skip list -// while (pSkipList->level > 0 && SL_GET_FORWARD_POINTER(pSkipList->pHead, pSkipList->level - 1) == NULL) { +// while (pSkipList->level > 0 && SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, pSkipList->level - 1) == NULL) { // pSkipList->level -= 1; // } // -- GitLab