diff --git a/src/util/inc/tarray.h b/src/util/inc/tarray.h new file mode 100644 index 0000000000000000000000000000000000000000..62cb1a23f80d4e513fee2c0e3cc1ba4f37b5b865 --- /dev/null +++ b/src/util/inc/tarray.h @@ -0,0 +1,91 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_TAOSARRAY_H +#define TDENGINE_TAOSARRAY_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "os.h" + +#define TARRAY_MIN_SIZE 8 +#define TARRAY_GET_ELEM(array, index) ((array)->pData + (index) * (array)->elemSize) + +typedef struct SArray { + size_t size; + size_t capacity; + size_t elemSize; + + void* pData; +} SArray; + +/** + * + * @param size + * @param elemSize + * @return + */ +void* taosArrayInit(size_t size, size_t elemSize); + +/** + * + * @param pArray + * @param pData + * @return + */ +void* taosArrayPush(SArray* pArray, void* pData); + +/** + * + * @param pArray + */ +void taosArrayPop(SArray* pArray); + +/** + * + * @param pArray + * @param index + * @return + */ +void* taosArrayGet(SArray* pArray, size_t index); + +/** + * + * @param pArray + * @return + */ +size_t taosArrayGetSize(SArray* pArray); + +/** + * + * @param pArray + * @param index + * @param pData + */ +void taosArrayInsert(SArray* pArray, int32_t index, void* pData); + +/** + * + * @param pArray + */ +void taosArrayDestory(SArray* pArray); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_TAOSARRAY_H diff --git a/src/util/inc/tskiplist.h b/src/util/inc/tskiplist.h index d6cdd77c117762f8e561d2dff72857ce4305dcdf..ca9cc330c84bf7e3065bf75d3daa57312b284426 100644 --- a/src/util/inc/tskiplist.h +++ b/src/util/inc/tskiplist.h @@ -20,59 +20,62 @@ extern "C" { #endif -#define MAX_SKIP_LIST_LEVEL 20 - -#include -#include -#include - #include "os.h" #include "ttypes.h" +#include "tarray.h" /* * key of each node * todo move to as the global structure in all search codes... */ +#define MAX_SKIP_LIST_LEVEL 15 +#define SKIP_LIST_RECORD_PERFORMANCE 0 + +typedef char *SSkipListKey; +typedef char *(*__sl_key_fn_t)(const void *); + +/** + * the format of skip list node is as follows: + * +------------+-----------------------+------------------------+-----+------+ + * | node level | forward pointer array | backward pointer array | key | data | + * +------------+-----------------------+------------------------+-----+------+ + * the skiplist node is located in a consecutive memory area, key will not be copy to skip list + */ +typedef struct SSkipListNode { + uint8_t level; +} SSkipListNode; -const static size_t SKIP_LIST_STR_KEY_LENGTH_THRESHOLD = 15; -typedef tVariant tSkipListKey; - -typedef enum tSkipListPointQueryType { - INCLUDE_POINT_QUERY, - EXCLUDE_POINT_QUERY, -} tSkipListPointQueryType; +#define SL_NODE_HEADER_SIZE(_l) (sizeof(SSkipListNode) + ((_l) << 1u) * POINTER_BYTES) -typedef struct tSkipListNode { - uint16_t nLevel; - char * pData; +#define SL_GET_FORWARD_POINTER(n, _l) ((SSkipListNode **)((char *)(n) + sizeof(SSkipListNode)))[(_l)] +#define SL_GET_BACKWARD_POINTER(n, _l) \ + ((SSkipListNode **)((char *)(n) + sizeof(SSkipListNode) + ((n)->level) * POINTER_BYTES))[(_l)] - struct tSkipListNode **pForward; - struct tSkipListNode **pBackward; +#define SL_GET_NODE_DATA(n) ((char*)(n) + SL_NODE_HEADER_SIZE((n)->level)) +#define SL_GET_NODE_KEY(s, n) ((s)->keyFn(SL_GET_NODE_DATA(n))) - tSkipListKey key; -} tSkipListNode; +#define SL_GET_NODE_LEVEL(n) *(int32_t *)((n)) /* - * @version 0.2 + * @version 0.3 * @date 2017/11/12 - * the simple version of SkipList. + * the simple version of skip list. + * * for multi-thread safe purpose, we employ pthread_rwlock_t to guarantee to generate - * deterministic result. Later, we will remove the lock in SkipList to further - * enhance the performance. In this case, one should use the concurrent skip list (by - * using michael-scott algorithm) instead of this simple version in a multi-thread - * environment, to achieve higher performance of read/write operations. + * deterministic result. Later, we will remove the lock in SkipList to further enhance the performance. + * In this case, one should use the concurrent skip list (by using michael-scott algorithm) instead of + * this simple version in a multi-thread environment, to achieve higher performance of read/write operations. * * Note: Duplicated primary key situation. * In case of duplicated primary key, two ways can be employed to handle this situation: - * 1. add as normal insertion with out special process. - * 2. add an overflow pointer at each list node, all nodes with the same key will be added - * in the overflow pointer. In this case, the total steps of each search will be reduced significantly. + * 1. add as normal insertion without special process. + * 2. add an overflow pointer at each list node, all nodes with the same key will be added in the overflow pointer. + * In this case, the total steps of each search will be reduced significantly. * Currently, we implement the skip list in a line with the first means, maybe refactor it soon. * * Memory consumption: the memory alignment causes many memory wasted. So, employ a memory * pool will significantly reduce the total memory consumption, as well as the calloc/malloc operation costs. * - * 3. use the iterator pattern to refactor all routines to make it more clean */ // state struct, record following information: @@ -81,7 +84,7 @@ typedef struct tSkipListNode { // avg search rsp time, for latest 1000 queries // total memory size typedef struct tSkipListState { - // in bytes, sizeof(tSkipList)+sizeof(tSkipListNode)*tSkipList->nSize + // in bytes, sizeof(SSkipList)+sizeof(SSkipListNode)*SSkipList->nSize uint64_t nTotalMemSize; uint64_t nLevelNodeCnt[MAX_SKIP_LIST_LEVEL]; uint64_t queryCount; // total query count @@ -101,68 +104,95 @@ typedef struct tSkipListState { uint64_t nTotalElapsedTimeForInsert; } tSkipListState; -typedef struct tSkipList { - tSkipListNode pHead; - uint64_t nSize; - uint16_t nMaxLevel; - uint16_t nLevel; - uint16_t keyType; - uint16_t nMaxKeyLen; +typedef struct SSkipListKeyInfo { + uint8_t dupKey : 2; // if allow duplicated key in the skip list + uint8_t type : 6; // key type + uint8_t len; // maximum key length, used in case of string key +} SSkipListKeyInfo; + +typedef struct SSkipList { + __compar_fn_t comparFn; + __sl_key_fn_t keyFn; + uint32_t size; + uint8_t maxLevel; + uint8_t level; + SSkipListKeyInfo keyInfo; + + pthread_rwlock_t *lock; + SSkipListNode * pHead; + +#if SKIP_LIST_RECORD_PERFORMANCE + tSkipListState state; // skiplist state +#endif - __compar_fn_t comparator; - pthread_rwlock_t lock; // will be removed soon - tSkipListState state; // skiplist state -} tSkipList; +} SSkipList; /* * iterate the skiplist * this will cause the multi-thread problem, when the skiplist is destroyed, the iterate may * continue iterating the skiplist, so add the reference count for skiplist - * TODO add the ref for skiplist when one iterator is created + * TODO add the ref for skip list when one iterator is created */ typedef struct SSkipListIterator { - tSkipList * pSkipList; - tSkipListNode *cur; + SSkipList * pSkipList; + SSkipListNode *cur; int64_t num; } SSkipListIterator; -/* - * query condition structure to denote the range query - * todo merge the point query cond with range query condition +/** + * + * @param nMaxLevel maximum skip list level + * @param keyType type of key + * @param dupKey allow the duplicated key in the skip list + * @return */ -typedef struct tSKipListQueryCond { - // when the upper bounding == lower bounding, it is a point query - tSkipListKey lowerBnd; - tSkipListKey upperBnd; - int32_t lowerBndRelOptr; // relation operator to denote if lower bound is - int32_t upperBndRelOptr; // included or not -} tSKipListQueryCond; - -tSkipList *tSkipListCreate(int16_t nMaxLevel, int16_t keyType, int16_t nMaxKeyLen); - -void *tSkipListDestroy(tSkipList *pSkipList); - -// create skip list key -tSkipListKey tSkipListCreateKey(int32_t type, char *val, size_t keyLength); +SSkipList *tSkipListCreate(uint8_t nMaxLevel, uint8_t keyType, uint8_t keyLen, uint8_t dupKey, uint8_t threadsafe, + __sl_key_fn_t fn); -// destroy skip list key -void tSkipListDestroyKey(tSkipListKey *pKey); +/** + * + * @param pSkipList + * @return NULL will always be returned + */ +void *tSkipListDestroy(SSkipList *pSkipList); -// put data into skiplist -tSkipListNode *tSkipListPut(tSkipList *pSkipList, void *pData, tSkipListKey *pKey, int32_t insertIdenticalKey); +/** + * + * @param pSkipList + * @param level + * @param headSize + */ +void tSkipListRandNodeInfo(SSkipList *pSkipList, int32_t *level, int32_t *headSize); -/* - * get only *one* node of which key is equalled to pKey, even there are more - * than one nodes are of the same key +/** + * put the skip list node into the skip list. + * If failed, NULL will be returned, otherwise, the pNode will be returned. + * + * @param pSkipList + * @param pNode + * @return */ -tSkipListNode *tSkipListGetOne(tSkipList *pSkipList, tSkipListKey *pKey); +SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode); -/* - * get all data with the same keys +/** + * get only *one* node of which key is equalled to pKey, even there are more than one nodes are of the same key + * + * @param pSkipList + * @param pKey + * @param keyType + * @return */ -int32_t tSkipListGets(tSkipList *pSkipList, tSkipListKey *pKey, tSkipListNode ***pRes); +SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey, int16_t keyType); -int32_t tSkipListIterateList(tSkipList *pSkipList, tSkipListNode ***pRes, bool (*fp)(tSkipListNode *, void *), +/** + * + * @param pSkipList + * @param pRes + * @param fp + * @param param + * @return + */ +int32_t tSkipListIterateList(SSkipList *pSkipList, SSkipListNode ***pRes, bool (*fp)(SSkipListNode *, void *), void *param); /* @@ -173,30 +203,16 @@ int32_t tSkipListIterateList(tSkipList *pSkipList, tSkipListNode ***pRes, bool ( * true: one node has been removed * false: no node has been removed */ -bool tSkipListRemove(tSkipList *pSkipList, tSkipListKey *pKey); +bool tSkipListRemove(SSkipList *pSkipList, SSkipListKey *pKey); /* * remove the specified node in parameters */ -void tSkipListRemoveNode(tSkipList *pSkipList, tSkipListNode *pNode); - -// for debug purpose only -void tSkipListPrint(tSkipList *pSkipList, int16_t nlevel); - -/* - * range query & single point query function - */ -int32_t tSkipListQuery(tSkipList *pSkipList, tSKipListQueryCond *pQueryCond, tSkipListNode ***pResult); - -/* - * include/exclude point query - */ -int32_t tSkipListPointQuery(tSkipList *pSkipList, tSkipListKey *pKey, int32_t numOfKey, tSkipListPointQueryType type, - tSkipListNode ***pResult); +void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode); -int32_t tSkipListIteratorReset(tSkipList *pSkipList, SSkipListIterator *iter); -bool tSkipListIteratorNext(SSkipListIterator *iter); -tSkipListNode *tSkipListIteratorGet(SSkipListIterator *iter); +int32_t tSkipListIteratorReset(SSkipList *pSkipList, SSkipListIterator *iter); +bool tSkipListIteratorNext(SSkipListIterator *iter); +SSkipListNode *tSkipListIteratorGet(SSkipListIterator *iter); #ifdef __cplusplus } diff --git a/src/util/src/tarray.c b/src/util/src/tarray.c new file mode 100755 index 0000000000000000000000000000000000000000..017c5422c128d7fd787e8f30665bddafb5df5d96 --- /dev/null +++ b/src/util/src/tarray.c @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tarray.h" + +void* taosArrayInit(size_t size, size_t elemSize) { + assert(elemSize > 0); + + if (size < TARRAY_MIN_SIZE) { + size = TARRAY_MIN_SIZE; + } + + SArray* pArray = calloc(1, sizeof(SArray)); + if (pArray == NULL) { + return NULL; + } + + pArray->pData = calloc(size, elemSize * size); + if (pArray->pData == NULL) { + free(pArray); + return NULL; + } + + pArray->capacity = size; + pArray->elemSize = elemSize; + return pArray; +} + +static void taosArrayResize(SArray* pArray) { + assert(pArray->size >= pArray->capacity); + + size_t size = pArray->capacity; + size = (size << 1u); + + void* tmp = realloc(pArray->pData, size * pArray->elemSize); + if (tmp == NULL) { + // todo + } + + pArray->pData = tmp; + pArray->capacity = size; +} + +void* taosArrayPush(SArray* pArray, void* pData) { + if (pArray == NULL || pData == NULL) { + return NULL; + } + + if (pArray->size >= pArray->capacity) { + taosArrayResize(pArray); + } + + void* dst = TARRAY_GET_ELEM(pArray, pArray->size); + memcpy(dst, pData, pArray->elemSize); + + pArray->size += 1; + return dst; +} + +void taosArrayPop(SArray* pArray) { + if (pArray == NULL || pArray->size == 0) { + return; + } + + pArray->size -= 1; +} + +void* taosArrayGet(SArray* pArray, size_t index) { + assert(index < pArray->size); + return TARRAY_GET_ELEM(pArray, index); +} + +size_t taosArrayGetSize(SArray* pArray) { return pArray->size; } + +void taosArrayInsert(SArray* pArray, int32_t index, void* pData) { + if (pArray == NULL || pData == NULL) { + return; + } + + if (index >= pArray->size) { + taosArrayPush(pArray, pData); + return; + } + + if (pArray->size >= pArray->capacity) { + taosArrayResize(pArray); + } + + void* dst = TARRAY_GET_ELEM(pArray, index); + + int32_t remain = pArray->size - index; + memmove(dst + pArray->elemSize, dst, pArray->elemSize * remain); + memcpy(dst, pData, pArray->elemSize); + + pArray->size += 1; +} + +void taosArrayDestory(SArray* pArray) { + if (pArray == NULL) { + return; + } + + free(pArray->pData); + free(pArray); +} diff --git a/src/util/src/tskiplist.c b/src/util/src/tskiplist.c index 69a714425375fa7f3771ce6ce4009add77da9ded..c4f0ccab03359193516029a896dd7f0d0f7ab6c3 100644 --- a/src/util/src/tskiplist.c +++ b/src/util/src/tskiplist.c @@ -15,49 +15,54 @@ #include "os.h" #include "tlog.h" -#include "taosdef.h" +// #include "tsdb.h" #include "tskiplist.h" #include "tutil.h" -static FORCE_INLINE void recordNodeEachLevel(tSkipList *pSkipList, int32_t nLevel) { // record link count in each level - for (int32_t i = 0; i < nLevel; ++i) { +static FORCE_INLINE void recordNodeEachLevel(SSkipList *pSkipList, int32_t level) { // record link count in each level +#if SKIP_LIST_RECORD_PERFORMANCE + for (int32_t i = 0; i < level; ++i) { pSkipList->state.nLevelNodeCnt[i]++; } +#endif } -static FORCE_INLINE void removeNodeEachLevel(tSkipList *pSkipList, int32_t nLevel) { - for (int32_t i = 0; i < nLevel; ++i) { +static FORCE_INLINE void removeNodeEachLevel(SSkipList *pSkipList, int32_t level) { +#if SKIP_LIST_RECORD_PERFORMANCE + for (int32_t i = 0; i < level; ++i) { pSkipList->state.nLevelNodeCnt[i]--; } +#endif } -static FORCE_INLINE int32_t getSkipListNodeRandomHeight(tSkipList *pSkipList) { +static FORCE_INLINE int32_t getSkipListNodeRandomHeight(SSkipList *pSkipList) { const uint32_t factor = 4; int32_t n = 1; - while ((rand() % factor) == 0 && n <= pSkipList->nMaxLevel) { + while ((rand() % factor) == 0 && n <= pSkipList->maxLevel) { n++; } return n; } -static FORCE_INLINE int32_t getSkipListNodeLevel(tSkipList *pSkipList) { - int32_t nLevel = getSkipListNodeRandomHeight(pSkipList); - if (pSkipList->nSize == 0) { - nLevel = 1; - pSkipList->nLevel = 1; +static FORCE_INLINE int32_t getSkipListRandLevel(SSkipList *pSkipList) { + int32_t level = getSkipListNodeRandomHeight(pSkipList); + if (pSkipList->size == 0) { + level = 1; + pSkipList->level = 1; } else { - if (nLevel > pSkipList->nLevel && pSkipList->nLevel < pSkipList->nMaxLevel) { - nLevel = (++pSkipList->nLevel); + if (level > pSkipList->level && pSkipList->level < pSkipList->maxLevel) { + level = (++pSkipList->level); } } - return nLevel; + return level; } -void tSkipListDoInsert(tSkipList *pSkipList, tSkipListNode **forward, int32_t nLevel, tSkipListNode *pNode); +static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, int32_t level, SSkipListNode *pNode); -void tSkipListDoRecordPut(tSkipList *pSkipList) { +void tSkipListDoRecordPut(SSkipList *pSkipList) { +#if SKIP_LIST_RECORD_PERFORMANCE const int32_t MAX_RECORD_NUM = 1000; if (pSkipList->state.nInsertObjs == MAX_RECORD_NUM) { @@ -67,61 +72,38 @@ void tSkipListDoRecordPut(tSkipList *pSkipList) { } else { pSkipList->state.nInsertObjs++; } +#endif } -int32_t compareIntVal(const void *pLeft, const void *pRight) { - int64_t lhs = ((tSkipListKey *)pLeft)->i64Key; - int64_t rhs = ((tSkipListKey *)pRight)->i64Key; - - DEFAULT_COMP(lhs, rhs); -} - -int32_t compareIntDoubleVal(const void *pLeft, const void *pRight) { - int64_t lhs = ((tSkipListKey *)pLeft)->i64Key; - double rhs = ((tSkipListKey *)pRight)->dKey; - if (fabs(lhs - rhs) < FLT_EPSILON) { +int32_t compareInt32Val(const void *pLeft, const void *pRight) { + int32_t ret = GET_INT32_VAL(pLeft) - GET_INT32_VAL(pRight); + if (ret == 0) { return 0; } else { - return (lhs > rhs) ? 1 : -1; + return ret > 0 ? 1 : -1; } } -int32_t compareDoubleIntVal(const void *pLeft, const void *pRight) { - double lhs = ((tSkipListKey *)pLeft)->dKey; - int64_t rhs = ((tSkipListKey *)pRight)->i64Key; - if (fabs(lhs - rhs) < FLT_EPSILON) { +int32_t compareInt64Val(const void *pLeft, const void *pRight) { + int32_t ret = GET_INT64_VAL(pLeft) - GET_INT64_VAL(pRight); + if (ret == 0) { return 0; } else { - return (lhs > rhs) ? 1 : -1; + return ret > 0 ? 1 : -1; } } -int32_t compareDoubleVal(const void *pLeft, const void *pRight) { - double ret = (((tSkipListKey *)pLeft)->dKey - ((tSkipListKey *)pRight)->dKey); - if (fabs(ret) < FLT_EPSILON) { +int32_t compareInt16Val(const void *pLeft, const void *pRight) { + int32_t ret = GET_INT16_VAL(pLeft) - GET_INT16_VAL(pRight); + if (ret == 0) { return 0; } else { return ret > 0 ? 1 : -1; } } -int32_t compareStrVal(const void *pLeft, const void *pRight) { - tSkipListKey *pL = (tSkipListKey *)pLeft; - tSkipListKey *pR = (tSkipListKey *)pRight; - - if (pL->nLen == 0 && pR->nLen == 0) { - return 0; - } - - //handle only one-side bound compare situation, there is only lower bound or only upper bound - if (pL->nLen == -1) { - return 1; // no lower bound, lower bound is minimum, always return -1; - } else if (pR->nLen == -1) { - return -1; // no upper bound, upper bound is maximum situation, always return 1; - } - - int32_t ret = strcmp(((tSkipListKey *)pLeft)->pz, ((tSkipListKey *)pRight)->pz); - +int32_t compareInt8Val(const void *pLeft, const void *pRight) { + int32_t ret = GET_INT8_VAL(pLeft) - GET_INT8_VAL(pRight); if (ret == 0) { return 0; } else { @@ -129,716 +111,692 @@ int32_t compareStrVal(const void *pLeft, const void *pRight) { } } -int32_t compareWStrVal(const void *pLeft, const void *pRight) { - tSkipListKey *pL = (tSkipListKey *)pLeft; - tSkipListKey *pR = (tSkipListKey *)pRight; - - if (pL->nLen == 0 && pR->nLen == 0) { - return 0; - } - - //handle only one-side bound compare situation, there is only lower bound or only upper bound - if (pL->nLen == -1) { - return 1; // no lower bound, lower bound is minimum, always return -1; - } else if (pR->nLen == -1) { - return -1; // no upper bound, upper bound is maximum situation, always return 1; - } +int32_t compareIntDoubleVal(const void *pLeft, const void *pRight) { + // int64_t lhs = ((SSkipListKey *)pLeft)->i64Key; + // double rhs = ((SSkipListKey *)pRight)->dKey; + // if (fabs(lhs - rhs) < FLT_EPSILON) { + // return 0; + // } else { + // return (lhs > rhs) ? 1 : -1; + // } + return 0; +} - int32_t ret = wcscmp(((tSkipListKey *)pLeft)->wpz, ((tSkipListKey *)pRight)->wpz); +int32_t compareDoubleIntVal(const void *pLeft, const void *pRight) { + // double lhs = ((SSkipListKey *)pLeft)->dKey; + // int64_t rhs = ((SSkipListKey *)pRight)->i64Key; + // if (fabs(lhs - rhs) < FLT_EPSILON) { + // return 0; + // } else { + // return (lhs > rhs) ? 1 : -1; + // } + return 0; +} - if (ret == 0) { +int32_t compareDoubleVal(const void *pLeft, const void *pRight) { + double ret = GET_DOUBLE_VAL(pLeft) - GET_DOUBLE_VAL(pRight); + if (fabs(ret) < FLT_EPSILON) { return 0; } else { return ret > 0 ? 1 : -1; } } -static __compar_fn_t getKeyFilterComparator(tSkipList *pSkipList, int32_t filterDataType) { - __compar_fn_t comparator = NULL; +int32_t compareStrVal(const void *pLeft, const void *pRight) { + // SSkipListKey *pL = (SSkipListKey *)pLeft; + // SSkipListKey *pR = (SSkipListKey *)pRight; + // + // if (pL->nLen == 0 && pR->nLen == 0) { + // return 0; + // } + // + // // handle only one-side bound compare situation, there is only lower bound or only upper bound + // if (pL->nLen == -1) { + // return 1; // no lower bound, lower bound is minimum, always return -1; + // } else if (pR->nLen == -1) { + // return -1; // no upper bound, upper bound is maximum situation, always return 1; + // } + // + // int32_t ret = strcmp(((SSkipListKey *)pLeft)->pz, ((SSkipListKey *)pRight)->pz); + // + // if (ret == 0) { + // return 0; + // } else { + // return ret > 0 ? 1 : -1; + // } + return 0; +} - switch (pSkipList->keyType) { +int32_t compareWStrVal(const void *pLeft, const void *pRight) { + // SSkipListKey *pL = (SSkipListKey *)pLeft; + // SSkipListKey *pR = (SSkipListKey *)pRight; + // + // if (pL->nLen == 0 && pR->nLen == 0) { + // return 0; + // } + // + // // handle only one-side bound compare situation, there is only lower bound or only upper bound + // if (pL->nLen == -1) { + // return 1; // no lower bound, lower bound is minimum, always return -1; + // } else if (pR->nLen == -1) { + // return -1; // no upper bound, upper bound is maximum situation, always return 1; + // } + // + // int32_t ret = wcscmp(((SSkipListKey *)pLeft)->wpz, ((SSkipListKey *)pRight)->wpz); + // + // if (ret == 0) { + // return 0; + // } else { + // return ret > 0 ? 1 : -1; + // } + return 0; +} + +static __compar_fn_t getKeyFilterComparator(SSkipList *pSkipList, int32_t filterDataType) { + __compar_fn_t comparFn = NULL; + + switch (pSkipList->keyInfo.type) { case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_INT: - case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_BIGINT: { + if (filterDataType == TSDB_DATA_TYPE_BIGINT) { + comparFn = compareInt64Val; + break; + } + } case TSDB_DATA_TYPE_BOOL: { if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) { - comparator = compareIntVal; + comparFn = compareInt32Val; } else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) { - comparator = compareIntDoubleVal; + comparFn = compareIntDoubleVal; } break; } case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_DOUBLE: { - if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) { - comparator = compareDoubleIntVal; - } else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) { - comparator = compareDoubleVal; +// if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) { +// comparFn = compareDoubleIntVal; +// } else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) { +// comparFn = compareDoubleVal; +// } + if (filterDataType == TSDB_DATA_TYPE_DOUBLE) { + comparFn = compareDoubleVal; } break; } case TSDB_DATA_TYPE_BINARY: - comparator = compareStrVal; + comparFn = compareStrVal; break; case TSDB_DATA_TYPE_NCHAR: - comparator = compareWStrVal; + comparFn = compareWStrVal; break; default: - comparator = compareIntVal; + comparFn = compareInt32Val; break; } - return comparator; + return comparFn; } static __compar_fn_t getKeyComparator(int32_t keyType) { - __compar_fn_t comparator = NULL; + __compar_fn_t comparFn = NULL; switch (keyType) { case TSDB_DATA_TYPE_TINYINT: + comparFn = compareInt8Val; + break; case TSDB_DATA_TYPE_SMALLINT: + comparFn = compareInt16Val; + break; case TSDB_DATA_TYPE_INT: + comparFn = compareInt32Val; + break; case TSDB_DATA_TYPE_BIGINT: + comparFn = compareInt64Val; + break; case TSDB_DATA_TYPE_BOOL: - comparator = compareIntVal; + comparFn = compareInt32Val; break; case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_DOUBLE: - comparator = compareDoubleVal; + comparFn = compareDoubleVal; break; case TSDB_DATA_TYPE_BINARY: - comparator = compareStrVal; + comparFn = compareStrVal; break; case TSDB_DATA_TYPE_NCHAR: - comparator = compareWStrVal; + comparFn = compareWStrVal; break; default: - comparator = compareIntVal; + comparFn = compareInt32Val; break; } - return comparator; + return comparFn; } -tSkipList* tSkipListCreate(int16_t nMaxLevel, int16_t keyType, int16_t nMaxKeyLen) { - tSkipList *pSkipList = (tSkipList *)calloc(1, sizeof(tSkipList)); +SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint8_t keyLen, uint8_t dupKey, uint8_t lock, + __sl_key_fn_t fn) { + SSkipList *pSkipList = (SSkipList *)calloc(1, sizeof(SSkipList)); if (pSkipList == NULL) { return NULL; } - pSkipList->keyType = keyType; - - pSkipList->comparator = getKeyComparator(keyType); - pSkipList->pHead.pForward = (tSkipListNode **)calloc(1, POINTER_BYTES * MAX_SKIP_LIST_LEVEL); - - pSkipList->nMaxLevel = MAX_SKIP_LIST_LEVEL; - pSkipList->nLevel = 1; + if (maxLevel > MAX_SKIP_LIST_LEVEL) { + maxLevel = MAX_SKIP_LIST_LEVEL; + } - pSkipList->nMaxKeyLen = nMaxKeyLen; - pSkipList->nMaxLevel = nMaxLevel; + pSkipList->keyInfo = (SSkipListKeyInfo){.type = keyType, .len = keyLen, .dupKey = dupKey}; + pSkipList->keyFn = fn; - if (pthread_rwlock_init(&pSkipList->lock, NULL) != 0) { - tfree(pSkipList->pHead.pForward); - tfree(pSkipList); - return NULL; - } + pSkipList->comparFn = getKeyComparator(keyType); + pSkipList->maxLevel = maxLevel; + pSkipList->level = 1; - srand(time(NULL)); - pSkipList->state.nTotalMemSize += sizeof(tSkipList); - return pSkipList; -} + pSkipList->pHead = (SSkipListNode *)calloc(1, SL_NODE_HEADER_SIZE(maxLevel)); + pSkipList->pHead->level = pSkipList->maxLevel; -static void doRemove(tSkipList *pSkipList, tSkipListNode *pNode, tSkipListNode *forward[]) { - int32_t level = pNode->nLevel; - for (int32_t j = level - 1; j >= 0; --j) { - if ((forward[j]->pForward[j] != NULL) && (forward[j]->pForward[j]->pForward[j])) { - forward[j]->pForward[j]->pForward[j]->pBackward[j] = forward[j]; - } + if (lock) { + pSkipList->lock = calloc(1, sizeof(pthread_rwlock_t)); - if (forward[j]->pForward[j] != NULL) { - forward[j]->pForward[j] = forward[j]->pForward[j]->pForward[j]; + if (pthread_rwlock_init(pSkipList->lock, NULL) != 0) { + tfree(pSkipList->pHead); + tfree(pSkipList); + return NULL; } } - pSkipList->state.nTotalMemSize -= (sizeof(tSkipListNode) + POINTER_BYTES * pNode->nLevel * 2); - removeNodeEachLevel(pSkipList, pNode->nLevel); + srand(time(NULL)); + +#if SKIP_LIST_RECORD_PERFORMANCE + pSkipList->state.nTotalMemSize += sizeof(SSkipList); +#endif - tfree(pNode); - --pSkipList->nSize; + return pSkipList; } -static size_t getOneNodeSize(const tSkipListKey *pKey, int32_t nLevel) { - size_t size = sizeof(tSkipListNode) + sizeof(intptr_t) * (nLevel << 1); - if (pKey->nType == TSDB_DATA_TYPE_BINARY) { - size += pKey->nLen + 1; - } else if (pKey->nType == TSDB_DATA_TYPE_NCHAR) { - size += (pKey->nLen + 1) * TSDB_NCHAR_SIZE; +// static void doRemove(SSkipList *pSkipList, SSkipListNode *pNode, SSkipListNode *forward[]) { +// int32_t level = pNode->level; +// for (int32_t j = level - 1; j >= 0; --j) { +// if ((forward[j]->pForward[j] != NULL) && (forward[j]->pForward[j]->pForward[j])) { +// forward[j]->pForward[j]->pForward[j]->pBackward[j] = forward[j]; +// } +// +// if (forward[j]->pForward[j] != NULL) { +// forward[j]->pForward[j] = forward[j]->pForward[j]->pForward[j]; +// } +// } +// +// pSkipList->state.nTotalMemSize -= (sizeof(SSkipListNode) + POINTER_BYTES * pNode->level * 2); +// removeNodeEachLevel(pSkipList, pNode->level); +// +// tfree(pNode); +// --pSkipList->size; +//} + +void *tSkipListDestroy(SSkipList *pSkipList) { + if (pSkipList == NULL) { + return NULL; } - return size; -} + if (pSkipList->lock) { + pthread_rwlock_wrlock(pSkipList->lock); + } -static tSkipListNode *tSkipListCreateNode(void *pData, const tSkipListKey *pKey, int32_t nLevel) { - size_t nodeSize = getOneNodeSize(pKey, nLevel); - tSkipListNode *pNode = (tSkipListNode *)calloc(1, nodeSize); + SSkipListNode *pNode = SL_GET_FORWARD_POINTER(pSkipList->pHead, 0); // pSkipList->pHead.pForward[0]; - pNode->pForward = (tSkipListNode **)(&pNode[1]); - pNode->pBackward = (pNode->pForward + nLevel); + while (pNode) { + SSkipListNode *pTemp = pNode; + pNode = SL_GET_FORWARD_POINTER(pNode, 0); + tfree(pTemp); + } - pNode->pData = pData; + tfree(pSkipList->pHead); - pNode->key = *pKey; - if (pKey->nType == TSDB_DATA_TYPE_BINARY) { - pNode->key.pz = (char *)(pNode->pBackward + nLevel); + if (pSkipList->lock) { + pthread_rwlock_unlock(pSkipList->lock); + pthread_rwlock_destroy(pSkipList->lock); - strcpy(pNode->key.pz, pKey->pz); - pNode->key.pz[pKey->nLen] = 0; - } else if (pKey->nType == TSDB_DATA_TYPE_NCHAR) { - pNode->key.wpz = (wchar_t *)(pNode->pBackward + nLevel); - wcsncpy(pNode->key.wpz, pKey->wpz, pKey->nLen); - pNode->key.wpz[pKey->nLen] = 0; + tfree(pSkipList->lock); } - pNode->nLevel = nLevel; - return pNode; -} - -tSkipListKey tSkipListCreateKey(int32_t type, char *val, size_t keyLength) { - tSkipListKey k = {0}; - tVariantCreateFromBinary(&k, val, (uint32_t) keyLength, (uint32_t) type); - return k; + tfree(pSkipList->pHead); + tfree(pSkipList); + return NULL; } -void tSkipListDestroyKey(tSkipListKey *pKey) { tVariantDestroy(pKey); } - -void* tSkipListDestroy(tSkipList *pSkipList) { +void tSkipListRandNodeInfo(SSkipList *pSkipList, int32_t *level, int32_t *headSize) { if (pSkipList == NULL) { - return NULL; - } - - pthread_rwlock_wrlock(&pSkipList->lock); - tSkipListNode *pNode = pSkipList->pHead.pForward[0]; - while (pNode) { - tSkipListNode *pTemp = pNode; - pNode = pNode->pForward[0]; - tfree(pTemp); + return; } - tfree(pSkipList->pHead.pForward); - pthread_rwlock_unlock(&pSkipList->lock); - - pthread_rwlock_destroy(&pSkipList->lock); - tfree(pSkipList); - - return NULL; + *level = getSkipListRandLevel(pSkipList); + *headSize = SL_NODE_HEADER_SIZE(*level); } -tSkipListNode *tSkipListPut(tSkipList *pSkipList, void *pData, tSkipListKey *pKey, int32_t insertIdenticalKey) { +SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode) { if (pSkipList == NULL) { return NULL; } - pthread_rwlock_wrlock(&pSkipList->lock); + if (pSkipList->lock) { + pthread_rwlock_wrlock(pSkipList->lock); + } // record one node is put into skiplist tSkipListDoRecordPut(pSkipList); - tSkipListNode *px = &pSkipList->pHead; + SSkipListNode *px = pSkipList->pHead; + SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; - tSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; - for (int32_t i = pSkipList->nLevel - 1; i >= 0; --i) { - while (px->pForward[i] != NULL && (pSkipList->comparator(&px->pForward[i]->key, pKey) < 0)) { - px = px->pForward[i]; + for (int32_t i = pSkipList->level - 1; i >= 0; --i) { + SSkipListNode *p = SL_GET_FORWARD_POINTER(px, i); + while (p != NULL) { + char *key = SL_GET_NODE_KEY(pSkipList, p); + char *newDatakey = SL_GET_NODE_KEY(pSkipList, pNode); + + // if the forward element is less than the specified key, forward one step + if (pSkipList->comparFn(key, newDatakey) < 0) { + px = p; + + p = SL_GET_FORWARD_POINTER(px, i); + } else { + break; + } } +#if SKIP_LIST_RECORD_PERFORMANCE pSkipList->state.nTotalStepsForInsert++; +#endif forward[i] = px; } - // if the skiplist does not allowed identical key inserted, the new data will be discarded. - if ((insertIdenticalKey == 0) && forward[0] != &pSkipList->pHead && - (pSkipList->comparator(&forward[0]->key, pKey) == 0)) { - pthread_rwlock_unlock(&pSkipList->lock); - return forward[0]; + // if the skip list does not allowed identical key inserted, the new data will be discarded. + if (pSkipList->keyInfo.dupKey == 0 && forward[0] != pSkipList->pHead) { + char *key = SL_GET_NODE_KEY(pSkipList, forward[0]); + char *pNewDataKey = SL_GET_NODE_KEY(pSkipList, pNode); + + if (pSkipList->comparFn(key, pNewDataKey) == 0) { + if (pSkipList->lock) { + pthread_rwlock_unlock(pSkipList->lock); + } + + return forward[0]; + } } - int32_t nLevel = getSkipListNodeLevel(pSkipList); - recordNodeEachLevel(pSkipList, nLevel); +#if SKIP_LIST_RECORD_PERFORMANCE + recordNodeEachLevel(pSkipList, level); +#endif - tSkipListNode *pNode = tSkipListCreateNode(pData, pKey, nLevel); - tSkipListDoInsert(pSkipList, forward, nLevel, pNode); + int32_t level = SL_GET_NODE_LEVEL(pNode); + tSkipListDoInsert(pSkipList, forward, level, pNode); - pSkipList->nSize += 1; + atomic_add_fetch_32(&pSkipList->size, 1); - // char tmpstr[512] = {0}; - // tVariantToString(&pNode->key, tmpstr); - // pTrace("skiplist:%p, node added, key:%s, total list len:%d", pSkipList, - // tmpstr, pSkipList->nSize); +#if SKIP_LIST_RECORD_PERFORMANCE + pSkipList->state.nTotalMemSize += getOneNodeSize(pKey, level); +#endif - pSkipList->state.nTotalMemSize += getOneNodeSize(pKey, nLevel); - pthread_rwlock_unlock(&pSkipList->lock); + if (pSkipList->lock) { + pthread_rwlock_unlock(pSkipList->lock); + } return pNode; } -void tSkipListDoInsert(tSkipList *pSkipList, tSkipListNode **forward, int32_t nLevel, tSkipListNode *pNode) { - for (int32_t i = 0; i < nLevel; ++i) { - tSkipListNode *x = forward[i]; +void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, int32_t level, SSkipListNode *pNode) { + for (int32_t i = 0; i < level; ++i) { + SSkipListNode *x = forward[i]; if (x != NULL) { - pNode->pBackward[i] = x; - if (x->pForward[i]) x->pForward[i]->pBackward[i] = pNode; + SL_GET_BACKWARD_POINTER(pNode, i) = x; - pNode->pForward[i] = x->pForward[i]; - x->pForward[i] = pNode; + SSkipListNode *pForward = SL_GET_FORWARD_POINTER(x, i); + if (pForward) { + SL_GET_BACKWARD_POINTER(pForward, i) = pNode; + } + + SL_GET_FORWARD_POINTER(pNode, i) = SL_GET_FORWARD_POINTER(x, i); + SL_GET_FORWARD_POINTER(x, i) = pNode; } else { - pSkipList->pHead.pForward[i] = pNode; - pNode->pBackward[i] = &(pSkipList->pHead); + SL_GET_FORWARD_POINTER(pSkipList->pHead, i) = pNode; + SL_GET_BACKWARD_POINTER(pSkipList->pHead, i) = (pSkipList->pHead); } } } -tSkipListNode *tSkipListGetOne(tSkipList *pSkipList, tSkipListKey *pKey) { - int32_t sLevel = pSkipList->nLevel - 1; +SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey, int16_t keyType) { + int32_t sLevel = pSkipList->level - 1; int32_t ret = -1; - tSkipListNode *x = &pSkipList->pHead; + // result list + SArray* sa = taosArrayInit(1, POINTER_BYTES); + + SSkipListNode *pNode = pSkipList->pHead; + + if (pSkipList->lock) { + pthread_rwlock_rdlock(pSkipList->lock); + } - pthread_rwlock_rdlock(&pSkipList->lock); +#if SKIP_LIST_RECORD_PERFORMANCE pSkipList->state.queryCount++; +#endif - __compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType); + __compar_fn_t filterComparFn = getKeyFilterComparator(pSkipList, keyType); for (int32_t i = sLevel; i >= 0; --i) { - while (x->pForward[i] != NULL && (ret = filterComparator(&x->pForward[i]->key, pKey)) < 0) { - x = x->pForward[i]; - } - - if (ret == 0) { - pthread_rwlock_unlock(&pSkipList->lock); - return x->pForward[i]; - } - } - - pthread_rwlock_unlock(&pSkipList->lock); - return NULL; -} - -static int32_t tSkipListEndParQuery(tSkipList *pSkipList, tSkipListNode *pStartNode, tSkipListKey *pEndKey, - int32_t cond, tSkipListNode ***pRes) { - pthread_rwlock_rdlock(&pSkipList->lock); - tSkipListNode *p = pStartNode; - int32_t numOfRes = 0; - - __compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pEndKey->nType); - while (p != NULL) { - int32_t ret = filterComparator(&p->key, pEndKey); - if (ret > 0) { - break; - } - - if (ret < 0) { - numOfRes++; - p = p->pForward[0]; - } else if (ret == 0) { - if (cond == TSDB_RELATION_LESS_EQUAL) { - numOfRes++; - p = p->pForward[0]; + SSkipListNode *pNext = SL_GET_FORWARD_POINTER(pNode, i); + while (pNext != NULL) { + char *key = SL_GET_NODE_KEY(pSkipList, pNext); + if ((ret = filterComparFn(key, pKey)) < 0) { + pNode = pNext; + pNext = SL_GET_FORWARD_POINTER(pNext, i); } else { break; } } - } - - (*pRes) = (tSkipListNode **)malloc(POINTER_BYTES * numOfRes); - for (int32_t i = 0; i < numOfRes; ++i) { - (*pRes)[i] = pStartNode; - pStartNode = pStartNode->pForward[0]; - } - pthread_rwlock_unlock(&pSkipList->lock); - - return numOfRes; -} - -/* - * maybe return the copy of tSkipListNode would be better - */ -int32_t tSkipListGets(tSkipList *pSkipList, tSkipListKey *pKey, tSkipListNode ***pRes) { - (*pRes) = NULL; - tSkipListNode *pNode = tSkipListGetOne(pSkipList, pKey); - if (pNode == NULL) { - return 0; - } - - __compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType); - - // backward check if previous nodes are with the same value. - tSkipListNode *pPrev = pNode->pBackward[0]; - while ((pPrev != &pSkipList->pHead) && filterComparator(&pPrev->key, pKey) == 0) { - pPrev = pPrev->pBackward[0]; - } - - return tSkipListEndParQuery(pSkipList, pPrev->pForward[0], &pNode->key, TSDB_RELATION_LESS_EQUAL, pRes); -} - -static tSkipListNode *tSkipListParQuery(tSkipList *pSkipList, tSkipListKey *pKey, int32_t cond) { - int32_t sLevel = pSkipList->nLevel - 1; - int32_t ret = -1; - - tSkipListNode *x = &pSkipList->pHead; - __compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType); - - pthread_rwlock_rdlock(&pSkipList->lock); - - if (cond == TSDB_RELATION_LARGE_EQUAL || cond == TSDB_RELATION_LARGE) { - for (int32_t i = sLevel; i >= 0; --i) { - while (x->pForward[i] != NULL && (ret = filterComparator(&x->pForward[i]->key, pKey)) < 0) { - x = x->pForward[i]; - } - } - - // backward check if previous nodes are with the same value. - if (cond == TSDB_RELATION_LARGE_EQUAL && ret == 0) { - tSkipListNode *pNode = x->pForward[0]; - while ((pNode->pBackward[0] != &pSkipList->pHead) && (filterComparator(&pNode->pBackward[0]->key, pKey) == 0)) { - pNode = pNode->pBackward[0]; + // find the qualified key + if (ret == 0) { + if (pSkipList->lock) { + pthread_rwlock_unlock(pSkipList->lock); } - pthread_rwlock_unlock(&pSkipList->lock); - return pNode; - } - if (ret > 0 || cond == TSDB_RELATION_LARGE_EQUAL) { - pthread_rwlock_unlock(&pSkipList->lock); - return x->pForward[0]; - } else { // cond == TSDB_RELATION_LARGE && ret == 0 - tSkipListNode *pn = x->pForward[0]; - while (pn != NULL && filterComparator(&pn->key, pKey) == 0) { - pn = pn->pForward[0]; + SSkipListNode* pResult = SL_GET_FORWARD_POINTER(pNode, i); + taosArrayPush(sa, &pResult); + + // skip list does not allowed duplicated key, abort further retrieve data + if (!pSkipList->keyInfo.dupKey) { + break; } - pthread_rwlock_unlock(&pSkipList->lock); - return pn; - } - } - - pthread_rwlock_unlock(&pSkipList->lock); - return NULL; -} - -int32_t tSkipListIterateList(tSkipList *pSkipList, tSkipListNode ***pRes, bool (*fp)(tSkipListNode *, void *), - void *param) { - (*pRes) = (tSkipListNode **)calloc(1, POINTER_BYTES * pSkipList->nSize); - if (NULL == *pRes) { - pError("error skiplist %p, malloc failed", pSkipList); - return -1; - } - - pthread_rwlock_rdlock(&pSkipList->lock); - tSkipListNode *pStartNode = pSkipList->pHead.pForward[0]; - int32_t num = 0; - - for (int32_t i = 0; i < pSkipList->nSize; ++i) { - if (pStartNode == NULL) { - pError("error skiplist %p, required length:%d, actual length:%d", pSkipList, pSkipList->nSize, i - 1); -#ifdef _DEBUG_VIEW - tSkipListPrint(pSkipList, 1); -#endif - break; } - - if (fp == NULL || (fp != NULL && fp(pStartNode, param) == true)) { - (*pRes)[num++] = pStartNode; - } - - pStartNode = pStartNode->pForward[0]; - } - - pthread_rwlock_unlock(&pSkipList->lock); - - if (num == 0) { - free(*pRes); - *pRes = NULL; - } else if (num < pSkipList->nSize) { // free unused memory - char* tmp = realloc((*pRes), num * POINTER_BYTES); - assert(tmp != NULL); - - *pRes = (tSkipListNode**)tmp; } - return num; -} - -int32_t tSkipListIteratorReset(tSkipList *pSkipList, SSkipListIterator* iter) { - if (pSkipList == NULL) { - return -1; - } - - iter->pSkipList = pSkipList; - - pthread_rwlock_rdlock(&pSkipList->lock); - iter->cur = NULL;//pSkipList->pHead.pForward[0]; - iter->num = pSkipList->nSize; - pthread_rwlock_unlock(&pSkipList->lock); - - return 0; -} - -bool tSkipListIteratorNext(SSkipListIterator* iter) { - if (iter->num == 0 || iter->pSkipList == NULL) { - return false; - } - - tSkipList* pSkipList = iter->pSkipList; - - pthread_rwlock_rdlock(&pSkipList->lock); - if (iter->cur == NULL) { - iter->cur = pSkipList->pHead.pForward[0]; - } else { - iter->cur = iter->cur->pForward[0]; - } - - pthread_rwlock_unlock(&pSkipList->lock); - - return iter->cur != NULL; -} - -tSkipListNode* tSkipListIteratorGet(SSkipListIterator* iter) { - return iter->cur; -} - -int32_t tSkipListRangeQuery(tSkipList *pSkipList, tSKipListQueryCond *pCond, tSkipListNode ***pRes) { - pSkipList->state.queryCount++; - tSkipListNode *pStart = tSkipListParQuery(pSkipList, &pCond->lowerBnd, pCond->lowerBndRelOptr); - if (pStart == 0) { - *pRes = NULL; - return 0; - } - - return tSkipListEndParQuery(pSkipList, pStart, &pCond->upperBnd, pCond->upperBndRelOptr, pRes); -} - -static bool removeSupport(tSkipList *pSkipList, tSkipListNode **forward, tSkipListKey *pKey) { - __compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType); - - if (filterComparator(&forward[0]->pForward[0]->key, pKey) == 0) { - tSkipListNode *p = forward[0]->pForward[0]; - doRemove(pSkipList, p, forward); - } else { // failed to find the node of specified value,abort - return false; - } - - // compress the minimum level of skip list - while (pSkipList->nLevel > 0 && pSkipList->pHead.pForward[pSkipList->nLevel - 1] == NULL) { - pSkipList->nLevel -= 1; - } - - return true; -} - -void tSkipListRemoveNode(tSkipList *pSkipList, tSkipListNode *pNode) { - tSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; - - pthread_rwlock_rdlock(&pSkipList->lock); - for (int32_t i = 0; i < pNode->nLevel; ++i) { - forward[i] = pNode->pBackward[i]; - } - - removeSupport(pSkipList, forward, &pNode->key); - pthread_rwlock_unlock(&pSkipList->lock); -} - -bool tSkipListRemove(tSkipList *pSkipList, tSkipListKey *pKey) { - tSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; - __compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType); - - pthread_rwlock_rdlock(&pSkipList->lock); - - tSkipListNode *x = &pSkipList->pHead; - for (int32_t i = pSkipList->nLevel - 1; i >= 0; --i) { - while (x->pForward[i] != NULL && (filterComparator(&x->pForward[i]->key, pKey) < 0)) { - x = x->pForward[i]; - } - forward[i] = x; - } - - bool ret = removeSupport(pSkipList, forward, pKey); - pthread_rwlock_unlock(&pSkipList->lock); - - return ret; -} - -void tSkipListPrint(tSkipList *pSkipList, int16_t nlevel) { - if (pSkipList == NULL || pSkipList->nLevel < nlevel || nlevel <= 0) { + if (pSkipList->lock) { + pthread_rwlock_unlock(pSkipList->lock); + } + + return sa; +} + +// static int32_t tSkipListEndParQuery(SSkipList *pSkipList, SSkipListNode *pStartNode, SSkipListKey *pEndKey, +// int32_t cond, SSkipListNode ***pRes) { +// pthread_rwlock_rdlock(&pSkipList->lock); +// SSkipListNode *p = pStartNode; +// int32_t numOfRes = 0; +// +// __compar_fn_t filterComparFn = getKeyFilterComparator(pSkipList, pEndKey->nType); +// while (p != NULL) { +// int32_t ret = filterComparFn(&p->key, pEndKey); +// if (ret > 0) { +// break; +// } +// +// if (ret < 0) { +// numOfRes++; +// p = p->pForward[0]; +// } else if (ret == 0) { +// if (cond == TSDB_RELATION_LESS_EQUAL) { +// numOfRes++; +// p = p->pForward[0]; +// } else { +// break; +// } +// } +// } +// +// (*pRes) = (SSkipListNode **)malloc(POINTER_BYTES * numOfRes); +// for (int32_t i = 0; i < numOfRes; ++i) { +// (*pRes)[i] = pStartNode; +// pStartNode = pStartNode->pForward[0]; +// } +// pthread_rwlock_unlock(&pSkipList->lock); +// +// return numOfRes; +//} +// +///* +// * maybe return the copy of SSkipListNode would be better +// */ +// int32_t tSkipListGets(SSkipList *pSkipList, SSkipListKey *pKey, SSkipListNode ***pRes) { +// (*pRes) = NULL; +// +// SSkipListNode *pNode = tSkipListGet(pSkipList, pKey); +// if (pNode == NULL) { +// return 0; +// } +// +// __compar_fn_t filterComparFn = getKeyFilterComparator(pSkipList, pKey->nType); +// +// // backward check if previous nodes are with the same value. +// SSkipListNode *pPrev = pNode->pBackward[0]; +// while ((pPrev != &pSkipList->pHead) && filterComparFn(&pPrev->key, pKey) == 0) { +// pPrev = pPrev->pBackward[0]; +// } +// +// return tSkipListEndParQuery(pSkipList, pPrev->pForward[0], &pNode->key, TSDB_RELATION_LESS_EQUAL, pRes); +//} +// +// static SSkipListNode *tSkipListParQuery(SSkipList *pSkipList, SSkipListKey *pKey, int32_t cond) { +// int32_t sLevel = pSkipList->level - 1; +// int32_t ret = -1; +// +// SSkipListNode *x = &pSkipList->pHead; +// __compar_fn_t filterComparFn = getKeyFilterComparator(pSkipList, pKey->nType); +// +// pthread_rwlock_rdlock(&pSkipList->lock); +// +// if (cond == TSDB_RELATION_LARGE_EQUAL || cond == TSDB_RELATION_LARGE) { +// for (int32_t i = sLevel; i >= 0; --i) { +// while (x->pForward[i] != NULL && (ret = filterComparFn(&x->pForward[i]->key, pKey)) < 0) { +// x = x->pForward[i]; +// } +// } +// +// // backward check if previous nodes are with the same value. +// if (cond == TSDB_RELATION_LARGE_EQUAL && ret == 0) { +// SSkipListNode *pNode = x->pForward[0]; +// while ((pNode->pBackward[0] != &pSkipList->pHead) && (filterComparFn(&pNode->pBackward[0]->key, pKey) == 0)) { +// pNode = pNode->pBackward[0]; +// } +// pthread_rwlock_unlock(&pSkipList->lock); +// return pNode; +// } +// +// if (ret > 0 || cond == TSDB_RELATION_LARGE_EQUAL) { +// pthread_rwlock_unlock(&pSkipList->lock); +// return x->pForward[0]; +// } else { // cond == TSDB_RELATION_LARGE && ret == 0 +// SSkipListNode *pn = x->pForward[0]; +// while (pn != NULL && filterComparFn(&pn->key, pKey) == 0) { +// pn = pn->pForward[0]; +// } +// pthread_rwlock_unlock(&pSkipList->lock); +// return pn; +// } +// } +// +// pthread_rwlock_unlock(&pSkipList->lock); +// return NULL; +//} +// +// int32_t tSkipListIterateList(SSkipList *pSkipList, SSkipListNode ***pRes, bool (*fp)(SSkipListNode *, void *), +// void *param) { +// (*pRes) = (SSkipListNode **)calloc(1, POINTER_BYTES * pSkipList->nSize); +// if (NULL == *pRes) { +// pError("error skiplist %p, malloc failed", pSkipList); +// return -1; +// } +// +// pthread_rwlock_rdlock(&pSkipList->lock); +// SSkipListNode *pStartNode = pSkipList->pHead.pForward[0]; +// int32_t num = 0; +// +// for (int32_t i = 0; i < pSkipList->nSize; ++i) { +// if (pStartNode == NULL) { +// pError("error skiplist %p, required length:%d, actual length:%d", pSkipList, pSkipList->nSize, i - 1); +//#ifdef _DEBUG_VIEW +// tSkipListPrint(pSkipList, 1); +//#endif +// break; +// } +// +// if (fp == NULL || (fp != NULL && fp(pStartNode, param) == true)) { +// (*pRes)[num++] = pStartNode; +// } +// +// pStartNode = pStartNode->pForward[0]; +// } +// +// pthread_rwlock_unlock(&pSkipList->lock); +// +// if (num == 0) { +// free(*pRes); +// *pRes = NULL; +// } else if (num < pSkipList->nSize) { // free unused memory +// char *tmp = realloc((*pRes), num * POINTER_BYTES); +// assert(tmp != NULL); +// +// *pRes = (SSkipListNode **)tmp; +// } +// +// return num; +//} +// +// int32_t tSkipListIteratorReset(SSkipList *pSkipList, SSkipListIterator *iter) { +// if (pSkipList == NULL) { +// return -1; +// } +// +// iter->pSkipList = pSkipList; +// if (pSkipList->lock) { +// pthread_rwlock_rdlock(&pSkipList->lock); +// } +// iter->cur = NULL; // pSkipList->pHead.pForward[0]; +// iter->num = pSkipList->size; +// +// if (pSkipList->lock) { +// pthread_rwlock_unlock(&pSkipList->lock); +// } +// +// return 0; +//} +// +// bool tSkipListIteratorNext(SSkipListIterator *iter) { +// if (iter->num == 0 || iter->pSkipList == NULL) { +// return false; +// } +// +// SSkipList *pSkipList = iter->pSkipList; +// +// pthread_rwlock_rdlock(&pSkipList->lock); +// if (iter->cur == NULL) { +// iter->cur = pSkipList->pHead.pForward[0]; +// } else { +// iter->cur = iter->cur->pForward[0]; +// } +// +// pthread_rwlock_unlock(&pSkipList->lock); +// +// return iter->cur != NULL; +//} +// +// SSkipListNode *tSkipListIteratorGet(SSkipListIterator *iter) { return iter->cur; } +// +// int32_t tSkipListRangeQuery(SSkipList *pSkipList, tSKipListQueryCond *pCond, SSkipListNode ***pRes) { +// pSkipList->state.queryCount++; +// SSkipListNode *pStart = tSkipListParQuery(pSkipList, &pCond->lowerBnd, pCond->lowerBndRelOptr); +// if (pStart == 0) { +// *pRes = NULL; +// return 0; +// } +// +// return tSkipListEndParQuery(pSkipList, pStart, &pCond->upperBnd, pCond->upperBndRelOptr, pRes); +//} +// +// static bool removeSupport(SSkipList *pSkipList, SSkipListNode **forward, SSkipListKey *pKey) { +// __compar_fn_t filterComparFn = getKeyFilterComparator(pSkipList, pKey->nType); +// +// if (filterComparFn(&forward[0]->pForward[0]->key, pKey) == 0) { +// SSkipListNode *p = forward[0]->pForward[0]; +// doRemove(pSkipList, p, forward); +// } else { // failed to find the node of specified value,abort +// return false; +// } +// +// // compress the minimum level of skip list +// while (pSkipList->level > 0 && pSkipList->pHead.pForward[pSkipList->level - 1] == NULL) { +// pSkipList->level -= 1; +// } +// +// return true; +//} +// +// void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode) { +// SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; +// +// pthread_rwlock_rdlock(&pSkipList->lock); +// for (int32_t i = 0; i < pNode->level; ++i) { +// forward[i] = pNode->pBackward[i]; +// } +// +// removeSupport(pSkipList, forward, &pNode->key); +// pthread_rwlock_unlock(&pSkipList->lock); +//} +// +// bool tSkipListRemove(SSkipList *pSkipList, SSkipListKey *pKey) { +// SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; +// __compar_fn_t filterComparFn = getKeyFilterComparator(pSkipList, pKey->nType); +// +// pthread_rwlock_rdlock(&pSkipList->lock); +// +// SSkipListNode *x = &pSkipList->pHead; +// for (int32_t i = pSkipList->level - 1; i >= 0; --i) { +// while (x->pForward[i] != NULL && (filterComparFn(&x->pForward[i]->key, pKey) < 0)) { +// x = x->pForward[i]; +// } +// forward[i] = x; +// } +// +// bool ret = removeSupport(pSkipList, forward, pKey); +// pthread_rwlock_unlock(&pSkipList->lock); +// +// return ret; +//} + +void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) { + if (pSkipList == NULL || pSkipList->level < nlevel || nlevel <= 0) { return; } - tSkipListNode *p = pSkipList->pHead.pForward[nlevel - 1]; + SSkipListNode *p = SL_GET_FORWARD_POINTER(pSkipList->pHead, nlevel - 1); int32_t id = 1; + while (p) { - switch (pSkipList->keyType) { + char *key = SL_GET_NODE_KEY(pSkipList, p); + switch (pSkipList->keyInfo.type) { case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_BIGINT: - fprintf(stdout, "%d: %" PRId64 " \n", id++, p->key.i64Key); + fprintf(stdout, "%d: %" PRId64 " \n", id++, *(int64_t *)key); break; case TSDB_DATA_TYPE_BINARY: - fprintf(stdout, "%d: %s \n", id++, p->key.pz); + fprintf(stdout, "%d: %s \n", id++, key); break; case TSDB_DATA_TYPE_DOUBLE: - fprintf(stdout, "%d: %lf \n", id++, p->key.dKey); + fprintf(stdout, "%d: %lf \n", id++, *(double *)key); break; default: fprintf(stdout, "\n"); } - p = p->pForward[nlevel - 1]; - } -} - -/* - * query processor based on query condition - */ -int32_t tSkipListQuery(tSkipList *pSkipList, tSKipListQueryCond *pQueryCond, tSkipListNode ***pResult) { - // query condition check - int32_t rel = 0; - __compar_fn_t comparator = getKeyComparator(pQueryCond->lowerBnd.nType); - - if (pSkipList == NULL || pQueryCond == NULL || pSkipList->nSize == 0 || - (((rel = comparator(&pQueryCond->lowerBnd, &pQueryCond->upperBnd)) > 0 && - pQueryCond->lowerBnd.nType != TSDB_DATA_TYPE_NCHAR && pQueryCond->lowerBnd.nType != TSDB_DATA_TYPE_BINARY))) { - (*pResult) = NULL; - return 0; - } - - if (rel == 0) { - /* - * 0 means: pQueryCond->lowerBnd == pQueryCond->upperBnd - * point query - */ - if (pQueryCond->lowerBndRelOptr == TSDB_RELATION_LARGE_EQUAL && - pQueryCond->upperBndRelOptr == TSDB_RELATION_LESS_EQUAL) { // point query - return tSkipListGets(pSkipList, &pQueryCond->lowerBnd, pResult); - } else { - (*pResult) = NULL; - return 0; - } - } else { - /* range query, query operation code check */ - return tSkipListRangeQuery(pSkipList, pQueryCond, pResult); - } -} - -typedef struct MultipleQueryResult { - int32_t len; - tSkipListNode **pData; -} MultipleQueryResult; - -static int32_t mergeQueryResult(MultipleQueryResult *pResults, int32_t numOfResSet, tSkipListNode ***pRes) { - int32_t total = 0; - for (int32_t i = 0; i < numOfResSet; ++i) { - total += pResults[i].len; - } - - (*pRes) = malloc(POINTER_BYTES * total); - int32_t idx = 0; - - for (int32_t i = 0; i < numOfResSet; ++i) { - MultipleQueryResult *pOneResult = &pResults[i]; - for (int32_t j = 0; j < pOneResult->len; ++j) { - (*pRes)[idx++] = pOneResult->pData[j]; - } - } - - return total; -} - -static void removeDuplicateKey(tSkipListKey *pKey, int32_t *numOfKey, __compar_fn_t comparator) { - if (*numOfKey == 1) { - return; - } - - qsort(pKey, *numOfKey, sizeof(pKey[0]), comparator); - int32_t i = 0, j = 1; - - while (i < (*numOfKey) && j < (*numOfKey)) { - int32_t ret = comparator(&pKey[i], &pKey[j]); - if (ret == 0) { - j++; - } else { - pKey[i + 1] = pKey[j]; - i++; - j++; - } - } - - (*numOfKey) = i + 1; -} - -int32_t mergeResult(const tSkipListKey *pKey, int32_t numOfKey, tSkipListNode ***pRes, __compar_fn_t comparator, - tSkipListNode *pNode) { - int32_t i = 0, j = 0; - // merge two sorted arrays in O(n) time - while (i < numOfKey && pNode != NULL) { - int32_t ret = comparator(&pNode->key, &pKey[i]); - if (ret < 0) { - (*pRes)[j++] = pNode; - pNode = pNode->pForward[0]; - } else if (ret == 0) { - pNode = pNode->pForward[0]; - } else { // pNode->key > pkey[i] - i++; - } - } - while (pNode != NULL) { - (*pRes)[j++] = pNode; - pNode = pNode->pForward[0]; - } - return j; -} - -int32_t tSkipListPointQuery(tSkipList *pSkipList, tSkipListKey *pKey, int32_t numOfKey, tSkipListPointQueryType type, - tSkipListNode ***pRes) { - if (numOfKey == 0 || pKey == NULL || pSkipList == NULL || pSkipList->nSize == 0 || - (type != INCLUDE_POINT_QUERY && type != EXCLUDE_POINT_QUERY)) { - (*pRes) = NULL; - return 0; - } - - __compar_fn_t comparator = getKeyComparator(pKey->nType); - removeDuplicateKey(pKey, &numOfKey, comparator); - - if (type == INCLUDE_POINT_QUERY) { - if (numOfKey == 1) { - return tSkipListGets(pSkipList, &pKey[0], pRes); - } else { - MultipleQueryResult *pTempResult = (MultipleQueryResult *)malloc(sizeof(MultipleQueryResult) * numOfKey); - for (int32_t i = 0; i < numOfKey; ++i) { - pTempResult[i].len = tSkipListGets(pSkipList, &pKey[i], &pTempResult[i].pData); - } - int32_t num = mergeQueryResult(pTempResult, numOfKey, pRes); - - for (int32_t i = 0; i < numOfKey; ++i) { - free(pTempResult[i].pData); - } - free(pTempResult); - return num; - } - } else { // exclude query - *pRes = malloc(POINTER_BYTES * pSkipList->nSize); - - __compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType); - - tSkipListNode *pNode = pSkipList->pHead.pForward[0]; - int32_t retLen = mergeResult(pKey, numOfKey, pRes, filterComparator, pNode); - - if (retLen < pSkipList->nSize) { - (*pRes) = realloc(*pRes, POINTER_BYTES * retLen); - } - return retLen; + p = SL_GET_FORWARD_POINTER(p, nlevel - 1); + // p = p->pForward[nlevel - 1]; } }