Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ae7550eb
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1193
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
ae7550eb
编写于
8月 12, 2022
作者:
C
Cary Xu
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh: rsma exec in async mode
上级
46a6b377
变更
18
隐藏空白更改
内联
并排
Showing
18 changed file
with
271 addition
and
154 deletion
+271
-154
include/common/tmsg.h
include/common/tmsg.h
+4
-0
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+1
-0
source/dnode/vnode/src/inc/sma.h
source/dnode/vnode/src/inc/sma.h
+7
-5
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+1
-1
source/dnode/vnode/src/sma/smaEnv.c
source/dnode/vnode/src/sma/smaEnv.c
+1
-4
source/dnode/vnode/src/sma/smaRollup.c
source/dnode/vnode/src/sma/smaRollup.c
+66
-14
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+0
-26
source/dnode/vnode/src/vnd/vnodeQuery.c
source/dnode/vnode/src/vnd/vnodeQuery.c
+19
-0
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+3
-1
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+2
-1
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-1
source/libs/executor/inc/tsimplehash.h
source/libs/executor/inc/tsimplehash.h
+30
-12
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+5
-4
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+12
-10
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+2
-2
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+29
-25
source/libs/executor/src/tsimplehash.c
source/libs/executor/src/tsimplehash.c
+80
-42
source/libs/executor/test/tSimpleHashTests.cpp
source/libs/executor/test/tSimpleHashTests.cpp
+8
-6
未找到文件。
include/common/tmsg.h
浏览文件 @
ae7550eb
...
...
@@ -2658,6 +2658,10 @@ typedef struct {
SEpSet
epSet
;
}
SVgEpSet
;
typedef
struct
{
int32_t
padding
;
}
SRSmaExecMsg
;
typedef
struct
{
int64_t
suid
;
int8_t
level
;
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
ae7550eb
...
...
@@ -63,6 +63,7 @@ void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId);
int32_t
vnodeProcessCreateTSma
(
SVnode
*
pVnode
,
void
*
pCont
,
uint32_t
contLen
);
int32_t
vnodeGetAllTableList
(
SVnode
*
pVnode
,
uint64_t
uid
,
SArray
*
list
);
int32_t
vnodeGetCtbIdList
(
SVnode
*
pVnode
,
int64_t
suid
,
SArray
*
list
);
int32_t
vnodeGetStbIdList
(
SVnode
*
pVnode
,
int64_t
suid
,
SArray
*
list
);
void
*
vnodeGetIdx
(
SVnode
*
pVnode
);
void
*
vnodeGetIvtIdx
(
SVnode
*
pVnode
);
...
...
source/dnode/vnode/src/inc/sma.h
浏览文件 @
ae7550eb
...
...
@@ -122,17 +122,19 @@ struct SRSmaInfoItem {
};
struct
SRSmaInfo
{
STSchema
*
pTSchema
;
int64_t
suid
;
int64_t
refId
;
// refId of SRSmaStat
int8_t
delFlag
;
STSchema
*
pTSchema
;
STaosQueue
*
queue
;
// buffer queue of SubmitReq
STaosQall
*
qall
;
int64_t
suid
;
int64_t
refId
;
// refId of SRSmaStat
int8_t
delFlag
;
T_REF_DECLARE
()
SRSmaInfoItem
items
[
TSDB_RETENTION_L2
];
void
*
taskInfo
[
TSDB_RETENTION_L2
];
// qTaskInfo_t
void
*
iTaskInfo
[
TSDB_RETENTION_L2
];
// immutable
};
#define RSMA_INFO_HEAD_LEN
32
#define RSMA_INFO_HEAD_LEN
offsetof(SRSmaInfo, items)
#define RSMA_INFO_IS_DEL(r) ((r)->delFlag == 1)
#define RSMA_INFO_SET_DEL(r) ((r)->delFlag = 1)
#define RSMA_INFO_QTASK(r, i) ((r)->taskInfo[i])
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
ae7550eb
...
...
@@ -169,7 +169,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t
tqProcessTaskRecoverRsp
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
int32_t
tqProcessTaskRetrieveReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
int32_t
tqProcessTaskRetrieveRsp
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
int32_t
tsdbGetStbIdList
(
SMeta
*
pMeta
,
int64_t
suid
,
SArray
*
list
);
SSubmitReq
*
tdBlockToSubmit
(
SVnode
*
pVnode
,
const
SArray
*
pBlocks
,
const
STSchema
*
pSchema
,
bool
createTb
,
int64_t
suid
,
const
char
*
stbFullName
,
int32_t
vgId
,
SBatchDeleteReq
*
pDeleteReq
);
...
...
source/dnode/vnode/src/sma/smaEnv.c
浏览文件 @
ae7550eb
...
...
@@ -264,8 +264,6 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
atomic_store_8
(
RSMA_TRIGGER_STAT
(
pStat
),
TASK_TRIGGER_STAT_CANCELLED
);
// step 2: destroy the rsma info and associated fetch tasks
// TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
#if 1
if
(
taosHashGetSize
(
RSMA_INFO_HASH
(
pStat
))
>
0
)
{
void
*
infoHash
=
taosHashIterate
(
RSMA_INFO_HASH
(
pStat
),
NULL
);
while
(
infoHash
)
{
...
...
@@ -274,7 +272,6 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
infoHash
=
taosHashIterate
(
RSMA_INFO_HASH
(
pStat
),
infoHash
);
}
}
#endif
taosHashCleanup
(
RSMA_INFO_HASH
(
pStat
));
// step 3: wait all triggered fetch tasks finished
...
...
@@ -292,7 +289,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
nLoops
=
0
;
}
}
// step 4: free pStat
taosMemoryFreeClear
(
pStat
);
}
...
...
source/dnode/vnode/src/sma/smaRollup.c
浏览文件 @
ae7550eb
...
...
@@ -139,6 +139,14 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
if
(
isDeepFree
)
{
taosMemoryFreeClear
(
pInfo
->
pTSchema
);
}
if
(
isDeepFree
)
{
if
(
pInfo
->
queue
)
taosCloseQueue
(
pInfo
->
queue
);
if
(
pInfo
->
qall
)
taosFreeQall
(
pInfo
->
qall
);
pInfo
->
queue
=
NULL
;
pInfo
->
qall
=
NULL
;
}
taosMemoryFree
(
pInfo
);
}
...
...
@@ -179,7 +187,7 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids)
for
(
int32_t
i
=
0
;
i
<
TSDB_RETENTION_L2
;
++
i
)
{
if
(
pRSmaInfo
->
taskInfo
[
i
])
{
if
((
qUpdateQualifiedTableId
(
pRSmaInfo
->
taskInfo
[
i
],
tbUids
,
true
)
<
0
))
{
if
((
(
terrno
=
qUpdateQualifiedTableId
(
pRSmaInfo
->
taskInfo
[
i
],
tbUids
,
true
)
)
<
0
))
{
tdReleaseRSmaInfo
(
pSma
,
pRSmaInfo
);
smaError
(
"vgId:%d, update tbUidList failed for uid:%"
PRIi64
" level %d since %s"
,
SMA_VID
(
pSma
),
*
suid
,
i
,
terrstr
());
...
...
@@ -351,6 +359,12 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
goto
_err
;
}
pRSmaInfo
->
pTSchema
=
pTSchema
;
if
(
!
(
pRSmaInfo
->
queue
=
taosOpenQueue
()))
{
goto
_err
;
}
if
(
!
(
pRSmaInfo
->
qall
=
taosAllocateQall
()))
{
goto
_err
;
}
pRSmaInfo
->
suid
=
suid
;
pRSmaInfo
->
refId
=
RSMA_REF_ID
(
pStat
);
T_REF_INIT_VAL
(
pRSmaInfo
,
1
);
...
...
@@ -615,7 +629,7 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm
while
(
1
)
{
uint64_t
ts
;
int32_t
code
=
qExecTaskOpt
(
taskInfo
,
pResList
,
&
ts
);
if
(
code
<
0
)
{
if
(
code
<
0
)
{
if
(
code
==
TSDB_CODE_QRY_IN_EXEC
)
{
break
;
}
else
{
...
...
@@ -662,10 +676,9 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm
goto
_err
;
}
taosMemoryFreeClear
(
pReq
);
smaDebug
(
"vgId:%d, process submit req for rsma table %"
PRIi64
" level %"
PRIi8
" version:%"
PRIi64
,
SMA_VID
(
pSma
),
suid
,
pItem
->
level
,
output
->
info
.
version
);
}
}
...
...
@@ -841,30 +854,45 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
return
TSDB_CODE_SUCCESS
;
}
/**
* @brief retrieve rsma meta and init
*
* @param pSma
* @param nTables number of tables of rsma
* @return int32_t
*/
static
int32_t
tdRSmaRestoreQTaskInfoInit
(
SSma
*
pSma
,
int64_t
*
nTables
)
{
SVnode
*
pVnode
=
pSma
->
pVnode
;
SVnode
*
pVnode
=
pSma
->
pVnode
;
SArray
*
suidList
=
NULL
;
STbUidStore
uidStore
=
{
0
};
SMetaReader
mr
=
{
0
};
SArray
*
suidList
=
taosArrayInit
(
1
,
sizeof
(
tb_uid_t
));
if
(
tsdbGetStbIdList
(
SMA_META
(
pSma
),
0
,
suidList
)
<
0
)
{
taosArrayDestroy
(
suidList
);
if
(
!
(
suidList
=
taosArrayInit
(
1
,
sizeof
(
tb_uid_t
))))
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
if
(
vnodeGetStbIdList
(
pSma
->
pVnode
,
0
,
suidList
)
<
0
)
{
smaError
(
"vgId:%d, failed to restore rsma env since get stb id list error: %s"
,
TD_VID
(
pVnode
),
terrstr
());
return
TSDB_CODE_FAILED
;
goto
_err
;
}
int64_t
arrSize
=
taosArrayGetSize
(
suidList
);
if
(
nTables
)
{
*
nTables
=
arrSize
;
}
if
(
arrSize
==
0
)
{
if
(
nTables
)
{
*
nTables
=
0
;
}
taosArrayDestroy
(
suidList
);
smaDebug
(
"vgId:%d, no need to restore rsma env since empty stb id list"
,
TD_VID
(
pVnode
));
return
TSDB_CODE_SUCCESS
;
}
SMetaReader
mr
=
{
0
}
;
int64_t
nRsmaTables
=
0
;
metaReaderInit
(
&
mr
,
SMA_META
(
pSma
),
0
);
if
(
!
(
uidStore
.
tbUids
=
taosArrayInit
(
1024
,
sizeof
(
tb_uid_t
))))
{
goto
_err
;
}
for
(
int64_t
i
=
0
;
i
<
arrSize
;
++
i
)
{
tb_uid_t
suid
=
*
(
tb_uid_t
*
)
taosArrayGet
(
suidList
,
i
);
smaDebug
(
"vgId:%d, rsma restore, suid is %"
PRIi64
,
TD_VID
(
pVnode
),
suid
);
...
...
@@ -877,6 +905,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
ASSERT
(
mr
.
me
.
type
==
TSDB_SUPER_TABLE
);
ASSERT
(
mr
.
me
.
uid
==
suid
);
if
(
TABLE_IS_ROLLUP
(
mr
.
me
.
flags
))
{
++
nRsmaTables
;
SRSmaParam
*
param
=
&
mr
.
me
.
stbEntry
.
rsmaParam
;
for
(
int
i
=
0
;
i
<
TSDB_RETENTION_L2
;
++
i
)
{
smaDebug
(
"vgId:%d, rsma restore, table:%"
PRIi64
" level:%d, maxdelay:%"
PRIi64
" watermark:%"
PRIi64
...
...
@@ -887,17 +916,40 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
smaError
(
"vgId:%d, rsma restore env failed for %"
PRIi64
" since %s"
,
TD_VID
(
pVnode
),
suid
,
terrstr
());
goto
_err
;
}
// reload all ctbUids for suid
uidStore
.
suid
=
suid
;
if
(
vnodeGetCtbIdList
(
pVnode
,
suid
,
uidStore
.
tbUids
)
<
0
)
{
smaError
(
"vgId:%d, rsma restore, get ctb idlist failed for %"
PRIi64
" since %s"
,
TD_VID
(
pVnode
),
suid
,
terrstr
());
goto
_err
;
}
if
(
tdUpdateTbUidList
(
pVnode
->
pSma
,
&
uidStore
)
<
0
)
{
smaError
(
"vgId:%d, rsma restore, update tb uid list failed for %"
PRIi64
" since %s"
,
TD_VID
(
pVnode
),
suid
,
terrstr
());
goto
_err
;
}
taosArrayClear
(
uidStore
.
tbUids
);
smaDebug
(
"vgId:%d, rsma restore env success for %"
PRIi64
,
TD_VID
(
pVnode
),
suid
);
}
}
metaReaderClear
(
&
mr
);
taosArrayDestroy
(
suidList
);
tdUidStoreDestory
(
&
uidStore
);
if
(
nTables
)
{
*
nTables
=
nRsmaTables
;
}
return
TSDB_CODE_SUCCESS
;
_err:
metaReaderClear
(
&
mr
);
taosArrayDestroy
(
suidList
);
tdUidStoreDestory
(
&
uidStore
);
return
TSDB_CODE_FAILED
;
}
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
ae7550eb
...
...
@@ -2585,32 +2585,6 @@ void* tsdbGetIvtIdx(SMeta* pMeta) {
uint64_t
getReaderMaxVersion
(
STsdbReader
*
pReader
)
{
return
pReader
->
verRange
.
maxVer
;
}
/**
* @brief Get all suids since suid
*
* @param pMeta
* @param suid return all suids in one vnode if suid is 0
* @param list
* @return int32_t
*/
int32_t
tsdbGetStbIdList
(
SMeta
*
pMeta
,
int64_t
suid
,
SArray
*
list
)
{
SMStbCursor
*
pCur
=
metaOpenStbCursor
(
pMeta
,
suid
);
if
(
!
pCur
)
{
return
TSDB_CODE_FAILED
;
}
while
(
1
)
{
tb_uid_t
id
=
metaStbCursorNext
(
pCur
);
if
(
id
==
0
)
{
break
;
}
taosArrayPush
(
list
,
&
id
);
}
metaCloseStbCursor
(
pCur
);
return
TSDB_CODE_SUCCESS
;
}
// ====================================== EXPOSED APIs ======================================
int32_t
tsdbReaderOpen
(
SVnode
*
pVnode
,
SQueryTableDataCond
*
pCond
,
SArray
*
pTableList
,
STsdbReader
**
ppReader
,
...
...
source/dnode/vnode/src/vnd/vnodeQuery.c
浏览文件 @
ae7550eb
...
...
@@ -424,6 +424,25 @@ int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
vnodeGetStbIdList
(
SVnode
*
pVnode
,
int64_t
suid
,
SArray
*
list
)
{
SMStbCursor
*
pCur
=
metaOpenStbCursor
(
pVnode
->
pMeta
,
suid
);
if
(
!
pCur
)
{
return
TSDB_CODE_FAILED
;
}
while
(
1
)
{
tb_uid_t
id
=
metaStbCursorNext
(
pCur
);
if
(
id
==
0
)
{
break
;
}
taosArrayPush
(
list
,
&
id
);
}
metaCloseStbCursor
(
pCur
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
vnodeGetCtbNum
(
SVnode
*
pVnode
,
int64_t
suid
,
int64_t
*
num
)
{
SMCtbCursor
*
pCur
=
metaOpenCtbCursor
(
pVnode
->
pMeta
,
suid
);
if
(
!
pCur
)
{
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
ae7550eb
...
...
@@ -522,7 +522,9 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR
}
tqUpdateTbUidList
(
pVnode
->
pTq
,
tbUids
,
true
);
tdUpdateTbUidList
(
pVnode
->
pSma
,
pStore
);
if
(
tdUpdateTbUidList
(
pVnode
->
pSma
,
pStore
)
<
0
)
{
goto
_exit
;
}
tdUidStoreFree
(
pStore
);
// prepare rsp
...
...
source/libs/executor/inc/executil.h
浏览文件 @
ae7550eb
...
...
@@ -22,6 +22,7 @@
#include "tbuffer.h"
#include "tcommon.h"
#include "tpagedbuf.h"
#include "tsimplehash.h"
#define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \
do { \
...
...
@@ -102,7 +103,7 @@ static FORCE_INLINE void setResultBufPageDirty(SDiskbasedBuf* pBuf, SResultRowPo
setBufPageDirty
(
pPage
,
true
);
}
void
initGroupedResultInfo
(
SGroupResInfo
*
pGroupResInfo
,
SHashObj
*
pHashmap
,
int32_t
order
);
void
initGroupedResultInfo
(
SGroupResInfo
*
pGroupResInfo
,
S
S
HashObj
*
pHashmap
,
int32_t
order
);
void
cleanupGroupResInfo
(
SGroupResInfo
*
pGroupResInfo
);
void
initMultiResInfoFromArrayList
(
SGroupResInfo
*
pGroupResInfo
,
SArray
*
pArrayList
);
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
ae7550eb
...
...
@@ -296,7 +296,7 @@ enum {
};
typedef
struct
SAggSupporter
{
S
HashObj
*
pResultRowHashTable
;
// quick locate the window object for each result
S
SHashObj
*
pResultRowHashTable
;
// quick locate the window object for each result
char
*
keyBuf
;
// window key buffer
SDiskbasedBuf
*
pResultBuf
;
// query result buffer based on blocked-wised disk file
int32_t
resultRowSize
;
// the result buffer size for each result row, with the meta data size for each row
...
...
source/libs/executor/inc/tsimplehash.h
浏览文件 @
ae7550eb
...
...
@@ -17,7 +17,6 @@
#define TDENGINE_TSIMPLEHASH_H
#include "tarray.h"
#include "tlockfree.h"
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -27,6 +26,10 @@ typedef uint32_t (*_hash_fn_t)(const char *, uint32_t);
typedef
int32_t
(
*
_equal_fn_t
)(
const
void
*
,
const
void
*
,
size_t
len
);
typedef
void
(
*
_hash_free_fn_t
)(
void
*
);
/**
* @brief single thread hash
*
*/
typedef
struct
SSHashObj
SSHashObj
;
/**
...
...
@@ -36,7 +39,7 @@ typedef struct SSHashObj SSHashObj;
* @param fn hash function to generate the hash value
* @return
*/
SSHashObj
*
tSimpleHashInit
(
size_t
capacity
,
_hash_fn_t
fn
,
size_t
keyLen
,
size_t
dataLen
);
SSHashObj
*
tSimpleHashInit
(
size_t
capacity
,
_hash_fn_t
fn
);
/**
* return the size of hash table
...
...
@@ -48,22 +51,26 @@ int32_t tSimpleHashGetSize(const SSHashObj *pHashObj);
int32_t
tSimpleHashPrint
(
const
SSHashObj
*
pHashObj
);
/**
* put element into hash table, if the element with the same key exists, update it
* @param pHashObj
* @param key
* @param data
* @return
* @brief put element into hash table, if the element with the same key exists, update it
*
* @param pHashObj
* @param key
* @param keyLen
* @param data
* @param dataLen
* @return int32_t
*/
int32_t
tSimpleHashPut
(
SSHashObj
*
pHashObj
,
const
void
*
key
,
const
void
*
data
);
int32_t
tSimpleHashPut
(
SSHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
,
const
void
*
data
,
size_t
dataLen
);
/**
* return the payload data with the specified key
*
* @param pHashObj
* @param key
* @param keyLen
* @return
*/
void
*
tSimpleHashGet
(
SSHashObj
*
pHashObj
,
const
void
*
key
);
void
*
tSimpleHashGet
(
SSHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
);
/**
* remove item with the specified key
...
...
@@ -71,7 +78,7 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key);
* @param key
* @param keyLen
*/
int32_t
tSimpleHashRemove
(
SSHashObj
*
pHashObj
,
const
void
*
key
);
int32_t
tSimpleHashRemove
(
SSHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
);
/**
* Clear the hash table.
...
...
@@ -98,7 +105,7 @@ size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj);
* @param keyLen
* @return
*/
void
*
tSimpleHashGetKey
(
const
SSHashObj
*
pHashObj
,
void
*
data
,
size_t
*
keyLen
);
void
*
tSimpleHashGetKey
(
void
*
data
,
size_t
*
keyLen
);
/**
* Create the hash table iterator
...
...
@@ -109,7 +116,18 @@ void *tSimpleHashGetKey(const SSHashObj* pHashObj, void *data, size_t* keyLen);
*/
void
*
tSimpleHashIterate
(
const
SSHashObj
*
pHashObj
,
void
*
data
,
int32_t
*
iter
);
/**
* Create the hash table iterator
*
* @param pHashObj
* @param data
* @param key
* @param iter
* @return void*
*/
void
*
tSimpleHashIterateKV
(
const
SSHashObj
*
pHashObj
,
void
*
data
,
void
**
key
,
int32_t
*
iter
);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSIMPLEHASH_H
#endif // TDENGINE_TSIMPLEHASH_H
\ No newline at end of file
source/libs/executor/src/executil.c
浏览文件 @
ae7550eb
...
...
@@ -97,7 +97,7 @@ int32_t resultrowComparAsc(const void* p1, const void* p2) {
static
int32_t
resultrowComparDesc
(
const
void
*
p1
,
const
void
*
p2
)
{
return
resultrowComparAsc
(
p2
,
p1
);
}
void
initGroupedResultInfo
(
SGroupResInfo
*
pGroupResInfo
,
SHashObj
*
pHashmap
,
int32_t
order
)
{
void
initGroupedResultInfo
(
SGroupResInfo
*
pGroupResInfo
,
S
S
HashObj
*
pHashmap
,
int32_t
order
)
{
if
(
pGroupResInfo
->
pRows
!=
NULL
)
{
taosArrayDestroy
(
pGroupResInfo
->
pRows
);
}
...
...
@@ -106,9 +106,10 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int
void
*
pData
=
NULL
;
pGroupResInfo
->
pRows
=
taosArrayInit
(
10
,
POINTER_BYTES
);
size_t
keyLen
=
0
;
while
((
pData
=
taosHashIterate
(
pHashmap
,
pData
))
!=
NULL
)
{
void
*
key
=
taosHashGetKey
(
pData
,
&
keyLen
);
size_t
keyLen
=
0
;
int32_t
iter
=
0
;
while
((
pData
=
tSimpleHashIterate
(
pHashmap
,
pData
,
&
iter
))
!=
NULL
)
{
void
*
key
=
tSimpleHashGetKey
(
pData
,
&
keyLen
);
SResKeyPos
*
p
=
taosMemoryMalloc
(
keyLen
+
sizeof
(
SResultRowPosition
));
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
ae7550eb
...
...
@@ -250,7 +250,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
SET_RES_WINDOW_KEY
(
pSup
->
keyBuf
,
pData
,
bytes
,
groupId
);
SResultRowPosition
*
p1
=
(
SResultRowPosition
*
)
t
aos
HashGet
(
pSup
->
pResultRowHashTable
,
pSup
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
));
(
SResultRowPosition
*
)
t
Simple
HashGet
(
pSup
->
pResultRowHashTable
,
pSup
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
));
SResultRow
*
pResult
=
NULL
;
...
...
@@ -292,7 +292,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
// add a new result set for a new group
SResultRowPosition
pos
=
{.
pageId
=
pResult
->
pageId
,
.
offset
=
pResult
->
offset
};
t
aos
HashPut
(
pSup
->
pResultRowHashTable
,
pSup
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
),
&
pos
,
t
Simple
HashPut
(
pSup
->
pResultRowHashTable
,
pSup
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
),
&
pos
,
sizeof
(
SResultRowPosition
));
}
...
...
@@ -301,7 +301,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
// too many time window in query
if
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_BATCH
&&
t
aos
HashGetSize
(
pSup
->
pResultRowHashTable
)
>
MAX_INTERVAL_TIME_WINDOW
)
{
t
Simple
HashGetSize
(
pSup
->
pResultRowHashTable
)
>
MAX_INTERVAL_TIME_WINDOW
)
{
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW
);
}
...
...
@@ -3013,7 +3013,7 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len
}
SOptrBasicInfo
*
pInfo
=
(
SOptrBasicInfo
*
)(
pOperator
->
info
);
SAggSupporter
*
pSup
=
(
SAggSupporter
*
)
POINTER_SHIFT
(
pOperator
->
info
,
sizeof
(
SOptrBasicInfo
));
int32_t
size
=
t
aos
HashGetSize
(
pSup
->
pResultRowHashTable
);
int32_t
size
=
t
Simple
HashGetSize
(
pSup
->
pResultRowHashTable
);
size_t
keyLen
=
sizeof
(
uint64_t
)
*
2
;
// estimate the key length
int32_t
totalSize
=
sizeof
(
int32_t
)
+
sizeof
(
int32_t
)
+
size
*
(
sizeof
(
int32_t
)
+
keyLen
+
sizeof
(
int32_t
)
+
pSup
->
resultRowSize
);
...
...
@@ -3041,9 +3041,10 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len
setBufPageDirty
(
pPage
,
true
);
releaseBufPage
(
pSup
->
pResultBuf
,
pPage
);
void
*
pIter
=
taosHashIterate
(
pSup
->
pResultRowHashTable
,
NULL
);
int32_t
iter
=
0
;
void
*
pIter
=
tSimpleHashIterate
(
pSup
->
pResultRowHashTable
,
NULL
,
&
iter
);
while
(
pIter
)
{
void
*
key
=
t
aos
HashGetKey
(
pIter
,
&
keyLen
);
void
*
key
=
t
Simple
HashGetKey
(
pIter
,
&
keyLen
);
SResultRowPosition
*
p1
=
(
SResultRowPosition
*
)
pIter
;
pPage
=
(
SFilePage
*
)
getBufPage
(
pSup
->
pResultBuf
,
p1
->
pageId
);
...
...
@@ -3075,7 +3076,7 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len
memcpy
(
*
result
+
offset
,
pRow
,
pSup
->
resultRowSize
);
offset
+=
pSup
->
resultRowSize
;
pIter
=
t
aosHashIterate
(
pSup
->
pResultRowHashTable
,
pI
ter
);
pIter
=
t
SimpleHashIterate
(
pSup
->
pResultRowHashTable
,
pIter
,
&
i
ter
);
}
*
(
int32_t
*
)(
*
result
)
=
offset
;
...
...
@@ -3110,7 +3111,7 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) {
// add a new result set for a new group
SResultRowPosition
pos
=
{.
pageId
=
resultRow
->
pageId
,
.
offset
=
resultRow
->
offset
};
t
aos
HashPut
(
pSup
->
pResultRowHashTable
,
result
+
offset
,
keyLen
,
&
pos
,
sizeof
(
SResultRowPosition
));
t
Simple
HashPut
(
pSup
->
pResultRowHashTable
,
result
+
offset
,
keyLen
,
&
pos
,
sizeof
(
SResultRowPosition
));
offset
+=
keyLen
;
int32_t
valueLen
=
*
(
int32_t
*
)(
result
+
offset
);
...
...
@@ -3407,7 +3408,8 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
pAggSup
->
resultRowSize
=
getResultRowSize
(
pCtx
,
numOfOutput
);
pAggSup
->
keyBuf
=
taosMemoryCalloc
(
1
,
keyBufSize
+
POINTER_BYTES
+
sizeof
(
int64_t
));
pAggSup
->
pResultRowHashTable
=
taosHashInit
(
10
,
hashFn
,
true
,
HASH_NO_LOCK
);
// pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK);
pAggSup
->
pResultRowHashTable
=
tSimpleHashInit
(
100000
,
hashFn
);
if
(
pAggSup
->
keyBuf
==
NULL
||
pAggSup
->
pResultRowHashTable
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -3433,7 +3435,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
void
cleanupAggSup
(
SAggSupporter
*
pAggSup
)
{
taosMemoryFreeClear
(
pAggSup
->
keyBuf
);
t
aos
HashCleanup
(
pAggSup
->
pResultRowHashTable
);
t
Simple
HashCleanup
(
pAggSup
->
pResultRowHashTable
);
destroyDiskbasedBuf
(
pAggSup
->
pResultBuf
);
}
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
ae7550eb
...
...
@@ -178,8 +178,8 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro
STableScanInfo
*
pTableScanInfo
=
pOperator
->
info
;
SResultRowPosition
*
p1
=
(
SResultRowPosition
*
)
taosHashGet
(
pTableScanInfo
->
pdInfo
.
pAggSup
->
pResultRowHashTable
,
buf
,
GET_RES_WINDOW_KEY_LEN
(
sizeof
(
groupId
)));
SResultRowPosition
*
p1
=
(
SResultRowPosition
*
)
tSimpleHashGet
(
pTableScanInfo
->
pdInfo
.
pAggSup
->
pResultRowHashTable
,
buf
,
GET_RES_WINDOW_KEY_LEN
(
sizeof
(
groupId
)));
if
(
p1
==
NULL
)
{
return
NULL
;
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
ae7550eb
...
...
@@ -1383,7 +1383,7 @@ bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, int16_t
int32_t
numOfOutput
)
{
SET_RES_WINDOW_KEY
(
pAggSup
->
keyBuf
,
pData
,
bytes
,
groupId
);
SResultRowPosition
*
p1
=
(
SResultRowPosition
*
)
t
aos
HashGet
(
pAggSup
->
pResultRowHashTable
,
pAggSup
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
));
(
SResultRowPosition
*
)
t
Simple
HashGet
(
pAggSup
->
pResultRowHashTable
,
pAggSup
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
));
if
(
!
p1
)
{
// window has been closed
return
false
;
...
...
@@ -1396,14 +1396,14 @@ bool doDeleteIntervalWindow(SAggSupporter* pAggSup, TSKEY ts, uint64_t groupId)
size_t
bytes
=
sizeof
(
TSKEY
);
SET_RES_WINDOW_KEY
(
pAggSup
->
keyBuf
,
&
ts
,
bytes
,
groupId
);
SResultRowPosition
*
p1
=
(
SResultRowPosition
*
)
t
aos
HashGet
(
pAggSup
->
pResultRowHashTable
,
pAggSup
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
));
(
SResultRowPosition
*
)
t
Simple
HashGet
(
pAggSup
->
pResultRowHashTable
,
pAggSup
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
));
if
(
!
p1
)
{
// window has been closed
return
false
;
}
// SFilePage* bufPage = getBufPage(pAggSup->pResultBuf, p1->pageId);
// dBufSetBufPageRecycled(pAggSup->pResultBuf, bufPage);
t
aos
HashRemove
(
pAggSup
->
pResultRowHashTable
,
pAggSup
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
));
t
Simple
HashRemove
(
pAggSup
->
pResultRowHashTable
,
pAggSup
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
));
return
true
;
}
...
...
@@ -1453,11 +1453,12 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval*
}
}
static
int32_t
getAllIntervalWindow
(
SHashObj
*
pHashMap
,
SHashObj
*
resWins
)
{
static
int32_t
getAllIntervalWindow
(
S
S
HashObj
*
pHashMap
,
SHashObj
*
resWins
)
{
void
*
pIte
=
NULL
;
size_t
keyLen
=
0
;
while
((
pIte
=
taosHashIterate
(
pHashMap
,
pIte
))
!=
NULL
)
{
void
*
key
=
taosHashGetKey
(
pIte
,
&
keyLen
);
int32_t
iter
=
0
;
while
((
pIte
=
tSimpleHashIterate
(
pHashMap
,
pIte
,
&
iter
))
!=
NULL
)
{
void
*
key
=
tSimpleHashGetKey
(
pIte
,
&
keyLen
);
uint64_t
groupId
=
*
(
uint64_t
*
)
key
;
ASSERT
(
keyLen
==
GET_RES_WINDOW_KEY_LEN
(
sizeof
(
TSKEY
)));
TSKEY
ts
=
*
(
int64_t
*
)((
char
*
)
key
+
sizeof
(
uint64_t
));
...
...
@@ -1470,16 +1471,18 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SHashObj* resWins) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
closeIntervalWindow
(
SHashObj
*
pHashMap
,
STimeWindowAggSupp
*
pSup
,
SInterval
*
pInterval
,
static
int32_t
closeIntervalWindow
(
S
S
HashObj
*
pHashMap
,
STimeWindowAggSupp
*
pSup
,
SInterval
*
pInterval
,
SHashObj
*
pPullDataMap
,
SHashObj
*
closeWins
,
SArray
*
pRecyPages
,
SDiskbasedBuf
*
pDiscBuf
)
{
qDebug
(
"===stream===close interval window"
);
void
*
pIte
=
NULL
;
size_t
keyLen
=
0
;
while
((
pIte
=
taosHashIterate
(
pHashMap
,
pIte
))
!=
NULL
)
{
void
*
key
=
taosHashGetKey
(
pIte
,
&
keyLen
);
void
*
key
=
NULL
;
size_t
keyLen
=
GET_RES_WINDOW_KEY_LEN
(
sizeof
(
TSKEY
));
int32_t
iter
=
0
;
while
((
pIte
=
tSimpleHashIterateKV
(
pHashMap
,
pIte
,
&
key
,
&
iter
))
!=
NULL
)
{
// void* key = tSimpleHashGetKey(pIte, &keyLen);
uint64_t
groupId
=
*
(
uint64_t
*
)
key
;
ASSERT
(
keyLen
==
GET_RES_WINDOW_KEY_LEN
(
sizeof
(
TSKEY
)));
//
ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
TSKEY
ts
=
*
(
int64_t
*
)((
char
*
)
key
+
sizeof
(
uint64_t
));
STimeWindow
win
;
win
.
skey
=
ts
;
...
...
@@ -1515,7 +1518,7 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
}
char
keyBuf
[
GET_RES_WINDOW_KEY_LEN
(
sizeof
(
TSKEY
))];
SET_RES_WINDOW_KEY
(
keyBuf
,
&
ts
,
sizeof
(
TSKEY
),
groupId
);
t
aos
HashRemove
(
pHashMap
,
keyBuf
,
keyLen
);
t
Simple
HashRemove
(
pHashMap
,
keyBuf
,
keyLen
);
}
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -2853,7 +2856,7 @@ bool hasIntervalWindow(SAggSupporter* pSup, TSKEY ts, uint64_t groupId) {
int32_t
bytes
=
sizeof
(
TSKEY
);
SET_RES_WINDOW_KEY
(
pSup
->
keyBuf
,
&
ts
,
bytes
,
groupId
);
SResultRowPosition
*
p1
=
(
SResultRowPosition
*
)
t
aos
HashGet
(
pSup
->
pResultRowHashTable
,
pSup
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
));
(
SResultRowPosition
*
)
t
Simple
HashGet
(
pSup
->
pResultRowHashTable
,
pSup
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
));
return
p1
!=
NULL
;
}
...
...
@@ -2894,8 +2897,9 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr
bool
isDeletedWindow
(
STimeWindow
*
pWin
,
uint64_t
groupId
,
SAggSupporter
*
pSup
)
{
SET_RES_WINDOW_KEY
(
pSup
->
keyBuf
,
&
pWin
->
skey
,
sizeof
(
int64_t
),
groupId
);
SResultRowPosition
*
p1
=
(
SResultRowPosition
*
)
taosHashGet
(
pSup
->
pResultRowHashTable
,
pSup
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
sizeof
(
int64_t
)));
SResultRowPosition
*
p1
=
(
SResultRowPosition
*
)
tSimpleHashGet
(
pSup
->
pResultRowHashTable
,
pSup
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
sizeof
(
int64_t
)));
return
p1
==
NULL
;
}
...
...
@@ -3023,7 +3027,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
}
static
void
clearStreamIntervalOperator
(
SStreamFinalIntervalOperatorInfo
*
pInfo
)
{
t
aos
HashClear
(
pInfo
->
aggSup
.
pResultRowHashTable
);
t
Simple
HashClear
(
pInfo
->
aggSup
.
pResultRowHashTable
);
clearDiskbasedBuf
(
pInfo
->
aggSup
.
pResultBuf
);
cleanupResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
);
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
);
...
...
@@ -4938,14 +4942,14 @@ static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, ui
SExprSupp
*
pSup
=
&
pOperatorInfo
->
exprSupp
;
SET_RES_WINDOW_KEY
(
iaInfo
->
aggSup
.
keyBuf
,
&
wstartTs
,
TSDB_KEYSIZE
,
tableGroupId
);
SResultRowPosition
*
p1
=
(
SResultRowPosition
*
)
t
aosHashGet
(
iaInfo
->
aggSup
.
pResultRowHashTable
,
iaInfo
->
aggSup
.
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
TSDB_KEYSIZE
));
SResultRowPosition
*
p1
=
(
SResultRowPosition
*
)
t
SimpleHashGet
(
iaInfo
->
aggSup
.
pResultRowHashTable
,
iaInfo
->
aggSup
.
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
TSDB_KEYSIZE
));
ASSERT
(
p1
!=
NULL
);
finalizeResultRowIntoResultDataBlock
(
iaInfo
->
aggSup
.
pResultBuf
,
p1
,
pSup
->
pCtx
,
pSup
->
pExprInfo
,
pSup
->
numOfExprs
,
pSup
->
rowEntryInfoOffset
,
pResultBlock
,
pTaskInfo
);
t
aos
HashRemove
(
iaInfo
->
aggSup
.
pResultRowHashTable
,
iaInfo
->
aggSup
.
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
TSDB_KEYSIZE
));
ASSERT
(
t
aos
HashGetSize
(
iaInfo
->
aggSup
.
pResultRowHashTable
)
==
0
);
t
Simple
HashRemove
(
iaInfo
->
aggSup
.
pResultRowHashTable
,
iaInfo
->
aggSup
.
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
TSDB_KEYSIZE
));
ASSERT
(
t
Simple
HashGetSize
(
iaInfo
->
aggSup
.
pResultRowHashTable
)
==
0
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -4968,7 +4972,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
// there is an result exists
if
(
miaInfo
->
curTs
!=
INT64_MIN
)
{
ASSERT
(
t
aos
HashGetSize
(
iaInfo
->
aggSup
.
pResultRowHashTable
)
==
1
);
ASSERT
(
t
Simple
HashGetSize
(
iaInfo
->
aggSup
.
pResultRowHashTable
)
==
1
);
if
(
ts
!=
miaInfo
->
curTs
)
{
outputMergeAlignedIntervalResult
(
pOperatorInfo
,
tableGroupId
,
pResultBlock
,
miaInfo
->
curTs
);
...
...
@@ -4976,7 +4980,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
}
}
else
{
miaInfo
->
curTs
=
ts
;
ASSERT
(
t
aos
HashGetSize
(
iaInfo
->
aggSup
.
pResultRowHashTable
)
==
0
);
ASSERT
(
t
Simple
HashGetSize
(
iaInfo
->
aggSup
.
pResultRowHashTable
)
==
0
);
}
STimeWindow
win
=
{
0
};
...
...
@@ -5053,7 +5057,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
if
(
pBlock
==
NULL
)
{
// close last unfinalized time window
if
(
miaInfo
->
curTs
!=
INT64_MIN
)
{
ASSERT
(
t
aos
HashGetSize
(
iaInfo
->
aggSup
.
pResultRowHashTable
)
==
1
);
ASSERT
(
t
Simple
HashGetSize
(
iaInfo
->
aggSup
.
pResultRowHashTable
)
==
1
);
outputMergeAlignedIntervalResult
(
pOperator
,
miaInfo
->
groupId
,
pRes
,
miaInfo
->
curTs
);
miaInfo
->
curTs
=
INT64_MIN
;
}
...
...
@@ -5231,12 +5235,12 @@ static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t table
SExprSupp
*
pExprSup
=
&
pOperatorInfo
->
exprSupp
;
SET_RES_WINDOW_KEY
(
iaInfo
->
aggSup
.
keyBuf
,
&
win
->
skey
,
TSDB_KEYSIZE
,
tableGroupId
);
SResultRowPosition
*
p1
=
(
SResultRowPosition
*
)
t
aos
HashGet
(
iaInfo
->
aggSup
.
pResultRowHashTable
,
iaInfo
->
aggSup
.
keyBuf
,
SResultRowPosition
*
p1
=
(
SResultRowPosition
*
)
t
Simple
HashGet
(
iaInfo
->
aggSup
.
pResultRowHashTable
,
iaInfo
->
aggSup
.
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
TSDB_KEYSIZE
));
ASSERT
(
p1
!=
NULL
);
finalizeResultRowIntoResultDataBlock
(
iaInfo
->
aggSup
.
pResultBuf
,
p1
,
pExprSup
->
pCtx
,
pExprSup
->
pExprInfo
,
pExprSup
->
numOfExprs
,
pExprSup
->
rowEntryInfoOffset
,
pResultBlock
,
pTaskInfo
);
t
aos
HashRemove
(
iaInfo
->
aggSup
.
pResultRowHashTable
,
iaInfo
->
aggSup
.
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
TSDB_KEYSIZE
));
t
Simple
HashRemove
(
iaInfo
->
aggSup
.
pResultRowHashTable
,
iaInfo
->
aggSup
.
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
TSDB_KEYSIZE
));
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/executor/src/tsimplehash.c
浏览文件 @
ae7550eb
...
...
@@ -14,7 +14,6 @@
*/
#include "tsimplehash.h"
#include "os.h"
#include "taoserror.h"
#define SHASH_DEFAULT_LOAD_FACTOR 0.75
...
...
@@ -31,19 +30,21 @@
taosMemoryFreeClear(_n); \
} while (0);
#pragma pack(push, 4)
typedef
struct
SHNode
{
struct
SHNode
*
next
;
uint32_t
keyLen
:
20
;
uint32_t
dataLen
:
12
;
char
data
[];
}
SHNode
;
#pragma pack(pop)
struct
SSHashObj
{
SHNode
**
hashList
;
size_t
capacity
;
// number of slots
int64_t
size
;
// number of elements in hash table
_hash_fn_t
hashFp
;
// hash function
_equal_fn_t
equalFp
;
// equal function
int32_t
keyLen
;
int32_t
dataLen
;
int64_t
size
;
// number of elements in hash table
_hash_fn_t
hashFp
;
// hash function
_equal_fn_t
equalFp
;
// equal function
};
static
FORCE_INLINE
int32_t
taosHashCapacity
(
int32_t
length
)
{
...
...
@@ -54,7 +55,7 @@ static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
return
i
;
}
SSHashObj
*
tSimpleHashInit
(
size_t
capacity
,
_hash_fn_t
fn
,
size_t
keyLen
,
size_t
dataLen
)
{
SSHashObj
*
tSimpleHashInit
(
size_t
capacity
,
_hash_fn_t
fn
)
{
ASSERT
(
fn
!=
NULL
);
if
(
capacity
==
0
)
{
...
...
@@ -74,8 +75,6 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn, size_t keyLen, size_t
pHashObj
->
hashFp
=
fn
;
ASSERT
((
pHashObj
->
capacity
&
(
pHashObj
->
capacity
-
1
))
==
0
);
pHashObj
->
keyLen
=
keyLen
;
pHashObj
->
dataLen
=
dataLen
;
pHashObj
->
hashList
=
(
SHNode
**
)
taosMemoryCalloc
(
pHashObj
->
capacity
,
sizeof
(
void
*
));
if
(
!
pHashObj
->
hashList
)
{
...
...
@@ -93,16 +92,17 @@ int32_t tSimpleHashGetSize(const SSHashObj *pHashObj) {
return
(
int32_t
)
atomic_load_64
((
int64_t
*
)
&
pHashObj
->
size
);
}
static
SHNode
*
doCreateHashNode
(
const
void
*
key
,
size_t
keyLen
,
const
void
*
pData
,
size_t
dsize
,
uint32_t
hashVal
)
{
SHNode
*
pNewNode
=
taosMemoryMalloc
(
sizeof
(
SHNode
)
+
keyLen
+
d
size
);
static
SHNode
*
doCreateHashNode
(
const
void
*
key
,
size_t
keyLen
,
const
void
*
data
,
size_t
dataLen
,
uint32_t
hashVal
)
{
SHNode
*
pNewNode
=
taosMemoryMalloc
(
sizeof
(
SHNode
)
+
keyLen
+
d
ataLen
);
if
(
!
pNewNode
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
pNewNode
->
keyLen
=
keyLen
;
pNewNode
->
dataLen
=
dataLen
;
pNewNode
->
next
=
NULL
;
memcpy
(
GET_SHASH_NODE_DATA
(
pNewNode
),
pData
,
dsize
);
memcpy
(
GET_SHASH_NODE_KEY
(
pNewNode
,
d
size
),
key
,
keyLen
);
memcpy
(
GET_SHASH_NODE_DATA
(
pNewNode
),
data
,
dataLen
);
memcpy
(
GET_SHASH_NODE_KEY
(
pNewNode
,
d
ataLen
),
key
,
keyLen
);
return
pNewNode
;
}
...
...
@@ -141,8 +141,8 @@ static void taosHashTableResize(SSHashObj *pHashObj) {
SHNode
*
pPrev
=
NULL
;
while
(
pNode
!=
NULL
)
{
void
*
key
=
GET_SHASH_NODE_KEY
(
pNode
,
p
HashObj
->
dataLen
);
uint32_t
hashVal
=
(
*
pHashObj
->
hashFp
)(
key
,
(
uint32_t
)
p
HashObj
->
keyLen
);
void
*
key
=
GET_SHASH_NODE_KEY
(
pNode
,
p
Node
->
dataLen
);
uint32_t
hashVal
=
(
*
pHashObj
->
hashFp
)(
key
,
(
uint32_t
)
p
Node
->
keyLen
);
int32_t
newIdx
=
HASH_INDEX
(
hashVal
,
pHashObj
->
capacity
);
pNext
=
pNode
->
next
;
...
...
@@ -170,12 +170,12 @@ static void taosHashTableResize(SSHashObj *pHashObj) {
// ((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0);
}
int32_t
tSimpleHashPut
(
SSHashObj
*
pHashObj
,
const
void
*
key
,
const
void
*
data
)
{
int32_t
tSimpleHashPut
(
SSHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
,
const
void
*
data
,
size_t
dataLen
)
{
if
(
!
pHashObj
||
!
key
)
{
return
-
1
;
}
uint32_t
hashVal
=
(
*
pHashObj
->
hashFp
)(
key
,
(
uint32_t
)
pHashObj
->
keyLen
);
uint32_t
hashVal
=
(
*
pHashObj
->
hashFp
)(
key
,
(
uint32_t
)
keyLen
);
// need the resize process, write lock applied
if
(
SHASH_NEED_RESIZE
(
pHashObj
))
{
...
...
@@ -186,7 +186,7 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data) {
SHNode
*
pNode
=
pHashObj
->
hashList
[
slot
];
if
(
!
pNode
)
{
SHNode
*
pNewNode
=
doCreateHashNode
(
key
,
pHashObj
->
keyLen
,
data
,
pHashObj
->
dataLen
,
hashVal
);
SHNode
*
pNewNode
=
doCreateHashNode
(
key
,
keyLen
,
data
,
dataLen
,
hashVal
);
if
(
!
pNewNode
)
{
return
-
1
;
}
...
...
@@ -197,14 +197,14 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data) {
}
while
(
pNode
)
{
if
((
*
(
pHashObj
->
equalFp
))(
GET_SHASH_NODE_KEY
(
pNode
,
p
HashObj
->
dataLen
),
key
,
pHashObj
->
keyLen
)
==
0
)
{
if
((
*
(
pHashObj
->
equalFp
))(
GET_SHASH_NODE_KEY
(
pNode
,
p
Node
->
dataLen
),
key
,
keyLen
)
==
0
)
{
break
;
}
pNode
=
pNode
->
next
;
}
if
(
!
pNode
)
{
SHNode
*
pNewNode
=
doCreateHashNode
(
key
,
pHashObj
->
keyLen
,
data
,
pHashObj
->
dataLen
,
hashVal
);
SHNode
*
pNewNode
=
doCreateHashNode
(
key
,
keyLen
,
data
,
dataLen
,
hashVal
);
if
(
!
pNewNode
)
{
return
-
1
;
}
...
...
@@ -212,16 +212,16 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data) {
pHashObj
->
hashList
[
slot
]
=
pNewNode
;
atomic_add_fetch_64
(
&
pHashObj
->
size
,
1
);
}
else
{
// update data
memcpy
(
GET_SHASH_NODE_DATA
(
pNode
),
data
,
pHashObj
->
dataLen
);
memcpy
(
GET_SHASH_NODE_DATA
(
pNode
),
data
,
dataLen
);
}
return
0
;
}
static
FORCE_INLINE
SHNode
*
doSearchInEntryList
(
SSHashObj
*
pHashObj
,
const
void
*
key
,
int32_t
index
)
{
static
FORCE_INLINE
SHNode
*
doSearchInEntryList
(
SSHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
,
int32_t
index
)
{
SHNode
*
pNode
=
pHashObj
->
hashList
[
index
];
while
(
pNode
)
{
if
((
*
(
pHashObj
->
equalFp
))(
GET_SHASH_NODE_KEY
(
pNode
,
p
HashObj
->
dataLen
),
key
,
pHashObj
->
keyLen
)
==
0
)
{
if
((
*
(
pHashObj
->
equalFp
))(
GET_SHASH_NODE_KEY
(
pNode
,
p
Node
->
dataLen
),
key
,
keyLen
)
==
0
)
{
break
;
}
...
...
@@ -233,12 +233,12 @@ static FORCE_INLINE SHNode *doSearchInEntryList(SSHashObj *pHashObj, const void
static
FORCE_INLINE
bool
taosHashTableEmpty
(
const
SSHashObj
*
pHashObj
)
{
return
tSimpleHashGetSize
(
pHashObj
)
==
0
;
}
void
*
tSimpleHashGet
(
SSHashObj
*
pHashObj
,
const
void
*
key
)
{
void
*
tSimpleHashGet
(
SSHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
)
{
if
(
!
pHashObj
||
taosHashTableEmpty
(
pHashObj
)
||
!
key
)
{
return
NULL
;
}
uint32_t
hashVal
=
(
*
pHashObj
->
hashFp
)(
key
,
(
uint32_t
)
pHashObj
->
keyLen
);
uint32_t
hashVal
=
(
*
pHashObj
->
hashFp
)(
key
,
(
uint32_t
)
keyLen
);
int32_t
slot
=
HASH_INDEX
(
hashVal
,
pHashObj
->
capacity
);
SHNode
*
pNode
=
pHashObj
->
hashList
[
slot
];
...
...
@@ -247,7 +247,7 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key) {
}
char
*
data
=
NULL
;
pNode
=
doSearchInEntryList
(
pHashObj
,
key
,
slot
);
pNode
=
doSearchInEntryList
(
pHashObj
,
key
,
keyLen
,
slot
);
if
(
pNode
!=
NULL
)
{
data
=
GET_SHASH_NODE_DATA
(
pNode
);
}
...
...
@@ -255,19 +255,19 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key) {
return
data
;
}
int32_t
tSimpleHashRemove
(
SSHashObj
*
pHashObj
,
const
void
*
key
)
{
int32_t
tSimpleHashRemove
(
SSHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
)
{
if
(
!
pHashObj
||
!
key
)
{
return
TSDB_CODE_FAILED
;
}
uint32_t
hashVal
=
(
*
pHashObj
->
hashFp
)(
key
,
(
uint32_t
)
pHashObj
->
keyLen
);
uint32_t
hashVal
=
(
*
pHashObj
->
hashFp
)(
key
,
(
uint32_t
)
keyLen
);
int32_t
slot
=
HASH_INDEX
(
hashVal
,
pHashObj
->
capacity
);
SHNode
*
pNode
=
pHashObj
->
hashList
[
slot
];
SHNode
*
pPrev
=
NULL
;
while
(
pNode
)
{
if
((
*
(
pHashObj
->
equalFp
))(
GET_SHASH_NODE_KEY
(
pNode
,
p
HashObj
->
dataLen
),
key
,
pHashObj
->
keyLen
)
==
0
)
{
if
((
*
(
pHashObj
->
equalFp
))(
GET_SHASH_NODE_KEY
(
pNode
,
p
Node
->
dataLen
),
key
,
keyLen
)
==
0
)
{
if
(
!
pPrev
)
{
pHashObj
->
hashList
[
slot
]
=
pNode
->
next
;
}
else
{
...
...
@@ -312,6 +312,7 @@ void tSimpleHashCleanup(SSHashObj *pHashObj) {
tSimpleHashClear
(
pHashObj
);
taosMemoryFreeClear
(
pHashObj
->
hashList
);
taosMemoryFree
(
pHashObj
);
}
size_t
tSimpleHashGetMemSize
(
const
SSHashObj
*
pHashObj
)
{
...
...
@@ -322,26 +323,54 @@ size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj) {
return
(
pHashObj
->
capacity
*
sizeof
(
void
*
))
+
sizeof
(
SHNode
)
*
tSimpleHashGetSize
(
pHashObj
)
+
sizeof
(
SSHashObj
);
}
void
*
tSimpleHashGetKey
(
const
SSHashObj
*
pHashObj
,
void
*
data
,
size_t
*
keyLen
)
{
#if 0
int32_t offset = offsetof(SHNode, data);
SHNode *node = ((SHNode *)(char *)data - offset);
void
*
tSimpleHashGetKey
(
void
*
data
,
size_t
*
keyLen
)
{
SHNode
*
node
=
(
SHNode
*
)((
char
*
)
data
-
offsetof
(
SHNode
,
data
));
if
(
keyLen
)
{
*keyLen =
pHashObj
->keyLen;
*
keyLen
=
node
->
keyLen
;
}
return POINTER_SHIFT(data, pHashObj->dataLen);
return
POINTER_SHIFT
(
data
,
node
->
dataLen
);
}
return GET_SHASH_NODE_KEY(node, pHashObj->dataLen);
#endif
if
(
keyLen
)
{
*
keyLen
=
pHashObj
->
keyLen
;
void
*
tSimpleHashIterate
(
const
SSHashObj
*
pHashObj
,
void
*
data
,
int32_t
*
iter
)
{
if
(
!
pHashObj
)
{
return
NULL
;
}
return
POINTER_SHIFT
(
data
,
pHashObj
->
dataLen
);
SHNode
*
pNode
=
NULL
;
if
(
!
data
)
{
for
(
int32_t
i
=
0
;
i
<
pHashObj
->
capacity
;
++
i
)
{
pNode
=
pHashObj
->
hashList
[
i
];
if
(
!
pNode
)
{
continue
;
}
*
iter
=
i
;
return
GET_SHASH_NODE_DATA
(
pNode
);
}
return
NULL
;
}
pNode
=
(
SHNode
*
)((
char
*
)
data
-
offsetof
(
SHNode
,
data
));
if
(
pNode
->
next
)
{
return
GET_SHASH_NODE_DATA
(
pNode
->
next
);
}
++
(
*
iter
);
for
(
int32_t
i
=
*
iter
;
i
<
pHashObj
->
capacity
;
++
i
)
{
pNode
=
pHashObj
->
hashList
[
i
];
if
(
!
pNode
)
{
continue
;
}
*
iter
=
i
;
return
GET_SHASH_NODE_DATA
(
pNode
);
}
return
NULL
;
}
void
*
tSimpleHashIterate
(
const
SSHashObj
*
pHashObj
,
void
*
data
,
int32_t
*
iter
)
{
void
*
tSimpleHashIterate
KV
(
const
SSHashObj
*
pHashObj
,
void
*
data
,
void
**
key
,
int32_t
*
iter
)
{
if
(
!
pHashObj
)
{
return
NULL
;
}
...
...
@@ -355,6 +384,9 @@ void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) {
continue
;
}
*
iter
=
i
;
if
(
key
)
{
*
key
=
GET_SHASH_NODE_KEY
(
pNode
,
pNode
->
dataLen
);
}
return
GET_SHASH_NODE_DATA
(
pNode
);
}
return
NULL
;
...
...
@@ -363,6 +395,9 @@ void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) {
pNode
=
(
SHNode
*
)((
char
*
)
data
-
offsetof
(
SHNode
,
data
));
if
(
pNode
->
next
)
{
if
(
key
)
{
*
key
=
GET_SHASH_NODE_KEY
(
pNode
->
next
,
pNode
->
next
->
dataLen
);
}
return
GET_SHASH_NODE_DATA
(
pNode
->
next
);
}
...
...
@@ -373,6 +408,9 @@ void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) {
continue
;
}
*
iter
=
i
;
if
(
key
)
{
*
key
=
GET_SHASH_NODE_KEY
(
pNode
,
pNode
->
dataLen
);
}
return
GET_SHASH_NODE_DATA
(
pNode
);
}
...
...
source/libs/executor/test/tSimpleHashTests.cpp
浏览文件 @
ae7550eb
...
...
@@ -32,31 +32,33 @@
TEST
(
testCase
,
tSimpleHashTest
)
{
SSHashObj
*
pHashObj
=
tSimpleHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
)
,
sizeof
(
int64_t
),
sizeof
(
int64_t
)
);
tSimpleHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
));
assert
(
pHashObj
!=
nullptr
);
ASSERT_EQ
(
0
,
tSimpleHashGetSize
(
pHashObj
));
size_t
keyLen
=
sizeof
(
int64_t
);
size_t
dataLen
=
sizeof
(
int64_t
);
int64_t
originKeySum
=
0
;
for
(
int64_t
i
=
1
;
i
<=
100
;
++
i
)
{
originKeySum
+=
i
;
tSimpleHashPut
(
pHashObj
,
(
const
void
*
)
&
i
,
(
const
void
*
)
&
i
);
tSimpleHashPut
(
pHashObj
,
(
const
void
*
)
&
i
,
keyLen
,
(
const
void
*
)
&
i
,
dataLen
);
ASSERT_EQ
(
i
,
tSimpleHashGetSize
(
pHashObj
));
}
for
(
int64_t
i
=
1
;
i
<=
100
;
++
i
)
{
void
*
data
=
tSimpleHashGet
(
pHashObj
,
(
const
void
*
)
&
i
);
void
*
data
=
tSimpleHashGet
(
pHashObj
,
(
const
void
*
)
&
i
,
keyLen
);
ASSERT_EQ
(
i
,
*
(
int64_t
*
)
data
);
}
void
*
data
=
NULL
;
int32_t
iter
=
0
;
int64_t
keySum
=
0
;
int64_t
dataSum
=
0
;
while
((
data
=
tSimpleHashIterate
(
pHashObj
,
data
,
&
iter
)))
{
void
*
key
=
tSimpleHashGetKey
(
pHashObj
,
data
,
NULL
);
void
*
key
=
tSimpleHashGetKey
(
data
,
NULL
);
keySum
+=
*
(
int64_t
*
)
key
;
dataSum
+=
*
(
int64_t
*
)
data
;
}
...
...
@@ -65,7 +67,7 @@ TEST(testCase, tSimpleHashTest) {
ASSERT_EQ
(
keySum
,
originKeySum
);
for
(
int64_t
i
=
1
;
i
<=
100
;
++
i
)
{
tSimpleHashRemove
(
pHashObj
,
(
const
void
*
)
&
i
);
tSimpleHashRemove
(
pHashObj
,
(
const
void
*
)
&
i
,
keyLen
);
ASSERT_EQ
(
100
-
i
,
tSimpleHashGetSize
(
pHashObj
));
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录