diff --git a/src/client/src/tcache.c b/src/client/src/tcache.c index e6213c45a41f104e6d948c2b46e281e9909ea2b8..f9941c1c0f2102c7fc2b0c31eb264cbabcb3498a 100644 --- a/src/client/src/tcache.c +++ b/src/client/src/tcache.c @@ -15,12 +15,12 @@ #include "os.h" +#include "hashfunc.h" #include "tcache.h" #include "tlog.h" #include "ttime.h" #include "ttimer.h" #include "tutil.h" -#include "hashutil.h" #define HASH_MAX_CAPACITY (1024*1024*16) #define HASH_VALUE_IN_TRASH (-1) diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 4c9f99b93235f38cfd995dc822e2ceef2ee1a325..1b7ed4002be01875fa30226b3bb6e0be348c739d 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1002,7 +1002,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { || ((NULL != pSql->asyncTblPos) && (NULL != pSql->pTableHashList))); if ((NULL == pSql->asyncTblPos) && (NULL == pSql->pTableHashList)) { - pSql->pTableHashList = taosInitHashTable(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false); + pSql->pTableHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false); pSql->cmd.pDataBlocks = tscCreateBlockArrayList(); if (NULL == pSql->pTableHashList || NULL == pSql->cmd.pDataBlocks) { @@ -1260,7 +1260,7 @@ _error_clean: pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); _clean: - taosCleanUpHashTable(pSql->pTableHashList); + taosHashCleanup(pSql->pTableHashList); pSql->pTableHashList = NULL; pSql->asyncTblPos = NULL; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 59e5127e40b8a2b535f18a6630db81f7c6e846bc..bbe2fa8d3a50516a884b5404affab3f53bc5d430 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -206,7 +206,7 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) { pSql->asyncTblPos = NULL; if (NULL != pSql->pTableHashList) { - taosCleanUpHashTable(pSql->pTableHashList); + taosHashCleanup(pSql->pTableHashList); pSql->pTableHashList = NULL; } @@ -705,8 +705,10 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { } // current data are exhausted, fetch more data - if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows && pCmd->command == TSDB_SQL_RETRIEVE)) { + if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows && + (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_METRIC || pCmd->command == TSDB_SQL_FETCH))) { taos_fetch_rows_a(res, asyncFetchCallback, pSql->pTscObj); + sem_wait(&pSql->rspSem); } @@ -1079,7 +1081,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) { pSql->asyncTblPos = NULL; if (NULL != pSql->pTableHashList) { - taosCleanUpHashTable(pSql->pTableHashList); + taosHashCleanup(pSql->pTableHashList); pSql->pTableHashList = NULL; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 3386b73acd104c702b8f528d9a808476abf9e863..320aeee27e5f183b84184c56966780dde558054e 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -677,7 +677,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, STableDataBlocks** dataBlocks) { *dataBlocks = NULL; - STableDataBlocks** t1 = (STableDataBlocks**)taosGetDataFromHashTable(pHashList, (const char*)&id, sizeof(id)); + STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)&id, sizeof(id)); if (t1 != NULL) { *dataBlocks = *t1; } @@ -688,7 +688,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, return ret; } - taosAddToHashTable(pHashList, (const char*)&id, sizeof(int64_t), (char*)dataBlocks, POINTER_BYTES); + taosHashPut(pHashList, (const char*)&id, sizeof(int64_t), (char*)dataBlocks, POINTER_BYTES); tscAppendDataBlock(pDataBlockList, *dataBlocks); } @@ -698,7 +698,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockList) { SSqlCmd* pCmd = &pSql->cmd; - void* pVnodeDataBlockHashList = taosInitHashTable(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false); + void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false); SDataBlockList* pVnodeDataBlockList = tscCreateBlockArrayList(); for (int32_t i = 0; i < pTableDataBlockList->nSize; ++i) { @@ -710,7 +710,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi tsInsertHeadSize, 0, pOneTableBlock->tableId, pOneTableBlock->pMeterMeta, &dataBuf); if (ret != TSDB_CODE_SUCCESS) { tscError("%p failed to prepare the data block buffer for merging table data, code:%d", pSql, ret); - taosCleanUpHashTable(pVnodeDataBlockHashList); + taosHashCleanup(pVnodeDataBlockHashList); tscDestroyBlockArrayList(pVnodeDataBlockList); return ret; } @@ -728,7 +728,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi } else { // failed to allocate memory, free already allocated memory and return error code tscError("%p failed to allocate memory for merging submit block, size:%d", pSql, dataBuf->nAllocSize); - taosCleanUpHashTable(pVnodeDataBlockHashList); + taosHashCleanup(pVnodeDataBlockHashList); tfree(dataBuf->pData); tscDestroyBlockArrayList(pVnodeDataBlockList); @@ -761,7 +761,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi pCmd->pDataBlocks = pVnodeDataBlockList; tscFreeUnusedDataBlocks(pCmd->pDataBlocks); - taosCleanUpHashTable(pVnodeDataBlockHashList); + taosHashCleanup(pVnodeDataBlockHashList); return TSDB_CODE_SUCCESS; } diff --git a/src/query/src/tresultBuf.c b/src/query/src/tresultBuf.c index a7377f16575147934f68148adb2d16126288ffc9..11e17cc5a3437c8c03176cb2ead3318bb9a42d88 100644 --- a/src/query/src/tresultBuf.c +++ b/src/query/src/tresultBuf.c @@ -16,7 +16,7 @@ int32_t createDiskbasedResultBuffer(SQueryDiskbasedResultBuf** pResultBuf, int32 pResBuf->incStep = 4; // init id hash table - pResBuf->idsTable = taosInitHashTable(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false); + pResBuf->idsTable = taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false); pResBuf->list = calloc(size, sizeof(SIDList)); pResBuf->numOfAllocGroupIds = size; @@ -56,7 +56,7 @@ tFilePage* getResultBufferPageById(SQueryDiskbasedResultBuf* pResultBuf, int32_t return (tFilePage*)(pResultBuf->pBuf + DEFAULT_INTERN_BUF_SIZE * id); } -int32_t getNumOfResultBufGroupId(SQueryDiskbasedResultBuf* pResultBuf) { return taosNumElemsInHashTable(pResultBuf->idsTable); } +int32_t getNumOfResultBufGroupId(SQueryDiskbasedResultBuf* pResultBuf) { return taosHashGetSize(pResultBuf->idsTable); } int32_t getResBufSize(SQueryDiskbasedResultBuf* pResultBuf) { return pResultBuf->totalBufSize; } @@ -95,7 +95,7 @@ static bool noMoreAvailablePages(SQueryDiskbasedResultBuf* pResultBuf) { static int32_t getGroupIndex(SQueryDiskbasedResultBuf* pResultBuf, int32_t groupId) { assert(pResultBuf != NULL); - char* p = taosGetDataFromHashTable(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t)); + char* p = taosHashGet(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t)); if (p == NULL) { // it is a new group id return -1; } @@ -121,7 +121,7 @@ static int32_t addNewGroupId(SQueryDiskbasedResultBuf* pResultBuf, int32_t group pResultBuf->numOfAllocGroupIds = n; } - taosAddToHashTable(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t), &num, sizeof(int32_t)); + taosHashPut(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t), &num, sizeof(int32_t)); return num; } @@ -210,7 +210,7 @@ void destroyResultBuf(SQueryDiskbasedResultBuf* pResultBuf) { } tfree(pResultBuf->list); - taosCleanUpHashTable(pResultBuf->idsTable); + taosHashCleanup(pResultBuf->idsTable); tfree(pResultBuf); } diff --git a/src/query/src/tscAst.c b/src/query/src/tscAst.c index 7be583a5ea1c59bae4065185e39e22b2d0d75dd1..ef83083a2c5aa91fdbbb4db5bbe8330ae188d594 100644 --- a/src/query/src/tscAst.c +++ b/src/query/src/tscAst.c @@ -14,7 +14,6 @@ */ #include "os.h" -#include "sskiplist.h" #include "taosdef.h" #include "taosmsg.h" #include "tast.h" diff --git a/src/query/src/ttokenizer.c b/src/query/src/ttokenizer.c index f43d344ca3aeb82b45c15d6eea399f22c12392c1..61d2e59c87fa2dc136d8e51869c10fd6028a9127 100644 --- a/src/query/src/ttokenizer.c +++ b/src/query/src/ttokenizer.c @@ -14,12 +14,12 @@ */ #include "hash.h" -#include "hashutil.h" +#include "hashfunc.h" #include "os.h" #include "shash.h" +#include "taosdef.h" #include "tstoken.h" #include "ttokendef.h" -#include "taosdef.h" #include "tutil.h" // All the keywords of the SQL language are stored in a hash table @@ -253,11 +253,11 @@ static void* KeywordHashTable = NULL; static void doInitKeywordsTable() { int numOfEntries = tListLen(keywordTable); - KeywordHashTable = taosInitHashTable(numOfEntries, MurmurHash3_32, false); + KeywordHashTable = taosHashInit(numOfEntries, MurmurHash3_32, false); for (int32_t i = 0; i < numOfEntries; i++) { keywordTable[i].len = strlen(keywordTable[i].name); void* ptr = &keywordTable[i]; - taosAddToHashTable(KeywordHashTable, keywordTable[i].name, keywordTable[i].len, (void*)&ptr, POINTER_BYTES); + taosHashPut(KeywordHashTable, keywordTable[i].name, keywordTable[i].len, (void*)&ptr, POINTER_BYTES); } } @@ -275,7 +275,7 @@ int tSQLKeywordCode(const char* z, int n) { } } - SKeyword** pKey = (SKeyword**)taosGetDataFromHashTable(KeywordHashTable, key, n); + SKeyword** pKey = (SKeyword**)taosHashGet(KeywordHashTable, key, n); if (pKey != NULL) { return (*pKey)->type; } else { diff --git a/src/query/src/tvariant.c b/src/query/src/tvariant.c index c9168ab252bd39f58ddcf0e09f676ff585c27c56..f0addb733bf496e1f65af8a744f4584e6bb2396c 100644 --- a/src/query/src/tvariant.c +++ b/src/query/src/tvariant.c @@ -13,17 +13,16 @@ * along with this program. If not, see . */ +#include "tvariant.h" #include "hash.h" -#include "hashutil.h" +#include "hashfunc.h" #include "os.h" #include "shash.h" +#include "taos.h" +#include "taosdef.h" #include "tstoken.h" #include "ttokendef.h" -#include "taosdef.h" #include "tutil.h" -#include "tvariant.h" -#include "taosdef.h" -#include "taos.h" // todo support scientific expression number and oct number void tVariantCreate(tVariant *pVar, SSQLToken *token) { tVariantCreateFromString(pVar, token->z, token->n, token->type); } diff --git a/src/util/inc/hash.h b/src/util/inc/hash.h index 3d60abe9c69769541bc26d70c7d0b787ed2252d6..8aa4da85f7bc41513312b0ce50715bee636073b4 100644 --- a/src/util/inc/hash.h +++ b/src/util/inc/hash.h @@ -20,7 +20,7 @@ extern "C" { #endif -#include "hashutil.h" +#include "hashfunc.h" #define HASH_MAX_CAPACITY (1024 * 1024 * 16) #define HASH_VALUE_IN_TRASH (-1) @@ -45,32 +45,77 @@ typedef struct SHashEntry { uint32_t num; } SHashEntry; -typedef struct HashObj { +typedef struct SHashObj { SHashEntry **hashList; - uint32_t capacity; // number of slots - int size; // number of elements in hash table + size_t capacity; // number of slots + size_t size; // number of elements in hash table _hash_fn_t hashFp; // hash function - bool multithreadSafe; // enable lock or not -#if defined LINUX - pthread_rwlock_t lock; +#if defined (LINUX) + pthread_rwlock_t* lock; #else - pthread_mutex_t lock; + pthread_mutex_t* lock; #endif +} SHashObj; -} HashObj; +/** + * init the hash table + * + * @param capacity initial capacity of the hash table + * @param fn hash function to generate the hash value + * @param threadsafe thread safe or not + * @return + */ +SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool threadsafe); -void *taosInitHashTable(uint32_t capacity, _hash_fn_t fn, bool multithreadSafe); -void taosDeleteFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen); +/** + * return the size of hash table + * @param pHashObj + * @return + */ +size_t taosHashGetSize(const SHashObj *pHashObj); + +/** + * put element into hash table, if the element with the same key exists, update it + * @param pHashObj + * @param key + * @param keyLen + * @param data + * @param size + * @return + */ +int32_t taosHashPut(SHashObj *pHashObj, const char *key, size_t keyLen, void *data, size_t size); -int32_t taosAddToHashTable(HashObj *pObj, const char *key, uint32_t keyLen, void *data, uint32_t size); -int32_t taosNumElemsInHashTable(HashObj *pObj); +/** + * return the payload data with the specified key + * + * @param pHashObj + * @param key + * @param keyLen + * @return + */ +void *taosHashGet(SHashObj *pHashObj, const char *key, size_t keyLen); -char *taosGetDataFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen); +/** + * remove item with the specified key + * @param pHashObj + * @param key + * @param keyLen + */ +void taosHashRemove(SHashObj *pHashObj, const char *key, size_t keyLen); -void taosCleanUpHashTable(void *handle); +/** + * clean up hash table + * @param handle + */ +void taosHashCleanup(SHashObj *pHashObj); -int32_t taosGetHashMaxOverflowLength(HashObj *pObj); +/** + * + * @param pHashObj + * @return + */ +int32_t taosHashGetMaxOverflowLinkLength(const SHashObj *pHashObj); #ifdef __cplusplus } diff --git a/src/util/inc/hashutil.h b/src/util/inc/hashfunc.h similarity index 100% rename from src/util/inc/hashutil.h rename to src/util/inc/hashfunc.h diff --git a/src/util/inc/sskiplist.h b/src/util/inc/sskiplist.h deleted file mode 100644 index f2ae2efc54342a1aa0fd1298b8868132d65e246d..0000000000000000000000000000000000000000 --- a/src/util/inc/sskiplist.h +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -#if 0 - -#ifndef TBASE_TSKIPLIST_H -#define TBASE_TSKIPLIST_H - -#ifdef __cplusplus -extern "C" { -#endif - -#define MAX_SKIP_LIST_LEVEL 20 - -#include -#include -#include - -#include "os.h" -#include "taosdef.h" - -/* - * key of each node - * todo move to as the global structure in all search codes... - */ - -const static size_t SKIP_LIST_STR_KEY_LENGTH_THRESHOLD = 15; -typedef tVariant tSkipListKey; - -typedef enum tSkipListPointQueryType { - INCLUDE_POINT_QUERY, - EXCLUDE_POINT_QUERY, -} tSkipListPointQueryType; - -typedef struct tSkipListNode { - uint16_t nLevel; - char * pData; - - struct tSkipListNode **pForward; - struct tSkipListNode **pBackward; - - tSkipListKey key; -} tSkipListNode; - -/* - * @version 0.2 - * @date 2017/11/12 - * the simple version of SkipList. - * for multi-thread safe purpose, we employ pthread_rwlock_t to guarantee to generate - * deterministic result. Later, we will remove the lock in SkipList to further - * enhance the performance. In this case, one should use the concurrent skip list (by - * using michael-scott algorithm) instead of this simple version in a multi-thread - * environment, to achieve higher performance of read/write operations. - * - * Note: Duplicated primary key situation. - * In case of duplicated primary key, two ways can be employed to handle this situation: - * 1. add as normal insertion with out special process. - * 2. add an overflow pointer at each list node, all nodes with the same key will be added - * in the overflow pointer. In this case, the total steps of each search will be reduced significantly. - * Currently, we implement the skip list in a line with the first means, maybe refactor it soon. - * - * Memory consumption: the memory alignment causes many memory wasted. So, employ a memory - * pool will significantly reduce the total memory consumption, as well as the calloc/malloc operation costs. - * - * 3. use the iterator pattern to refactor all routines to make it more clean - */ - -// state struct, record following information: -// number of links in each level. -// avg search steps, for latest 1000 queries -// avg search rsp time, for latest 1000 queries -// total memory size -typedef struct tSkipListState { - // in bytes, sizeof(tSkipList)+sizeof(tSkipListNode)*tSkipList->nSize - uint64_t nTotalMemSize; - uint64_t nLevelNodeCnt[MAX_SKIP_LIST_LEVEL]; - uint64_t queryCount; // total query count - - /* - * only record latest 1000 queries - * when the value==1000, = 0, - * nTotalStepsForQueries = 0, - * nTotalElapsedTimeForQueries = 0 - */ - uint64_t nRecQueries; - uint16_t nTotalStepsForQueries; - uint64_t nTotalElapsedTimeForQueries; - - uint16_t nInsertObjs; - uint16_t nTotalStepsForInsert; - uint64_t nTotalElapsedTimeForInsert; -} tSkipListState; - -typedef struct tSkipList { - tSkipListNode pHead; - uint64_t nSize; - uint16_t nMaxLevel; - uint16_t nLevel; - uint16_t keyType; - uint16_t nMaxKeyLen; - - __compar_fn_t comparator; - pthread_rwlock_t lock; // will be removed soon - tSkipListState state; // skiplist state -} tSkipList; - -/* - * iterate the skiplist - * this will cause the multi-thread problem, when the skiplist is destroyed, the iterate may - * continue iterating the skiplist, so add the reference count for skiplist - * TODO add the ref for skiplist when one iterator is created - */ -typedef struct SSkipListIterator { - tSkipList * pSkipList; - tSkipListNode *cur; - int64_t num; -} SSkipListIterator; - -/* - * query condition structure to denote the range query - * todo merge the point query cond with range query condition - */ -typedef struct tSKipListQueryCond { - // when the upper bounding == lower bounding, it is a point query - tSkipListKey lowerBnd; - tSkipListKey upperBnd; - int32_t lowerBndRelOptr; // relation operator to denote if lower bound is - int32_t upperBndRelOptr; // included or not -} tSKipListQueryCond; - -tSkipList *SSkipListCreate(int16_t nMaxLevel, int16_t keyType, int16_t nMaxKeyLen); - -void *SSkipListDestroy(tSkipList *pSkipList); - -// create skip list key -tSkipListKey SSkipListCreateKey(int32_t type, char *val, size_t keyLength); - -// destroy skip list key -void tSkipListDestroyKey(tSkipListKey *pKey); - -// put data into skiplist -tSkipListNode *SSkipListPut(tSkipList *pSkipList, void *pData, tSkipListKey *pKey, int32_t insertIdenticalKey); - -/* - * get only *one* node of which key is equalled to pKey, even there are more - * than one nodes are of the same key - */ -tSkipListNode *tSkipListGetOne(tSkipList *pSkipList, tSkipListKey *pKey); - -/* - * get all data with the same keys - */ -int32_t tSkipListGets(tSkipList *pSkipList, tSkipListKey *pKey, tSkipListNode ***pRes); - -int32_t tSkipListIterateList(tSkipList *pSkipList, tSkipListNode ***pRes, bool (*fp)(tSkipListNode *, void *), - void *param); - -/* - * remove only one node of the pKey value. - * If more than one node has the same value, any one will be removed - * - * @Return - * true: one node has been removed - * false: no node has been removed - */ -bool tSkipListRemove(tSkipList *pSkipList, tSkipListKey *pKey); - -/* - * remove the specified node in parameters - */ -void tSkipListRemoveNode(tSkipList *pSkipList, tSkipListNode *pNode); - -// for debug purpose only -void SSkipListPrint(tSkipList *pSkipList, int16_t nlevel); - -/* - * range query & single point query function - */ -int32_t tSkipListQuery(tSkipList *pSkipList, tSKipListQueryCond *pQueryCond, tSkipListNode ***pResult); - -/* - * include/exclude point query - */ -int32_t tSkipListPointQuery(tSkipList *pSkipList, tSkipListKey *pKey, int32_t numOfKey, tSkipListPointQueryType type, - tSkipListNode ***pResult); - -int32_t tSkipListIteratorReset(tSkipList *pSkipList, SSkipListIterator *iter); -bool tSkipListIteratorNext(SSkipListIterator *iter); -tSkipListNode *tSkipListIteratorGet(SSkipListIterator *iter); - -#ifdef __cplusplus -} -#endif - -#endif // TBASE_TSKIPLIST_H -#endif \ No newline at end of file diff --git a/src/util/inc/tarray.h b/src/util/inc/tarray.h index 244849a1ed47c43654ea437e85e4b312a24d956e..55bdc849adf4bf6d8941f61c669f73cdcf2880fd 100644 --- a/src/util/inc/tarray.h +++ b/src/util/inc/tarray.h @@ -76,7 +76,7 @@ void* taosArrayGetP(SArray* pArray, size_t index); * @param pArray * @return */ -size_t taosArrayGetSize(SArray* pArray); +size_t taosArrayGetSize(const SArray* pArray); /** * insert data into array diff --git a/src/util/inc/tskiplist.h b/src/util/inc/tskiplist.h index 2ebb9bf5f2cd48f67f4c044a2764d0b3ff449c40..0ec1d0eab52298913029f455e1566d9cf5d6630b 100644 --- a/src/util/inc/tskiplist.h +++ b/src/util/inc/tskiplist.h @@ -185,15 +185,39 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode); SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey, int16_t keyType); /** - * + * get the size of skip list * @param pSkipList - * @param pRes - * @param fp - * @param param * @return */ -int32_t tSkipListIterateList(SSkipList *pSkipList, SSkipListNode ***pRes, bool (*fp)(SSkipListNode *, void *), - void *param); +size_t tSkipListGetSize(const SSkipList* pSkipList); + +/** + * create skiplist iterator + * @param pSkipList + * @return + */ +SSkipListIterator* tSkipListCreateIter(SSkipList *pSkipList); + +/** + * forward the skip list iterator + * @param iter + * @return + */ +bool tSkipListIterNext(SSkipListIterator *iter); + +/** + * get the element of skip list node + * @param iter + * @return + */ +SSkipListNode *tSkipListIterGet(SSkipListIterator *iter); + +/** + * destroy the skip list node + * @param iter + * @return + */ +void* tSkipListDestroyIter(SSkipListIterator* iter); /* * remove only one node of the pKey value. @@ -210,9 +234,6 @@ bool tSkipListRemove(SSkipList *pSkipList, SSkipListKey *pKey); */ void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode); -int32_t tSkipListIteratorReset(SSkipList *pSkipList, SSkipListIterator *iter); -bool tSkipListIteratorNext(SSkipListIterator *iter); -SSkipListNode *tSkipListIteratorGet(SSkipListIterator *iter); #ifdef __cplusplus } diff --git a/src/util/src/hash.c b/src/util/src/hash.c index 2f643f17fa7c866a224bfe4b2ed969c485123800..c69fbb772314d6a3569ee38ac043eac7fa413229 100644 --- a/src/util/src/hash.c +++ b/src/util/src/hash.c @@ -21,7 +21,11 @@ #include "tutil.h" static FORCE_INLINE void __wr_lock(void *lock) { -#if defined LINUX + if (lock == NULL) { + return; + } + +#if defined (LINUX) pthread_rwlock_wrlock(lock); #else pthread_mutex_lock(lock); @@ -29,7 +33,11 @@ static FORCE_INLINE void __wr_lock(void *lock) { } static FORCE_INLINE void __rd_lock(void *lock) { -#if defined LINUX + if (lock == NULL) { + return; + } + +#if defined (LINUX) pthread_rwlock_rdlock(lock); #else pthread_mutex_lock(lock); @@ -37,7 +45,11 @@ static FORCE_INLINE void __rd_lock(void *lock) { } static FORCE_INLINE void __unlock(void *lock) { -#if defined LINUX + if (lock == NULL) { + return; + } + +#if defined (LINUX) pthread_rwlock_unlock(lock); #else pthread_mutex_unlock(lock); @@ -45,7 +57,11 @@ static FORCE_INLINE void __unlock(void *lock) { } static FORCE_INLINE int32_t __lock_init(void *lock) { -#if defined LINUX + if (lock == NULL) { + return 0; + } + +#if defined (LINUX) return pthread_rwlock_init(lock, NULL); #else return pthread_mutex_init(lock, NULL); @@ -53,7 +69,11 @@ static FORCE_INLINE int32_t __lock_init(void *lock) { } static FORCE_INLINE void __lock_destroy(void *lock) { -#if defined LINUX + if (lock == NULL) { + return; + } + +#if defined (LINUX) pthread_rwlock_destroy(lock); #else pthread_mutex_destroy(lock); @@ -68,21 +88,12 @@ static FORCE_INLINE int32_t taosHashCapacity(int32_t length) { return i; } -/** - * hash key function - * - * @param key key string - * @param len length of key - * @return hash value - */ -static FORCE_INLINE uint32_t taosHashKey(const char *key, uint32_t len) { return MurmurHash3_32(key, len); } - /** * inplace update node in hash table - * @param pObj hash table object + * @param pHashObj hash table object * @param pNode data node */ -static void doUpdateHashTable(HashObj *pObj, SHashNode *pNode) { +static void doUpdateHashTable(SHashObj *pHashObj, SHashNode *pNode) { if (pNode->prev1) { pNode->prev1->next = pNode; } @@ -96,16 +107,16 @@ static void doUpdateHashTable(HashObj *pObj, SHashNode *pNode) { /** * get SHashNode from hashlist, nodes from trash are not included. - * @param pObj Cache objection + * @param pHashObj Cache objection * @param key key for hash * @param keyLen key length * @return */ -static SHashNode *doGetNodeFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen, uint32_t *hashVal) { - uint32_t hash = (*pObj->hashFp)(key, keyLen); +static SHashNode *doGetNodeFromHashTable(SHashObj *pHashObj, const char *key, uint32_t keyLen, uint32_t *hashVal) { + uint32_t hash = (*pHashObj->hashFp)(key, keyLen); - int32_t slot = HASH_INDEX(hash, pObj->capacity); - SHashEntry *pEntry = pObj->hashList[slot]; + int32_t slot = HASH_INDEX(hash, pHashObj->capacity); + SHashEntry *pEntry = pHashObj->hashList[slot]; SHashNode *pNode = pEntry->next; while (pNode) { @@ -117,7 +128,7 @@ static SHashNode *doGetNodeFromHashTable(HashObj *pObj, const char *key, uint32_ } if (pNode) { - assert(HASH_INDEX(pNode->hashVal, pObj->capacity) == slot); + assert(HASH_INDEX(pNode->hashVal, pHashObj->capacity) == slot); } // return the calculated hash value, to avoid calculating it again in other functions @@ -131,10 +142,10 @@ static SHashNode *doGetNodeFromHashTable(HashObj *pObj, const char *key, uint32_ /** * resize the hash list if the threshold is reached * - * @param pObj + * @param pHashObj */ -static void taosHashTableResize(HashObj *pObj) { - if (pObj->size < pObj->capacity * HASH_DEFAULT_LOAD_FACTOR) { +static void taosHashTableResize(SHashObj *pHashObj) { + if (pHashObj->size < pHashObj->capacity * HASH_DEFAULT_LOAD_FACTOR) { return; } @@ -142,30 +153,30 @@ static void taosHashTableResize(HashObj *pObj) { SHashNode *pNode = NULL; SHashNode *pNext = NULL; - int32_t newSize = pObj->capacity << 1U; + int32_t newSize = pHashObj->capacity << 1U; if (newSize > HASH_MAX_CAPACITY) { - pTrace("current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached", pObj->capacity, + pTrace("current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached", pHashObj->capacity, HASH_MAX_CAPACITY); return; } int64_t st = taosGetTimestampUs(); - SHashEntry **pNewEntry = realloc(pObj->hashList, sizeof(SHashEntry*) * newSize); + SHashEntry **pNewEntry = realloc(pHashObj->hashList, sizeof(SHashEntry*) * newSize); if (pNewEntry == NULL) { - pTrace("cache resize failed due to out of memory, capacity remain:%d", pObj->capacity); + pTrace("cache resize failed due to out of memory, capacity remain:%d", pHashObj->capacity); return; } - pObj->hashList = pNewEntry; - for(int32_t i = pObj->capacity; i < newSize; ++i) { - pObj->hashList[i] = calloc(1, sizeof(SHashEntry)); + pHashObj->hashList = pNewEntry; + for(int32_t i = pHashObj->capacity; i < newSize; ++i) { + pHashObj->hashList[i] = calloc(1, sizeof(SHashEntry)); } - pObj->capacity = newSize; + pHashObj->capacity = newSize; - for (int32_t i = 0; i < pObj->capacity; ++i) { - SHashEntry *pEntry = pObj->hashList[i]; + for (int32_t i = 0; i < pHashObj->capacity; ++i) { + SHashEntry *pEntry = pHashObj->hashList[i]; pNode = pEntry->next; if (pNode != NULL) { @@ -173,7 +184,7 @@ static void taosHashTableResize(HashObj *pObj) { } while (pNode) { - int32_t j = HASH_INDEX(pNode->hashVal, pObj->capacity); + int32_t j = HASH_INDEX(pNode->hashVal, pHashObj->capacity); if (j == i) { // this key resides in the same slot, no need to relocate it pNode = pNode->next; } else { @@ -199,7 +210,7 @@ static void taosHashTableResize(HashObj *pObj) { pNode->next = NULL; pNode->prev1 = NULL; - SHashEntry *pNewIndexEntry = pObj->hashList[j]; + SHashEntry *pNewIndexEntry = pHashObj->hashList[j]; if (pNewIndexEntry->next != NULL) { assert(pNewIndexEntry->next->prev1 == pNewIndexEntry); @@ -221,8 +232,8 @@ static void taosHashTableResize(HashObj *pObj) { int64_t et = taosGetTimestampUs(); - pTrace("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", pObj->capacity, - ((double)pObj->size) / pObj->capacity, (et - st) / 1000.0); + pTrace("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", pHashObj->capacity, + ((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0); } /** @@ -230,43 +241,51 @@ static void taosHashTableResize(HashObj *pObj) { * @param fn hash function * @return */ -void *taosInitHashTable(uint32_t capacity, _hash_fn_t fn, bool multithreadSafe) { +SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool threadsafe) { if (capacity == 0 || fn == NULL) { return NULL; } - HashObj *pObj = (HashObj *)calloc(1, sizeof(HashObj)); - if (pObj == NULL) { + SHashObj *pHashObj = (SHashObj *)calloc(1, sizeof(SHashObj)); + if (pHashObj == NULL) { pError("failed to allocate memory, reason:%s", strerror(errno)); return NULL; } // the max slots is not defined by user - pObj->capacity = taosHashCapacity(capacity); - assert((pObj->capacity & (pObj->capacity - 1)) == 0); + pHashObj->capacity = taosHashCapacity(capacity); + assert((pHashObj->capacity & (pHashObj->capacity - 1)) == 0); - pObj->hashFp = fn; + pHashObj->hashFp = fn; - pObj->hashList = (SHashEntry **)calloc(pObj->capacity, sizeof(SHashEntry*)); - if (pObj->hashList == NULL) { - free(pObj); + pHashObj->hashList = (SHashEntry **)calloc(pHashObj->capacity, sizeof(SHashEntry*)); + if (pHashObj->hashList == NULL) { + free(pHashObj); pError("failed to allocate memory, reason:%s", strerror(errno)); return NULL; } - for(int32_t i = 0; i < pObj->capacity; ++i) { - pObj->hashList[i] = calloc(1, sizeof(SHashEntry)); + for(int32_t i = 0; i < pHashObj->capacity; ++i) { + pHashObj->hashList[i] = calloc(1, sizeof(SHashEntry)); } - if (multithreadSafe && (__lock_init(pObj) != 0)) { - free(pObj->hashList); - free(pObj); + if (threadsafe) { +#if defined(LINUX) + pHashObj->lock = calloc(1, sizeof(pthread_rwlock_t)); +#else + pHashObj->lock = calloc(1, sizeof(pthread_mutex_t)); +#endif + } + + if (__lock_init(pHashObj->lock) != 0) { + free(pHashObj->hashList); + free(pHashObj); pError("failed to init lock, reason:%s", strerror(errno)); return NULL; } - return (void *)pObj; + return pHashObj; } /** @@ -277,7 +296,7 @@ void *taosInitHashTable(uint32_t capacity, _hash_fn_t fn, bool multithreadSafe) * @param size size of block * @return SHashNode */ -static SHashNode *doCreateHashNode(const char *key, uint32_t keyLen, const char *pData, size_t dataSize, +static SHashNode *doCreateHashNode(const char *key, size_t keyLen, const char *pData, size_t dataSize, uint32_t hashVal) { size_t totalSize = dataSize + sizeof(SHashNode) + keyLen; @@ -298,7 +317,7 @@ static SHashNode *doCreateHashNode(const char *key, uint32_t keyLen, const char return pNewNode; } -static SHashNode *doUpdateHashNode(SHashNode *pNode, const char *key, uint32_t keyLen, const char *pData, +static SHashNode *doUpdateHashNode(SHashNode *pNode, const char *key, size_t keyLen, const char *pData, size_t dataSize) { size_t size = dataSize + sizeof(SHashNode) + keyLen; @@ -320,14 +339,14 @@ static SHashNode *doUpdateHashNode(SHashNode *pNode, const char *key, uint32_t k /** * insert the hash node at the front of the linked list * - * @param pObj + * @param pHashObj * @param pNode */ -static void doAddToHashTable(HashObj *pObj, SHashNode *pNode) { +static void doAddToHashTable(SHashObj *pHashObj, SHashNode *pNode) { assert(pNode != NULL); - int32_t index = HASH_INDEX(pNode->hashVal, pObj->capacity); - SHashEntry *pEntry = pObj->hashList[index]; + int32_t index = HASH_INDEX(pNode->hashVal, pHashObj->capacity); + SHashEntry *pEntry = pHashObj->hashList[index]; pNode->next = pEntry->next; @@ -339,74 +358,60 @@ static void doAddToHashTable(HashObj *pObj, SHashNode *pNode) { pNode->prev1 = pEntry; pEntry->num++; - pObj->size++; + pHashObj->size++; } -int32_t taosNumElemsInHashTable(HashObj *pObj) { - if (pObj == NULL) { +size_t taosHashGetSize(const SHashObj *pHashObj) { + if (pHashObj == NULL) { return 0; } - return pObj->size; + return pHashObj->size; } /** * add data node into hash table - * @param pObj hash object + * @param pHashObj hash object * @param pNode hash node */ -int32_t taosAddToHashTable(HashObj *pObj, const char *key, uint32_t keyLen, void *data, uint32_t size) { - if (pObj->multithreadSafe) { - __wr_lock(&pObj->lock); - } +int32_t taosHashPut(SHashObj *pHashObj, const char *key, size_t keyLen, void *data, size_t size) { + __wr_lock(pHashObj->lock); uint32_t hashVal = 0; - SHashNode *pNode = doGetNodeFromHashTable(pObj, key, keyLen, &hashVal); + SHashNode *pNode = doGetNodeFromHashTable(pHashObj, key, keyLen, &hashVal); if (pNode == NULL) { // no data in hash table with the specified key, add it into hash table - taosHashTableResize(pObj); + taosHashTableResize(pHashObj); SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal); if (pNewNode == NULL) { - if (pObj->multithreadSafe) { - __unlock(&pObj->lock); - } + __unlock(pHashObj->lock); return -1; } - doAddToHashTable(pObj, pNewNode); + doAddToHashTable(pHashObj, pNewNode); } else { SHashNode *pNewNode = doUpdateHashNode(pNode, key, keyLen, data, size); if (pNewNode == NULL) { - if (pObj->multithreadSafe) { - __unlock(&pObj->lock); - } - + __unlock(pHashObj->lock); return -1; } - doUpdateHashTable(pObj, pNewNode); - } - - if (pObj->multithreadSafe) { - __unlock(&pObj->lock); + doUpdateHashTable(pHashObj, pNewNode); } + __unlock(pHashObj->lock); return 0; } -char *taosGetDataFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen) { - if (pObj->multithreadSafe) { - __rd_lock(&pObj->lock); - } +void *taosHashGet(SHashObj *pHashObj, const char *key, size_t keyLen) { + __rd_lock(pHashObj->lock); uint32_t hashVal = 0; - SHashNode *pNode = doGetNodeFromHashTable(pObj, key, keyLen, &hashVal); + SHashNode *pNode = doGetNodeFromHashTable(pHashObj, key, keyLen, &hashVal); - if (pObj->multithreadSafe) { - __unlock(&pObj->lock); - } + __unlock(pHashObj->lock); if (pNode != NULL) { assert(pNode->hashVal == hashVal); @@ -419,29 +424,24 @@ char *taosGetDataFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen) /** * remove node in hash list - * @param pObj + * @param pHashObj * @param pNode */ -void taosDeleteFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen) { - if (pObj->multithreadSafe) { - __wr_lock(&pObj->lock); - } +void taosHashRemove(SHashObj *pHashObj, const char *key, size_t keyLen) { + __wr_lock(pHashObj->lock); uint32_t val = 0; - SHashNode *pNode = doGetNodeFromHashTable(pObj, key, keyLen, &val); + SHashNode *pNode = doGetNodeFromHashTable(pHashObj, key, keyLen, &val); if (pNode == NULL) { - if (pObj->multithreadSafe) { - __unlock(&pObj->lock); - } - + __unlock(pHashObj->lock); return; } SHashNode *pNext = pNode->next; if (pNode->prev != NULL) { - int32_t slot = HASH_INDEX(val, pObj->capacity); - if (pObj->hashList[slot]->next == pNode) { - pObj->hashList[slot]->next = pNext; + int32_t slot = HASH_INDEX(val, pHashObj->capacity); + if (pHashObj->hashList[slot]->next == pNode) { + pHashObj->hashList[slot]->next = pNext; } else { pNode->prev->next = pNext; } @@ -451,11 +451,12 @@ void taosDeleteFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen) { pNext->prev = pNode->prev; } - uint32_t index = HASH_INDEX(pNode->hashVal, pObj->capacity); - SHashEntry *pEntry = pObj->hashList[index]; + uint32_t index = HASH_INDEX(pNode->hashVal, pHashObj->capacity); + + SHashEntry *pEntry = pHashObj->hashList[index]; pEntry->num--; - pObj->size--; + pHashObj->size--; pNode->next = NULL; pNode->prev = NULL; @@ -463,24 +464,21 @@ void taosDeleteFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen) { pTrace("key:%s %p remove from hash table", pNode->key, pNode); tfree(pNode); - if (pObj->multithreadSafe) { - __unlock(&pObj->lock); - } + __unlock(pHashObj->lock); } -void taosCleanUpHashTable(void *handle) { - HashObj *pObj = (HashObj *)handle; - if (pObj == NULL || pObj->capacity <= 0) return; +void taosHashCleanup(SHashObj *pHashObj) { + if (pHashObj == NULL || pHashObj->capacity <= 0) { + return; + } SHashNode *pNode, *pNext; - if (pObj->multithreadSafe) { - __wr_lock(&pObj->lock); - } + __wr_lock(pHashObj->lock); - if (pObj->hashList) { - for (int32_t i = 0; i < pObj->capacity; ++i) { - SHashEntry *pEntry = pObj->hashList[i]; + if (pHashObj->hashList) { + for (int32_t i = 0; i < pHashObj->capacity; ++i) { + SHashEntry *pEntry = pHashObj->hashList[i]; pNode = pEntry->next; while (pNode) { @@ -492,28 +490,26 @@ void taosCleanUpHashTable(void *handle) { tfree(pEntry); } - free(pObj->hashList); + free(pHashObj->hashList); } - if (pObj->multithreadSafe) { - __unlock(&pObj->lock); - __lock_destroy(&pObj->lock); - } + __unlock(pHashObj->lock); + __lock_destroy(pHashObj->lock); - memset(pObj, 0, sizeof(HashObj)); - free(pObj); + memset(pHashObj, 0, sizeof(SHashObj)); + free(pHashObj); } // for profile only -int32_t taosGetHashMaxOverflowLength(HashObj* pObj) { - if (pObj == NULL || pObj->size == 0) { +int32_t taosHashGetMaxOverflowLinkLength(const SHashObj* pHashObj) { + if (pHashObj == NULL || pHashObj->size == 0) { return 0; } int32_t num = 0; - for(int32_t i = 0; i < pObj->size; ++i) { - SHashEntry *pEntry = pObj->hashList[i]; + for(int32_t i = 0; i < pHashObj->size; ++i) { + SHashEntry *pEntry = pHashObj->hashList[i]; if (num < pEntry->num) { num = pEntry->num; } diff --git a/src/util/src/sskiplist.c b/src/util/src/sskiplist.c deleted file mode 100644 index cba38e9bed5242c28a47fb13e00907299d46c184..0000000000000000000000000000000000000000 --- a/src/util/src/sskiplist.c +++ /dev/null @@ -1,848 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#if 0 -#include "os.h" - -#include "tlog.h" -#include "taosdef.h" -#include "sskiplist.h" -#include "tutil.h" - -static FORCE_INLINE void recordNodeEachLevel(tSkipList *pSkipList, int32_t nLevel) { // record link count in each level - for (int32_t i = 0; i < nLevel; ++i) { - pSkipList->state.nLevelNodeCnt[i]++; - } -} - -static FORCE_INLINE void removeNodeEachLevel(tSkipList *pSkipList, int32_t nLevel) { - for (int32_t i = 0; i < nLevel; ++i) { - pSkipList->state.nLevelNodeCnt[i]--; - } -} - -static FORCE_INLINE int32_t getSkipListNodeRandomHeight(tSkipList *pSkipList) { - const uint32_t factor = 4; - - int32_t n = 1; - while ((rand() % factor) == 0 && n <= pSkipList->nMaxLevel) { - n++; - } - - return n; -} - -static FORCE_INLINE int32_t getSkipListNodeLevel(tSkipList *pSkipList) { - int32_t nLevel = getSkipListNodeRandomHeight(pSkipList); - if (pSkipList->nSize == 0) { - nLevel = 1; - pSkipList->nLevel = 1; - } else { - if (nLevel > pSkipList->nLevel && pSkipList->nLevel < pSkipList->nMaxLevel) { - nLevel = (++pSkipList->nLevel); - } - } - return nLevel; -} - -void tSkipListDoInsert(tSkipList *pSkipList, tSkipListNode **forward, int32_t nLevel, tSkipListNode *pNode); - -void SSkipListDoRecordPut(tSkipList *pSkipList) { - const int32_t MAX_RECORD_NUM = 1000; - - if (pSkipList->state.nInsertObjs == MAX_RECORD_NUM) { - pSkipList->state.nInsertObjs = 1; - pSkipList->state.nTotalStepsForInsert = 0; - pSkipList->state.nTotalElapsedTimeForInsert = 0; - } else { - pSkipList->state.nInsertObjs++; - } -} - -int32_t compareIntVal(const void *pLeft, const void *pRight) { - int64_t lhs = ((tSkipListKey *)pLeft)->i64Key; - int64_t rhs = ((tSkipListKey *)pRight)->i64Key; - - DEFAULT_COMP(lhs, rhs); -} - -int32_t scompareIntDoubleVal(const void *pLeft, const void *pRight) { - int64_t lhs = ((tSkipListKey *)pLeft)->i64Key; - double rhs = ((tSkipListKey *)pRight)->dKey; - if (fabs(lhs - rhs) < FLT_EPSILON) { - return 0; - } else { - return (lhs > rhs) ? 1 : -1; - } -} - -int32_t scompareDoubleIntVal(const void *pLeft, const void *pRight) { - double lhs = ((tSkipListKey *)pLeft)->dKey; - int64_t rhs = ((tSkipListKey *)pRight)->i64Key; - if (fabs(lhs - rhs) < FLT_EPSILON) { - return 0; - } else { - return (lhs > rhs) ? 1 : -1; - } -} - -int32_t scompareDoubleVal(const void *pLeft, const void *pRight) { - double ret = (((tSkipListKey *)pLeft)->dKey - ((tSkipListKey *)pRight)->dKey); - if (fabs(ret) < FLT_EPSILON) { - return 0; - } else { - return ret > 0 ? 1 : -1; - } -} - -int32_t scompareStrVal(const void *pLeft, const void *pRight) { - tSkipListKey *pL = (tSkipListKey *)pLeft; - tSkipListKey *pR = (tSkipListKey *)pRight; - - if (pL->nLen == 0 && pR->nLen == 0) { - return 0; - } - - //handle only one-side bound compare situation, there is only lower bound or only upper bound - if (pL->nLen == -1) { - return 1; // no lower bound, lower bound is minimum, always return -1; - } else if (pR->nLen == -1) { - return -1; // no upper bound, upper bound is maximum situation, always return 1; - } - - int32_t ret = strcmp(((tSkipListKey *)pLeft)->pz, ((tSkipListKey *)pRight)->pz); - - if (ret == 0) { - return 0; - } else { - return ret > 0 ? 1 : -1; - } -} - -int32_t scompareWStrVal(const void *pLeft, const void *pRight) { - tSkipListKey *pL = (tSkipListKey *)pLeft; - tSkipListKey *pR = (tSkipListKey *)pRight; - - if (pL->nLen == 0 && pR->nLen == 0) { - return 0; - } - - //handle only one-side bound compare situation, there is only lower bound or only upper bound - if (pL->nLen == -1) { - return 1; // no lower bound, lower bound is minimum, always return -1; - } else if (pR->nLen == -1) { - return -1; // no upper bound, upper bound is maximum situation, always return 1; - } - - int32_t ret = wcscmp(((tSkipListKey *)pLeft)->wpz, ((tSkipListKey *)pRight)->wpz); - - if (ret == 0) { - return 0; - } else { - return ret > 0 ? 1 : -1; - } -} - -static __compar_fn_t getKeyFilterComparator(tSkipList *pSkipList, int32_t filterDataType) { - __compar_fn_t comparator = NULL; - - switch (pSkipList->keyType) { - case TSDB_DATA_TYPE_TINYINT: - case TSDB_DATA_TYPE_SMALLINT: - case TSDB_DATA_TYPE_INT: - case TSDB_DATA_TYPE_BIGINT: - case TSDB_DATA_TYPE_BOOL: { - if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) { - comparator = compareIntVal; - } else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) { - comparator = scompareIntDoubleVal; - } - break; - } - case TSDB_DATA_TYPE_FLOAT: - case TSDB_DATA_TYPE_DOUBLE: { - if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) { - comparator = scompareDoubleIntVal; - } else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) { - comparator = scompareDoubleVal; - } - break; - } - case TSDB_DATA_TYPE_BINARY: - comparator = scompareStrVal; - break; - case TSDB_DATA_TYPE_NCHAR: - comparator = scompareWStrVal; - break; - default: - comparator = compareIntVal; - break; - } - - return comparator; -} - -static __compar_fn_t getKeyComparator(int32_t keyType) { - __compar_fn_t comparator = NULL; - - switch (keyType) { - case TSDB_DATA_TYPE_TINYINT: - case TSDB_DATA_TYPE_SMALLINT: - case TSDB_DATA_TYPE_INT: - case TSDB_DATA_TYPE_BIGINT: - case TSDB_DATA_TYPE_BOOL: - comparator = compareIntVal; - break; - - case TSDB_DATA_TYPE_FLOAT: - case TSDB_DATA_TYPE_DOUBLE: - comparator = scompareDoubleVal; - break; - - case TSDB_DATA_TYPE_BINARY: - comparator = scompareStrVal; - break; - - case TSDB_DATA_TYPE_NCHAR: - comparator = scompareWStrVal; - break; - - default: - comparator = compareIntVal; - break; - } - - return comparator; -} - -tSkipList* SSkipListCreate(int16_t nMaxLevel, int16_t keyType, int16_t nMaxKeyLen) { - tSkipList *pSkipList = (tSkipList *)calloc(1, sizeof(tSkipList)); - if (pSkipList == NULL) { - return NULL; - } - - pSkipList->keyType = keyType; - - pSkipList->comparator = getKeyComparator(keyType); - pSkipList->pHead.pForward = (tSkipListNode **)calloc(1, POINTER_BYTES * MAX_SKIP_LIST_LEVEL); - - pSkipList->nMaxLevel = MAX_SKIP_LIST_LEVEL; - pSkipList->nLevel = 1; - - pSkipList->nMaxKeyLen = nMaxKeyLen; - pSkipList->nMaxLevel = nMaxLevel; - - if (pthread_rwlock_init(&pSkipList->lock, NULL) != 0) { - tfree(pSkipList->pHead.pForward); - tfree(pSkipList); - return NULL; - } - - srand(time(NULL)); - pSkipList->state.nTotalMemSize += sizeof(tSkipList); - return pSkipList; -} - -static void doRemove(tSkipList *pSkipList, tSkipListNode *pNode, tSkipListNode *forward[]) { - int32_t level = pNode->nLevel; - for (int32_t j = level - 1; j >= 0; --j) { - if ((forward[j]->pForward[j] != NULL) && (forward[j]->pForward[j]->pForward[j])) { - forward[j]->pForward[j]->pForward[j]->pBackward[j] = forward[j]; - } - - if (forward[j]->pForward[j] != NULL) { - forward[j]->pForward[j] = forward[j]->pForward[j]->pForward[j]; - } - } - - pSkipList->state.nTotalMemSize -= (sizeof(tSkipListNode) + POINTER_BYTES * pNode->nLevel * 2); - removeNodeEachLevel(pSkipList, pNode->nLevel); - - tfree(pNode); - --pSkipList->nSize; -} - -static size_t getOneNodeSize(const tSkipListKey *pKey, int32_t nLevel) { - size_t size = sizeof(tSkipListNode) + sizeof(intptr_t) * (nLevel << 1); - if (pKey->nType == TSDB_DATA_TYPE_BINARY) { - size += pKey->nLen + 1; - } else if (pKey->nType == TSDB_DATA_TYPE_NCHAR) { - size += (pKey->nLen + 1) * TSDB_NCHAR_SIZE; - } - - return size; -} - -static tSkipListNode *SSkipListCreateNode(void *pData, const tSkipListKey *pKey, int32_t nLevel) { - size_t nodeSize = getOneNodeSize(pKey, nLevel); - tSkipListNode *pNode = (tSkipListNode *)calloc(1, nodeSize); - - pNode->pForward = (tSkipListNode **)(&pNode[1]); - pNode->pBackward = (pNode->pForward + nLevel); - - pNode->pData = pData; - - pNode->key = *pKey; - if (pKey->nType == TSDB_DATA_TYPE_BINARY) { - pNode->key.pz = (char *)(pNode->pBackward + nLevel); - - strcpy(pNode->key.pz, pKey->pz); - pNode->key.pz[pKey->nLen] = 0; - } else if (pKey->nType == TSDB_DATA_TYPE_NCHAR) { - pNode->key.wpz = (wchar_t *)(pNode->pBackward + nLevel); - wcsncpy(pNode->key.wpz, pKey->wpz, pKey->nLen); - pNode->key.wpz[pKey->nLen] = 0; - } - - pNode->nLevel = nLevel; - return pNode; -} - -tSkipListKey SSkipListCreateKey(int32_t type, char *val, size_t keyLength) { - tSkipListKey k = {0}; - tVariantCreateFromBinary(&k, val, (uint32_t) keyLength, (uint32_t) type); - return k; -} - -void tSkipListDestroyKey(tSkipListKey *pKey) { tVariantDestroy(pKey); } - -void* SSkipListDestroy(tSkipList *pSkipList) { - if (pSkipList == NULL) { - return NULL; - } - - pthread_rwlock_wrlock(&pSkipList->lock); - tSkipListNode *pNode = pSkipList->pHead.pForward[0]; - while (pNode) { - tSkipListNode *pTemp = pNode; - pNode = pNode->pForward[0]; - tfree(pTemp); - } - - tfree(pSkipList->pHead.pForward); - pthread_rwlock_unlock(&pSkipList->lock); - - pthread_rwlock_destroy(&pSkipList->lock); - tfree(pSkipList); - - return NULL; -} - -tSkipListNode *SSkipListPut(tSkipList *pSkipList, void *pData, tSkipListKey *pKey, int32_t insertIdenticalKey) { - if (pSkipList == NULL) { - return NULL; - } - - pthread_rwlock_wrlock(&pSkipList->lock); - - // record one node is put into skiplist - SSkipListDoRecordPut(pSkipList); - - tSkipListNode *px = &pSkipList->pHead; - - tSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; - for (int32_t i = pSkipList->nLevel - 1; i >= 0; --i) { - while (px->pForward[i] != NULL && (pSkipList->comparator(&px->pForward[i]->key, pKey) < 0)) { - px = px->pForward[i]; - } - - pSkipList->state.nTotalStepsForInsert++; - forward[i] = px; - } - - // if the skiplist does not allowed identical key inserted, the new data will be discarded. - if ((insertIdenticalKey == 0) && forward[0] != &pSkipList->pHead && - (pSkipList->comparator(&forward[0]->key, pKey) == 0)) { - pthread_rwlock_unlock(&pSkipList->lock); - return forward[0]; - } - - int32_t nLevel = getSkipListNodeLevel(pSkipList); - recordNodeEachLevel(pSkipList, nLevel); - - tSkipListNode *pNode = SSkipListCreateNode(pData, pKey, nLevel); - tSkipListDoInsert(pSkipList, forward, nLevel, pNode); - - pSkipList->nSize += 1; - - // char tmpstr[512] = {0}; - // tVariantToString(&pNode->key, tmpstr); - // pTrace("skiplist:%p, node added, key:%s, total list len:%d", pSkipList, - // tmpstr, pSkipList->nSize); - - pSkipList->state.nTotalMemSize += getOneNodeSize(pKey, nLevel); - pthread_rwlock_unlock(&pSkipList->lock); - - return pNode; -} - -void tSkipListDoInsert(tSkipList *pSkipList, tSkipListNode **forward, int32_t nLevel, tSkipListNode *pNode) { - for (int32_t i = 0; i < nLevel; ++i) { - tSkipListNode *x = forward[i]; - if (x != NULL) { - pNode->pBackward[i] = x; - if (x->pForward[i]) x->pForward[i]->pBackward[i] = pNode; - - pNode->pForward[i] = x->pForward[i]; - x->pForward[i] = pNode; - } else { - pSkipList->pHead.pForward[i] = pNode; - pNode->pBackward[i] = &(pSkipList->pHead); - } - } -} - -tSkipListNode *tSkipListGetOne(tSkipList *pSkipList, tSkipListKey *pKey) { - int32_t sLevel = pSkipList->nLevel - 1; - int32_t ret = -1; - - tSkipListNode *x = &pSkipList->pHead; - - pthread_rwlock_rdlock(&pSkipList->lock); - pSkipList->state.queryCount++; - - __compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType); - - for (int32_t i = sLevel; i >= 0; --i) { - while (x->pForward[i] != NULL && (ret = filterComparator(&x->pForward[i]->key, pKey)) < 0) { - x = x->pForward[i]; - } - - if (ret == 0) { - pthread_rwlock_unlock(&pSkipList->lock); - return x->pForward[i]; - } - } - - pthread_rwlock_unlock(&pSkipList->lock); - return NULL; -} - -static int32_t tSkipListEndParQuery(tSkipList *pSkipList, tSkipListNode *pStartNode, tSkipListKey *pEndKey, - int32_t cond, tSkipListNode ***pRes) { - pthread_rwlock_rdlock(&pSkipList->lock); - tSkipListNode *p = pStartNode; - int32_t numOfRes = 0; - - __compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pEndKey->nType); - while (p != NULL) { - int32_t ret = filterComparator(&p->key, pEndKey); - if (ret > 0) { - break; - } - - if (ret < 0) { - numOfRes++; - p = p->pForward[0]; - } else if (ret == 0) { - if (cond == TSDB_RELATION_LESS_EQUAL) { - numOfRes++; - p = p->pForward[0]; - } else { - break; - } - } - } - - (*pRes) = (tSkipListNode **)malloc(POINTER_BYTES * numOfRes); - for (int32_t i = 0; i < numOfRes; ++i) { - (*pRes)[i] = pStartNode; - pStartNode = pStartNode->pForward[0]; - } - pthread_rwlock_unlock(&pSkipList->lock); - - return numOfRes; -} - -/* - * maybe return the copy of tSkipListNode would be better - */ -int32_t tSkipListGets(tSkipList *pSkipList, tSkipListKey *pKey, tSkipListNode ***pRes) { - (*pRes) = NULL; - - tSkipListNode *pNode = tSkipListGetOne(pSkipList, pKey); - if (pNode == NULL) { - return 0; - } - - __compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType); - - // backward check if previous nodes are with the same value. - tSkipListNode *pPrev = pNode->pBackward[0]; - while ((pPrev != &pSkipList->pHead) && filterComparator(&pPrev->key, pKey) == 0) { - pPrev = pPrev->pBackward[0]; - } - - return tSkipListEndParQuery(pSkipList, pPrev->pForward[0], &pNode->key, TSDB_RELATION_LESS_EQUAL, pRes); -} - -static tSkipListNode *tSkipListParQuery(tSkipList *pSkipList, tSkipListKey *pKey, int32_t cond) { - int32_t sLevel = pSkipList->nLevel - 1; - int32_t ret = -1; - - tSkipListNode *x = &pSkipList->pHead; - __compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType); - - pthread_rwlock_rdlock(&pSkipList->lock); - - if (cond == TSDB_RELATION_LARGE_EQUAL || cond == TSDB_RELATION_LARGE) { - for (int32_t i = sLevel; i >= 0; --i) { - while (x->pForward[i] != NULL && (ret = filterComparator(&x->pForward[i]->key, pKey)) < 0) { - x = x->pForward[i]; - } - } - - // backward check if previous nodes are with the same value. - if (cond == TSDB_RELATION_LARGE_EQUAL && ret == 0) { - tSkipListNode *pNode = x->pForward[0]; - while ((pNode->pBackward[0] != &pSkipList->pHead) && (filterComparator(&pNode->pBackward[0]->key, pKey) == 0)) { - pNode = pNode->pBackward[0]; - } - pthread_rwlock_unlock(&pSkipList->lock); - return pNode; - } - - if (ret > 0 || cond == TSDB_RELATION_LARGE_EQUAL) { - pthread_rwlock_unlock(&pSkipList->lock); - return x->pForward[0]; - } else { // cond == TSDB_RELATION_LARGE && ret == 0 - tSkipListNode *pn = x->pForward[0]; - while (pn != NULL && filterComparator(&pn->key, pKey) == 0) { - pn = pn->pForward[0]; - } - pthread_rwlock_unlock(&pSkipList->lock); - return pn; - } - } - - pthread_rwlock_unlock(&pSkipList->lock); - return NULL; -} - -int32_t tSkipListIterateList(tSkipList *pSkipList, tSkipListNode ***pRes, bool (*fp)(tSkipListNode *, void *), - void *param) { - (*pRes) = (tSkipListNode **)calloc(1, POINTER_BYTES * pSkipList->nSize); - if (NULL == *pRes) { - pError("error skiplist %p, malloc failed", pSkipList); - return -1; - } - - pthread_rwlock_rdlock(&pSkipList->lock); - tSkipListNode *pStartNode = pSkipList->pHead.pForward[0]; - int32_t num = 0; - - for (int32_t i = 0; i < pSkipList->nSize; ++i) { - if (pStartNode == NULL) { - pError("error skiplist %p, required length:%d, actual length:%d", pSkipList, pSkipList->nSize, i - 1); -#ifdef _DEBUG_VIEW - SSkipListPrint(pSkipList, 1); -#endif - break; - } - - if (fp == NULL || (fp != NULL && fp(pStartNode, param) == true)) { - (*pRes)[num++] = pStartNode; - } - - pStartNode = pStartNode->pForward[0]; - } - - pthread_rwlock_unlock(&pSkipList->lock); - - if (num == 0) { - free(*pRes); - *pRes = NULL; - } else if (num < pSkipList->nSize) { // free unused memory - char* tmp = realloc((*pRes), num * POINTER_BYTES); - assert(tmp != NULL); - - *pRes = (tSkipListNode**)tmp; - } - - return num; -} - -int32_t tSkipListIteratorReset(tSkipList *pSkipList, SSkipListIterator* iter) { - if (pSkipList == NULL) { - return -1; - } - - iter->pSkipList = pSkipList; - - pthread_rwlock_rdlock(&pSkipList->lock); - iter->cur = NULL;//pSkipList->pHead.pForward[0]; - iter->num = pSkipList->nSize; - pthread_rwlock_unlock(&pSkipList->lock); - - return 0; -} - -bool tSkipListIteratorNext(SSkipListIterator* iter) { - if (iter->num == 0 || iter->pSkipList == NULL) { - return false; - } - - tSkipList* pSkipList = iter->pSkipList; - - pthread_rwlock_rdlock(&pSkipList->lock); - if (iter->cur == NULL) { - iter->cur = pSkipList->pHead.pForward[0]; - } else { - iter->cur = iter->cur->pForward[0]; - } - - pthread_rwlock_unlock(&pSkipList->lock); - - return iter->cur != NULL; -} - -tSkipListNode* tSkipListIteratorGet(SSkipListIterator* iter) { - return iter->cur; -} - -int32_t tSkipListRangeQuery(tSkipList *pSkipList, tSKipListQueryCond *pCond, tSkipListNode ***pRes) { - pSkipList->state.queryCount++; - tSkipListNode *pStart = tSkipListParQuery(pSkipList, &pCond->lowerBnd, pCond->lowerBndRelOptr); - if (pStart == 0) { - *pRes = NULL; - return 0; - } - - return tSkipListEndParQuery(pSkipList, pStart, &pCond->upperBnd, pCond->upperBndRelOptr, pRes); -} - -static bool removeSupport(tSkipList *pSkipList, tSkipListNode **forward, tSkipListKey *pKey) { - __compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType); - - if (filterComparator(&forward[0]->pForward[0]->key, pKey) == 0) { - tSkipListNode *p = forward[0]->pForward[0]; - doRemove(pSkipList, p, forward); - } else { // failed to find the node of specified value,abort - return false; - } - - // compress the minimum level of skip list - while (pSkipList->nLevel > 0 && pSkipList->pHead.pForward[pSkipList->nLevel - 1] == NULL) { - pSkipList->nLevel -= 1; - } - - return true; -} - -void tSkipListRemoveNode(tSkipList *pSkipList, tSkipListNode *pNode) { - tSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; - - pthread_rwlock_rdlock(&pSkipList->lock); - for (int32_t i = 0; i < pNode->nLevel; ++i) { - forward[i] = pNode->pBackward[i]; - } - - removeSupport(pSkipList, forward, &pNode->key); - pthread_rwlock_unlock(&pSkipList->lock); -} - -bool tSkipListRemove(tSkipList *pSkipList, tSkipListKey *pKey) { - tSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; - __compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType); - - pthread_rwlock_rdlock(&pSkipList->lock); - - tSkipListNode *x = &pSkipList->pHead; - for (int32_t i = pSkipList->nLevel - 1; i >= 0; --i) { - while (x->pForward[i] != NULL && (filterComparator(&x->pForward[i]->key, pKey) < 0)) { - x = x->pForward[i]; - } - forward[i] = x; - } - - bool ret = removeSupport(pSkipList, forward, pKey); - pthread_rwlock_unlock(&pSkipList->lock); - - return ret; -} - -void SSkipListPrint(tSkipList *pSkipList, int16_t nlevel) { - if (pSkipList == NULL || pSkipList->nLevel < nlevel || nlevel <= 0) { - return; - } - - tSkipListNode *p = pSkipList->pHead.pForward[nlevel - 1]; - int32_t id = 1; - while (p) { - switch (pSkipList->keyType) { - case TSDB_DATA_TYPE_INT: - case TSDB_DATA_TYPE_SMALLINT: - case TSDB_DATA_TYPE_TINYINT: - case TSDB_DATA_TYPE_BIGINT: - fprintf(stdout, "%d: %" PRId64 " \n", id++, p->key.i64Key); - break; - case TSDB_DATA_TYPE_BINARY: - fprintf(stdout, "%d: %s \n", id++, p->key.pz); - break; - case TSDB_DATA_TYPE_DOUBLE: - fprintf(stdout, "%d: %lf \n", id++, p->key.dKey); - break; - default: - fprintf(stdout, "\n"); - } - p = p->pForward[nlevel - 1]; - } -} - -/* - * query processor based on query condition - */ -int32_t tSkipListQuery(tSkipList *pSkipList, tSKipListQueryCond *pQueryCond, tSkipListNode ***pResult) { - // query condition check - int32_t rel = 0; - __compar_fn_t comparator = getKeyComparator(pQueryCond->lowerBnd.nType); - - if (pSkipList == NULL || pQueryCond == NULL || pSkipList->nSize == 0 || - (((rel = comparator(&pQueryCond->lowerBnd, &pQueryCond->upperBnd)) > 0 && - pQueryCond->lowerBnd.nType != TSDB_DATA_TYPE_NCHAR && pQueryCond->lowerBnd.nType != TSDB_DATA_TYPE_BINARY))) { - (*pResult) = NULL; - return 0; - } - - if (rel == 0) { - /* - * 0 means: pQueryCond->lowerBnd == pQueryCond->upperBnd - * point query - */ - if (pQueryCond->lowerBndRelOptr == TSDB_RELATION_LARGE_EQUAL && - pQueryCond->upperBndRelOptr == TSDB_RELATION_LESS_EQUAL) { // point query - return tSkipListGets(pSkipList, &pQueryCond->lowerBnd, pResult); - } else { - (*pResult) = NULL; - return 0; - } - } else { - /* range query, query operation code check */ - return tSkipListRangeQuery(pSkipList, pQueryCond, pResult); - } -} - -typedef struct MultipleQueryResult { - int32_t len; - tSkipListNode **pData; -} MultipleQueryResult; - -static int32_t mergeQueryResult(MultipleQueryResult *pResults, int32_t numOfResSet, tSkipListNode ***pRes) { - int32_t total = 0; - for (int32_t i = 0; i < numOfResSet; ++i) { - total += pResults[i].len; - } - - (*pRes) = malloc(POINTER_BYTES * total); - int32_t idx = 0; - - for (int32_t i = 0; i < numOfResSet; ++i) { - MultipleQueryResult *pOneResult = &pResults[i]; - for (int32_t j = 0; j < pOneResult->len; ++j) { - (*pRes)[idx++] = pOneResult->pData[j]; - } - } - - return total; -} - -static void removeDuplicateKey(tSkipListKey *pKey, int32_t *numOfKey, __compar_fn_t comparator) { - if (*numOfKey == 1) { - return; - } - - qsort(pKey, *numOfKey, sizeof(pKey[0]), comparator); - int32_t i = 0, j = 1; - - while (i < (*numOfKey) && j < (*numOfKey)) { - int32_t ret = comparator(&pKey[i], &pKey[j]); - if (ret == 0) { - j++; - } else { - pKey[i + 1] = pKey[j]; - i++; - j++; - } - } - - (*numOfKey) = i + 1; -} - -int32_t mergeResult(const tSkipListKey *pKey, int32_t numOfKey, tSkipListNode ***pRes, __compar_fn_t comparator, - tSkipListNode *pNode) { - int32_t i = 0, j = 0; - // merge two sorted arrays in O(n) time - while (i < numOfKey && pNode != NULL) { - int32_t ret = comparator(&pNode->key, &pKey[i]); - if (ret < 0) { - (*pRes)[j++] = pNode; - pNode = pNode->pForward[0]; - } else if (ret == 0) { - pNode = pNode->pForward[0]; - } else { // pNode->key > pkey[i] - i++; - } - } - - while (pNode != NULL) { - (*pRes)[j++] = pNode; - pNode = pNode->pForward[0]; - } - return j; -} - -int32_t tSkipListPointQuery(tSkipList *pSkipList, tSkipListKey *pKey, int32_t numOfKey, tSkipListPointQueryType type, - tSkipListNode ***pRes) { - if (numOfKey == 0 || pKey == NULL || pSkipList == NULL || pSkipList->nSize == 0 || - (type != INCLUDE_POINT_QUERY && type != EXCLUDE_POINT_QUERY)) { - (*pRes) = NULL; - return 0; - } - - __compar_fn_t comparator = getKeyComparator(pKey->nType); - removeDuplicateKey(pKey, &numOfKey, comparator); - - if (type == INCLUDE_POINT_QUERY) { - if (numOfKey == 1) { - return tSkipListGets(pSkipList, &pKey[0], pRes); - } else { - MultipleQueryResult *pTempResult = (MultipleQueryResult *)malloc(sizeof(MultipleQueryResult) * numOfKey); - for (int32_t i = 0; i < numOfKey; ++i) { - pTempResult[i].len = tSkipListGets(pSkipList, &pKey[i], &pTempResult[i].pData); - } - int32_t num = mergeQueryResult(pTempResult, numOfKey, pRes); - - for (int32_t i = 0; i < numOfKey; ++i) { - free(pTempResult[i].pData); - } - free(pTempResult); - return num; - } - } else { // exclude query - *pRes = malloc(POINTER_BYTES * pSkipList->nSize); - - __compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType); - - tSkipListNode *pNode = pSkipList->pHead.pForward[0]; - int32_t retLen = mergeResult(pKey, numOfKey, pRes, filterComparator, pNode); - - if (retLen < pSkipList->nSize) { - (*pRes) = realloc(*pRes, POINTER_BYTES * retLen); - } - return retLen; - } -} - -#endif \ No newline at end of file diff --git a/src/util/src/tarray.c b/src/util/src/tarray.c index 4ed2da0567be79ac019e2dfeea4a045601a1c430..fb2dac827e13d4526b6b7a6c358a34dd575eb941 100755 --- a/src/util/src/tarray.c +++ b/src/util/src/tarray.c @@ -98,7 +98,7 @@ void* taosArrayGetP(SArray* pArray, size_t index) { return *(void**)ret; } -size_t taosArrayGetSize(SArray* pArray) { return pArray->size; } +size_t taosArrayGetSize(const SArray* pArray) { return pArray->size; } void* taosArrayInsert(SArray* pArray, size_t index, void* pData) { if (pArray == NULL || pData == NULL) { diff --git a/src/util/src/thashutil.c b/src/util/src/thashutil.c index cf16efe2f8e539f9611952111bafc5d4ff214d3e..f4b4e9faa262c8f5968cb546138feacce4895c38 100644 --- a/src/util/src/thashutil.c +++ b/src/util/src/thashutil.c @@ -7,8 +7,8 @@ * MurmurHash algorithm * */ +#include "hashfunc.h" #include "tutil.h" -#include "hashutil.h" #define ROTL32(x, r) ((x) << (r) | (x) >> (32 - (r))) diff --git a/src/util/src/tskiplist.c b/src/util/src/tskiplist.c index c4f0ccab03359193516029a896dd7f0d0f7ab6c3..9b8c86658d4de65c2e620ff8a249044f3fcaf6cd 100644 --- a/src/util/src/tskiplist.c +++ b/src/util/src/tskiplist.c @@ -524,6 +524,71 @@ SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey, int16_t keyType) { return sa; } +size_t tSkipListGetSize(const SSkipList* pSkipList) { + if (pSkipList == NULL) { + return 0; + } + + return pSkipList->size; +} + +SSkipListIterator* tSkipListCreateIter(SSkipList *pSkipList) { + if (pSkipList == NULL) { + return NULL; + } + + SSkipListIterator* iter = calloc(1, sizeof(SSkipListIterator)); + + iter->pSkipList = pSkipList; + if (pSkipList->lock) { + pthread_rwlock_rdlock(pSkipList->lock); + } + + iter->cur = NULL; + iter->num = pSkipList->size; + + if (pSkipList->lock) { + pthread_rwlock_unlock(pSkipList->lock); + } + + return iter; +} + +bool tSkipListIterNext(SSkipListIterator *iter) { + if (iter->num == 0 || iter->pSkipList == NULL) { + return false; + } + + SSkipList *pSkipList = iter->pSkipList; + + if (pSkipList->lock) { + pthread_rwlock_rdlock(pSkipList->lock); + } + + if (iter->cur == NULL) { + iter->cur = SL_GET_FORWARD_POINTER(pSkipList->pHead, 0); + } else { + iter->cur = SL_GET_FORWARD_POINTER(iter->cur, 0); + } + + if (pSkipList->lock) { + pthread_rwlock_unlock(pSkipList->lock); + } + + return iter->cur != NULL; +} + +SSkipListNode *tSkipListIterGet(SSkipListIterator *iter) { return (iter == NULL)? NULL:iter->cur; } + +void* tSkipListDestroyIter(SSkipListIterator* iter) { + if (iter == NULL) { + return NULL; + } + + tfree(iter); + return NULL; +} + // static int32_t tSkipListEndParQuery(SSkipList *pSkipList, SSkipListNode *pStartNode, SSkipListKey *pEndKey, // int32_t cond, SSkipListNode ***pRes) { // pthread_rwlock_rdlock(&pSkipList->lock); diff --git a/src/vnode/detail/inc/vnodeQueryImpl.h b/src/vnode/detail/inc/vnodeQueryImpl.h index e3507d5f82e8c156ffdc5a3babae8ea5af079398..9c28af22c7818b3eee53360760c3a60b14b90b13 100644 --- a/src/vnode/detail/inc/vnodeQueryImpl.h +++ b/src/vnode/detail/inc/vnodeQueryImpl.h @@ -23,7 +23,7 @@ extern "C" { #include "os.h" #include "hash.h" -#include "hashutil.h" +#include "hashfunc.h" #define GET_QINFO_ADDR(x) ((char*)(x)-offsetof(SQInfo, query)) #define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0) @@ -119,7 +119,7 @@ typedef enum { typedef int (*__block_search_fn_t)(char* data, int num, int64_t key, int order); static FORCE_INLINE SMeterObj* getMeterObj(void* hashHandle, int32_t sid) { - return *(SMeterObj**)taosGetDataFromHashTable(hashHandle, (const char*)&sid, sizeof(sid)); + return *(SMeterObj**)taosHashGet(hashHandle, (const char*)&sid, sizeof(sid)); } bool isQueryKilled(SQuery* pQuery); diff --git a/src/vnode/detail/src/vnodeQueryImpl.c b/src/vnode/detail/src/vnodeQueryImpl.c index f761205719c8020b871448942461b2df2dc265d9..2cdcc9130010c12d10553a6089f7ae441d7e94e1 100644 --- a/src/vnode/detail/src/vnodeQueryImpl.c +++ b/src/vnode/detail/src/vnodeQueryImpl.c @@ -14,7 +14,7 @@ */ #include "hash.h" -#include "hashutil.h" +#include "hashfunc.h" #include "os.h" #include "taosmsg.h" #include "textbuffer.h" @@ -1460,7 +1460,7 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin int16_t bytes) { SQuery *pQuery = pRuntimeEnv->pQuery; - int32_t *p1 = (int32_t *)taosGetDataFromHashTable(pWindowResInfo->hashList, pData, bytes); + int32_t *p1 = (int32_t *)taosHashGet(pWindowResInfo->hashList, pData, bytes); if (p1 != NULL) { pWindowResInfo->curIndex = *p1; } else { // more than the capacity, reallocate the resources @@ -1485,7 +1485,7 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin // add a new result set for a new group pWindowResInfo->curIndex = pWindowResInfo->size++; - taosAddToHashTable(pWindowResInfo->hashList, pData, bytes, (char *)&pWindowResInfo->curIndex, sizeof(int32_t)); + taosHashPut(pWindowResInfo->hashList, pData, bytes, (char *)&pWindowResInfo->curIndex, sizeof(int32_t)); } return getWindowResult(pWindowResInfo, pWindowResInfo->curIndex); @@ -2018,7 +2018,7 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun pWindowResInfo->type = type; _hash_fn_t fn = taosGetDefaultHashFunction(type); - pWindowResInfo->hashList = taosInitHashTable(threshold, fn, false); + pWindowResInfo->hashList = taosHashInit(threshold, fn, false); pWindowResInfo->curIndex = -1; pWindowResInfo->size = 0; @@ -2044,7 +2044,7 @@ void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRu destroyTimeWindowRes(pResult, pRuntimeEnv->pQuery->numOfOutputCols); } - taosCleanUpHashTable(pWindowResInfo->hashList); + taosHashCleanup(pWindowResInfo->hashList); tfree(pWindowResInfo->pResult); } @@ -2059,11 +2059,11 @@ void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowR } pWindowResInfo->curIndex = -1; - taosCleanUpHashTable(pWindowResInfo->hashList); + taosHashCleanup(pWindowResInfo->hashList); pWindowResInfo->size = 0; _hash_fn_t fn = taosGetDefaultHashFunction(pWindowResInfo->type); - pWindowResInfo->hashList = taosInitHashTable(pWindowResInfo->capacity, fn, false); + pWindowResInfo->hashList = taosHashInit(pWindowResInfo->capacity, fn, false); pWindowResInfo->startTime = 0; pWindowResInfo->prevSKey = 0; @@ -2081,7 +2081,7 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { for (int32_t i = 0; i < num; ++i) { SWindowResult *pResult = &pWindowResInfo->pResult[i]; if (pResult->status.closed) { // remove the window slot from hash table - taosDeleteFromHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE); + taosHashRemove(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE); } else { break; } @@ -2104,14 +2104,14 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { for (int32_t k = 0; k < pWindowResInfo->size; ++k) { SWindowResult *pResult = &pWindowResInfo->pResult[k]; - int32_t *p = (int32_t *)taosGetDataFromHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey, + int32_t *p = (int32_t *)taosHashGet(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE); int32_t v = (*p - num); assert(v >= 0 && v <= pWindowResInfo->size); // todo add the update function for hash table - taosDeleteFromHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE); - taosAddToHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE, (char *)&v, + taosHashRemove(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE); + taosHashPut(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE, (char *)&v, sizeof(int32_t)); } @@ -4812,7 +4812,7 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) { tfree(pSupporter->pMeterSidExtInfo); if (pSupporter->pMetersHashTable != NULL) { - taosCleanUpHashTable(pSupporter->pMetersHashTable); + taosHashCleanup(pSupporter->pMetersHashTable); pSupporter->pMetersHashTable = NULL; } diff --git a/src/vnode/detail/src/vnodeRead.c b/src/vnode/detail/src/vnodeRead.c index fb7c85e61c61272159202a2bda4ef7f30fdd93d0..f21294a68e0928b55ebd7fcff28f1eb818af171e 100644 --- a/src/vnode/detail/src/vnodeRead.c +++ b/src/vnode/detail/src/vnodeRead.c @@ -16,6 +16,8 @@ #define _DEFAULT_SOURCE #include "os.h" +#include "hash.h" +#include "hashfunc.h" #include "ihash.h" #include "taosmsg.h" #include "tast.h" @@ -25,8 +27,6 @@ #include "vnode.h" #include "vnodeRead.h" #include "vnodeUtil.h" -#include "hash.h" -#include "hashutil.h" int (*pQueryFunc[])(SMeterObj *, SQuery *) = {vnodeQueryFromCache, vnodeQueryFromFile}; @@ -651,8 +651,8 @@ void *vnodeQueryOnSingleTable(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE STableQuerySupportObj *pSupporter = (STableQuerySupportObj *)calloc(1, sizeof(STableQuerySupportObj)); pSupporter->numOfMeters = 1; - pSupporter->pMetersHashTable = taosInitHashTable(pSupporter->numOfMeters, taosIntHash_32, false); - taosAddToHashTable(pSupporter->pMetersHashTable, (const char*) &pMetersObj[0]->sid, sizeof(pMeterObj[0].sid), + pSupporter->pMetersHashTable = taosHashInit(pSupporter->numOfMeters, taosIntHash_32, false); + taosHashPut(pSupporter->pMetersHashTable, (const char*) &pMetersObj[0]->sid, sizeof(pMeterObj[0].sid), (char *)&pMetersObj[0], POINTER_BYTES); pSupporter->pSidSet = NULL; @@ -742,9 +742,9 @@ void *vnodeQueryOnMultiMeters(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE STableQuerySupportObj *pSupporter = (STableQuerySupportObj *)calloc(1, sizeof(STableQuerySupportObj)); pSupporter->numOfMeters = pQueryMsg->numOfSids; - pSupporter->pMetersHashTable = taosInitHashTable(pSupporter->numOfMeters, taosIntHash_32, false); + pSupporter->pMetersHashTable = taosHashInit(pSupporter->numOfMeters, taosIntHash_32, false); for (int32_t i = 0; i < pSupporter->numOfMeters; ++i) { - taosAddToHashTable(pSupporter->pMetersHashTable, (const char*) &pMetersObj[i]->sid, sizeof(pMetersObj[i]->sid), (char *)&pMetersObj[i], + taosHashPut(pSupporter->pMetersHashTable, (const char*) &pMetersObj[i]->sid, sizeof(pMetersObj[i]->sid), (char *)&pMetersObj[i], POINTER_BYTES); } diff --git a/src/vnode/tsdb/src/tsdbMeta.c b/src/vnode/tsdb/src/tsdbMeta.c index 6c9cc2404aafb57d13d1e103c5940de600ce7efb..573921192b790fa083684277f85da3559838a30d 100644 --- a/src/vnode/tsdb/src/tsdbMeta.c +++ b/src/vnode/tsdb/src/tsdbMeta.c @@ -32,7 +32,7 @@ STsdbMeta *tsdbCreateMeta(int32_t maxTables) { return NULL; } - pMeta->tableMap = taosInitHashTable(maxTables + maxTables / 10, taosGetDefaultHashFunction, false); + pMeta->tableMap = taosHashInit(maxTables + maxTables / 10, taosGetDefaultHashFunction, false); if (pMeta->tableMap == NULL) { free(pMeta->tables); free(pMeta); @@ -60,7 +60,7 @@ int32_t tsdbFreeMeta(STsdbMeta *pMeta) { tsdbFreeTable(pTemp); } - taosCleanUpHashTable(pMeta->tableMap); + taosHashCleanup(pMeta->tableMap); free(pMeta); @@ -205,7 +205,7 @@ static int32_t tsdbCheckTableCfg(STableCfg *pCfg) { } STable *tsdbGetTableByUid(STsdbMeta *pMeta, int64_t uid) { - void *ptr = taosGetDataFromHashTable(pMeta->tableMap, (char *)(&uid), sizeof(uid)); + void *ptr = taosHashGet(pMeta->tableMap, (char *)(&uid), sizeof(uid)); if (ptr == NULL) return NULL; @@ -244,7 +244,7 @@ static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable) { static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable) { // TODO: add the table to the map int64_t uid = pTable->tableId.uid; - if (taosAddToHashTable(pMeta->tableMap, (char *)(&uid), sizeof(uid), (void *)(&pTable), sizeof(pTable)) < 0) { + if (taosHashPut(pMeta->tableMap, (char *)(&uid), sizeof(uid), (void *)(&pTable), sizeof(pTable)) < 0) { return -1; } return 0; diff --git a/tests/examples/c/CMakeLists.txt b/tests/examples/c/CMakeLists.txt index 287fca7d410b88d240642a57ec194b3d0c686975..81d912fc00d9d543cbee9835ef6c791b1b67542f 100644 --- a/tests/examples/c/CMakeLists.txt +++ b/tests/examples/c/CMakeLists.txt @@ -1,13 +1,13 @@ PROJECT(TDengine) IF (TD_WINDOWS_64) - INCLUDE_DIRECTORIES(${TD_ROOT_DIR}/deps/pthread) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/pthread) ENDIF () -INCLUDE_DIRECTORIES(. ${TD_ROOT_DIR}/src/inc ${TD_ROOT_DIR}/src/client/inc ${TD_OS_DIR}/inc) +INCLUDE_DIRECTORIES(. ${TD_COMMUNITY_DIR}/src/inc ${TD_COMMUNITY_DIR}/src/client/inc ${TD_COMMUNITY_DIR}/inc) AUX_SOURCE_DIRECTORY(. SRC) -#ADD_EXECUTABLE(demo ${SRC}) -#TARGET_LINK_LIBRARIES(demo taos_static trpc tutil pthread ) +ADD_EXECUTABLE(demo demo.c) +TARGET_LINK_LIBRARIES(demo taos_static trpc tutil pthread )