Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
爱划水de鲸鱼哥~
TDengine
提交
e9cc28ed
T
TDengine
项目概览
爱划水de鲸鱼哥~
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
e9cc28ed
编写于
12月 25, 2019
作者:
H
hjxilinx
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
update the feature referred in issue#1009. [tbase-1380]
上级
4b72c79f
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
784 addition
and
94 deletion
+784
-94
src/inc/hash.h
src/inc/hash.h
+72
-0
src/inc/hashutil.h
src/inc/hashutil.h
+44
-0
src/system/detail/src/vnodeQueryImpl.c
src/system/detail/src/vnodeQueryImpl.c
+69
-56
src/system/detail/src/vnodeQueryProcess.c
src/system/detail/src/vnodeQueryProcess.c
+8
-8
src/system/detail/src/vnodeRead.c
src/system/detail/src/vnodeRead.c
+8
-4
src/system/detail/src/vnodeShell.c
src/system/detail/src/vnodeShell.c
+2
-1
src/util/src/hash.c
src/util/src/hash.c
+537
-0
src/util/src/thashutil.c
src/util/src/thashutil.c
+25
-1
src/util/src/ttokenizer.c
src/util/src/ttokenizer.c
+19
-24
未找到文件。
src/inc/hash.h
0 → 100644
浏览文件 @
e9cc28ed
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HASH_H
#define TDENGINE_HASH_H
#include "hashutil.h"
#define HASH_MAX_CAPACITY (1024 * 1024 * 16)
#define HASH_VALUE_IN_TRASH (-1)
#define HASH_DEFAULT_LOAD_FACTOR (0.75)
#define HASH_INDEX(v, c) ((v) & ((c)-1))
typedef
struct
SHashNode
{
char
*
key
;
// null-terminated string
union
{
struct
SHashNode
*
prev
;
struct
SHashEntry
*
prev1
;
};
struct
SHashNode
*
next
;
uint32_t
hashVal
;
// the hash value of key, if hashVal == HASH_VALUE_IN_TRASH, this node is moved to trash
uint32_t
keyLen
;
// length of the key
char
data
[];
}
SHashNode
;
typedef
struct
SHashEntry
{
SHashNode
*
next
;
uint32_t
num
;
}
SHashEntry
;
typedef
struct
HashObj
{
SHashEntry
**
hashList
;
uint32_t
capacity
;
int
size
;
_hash_fn_t
hashFp
;
bool
multithreadSafe
;
// enable lock
#if defined LINUX
pthread_rwlock_t
lock
;
#else
pthread_mutex_t
lock
;
#endif
}
HashObj
;
void
*
taosInitHashTable
(
uint32_t
capacity
,
_hash_fn_t
fn
,
bool
multithreadSafe
);
int32_t
taosAddToHashTable
(
HashObj
*
pObj
,
const
char
*
key
,
uint32_t
keyLen
,
void
*
data
,
uint32_t
size
);
void
taosDeleteFromHashTable
(
HashObj
*
pObj
,
const
char
*
key
,
uint32_t
keyLen
);
char
*
taosGetDataFromHash
(
HashObj
*
pObj
,
const
char
*
key
,
uint32_t
keyLen
);
void
taosCleanUpHashTable
(
void
*
handle
);
int32_t
taosGetHashMaxOverflowLength
(
HashObj
*
pObj
);
int32_t
taosCheckHashTable
(
HashObj
*
pObj
);
#endif // TDENGINE_HASH_H
src/inc/hashutil.h
0 → 100644
浏览文件 @
e9cc28ed
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HASHUTIL_H
#define TDENGINE_HASHUTIL_H
typedef
uint32_t
(
*
_hash_fn_t
)(
const
char
*
,
uint32_t
);
/**
* murmur hash algorithm
* @key usually string
* @len key length
* @seed hash seed
* @out an int32 value
*/
uint32_t
MurmurHash3_32
(
const
char
*
key
,
uint32_t
len
);
/**
*
* @param key
* @param len
* @return
*/
uint32_t
taosIntHash_32
(
const
char
*
key
,
uint32_t
len
);
uint32_t
taosIntHash_64
(
const
char
*
key
,
uint32_t
len
);
_hash_fn_t
taosGetDefaultHashFunction
(
int32_t
type
);
#endif //TDENGINE_HASHUTIL_H
src/system/detail/src/vnodeQueryImpl.c
浏览文件 @
e9cc28ed
...
...
@@ -17,6 +17,8 @@
#include "taosmsg.h"
#include "textbuffer.h"
#include "ttime.h"
#include "hash.h"
#include "hashutil.h"
#include "tinterpolation.h"
#include "tscJoinProcess.h"
...
...
@@ -52,8 +54,6 @@ enum {
static
int32_t
readDataFromDiskFile
(
int
fd
,
SQInfo
*
pQInfo
,
SQueryFilesInfo
*
pQueryFile
,
char
*
buf
,
uint64_t
offset
,
int32_t
size
);
//__read_data_fn_t readDataFunctor[2] = {copyDataFromMMapBuffer, readDataFromDiskFile};
static
void
vnodeInitLoadCompBlockInfo
(
SQueryLoadCompBlockInfo
*
pCompBlockLoadInfo
);
static
int32_t
moveToNextBlock
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
step
,
__block_search_fn_t
searchFn
,
bool
loadData
);
...
...
@@ -163,6 +163,30 @@ bool isGroupbyNormalCol(SSqlGroupbyExpr *pGroupbyExpr) {
return
false
;
}
int16_t
getGroupbyColumnType
(
SQuery
*
pQuery
,
SSqlGroupbyExpr
*
pGroupbyExpr
)
{
assert
(
pGroupbyExpr
!=
NULL
);
int32_t
colId
=
-
2
;
int16_t
type
=
TSDB_DATA_TYPE_NULL
;
for
(
int32_t
i
=
0
;
i
<
pGroupbyExpr
->
numOfGroupCols
;
++
i
)
{
SColIndexEx
*
pColIndex
=
&
pGroupbyExpr
->
columnInfo
[
i
];
if
(
pColIndex
->
flag
==
TSDB_COL_NORMAL
)
{
colId
=
pColIndex
->
colId
;
break
;
}
}
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfCols
;
++
i
)
{
if
(
colId
==
pQuery
->
colList
[
i
].
data
.
colId
)
{
type
=
pQuery
->
colList
[
i
].
data
.
type
;
break
;
}
}
return
type
;
}
bool
isSelectivityWithTagsQuery
(
SQuery
*
pQuery
)
{
bool
hasTags
=
false
;
int32_t
numOfSelectivity
=
0
;
...
...
@@ -1446,7 +1470,7 @@ static bool needToLoadDataBlock(SQuery *pQuery, SField *pField, SQLFunctionCtx *
return
true
;
}
static
int32_t
setGroupResultForKey
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
char
*
pData
,
int16_t
type
,
char
*
columnData
)
{
static
int32_t
setGroupResultForKey
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
char
*
pData
,
int16_t
type
,
int16_t
bytes
)
{
SOutputRes
*
pOutputRes
=
NULL
;
// ignore the null value
...
...
@@ -1454,35 +1478,17 @@ static int32_t setGroupResultForKey(SQueryRuntimeEnv *pRuntimeEnv, char *pData,
return
-
1
;
}
int64_t
t
=
0
;
switch
(
type
)
{
case
TSDB_DATA_TYPE_TINYINT
:
t
=
GET_INT8_VAL
(
pData
);
break
;
case
TSDB_DATA_TYPE_BIGINT
:
t
=
GET_INT64_VAL
(
pData
);
break
;
case
TSDB_DATA_TYPE_SMALLINT
:
t
=
GET_INT16_VAL
(
pData
);
break
;
case
TSDB_DATA_TYPE_INT
:
default:
t
=
GET_INT32_VAL
(
pData
);
break
;
}
SOutputRes
**
p1
=
(
SOutputRes
**
)
taosGetIntHashData
(
pRuntimeEnv
->
hashList
,
t
);
SOutputRes
**
p1
=
(
SOutputRes
**
)
taosGetDataFromHash
(
pRuntimeEnv
->
hashList
,
pData
,
bytes
);
if
(
p1
!=
NULL
)
{
pOutputRes
=
*
p1
;
}
else
{
// more than the threshold number, discard data that are not belong to current groups
}
else
{
// more than the threshold number, discard data that are not belong to current groups
if
(
pRuntimeEnv
->
usedIndex
>=
10000
)
{
return
-
1
;
}
// add a new result set for a new group
char
*
b
=
(
char
*
)
&
pRuntimeEnv
->
pResult
[
pRuntimeEnv
->
usedIndex
++
];
pOutputRes
=
*
(
SOutputRes
**
)
taosAddIntHash
(
pRuntimeEnv
->
hashList
,
t
,
(
char
*
)
&
b
);
pOutputRes
=
&
pRuntimeEnv
->
pResult
[
pRuntimeEnv
->
usedIndex
++
];
taosAddToHashTable
(
pRuntimeEnv
->
hashList
,
pData
,
bytes
,
(
char
*
)
&
pOutputRes
,
POINTER_BYTES
);
}
setGroupOutputBuffer
(
pRuntimeEnv
,
pOutputRes
);
...
...
@@ -1686,7 +1692,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
if
(
groupbyStateValue
)
{
char
*
stateVal
=
groupbyColumnData
+
bytes
*
offset
;
int32_t
ret
=
setGroupResultForKey
(
pRuntimeEnv
,
stateVal
,
type
,
groupbyColumnData
);
int32_t
ret
=
setGroupResultForKey
(
pRuntimeEnv
,
stateVal
,
type
,
bytes
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
// null data, too many state code
continue
;
}
...
...
@@ -2229,7 +2235,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
tfree
(
pRuntimeEnv
->
secondaryUnzipBuffer
);
taosCleanUp
IntHash
(
pRuntimeEnv
->
hashList
);
taosCleanUp
HashTable
(
pRuntimeEnv
->
hashList
);
if
(
pRuntimeEnv
->
pCtx
!=
NULL
)
{
for
(
int32_t
i
=
0
;
i
<
pRuntimeEnv
->
pQuery
->
numOfOutputCols
;
++
i
)
{
...
...
@@ -3742,9 +3748,11 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete
// check data in file or cache
bool
dataInCache
=
true
;
bool
dataInDisk
=
true
;
pSupporter
->
runtimeEnv
.
pQuery
=
pQuery
;
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pSupporter
->
runtimeEnv
;
pRuntimeEnv
->
pQuery
=
pQuery
;
vnodeCheckIfDataExists
(
&
pSupporter
->
r
untimeEnv
,
pMeterObj
,
&
dataInDisk
,
&
dataInCache
);
vnodeCheckIfDataExists
(
pR
untimeEnv
,
pMeterObj
,
&
dataInDisk
,
&
dataInCache
);
/* data in file or cache is not qualified for the query. abort */
if
(
!
(
dataInCache
||
dataInDisk
))
{
...
...
@@ -3755,11 +3763,11 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete
return
TSDB_CODE_SUCCESS
;
}
p
Supporter
->
runtimeEnv
.
pTSBuf
=
param
;
p
Supporter
->
runtimeEnv
.
cur
.
vnodeIndex
=
-
1
;
p
RuntimeEnv
->
pTSBuf
=
param
;
p
RuntimeEnv
->
cur
.
vnodeIndex
=
-
1
;
if
(
param
!=
NULL
)
{
int16_t
order
=
(
pQuery
->
order
.
order
==
p
Supporter
->
runtimeEnv
.
pTSBuf
->
tsOrder
)
?
TSQL_SO_ASC
:
TSQL_SO_DESC
;
tsBufSetTraverseOrder
(
p
Supporter
->
runtimeEnv
.
pTSBuf
,
order
);
int16_t
order
=
(
pQuery
->
order
.
order
==
p
RuntimeEnv
->
pTSBuf
->
tsOrder
)
?
TSQL_SO_ASC
:
TSQL_SO_DESC
;
tsBufSetTraverseOrder
(
p
RuntimeEnv
->
pTSBuf
,
order
);
}
// create runtime environment
...
...
@@ -3775,9 +3783,13 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete
return
ret
;
}
pSupporter
->
runtimeEnv
.
hashList
=
taosInitIntHash
(
10039
,
sizeof
(
void
*
),
taosHashInt
);
pSupporter
->
runtimeEnv
.
usedIndex
=
0
;
pSupporter
->
runtimeEnv
.
pResult
=
pSupporter
->
pResult
;
int16_t
type
=
getGroupbyColumnType
(
pQuery
,
pQuery
->
pGroupbyExpr
);
_hash_fn_t
fn
=
taosGetDefaultHashFunction
(
type
);
pRuntimeEnv
->
hashList
=
taosInitHashTable
(
10039
,
fn
,
false
);
pRuntimeEnv
->
usedIndex
=
0
;
pRuntimeEnv
->
pResult
=
pSupporter
->
pResult
;
}
// in case of last_row query, we set the query timestamp to pMeterObj->lastKey;
...
...
@@ -3820,7 +3832,7 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete
int64_t
rs
=
taosGetIntervalStartTimestamp
(
pSupporter
->
rawSKey
,
pQuery
->
nAggTimeInterval
,
pQuery
->
intervalTimeUnit
,
pQuery
->
precision
);
taosInitInterpoInfo
(
&
p
Supporter
->
runtimeEnv
.
interpoInfo
,
pQuery
->
order
.
order
,
rs
,
0
,
0
);
taosInitInterpoInfo
(
&
p
RuntimeEnv
->
interpoInfo
,
pQuery
->
order
.
order
,
rs
,
0
,
0
);
allocMemForInterpo
(
pSupporter
,
pQuery
,
pMeterObj
);
if
(
!
isPointInterpoQuery
(
pQuery
))
{
...
...
@@ -3843,9 +3855,9 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) {
teardownQueryRuntimeEnv
(
&
pSupporter
->
runtimeEnv
);
tfree
(
pSupporter
->
pMeterSidExtInfo
);
if
(
pSupporter
->
pMeter
Obj
!=
NULL
)
{
taosCleanUp
IntHash
(
pSupporter
->
pMeterObj
);
pSupporter
->
pMeter
Obj
=
NULL
;
if
(
pSupporter
->
pMeter
sHashTable
!=
NULL
)
{
taosCleanUp
HashTable
(
pSupporter
->
pMetersHashTable
);
pSupporter
->
pMeter
sHashTable
=
NULL
;
}
if
(
pSupporter
->
pSidSet
!=
NULL
||
isGroupbyNormalCol
(
pQInfo
->
query
.
pGroupbyExpr
))
{
...
...
@@ -3904,10 +3916,11 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
pQuery
->
pointsRead
=
0
;
changeExecuteScanOrder
(
pQuery
,
true
);
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pSupporter
->
runtimeEnv
;
doInitQueryFileInfoFD
(
&
p
Supporter
->
runtimeEnv
.
vnodeFileInfo
);
vnodeInitDataBlockInfo
(
&
p
Supporter
->
runtimeEnv
.
loadBlockInfo
);
vnodeInitLoadCompBlockInfo
(
&
p
Supporter
->
runtimeEnv
.
loadCompBlockInfo
);
doInitQueryFileInfoFD
(
&
p
RuntimeEnv
->
vnodeFileInfo
);
vnodeInitDataBlockInfo
(
&
p
RuntimeEnv
->
loadBlockInfo
);
vnodeInitLoadCompBlockInfo
(
&
p
RuntimeEnv
->
loadCompBlockInfo
);
/*
* since we employ the output control mechanism in main loop.
...
...
@@ -3929,15 +3942,15 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
}
// get one queried meter
SMeterObj
*
pMeter
=
getMeterObj
(
pSupporter
->
pMeter
Obj
,
pSupporter
->
pSidSet
->
pSids
[
0
]
->
sid
);
SMeterObj
*
pMeter
=
getMeterObj
(
pSupporter
->
pMeter
sHashTable
,
pSupporter
->
pSidSet
->
pSids
[
0
]
->
sid
);
p
Supporter
->
runtimeEnv
.
pTSBuf
=
param
;
p
Supporter
->
runtimeEnv
.
cur
.
vnodeIndex
=
-
1
;
p
RuntimeEnv
->
pTSBuf
=
param
;
p
RuntimeEnv
->
cur
.
vnodeIndex
=
-
1
;
// set the ts-comp file traverse order
if
(
param
!=
NULL
)
{
int16_t
order
=
(
pQuery
->
order
.
order
==
p
Supporter
->
runtimeEnv
.
pTSBuf
->
tsOrder
)
?
TSQL_SO_ASC
:
TSQL_SO_DESC
;
tsBufSetTraverseOrder
(
p
Supporter
->
runtimeEnv
.
pTSBuf
,
order
);
int16_t
order
=
(
pQuery
->
order
.
order
==
p
RuntimeEnv
->
pTSBuf
->
tsOrder
)
?
TSQL_SO_ASC
:
TSQL_SO_DESC
;
tsBufSetTraverseOrder
(
p
RuntimeEnv
->
pTSBuf
,
order
);
}
int32_t
ret
=
setupQueryRuntimeEnv
(
pMeter
,
pQuery
,
&
pSupporter
->
runtimeEnv
,
pTagSchema
,
TSQL_SO_ASC
,
true
);
...
...
@@ -3953,9 +3966,9 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
}
if
(
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
))
{
// group by columns not tags;
p
Supporter
->
runtimeEnv
.
hashList
=
taosInitIntHash
(
10039
,
sizeof
(
void
*
),
taosHashInt
);
p
Supporter
->
runtimeEnv
.
usedIndex
=
0
;
p
Supporter
->
runtimeEnv
.
pResult
=
pSupporter
->
pResult
;
p
RuntimeEnv
->
hashList
=
taosInitHashTable
(
10039
,
taosIntHash_64
,
false
);
p
RuntimeEnv
->
usedIndex
=
0
;
p
RuntimeEnv
->
pResult
=
pSupporter
->
pResult
;
}
if
(
pQuery
->
nAggTimeInterval
!=
0
)
{
...
...
@@ -3976,7 +3989,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
return
TSDB_CODE_SERV_NO_DISKSPACE
;
}
p
Supporter
->
runtimeEnv
.
numOfRowsPerPage
=
(
DEFAULT_INTERN_BUF_SIZE
-
sizeof
(
tFilePage
))
/
pQuery
->
rowSize
;
p
RuntimeEnv
->
numOfRowsPerPage
=
(
DEFAULT_INTERN_BUF_SIZE
-
sizeof
(
tFilePage
))
/
pQuery
->
rowSize
;
pSupporter
->
lastPageId
=
-
1
;
pSupporter
->
bufSize
=
pSupporter
->
numOfPages
*
DEFAULT_INTERN_BUF_SIZE
;
...
...
@@ -3995,7 +4008,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
TSKEY
revisedStime
=
taosGetIntervalStartTimestamp
(
pSupporter
->
rawSKey
,
pQuery
->
nAggTimeInterval
,
pQuery
->
intervalTimeUnit
,
pQuery
->
precision
);
taosInitInterpoInfo
(
&
p
Supporter
->
runtimeEnv
.
interpoInfo
,
pQuery
->
order
.
order
,
revisedStime
,
0
,
0
);
taosInitInterpoInfo
(
&
p
RuntimeEnv
->
interpoInfo
,
pQuery
->
order
.
order
,
revisedStime
,
0
,
0
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -4014,7 +4027,7 @@ void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
}
else
{
int32_t
num
=
0
;
for
(
int32_t
i
=
0
;
i
<
pSupporter
->
numOfMeters
;
++
i
)
{
SMeterObj
*
pMeter
=
getMeterObj
(
pSupporter
->
pMeter
Obj
,
pSupporter
->
pSidSet
->
pSids
[
i
]
->
sid
);
SMeterObj
*
pMeter
=
getMeterObj
(
pSupporter
->
pMeter
sHashTable
,
pSupporter
->
pSidSet
->
pSids
[
i
]
->
sid
);
atomic_fetch_sub_32
(
&
(
pMeter
->
numOfQueries
),
1
);
if
(
pMeter
->
numOfQueries
>
0
)
{
...
...
@@ -5060,7 +5073,7 @@ int32_t doCloseAllOpenedResults(SMeterQuerySupportObj *pSupporter) {
if
(
pMeterInfo
[
i
].
pMeterQInfo
!=
NULL
&&
pMeterInfo
[
i
].
pMeterQInfo
->
lastResRows
>
0
)
{
int32_t
index
=
pMeterInfo
[
i
].
meterOrderIdx
;
pRuntimeEnv
->
pMeterObj
=
getMeterObj
(
pSupporter
->
pMeter
Obj
,
pSupporter
->
pSidSet
->
pSids
[
index
]
->
sid
);
pRuntimeEnv
->
pMeterObj
=
getMeterObj
(
pSupporter
->
pMeter
sHashTable
,
pSupporter
->
pSidSet
->
pSids
[
index
]
->
sid
);
assert
(
pRuntimeEnv
->
pMeterObj
==
pMeterInfo
[
i
].
pMeterObj
);
int32_t
ret
=
setIntervalQueryExecutionContext
(
pSupporter
,
i
,
pMeterInfo
[
i
].
pMeterQInfo
);
...
...
@@ -5666,7 +5679,7 @@ int32_t vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, tSidSet *pSidSet
TSKEY
skey
,
ekey
;
for
(
int32_t
i
=
0
;
i
<
pSidSet
->
numOfSids
;
++
i
)
{
// load all meter meta info
SMeterObj
*
pMeterObj
=
getMeterObj
(
pSupporter
->
pMeter
Obj
,
pMeterSidExtInfo
[
i
]
->
sid
);
SMeterObj
*
pMeterObj
=
getMeterObj
(
pSupporter
->
pMeter
sHashTable
,
pMeterSidExtInfo
[
i
]
->
sid
);
if
(
pMeterObj
==
NULL
)
{
dError
(
"QInfo:%p failed to find required sid:%d"
,
pQInfo
,
pMeterSidExtInfo
[
i
]
->
sid
);
continue
;
...
...
src/system/detail/src/vnodeQueryProcess.c
浏览文件 @
e9cc28ed
...
...
@@ -92,7 +92,7 @@ static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMe
SMeterSidExtInfo
**
pMeterSidExtInfo
=
pSupporter
->
pMeterSidExtInfo
;
SMeterObj
*
pTempMeterObj
=
getMeterObj
(
pSupporter
->
pMeter
Obj
,
pMeterSidExtInfo
[
0
]
->
sid
);
SMeterObj
*
pTempMeterObj
=
getMeterObj
(
pSupporter
->
pMeter
sHashTable
,
pMeterSidExtInfo
[
0
]
->
sid
);
assert
(
pTempMeterObj
!=
NULL
);
__block_search_fn_t
searchFn
=
vnodeSearchKeyFunc
[
pTempMeterObj
->
searchAlgorithm
];
...
...
@@ -111,7 +111,7 @@ static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMe
}
for
(
int32_t
k
=
start
;
k
<=
end
;
++
k
)
{
SMeterObj
*
pMeterObj
=
getMeterObj
(
pSupporter
->
pMeter
Obj
,
pMeterSidExtInfo
[
k
]
->
sid
);
SMeterObj
*
pMeterObj
=
getMeterObj
(
pSupporter
->
pMeter
sHashTable
,
pMeterSidExtInfo
[
k
]
->
sid
);
if
(
pMeterObj
==
NULL
)
{
dError
(
"QInfo:%p failed to find meterId:%d, continue"
,
pQInfo
,
pMeterSidExtInfo
[
k
]
->
sid
);
continue
;
...
...
@@ -266,7 +266,7 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
SMeterDataBlockInfoEx
*
pDataBlockInfoEx
=
NULL
;
int32_t
nAllocBlocksInfoSize
=
0
;
SMeterObj
*
pTempMeter
=
getMeterObj
(
pSupporter
->
pMeter
Obj
,
pSupporter
->
pMeterSidExtInfo
[
0
]
->
sid
);
SMeterObj
*
pTempMeter
=
getMeterObj
(
pSupporter
->
pMeter
sHashTable
,
pSupporter
->
pMeterSidExtInfo
[
0
]
->
sid
);
__block_search_fn_t
searchFn
=
vnodeSearchKeyFunc
[
pTempMeter
->
searchAlgorithm
];
int32_t
vnodeId
=
pTempMeter
->
vnode
;
...
...
@@ -475,7 +475,7 @@ static bool multimeterMultioutputHelper(SQInfo *pQInfo, bool *dataInDisk, bool *
setQueryStatus
(
pQuery
,
QUERY_NOT_COMPLETED
);
SMeterObj
*
pMeterObj
=
getMeterObj
(
pSupporter
->
pMeter
Obj
,
pMeterSidExtInfo
[
index
]
->
sid
);
SMeterObj
*
pMeterObj
=
getMeterObj
(
pSupporter
->
pMeter
sHashTable
,
pMeterSidExtInfo
[
index
]
->
sid
);
if
(
pMeterObj
==
NULL
)
{
dError
(
"QInfo:%p do not find required meter id: %d, all meterObjs id is:"
,
pQInfo
,
pMeterSidExtInfo
[
index
]
->
sid
);
return
false
;
...
...
@@ -576,7 +576,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
SQuery
*
pQuery
=
&
pQInfo
->
query
;
tSidSet
*
pSids
=
pSupporter
->
pSidSet
;
SMeterObj
*
pOneMeter
=
getMeterObj
(
pSupporter
->
pMeter
Obj
,
pMeterSidExtInfo
[
0
]
->
sid
);
SMeterObj
*
pOneMeter
=
getMeterObj
(
pSupporter
->
pMeter
sHashTable
,
pMeterSidExtInfo
[
0
]
->
sid
);
resetCtxOutputBuf
(
pRuntimeEnv
);
...
...
@@ -604,7 +604,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
}
// get the last key of meters that belongs to this group
SMeterObj
*
pMeterObj
=
getMeterObj
(
pSupporter
->
pMeter
Obj
,
pMeterSidExtInfo
[
k
]
->
sid
);
SMeterObj
*
pMeterObj
=
getMeterObj
(
pSupporter
->
pMeter
sHashTable
,
pMeterSidExtInfo
[
k
]
->
sid
);
if
(
pMeterObj
!=
NULL
)
{
if
(
key
<
pMeterObj
->
lastKey
)
{
key
=
pMeterObj
->
lastKey
;
...
...
@@ -674,10 +674,10 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
}
pRuntimeEnv
->
usedIndex
=
0
;
taosCleanUp
IntHash
(
pRuntimeEnv
->
hashList
);
taosCleanUp
HashTable
(
pRuntimeEnv
->
hashList
);
int32_t
primeHashSlot
=
10039
;
pRuntimeEnv
->
hashList
=
taosInit
IntHash
(
primeHashSlot
,
POINTER_BYTES
,
taosHashInt
);
pRuntimeEnv
->
hashList
=
taosInit
HashTable
(
primeHashSlot
,
taosIntHash_32
,
false
);
while
(
pSupporter
->
meterIdx
<
pSupporter
->
numOfMeters
)
{
int32_t
k
=
pSupporter
->
meterIdx
;
...
...
src/system/detail/src/vnodeRead.c
浏览文件 @
e9cc28ed
...
...
@@ -25,6 +25,8 @@
#include "vnode.h"
#include "vnodeRead.h"
#include "vnodeUtil.h"
#include "hash.h"
#include "hashutil.h"
int
(
*
pQueryFunc
[])(
SMeterObj
*
,
SQuery
*
)
=
{
vnodeQueryFromCache
,
vnodeQueryFromFile
};
...
...
@@ -655,8 +657,9 @@ void *vnodeQueryOnSingleTable(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE
SMeterQuerySupportObj
*
pSupporter
=
(
SMeterQuerySupportObj
*
)
calloc
(
1
,
sizeof
(
SMeterQuerySupportObj
));
pSupporter
->
numOfMeters
=
1
;
pSupporter
->
pMeterObj
=
taosInitIntHash
(
pSupporter
->
numOfMeters
,
POINTER_BYTES
,
taosHashInt
);
taosAddIntHash
(
pSupporter
->
pMeterObj
,
pMetersObj
[
0
]
->
sid
,
(
char
*
)
&
pMetersObj
[
0
]);
pSupporter
->
pMetersHashTable
=
taosInitHashTable
(
pSupporter
->
numOfMeters
,
taosIntHash_32
,
false
);
taosAddToHashTable
(
pSupporter
->
pMetersHashTable
,
(
const
char
*
)
&
pMetersObj
[
0
]
->
sid
,
sizeof
(
pMeterObj
[
0
].
sid
),
(
char
*
)
&
pMetersObj
[
0
],
POINTER_BYTES
);
pSupporter
->
pSidSet
=
NULL
;
pSupporter
->
subgroupIdx
=
-
1
;
...
...
@@ -748,9 +751,10 @@ void *vnodeQueryOnMultiMeters(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE
SMeterQuerySupportObj
*
pSupporter
=
(
SMeterQuerySupportObj
*
)
calloc
(
1
,
sizeof
(
SMeterQuerySupportObj
));
pSupporter
->
numOfMeters
=
pQueryMsg
->
numOfSids
;
pSupporter
->
pMeter
Obj
=
taosInitIntHash
(
pSupporter
->
numOfMeters
,
POINTER_BYTES
,
taosHashInt
);
pSupporter
->
pMeter
sHashTable
=
taosInitHashTable
(
pSupporter
->
numOfMeters
,
taosIntHash_32
,
false
);
for
(
int32_t
i
=
0
;
i
<
pSupporter
->
numOfMeters
;
++
i
)
{
taosAddIntHash
(
pSupporter
->
pMeterObj
,
pMetersObj
[
i
]
->
sid
,
(
char
*
)
&
pMetersObj
[
i
]);
taosAddToHashTable
(
pSupporter
->
pMetersHashTable
,
(
const
char
*
)
&
pMetersObj
[
i
]
->
sid
,
sizeof
(
pMetersObj
[
i
]
->
sid
),
(
char
*
)
&
pMetersObj
[
i
],
POINTER_BYTES
);
}
pSupporter
->
pMeterSidExtInfo
=
(
SMeterSidExtInfo
**
)
pQueryMsg
->
pSidExtInfo
;
...
...
src/system/detail/src/vnodeShell.c
浏览文件 @
e9cc28ed
...
...
@@ -472,7 +472,8 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) {
assert
(
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
);
if
(
numOfRows
==
0
&&
(
pRetrieve
->
qhandle
==
(
uint64_t
)
pObj
->
qhandle
)
&&
(
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
))
{
if
(
numOfRows
==
0
&&
(
pRetrieve
->
qhandle
==
(
uint64_t
)
pObj
->
qhandle
)
&&
(
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
&&
pRetrieve
->
qhandle
!=
0
)
{
dTrace
(
"QInfo:%p %s free qhandle code:%d"
,
pObj
->
qhandle
,
__FUNCTION__
,
code
);
vnodeDecRefCount
(
pObj
->
qhandle
);
pObj
->
qhandle
=
NULL
;
...
...
src/util/src/hash.c
0 → 100644
浏览文件 @
e9cc28ed
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "hash.h"
#include "tlog.h"
#include "ttime.h"
#include "tutil.h"
static
FORCE_INLINE
void
__wr_lock
(
void
*
lock
)
{
#if defined LINUX
pthread_rwlock_wrlock
(
lock
);
#else
pthread_mutex_lock
(
lock
);
#endif
}
static
FORCE_INLINE
void
__rd_lock
(
void
*
lock
)
{
#if defined LINUX
pthread_rwlock_rdlock
(
lock
);
#else
pthread_mutex_lock
(
lock
);
#endif
}
static
FORCE_INLINE
void
__unlock
(
void
*
lock
)
{
#if defined LINUX
pthread_rwlock_unlock
(
lock
);
#else
pthread_mutex_unlock
(
lock
);
#endif
}
static
FORCE_INLINE
int32_t
__lock_init
(
void
*
lock
)
{
#if defined LINUX
return
pthread_rwlock_init
(
lock
,
NULL
);
#else
return
pthread_mutex_init
(
lock
,
NULL
);
#endif
}
static
FORCE_INLINE
void
__lock_destroy
(
void
*
lock
)
{
#if defined LINUX
pthread_rwlock_destroy
(
lock
);
#else
pthread_mutex_destroy
(
lock
);
#endif
}
static
FORCE_INLINE
int32_t
taosHashCapacity
(
int32_t
length
)
{
int32_t
len
=
MIN
(
length
,
HASH_MAX_CAPACITY
);
uint32_t
i
=
4
;
while
(
i
<
len
)
i
=
(
i
<<
1U
);
return
i
;
}
/**
* hash key function
*
* @param key key string
* @param len length of key
* @return hash value
*/
static
FORCE_INLINE
uint32_t
taosHashKey
(
const
char
*
key
,
uint32_t
len
)
{
return
MurmurHash3_32
(
key
,
len
);
}
/**
* inplace update node in hash table
* @param pObj hash table object
* @param pNode data node
*/
static
void
doUpdateHashTable
(
HashObj
*
pObj
,
SHashNode
*
pNode
)
{
if
(
pNode
->
prev1
)
{
pNode
->
prev1
->
next
=
pNode
;
}
if
(
pNode
->
next
)
{
(
pNode
->
next
)
->
prev
=
pNode
;
}
pTrace
(
"key:%s %p update hash table"
,
pNode
->
key
,
pNode
);
}
/**
* get SHashNode from hashlist, nodes from trash are not included.
* @param pObj Cache objection
* @param key key for hash
* @param keyLen key length
* @return
*/
static
SHashNode
*
doGetNodeFromHashTable
(
HashObj
*
pObj
,
const
char
*
key
,
uint32_t
keyLen
,
uint32_t
*
hashVal
)
{
uint32_t
hash
=
(
*
pObj
->
hashFp
)(
key
,
keyLen
);
int32_t
slot
=
HASH_INDEX
(
hash
,
pObj
->
capacity
);
SHashEntry
*
pEntry
=
pObj
->
hashList
[
slot
];
SHashNode
*
pNode
=
pEntry
->
next
;
while
(
pNode
)
{
if
((
pNode
->
keyLen
==
keyLen
)
&&
(
memcmp
(
pNode
->
key
,
key
,
keyLen
)
==
0
))
{
break
;
}
pNode
=
pNode
->
next
;
}
if
(
pNode
)
{
assert
(
HASH_INDEX
(
pNode
->
hashVal
,
pObj
->
capacity
)
==
slot
);
}
// return the calculated hash value, to avoid calculating it again in other functions
if
(
hashVal
!=
NULL
)
{
*
hashVal
=
hash
;
}
return
pNode
;
}
/**
* resize the hash list if the threshold is reached
*
* @param pObj
*/
static
void
taosHashTableResize
(
HashObj
*
pObj
)
{
if
(
pObj
->
size
<
pObj
->
capacity
*
HASH_DEFAULT_LOAD_FACTOR
)
{
return
;
}
// double the original capacity
SHashNode
*
pNode
=
NULL
;
SHashNode
*
pNext
=
NULL
;
int32_t
newSize
=
pObj
->
capacity
<<
1U
;
if
(
newSize
>
HASH_MAX_CAPACITY
)
{
pTrace
(
"current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached"
,
pObj
->
capacity
,
HASH_MAX_CAPACITY
);
return
;
}
int64_t
st
=
taosGetTimestampUs
();
SHashEntry
**
pNewEntry
=
realloc
(
pObj
->
hashList
,
sizeof
(
SHashEntry
*
)
*
newSize
);
if
(
pNewEntry
==
NULL
)
{
pTrace
(
"cache resize failed due to out of memory, capacity remain:%d"
,
pObj
->
capacity
);
return
;
}
pObj
->
hashList
=
pNewEntry
;
for
(
int32_t
i
=
pObj
->
capacity
;
i
<
newSize
;
++
i
)
{
pObj
->
hashList
[
i
]
=
calloc
(
1
,
sizeof
(
SHashEntry
));
}
pObj
->
capacity
=
newSize
;
for
(
int32_t
i
=
0
;
i
<
pObj
->
capacity
;
++
i
)
{
SHashEntry
*
pEntry
=
pObj
->
hashList
[
i
];
pNode
=
pEntry
->
next
;
if
(
pNode
!=
NULL
)
{
assert
(
pNode
->
prev1
==
pEntry
&&
pEntry
->
num
>
0
);
}
while
(
pNode
)
{
int32_t
j
=
HASH_INDEX
(
pNode
->
hashVal
,
pObj
->
capacity
);
if
(
j
==
i
)
{
// this key resides in the same slot, no need to relocate it
pNode
=
pNode
->
next
;
}
else
{
pNext
=
pNode
->
next
;
// remove from current slot
assert
(
pNode
->
prev1
!=
NULL
);
if
(
pNode
->
prev1
==
pEntry
)
{
// first node of the overflow linked list
pEntry
->
next
=
pNode
->
next
;
}
else
{
pNode
->
prev
->
next
=
pNode
->
next
;
}
pEntry
->
num
--
;
assert
(
pEntry
->
num
>=
0
);
if
(
pNode
->
next
!=
NULL
)
{
(
pNode
->
next
)
->
prev
=
pNode
->
prev
;
}
// added into new slot
pNode
->
next
=
NULL
;
pNode
->
prev1
=
NULL
;
SHashEntry
*
pNewIndexEntry
=
pObj
->
hashList
[
j
];
if
(
pNewIndexEntry
->
next
!=
NULL
)
{
assert
(
pNewIndexEntry
->
next
->
prev1
==
pNewIndexEntry
);
pNewIndexEntry
->
next
->
prev
=
pNode
;
}
pNode
->
next
=
pNewIndexEntry
->
next
;
pNode
->
prev1
=
pNewIndexEntry
;
pNewIndexEntry
->
next
=
pNode
;
pNewIndexEntry
->
num
++
;
// continue
pNode
=
pNext
;
}
}
}
int64_t
et
=
taosGetTimestampUs
();
pTrace
(
"hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms"
,
pObj
->
capacity
,
((
double
)
pObj
->
size
)
/
pObj
->
capacity
,
(
et
-
st
)
/
1000
.
0
);
}
/**
* @param capacity maximum slots available for hash elements
* @param fn hash function
* @return
*/
void
*
taosInitHashTable
(
uint32_t
capacity
,
_hash_fn_t
fn
,
bool
multithreadSafe
)
{
if
(
capacity
==
0
||
fn
==
NULL
)
{
return
NULL
;
}
HashObj
*
pObj
=
(
HashObj
*
)
calloc
(
1
,
sizeof
(
HashObj
));
if
(
pObj
==
NULL
)
{
pError
(
"failed to allocate memory, reason:%s"
,
strerror
(
errno
));
return
NULL
;
}
// the max slots is not defined by user
pObj
->
capacity
=
taosHashCapacity
(
capacity
);
assert
((
pObj
->
capacity
&
(
pObj
->
capacity
-
1
))
==
0
);
pObj
->
hashFp
=
fn
;
pObj
->
hashList
=
(
SHashEntry
**
)
calloc
(
pObj
->
capacity
,
sizeof
(
SHashEntry
*
));
if
(
pObj
->
hashList
==
NULL
)
{
free
(
pObj
);
pError
(
"failed to allocate memory, reason:%s"
,
strerror
(
errno
));
return
NULL
;
}
for
(
int32_t
i
=
0
;
i
<
pObj
->
capacity
;
++
i
)
{
pObj
->
hashList
[
i
]
=
calloc
(
1
,
sizeof
(
SHashEntry
));
}
if
(
multithreadSafe
&&
(
__lock_init
(
pObj
)
!=
0
))
{
free
(
pObj
->
hashList
);
free
(
pObj
);
pError
(
"failed to init lock, reason:%s"
,
strerror
(
errno
));
return
NULL
;
}
return
(
void
*
)
pObj
;
}
/**
* @param key key of object for hash, usually a null-terminated string
* @param keyLen length of key
* @param pData actually data. required a consecutive memory block, no pointer is allowed
* in pData. Pointer copy causes memory access error.
* @param size size of block
* @return SHashNode
*/
static
SHashNode
*
doCreateHashNode
(
const
char
*
key
,
uint32_t
keyLen
,
const
char
*
pData
,
size_t
dataSize
,
uint32_t
hashVal
)
{
size_t
totalSize
=
dataSize
+
sizeof
(
SHashNode
)
+
keyLen
;
SHashNode
*
pNewNode
=
calloc
(
1
,
totalSize
);
if
(
pNewNode
==
NULL
)
{
pError
(
"failed to allocate memory, reason:%s"
,
strerror
(
errno
));
return
NULL
;
}
memcpy
(
pNewNode
->
data
,
pData
,
dataSize
);
pNewNode
->
key
=
pNewNode
->
data
+
dataSize
;
memcpy
(
pNewNode
->
key
,
key
,
keyLen
);
pNewNode
->
keyLen
=
keyLen
;
pNewNode
->
hashVal
=
hashVal
;
return
pNewNode
;
}
static
SHashNode
*
doUpdateHashNode
(
SHashNode
*
pNode
,
const
char
*
key
,
uint32_t
keyLen
,
const
char
*
pData
,
size_t
dataSize
)
{
size_t
size
=
dataSize
+
sizeof
(
SHashNode
)
+
keyLen
;
SHashNode
*
pNewNode
=
(
SHashNode
*
)
realloc
(
pNode
,
size
);
if
(
pNewNode
==
NULL
)
{
return
NULL
;
}
memcpy
(
pNewNode
->
data
,
pData
,
dataSize
);
pNewNode
->
key
=
pNewNode
->
data
+
dataSize
;
assert
(
memcmp
(
pNewNode
->
key
,
key
,
keyLen
)
==
0
&&
keyLen
==
pNewNode
->
keyLen
);
memcpy
(
pNewNode
->
key
,
key
,
keyLen
);
return
pNewNode
;
}
/**
* insert the hash node at the front of the linked list
*
* @param pObj
* @param pNode
*/
static
void
doAddToHashTable
(
HashObj
*
pObj
,
SHashNode
*
pNode
)
{
assert
(
pNode
!=
NULL
);
int32_t
index
=
HASH_INDEX
(
pNode
->
hashVal
,
pObj
->
capacity
);
SHashEntry
*
pEntry
=
pObj
->
hashList
[
index
];
pNode
->
next
=
pEntry
->
next
;
if
(
pEntry
->
next
)
{
pEntry
->
next
->
prev
=
pNode
;
}
pEntry
->
next
=
pNode
;
pNode
->
prev1
=
pEntry
;
pEntry
->
num
++
;
pObj
->
size
++
;
char
key
[
512
]
=
{
0
};
memcpy
(
key
,
pNode
->
key
,
MIN
(
512
,
pNode
->
keyLen
));
pTrace
(
"key:%s %p add to hash table"
,
key
,
pNode
);
}
/**
* add data node into hash table
* @param pObj hash object
* @param pNode hash node
*/
int32_t
taosAddToHashTable
(
HashObj
*
pObj
,
const
char
*
key
,
uint32_t
keyLen
,
void
*
data
,
uint32_t
size
)
{
if
(
pObj
->
multithreadSafe
)
{
__wr_lock
(
&
pObj
->
lock
);
}
uint32_t
hashVal
=
0
;
SHashNode
*
pNode
=
doGetNodeFromHashTable
(
pObj
,
key
,
keyLen
,
&
hashVal
);
if
(
pNode
==
NULL
)
{
// no data in hash table with the specified key, add it into hash table
taosHashTableResize
(
pObj
);
SHashNode
*
pNewNode
=
doCreateHashNode
(
key
,
keyLen
,
data
,
size
,
hashVal
);
if
(
pNewNode
==
NULL
)
{
if
(
pObj
->
multithreadSafe
)
{
__unlock
(
&
pObj
->
lock
);
}
return
-
1
;
}
doAddToHashTable
(
pObj
,
pNewNode
);
}
else
{
SHashNode
*
pNewNode
=
doUpdateHashNode
(
pNode
,
key
,
keyLen
,
data
,
size
);
if
(
pNewNode
==
NULL
)
{
if
(
pObj
->
multithreadSafe
)
{
__unlock
(
&
pObj
->
lock
);
}
return
-
1
;
}
doUpdateHashTable
(
pObj
,
pNewNode
);
}
if
(
pObj
->
multithreadSafe
)
{
__unlock
(
&
pObj
->
lock
);
}
return
0
;
}
char
*
taosGetDataFromHash
(
HashObj
*
pObj
,
const
char
*
key
,
uint32_t
keyLen
)
{
if
(
pObj
->
multithreadSafe
)
{
__rd_lock
(
&
pObj
->
lock
);
}
uint32_t
hashVal
=
0
;
SHashNode
*
pNode
=
doGetNodeFromHashTable
(
pObj
,
key
,
keyLen
,
&
hashVal
);
if
(
pObj
->
multithreadSafe
)
{
__unlock
(
&
pObj
->
lock
);
}
if
(
pNode
!=
NULL
)
{
assert
(
pNode
->
hashVal
==
hashVal
);
return
pNode
->
data
;
}
else
{
return
NULL
;
}
}
/**
* remove node in hash list
* @param pObj
* @param pNode
*/
void
taosDeleteFromHashTable
(
HashObj
*
pObj
,
const
char
*
key
,
uint32_t
keyLen
)
{
if
(
pObj
->
multithreadSafe
)
{
__wr_lock
(
&
pObj
->
lock
);
}
SHashNode
*
pNode
=
doGetNodeFromHashTable
(
pObj
,
key
,
keyLen
,
NULL
);
if
(
pNode
==
NULL
)
{
if
(
pObj
->
multithreadSafe
)
{
__unlock
(
&
pObj
->
lock
);
}
return
;
}
SHashNode
*
pNext
=
pNode
->
next
;
if
(
pNode
->
prev
!=
NULL
)
{
pNode
->
prev
->
next
=
pNext
;
}
if
(
pNext
!=
NULL
)
{
pNext
->
prev
=
pNode
->
prev
;
}
uint32_t
index
=
HASH_INDEX
(
pNode
->
hashVal
,
pObj
->
capacity
);
SHashEntry
*
pEntry
=
pObj
->
hashList
[
index
];
pEntry
->
num
--
;
pObj
->
size
--
;
pNode
->
next
=
NULL
;
pNode
->
prev
=
NULL
;
pTrace
(
"key:%s %p remove from hash table"
,
pNode
->
key
,
pNode
);
tfree
(
pNode
);
if
(
pObj
->
multithreadSafe
)
{
__unlock
(
&
pObj
->
lock
);
}
}
void
taosCleanUpHashTable
(
void
*
handle
)
{
HashObj
*
pObj
=
(
HashObj
*
)
handle
;
if
(
pObj
==
NULL
||
pObj
->
capacity
<=
0
)
return
;
SHashNode
*
pNode
,
*
pNext
;
if
(
pObj
->
multithreadSafe
)
{
__wr_lock
(
&
pObj
->
lock
);
}
if
(
pObj
->
hashList
)
{
for
(
int32_t
i
=
0
;
i
<
pObj
->
capacity
;
++
i
)
{
SHashEntry
*
pEntry
=
pObj
->
hashList
[
i
];
pNode
=
pEntry
->
next
;
while
(
pNode
)
{
pNext
=
pNode
->
next
;
free
(
pNode
);
pNode
=
pNext
;
}
}
free
(
pObj
->
hashList
);
}
if
(
pObj
->
multithreadSafe
)
{
__unlock
(
&
pObj
->
lock
);
__lock_destroy
(
&
pObj
->
lock
);
}
memset
(
pObj
,
0
,
sizeof
(
HashObj
));
free
(
pObj
);
}
// for profile only
int32_t
taosGetHashMaxOverflowLength
(
HashObj
*
pObj
)
{
if
(
pObj
==
NULL
||
pObj
->
size
==
0
)
{
return
0
;
}
int32_t
num
=
0
;
for
(
int32_t
i
=
0
;
i
<
pObj
->
size
;
++
i
)
{
SHashEntry
*
pEntry
=
pObj
->
hashList
[
i
];
if
(
num
<
pEntry
->
num
)
{
num
=
pEntry
->
num
;
}
}
return
num
;
}
int32_t
taosCheckHashTable
(
HashObj
*
pObj
)
{
for
(
int32_t
i
=
0
;
i
<
pObj
->
capacity
;
++
i
)
{
SHashEntry
*
pEntry
=
pObj
->
hashList
[
i
];
SHashNode
*
pNode
=
pEntry
->
next
;
if
(
pNode
!=
NULL
)
{
assert
(
pEntry
==
pNode
->
prev1
);
int32_t
num
=
1
;
SHashNode
*
pNext
=
pNode
->
next
;
while
(
pNext
)
{
assert
(
pNext
->
prev
==
pNode
);
pNode
=
pNext
;
pNext
=
pNext
->
next
;
num
++
;
}
assert
(
num
==
pEntry
->
num
);
}
}
}
src/util/src/thashutil.c
浏览文件 @
e9cc28ed
...
...
@@ -8,6 +8,7 @@
*
*/
#include "tutil.h"
#include "hashutil.h"
#define ROTL32(x, r) ((x) << (r) | (x) >> (32 - (r)))
...
...
@@ -67,7 +68,7 @@ static void MurmurHash3_32_s(const void *key, int len, uint32_t seed, void *out)
*
(
uint32_t
*
)
out
=
h1
;
}
uint32_t
MurmurHash3_32
(
const
void
*
key
,
in
t
len
)
{
uint32_t
MurmurHash3_32
(
const
char
*
key
,
uint32_
t
len
)
{
const
int32_t
hashSeed
=
0x12345678
;
uint32_t
val
=
0
;
...
...
@@ -75,3 +76,26 @@ uint32_t MurmurHash3_32(const void *key, int len) {
return
val
;
}
uint32_t
taosIntHash_32
(
const
char
*
key
,
uint32_t
UNUSED_PARAM
(
len
))
{
return
*
(
uint32_t
*
)
key
;
}
uint32_t
taosIntHash_64
(
const
char
*
key
,
uint32_t
UNUSED_PARAM
(
len
))
{
uint64_t
val
=
*
(
uint64_t
*
)
key
;
uint64_t
hash
=
val
>>
16U
;
hash
+=
(
val
&
0xFFFFU
);
return
hash
;
}
_hash_fn_t
taosGetDefaultHashFunction
(
int32_t
type
)
{
_hash_fn_t
fn
=
NULL
;
switch
(
type
)
{
case
TSDB_DATA_TYPE_BIGINT
:
fn
=
taosIntHash_64
;
break
;
case
TSDB_DATA_TYPE_BINARY
:
fn
=
MurmurHash3_32
;
break
;
case
TSDB_DATA_TYPE_INT
:
fn
=
taosIntHash_32
;
break
;
default:
fn
=
taosIntHash_32
;
break
;
}
return
fn
;
}
\ No newline at end of file
src/util/src/ttokenizer.c
浏览文件 @
e9cc28ed
...
...
@@ -14,11 +14,13 @@
*/
#include "os.h"
#include "hashutil.h"
#include "shash.h"
#include "tutil.h"
#include "tsqldef.h"
#include "tstoken.h"
#include "ttypes.h"
#include "hash.h"
// All the keywords of the SQL language are stored in a hash table
typedef
struct
SKeyword
{
...
...
@@ -225,11 +227,9 @@ static SKeyword keywordTable[] = {
{
"STABLE"
,
TK_STABLE
},
{
"FILE"
,
TK_FILE
},
{
"VNODES"
,
TK_VNODES
},
{
"UNION"
,
TK_UNION
},
};
/* This is the hash table */
static
pthread_mutex_t
mutex
=
PTHREAD_MUTEX_INITIALIZER
;
static
const
char
isIdChar
[]
=
{
/* x0 x1 x2 x3 x4 x5 x6 x7 x8 x9 xA xB xC xD xE xF */
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
/* 0x */
...
...
@@ -243,27 +243,22 @@ static const char isIdChar[] = {
};
static
void
*
KeywordHashTable
=
NULL
;
int
tSQLKeywordCode
(
const
char
*
z
,
int
n
)
{
int
i
;
static
char
needInit
=
1
;
if
(
needInit
)
{
// Initialize the keyword hash table
pthread_mutex_lock
(
&
mutex
);
// double check
if
(
needInit
)
{
int
nk
=
tListLen
(
keywordTable
);
KeywordHashTable
=
taosInitStrHash
(
nk
,
POINTER_BYTES
,
taosHashStringStep1
);
for
(
i
=
0
;
i
<
nk
;
i
++
)
{
keywordTable
[
i
].
len
=
strlen
(
keywordTable
[
i
].
name
);
void
*
ptr
=
&
keywordTable
[
i
];
taosAddStrHash
(
KeywordHashTable
,
(
char
*
)
keywordTable
[
i
].
name
,
(
void
*
)
&
ptr
);
}
needInit
=
0
;
}
pthread_mutex_unlock
(
&
mutex
);
static
void
doInitKeywordsTable
()
{
int
numOfEntries
=
tListLen
(
keywordTable
);
KeywordHashTable
=
taosInitHashTable
(
numOfEntries
,
MurmurHash3_32
,
false
);
for
(
int32_t
i
=
0
;
i
<
numOfEntries
;
i
++
)
{
keywordTable
[
i
].
len
=
strlen
(
keywordTable
[
i
].
name
);
void
*
ptr
=
&
keywordTable
[
i
];
taosAddToHashTable
(
KeywordHashTable
,
keywordTable
[
i
].
name
,
keywordTable
[
i
].
len
,
(
void
*
)
&
ptr
,
POINTER_BYTES
);
}
}
static
pthread_once_t
keywordsHashTableInit
=
PTHREAD_ONCE_INIT
;
int
tSQLKeywordCode
(
const
char
*
z
,
int
n
)
{
pthread_once
(
&
keywordsHashTableInit
,
doInitKeywordsTable
);
char
key
[
128
]
=
{
0
};
for
(
int32_t
j
=
0
;
j
<
n
;
++
j
)
{
...
...
@@ -274,7 +269,7 @@ int tSQLKeywordCode(const char* z, int n) {
}
}
SKeyword
**
pKey
=
(
SKeyword
**
)
taosGet
StrHashData
(
KeywordHashTable
,
key
);
SKeyword
**
pKey
=
(
SKeyword
**
)
taosGet
DataFromHash
(
KeywordHashTable
,
key
,
n
);
if
(
pKey
!=
NULL
)
{
return
(
*
pKey
)
->
type
;
}
else
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录