Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
34df442c
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
34df442c
编写于
3月 12, 2020
作者:
S
slguan
提交者:
GitHub
3月 12, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #1351 from taosdata/liaohj_2
Liaohj 2
上级
c1d33817
d1b298bc
变更
23
隐藏空白更改
内联
并排
Showing
23 changed file
with
342 addition
and
1270 deletion
+342
-1270
src/client/src/tcache.c
src/client/src/tcache.c
+1
-1
src/client/src/tscParseInsert.c
src/client/src/tscParseInsert.c
+2
-2
src/client/src/tscSql.c
src/client/src/tscSql.c
+5
-3
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+6
-6
src/query/src/tresultBuf.c
src/query/src/tresultBuf.c
+5
-5
src/query/src/tscAst.c
src/query/src/tscAst.c
+0
-1
src/query/src/ttokenizer.c
src/query/src/ttokenizer.c
+5
-5
src/query/src/tvariant.c
src/query/src/tvariant.c
+4
-5
src/util/inc/hash.h
src/util/inc/hash.h
+61
-16
src/util/inc/hashfunc.h
src/util/inc/hashfunc.h
+0
-0
src/util/inc/sskiplist.h
src/util/inc/sskiplist.h
+0
-207
src/util/inc/tarray.h
src/util/inc/tarray.h
+1
-1
src/util/inc/tskiplist.h
src/util/inc/tskiplist.h
+30
-9
src/util/src/hash.c
src/util/src/hash.c
+127
-131
src/util/src/sskiplist.c
src/util/src/sskiplist.c
+0
-848
src/util/src/tarray.c
src/util/src/tarray.c
+1
-1
src/util/src/thashutil.c
src/util/src/thashutil.c
+1
-1
src/util/src/tskiplist.c
src/util/src/tskiplist.c
+65
-0
src/vnode/detail/inc/vnodeQueryImpl.h
src/vnode/detail/inc/vnodeQueryImpl.h
+2
-2
src/vnode/detail/src/vnodeQueryImpl.c
src/vnode/detail/src/vnodeQueryImpl.c
+12
-12
src/vnode/detail/src/vnodeRead.c
src/vnode/detail/src/vnodeRead.c
+6
-6
src/vnode/tsdb/src/tsdbMeta.c
src/vnode/tsdb/src/tsdbMeta.c
+4
-4
tests/examples/c/CMakeLists.txt
tests/examples/c/CMakeLists.txt
+4
-4
未找到文件。
src/client/src/tcache.c
浏览文件 @
34df442c
...
...
@@ -15,12 +15,12 @@
#include "os.h"
#include "hashfunc.h"
#include "tcache.h"
#include "tlog.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
#include "hashutil.h"
#define HASH_MAX_CAPACITY (1024*1024*16)
#define HASH_VALUE_IN_TRASH (-1)
...
...
src/client/src/tscParseInsert.c
浏览文件 @
34df442c
...
...
@@ -1002,7 +1002,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
||
((
NULL
!=
pSql
->
asyncTblPos
)
&&
(
NULL
!=
pSql
->
pTableHashList
)));
if
((
NULL
==
pSql
->
asyncTblPos
)
&&
(
NULL
==
pSql
->
pTableHashList
))
{
pSql
->
pTableHashList
=
taos
InitHashTable
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
);
pSql
->
pTableHashList
=
taos
HashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
);
pSql
->
cmd
.
pDataBlocks
=
tscCreateBlockArrayList
();
if
(
NULL
==
pSql
->
pTableHashList
||
NULL
==
pSql
->
cmd
.
pDataBlocks
)
{
...
...
@@ -1260,7 +1260,7 @@ _error_clean:
pCmd
->
pDataBlocks
=
tscDestroyBlockArrayList
(
pCmd
->
pDataBlocks
);
_clean:
taos
CleanUpHashTable
(
pSql
->
pTableHashList
);
taos
HashCleanup
(
pSql
->
pTableHashList
);
pSql
->
pTableHashList
=
NULL
;
pSql
->
asyncTblPos
=
NULL
;
...
...
src/client/src/tscSql.c
浏览文件 @
34df442c
...
...
@@ -206,7 +206,7 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) {
pSql
->
asyncTblPos
=
NULL
;
if
(
NULL
!=
pSql
->
pTableHashList
)
{
taos
CleanUpHashTable
(
pSql
->
pTableHashList
);
taos
HashCleanup
(
pSql
->
pTableHashList
);
pSql
->
pTableHashList
=
NULL
;
}
...
...
@@ -705,8 +705,10 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
}
// current data are exhausted, fetch more data
if
(
pRes
->
data
==
NULL
||
(
pRes
->
data
!=
NULL
&&
pRes
->
row
>=
pRes
->
numOfRows
&&
pCmd
->
command
==
TSDB_SQL_RETRIEVE
))
{
if
(
pRes
->
data
==
NULL
||
(
pRes
->
data
!=
NULL
&&
pRes
->
row
>=
pRes
->
numOfRows
&&
(
pCmd
->
command
==
TSDB_SQL_RETRIEVE
||
pCmd
->
command
==
TSDB_SQL_RETRIEVE_METRIC
||
pCmd
->
command
==
TSDB_SQL_FETCH
)))
{
taos_fetch_rows_a
(
res
,
asyncFetchCallback
,
pSql
->
pTscObj
);
sem_wait
(
&
pSql
->
rspSem
);
}
...
...
@@ -1079,7 +1081,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
pSql
->
asyncTblPos
=
NULL
;
if
(
NULL
!=
pSql
->
pTableHashList
)
{
taos
CleanUpHashTable
(
pSql
->
pTableHashList
);
taos
HashCleanup
(
pSql
->
pTableHashList
);
pSql
->
pTableHashList
=
NULL
;
}
...
...
src/client/src/tscUtil.c
浏览文件 @
34df442c
...
...
@@ -677,7 +677,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList,
STableDataBlocks
**
dataBlocks
)
{
*
dataBlocks
=
NULL
;
STableDataBlocks
**
t1
=
(
STableDataBlocks
**
)
taos
GetDataFromHashTable
(
pHashList
,
(
const
char
*
)
&
id
,
sizeof
(
id
));
STableDataBlocks
**
t1
=
(
STableDataBlocks
**
)
taos
HashGet
(
pHashList
,
(
const
char
*
)
&
id
,
sizeof
(
id
));
if
(
t1
!=
NULL
)
{
*
dataBlocks
=
*
t1
;
}
...
...
@@ -688,7 +688,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList,
return
ret
;
}
taos
AddToHashTable
(
pHashList
,
(
const
char
*
)
&
id
,
sizeof
(
int64_t
),
(
char
*
)
dataBlocks
,
POINTER_BYTES
);
taos
HashPut
(
pHashList
,
(
const
char
*
)
&
id
,
sizeof
(
int64_t
),
(
char
*
)
dataBlocks
,
POINTER_BYTES
);
tscAppendDataBlock
(
pDataBlockList
,
*
dataBlocks
);
}
...
...
@@ -698,7 +698,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList,
int32_t
tscMergeTableDataBlocks
(
SSqlObj
*
pSql
,
SDataBlockList
*
pTableDataBlockList
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
void
*
pVnodeDataBlockHashList
=
taos
InitHashTable
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
);
void
*
pVnodeDataBlockHashList
=
taos
HashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
);
SDataBlockList
*
pVnodeDataBlockList
=
tscCreateBlockArrayList
();
for
(
int32_t
i
=
0
;
i
<
pTableDataBlockList
->
nSize
;
++
i
)
{
...
...
@@ -710,7 +710,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi
tsInsertHeadSize
,
0
,
pOneTableBlock
->
tableId
,
pOneTableBlock
->
pMeterMeta
,
&
dataBuf
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"%p failed to prepare the data block buffer for merging table data, code:%d"
,
pSql
,
ret
);
taos
CleanUpHashTable
(
pVnodeDataBlockHashList
);
taos
HashCleanup
(
pVnodeDataBlockHashList
);
tscDestroyBlockArrayList
(
pVnodeDataBlockList
);
return
ret
;
}
...
...
@@ -728,7 +728,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi
}
else
{
// failed to allocate memory, free already allocated memory and return error code
tscError
(
"%p failed to allocate memory for merging submit block, size:%d"
,
pSql
,
dataBuf
->
nAllocSize
);
taos
CleanUpHashTable
(
pVnodeDataBlockHashList
);
taos
HashCleanup
(
pVnodeDataBlockHashList
);
tfree
(
dataBuf
->
pData
);
tscDestroyBlockArrayList
(
pVnodeDataBlockList
);
...
...
@@ -761,7 +761,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi
pCmd
->
pDataBlocks
=
pVnodeDataBlockList
;
tscFreeUnusedDataBlocks
(
pCmd
->
pDataBlocks
);
taos
CleanUpHashTable
(
pVnodeDataBlockHashList
);
taos
HashCleanup
(
pVnodeDataBlockHashList
);
return
TSDB_CODE_SUCCESS
;
}
...
...
src/query/src/tresultBuf.c
浏览文件 @
34df442c
...
...
@@ -16,7 +16,7 @@ int32_t createDiskbasedResultBuffer(SQueryDiskbasedResultBuf** pResultBuf, int32
pResBuf
->
incStep
=
4
;
// init id hash table
pResBuf
->
idsTable
=
taos
InitHashTable
(
size
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
false
);
pResBuf
->
idsTable
=
taos
HashInit
(
size
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
false
);
pResBuf
->
list
=
calloc
(
size
,
sizeof
(
SIDList
));
pResBuf
->
numOfAllocGroupIds
=
size
;
...
...
@@ -56,7 +56,7 @@ tFilePage* getResultBufferPageById(SQueryDiskbasedResultBuf* pResultBuf, int32_t
return
(
tFilePage
*
)(
pResultBuf
->
pBuf
+
DEFAULT_INTERN_BUF_SIZE
*
id
);
}
int32_t
getNumOfResultBufGroupId
(
SQueryDiskbasedResultBuf
*
pResultBuf
)
{
return
taos
NumElemsInHashTabl
e
(
pResultBuf
->
idsTable
);
}
int32_t
getNumOfResultBufGroupId
(
SQueryDiskbasedResultBuf
*
pResultBuf
)
{
return
taos
HashGetSiz
e
(
pResultBuf
->
idsTable
);
}
int32_t
getResBufSize
(
SQueryDiskbasedResultBuf
*
pResultBuf
)
{
return
pResultBuf
->
totalBufSize
;
}
...
...
@@ -95,7 +95,7 @@ static bool noMoreAvailablePages(SQueryDiskbasedResultBuf* pResultBuf) {
static
int32_t
getGroupIndex
(
SQueryDiskbasedResultBuf
*
pResultBuf
,
int32_t
groupId
)
{
assert
(
pResultBuf
!=
NULL
);
char
*
p
=
taos
GetDataFromHashTable
(
pResultBuf
->
idsTable
,
(
const
char
*
)
&
groupId
,
sizeof
(
int32_t
));
char
*
p
=
taos
HashGet
(
pResultBuf
->
idsTable
,
(
const
char
*
)
&
groupId
,
sizeof
(
int32_t
));
if
(
p
==
NULL
)
{
// it is a new group id
return
-
1
;
}
...
...
@@ -121,7 +121,7 @@ static int32_t addNewGroupId(SQueryDiskbasedResultBuf* pResultBuf, int32_t group
pResultBuf
->
numOfAllocGroupIds
=
n
;
}
taos
AddToHashTable
(
pResultBuf
->
idsTable
,
(
const
char
*
)
&
groupId
,
sizeof
(
int32_t
),
&
num
,
sizeof
(
int32_t
));
taos
HashPut
(
pResultBuf
->
idsTable
,
(
const
char
*
)
&
groupId
,
sizeof
(
int32_t
),
&
num
,
sizeof
(
int32_t
));
return
num
;
}
...
...
@@ -210,7 +210,7 @@ void destroyResultBuf(SQueryDiskbasedResultBuf* pResultBuf) {
}
tfree
(
pResultBuf
->
list
);
taos
CleanUpHashTable
(
pResultBuf
->
idsTable
);
taos
HashCleanup
(
pResultBuf
->
idsTable
);
tfree
(
pResultBuf
);
}
...
...
src/query/src/tscAst.c
浏览文件 @
34df442c
...
...
@@ -14,7 +14,6 @@
*/
#include "os.h"
#include "sskiplist.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tast.h"
...
...
src/query/src/ttokenizer.c
浏览文件 @
34df442c
...
...
@@ -14,12 +14,12 @@
*/
#include "hash.h"
#include "hash
util
.h"
#include "hash
func
.h"
#include "os.h"
#include "shash.h"
#include "taosdef.h"
#include "tstoken.h"
#include "ttokendef.h"
#include "taosdef.h"
#include "tutil.h"
// All the keywords of the SQL language are stored in a hash table
...
...
@@ -253,11 +253,11 @@ static void* KeywordHashTable = NULL;
static
void
doInitKeywordsTable
()
{
int
numOfEntries
=
tListLen
(
keywordTable
);
KeywordHashTable
=
taos
InitHashTable
(
numOfEntries
,
MurmurHash3_32
,
false
);
KeywordHashTable
=
taos
HashInit
(
numOfEntries
,
MurmurHash3_32
,
false
);
for
(
int32_t
i
=
0
;
i
<
numOfEntries
;
i
++
)
{
keywordTable
[
i
].
len
=
strlen
(
keywordTable
[
i
].
name
);
void
*
ptr
=
&
keywordTable
[
i
];
taos
AddToHashTable
(
KeywordHashTable
,
keywordTable
[
i
].
name
,
keywordTable
[
i
].
len
,
(
void
*
)
&
ptr
,
POINTER_BYTES
);
taos
HashPut
(
KeywordHashTable
,
keywordTable
[
i
].
name
,
keywordTable
[
i
].
len
,
(
void
*
)
&
ptr
,
POINTER_BYTES
);
}
}
...
...
@@ -275,7 +275,7 @@ int tSQLKeywordCode(const char* z, int n) {
}
}
SKeyword
**
pKey
=
(
SKeyword
**
)
taos
GetDataFromHashTable
(
KeywordHashTable
,
key
,
n
);
SKeyword
**
pKey
=
(
SKeyword
**
)
taos
HashGet
(
KeywordHashTable
,
key
,
n
);
if
(
pKey
!=
NULL
)
{
return
(
*
pKey
)
->
type
;
}
else
{
...
...
src/query/src/tvariant.c
浏览文件 @
34df442c
...
...
@@ -13,17 +13,16 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tvariant.h"
#include "hash.h"
#include "hash
util
.h"
#include "hash
func
.h"
#include "os.h"
#include "shash.h"
#include "taos.h"
#include "taosdef.h"
#include "tstoken.h"
#include "ttokendef.h"
#include "taosdef.h"
#include "tutil.h"
#include "tvariant.h"
#include "taosdef.h"
#include "taos.h"
// todo support scientific expression number and oct number
void
tVariantCreate
(
tVariant
*
pVar
,
SSQLToken
*
token
)
{
tVariantCreateFromString
(
pVar
,
token
->
z
,
token
->
n
,
token
->
type
);
}
...
...
src/util/inc/hash.h
浏览文件 @
34df442c
...
...
@@ -20,7 +20,7 @@
extern
"C"
{
#endif
#include "hash
util
.h"
#include "hash
func
.h"
#define HASH_MAX_CAPACITY (1024 * 1024 * 16)
#define HASH_VALUE_IN_TRASH (-1)
...
...
@@ -45,32 +45,77 @@ typedef struct SHashEntry {
uint32_t
num
;
}
SHashEntry
;
typedef
struct
HashObj
{
typedef
struct
S
HashObj
{
SHashEntry
**
hashList
;
uint32_t
capacity
;
// number of slots
int
size
;
// number of elements in hash table
size_t
capacity
;
// number of slots
size_t
size
;
// number of elements in hash table
_hash_fn_t
hashFp
;
// hash function
bool
multithreadSafe
;
// enable lock or not
#if defined
LINUX
pthread_rwlock_t
lock
;
#if defined
(LINUX)
pthread_rwlock_t
*
lock
;
#else
pthread_mutex_t
lock
;
pthread_mutex_t
*
lock
;
#endif
}
SHashObj
;
}
HashObj
;
/**
* init the hash table
*
* @param capacity initial capacity of the hash table
* @param fn hash function to generate the hash value
* @param threadsafe thread safe or not
* @return
*/
SHashObj
*
taosHashInit
(
size_t
capacity
,
_hash_fn_t
fn
,
bool
threadsafe
);
void
*
taosInitHashTable
(
uint32_t
capacity
,
_hash_fn_t
fn
,
bool
multithreadSafe
);
void
taosDeleteFromHashTable
(
HashObj
*
pObj
,
const
char
*
key
,
uint32_t
keyLen
);
/**
* return the size of hash table
* @param pHashObj
* @return
*/
size_t
taosHashGetSize
(
const
SHashObj
*
pHashObj
);
/**
* put element into hash table, if the element with the same key exists, update it
* @param pHashObj
* @param key
* @param keyLen
* @param data
* @param size
* @return
*/
int32_t
taosHashPut
(
SHashObj
*
pHashObj
,
const
char
*
key
,
size_t
keyLen
,
void
*
data
,
size_t
size
);
int32_t
taosAddToHashTable
(
HashObj
*
pObj
,
const
char
*
key
,
uint32_t
keyLen
,
void
*
data
,
uint32_t
size
);
int32_t
taosNumElemsInHashTable
(
HashObj
*
pObj
);
/**
* return the payload data with the specified key
*
* @param pHashObj
* @param key
* @param keyLen
* @return
*/
void
*
taosHashGet
(
SHashObj
*
pHashObj
,
const
char
*
key
,
size_t
keyLen
);
char
*
taosGetDataFromHashTable
(
HashObj
*
pObj
,
const
char
*
key
,
uint32_t
keyLen
);
/**
* remove item with the specified key
* @param pHashObj
* @param key
* @param keyLen
*/
void
taosHashRemove
(
SHashObj
*
pHashObj
,
const
char
*
key
,
size_t
keyLen
);
void
taosCleanUpHashTable
(
void
*
handle
);
/**
* clean up hash table
* @param handle
*/
void
taosHashCleanup
(
SHashObj
*
pHashObj
);
int32_t
taosGetHashMaxOverflowLength
(
HashObj
*
pObj
);
/**
*
* @param pHashObj
* @return
*/
int32_t
taosHashGetMaxOverflowLinkLength
(
const
SHashObj
*
pHashObj
);
#ifdef __cplusplus
}
...
...
src/util/inc/hash
util
.h
→
src/util/inc/hash
func
.h
浏览文件 @
34df442c
文件已移动
src/util/inc/sskiplist.h
已删除
100644 → 0
浏览文件 @
c1d33817
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#if 0
#ifndef TBASE_TSKIPLIST_H
#define TBASE_TSKIPLIST_H
#ifdef __cplusplus
extern "C" {
#endif
#define MAX_SKIP_LIST_LEVEL 20
#include <pthread.h>
#include <stdint.h>
#include <stdlib.h>
#include "os.h"
#include "taosdef.h"
/*
* key of each node
* todo move to as the global structure in all search codes...
*/
const static size_t SKIP_LIST_STR_KEY_LENGTH_THRESHOLD = 15;
typedef tVariant tSkipListKey;
typedef enum tSkipListPointQueryType {
INCLUDE_POINT_QUERY,
EXCLUDE_POINT_QUERY,
} tSkipListPointQueryType;
typedef struct tSkipListNode {
uint16_t nLevel;
char * pData;
struct tSkipListNode **pForward;
struct tSkipListNode **pBackward;
tSkipListKey key;
} tSkipListNode;
/*
* @version 0.2
* @date 2017/11/12
* the simple version of SkipList.
* for multi-thread safe purpose, we employ pthread_rwlock_t to guarantee to generate
* deterministic result. Later, we will remove the lock in SkipList to further
* enhance the performance. In this case, one should use the concurrent skip list (by
* using michael-scott algorithm) instead of this simple version in a multi-thread
* environment, to achieve higher performance of read/write operations.
*
* Note: Duplicated primary key situation.
* In case of duplicated primary key, two ways can be employed to handle this situation:
* 1. add as normal insertion with out special process.
* 2. add an overflow pointer at each list node, all nodes with the same key will be added
* in the overflow pointer. In this case, the total steps of each search will be reduced significantly.
* Currently, we implement the skip list in a line with the first means, maybe refactor it soon.
*
* Memory consumption: the memory alignment causes many memory wasted. So, employ a memory
* pool will significantly reduce the total memory consumption, as well as the calloc/malloc operation costs.
*
* 3. use the iterator pattern to refactor all routines to make it more clean
*/
// state struct, record following information:
// number of links in each level.
// avg search steps, for latest 1000 queries
// avg search rsp time, for latest 1000 queries
// total memory size
typedef struct tSkipListState {
// in bytes, sizeof(tSkipList)+sizeof(tSkipListNode)*tSkipList->nSize
uint64_t nTotalMemSize;
uint64_t nLevelNodeCnt[MAX_SKIP_LIST_LEVEL];
uint64_t queryCount; // total query count
/*
* only record latest 1000 queries
* when the value==1000, = 0,
* nTotalStepsForQueries = 0,
* nTotalElapsedTimeForQueries = 0
*/
uint64_t nRecQueries;
uint16_t nTotalStepsForQueries;
uint64_t nTotalElapsedTimeForQueries;
uint16_t nInsertObjs;
uint16_t nTotalStepsForInsert;
uint64_t nTotalElapsedTimeForInsert;
} tSkipListState;
typedef struct tSkipList {
tSkipListNode pHead;
uint64_t nSize;
uint16_t nMaxLevel;
uint16_t nLevel;
uint16_t keyType;
uint16_t nMaxKeyLen;
__compar_fn_t comparator;
pthread_rwlock_t lock; // will be removed soon
tSkipListState state; // skiplist state
} tSkipList;
/*
* iterate the skiplist
* this will cause the multi-thread problem, when the skiplist is destroyed, the iterate may
* continue iterating the skiplist, so add the reference count for skiplist
* TODO add the ref for skiplist when one iterator is created
*/
typedef struct SSkipListIterator {
tSkipList * pSkipList;
tSkipListNode *cur;
int64_t num;
} SSkipListIterator;
/*
* query condition structure to denote the range query
* todo merge the point query cond with range query condition
*/
typedef struct tSKipListQueryCond {
// when the upper bounding == lower bounding, it is a point query
tSkipListKey lowerBnd;
tSkipListKey upperBnd;
int32_t lowerBndRelOptr; // relation operator to denote if lower bound is
int32_t upperBndRelOptr; // included or not
} tSKipListQueryCond;
tSkipList *SSkipListCreate(int16_t nMaxLevel, int16_t keyType, int16_t nMaxKeyLen);
void *SSkipListDestroy(tSkipList *pSkipList);
// create skip list key
tSkipListKey SSkipListCreateKey(int32_t type, char *val, size_t keyLength);
// destroy skip list key
void tSkipListDestroyKey(tSkipListKey *pKey);
// put data into skiplist
tSkipListNode *SSkipListPut(tSkipList *pSkipList, void *pData, tSkipListKey *pKey, int32_t insertIdenticalKey);
/*
* get only *one* node of which key is equalled to pKey, even there are more
* than one nodes are of the same key
*/
tSkipListNode *tSkipListGetOne(tSkipList *pSkipList, tSkipListKey *pKey);
/*
* get all data with the same keys
*/
int32_t tSkipListGets(tSkipList *pSkipList, tSkipListKey *pKey, tSkipListNode ***pRes);
int32_t tSkipListIterateList(tSkipList *pSkipList, tSkipListNode ***pRes, bool (*fp)(tSkipListNode *, void *),
void *param);
/*
* remove only one node of the pKey value.
* If more than one node has the same value, any one will be removed
*
* @Return
* true: one node has been removed
* false: no node has been removed
*/
bool tSkipListRemove(tSkipList *pSkipList, tSkipListKey *pKey);
/*
* remove the specified node in parameters
*/
void tSkipListRemoveNode(tSkipList *pSkipList, tSkipListNode *pNode);
// for debug purpose only
void SSkipListPrint(tSkipList *pSkipList, int16_t nlevel);
/*
* range query & single point query function
*/
int32_t tSkipListQuery(tSkipList *pSkipList, tSKipListQueryCond *pQueryCond, tSkipListNode ***pResult);
/*
* include/exclude point query
*/
int32_t tSkipListPointQuery(tSkipList *pSkipList, tSkipListKey *pKey, int32_t numOfKey, tSkipListPointQueryType type,
tSkipListNode ***pResult);
int32_t tSkipListIteratorReset(tSkipList *pSkipList, SSkipListIterator *iter);
bool tSkipListIteratorNext(SSkipListIterator *iter);
tSkipListNode *tSkipListIteratorGet(SSkipListIterator *iter);
#ifdef __cplusplus
}
#endif
#endif // TBASE_TSKIPLIST_H
#endif
\ No newline at end of file
src/util/inc/tarray.h
浏览文件 @
34df442c
...
...
@@ -76,7 +76,7 @@ void* taosArrayGetP(SArray* pArray, size_t index);
* @param pArray
* @return
*/
size_t
taosArrayGetSize
(
SArray
*
pArray
);
size_t
taosArrayGetSize
(
const
SArray
*
pArray
);
/**
* insert data into array
...
...
src/util/inc/tskiplist.h
浏览文件 @
34df442c
...
...
@@ -185,15 +185,39 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode);
SArray
*
tSkipListGet
(
SSkipList
*
pSkipList
,
SSkipListKey
pKey
,
int16_t
keyType
);
/**
*
*
get the size of skip list
* @param pSkipList
* @param pRes
* @param fp
* @param param
* @return
*/
int32_t
tSkipListIterateList
(
SSkipList
*
pSkipList
,
SSkipListNode
***
pRes
,
bool
(
*
fp
)(
SSkipListNode
*
,
void
*
),
void
*
param
);
size_t
tSkipListGetSize
(
const
SSkipList
*
pSkipList
);
/**
* create skiplist iterator
* @param pSkipList
* @return
*/
SSkipListIterator
*
tSkipListCreateIter
(
SSkipList
*
pSkipList
);
/**
* forward the skip list iterator
* @param iter
* @return
*/
bool
tSkipListIterNext
(
SSkipListIterator
*
iter
);
/**
* get the element of skip list node
* @param iter
* @return
*/
SSkipListNode
*
tSkipListIterGet
(
SSkipListIterator
*
iter
);
/**
* destroy the skip list node
* @param iter
* @return
*/
void
*
tSkipListDestroyIter
(
SSkipListIterator
*
iter
);
/*
* remove only one node of the pKey value.
...
...
@@ -210,9 +234,6 @@ bool tSkipListRemove(SSkipList *pSkipList, SSkipListKey *pKey);
*/
void
tSkipListRemoveNode
(
SSkipList
*
pSkipList
,
SSkipListNode
*
pNode
);
int32_t
tSkipListIteratorReset
(
SSkipList
*
pSkipList
,
SSkipListIterator
*
iter
);
bool
tSkipListIteratorNext
(
SSkipListIterator
*
iter
);
SSkipListNode
*
tSkipListIteratorGet
(
SSkipListIterator
*
iter
);
#ifdef __cplusplus
}
...
...
src/util/src/hash.c
浏览文件 @
34df442c
...
...
@@ -21,7 +21,11 @@
#include "tutil.h"
static
FORCE_INLINE
void
__wr_lock
(
void
*
lock
)
{
#if defined LINUX
if
(
lock
==
NULL
)
{
return
;
}
#if defined (LINUX)
pthread_rwlock_wrlock
(
lock
);
#else
pthread_mutex_lock
(
lock
);
...
...
@@ -29,7 +33,11 @@ static FORCE_INLINE void __wr_lock(void *lock) {
}
static
FORCE_INLINE
void
__rd_lock
(
void
*
lock
)
{
#if defined LINUX
if
(
lock
==
NULL
)
{
return
;
}
#if defined (LINUX)
pthread_rwlock_rdlock
(
lock
);
#else
pthread_mutex_lock
(
lock
);
...
...
@@ -37,7 +45,11 @@ static FORCE_INLINE void __rd_lock(void *lock) {
}
static
FORCE_INLINE
void
__unlock
(
void
*
lock
)
{
#if defined LINUX
if
(
lock
==
NULL
)
{
return
;
}
#if defined (LINUX)
pthread_rwlock_unlock
(
lock
);
#else
pthread_mutex_unlock
(
lock
);
...
...
@@ -45,7 +57,11 @@ static FORCE_INLINE void __unlock(void *lock) {
}
static
FORCE_INLINE
int32_t
__lock_init
(
void
*
lock
)
{
#if defined LINUX
if
(
lock
==
NULL
)
{
return
0
;
}
#if defined (LINUX)
return
pthread_rwlock_init
(
lock
,
NULL
);
#else
return
pthread_mutex_init
(
lock
,
NULL
);
...
...
@@ -53,7 +69,11 @@ static FORCE_INLINE int32_t __lock_init(void *lock) {
}
static
FORCE_INLINE
void
__lock_destroy
(
void
*
lock
)
{
#if defined LINUX
if
(
lock
==
NULL
)
{
return
;
}
#if defined (LINUX)
pthread_rwlock_destroy
(
lock
);
#else
pthread_mutex_destroy
(
lock
);
...
...
@@ -68,21 +88,12 @@ static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
return
i
;
}
/**
* hash key function
*
* @param key key string
* @param len length of key
* @return hash value
*/
static
FORCE_INLINE
uint32_t
taosHashKey
(
const
char
*
key
,
uint32_t
len
)
{
return
MurmurHash3_32
(
key
,
len
);
}
/**
* inplace update node in hash table
* @param pObj hash table object
* @param p
Hash
Obj hash table object
* @param pNode data node
*/
static
void
doUpdateHashTable
(
HashObj
*
p
Obj
,
SHashNode
*
pNode
)
{
static
void
doUpdateHashTable
(
SHashObj
*
pHash
Obj
,
SHashNode
*
pNode
)
{
if
(
pNode
->
prev1
)
{
pNode
->
prev1
->
next
=
pNode
;
}
...
...
@@ -96,16 +107,16 @@ static void doUpdateHashTable(HashObj *pObj, SHashNode *pNode) {
/**
* get SHashNode from hashlist, nodes from trash are not included.
* @param pObj Cache objection
* @param p
Hash
Obj Cache objection
* @param key key for hash
* @param keyLen key length
* @return
*/
static
SHashNode
*
doGetNodeFromHashTable
(
HashObj
*
p
Obj
,
const
char
*
key
,
uint32_t
keyLen
,
uint32_t
*
hashVal
)
{
uint32_t
hash
=
(
*
pObj
->
hashFp
)(
key
,
keyLen
);
static
SHashNode
*
doGetNodeFromHashTable
(
SHashObj
*
pHash
Obj
,
const
char
*
key
,
uint32_t
keyLen
,
uint32_t
*
hashVal
)
{
uint32_t
hash
=
(
*
p
Hash
Obj
->
hashFp
)(
key
,
keyLen
);
int32_t
slot
=
HASH_INDEX
(
hash
,
pObj
->
capacity
);
SHashEntry
*
pEntry
=
pObj
->
hashList
[
slot
];
int32_t
slot
=
HASH_INDEX
(
hash
,
p
Hash
Obj
->
capacity
);
SHashEntry
*
pEntry
=
p
Hash
Obj
->
hashList
[
slot
];
SHashNode
*
pNode
=
pEntry
->
next
;
while
(
pNode
)
{
...
...
@@ -117,7 +128,7 @@ static SHashNode *doGetNodeFromHashTable(HashObj *pObj, const char *key, uint32_
}
if
(
pNode
)
{
assert
(
HASH_INDEX
(
pNode
->
hashVal
,
pObj
->
capacity
)
==
slot
);
assert
(
HASH_INDEX
(
pNode
->
hashVal
,
p
Hash
Obj
->
capacity
)
==
slot
);
}
// return the calculated hash value, to avoid calculating it again in other functions
...
...
@@ -131,10 +142,10 @@ static SHashNode *doGetNodeFromHashTable(HashObj *pObj, const char *key, uint32_
/**
* resize the hash list if the threshold is reached
*
* @param pObj
* @param p
Hash
Obj
*/
static
void
taosHashTableResize
(
HashObj
*
p
Obj
)
{
if
(
p
Obj
->
size
<
p
Obj
->
capacity
*
HASH_DEFAULT_LOAD_FACTOR
)
{
static
void
taosHashTableResize
(
SHashObj
*
pHash
Obj
)
{
if
(
p
HashObj
->
size
<
pHash
Obj
->
capacity
*
HASH_DEFAULT_LOAD_FACTOR
)
{
return
;
}
...
...
@@ -142,30 +153,30 @@ static void taosHashTableResize(HashObj *pObj) {
SHashNode
*
pNode
=
NULL
;
SHashNode
*
pNext
=
NULL
;
int32_t
newSize
=
pObj
->
capacity
<<
1U
;
int32_t
newSize
=
p
Hash
Obj
->
capacity
<<
1U
;
if
(
newSize
>
HASH_MAX_CAPACITY
)
{
pTrace
(
"current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached"
,
pObj
->
capacity
,
pTrace
(
"current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached"
,
p
Hash
Obj
->
capacity
,
HASH_MAX_CAPACITY
);
return
;
}
int64_t
st
=
taosGetTimestampUs
();
SHashEntry
**
pNewEntry
=
realloc
(
pObj
->
hashList
,
sizeof
(
SHashEntry
*
)
*
newSize
);
SHashEntry
**
pNewEntry
=
realloc
(
p
Hash
Obj
->
hashList
,
sizeof
(
SHashEntry
*
)
*
newSize
);
if
(
pNewEntry
==
NULL
)
{
pTrace
(
"cache resize failed due to out of memory, capacity remain:%d"
,
pObj
->
capacity
);
pTrace
(
"cache resize failed due to out of memory, capacity remain:%d"
,
p
Hash
Obj
->
capacity
);
return
;
}
pObj
->
hashList
=
pNewEntry
;
for
(
int32_t
i
=
pObj
->
capacity
;
i
<
newSize
;
++
i
)
{
pObj
->
hashList
[
i
]
=
calloc
(
1
,
sizeof
(
SHashEntry
));
p
Hash
Obj
->
hashList
=
pNewEntry
;
for
(
int32_t
i
=
p
Hash
Obj
->
capacity
;
i
<
newSize
;
++
i
)
{
p
Hash
Obj
->
hashList
[
i
]
=
calloc
(
1
,
sizeof
(
SHashEntry
));
}
pObj
->
capacity
=
newSize
;
p
Hash
Obj
->
capacity
=
newSize
;
for
(
int32_t
i
=
0
;
i
<
pObj
->
capacity
;
++
i
)
{
SHashEntry
*
pEntry
=
pObj
->
hashList
[
i
];
for
(
int32_t
i
=
0
;
i
<
p
Hash
Obj
->
capacity
;
++
i
)
{
SHashEntry
*
pEntry
=
p
Hash
Obj
->
hashList
[
i
];
pNode
=
pEntry
->
next
;
if
(
pNode
!=
NULL
)
{
...
...
@@ -173,7 +184,7 @@ static void taosHashTableResize(HashObj *pObj) {
}
while
(
pNode
)
{
int32_t
j
=
HASH_INDEX
(
pNode
->
hashVal
,
pObj
->
capacity
);
int32_t
j
=
HASH_INDEX
(
pNode
->
hashVal
,
p
Hash
Obj
->
capacity
);
if
(
j
==
i
)
{
// this key resides in the same slot, no need to relocate it
pNode
=
pNode
->
next
;
}
else
{
...
...
@@ -199,7 +210,7 @@ static void taosHashTableResize(HashObj *pObj) {
pNode
->
next
=
NULL
;
pNode
->
prev1
=
NULL
;
SHashEntry
*
pNewIndexEntry
=
pObj
->
hashList
[
j
];
SHashEntry
*
pNewIndexEntry
=
p
Hash
Obj
->
hashList
[
j
];
if
(
pNewIndexEntry
->
next
!=
NULL
)
{
assert
(
pNewIndexEntry
->
next
->
prev1
==
pNewIndexEntry
);
...
...
@@ -221,8 +232,8 @@ static void taosHashTableResize(HashObj *pObj) {
int64_t
et
=
taosGetTimestampUs
();
pTrace
(
"hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms"
,
pObj
->
capacity
,
((
double
)
p
Obj
->
size
)
/
p
Obj
->
capacity
,
(
et
-
st
)
/
1000
.
0
);
pTrace
(
"hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms"
,
p
Hash
Obj
->
capacity
,
((
double
)
p
HashObj
->
size
)
/
pHash
Obj
->
capacity
,
(
et
-
st
)
/
1000
.
0
);
}
/**
...
...
@@ -230,43 +241,51 @@ static void taosHashTableResize(HashObj *pObj) {
* @param fn hash function
* @return
*/
void
*
taosInitHashTable
(
uint32_t
capacity
,
_hash_fn_t
fn
,
bool
multithreadS
afe
)
{
SHashObj
*
taosHashInit
(
size_t
capacity
,
_hash_fn_t
fn
,
bool
threads
afe
)
{
if
(
capacity
==
0
||
fn
==
NULL
)
{
return
NULL
;
}
HashObj
*
pObj
=
(
HashObj
*
)
calloc
(
1
,
sizeof
(
HashObj
));
if
(
pObj
==
NULL
)
{
SHashObj
*
pHashObj
=
(
SHashObj
*
)
calloc
(
1
,
sizeof
(
S
HashObj
));
if
(
p
Hash
Obj
==
NULL
)
{
pError
(
"failed to allocate memory, reason:%s"
,
strerror
(
errno
));
return
NULL
;
}
// the max slots is not defined by user
pObj
->
capacity
=
taosHashCapacity
(
capacity
);
assert
((
p
Obj
->
capacity
&
(
p
Obj
->
capacity
-
1
))
==
0
);
p
Hash
Obj
->
capacity
=
taosHashCapacity
(
capacity
);
assert
((
p
HashObj
->
capacity
&
(
pHash
Obj
->
capacity
-
1
))
==
0
);
pObj
->
hashFp
=
fn
;
p
Hash
Obj
->
hashFp
=
fn
;
p
Obj
->
hashList
=
(
SHashEntry
**
)
calloc
(
p
Obj
->
capacity
,
sizeof
(
SHashEntry
*
));
if
(
pObj
->
hashList
==
NULL
)
{
free
(
pObj
);
p
HashObj
->
hashList
=
(
SHashEntry
**
)
calloc
(
pHash
Obj
->
capacity
,
sizeof
(
SHashEntry
*
));
if
(
p
Hash
Obj
->
hashList
==
NULL
)
{
free
(
p
Hash
Obj
);
pError
(
"failed to allocate memory, reason:%s"
,
strerror
(
errno
));
return
NULL
;
}
for
(
int32_t
i
=
0
;
i
<
pObj
->
capacity
;
++
i
)
{
pObj
->
hashList
[
i
]
=
calloc
(
1
,
sizeof
(
SHashEntry
));
for
(
int32_t
i
=
0
;
i
<
p
Hash
Obj
->
capacity
;
++
i
)
{
p
Hash
Obj
->
hashList
[
i
]
=
calloc
(
1
,
sizeof
(
SHashEntry
));
}
if
(
multithreadSafe
&&
(
__lock_init
(
pObj
)
!=
0
))
{
free
(
pObj
->
hashList
);
free
(
pObj
);
if
(
threadsafe
)
{
#if defined(LINUX)
pHashObj
->
lock
=
calloc
(
1
,
sizeof
(
pthread_rwlock_t
));
#else
pHashObj
->
lock
=
calloc
(
1
,
sizeof
(
pthread_mutex_t
));
#endif
}
if
(
__lock_init
(
pHashObj
->
lock
)
!=
0
)
{
free
(
pHashObj
->
hashList
);
free
(
pHashObj
);
pError
(
"failed to init lock, reason:%s"
,
strerror
(
errno
));
return
NULL
;
}
return
(
void
*
)
p
Obj
;
return
pHash
Obj
;
}
/**
...
...
@@ -277,7 +296,7 @@ void *taosInitHashTable(uint32_t capacity, _hash_fn_t fn, bool multithreadSafe)
* @param size size of block
* @return SHashNode
*/
static
SHashNode
*
doCreateHashNode
(
const
char
*
key
,
uint32
_t
keyLen
,
const
char
*
pData
,
size_t
dataSize
,
static
SHashNode
*
doCreateHashNode
(
const
char
*
key
,
size
_t
keyLen
,
const
char
*
pData
,
size_t
dataSize
,
uint32_t
hashVal
)
{
size_t
totalSize
=
dataSize
+
sizeof
(
SHashNode
)
+
keyLen
;
...
...
@@ -298,7 +317,7 @@ static SHashNode *doCreateHashNode(const char *key, uint32_t keyLen, const char
return
pNewNode
;
}
static
SHashNode
*
doUpdateHashNode
(
SHashNode
*
pNode
,
const
char
*
key
,
uint32
_t
keyLen
,
const
char
*
pData
,
static
SHashNode
*
doUpdateHashNode
(
SHashNode
*
pNode
,
const
char
*
key
,
size
_t
keyLen
,
const
char
*
pData
,
size_t
dataSize
)
{
size_t
size
=
dataSize
+
sizeof
(
SHashNode
)
+
keyLen
;
...
...
@@ -320,14 +339,14 @@ static SHashNode *doUpdateHashNode(SHashNode *pNode, const char *key, uint32_t k
/**
* insert the hash node at the front of the linked list
*
* @param pObj
* @param p
Hash
Obj
* @param pNode
*/
static
void
doAddToHashTable
(
HashObj
*
p
Obj
,
SHashNode
*
pNode
)
{
static
void
doAddToHashTable
(
SHashObj
*
pHash
Obj
,
SHashNode
*
pNode
)
{
assert
(
pNode
!=
NULL
);
int32_t
index
=
HASH_INDEX
(
pNode
->
hashVal
,
pObj
->
capacity
);
SHashEntry
*
pEntry
=
pObj
->
hashList
[
index
];
int32_t
index
=
HASH_INDEX
(
pNode
->
hashVal
,
p
Hash
Obj
->
capacity
);
SHashEntry
*
pEntry
=
p
Hash
Obj
->
hashList
[
index
];
pNode
->
next
=
pEntry
->
next
;
...
...
@@ -339,74 +358,60 @@ static void doAddToHashTable(HashObj *pObj, SHashNode *pNode) {
pNode
->
prev1
=
pEntry
;
pEntry
->
num
++
;
pObj
->
size
++
;
p
Hash
Obj
->
size
++
;
}
int32_t
taosNumElemsInHashTable
(
HashObj
*
p
Obj
)
{
if
(
pObj
==
NULL
)
{
size_t
taosHashGetSize
(
const
SHashObj
*
pHash
Obj
)
{
if
(
p
Hash
Obj
==
NULL
)
{
return
0
;
}
return
pObj
->
size
;
return
p
Hash
Obj
->
size
;
}
/**
* add data node into hash table
* @param pObj hash object
* @param p
Hash
Obj hash object
* @param pNode hash node
*/
int32_t
taosAddToHashTable
(
HashObj
*
pObj
,
const
char
*
key
,
uint32_t
keyLen
,
void
*
data
,
uint32_t
size
)
{
if
(
pObj
->
multithreadSafe
)
{
__wr_lock
(
&
pObj
->
lock
);
}
int32_t
taosHashPut
(
SHashObj
*
pHashObj
,
const
char
*
key
,
size_t
keyLen
,
void
*
data
,
size_t
size
)
{
__wr_lock
(
pHashObj
->
lock
);
uint32_t
hashVal
=
0
;
SHashNode
*
pNode
=
doGetNodeFromHashTable
(
pObj
,
key
,
keyLen
,
&
hashVal
);
SHashNode
*
pNode
=
doGetNodeFromHashTable
(
p
Hash
Obj
,
key
,
keyLen
,
&
hashVal
);
if
(
pNode
==
NULL
)
{
// no data in hash table with the specified key, add it into hash table
taosHashTableResize
(
pObj
);
taosHashTableResize
(
p
Hash
Obj
);
SHashNode
*
pNewNode
=
doCreateHashNode
(
key
,
keyLen
,
data
,
size
,
hashVal
);
if
(
pNewNode
==
NULL
)
{
if
(
pObj
->
multithreadSafe
)
{
__unlock
(
&
pObj
->
lock
);
}
__unlock
(
pHashObj
->
lock
);
return
-
1
;
}
doAddToHashTable
(
pObj
,
pNewNode
);
doAddToHashTable
(
p
Hash
Obj
,
pNewNode
);
}
else
{
SHashNode
*
pNewNode
=
doUpdateHashNode
(
pNode
,
key
,
keyLen
,
data
,
size
);
if
(
pNewNode
==
NULL
)
{
if
(
pObj
->
multithreadSafe
)
{
__unlock
(
&
pObj
->
lock
);
}
__unlock
(
pHashObj
->
lock
);
return
-
1
;
}
doUpdateHashTable
(
pObj
,
pNewNode
);
}
if
(
pObj
->
multithreadSafe
)
{
__unlock
(
&
pObj
->
lock
);
doUpdateHashTable
(
pHashObj
,
pNewNode
);
}
__unlock
(
pHashObj
->
lock
);
return
0
;
}
char
*
taosGetDataFromHashTable
(
HashObj
*
pObj
,
const
char
*
key
,
uint32_t
keyLen
)
{
if
(
pObj
->
multithreadSafe
)
{
__rd_lock
(
&
pObj
->
lock
);
}
void
*
taosHashGet
(
SHashObj
*
pHashObj
,
const
char
*
key
,
size_t
keyLen
)
{
__rd_lock
(
pHashObj
->
lock
);
uint32_t
hashVal
=
0
;
SHashNode
*
pNode
=
doGetNodeFromHashTable
(
pObj
,
key
,
keyLen
,
&
hashVal
);
SHashNode
*
pNode
=
doGetNodeFromHashTable
(
p
Hash
Obj
,
key
,
keyLen
,
&
hashVal
);
if
(
pObj
->
multithreadSafe
)
{
__unlock
(
&
pObj
->
lock
);
}
__unlock
(
pHashObj
->
lock
);
if
(
pNode
!=
NULL
)
{
assert
(
pNode
->
hashVal
==
hashVal
);
...
...
@@ -419,29 +424,24 @@ char *taosGetDataFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen)
/**
* remove node in hash list
* @param pObj
* @param p
Hash
Obj
* @param pNode
*/
void
taosDeleteFromHashTable
(
HashObj
*
pObj
,
const
char
*
key
,
uint32_t
keyLen
)
{
if
(
pObj
->
multithreadSafe
)
{
__wr_lock
(
&
pObj
->
lock
);
}
void
taosHashRemove
(
SHashObj
*
pHashObj
,
const
char
*
key
,
size_t
keyLen
)
{
__wr_lock
(
pHashObj
->
lock
);
uint32_t
val
=
0
;
SHashNode
*
pNode
=
doGetNodeFromHashTable
(
pObj
,
key
,
keyLen
,
&
val
);
SHashNode
*
pNode
=
doGetNodeFromHashTable
(
p
Hash
Obj
,
key
,
keyLen
,
&
val
);
if
(
pNode
==
NULL
)
{
if
(
pObj
->
multithreadSafe
)
{
__unlock
(
&
pObj
->
lock
);
}
__unlock
(
pHashObj
->
lock
);
return
;
}
SHashNode
*
pNext
=
pNode
->
next
;
if
(
pNode
->
prev
!=
NULL
)
{
int32_t
slot
=
HASH_INDEX
(
val
,
pObj
->
capacity
);
if
(
pObj
->
hashList
[
slot
]
->
next
==
pNode
)
{
pObj
->
hashList
[
slot
]
->
next
=
pNext
;
int32_t
slot
=
HASH_INDEX
(
val
,
p
Hash
Obj
->
capacity
);
if
(
p
Hash
Obj
->
hashList
[
slot
]
->
next
==
pNode
)
{
p
Hash
Obj
->
hashList
[
slot
]
->
next
=
pNext
;
}
else
{
pNode
->
prev
->
next
=
pNext
;
}
...
...
@@ -451,11 +451,12 @@ void taosDeleteFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen) {
pNext
->
prev
=
pNode
->
prev
;
}
uint32_t
index
=
HASH_INDEX
(
pNode
->
hashVal
,
pObj
->
capacity
);
SHashEntry
*
pEntry
=
pObj
->
hashList
[
index
];
uint32_t
index
=
HASH_INDEX
(
pNode
->
hashVal
,
pHashObj
->
capacity
);
SHashEntry
*
pEntry
=
pHashObj
->
hashList
[
index
];
pEntry
->
num
--
;
pObj
->
size
--
;
p
Hash
Obj
->
size
--
;
pNode
->
next
=
NULL
;
pNode
->
prev
=
NULL
;
...
...
@@ -463,24 +464,21 @@ void taosDeleteFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen) {
pTrace
(
"key:%s %p remove from hash table"
,
pNode
->
key
,
pNode
);
tfree
(
pNode
);
if
(
pObj
->
multithreadSafe
)
{
__unlock
(
&
pObj
->
lock
);
}
__unlock
(
pHashObj
->
lock
);
}
void
taosCleanUpHashTable
(
void
*
handle
)
{
HashObj
*
pObj
=
(
HashObj
*
)
handle
;
if
(
pObj
==
NULL
||
pObj
->
capacity
<=
0
)
return
;
void
taosHashCleanup
(
SHashObj
*
pHashObj
)
{
if
(
pHashObj
==
NULL
||
pHashObj
->
capacity
<=
0
)
{
return
;
}
SHashNode
*
pNode
,
*
pNext
;
if
(
pObj
->
multithreadSafe
)
{
__wr_lock
(
&
pObj
->
lock
);
}
__wr_lock
(
pHashObj
->
lock
);
if
(
pObj
->
hashList
)
{
for
(
int32_t
i
=
0
;
i
<
pObj
->
capacity
;
++
i
)
{
SHashEntry
*
pEntry
=
pObj
->
hashList
[
i
];
if
(
p
Hash
Obj
->
hashList
)
{
for
(
int32_t
i
=
0
;
i
<
p
Hash
Obj
->
capacity
;
++
i
)
{
SHashEntry
*
pEntry
=
p
Hash
Obj
->
hashList
[
i
];
pNode
=
pEntry
->
next
;
while
(
pNode
)
{
...
...
@@ -492,28 +490,26 @@ void taosCleanUpHashTable(void *handle) {
tfree
(
pEntry
);
}
free
(
pObj
->
hashList
);
free
(
p
Hash
Obj
->
hashList
);
}
if
(
pObj
->
multithreadSafe
)
{
__unlock
(
&
pObj
->
lock
);
__lock_destroy
(
&
pObj
->
lock
);
}
__unlock
(
pHashObj
->
lock
);
__lock_destroy
(
pHashObj
->
lock
);
memset
(
p
Obj
,
0
,
sizeof
(
HashObj
));
free
(
pObj
);
memset
(
p
HashObj
,
0
,
sizeof
(
S
HashObj
));
free
(
p
Hash
Obj
);
}
// for profile only
int32_t
taos
GetHashMaxOverflowLength
(
HashObj
*
p
Obj
)
{
if
(
p
Obj
==
NULL
||
p
Obj
->
size
==
0
)
{
int32_t
taos
HashGetMaxOverflowLinkLength
(
const
SHashObj
*
pHash
Obj
)
{
if
(
p
HashObj
==
NULL
||
pHash
Obj
->
size
==
0
)
{
return
0
;
}
int32_t
num
=
0
;
for
(
int32_t
i
=
0
;
i
<
pObj
->
size
;
++
i
)
{
SHashEntry
*
pEntry
=
pObj
->
hashList
[
i
];
for
(
int32_t
i
=
0
;
i
<
p
Hash
Obj
->
size
;
++
i
)
{
SHashEntry
*
pEntry
=
p
Hash
Obj
->
hashList
[
i
];
if
(
num
<
pEntry
->
num
)
{
num
=
pEntry
->
num
;
}
...
...
src/util/src/sskiplist.c
已删除
100644 → 0
浏览文件 @
c1d33817
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#if 0
#include "os.h"
#include "tlog.h"
#include "taosdef.h"
#include "sskiplist.h"
#include "tutil.h"
static FORCE_INLINE void recordNodeEachLevel(tSkipList *pSkipList, int32_t nLevel) { // record link count in each level
for (int32_t i = 0; i < nLevel; ++i) {
pSkipList->state.nLevelNodeCnt[i]++;
}
}
static FORCE_INLINE void removeNodeEachLevel(tSkipList *pSkipList, int32_t nLevel) {
for (int32_t i = 0; i < nLevel; ++i) {
pSkipList->state.nLevelNodeCnt[i]--;
}
}
static FORCE_INLINE int32_t getSkipListNodeRandomHeight(tSkipList *pSkipList) {
const uint32_t factor = 4;
int32_t n = 1;
while ((rand() % factor) == 0 && n <= pSkipList->nMaxLevel) {
n++;
}
return n;
}
static FORCE_INLINE int32_t getSkipListNodeLevel(tSkipList *pSkipList) {
int32_t nLevel = getSkipListNodeRandomHeight(pSkipList);
if (pSkipList->nSize == 0) {
nLevel = 1;
pSkipList->nLevel = 1;
} else {
if (nLevel > pSkipList->nLevel && pSkipList->nLevel < pSkipList->nMaxLevel) {
nLevel = (++pSkipList->nLevel);
}
}
return nLevel;
}
void tSkipListDoInsert(tSkipList *pSkipList, tSkipListNode **forward, int32_t nLevel, tSkipListNode *pNode);
void SSkipListDoRecordPut(tSkipList *pSkipList) {
const int32_t MAX_RECORD_NUM = 1000;
if (pSkipList->state.nInsertObjs == MAX_RECORD_NUM) {
pSkipList->state.nInsertObjs = 1;
pSkipList->state.nTotalStepsForInsert = 0;
pSkipList->state.nTotalElapsedTimeForInsert = 0;
} else {
pSkipList->state.nInsertObjs++;
}
}
int32_t compareIntVal(const void *pLeft, const void *pRight) {
int64_t lhs = ((tSkipListKey *)pLeft)->i64Key;
int64_t rhs = ((tSkipListKey *)pRight)->i64Key;
DEFAULT_COMP(lhs, rhs);
}
int32_t scompareIntDoubleVal(const void *pLeft, const void *pRight) {
int64_t lhs = ((tSkipListKey *)pLeft)->i64Key;
double rhs = ((tSkipListKey *)pRight)->dKey;
if (fabs(lhs - rhs) < FLT_EPSILON) {
return 0;
} else {
return (lhs > rhs) ? 1 : -1;
}
}
int32_t scompareDoubleIntVal(const void *pLeft, const void *pRight) {
double lhs = ((tSkipListKey *)pLeft)->dKey;
int64_t rhs = ((tSkipListKey *)pRight)->i64Key;
if (fabs(lhs - rhs) < FLT_EPSILON) {
return 0;
} else {
return (lhs > rhs) ? 1 : -1;
}
}
int32_t scompareDoubleVal(const void *pLeft, const void *pRight) {
double ret = (((tSkipListKey *)pLeft)->dKey - ((tSkipListKey *)pRight)->dKey);
if (fabs(ret) < FLT_EPSILON) {
return 0;
} else {
return ret > 0 ? 1 : -1;
}
}
int32_t scompareStrVal(const void *pLeft, const void *pRight) {
tSkipListKey *pL = (tSkipListKey *)pLeft;
tSkipListKey *pR = (tSkipListKey *)pRight;
if (pL->nLen == 0 && pR->nLen == 0) {
return 0;
}
//handle only one-side bound compare situation, there is only lower bound or only upper bound
if (pL->nLen == -1) {
return 1; // no lower bound, lower bound is minimum, always return -1;
} else if (pR->nLen == -1) {
return -1; // no upper bound, upper bound is maximum situation, always return 1;
}
int32_t ret = strcmp(((tSkipListKey *)pLeft)->pz, ((tSkipListKey *)pRight)->pz);
if (ret == 0) {
return 0;
} else {
return ret > 0 ? 1 : -1;
}
}
int32_t scompareWStrVal(const void *pLeft, const void *pRight) {
tSkipListKey *pL = (tSkipListKey *)pLeft;
tSkipListKey *pR = (tSkipListKey *)pRight;
if (pL->nLen == 0 && pR->nLen == 0) {
return 0;
}
//handle only one-side bound compare situation, there is only lower bound or only upper bound
if (pL->nLen == -1) {
return 1; // no lower bound, lower bound is minimum, always return -1;
} else if (pR->nLen == -1) {
return -1; // no upper bound, upper bound is maximum situation, always return 1;
}
int32_t ret = wcscmp(((tSkipListKey *)pLeft)->wpz, ((tSkipListKey *)pRight)->wpz);
if (ret == 0) {
return 0;
} else {
return ret > 0 ? 1 : -1;
}
}
static __compar_fn_t getKeyFilterComparator(tSkipList *pSkipList, int32_t filterDataType) {
__compar_fn_t comparator = NULL;
switch (pSkipList->keyType) {
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_BOOL: {
if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) {
comparator = compareIntVal;
} else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) {
comparator = scompareIntDoubleVal;
}
break;
}
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE: {
if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) {
comparator = scompareDoubleIntVal;
} else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) {
comparator = scompareDoubleVal;
}
break;
}
case TSDB_DATA_TYPE_BINARY:
comparator = scompareStrVal;
break;
case TSDB_DATA_TYPE_NCHAR:
comparator = scompareWStrVal;
break;
default:
comparator = compareIntVal;
break;
}
return comparator;
}
static __compar_fn_t getKeyComparator(int32_t keyType) {
__compar_fn_t comparator = NULL;
switch (keyType) {
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_BOOL:
comparator = compareIntVal;
break;
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE:
comparator = scompareDoubleVal;
break;
case TSDB_DATA_TYPE_BINARY:
comparator = scompareStrVal;
break;
case TSDB_DATA_TYPE_NCHAR:
comparator = scompareWStrVal;
break;
default:
comparator = compareIntVal;
break;
}
return comparator;
}
tSkipList* SSkipListCreate(int16_t nMaxLevel, int16_t keyType, int16_t nMaxKeyLen) {
tSkipList *pSkipList = (tSkipList *)calloc(1, sizeof(tSkipList));
if (pSkipList == NULL) {
return NULL;
}
pSkipList->keyType = keyType;
pSkipList->comparator = getKeyComparator(keyType);
pSkipList->pHead.pForward = (tSkipListNode **)calloc(1, POINTER_BYTES * MAX_SKIP_LIST_LEVEL);
pSkipList->nMaxLevel = MAX_SKIP_LIST_LEVEL;
pSkipList->nLevel = 1;
pSkipList->nMaxKeyLen = nMaxKeyLen;
pSkipList->nMaxLevel = nMaxLevel;
if (pthread_rwlock_init(&pSkipList->lock, NULL) != 0) {
tfree(pSkipList->pHead.pForward);
tfree(pSkipList);
return NULL;
}
srand(time(NULL));
pSkipList->state.nTotalMemSize += sizeof(tSkipList);
return pSkipList;
}
static void doRemove(tSkipList *pSkipList, tSkipListNode *pNode, tSkipListNode *forward[]) {
int32_t level = pNode->nLevel;
for (int32_t j = level - 1; j >= 0; --j) {
if ((forward[j]->pForward[j] != NULL) && (forward[j]->pForward[j]->pForward[j])) {
forward[j]->pForward[j]->pForward[j]->pBackward[j] = forward[j];
}
if (forward[j]->pForward[j] != NULL) {
forward[j]->pForward[j] = forward[j]->pForward[j]->pForward[j];
}
}
pSkipList->state.nTotalMemSize -= (sizeof(tSkipListNode) + POINTER_BYTES * pNode->nLevel * 2);
removeNodeEachLevel(pSkipList, pNode->nLevel);
tfree(pNode);
--pSkipList->nSize;
}
static size_t getOneNodeSize(const tSkipListKey *pKey, int32_t nLevel) {
size_t size = sizeof(tSkipListNode) + sizeof(intptr_t) * (nLevel << 1);
if (pKey->nType == TSDB_DATA_TYPE_BINARY) {
size += pKey->nLen + 1;
} else if (pKey->nType == TSDB_DATA_TYPE_NCHAR) {
size += (pKey->nLen + 1) * TSDB_NCHAR_SIZE;
}
return size;
}
static tSkipListNode *SSkipListCreateNode(void *pData, const tSkipListKey *pKey, int32_t nLevel) {
size_t nodeSize = getOneNodeSize(pKey, nLevel);
tSkipListNode *pNode = (tSkipListNode *)calloc(1, nodeSize);
pNode->pForward = (tSkipListNode **)(&pNode[1]);
pNode->pBackward = (pNode->pForward + nLevel);
pNode->pData = pData;
pNode->key = *pKey;
if (pKey->nType == TSDB_DATA_TYPE_BINARY) {
pNode->key.pz = (char *)(pNode->pBackward + nLevel);
strcpy(pNode->key.pz, pKey->pz);
pNode->key.pz[pKey->nLen] = 0;
} else if (pKey->nType == TSDB_DATA_TYPE_NCHAR) {
pNode->key.wpz = (wchar_t *)(pNode->pBackward + nLevel);
wcsncpy(pNode->key.wpz, pKey->wpz, pKey->nLen);
pNode->key.wpz[pKey->nLen] = 0;
}
pNode->nLevel = nLevel;
return pNode;
}
tSkipListKey SSkipListCreateKey(int32_t type, char *val, size_t keyLength) {
tSkipListKey k = {0};
tVariantCreateFromBinary(&k, val, (uint32_t) keyLength, (uint32_t) type);
return k;
}
void tSkipListDestroyKey(tSkipListKey *pKey) { tVariantDestroy(pKey); }
void* SSkipListDestroy(tSkipList *pSkipList) {
if (pSkipList == NULL) {
return NULL;
}
pthread_rwlock_wrlock(&pSkipList->lock);
tSkipListNode *pNode = pSkipList->pHead.pForward[0];
while (pNode) {
tSkipListNode *pTemp = pNode;
pNode = pNode->pForward[0];
tfree(pTemp);
}
tfree(pSkipList->pHead.pForward);
pthread_rwlock_unlock(&pSkipList->lock);
pthread_rwlock_destroy(&pSkipList->lock);
tfree(pSkipList);
return NULL;
}
tSkipListNode *SSkipListPut(tSkipList *pSkipList, void *pData, tSkipListKey *pKey, int32_t insertIdenticalKey) {
if (pSkipList == NULL) {
return NULL;
}
pthread_rwlock_wrlock(&pSkipList->lock);
// record one node is put into skiplist
SSkipListDoRecordPut(pSkipList);
tSkipListNode *px = &pSkipList->pHead;
tSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
for (int32_t i = pSkipList->nLevel - 1; i >= 0; --i) {
while (px->pForward[i] != NULL && (pSkipList->comparator(&px->pForward[i]->key, pKey) < 0)) {
px = px->pForward[i];
}
pSkipList->state.nTotalStepsForInsert++;
forward[i] = px;
}
// if the skiplist does not allowed identical key inserted, the new data will be discarded.
if ((insertIdenticalKey == 0) && forward[0] != &pSkipList->pHead &&
(pSkipList->comparator(&forward[0]->key, pKey) == 0)) {
pthread_rwlock_unlock(&pSkipList->lock);
return forward[0];
}
int32_t nLevel = getSkipListNodeLevel(pSkipList);
recordNodeEachLevel(pSkipList, nLevel);
tSkipListNode *pNode = SSkipListCreateNode(pData, pKey, nLevel);
tSkipListDoInsert(pSkipList, forward, nLevel, pNode);
pSkipList->nSize += 1;
// char tmpstr[512] = {0};
// tVariantToString(&pNode->key, tmpstr);
// pTrace("skiplist:%p, node added, key:%s, total list len:%d", pSkipList,
// tmpstr, pSkipList->nSize);
pSkipList->state.nTotalMemSize += getOneNodeSize(pKey, nLevel);
pthread_rwlock_unlock(&pSkipList->lock);
return pNode;
}
void tSkipListDoInsert(tSkipList *pSkipList, tSkipListNode **forward, int32_t nLevel, tSkipListNode *pNode) {
for (int32_t i = 0; i < nLevel; ++i) {
tSkipListNode *x = forward[i];
if (x != NULL) {
pNode->pBackward[i] = x;
if (x->pForward[i]) x->pForward[i]->pBackward[i] = pNode;
pNode->pForward[i] = x->pForward[i];
x->pForward[i] = pNode;
} else {
pSkipList->pHead.pForward[i] = pNode;
pNode->pBackward[i] = &(pSkipList->pHead);
}
}
}
tSkipListNode *tSkipListGetOne(tSkipList *pSkipList, tSkipListKey *pKey) {
int32_t sLevel = pSkipList->nLevel - 1;
int32_t ret = -1;
tSkipListNode *x = &pSkipList->pHead;
pthread_rwlock_rdlock(&pSkipList->lock);
pSkipList->state.queryCount++;
__compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType);
for (int32_t i = sLevel; i >= 0; --i) {
while (x->pForward[i] != NULL && (ret = filterComparator(&x->pForward[i]->key, pKey)) < 0) {
x = x->pForward[i];
}
if (ret == 0) {
pthread_rwlock_unlock(&pSkipList->lock);
return x->pForward[i];
}
}
pthread_rwlock_unlock(&pSkipList->lock);
return NULL;
}
static int32_t tSkipListEndParQuery(tSkipList *pSkipList, tSkipListNode *pStartNode, tSkipListKey *pEndKey,
int32_t cond, tSkipListNode ***pRes) {
pthread_rwlock_rdlock(&pSkipList->lock);
tSkipListNode *p = pStartNode;
int32_t numOfRes = 0;
__compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pEndKey->nType);
while (p != NULL) {
int32_t ret = filterComparator(&p->key, pEndKey);
if (ret > 0) {
break;
}
if (ret < 0) {
numOfRes++;
p = p->pForward[0];
} else if (ret == 0) {
if (cond == TSDB_RELATION_LESS_EQUAL) {
numOfRes++;
p = p->pForward[0];
} else {
break;
}
}
}
(*pRes) = (tSkipListNode **)malloc(POINTER_BYTES * numOfRes);
for (int32_t i = 0; i < numOfRes; ++i) {
(*pRes)[i] = pStartNode;
pStartNode = pStartNode->pForward[0];
}
pthread_rwlock_unlock(&pSkipList->lock);
return numOfRes;
}
/*
* maybe return the copy of tSkipListNode would be better
*/
int32_t tSkipListGets(tSkipList *pSkipList, tSkipListKey *pKey, tSkipListNode ***pRes) {
(*pRes) = NULL;
tSkipListNode *pNode = tSkipListGetOne(pSkipList, pKey);
if (pNode == NULL) {
return 0;
}
__compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType);
// backward check if previous nodes are with the same value.
tSkipListNode *pPrev = pNode->pBackward[0];
while ((pPrev != &pSkipList->pHead) && filterComparator(&pPrev->key, pKey) == 0) {
pPrev = pPrev->pBackward[0];
}
return tSkipListEndParQuery(pSkipList, pPrev->pForward[0], &pNode->key, TSDB_RELATION_LESS_EQUAL, pRes);
}
static tSkipListNode *tSkipListParQuery(tSkipList *pSkipList, tSkipListKey *pKey, int32_t cond) {
int32_t sLevel = pSkipList->nLevel - 1;
int32_t ret = -1;
tSkipListNode *x = &pSkipList->pHead;
__compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType);
pthread_rwlock_rdlock(&pSkipList->lock);
if (cond == TSDB_RELATION_LARGE_EQUAL || cond == TSDB_RELATION_LARGE) {
for (int32_t i = sLevel; i >= 0; --i) {
while (x->pForward[i] != NULL && (ret = filterComparator(&x->pForward[i]->key, pKey)) < 0) {
x = x->pForward[i];
}
}
// backward check if previous nodes are with the same value.
if (cond == TSDB_RELATION_LARGE_EQUAL && ret == 0) {
tSkipListNode *pNode = x->pForward[0];
while ((pNode->pBackward[0] != &pSkipList->pHead) && (filterComparator(&pNode->pBackward[0]->key, pKey) == 0)) {
pNode = pNode->pBackward[0];
}
pthread_rwlock_unlock(&pSkipList->lock);
return pNode;
}
if (ret > 0 || cond == TSDB_RELATION_LARGE_EQUAL) {
pthread_rwlock_unlock(&pSkipList->lock);
return x->pForward[0];
} else { // cond == TSDB_RELATION_LARGE && ret == 0
tSkipListNode *pn = x->pForward[0];
while (pn != NULL && filterComparator(&pn->key, pKey) == 0) {
pn = pn->pForward[0];
}
pthread_rwlock_unlock(&pSkipList->lock);
return pn;
}
}
pthread_rwlock_unlock(&pSkipList->lock);
return NULL;
}
int32_t tSkipListIterateList(tSkipList *pSkipList, tSkipListNode ***pRes, bool (*fp)(tSkipListNode *, void *),
void *param) {
(*pRes) = (tSkipListNode **)calloc(1, POINTER_BYTES * pSkipList->nSize);
if (NULL == *pRes) {
pError("error skiplist %p, malloc failed", pSkipList);
return -1;
}
pthread_rwlock_rdlock(&pSkipList->lock);
tSkipListNode *pStartNode = pSkipList->pHead.pForward[0];
int32_t num = 0;
for (int32_t i = 0; i < pSkipList->nSize; ++i) {
if (pStartNode == NULL) {
pError("error skiplist %p, required length:%d, actual length:%d", pSkipList, pSkipList->nSize, i - 1);
#ifdef _DEBUG_VIEW
SSkipListPrint(pSkipList, 1);
#endif
break;
}
if (fp == NULL || (fp != NULL && fp(pStartNode, param) == true)) {
(*pRes)[num++] = pStartNode;
}
pStartNode = pStartNode->pForward[0];
}
pthread_rwlock_unlock(&pSkipList->lock);
if (num == 0) {
free(*pRes);
*pRes = NULL;
} else if (num < pSkipList->nSize) { // free unused memory
char* tmp = realloc((*pRes), num * POINTER_BYTES);
assert(tmp != NULL);
*pRes = (tSkipListNode**)tmp;
}
return num;
}
int32_t tSkipListIteratorReset(tSkipList *pSkipList, SSkipListIterator* iter) {
if (pSkipList == NULL) {
return -1;
}
iter->pSkipList = pSkipList;
pthread_rwlock_rdlock(&pSkipList->lock);
iter->cur = NULL;//pSkipList->pHead.pForward[0];
iter->num = pSkipList->nSize;
pthread_rwlock_unlock(&pSkipList->lock);
return 0;
}
bool tSkipListIteratorNext(SSkipListIterator* iter) {
if (iter->num == 0 || iter->pSkipList == NULL) {
return false;
}
tSkipList* pSkipList = iter->pSkipList;
pthread_rwlock_rdlock(&pSkipList->lock);
if (iter->cur == NULL) {
iter->cur = pSkipList->pHead.pForward[0];
} else {
iter->cur = iter->cur->pForward[0];
}
pthread_rwlock_unlock(&pSkipList->lock);
return iter->cur != NULL;
}
tSkipListNode* tSkipListIteratorGet(SSkipListIterator* iter) {
return iter->cur;
}
int32_t tSkipListRangeQuery(tSkipList *pSkipList, tSKipListQueryCond *pCond, tSkipListNode ***pRes) {
pSkipList->state.queryCount++;
tSkipListNode *pStart = tSkipListParQuery(pSkipList, &pCond->lowerBnd, pCond->lowerBndRelOptr);
if (pStart == 0) {
*pRes = NULL;
return 0;
}
return tSkipListEndParQuery(pSkipList, pStart, &pCond->upperBnd, pCond->upperBndRelOptr, pRes);
}
static bool removeSupport(tSkipList *pSkipList, tSkipListNode **forward, tSkipListKey *pKey) {
__compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType);
if (filterComparator(&forward[0]->pForward[0]->key, pKey) == 0) {
tSkipListNode *p = forward[0]->pForward[0];
doRemove(pSkipList, p, forward);
} else { // failed to find the node of specified value,abort
return false;
}
// compress the minimum level of skip list
while (pSkipList->nLevel > 0 && pSkipList->pHead.pForward[pSkipList->nLevel - 1] == NULL) {
pSkipList->nLevel -= 1;
}
return true;
}
void tSkipListRemoveNode(tSkipList *pSkipList, tSkipListNode *pNode) {
tSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
pthread_rwlock_rdlock(&pSkipList->lock);
for (int32_t i = 0; i < pNode->nLevel; ++i) {
forward[i] = pNode->pBackward[i];
}
removeSupport(pSkipList, forward, &pNode->key);
pthread_rwlock_unlock(&pSkipList->lock);
}
bool tSkipListRemove(tSkipList *pSkipList, tSkipListKey *pKey) {
tSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
__compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType);
pthread_rwlock_rdlock(&pSkipList->lock);
tSkipListNode *x = &pSkipList->pHead;
for (int32_t i = pSkipList->nLevel - 1; i >= 0; --i) {
while (x->pForward[i] != NULL && (filterComparator(&x->pForward[i]->key, pKey) < 0)) {
x = x->pForward[i];
}
forward[i] = x;
}
bool ret = removeSupport(pSkipList, forward, pKey);
pthread_rwlock_unlock(&pSkipList->lock);
return ret;
}
void SSkipListPrint(tSkipList *pSkipList, int16_t nlevel) {
if (pSkipList == NULL || pSkipList->nLevel < nlevel || nlevel <= 0) {
return;
}
tSkipListNode *p = pSkipList->pHead.pForward[nlevel - 1];
int32_t id = 1;
while (p) {
switch (pSkipList->keyType) {
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_BIGINT:
fprintf(stdout, "%d: %" PRId64 " \n", id++, p->key.i64Key);
break;
case TSDB_DATA_TYPE_BINARY:
fprintf(stdout, "%d: %s \n", id++, p->key.pz);
break;
case TSDB_DATA_TYPE_DOUBLE:
fprintf(stdout, "%d: %lf \n", id++, p->key.dKey);
break;
default:
fprintf(stdout, "\n");
}
p = p->pForward[nlevel - 1];
}
}
/*
* query processor based on query condition
*/
int32_t tSkipListQuery(tSkipList *pSkipList, tSKipListQueryCond *pQueryCond, tSkipListNode ***pResult) {
// query condition check
int32_t rel = 0;
__compar_fn_t comparator = getKeyComparator(pQueryCond->lowerBnd.nType);
if (pSkipList == NULL || pQueryCond == NULL || pSkipList->nSize == 0 ||
(((rel = comparator(&pQueryCond->lowerBnd, &pQueryCond->upperBnd)) > 0 &&
pQueryCond->lowerBnd.nType != TSDB_DATA_TYPE_NCHAR && pQueryCond->lowerBnd.nType != TSDB_DATA_TYPE_BINARY))) {
(*pResult) = NULL;
return 0;
}
if (rel == 0) {
/*
* 0 means: pQueryCond->lowerBnd == pQueryCond->upperBnd
* point query
*/
if (pQueryCond->lowerBndRelOptr == TSDB_RELATION_LARGE_EQUAL &&
pQueryCond->upperBndRelOptr == TSDB_RELATION_LESS_EQUAL) { // point query
return tSkipListGets(pSkipList, &pQueryCond->lowerBnd, pResult);
} else {
(*pResult) = NULL;
return 0;
}
} else {
/* range query, query operation code check */
return tSkipListRangeQuery(pSkipList, pQueryCond, pResult);
}
}
typedef struct MultipleQueryResult {
int32_t len;
tSkipListNode **pData;
} MultipleQueryResult;
static int32_t mergeQueryResult(MultipleQueryResult *pResults, int32_t numOfResSet, tSkipListNode ***pRes) {
int32_t total = 0;
for (int32_t i = 0; i < numOfResSet; ++i) {
total += pResults[i].len;
}
(*pRes) = malloc(POINTER_BYTES * total);
int32_t idx = 0;
for (int32_t i = 0; i < numOfResSet; ++i) {
MultipleQueryResult *pOneResult = &pResults[i];
for (int32_t j = 0; j < pOneResult->len; ++j) {
(*pRes)[idx++] = pOneResult->pData[j];
}
}
return total;
}
static void removeDuplicateKey(tSkipListKey *pKey, int32_t *numOfKey, __compar_fn_t comparator) {
if (*numOfKey == 1) {
return;
}
qsort(pKey, *numOfKey, sizeof(pKey[0]), comparator);
int32_t i = 0, j = 1;
while (i < (*numOfKey) && j < (*numOfKey)) {
int32_t ret = comparator(&pKey[i], &pKey[j]);
if (ret == 0) {
j++;
} else {
pKey[i + 1] = pKey[j];
i++;
j++;
}
}
(*numOfKey) = i + 1;
}
int32_t mergeResult(const tSkipListKey *pKey, int32_t numOfKey, tSkipListNode ***pRes, __compar_fn_t comparator,
tSkipListNode *pNode) {
int32_t i = 0, j = 0;
// merge two sorted arrays in O(n) time
while (i < numOfKey && pNode != NULL) {
int32_t ret = comparator(&pNode->key, &pKey[i]);
if (ret < 0) {
(*pRes)[j++] = pNode;
pNode = pNode->pForward[0];
} else if (ret == 0) {
pNode = pNode->pForward[0];
} else { // pNode->key > pkey[i]
i++;
}
}
while (pNode != NULL) {
(*pRes)[j++] = pNode;
pNode = pNode->pForward[0];
}
return j;
}
int32_t tSkipListPointQuery(tSkipList *pSkipList, tSkipListKey *pKey, int32_t numOfKey, tSkipListPointQueryType type,
tSkipListNode ***pRes) {
if (numOfKey == 0 || pKey == NULL || pSkipList == NULL || pSkipList->nSize == 0 ||
(type != INCLUDE_POINT_QUERY && type != EXCLUDE_POINT_QUERY)) {
(*pRes) = NULL;
return 0;
}
__compar_fn_t comparator = getKeyComparator(pKey->nType);
removeDuplicateKey(pKey, &numOfKey, comparator);
if (type == INCLUDE_POINT_QUERY) {
if (numOfKey == 1) {
return tSkipListGets(pSkipList, &pKey[0], pRes);
} else {
MultipleQueryResult *pTempResult = (MultipleQueryResult *)malloc(sizeof(MultipleQueryResult) * numOfKey);
for (int32_t i = 0; i < numOfKey; ++i) {
pTempResult[i].len = tSkipListGets(pSkipList, &pKey[i], &pTempResult[i].pData);
}
int32_t num = mergeQueryResult(pTempResult, numOfKey, pRes);
for (int32_t i = 0; i < numOfKey; ++i) {
free(pTempResult[i].pData);
}
free(pTempResult);
return num;
}
} else { // exclude query
*pRes = malloc(POINTER_BYTES * pSkipList->nSize);
__compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType);
tSkipListNode *pNode = pSkipList->pHead.pForward[0];
int32_t retLen = mergeResult(pKey, numOfKey, pRes, filterComparator, pNode);
if (retLen < pSkipList->nSize) {
(*pRes) = realloc(*pRes, POINTER_BYTES * retLen);
}
return retLen;
}
}
#endif
\ No newline at end of file
src/util/src/tarray.c
浏览文件 @
34df442c
...
...
@@ -98,7 +98,7 @@ void* taosArrayGetP(SArray* pArray, size_t index) {
return
*
(
void
**
)
ret
;
}
size_t
taosArrayGetSize
(
SArray
*
pArray
)
{
return
pArray
->
size
;
}
size_t
taosArrayGetSize
(
const
SArray
*
pArray
)
{
return
pArray
->
size
;
}
void
*
taosArrayInsert
(
SArray
*
pArray
,
size_t
index
,
void
*
pData
)
{
if
(
pArray
==
NULL
||
pData
==
NULL
)
{
...
...
src/util/src/thashutil.c
浏览文件 @
34df442c
...
...
@@ -7,8 +7,8 @@
* MurmurHash algorithm
*
*/
#include "hashfunc.h"
#include "tutil.h"
#include "hashutil.h"
#define ROTL32(x, r) ((x) << (r) | (x) >> (32 - (r)))
...
...
src/util/src/tskiplist.c
浏览文件 @
34df442c
...
...
@@ -524,6 +524,71 @@ SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey, int16_t keyType) {
return
sa
;
}
size_t
tSkipListGetSize
(
const
SSkipList
*
pSkipList
)
{
if
(
pSkipList
==
NULL
)
{
return
0
;
}
return
pSkipList
->
size
;
}
SSkipListIterator
*
tSkipListCreateIter
(
SSkipList
*
pSkipList
)
{
if
(
pSkipList
==
NULL
)
{
return
NULL
;
}
SSkipListIterator
*
iter
=
calloc
(
1
,
sizeof
(
SSkipListIterator
));
iter
->
pSkipList
=
pSkipList
;
if
(
pSkipList
->
lock
)
{
pthread_rwlock_rdlock
(
pSkipList
->
lock
);
}
iter
->
cur
=
NULL
;
iter
->
num
=
pSkipList
->
size
;
if
(
pSkipList
->
lock
)
{
pthread_rwlock_unlock
(
pSkipList
->
lock
);
}
return
iter
;
}
bool
tSkipListIterNext
(
SSkipListIterator
*
iter
)
{
if
(
iter
->
num
==
0
||
iter
->
pSkipList
==
NULL
)
{
return
false
;
}
SSkipList
*
pSkipList
=
iter
->
pSkipList
;
if
(
pSkipList
->
lock
)
{
pthread_rwlock_rdlock
(
pSkipList
->
lock
);
}
if
(
iter
->
cur
==
NULL
)
{
iter
->
cur
=
SL_GET_FORWARD_POINTER
(
pSkipList
->
pHead
,
0
);
}
else
{
iter
->
cur
=
SL_GET_FORWARD_POINTER
(
iter
->
cur
,
0
);
}
if
(
pSkipList
->
lock
)
{
pthread_rwlock_unlock
(
pSkipList
->
lock
);
}
return
iter
->
cur
!=
NULL
;
}
SSkipListNode
*
tSkipListIterGet
(
SSkipListIterator
*
iter
)
{
return
(
iter
==
NULL
)
?
NULL
:
iter
->
cur
;
}
void
*
tSkipListDestroyIter
(
SSkipListIterator
*
iter
)
{
if
(
iter
==
NULL
)
{
return
NULL
;
}
tfree
(
iter
);
return
NULL
;
}
// static int32_t tSkipListEndParQuery(SSkipList *pSkipList, SSkipListNode *pStartNode, SSkipListKey *pEndKey,
// int32_t cond, SSkipListNode ***pRes) {
// pthread_rwlock_rdlock(&pSkipList->lock);
...
...
src/vnode/detail/inc/vnodeQueryImpl.h
浏览文件 @
34df442c
...
...
@@ -23,7 +23,7 @@ extern "C" {
#include "os.h"
#include "hash.h"
#include "hash
util
.h"
#include "hash
func
.h"
#define GET_QINFO_ADDR(x) ((char*)(x)-offsetof(SQInfo, query))
#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0)
...
...
@@ -119,7 +119,7 @@ typedef enum {
typedef
int
(
*
__block_search_fn_t
)(
char
*
data
,
int
num
,
int64_t
key
,
int
order
);
static
FORCE_INLINE
SMeterObj
*
getMeterObj
(
void
*
hashHandle
,
int32_t
sid
)
{
return
*
(
SMeterObj
**
)
taos
GetDataFromHashTable
(
hashHandle
,
(
const
char
*
)
&
sid
,
sizeof
(
sid
));
return
*
(
SMeterObj
**
)
taos
HashGet
(
hashHandle
,
(
const
char
*
)
&
sid
,
sizeof
(
sid
));
}
bool
isQueryKilled
(
SQuery
*
pQuery
);
...
...
src/vnode/detail/src/vnodeQueryImpl.c
浏览文件 @
34df442c
...
...
@@ -14,7 +14,7 @@
*/
#include "hash.h"
#include "hash
util
.h"
#include "hash
func
.h"
#include "os.h"
#include "taosmsg.h"
#include "textbuffer.h"
...
...
@@ -1460,7 +1460,7 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin
int16_t
bytes
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
int32_t
*
p1
=
(
int32_t
*
)
taos
GetDataFromHashTable
(
pWindowResInfo
->
hashList
,
pData
,
bytes
);
int32_t
*
p1
=
(
int32_t
*
)
taos
HashGet
(
pWindowResInfo
->
hashList
,
pData
,
bytes
);
if
(
p1
!=
NULL
)
{
pWindowResInfo
->
curIndex
=
*
p1
;
}
else
{
// more than the capacity, reallocate the resources
...
...
@@ -1485,7 +1485,7 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin
// add a new result set for a new group
pWindowResInfo
->
curIndex
=
pWindowResInfo
->
size
++
;
taos
AddToHashTable
(
pWindowResInfo
->
hashList
,
pData
,
bytes
,
(
char
*
)
&
pWindowResInfo
->
curIndex
,
sizeof
(
int32_t
));
taos
HashPut
(
pWindowResInfo
->
hashList
,
pData
,
bytes
,
(
char
*
)
&
pWindowResInfo
->
curIndex
,
sizeof
(
int32_t
));
}
return
getWindowResult
(
pWindowResInfo
,
pWindowResInfo
->
curIndex
);
...
...
@@ -2018,7 +2018,7 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun
pWindowResInfo
->
type
=
type
;
_hash_fn_t
fn
=
taosGetDefaultHashFunction
(
type
);
pWindowResInfo
->
hashList
=
taos
InitHashTable
(
threshold
,
fn
,
false
);
pWindowResInfo
->
hashList
=
taos
HashInit
(
threshold
,
fn
,
false
);
pWindowResInfo
->
curIndex
=
-
1
;
pWindowResInfo
->
size
=
0
;
...
...
@@ -2044,7 +2044,7 @@ void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRu
destroyTimeWindowRes
(
pResult
,
pRuntimeEnv
->
pQuery
->
numOfOutputCols
);
}
taos
CleanUpHashTable
(
pWindowResInfo
->
hashList
);
taos
HashCleanup
(
pWindowResInfo
->
hashList
);
tfree
(
pWindowResInfo
->
pResult
);
}
...
...
@@ -2059,11 +2059,11 @@ void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowR
}
pWindowResInfo
->
curIndex
=
-
1
;
taos
CleanUpHashTable
(
pWindowResInfo
->
hashList
);
taos
HashCleanup
(
pWindowResInfo
->
hashList
);
pWindowResInfo
->
size
=
0
;
_hash_fn_t
fn
=
taosGetDefaultHashFunction
(
pWindowResInfo
->
type
);
pWindowResInfo
->
hashList
=
taos
InitHashTable
(
pWindowResInfo
->
capacity
,
fn
,
false
);
pWindowResInfo
->
hashList
=
taos
HashInit
(
pWindowResInfo
->
capacity
,
fn
,
false
);
pWindowResInfo
->
startTime
=
0
;
pWindowResInfo
->
prevSKey
=
0
;
...
...
@@ -2081,7 +2081,7 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SWindowResult
*
pResult
=
&
pWindowResInfo
->
pResult
[
i
];
if
(
pResult
->
status
.
closed
)
{
// remove the window slot from hash table
taos
DeleteFromHashTabl
e
(
pWindowResInfo
->
hashList
,
(
const
char
*
)
&
pResult
->
window
.
skey
,
TSDB_KEYSIZE
);
taos
HashRemov
e
(
pWindowResInfo
->
hashList
,
(
const
char
*
)
&
pResult
->
window
.
skey
,
TSDB_KEYSIZE
);
}
else
{
break
;
}
...
...
@@ -2104,14 +2104,14 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
for
(
int32_t
k
=
0
;
k
<
pWindowResInfo
->
size
;
++
k
)
{
SWindowResult
*
pResult
=
&
pWindowResInfo
->
pResult
[
k
];
int32_t
*
p
=
(
int32_t
*
)
taos
GetDataFromHashTable
(
pWindowResInfo
->
hashList
,
(
const
char
*
)
&
pResult
->
window
.
skey
,
int32_t
*
p
=
(
int32_t
*
)
taos
HashGet
(
pWindowResInfo
->
hashList
,
(
const
char
*
)
&
pResult
->
window
.
skey
,
TSDB_KEYSIZE
);
int32_t
v
=
(
*
p
-
num
);
assert
(
v
>=
0
&&
v
<=
pWindowResInfo
->
size
);
// todo add the update function for hash table
taos
DeleteFromHashTabl
e
(
pWindowResInfo
->
hashList
,
(
const
char
*
)
&
pResult
->
window
.
skey
,
TSDB_KEYSIZE
);
taos
AddToHashTable
(
pWindowResInfo
->
hashList
,
(
const
char
*
)
&
pResult
->
window
.
skey
,
TSDB_KEYSIZE
,
(
char
*
)
&
v
,
taos
HashRemov
e
(
pWindowResInfo
->
hashList
,
(
const
char
*
)
&
pResult
->
window
.
skey
,
TSDB_KEYSIZE
);
taos
HashPut
(
pWindowResInfo
->
hashList
,
(
const
char
*
)
&
pResult
->
window
.
skey
,
TSDB_KEYSIZE
,
(
char
*
)
&
v
,
sizeof
(
int32_t
));
}
...
...
@@ -4812,7 +4812,7 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) {
tfree
(
pSupporter
->
pMeterSidExtInfo
);
if
(
pSupporter
->
pMetersHashTable
!=
NULL
)
{
taos
CleanUpHashTable
(
pSupporter
->
pMetersHashTable
);
taos
HashCleanup
(
pSupporter
->
pMetersHashTable
);
pSupporter
->
pMetersHashTable
=
NULL
;
}
...
...
src/vnode/detail/src/vnodeRead.c
浏览文件 @
34df442c
...
...
@@ -16,6 +16,8 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "hash.h"
#include "hashfunc.h"
#include "ihash.h"
#include "taosmsg.h"
#include "tast.h"
...
...
@@ -25,8 +27,6 @@
#include "vnode.h"
#include "vnodeRead.h"
#include "vnodeUtil.h"
#include "hash.h"
#include "hashutil.h"
int
(
*
pQueryFunc
[])(
SMeterObj
*
,
SQuery
*
)
=
{
vnodeQueryFromCache
,
vnodeQueryFromFile
};
...
...
@@ -651,8 +651,8 @@ void *vnodeQueryOnSingleTable(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE
STableQuerySupportObj
*
pSupporter
=
(
STableQuerySupportObj
*
)
calloc
(
1
,
sizeof
(
STableQuerySupportObj
));
pSupporter
->
numOfMeters
=
1
;
pSupporter
->
pMetersHashTable
=
taos
InitHashTable
(
pSupporter
->
numOfMeters
,
taosIntHash_32
,
false
);
taos
AddToHashTable
(
pSupporter
->
pMetersHashTable
,
(
const
char
*
)
&
pMetersObj
[
0
]
->
sid
,
sizeof
(
pMeterObj
[
0
].
sid
),
pSupporter
->
pMetersHashTable
=
taos
HashInit
(
pSupporter
->
numOfMeters
,
taosIntHash_32
,
false
);
taos
HashPut
(
pSupporter
->
pMetersHashTable
,
(
const
char
*
)
&
pMetersObj
[
0
]
->
sid
,
sizeof
(
pMeterObj
[
0
].
sid
),
(
char
*
)
&
pMetersObj
[
0
],
POINTER_BYTES
);
pSupporter
->
pSidSet
=
NULL
;
...
...
@@ -742,9 +742,9 @@ void *vnodeQueryOnMultiMeters(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE
STableQuerySupportObj
*
pSupporter
=
(
STableQuerySupportObj
*
)
calloc
(
1
,
sizeof
(
STableQuerySupportObj
));
pSupporter
->
numOfMeters
=
pQueryMsg
->
numOfSids
;
pSupporter
->
pMetersHashTable
=
taos
InitHashTable
(
pSupporter
->
numOfMeters
,
taosIntHash_32
,
false
);
pSupporter
->
pMetersHashTable
=
taos
HashInit
(
pSupporter
->
numOfMeters
,
taosIntHash_32
,
false
);
for
(
int32_t
i
=
0
;
i
<
pSupporter
->
numOfMeters
;
++
i
)
{
taos
AddToHashTable
(
pSupporter
->
pMetersHashTable
,
(
const
char
*
)
&
pMetersObj
[
i
]
->
sid
,
sizeof
(
pMetersObj
[
i
]
->
sid
),
(
char
*
)
&
pMetersObj
[
i
],
taos
HashPut
(
pSupporter
->
pMetersHashTable
,
(
const
char
*
)
&
pMetersObj
[
i
]
->
sid
,
sizeof
(
pMetersObj
[
i
]
->
sid
),
(
char
*
)
&
pMetersObj
[
i
],
POINTER_BYTES
);
}
...
...
src/vnode/tsdb/src/tsdbMeta.c
浏览文件 @
34df442c
...
...
@@ -32,7 +32,7 @@ STsdbMeta *tsdbCreateMeta(int32_t maxTables) {
return
NULL
;
}
pMeta
->
tableMap
=
taos
InitHashTable
(
maxTables
+
maxTables
/
10
,
taosGetDefaultHashFunction
,
false
);
pMeta
->
tableMap
=
taos
HashInit
(
maxTables
+
maxTables
/
10
,
taosGetDefaultHashFunction
,
false
);
if
(
pMeta
->
tableMap
==
NULL
)
{
free
(
pMeta
->
tables
);
free
(
pMeta
);
...
...
@@ -60,7 +60,7 @@ int32_t tsdbFreeMeta(STsdbMeta *pMeta) {
tsdbFreeTable
(
pTemp
);
}
taos
CleanUpHashTable
(
pMeta
->
tableMap
);
taos
HashCleanup
(
pMeta
->
tableMap
);
free
(
pMeta
);
...
...
@@ -205,7 +205,7 @@ static int32_t tsdbCheckTableCfg(STableCfg *pCfg) {
}
STable
*
tsdbGetTableByUid
(
STsdbMeta
*
pMeta
,
int64_t
uid
)
{
void
*
ptr
=
taos
GetDataFromHashTable
(
pMeta
->
tableMap
,
(
char
*
)(
&
uid
),
sizeof
(
uid
));
void
*
ptr
=
taos
HashGet
(
pMeta
->
tableMap
,
(
char
*
)(
&
uid
),
sizeof
(
uid
));
if
(
ptr
==
NULL
)
return
NULL
;
...
...
@@ -244,7 +244,7 @@ static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable) {
static
int
tsdbAddTableIntoMap
(
STsdbMeta
*
pMeta
,
STable
*
pTable
)
{
// TODO: add the table to the map
int64_t
uid
=
pTable
->
tableId
.
uid
;
if
(
taos
AddToHashTable
(
pMeta
->
tableMap
,
(
char
*
)(
&
uid
),
sizeof
(
uid
),
(
void
*
)(
&
pTable
),
sizeof
(
pTable
))
<
0
)
{
if
(
taos
HashPut
(
pMeta
->
tableMap
,
(
char
*
)(
&
uid
),
sizeof
(
uid
),
(
void
*
)(
&
pTable
),
sizeof
(
pTable
))
<
0
)
{
return
-
1
;
}
return
0
;
...
...
tests/examples/c/CMakeLists.txt
浏览文件 @
34df442c
PROJECT
(
TDengine
)
IF
(
TD_WINDOWS_64
)
INCLUDE_DIRECTORIES
(
${
TD_
ROOT
_DIR
}
/deps/pthread
)
INCLUDE_DIRECTORIES
(
${
TD_
COMMUNITY
_DIR
}
/deps/pthread
)
ENDIF
()
INCLUDE_DIRECTORIES
(
.
${
TD_
ROOT_DIR
}
/src/inc
${
TD_ROOT_DIR
}
/src/client/inc
${
TD_OS
_DIR
}
/inc
)
INCLUDE_DIRECTORIES
(
.
${
TD_
COMMUNITY_DIR
}
/src/inc
${
TD_COMMUNITY_DIR
}
/src/client/inc
${
TD_COMMUNITY
_DIR
}
/inc
)
AUX_SOURCE_DIRECTORY
(
. SRC
)
#ADD_EXECUTABLE(demo ${SRC}
)
#
TARGET_LINK_LIBRARIES(demo taos_static trpc tutil pthread )
ADD_EXECUTABLE
(
demo demo.c
)
TARGET_LINK_LIBRARIES
(
demo taos_static trpc tutil pthread
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录