From 9f1996f46d5031b2192c287865e1ed72e0542890 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Sat, 7 Mar 2020 09:18:14 +0000 Subject: [PATCH] make code compile --- src/client/src/tscAst.c | 2 +- src/util/inc/sskiplist.h | 205 +++++++++ src/util/src/sskiplist.c | 844 ++++++++++++++++++++++++++++++++++++++ src/util/src/thistogram.c | 10 +- 4 files changed, 1055 insertions(+), 6 deletions(-) create mode 100644 src/util/inc/sskiplist.h create mode 100644 src/util/src/sskiplist.c diff --git a/src/client/src/tscAst.c b/src/client/src/tscAst.c index 22100bc1d1..17e7667445 100644 --- a/src/client/src/tscAst.c +++ b/src/client/src/tscAst.c @@ -21,7 +21,7 @@ #include "tscSyntaxtreefunction.h" #include "tschemautil.h" #include "taosdef.h" -#include "tskiplist.h" +#include "sskiplist.h" #include "tsqldef.h" #include "tsqlfunction.h" #include "tstoken.h" diff --git a/src/util/inc/sskiplist.h b/src/util/inc/sskiplist.h new file mode 100644 index 0000000000..28300a69b4 --- /dev/null +++ b/src/util/inc/sskiplist.h @@ -0,0 +1,205 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TBASE_TSKIPLIST_H +#define TBASE_TSKIPLIST_H + +#ifdef __cplusplus +extern "C" { +#endif + +#define MAX_SKIP_LIST_LEVEL 20 + +#include +#include +#include + +#include "os.h" +#include "ttypes.h" + +/* + * key of each node + * todo move to as the global structure in all search codes... + */ + +const static size_t SKIP_LIST_STR_KEY_LENGTH_THRESHOLD = 15; +typedef tVariant tSkipListKey; + +typedef enum tSkipListPointQueryType { + INCLUDE_POINT_QUERY, + EXCLUDE_POINT_QUERY, +} tSkipListPointQueryType; + +typedef struct tSkipListNode { + uint16_t nLevel; + char * pData; + + struct tSkipListNode **pForward; + struct tSkipListNode **pBackward; + + tSkipListKey key; +} tSkipListNode; + +/* + * @version 0.2 + * @date 2017/11/12 + * the simple version of SkipList. + * for multi-thread safe purpose, we employ pthread_rwlock_t to guarantee to generate + * deterministic result. Later, we will remove the lock in SkipList to further + * enhance the performance. In this case, one should use the concurrent skip list (by + * using michael-scott algorithm) instead of this simple version in a multi-thread + * environment, to achieve higher performance of read/write operations. + * + * Note: Duplicated primary key situation. + * In case of duplicated primary key, two ways can be employed to handle this situation: + * 1. add as normal insertion with out special process. + * 2. add an overflow pointer at each list node, all nodes with the same key will be added + * in the overflow pointer. In this case, the total steps of each search will be reduced significantly. + * Currently, we implement the skip list in a line with the first means, maybe refactor it soon. + * + * Memory consumption: the memory alignment causes many memory wasted. So, employ a memory + * pool will significantly reduce the total memory consumption, as well as the calloc/malloc operation costs. + * + * 3. use the iterator pattern to refactor all routines to make it more clean + */ + +// state struct, record following information: +// number of links in each level. +// avg search steps, for latest 1000 queries +// avg search rsp time, for latest 1000 queries +// total memory size +typedef struct tSkipListState { + // in bytes, sizeof(tSkipList)+sizeof(tSkipListNode)*tSkipList->nSize + uint64_t nTotalMemSize; + uint64_t nLevelNodeCnt[MAX_SKIP_LIST_LEVEL]; + uint64_t queryCount; // total query count + + /* + * only record latest 1000 queries + * when the value==1000, = 0, + * nTotalStepsForQueries = 0, + * nTotalElapsedTimeForQueries = 0 + */ + uint64_t nRecQueries; + uint16_t nTotalStepsForQueries; + uint64_t nTotalElapsedTimeForQueries; + + uint16_t nInsertObjs; + uint16_t nTotalStepsForInsert; + uint64_t nTotalElapsedTimeForInsert; +} tSkipListState; + +typedef struct tSkipList { + tSkipListNode pHead; + uint64_t nSize; + uint16_t nMaxLevel; + uint16_t nLevel; + uint16_t keyType; + uint16_t nMaxKeyLen; + + __compar_fn_t comparator; + pthread_rwlock_t lock; // will be removed soon + tSkipListState state; // skiplist state +} tSkipList; + +/* + * iterate the skiplist + * this will cause the multi-thread problem, when the skiplist is destroyed, the iterate may + * continue iterating the skiplist, so add the reference count for skiplist + * TODO add the ref for skiplist when one iterator is created + */ +typedef struct SSkipListIterator { + tSkipList * pSkipList; + tSkipListNode *cur; + int64_t num; +} SSkipListIterator; + +/* + * query condition structure to denote the range query + * todo merge the point query cond with range query condition + */ +typedef struct tSKipListQueryCond { + // when the upper bounding == lower bounding, it is a point query + tSkipListKey lowerBnd; + tSkipListKey upperBnd; + int32_t lowerBndRelOptr; // relation operator to denote if lower bound is + int32_t upperBndRelOptr; // included or not +} tSKipListQueryCond; + +tSkipList *SSkipListCreate(int16_t nMaxLevel, int16_t keyType, int16_t nMaxKeyLen); + +void *SSkipListDestroy(tSkipList *pSkipList); + +// create skip list key +tSkipListKey SSkipListCreateKey(int32_t type, char *val, size_t keyLength); + +// destroy skip list key +void tSkipListDestroyKey(tSkipListKey *pKey); + +// put data into skiplist +tSkipListNode *SSkipListPut(tSkipList *pSkipList, void *pData, tSkipListKey *pKey, int32_t insertIdenticalKey); + +/* + * get only *one* node of which key is equalled to pKey, even there are more + * than one nodes are of the same key + */ +tSkipListNode *tSkipListGetOne(tSkipList *pSkipList, tSkipListKey *pKey); + +/* + * get all data with the same keys + */ +int32_t tSkipListGets(tSkipList *pSkipList, tSkipListKey *pKey, tSkipListNode ***pRes); + +int32_t tSkipListIterateList(tSkipList *pSkipList, tSkipListNode ***pRes, bool (*fp)(tSkipListNode *, void *), + void *param); + +/* + * remove only one node of the pKey value. + * If more than one node has the same value, any one will be removed + * + * @Return + * true: one node has been removed + * false: no node has been removed + */ +bool tSkipListRemove(tSkipList *pSkipList, tSkipListKey *pKey); + +/* + * remove the specified node in parameters + */ +void tSkipListRemoveNode(tSkipList *pSkipList, tSkipListNode *pNode); + +// for debug purpose only +void SSkipListPrint(tSkipList *pSkipList, int16_t nlevel); + +/* + * range query & single point query function + */ +int32_t tSkipListQuery(tSkipList *pSkipList, tSKipListQueryCond *pQueryCond, tSkipListNode ***pResult); + +/* + * include/exclude point query + */ +int32_t tSkipListPointQuery(tSkipList *pSkipList, tSkipListKey *pKey, int32_t numOfKey, tSkipListPointQueryType type, + tSkipListNode ***pResult); + +int32_t tSkipListIteratorReset(tSkipList *pSkipList, SSkipListIterator *iter); +bool tSkipListIteratorNext(SSkipListIterator *iter); +tSkipListNode *tSkipListIteratorGet(SSkipListIterator *iter); + +#ifdef __cplusplus +} +#endif + +#endif // TBASE_TSKIPLIST_H diff --git a/src/util/src/sskiplist.c b/src/util/src/sskiplist.c new file mode 100644 index 0000000000..47948e1660 --- /dev/null +++ b/src/util/src/sskiplist.c @@ -0,0 +1,844 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include "os.h" + +#include "tlog.h" +#include "taosdef.h" +#include "sskiplist.h" +#include "tutil.h" + +static FORCE_INLINE void recordNodeEachLevel(tSkipList *pSkipList, int32_t nLevel) { // record link count in each level + for (int32_t i = 0; i < nLevel; ++i) { + pSkipList->state.nLevelNodeCnt[i]++; + } +} + +static FORCE_INLINE void removeNodeEachLevel(tSkipList *pSkipList, int32_t nLevel) { + for (int32_t i = 0; i < nLevel; ++i) { + pSkipList->state.nLevelNodeCnt[i]--; + } +} + +static FORCE_INLINE int32_t getSkipListNodeRandomHeight(tSkipList *pSkipList) { + const uint32_t factor = 4; + + int32_t n = 1; + while ((rand() % factor) == 0 && n <= pSkipList->nMaxLevel) { + n++; + } + + return n; +} + +static FORCE_INLINE int32_t getSkipListNodeLevel(tSkipList *pSkipList) { + int32_t nLevel = getSkipListNodeRandomHeight(pSkipList); + if (pSkipList->nSize == 0) { + nLevel = 1; + pSkipList->nLevel = 1; + } else { + if (nLevel > pSkipList->nLevel && pSkipList->nLevel < pSkipList->nMaxLevel) { + nLevel = (++pSkipList->nLevel); + } + } + return nLevel; +} + +void tSkipListDoInsert(tSkipList *pSkipList, tSkipListNode **forward, int32_t nLevel, tSkipListNode *pNode); + +void SSkipListDoRecordPut(tSkipList *pSkipList) { + const int32_t MAX_RECORD_NUM = 1000; + + if (pSkipList->state.nInsertObjs == MAX_RECORD_NUM) { + pSkipList->state.nInsertObjs = 1; + pSkipList->state.nTotalStepsForInsert = 0; + pSkipList->state.nTotalElapsedTimeForInsert = 0; + } else { + pSkipList->state.nInsertObjs++; + } +} + +int32_t compareIntVal(const void *pLeft, const void *pRight) { + int64_t lhs = ((tSkipListKey *)pLeft)->i64Key; + int64_t rhs = ((tSkipListKey *)pRight)->i64Key; + + DEFAULT_COMP(lhs, rhs); +} + +int32_t scompareIntDoubleVal(const void *pLeft, const void *pRight) { + int64_t lhs = ((tSkipListKey *)pLeft)->i64Key; + double rhs = ((tSkipListKey *)pRight)->dKey; + if (fabs(lhs - rhs) < FLT_EPSILON) { + return 0; + } else { + return (lhs > rhs) ? 1 : -1; + } +} + +int32_t scompareDoubleIntVal(const void *pLeft, const void *pRight) { + double lhs = ((tSkipListKey *)pLeft)->dKey; + int64_t rhs = ((tSkipListKey *)pRight)->i64Key; + if (fabs(lhs - rhs) < FLT_EPSILON) { + return 0; + } else { + return (lhs > rhs) ? 1 : -1; + } +} + +int32_t scompareDoubleVal(const void *pLeft, const void *pRight) { + double ret = (((tSkipListKey *)pLeft)->dKey - ((tSkipListKey *)pRight)->dKey); + if (fabs(ret) < FLT_EPSILON) { + return 0; + } else { + return ret > 0 ? 1 : -1; + } +} + +int32_t scompareStrVal(const void *pLeft, const void *pRight) { + tSkipListKey *pL = (tSkipListKey *)pLeft; + tSkipListKey *pR = (tSkipListKey *)pRight; + + if (pL->nLen == 0 && pR->nLen == 0) { + return 0; + } + + //handle only one-side bound compare situation, there is only lower bound or only upper bound + if (pL->nLen == -1) { + return 1; // no lower bound, lower bound is minimum, always return -1; + } else if (pR->nLen == -1) { + return -1; // no upper bound, upper bound is maximum situation, always return 1; + } + + int32_t ret = strcmp(((tSkipListKey *)pLeft)->pz, ((tSkipListKey *)pRight)->pz); + + if (ret == 0) { + return 0; + } else { + return ret > 0 ? 1 : -1; + } +} + +int32_t scompareWStrVal(const void *pLeft, const void *pRight) { + tSkipListKey *pL = (tSkipListKey *)pLeft; + tSkipListKey *pR = (tSkipListKey *)pRight; + + if (pL->nLen == 0 && pR->nLen == 0) { + return 0; + } + + //handle only one-side bound compare situation, there is only lower bound or only upper bound + if (pL->nLen == -1) { + return 1; // no lower bound, lower bound is minimum, always return -1; + } else if (pR->nLen == -1) { + return -1; // no upper bound, upper bound is maximum situation, always return 1; + } + + int32_t ret = wcscmp(((tSkipListKey *)pLeft)->wpz, ((tSkipListKey *)pRight)->wpz); + + if (ret == 0) { + return 0; + } else { + return ret > 0 ? 1 : -1; + } +} + +static __compar_fn_t getKeyFilterComparator(tSkipList *pSkipList, int32_t filterDataType) { + __compar_fn_t comparator = NULL; + + switch (pSkipList->keyType) { + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_BOOL: { + if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) { + comparator = compareIntVal; + } else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) { + comparator = scompareIntDoubleVal; + } + break; + } + case TSDB_DATA_TYPE_FLOAT: + case TSDB_DATA_TYPE_DOUBLE: { + if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) { + comparator = scompareDoubleIntVal; + } else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) { + comparator = scompareDoubleVal; + } + break; + } + case TSDB_DATA_TYPE_BINARY: + comparator = scompareStrVal; + break; + case TSDB_DATA_TYPE_NCHAR: + comparator = scompareWStrVal; + break; + default: + comparator = compareIntVal; + break; + } + + return comparator; +} + +static __compar_fn_t getKeyComparator(int32_t keyType) { + __compar_fn_t comparator = NULL; + + switch (keyType) { + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_BOOL: + comparator = compareIntVal; + break; + + case TSDB_DATA_TYPE_FLOAT: + case TSDB_DATA_TYPE_DOUBLE: + comparator = scompareDoubleVal; + break; + + case TSDB_DATA_TYPE_BINARY: + comparator = scompareStrVal; + break; + + case TSDB_DATA_TYPE_NCHAR: + comparator = scompareWStrVal; + break; + + default: + comparator = compareIntVal; + break; + } + + return comparator; +} + +tSkipList* SSkipListCreate(int16_t nMaxLevel, int16_t keyType, int16_t nMaxKeyLen) { + tSkipList *pSkipList = (tSkipList *)calloc(1, sizeof(tSkipList)); + if (pSkipList == NULL) { + return NULL; + } + + pSkipList->keyType = keyType; + + pSkipList->comparator = getKeyComparator(keyType); + pSkipList->pHead.pForward = (tSkipListNode **)calloc(1, POINTER_BYTES * MAX_SKIP_LIST_LEVEL); + + pSkipList->nMaxLevel = MAX_SKIP_LIST_LEVEL; + pSkipList->nLevel = 1; + + pSkipList->nMaxKeyLen = nMaxKeyLen; + pSkipList->nMaxLevel = nMaxLevel; + + if (pthread_rwlock_init(&pSkipList->lock, NULL) != 0) { + tfree(pSkipList->pHead.pForward); + tfree(pSkipList); + return NULL; + } + + srand(time(NULL)); + pSkipList->state.nTotalMemSize += sizeof(tSkipList); + return pSkipList; +} + +static void doRemove(tSkipList *pSkipList, tSkipListNode *pNode, tSkipListNode *forward[]) { + int32_t level = pNode->nLevel; + for (int32_t j = level - 1; j >= 0; --j) { + if ((forward[j]->pForward[j] != NULL) && (forward[j]->pForward[j]->pForward[j])) { + forward[j]->pForward[j]->pForward[j]->pBackward[j] = forward[j]; + } + + if (forward[j]->pForward[j] != NULL) { + forward[j]->pForward[j] = forward[j]->pForward[j]->pForward[j]; + } + } + + pSkipList->state.nTotalMemSize -= (sizeof(tSkipListNode) + POINTER_BYTES * pNode->nLevel * 2); + removeNodeEachLevel(pSkipList, pNode->nLevel); + + tfree(pNode); + --pSkipList->nSize; +} + +static size_t getOneNodeSize(const tSkipListKey *pKey, int32_t nLevel) { + size_t size = sizeof(tSkipListNode) + sizeof(intptr_t) * (nLevel << 1); + if (pKey->nType == TSDB_DATA_TYPE_BINARY) { + size += pKey->nLen + 1; + } else if (pKey->nType == TSDB_DATA_TYPE_NCHAR) { + size += (pKey->nLen + 1) * TSDB_NCHAR_SIZE; + } + + return size; +} + +static tSkipListNode *SSkipListCreateNode(void *pData, const tSkipListKey *pKey, int32_t nLevel) { + size_t nodeSize = getOneNodeSize(pKey, nLevel); + tSkipListNode *pNode = (tSkipListNode *)calloc(1, nodeSize); + + pNode->pForward = (tSkipListNode **)(&pNode[1]); + pNode->pBackward = (pNode->pForward + nLevel); + + pNode->pData = pData; + + pNode->key = *pKey; + if (pKey->nType == TSDB_DATA_TYPE_BINARY) { + pNode->key.pz = (char *)(pNode->pBackward + nLevel); + + strcpy(pNode->key.pz, pKey->pz); + pNode->key.pz[pKey->nLen] = 0; + } else if (pKey->nType == TSDB_DATA_TYPE_NCHAR) { + pNode->key.wpz = (wchar_t *)(pNode->pBackward + nLevel); + wcsncpy(pNode->key.wpz, pKey->wpz, pKey->nLen); + pNode->key.wpz[pKey->nLen] = 0; + } + + pNode->nLevel = nLevel; + return pNode; +} + +tSkipListKey SSkipListCreateKey(int32_t type, char *val, size_t keyLength) { + tSkipListKey k = {0}; + tVariantCreateFromBinary(&k, val, (uint32_t) keyLength, (uint32_t) type); + return k; +} + +void tSkipListDestroyKey(tSkipListKey *pKey) { tVariantDestroy(pKey); } + +void* SSkipListDestroy(tSkipList *pSkipList) { + if (pSkipList == NULL) { + return NULL; + } + + pthread_rwlock_wrlock(&pSkipList->lock); + tSkipListNode *pNode = pSkipList->pHead.pForward[0]; + while (pNode) { + tSkipListNode *pTemp = pNode; + pNode = pNode->pForward[0]; + tfree(pTemp); + } + + tfree(pSkipList->pHead.pForward); + pthread_rwlock_unlock(&pSkipList->lock); + + pthread_rwlock_destroy(&pSkipList->lock); + tfree(pSkipList); + + return NULL; +} + +tSkipListNode *SSkipListPut(tSkipList *pSkipList, void *pData, tSkipListKey *pKey, int32_t insertIdenticalKey) { + if (pSkipList == NULL) { + return NULL; + } + + pthread_rwlock_wrlock(&pSkipList->lock); + + // record one node is put into skiplist + SSkipListDoRecordPut(pSkipList); + + tSkipListNode *px = &pSkipList->pHead; + + tSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; + for (int32_t i = pSkipList->nLevel - 1; i >= 0; --i) { + while (px->pForward[i] != NULL && (pSkipList->comparator(&px->pForward[i]->key, pKey) < 0)) { + px = px->pForward[i]; + } + + pSkipList->state.nTotalStepsForInsert++; + forward[i] = px; + } + + // if the skiplist does not allowed identical key inserted, the new data will be discarded. + if ((insertIdenticalKey == 0) && forward[0] != &pSkipList->pHead && + (pSkipList->comparator(&forward[0]->key, pKey) == 0)) { + pthread_rwlock_unlock(&pSkipList->lock); + return forward[0]; + } + + int32_t nLevel = getSkipListNodeLevel(pSkipList); + recordNodeEachLevel(pSkipList, nLevel); + + tSkipListNode *pNode = SSkipListCreateNode(pData, pKey, nLevel); + tSkipListDoInsert(pSkipList, forward, nLevel, pNode); + + pSkipList->nSize += 1; + + // char tmpstr[512] = {0}; + // tVariantToString(&pNode->key, tmpstr); + // pTrace("skiplist:%p, node added, key:%s, total list len:%d", pSkipList, + // tmpstr, pSkipList->nSize); + + pSkipList->state.nTotalMemSize += getOneNodeSize(pKey, nLevel); + pthread_rwlock_unlock(&pSkipList->lock); + + return pNode; +} + +void tSkipListDoInsert(tSkipList *pSkipList, tSkipListNode **forward, int32_t nLevel, tSkipListNode *pNode) { + for (int32_t i = 0; i < nLevel; ++i) { + tSkipListNode *x = forward[i]; + if (x != NULL) { + pNode->pBackward[i] = x; + if (x->pForward[i]) x->pForward[i]->pBackward[i] = pNode; + + pNode->pForward[i] = x->pForward[i]; + x->pForward[i] = pNode; + } else { + pSkipList->pHead.pForward[i] = pNode; + pNode->pBackward[i] = &(pSkipList->pHead); + } + } +} + +tSkipListNode *tSkipListGetOne(tSkipList *pSkipList, tSkipListKey *pKey) { + int32_t sLevel = pSkipList->nLevel - 1; + int32_t ret = -1; + + tSkipListNode *x = &pSkipList->pHead; + + pthread_rwlock_rdlock(&pSkipList->lock); + pSkipList->state.queryCount++; + + __compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType); + + for (int32_t i = sLevel; i >= 0; --i) { + while (x->pForward[i] != NULL && (ret = filterComparator(&x->pForward[i]->key, pKey)) < 0) { + x = x->pForward[i]; + } + + if (ret == 0) { + pthread_rwlock_unlock(&pSkipList->lock); + return x->pForward[i]; + } + } + + pthread_rwlock_unlock(&pSkipList->lock); + return NULL; +} + +static int32_t tSkipListEndParQuery(tSkipList *pSkipList, tSkipListNode *pStartNode, tSkipListKey *pEndKey, + int32_t cond, tSkipListNode ***pRes) { + pthread_rwlock_rdlock(&pSkipList->lock); + tSkipListNode *p = pStartNode; + int32_t numOfRes = 0; + + __compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pEndKey->nType); + while (p != NULL) { + int32_t ret = filterComparator(&p->key, pEndKey); + if (ret > 0) { + break; + } + + if (ret < 0) { + numOfRes++; + p = p->pForward[0]; + } else if (ret == 0) { + if (cond == TSDB_RELATION_LESS_EQUAL) { + numOfRes++; + p = p->pForward[0]; + } else { + break; + } + } + } + + (*pRes) = (tSkipListNode **)malloc(POINTER_BYTES * numOfRes); + for (int32_t i = 0; i < numOfRes; ++i) { + (*pRes)[i] = pStartNode; + pStartNode = pStartNode->pForward[0]; + } + pthread_rwlock_unlock(&pSkipList->lock); + + return numOfRes; +} + +/* + * maybe return the copy of tSkipListNode would be better + */ +int32_t tSkipListGets(tSkipList *pSkipList, tSkipListKey *pKey, tSkipListNode ***pRes) { + (*pRes) = NULL; + + tSkipListNode *pNode = tSkipListGetOne(pSkipList, pKey); + if (pNode == NULL) { + return 0; + } + + __compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType); + + // backward check if previous nodes are with the same value. + tSkipListNode *pPrev = pNode->pBackward[0]; + while ((pPrev != &pSkipList->pHead) && filterComparator(&pPrev->key, pKey) == 0) { + pPrev = pPrev->pBackward[0]; + } + + return tSkipListEndParQuery(pSkipList, pPrev->pForward[0], &pNode->key, TSDB_RELATION_LESS_EQUAL, pRes); +} + +static tSkipListNode *tSkipListParQuery(tSkipList *pSkipList, tSkipListKey *pKey, int32_t cond) { + int32_t sLevel = pSkipList->nLevel - 1; + int32_t ret = -1; + + tSkipListNode *x = &pSkipList->pHead; + __compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType); + + pthread_rwlock_rdlock(&pSkipList->lock); + + if (cond == TSDB_RELATION_LARGE_EQUAL || cond == TSDB_RELATION_LARGE) { + for (int32_t i = sLevel; i >= 0; --i) { + while (x->pForward[i] != NULL && (ret = filterComparator(&x->pForward[i]->key, pKey)) < 0) { + x = x->pForward[i]; + } + } + + // backward check if previous nodes are with the same value. + if (cond == TSDB_RELATION_LARGE_EQUAL && ret == 0) { + tSkipListNode *pNode = x->pForward[0]; + while ((pNode->pBackward[0] != &pSkipList->pHead) && (filterComparator(&pNode->pBackward[0]->key, pKey) == 0)) { + pNode = pNode->pBackward[0]; + } + pthread_rwlock_unlock(&pSkipList->lock); + return pNode; + } + + if (ret > 0 || cond == TSDB_RELATION_LARGE_EQUAL) { + pthread_rwlock_unlock(&pSkipList->lock); + return x->pForward[0]; + } else { // cond == TSDB_RELATION_LARGE && ret == 0 + tSkipListNode *pn = x->pForward[0]; + while (pn != NULL && filterComparator(&pn->key, pKey) == 0) { + pn = pn->pForward[0]; + } + pthread_rwlock_unlock(&pSkipList->lock); + return pn; + } + } + + pthread_rwlock_unlock(&pSkipList->lock); + return NULL; +} + +int32_t tSkipListIterateList(tSkipList *pSkipList, tSkipListNode ***pRes, bool (*fp)(tSkipListNode *, void *), + void *param) { + (*pRes) = (tSkipListNode **)calloc(1, POINTER_BYTES * pSkipList->nSize); + if (NULL == *pRes) { + pError("error skiplist %p, malloc failed", pSkipList); + return -1; + } + + pthread_rwlock_rdlock(&pSkipList->lock); + tSkipListNode *pStartNode = pSkipList->pHead.pForward[0]; + int32_t num = 0; + + for (int32_t i = 0; i < pSkipList->nSize; ++i) { + if (pStartNode == NULL) { + pError("error skiplist %p, required length:%d, actual length:%d", pSkipList, pSkipList->nSize, i - 1); +#ifdef _DEBUG_VIEW + SSkipListPrint(pSkipList, 1); +#endif + break; + } + + if (fp == NULL || (fp != NULL && fp(pStartNode, param) == true)) { + (*pRes)[num++] = pStartNode; + } + + pStartNode = pStartNode->pForward[0]; + } + + pthread_rwlock_unlock(&pSkipList->lock); + + if (num == 0) { + free(*pRes); + *pRes = NULL; + } else if (num < pSkipList->nSize) { // free unused memory + char* tmp = realloc((*pRes), num * POINTER_BYTES); + assert(tmp != NULL); + + *pRes = (tSkipListNode**)tmp; + } + + return num; +} + +int32_t tSkipListIteratorReset(tSkipList *pSkipList, SSkipListIterator* iter) { + if (pSkipList == NULL) { + return -1; + } + + iter->pSkipList = pSkipList; + + pthread_rwlock_rdlock(&pSkipList->lock); + iter->cur = NULL;//pSkipList->pHead.pForward[0]; + iter->num = pSkipList->nSize; + pthread_rwlock_unlock(&pSkipList->lock); + + return 0; +} + +bool tSkipListIteratorNext(SSkipListIterator* iter) { + if (iter->num == 0 || iter->pSkipList == NULL) { + return false; + } + + tSkipList* pSkipList = iter->pSkipList; + + pthread_rwlock_rdlock(&pSkipList->lock); + if (iter->cur == NULL) { + iter->cur = pSkipList->pHead.pForward[0]; + } else { + iter->cur = iter->cur->pForward[0]; + } + + pthread_rwlock_unlock(&pSkipList->lock); + + return iter->cur != NULL; +} + +tSkipListNode* tSkipListIteratorGet(SSkipListIterator* iter) { + return iter->cur; +} + +int32_t tSkipListRangeQuery(tSkipList *pSkipList, tSKipListQueryCond *pCond, tSkipListNode ***pRes) { + pSkipList->state.queryCount++; + tSkipListNode *pStart = tSkipListParQuery(pSkipList, &pCond->lowerBnd, pCond->lowerBndRelOptr); + if (pStart == 0) { + *pRes = NULL; + return 0; + } + + return tSkipListEndParQuery(pSkipList, pStart, &pCond->upperBnd, pCond->upperBndRelOptr, pRes); +} + +static bool removeSupport(tSkipList *pSkipList, tSkipListNode **forward, tSkipListKey *pKey) { + __compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType); + + if (filterComparator(&forward[0]->pForward[0]->key, pKey) == 0) { + tSkipListNode *p = forward[0]->pForward[0]; + doRemove(pSkipList, p, forward); + } else { // failed to find the node of specified value,abort + return false; + } + + // compress the minimum level of skip list + while (pSkipList->nLevel > 0 && pSkipList->pHead.pForward[pSkipList->nLevel - 1] == NULL) { + pSkipList->nLevel -= 1; + } + + return true; +} + +void tSkipListRemoveNode(tSkipList *pSkipList, tSkipListNode *pNode) { + tSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; + + pthread_rwlock_rdlock(&pSkipList->lock); + for (int32_t i = 0; i < pNode->nLevel; ++i) { + forward[i] = pNode->pBackward[i]; + } + + removeSupport(pSkipList, forward, &pNode->key); + pthread_rwlock_unlock(&pSkipList->lock); +} + +bool tSkipListRemove(tSkipList *pSkipList, tSkipListKey *pKey) { + tSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; + __compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType); + + pthread_rwlock_rdlock(&pSkipList->lock); + + tSkipListNode *x = &pSkipList->pHead; + for (int32_t i = pSkipList->nLevel - 1; i >= 0; --i) { + while (x->pForward[i] != NULL && (filterComparator(&x->pForward[i]->key, pKey) < 0)) { + x = x->pForward[i]; + } + forward[i] = x; + } + + bool ret = removeSupport(pSkipList, forward, pKey); + pthread_rwlock_unlock(&pSkipList->lock); + + return ret; +} + +void SSkipListPrint(tSkipList *pSkipList, int16_t nlevel) { + if (pSkipList == NULL || pSkipList->nLevel < nlevel || nlevel <= 0) { + return; + } + + tSkipListNode *p = pSkipList->pHead.pForward[nlevel - 1]; + int32_t id = 1; + while (p) { + switch (pSkipList->keyType) { + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_BIGINT: + fprintf(stdout, "%d: %" PRId64 " \n", id++, p->key.i64Key); + break; + case TSDB_DATA_TYPE_BINARY: + fprintf(stdout, "%d: %s \n", id++, p->key.pz); + break; + case TSDB_DATA_TYPE_DOUBLE: + fprintf(stdout, "%d: %lf \n", id++, p->key.dKey); + break; + default: + fprintf(stdout, "\n"); + } + p = p->pForward[nlevel - 1]; + } +} + +/* + * query processor based on query condition + */ +int32_t tSkipListQuery(tSkipList *pSkipList, tSKipListQueryCond *pQueryCond, tSkipListNode ***pResult) { + // query condition check + int32_t rel = 0; + __compar_fn_t comparator = getKeyComparator(pQueryCond->lowerBnd.nType); + + if (pSkipList == NULL || pQueryCond == NULL || pSkipList->nSize == 0 || + (((rel = comparator(&pQueryCond->lowerBnd, &pQueryCond->upperBnd)) > 0 && + pQueryCond->lowerBnd.nType != TSDB_DATA_TYPE_NCHAR && pQueryCond->lowerBnd.nType != TSDB_DATA_TYPE_BINARY))) { + (*pResult) = NULL; + return 0; + } + + if (rel == 0) { + /* + * 0 means: pQueryCond->lowerBnd == pQueryCond->upperBnd + * point query + */ + if (pQueryCond->lowerBndRelOptr == TSDB_RELATION_LARGE_EQUAL && + pQueryCond->upperBndRelOptr == TSDB_RELATION_LESS_EQUAL) { // point query + return tSkipListGets(pSkipList, &pQueryCond->lowerBnd, pResult); + } else { + (*pResult) = NULL; + return 0; + } + } else { + /* range query, query operation code check */ + return tSkipListRangeQuery(pSkipList, pQueryCond, pResult); + } +} + +typedef struct MultipleQueryResult { + int32_t len; + tSkipListNode **pData; +} MultipleQueryResult; + +static int32_t mergeQueryResult(MultipleQueryResult *pResults, int32_t numOfResSet, tSkipListNode ***pRes) { + int32_t total = 0; + for (int32_t i = 0; i < numOfResSet; ++i) { + total += pResults[i].len; + } + + (*pRes) = malloc(POINTER_BYTES * total); + int32_t idx = 0; + + for (int32_t i = 0; i < numOfResSet; ++i) { + MultipleQueryResult *pOneResult = &pResults[i]; + for (int32_t j = 0; j < pOneResult->len; ++j) { + (*pRes)[idx++] = pOneResult->pData[j]; + } + } + + return total; +} + +static void removeDuplicateKey(tSkipListKey *pKey, int32_t *numOfKey, __compar_fn_t comparator) { + if (*numOfKey == 1) { + return; + } + + qsort(pKey, *numOfKey, sizeof(pKey[0]), comparator); + int32_t i = 0, j = 1; + + while (i < (*numOfKey) && j < (*numOfKey)) { + int32_t ret = comparator(&pKey[i], &pKey[j]); + if (ret == 0) { + j++; + } else { + pKey[i + 1] = pKey[j]; + i++; + j++; + } + } + + (*numOfKey) = i + 1; +} + +int32_t mergeResult(const tSkipListKey *pKey, int32_t numOfKey, tSkipListNode ***pRes, __compar_fn_t comparator, + tSkipListNode *pNode) { + int32_t i = 0, j = 0; + // merge two sorted arrays in O(n) time + while (i < numOfKey && pNode != NULL) { + int32_t ret = comparator(&pNode->key, &pKey[i]); + if (ret < 0) { + (*pRes)[j++] = pNode; + pNode = pNode->pForward[0]; + } else if (ret == 0) { + pNode = pNode->pForward[0]; + } else { // pNode->key > pkey[i] + i++; + } + } + + while (pNode != NULL) { + (*pRes)[j++] = pNode; + pNode = pNode->pForward[0]; + } + return j; +} + +int32_t tSkipListPointQuery(tSkipList *pSkipList, tSkipListKey *pKey, int32_t numOfKey, tSkipListPointQueryType type, + tSkipListNode ***pRes) { + if (numOfKey == 0 || pKey == NULL || pSkipList == NULL || pSkipList->nSize == 0 || + (type != INCLUDE_POINT_QUERY && type != EXCLUDE_POINT_QUERY)) { + (*pRes) = NULL; + return 0; + } + + __compar_fn_t comparator = getKeyComparator(pKey->nType); + removeDuplicateKey(pKey, &numOfKey, comparator); + + if (type == INCLUDE_POINT_QUERY) { + if (numOfKey == 1) { + return tSkipListGets(pSkipList, &pKey[0], pRes); + } else { + MultipleQueryResult *pTempResult = (MultipleQueryResult *)malloc(sizeof(MultipleQueryResult) * numOfKey); + for (int32_t i = 0; i < numOfKey; ++i) { + pTempResult[i].len = tSkipListGets(pSkipList, &pKey[i], &pTempResult[i].pData); + } + int32_t num = mergeQueryResult(pTempResult, numOfKey, pRes); + + for (int32_t i = 0; i < numOfKey; ++i) { + free(pTempResult[i].pData); + } + free(pTempResult); + return num; + } + } else { // exclude query + *pRes = malloc(POINTER_BYTES * pSkipList->nSize); + + __compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType); + + tSkipListNode *pNode = pSkipList->pHead.pForward[0]; + int32_t retLen = mergeResult(pKey, numOfKey, pRes, filterComparator, pNode); + + if (retLen < pSkipList->nSize) { + (*pRes) = realloc(*pRes, POINTER_BYTES * retLen); + } + return retLen; + } +} diff --git a/src/util/src/thistogram.c b/src/util/src/thistogram.c index b603163b39..31045a4957 100644 --- a/src/util/src/thistogram.c +++ b/src/util/src/thistogram.c @@ -126,7 +126,7 @@ SHistogramInfo* tHistogramCreate(int32_t numOfEntries) { SHistogramInfo* pHisto = malloc(sizeof(SHistogramInfo) + sizeof(SHistBin) * (numOfEntries + 1)); #if !defined(USE_ARRAYLIST) - pHisto->pList = tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_DOUBLE, sizeof(double)); + pHisto->pList = SSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_DOUBLE, sizeof(double)); SInsertSupporter* pss = malloc(sizeof(SInsertSupporter)); pss->numOfEntries = pHisto->maxEntries; pss->pSkipList = pHisto->pList; @@ -185,7 +185,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { SHistBin* entry = calloc(1, sizeof(SHistBin)); entry->val = val; - tSkipListNode* pResNode = tSkipListPut((*pHisto)->pList, entry, &key, 0); + tSkipListNode* pResNode = SSkipListPut((*pHisto)->pList, entry, &key, 0); SHistBin* pEntry1 = (SHistBin*)pResNode->pData; pEntry1->index = -1; @@ -239,7 +239,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { // set the right value for loser-tree assert((*pHisto)->pLoserTree != NULL); if (!(*pHisto)->ordered) { - tSkipListPrint((*pHisto)->pList, 1); + SSkipListPrint((*pHisto)->pList, 1); SLoserTreeInfo* pTree = (*pHisto)->pLoserTree; tSkipListNode* pHead = (*pHisto)->pList->pHead.pForward[0]; @@ -281,7 +281,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { printf("delta is:%lf\n", pEntry1->delta); - tSkipListPrint((*pHisto)->pList, 1); + SSkipListPrint((*pHisto)->pList, 1); /* the chosen node */ tSkipListNode* pNode = (*pHisto)->pLoserTree->pNode[0].pData; @@ -319,7 +319,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { tLoserTreeAdjust(pTree, pEntry->index + pTree->numOfEntries); // remove the next node in skiplist tSkipListRemoveNode((*pHisto)->pList, pNext); - tSkipListPrint((*pHisto)->pList, 1); + SSkipListPrint((*pHisto)->pList, 1); tLoserTreeDisplay((*pHisto)->pLoserTree); } else { // add to heap -- GitLab