Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a4a7022c
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1191
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
a4a7022c
编写于
8月 11, 2022
作者:
C
Cary Xu
提交者:
GitHub
8月 11, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #15976 from taosdata/feature/3.0_interval_hash_optimize
enh: rsma fetch optimization
上级
22b214af
64a92ff8
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
257 addition
and
85 deletion
+257
-85
include/common/tdatablock.h
include/common/tdatablock.h
+1
-0
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+3
-3
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
+1
-1
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
+7
-7
source/dnode/vnode/src/sma/smaRollup.c
source/dnode/vnode/src/sma/smaRollup.c
+41
-42
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+3
-3
source/libs/executor/inc/tsimplehash.h
source/libs/executor/inc/tsimplehash.h
+11
-0
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+0
-1
source/libs/executor/src/tsimplehash.c
source/libs/executor/src/tsimplehash.c
+98
-27
source/libs/executor/test/CMakeLists.txt
source/libs/executor/test/CMakeLists.txt
+16
-1
source/libs/executor/test/tSimpleHashTests.cpp
source/libs/executor/test/tSimpleHashTests.cpp
+75
-0
tests/script/tsim/sma/tsmaCreateInsertQuery.sim
tests/script/tsim/sma/tsmaCreateInsertQuery.sim
+1
-0
未找到文件。
include/common/tdatablock.h
浏览文件 @
a4a7022c
...
...
@@ -249,6 +249,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf);
int32_t
buildSubmitReqFromDataBlock
(
SSubmitReq
**
pReq
,
const
SSDataBlock
*
pDataBlocks
,
STSchema
*
pTSchema
,
int32_t
vgId
,
tb_uid_t
suid
);
char
*
buildCtbNameByGroupId
(
const
char
*
stbName
,
uint64_t
groupId
);
static
FORCE_INLINE
int32_t
blockGetEncodeSize
(
const
SSDataBlock
*
pBlock
)
{
...
...
source/common/src/tdatablock.c
浏览文件 @
a4a7022c
...
...
@@ -1713,7 +1713,7 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) {
char
pBuf
[
128
]
=
{
0
};
int32_t
sz
=
taosArrayGetSize
(
dataBlocks
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SSDataBlock
*
pDataBlock
=
taosArrayGet
(
dataBlocks
,
i
);
SSDataBlock
*
pDataBlock
=
taosArrayGet
P
(
dataBlocks
,
i
);
size_t
numOfCols
=
taosArrayGetSize
(
pDataBlock
->
pDataBlock
);
int32_t
rows
=
pDataBlock
->
info
.
rows
;
...
...
@@ -1870,10 +1870,10 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
* @brief TODO: Assume that the final generated result it less than 3M
*
* @param pReq
* @param pDataBlock
* @param pDataBlock
s
* @param vgId
* @param suid
*
*
*/
int32_t
buildSubmitReqFromDataBlock
(
SSubmitReq
**
pReq
,
const
SSDataBlock
*
pDataBlock
,
STSchema
*
pTSchema
,
int32_t
vgId
,
tb_uid_t
suid
)
{
...
...
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
浏览文件 @
a4a7022c
...
...
@@ -337,6 +337,7 @@ SArray *vmGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SCH_QUERY
,
vmPutMsgToQueryQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SCH_MERGE_QUERY
,
vmPutMsgToQueryQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SCH_QUERY_CONTINUE
,
vmPutMsgToQueryQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_FETCH_RSMA
,
vmPutMsgToQueryQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SCH_FETCH
,
vmPutMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SCH_MERGE_FETCH
,
vmPutMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_TABLE
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
...
...
@@ -347,7 +348,6 @@ SArray *vmGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_TABLES_META
,
vmPutMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SCH_CANCEL_TASK
,
vmPutMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SCH_DROP_TASK
,
vmPutMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_FETCH_RSMA
,
vmPutMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_CREATE_STB
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_DROP_TTL_TABLE
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_STB
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
...
...
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
浏览文件 @
a4a7022c
...
...
@@ -54,7 +54,7 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
if
(
IsReq
(
pMsg
))
{
if
(
code
!=
0
)
{
if
(
terrno
!=
0
)
code
=
terrno
;
dGError
(
"msg:%p, failed to process since %s"
,
pMsg
,
terrstr
());
dGError
(
"msg:%p, failed to process since %s"
,
pMsg
,
terrstr
(
code
));
}
vmSendRsp
(
pMsg
,
code
);
}
...
...
@@ -72,7 +72,7 @@ static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
int32_t
code
=
vnodeProcessQueryMsg
(
pVnode
->
pImpl
,
pMsg
);
if
(
code
!=
0
)
{
if
(
terrno
!=
0
)
code
=
terrno
;
dGError
(
"vgId:%d, msg:%p failed to query since %s"
,
pVnode
->
vgId
,
pMsg
,
terrstr
());
dGError
(
"vgId:%d, msg:%p failed to query since %s"
,
pVnode
->
vgId
,
pMsg
,
terrstr
(
code
));
vmSendRsp
(
pMsg
,
code
);
}
...
...
@@ -89,7 +89,7 @@ static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
int32_t
code
=
vnodeProcessFetchMsg
(
pVnode
->
pImpl
,
pMsg
,
pInfo
);
if
(
code
!=
0
)
{
if
(
terrno
!=
0
)
code
=
terrno
;
dGError
(
"vgId:%d, msg:%p failed to process stream since %s"
,
pVnode
->
vgId
,
pMsg
,
terrstr
());
dGError
(
"vgId:%d, msg:%p failed to process stream since %s"
,
pVnode
->
vgId
,
pMsg
,
terrstr
(
code
));
vmSendRsp
(
pMsg
,
code
);
}
...
...
@@ -110,7 +110,7 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
int32_t
code
=
vnodeProcessFetchMsg
(
pVnode
->
pImpl
,
pMsg
,
pInfo
);
if
(
code
!=
0
)
{
if
(
terrno
!=
0
)
code
=
terrno
;
dGError
(
"vgId:%d, msg:%p failed to fetch since %s"
,
pVnode
->
vgId
,
pMsg
,
terrstr
());
dGError
(
"vgId:%d, msg:%p failed to fetch since %s"
,
pVnode
->
vgId
,
pMsg
,
terrstr
(
code
));
vmSendRsp
(
pMsg
,
code
);
}
...
...
@@ -156,7 +156,7 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
if
((
pMsg
->
msgType
==
TDMT_SCH_QUERY
)
&&
(
grantCheck
(
TSDB_GRANT_TIME
)
!=
TSDB_CODE_SUCCESS
))
{
terrno
=
TSDB_CODE_GRANT_EXPIRED
;
code
=
terrno
;
dDebug
(
"vgId:%d, msg:%p put into vnode-query queue failed since %s"
,
pVnode
->
vgId
,
pMsg
,
terrstr
());
dDebug
(
"vgId:%d, msg:%p put into vnode-query queue failed since %s"
,
pVnode
->
vgId
,
pMsg
,
terrstr
(
code
));
}
else
{
vnodePreprocessQueryMsg
(
pVnode
->
pImpl
,
pMsg
);
dGTrace
(
"vgId:%d, msg:%p put into vnode-query queue"
,
pVnode
->
vgId
,
pMsg
);
...
...
@@ -179,11 +179,11 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
if
(
!
osDataSpaceAvailable
())
{
terrno
=
TSDB_CODE_VND_NO_DISKSPACE
;
code
=
terrno
;
dError
(
"vgId:%d, msg:%p put into vnode-write queue failed since %s"
,
pVnode
->
vgId
,
pMsg
,
terrstr
());
dError
(
"vgId:%d, msg:%p put into vnode-write queue failed since %s"
,
pVnode
->
vgId
,
pMsg
,
terrstr
(
code
));
}
else
if
((
pMsg
->
msgType
==
TDMT_VND_SUBMIT
)
&&
(
grantCheck
(
TSDB_GRANT_STORAGE
)
!=
TSDB_CODE_SUCCESS
))
{
terrno
=
TSDB_CODE_VND_NO_WRITE_AUTH
;
code
=
terrno
;
dDebug
(
"vgId:%d, msg:%p put into vnode-write queue failed since %s"
,
pVnode
->
vgId
,
pMsg
,
terrstr
());
dDebug
(
"vgId:%d, msg:%p put into vnode-write queue failed since %s"
,
pVnode
->
vgId
,
pMsg
,
terrstr
(
code
));
}
else
{
dGTrace
(
"vgId:%d, msg:%p put into vnode-write queue"
,
pVnode
->
vgId
,
pMsg
);
taosWriteQitem
(
pVnode
->
pWriteQ
,
pMsg
);
...
...
source/dnode/vnode/src/sma/smaRollup.c
浏览文件 @
a4a7022c
...
...
@@ -293,7 +293,9 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
if
(
pItem
->
maxDelay
>
TSDB_MAX_ROLLUP_MAX_DELAY
)
{
pItem
->
maxDelay
=
TSDB_MAX_ROLLUP_MAX_DELAY
;
}
pItem
->
level
=
idx
==
0
?
TSDB_RETENTION_L1
:
TSDB_RETENTION_L2
;
taosTmrReset
(
tdRSmaFetchTrigger
,
pItem
->
maxDelay
,
pItem
,
smaMgmt
.
tmrHandle
,
&
pItem
->
tmrId
);
smaInfo
(
"vgId:%d, table:%"
PRIi64
" level:%"
PRIi8
" maxdelay:%"
PRIi64
" watermark:%"
PRIi64
", finally maxdelay:%"
PRIi32
,
TD_VID
(
pVnode
),
pRSmaInfo
->
suid
,
idx
+
1
,
param
->
maxdelay
[
idx
],
param
->
watermark
[
idx
],
pItem
->
maxDelay
);
...
...
@@ -613,34 +615,38 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm
while
(
1
)
{
uint64_t
ts
;
int32_t
code
=
qExecTaskOpt
(
taskInfo
,
pResList
,
&
ts
);
if
(
code
<
0
)
{
smaError
(
"vgId:%d, qExecTask for rsma table %"
PRIi64
" level %"
PRIi8
" failed since %s"
,
SMA_VID
(
pSma
),
suid
,
pItem
->
level
,
terrstr
(
code
));
goto
_err
;
if
(
code
<
0
)
{
if
(
code
==
TSDB_CODE_QRY_IN_EXEC
)
{
break
;
}
else
{
smaError
(
"vgId:%d, qExecTask for rsma table %"
PRIi64
" level %"
PRIi8
" failed since %s"
,
SMA_VID
(
pSma
),
suid
,
pItem
->
level
,
terrstr
(
code
));
goto
_err
;
}
}
if
(
taosArrayGetSize
(
pResList
)
==
0
)
{
if
(
terrno
==
0
)
{
smaDebug
(
"vgId:%d, no rsma %"
PRIi8
" data fetched yet"
,
SMA_VID
(
pSma
),
pItem
->
level
);
//
smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched yet", SMA_VID(pSma), pItem->level);
}
else
{
smaDebug
(
"vgId:%d, no rsma %"
PRIi8
" data fetched since %s"
,
SMA_VID
(
pSma
),
pItem
->
level
,
terrstr
());
goto
_err
;
}
break
;
}
else
{
smaDebug
(
"vgId:%d, rsma %"
PRIi8
" data fetched"
,
SMA_VID
(
pSma
),
pItem
->
level
);
}
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pResList
);
++
i
)
{
SSDataBlock
*
output
=
taosArrayGetP
(
pResList
,
i
);
#if 1
char
flag
[
10
]
=
{
0
};
snprintf
(
flag
,
10
,
"level %"
PRIi8
,
pItem
->
level
);
// blockDebugShowDataBlocks(output, flag);
// taosArrayDestroy(pResult);
char
flag
[
10
]
=
{
0
};
snprintf
(
flag
,
10
,
"level %"
PRIi8
,
pItem
->
level
);
blockDebugShowDataBlocks
(
pResList
,
flag
);
#endif
STsdb
*
sinkTsdb
=
(
pItem
->
level
==
TSDB_RETENTION_L1
?
pSma
->
pRSmaTsdb
[
0
]
:
pSma
->
pRSmaTsdb
[
1
]);
SSubmitReq
*
pReq
=
NULL
;
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pResList
);
++
i
)
{
SSDataBlock
*
output
=
taosArrayGetP
(
pResList
,
i
);
STsdb
*
sinkTsdb
=
(
pItem
->
level
==
TSDB_RETENTION_L1
?
pSma
->
pRSmaTsdb
[
0
]
:
pSma
->
pRSmaTsdb
[
1
]);
SSubmitReq
*
pReq
=
NULL
;
// TODO: the schema update should be handled later(TD-17965)
if
(
buildSubmitReqFromDataBlock
(
&
pReq
,
output
,
pTSchema
,
SMA_VID
(
pSma
),
suid
)
<
0
)
{
...
...
@@ -655,11 +661,11 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm
SMA_VID
(
pSma
),
suid
,
pItem
->
level
,
terrstr
());
goto
_err
;
}
taosMemoryFreeClear
(
pReq
);
smaDebug
(
"vgId:%d, process submit req for rsma table %"
PRIi64
" level %"
PRIi8
" version:%"
PRIi64
,
SMA_VID
(
pSma
),
suid
,
pItem
->
level
,
output
->
info
.
version
);
taosMemoryFreeClear
(
pReq
);
}
}
...
...
@@ -692,15 +698,12 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType
}
SRSmaInfoItem
*
pItem
=
RSMA_INFO_ITEM
(
pInfo
,
idx
);
tdRSmaFetchAndSubmitResult
(
pSma
,
RSMA_INFO_QTASK
(
pInfo
,
idx
),
pItem
,
pInfo
->
pTSchema
,
suid
,
STREAM_INPUT__DATA_SUBMIT
);
atomic_store_8
(
&
pItem
->
triggerStat
,
TASK_TRIGGER_STAT_ACTIVE
);
if
(
smaMgmt
.
tmrHandle
)
{
taosTmrReset
(
tdRSmaFetchTrigger
,
pItem
->
maxDelay
,
pItem
,
smaMgmt
.
tmrHandle
,
&
pItem
->
tmrId
);
}
else
{
ASSERT
(
0
);
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -746,7 +749,6 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) {
return
NULL
;
}
// clone the SRSmaInfo from iRsmaInfoHash to rsmaInfoHash if in committing stat
SRSmaInfo
*
pCowRSmaInfo
=
NULL
;
// lock
...
...
@@ -793,13 +795,7 @@ static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
static
int32_t
tdExecuteRSma
(
SSma
*
pSma
,
const
void
*
pMsg
,
int32_t
inputType
,
tb_uid_t
suid
)
{
SRSmaInfo
*
pRSmaInfo
=
tdAcquireRSmaInfoBySuid
(
pSma
,
suid
);
if
(
!
pRSmaInfo
)
{
smaDebug
(
"vgId:%d, execute rsma, no rsma info for suid:%"
PRIu64
,
SMA_VID
(
pSma
),
suid
);
return
TSDB_CODE_SUCCESS
;
}
if
(
!
RSMA_INFO_QTASK
(
pRSmaInfo
,
0
))
{
tdReleaseRSmaInfo
(
pSma
,
pRSmaInfo
);
smaDebug
(
"vgId:%d, execute rsma, no rsma qTaskInfo for suid:%"
PRIu64
,
SMA_VID
(
pSma
),
suid
);
smaError
(
"vgId:%d, execute rsma, no rsma info for suid:%"
PRIu64
,
SMA_VID
(
pSma
),
suid
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1331,14 +1327,16 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
SRSmaInfo
*
pRSmaInfo
=
tdGetRSmaInfoByItem
(
pItem
);
if
(
RSMA_INFO_IS_DEL
(
pRSmaInfo
))
{
smaDebug
(
"rsma fetch task not start since rsma info already deleted, rsetId:%"
PRIi64
" refId:%d)"
,
smaMgmt
.
rsetId
,
pRSmaInfo
->
refId
);
return
;
}
SRSmaStat
*
pStat
=
(
SRSmaStat
*
)
tdAcquireSmaRef
(
smaMgmt
.
rsetId
,
pRSmaInfo
->
refId
);
if
(
!
pStat
)
{
smaDebug
(
"rsma fetch task not start since
already destroyed, rsetId rsetId:%"
PRIi64
" refId:%d)"
,
smaMgmt
.
rsetId
,
pRSmaInfo
->
refId
);
smaDebug
(
"rsma fetch task not start since
rsma stat already destroyed, rsetId:%"
PRIi64
" refId:%d)"
,
smaMgmt
.
rsetId
,
pRSmaInfo
->
refId
);
return
;
}
...
...
@@ -1350,8 +1348,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
case
TASK_TRIGGER_STAT_PAUSED
:
case
TASK_TRIGGER_STAT_CANCELLED
:
{
tdReleaseSmaRef
(
smaMgmt
.
rsetId
,
pRSmaInfo
->
refId
);
smaDebug
(
"vgId:%d,
not fetch rsma level %"
PRIi8
" data since stat is %"
PRIi8
", rsetId rsetId:%"
PRIi64
" refId:%d"
,
smaDebug
(
"vgId:%d,
rsma fetch task not start for level %"
PRIi8
" since stat is %"
PRIi8
"
, rsetId rsetId:%"
PRIi64
"
refId:%d"
,
SMA_VID
(
pSma
),
pItem
->
level
,
rsmaTriggerStat
,
smaMgmt
.
rsetId
,
pRSmaInfo
->
refId
);
if
(
rsmaTriggerStat
==
TASK_TRIGGER_STAT_PAUSED
)
{
taosTmrReset
(
tdRSmaFetchTrigger
,
5000
,
pItem
,
smaMgmt
.
tmrHandle
,
&
pItem
->
tmrId
);
...
...
@@ -1366,30 +1364,31 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
atomic_val_compare_exchange_8
(
&
pItem
->
triggerStat
,
TASK_TRIGGER_STAT_ACTIVE
,
TASK_TRIGGER_STAT_INACTIVE
);
switch
(
fetchTriggerStat
)
{
case
TASK_TRIGGER_STAT_ACTIVE
:
{
smaDebug
(
"vgId:%d,
fetch rsma level %"
PRIi8
" data for table:%"
PRIi64
" since stat is active"
,
SMA_VID
(
pSma
)
,
pItem
->
level
,
pRSmaInfo
->
suid
);
smaDebug
(
"vgId:%d,
rsma fetch task started for level:%"
PRIi8
" suid:%"
PRIi64
" since stat is active"
,
SMA_VID
(
pSma
),
pItem
->
level
,
pRSmaInfo
->
suid
);
// async process
tdRSmaFetchSend
(
pSma
,
pRSmaInfo
,
pItem
->
level
);
}
break
;
case
TASK_TRIGGER_STAT_PAUSED
:
{
smaDebug
(
"vgId:%d,
not fetch rsma level %"
PRIi8
" data for table
:%"
PRIi64
" since stat is paused"
,
smaDebug
(
"vgId:%d,
rsma fetch task not start for level:%"
PRIi8
" suid
:%"
PRIi64
" since stat is paused"
,
SMA_VID
(
pSma
),
pItem
->
level
,
pRSmaInfo
->
suid
);
}
break
;
case
TASK_TRIGGER_STAT_INACTIVE
:
{
smaDebug
(
"vgId:%d,
not fetch rsma level %"
PRIi8
" data for table
:%"
PRIi64
" since stat is inactive"
,
smaDebug
(
"vgId:%d,
rsma fetch task not start for level:%"
PRIi8
" suid
:%"
PRIi64
" since stat is inactive"
,
SMA_VID
(
pSma
),
pItem
->
level
,
pRSmaInfo
->
suid
);
}
break
;
case
TASK_TRIGGER_STAT_INIT
:
{
smaDebug
(
"vgId:%d,
not fetch rsma level %"
PRIi8
" data for table:%"
PRIi64
" since stat is init"
,
SMA_VID
(
pSma
)
,
pItem
->
level
,
pRSmaInfo
->
suid
);
smaDebug
(
"vgId:%d,
rsma fetch task not start for level:%"
PRIi8
" suid::%"
PRIi64
" since stat is init"
,
SMA_VID
(
pSma
),
pItem
->
level
,
pRSmaInfo
->
suid
);
}
break
;
default:
{
smaWarn
(
"vgId:%d,
not fetch rsma level %"
PRIi8
" data for table
:%"
PRIi64
" since stat is unknown"
,
smaWarn
(
"vgId:%d,
rsma fetch task not start for level:%"
PRIi8
" suid
:%"
PRIi64
" since stat is unknown"
,
SMA_VID
(
pSma
),
pItem
->
level
,
pRSmaInfo
->
suid
);
}
break
;
}
_end:
// taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
tdReleaseSmaRef
(
smaMgmt
.
rsetId
,
pRSmaInfo
->
refId
);
}
...
...
@@ -1402,7 +1401,7 @@ _end:
* @return int32_t
*/
int32_t
tdRSmaFetchSend
(
SSma
*
pSma
,
SRSmaInfo
*
pInfo
,
int8_t
level
)
{
SRSmaFetchMsg
fetchMsg
=
{
.
suid
=
pInfo
->
suid
,
.
level
=
level
};
SRSmaFetchMsg
fetchMsg
=
{.
suid
=
pInfo
->
suid
,
.
level
=
level
};
int32_t
ret
=
0
;
int32_t
contLen
=
0
;
SEncoder
encoder
=
{
0
};
...
...
@@ -1431,7 +1430,7 @@ int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) {
.
contLen
=
contLen
,
};
if
((
terrno
=
tmsgPutToQueue
(
&
pSma
->
pVnode
->
msgCb
,
FETCH
_QUEUE
,
&
rpcMsg
))
!=
0
)
{
if
((
terrno
=
tmsgPutToQueue
(
&
pSma
->
pVnode
->
msgCb
,
QUERY
_QUEUE
,
&
rpcMsg
))
!=
0
)
{
smaError
(
"vgId:%d, failed to put rsma fetch msg into fetch-queue for suid:%"
PRIi64
" level:%"
PRIi8
" since %s"
,
SMA_VID
(
pSma
),
pInfo
->
suid
,
level
,
terrstr
());
goto
_err
;
...
...
@@ -1462,7 +1461,7 @@ int32_t smaProcessFetch(SSma *pSma, void *pMsg) {
if
(
!
pRpcMsg
||
pRpcMsg
->
contLen
<
sizeof
(
SMsgHead
))
{
terrno
=
TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP
;
return
-
1
;
goto
_err
;
}
pBuf
=
POINTER_SHIFT
(
pRpcMsg
->
pCont
,
sizeof
(
SMsgHead
));
...
...
@@ -1479,7 +1478,7 @@ int32_t smaProcessFetch(SSma *pSma, void *pMsg) {
terrno
=
TSDB_CODE_RSMA_EMPTY_INFO
;
}
smaWarn
(
"vgId:%d, failed to process rsma fetch msg for suid:%"
PRIi64
" level:%"
PRIi8
" since %s"
,
SMA_VID
(
pSma
),
req
.
suid
,
req
.
level
,
terrstr
());
req
.
suid
,
req
.
level
,
terrstr
());
goto
_err
;
}
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
a4a7022c
...
...
@@ -293,6 +293,8 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
return
qWorkerProcessQueryMsg
(
&
handle
,
pVnode
->
pQuery
,
pMsg
,
0
);
case
TDMT_SCH_QUERY_CONTINUE
:
return
qWorkerProcessCQueryMsg
(
&
handle
,
pVnode
->
pQuery
,
pMsg
,
0
);
case
TDMT_VND_FETCH_RSMA
:
return
smaProcessFetch
(
pVnode
->
pSma
,
pMsg
);
default:
vError
(
"unknown msg type:%d in query queue"
,
pMsg
->
msgType
);
return
TSDB_CODE_VND_APP_ERROR
;
...
...
@@ -329,8 +331,6 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return
vnodeGetTableCfg
(
pVnode
,
pMsg
,
true
);
case
TDMT_VND_BATCH_META
:
return
vnodeGetBatchMeta
(
pVnode
,
pMsg
);
case
TDMT_VND_FETCH_RSMA
:
return
smaProcessFetch
(
pVnode
->
pSma
,
pMsg
);
case
TDMT_VND_CONSUME
:
return
tqProcessPollReq
(
pVnode
->
pTq
,
pMsg
);
case
TDMT_STREAM_TASK_RUN
:
...
...
@@ -357,7 +357,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
// TODO: remove the function
void
smaHandleRes
(
void
*
pVnode
,
int64_t
smaId
,
const
SArray
*
data
)
{
// TODO
blockDebugShowDataBlocks
(
data
,
__func__
);
//
blockDebugShowDataBlocks(data, __func__);
tdProcessTSmaInsert
(((
SVnode
*
)
pVnode
)
->
pSma
,
smaId
,
(
const
char
*
)
data
);
}
...
...
source/libs/executor/inc/tsimplehash.h
浏览文件 @
a4a7022c
...
...
@@ -45,6 +45,8 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn, size_t keyLen, size_t
*/
int32_t
tSimpleHashGetSize
(
const
SSHashObj
*
pHashObj
);
int32_t
tSimpleHashPrint
(
const
SSHashObj
*
pHashObj
);
/**
* put element into hash table, if the element with the same key exists, update it
* @param pHashObj
...
...
@@ -98,6 +100,15 @@ size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj);
*/
void
*
tSimpleHashGetKey
(
const
SSHashObj
*
pHashObj
,
void
*
data
,
size_t
*
keyLen
);
/**
* Create the hash table iterator
* @param pHashObj
* @param data
* @param iter
* @return void*
*/
void
*
tSimpleHashIterate
(
const
SSHashObj
*
pHashObj
,
void
*
data
,
int32_t
*
iter
);
#ifdef __cplusplus
}
#endif
...
...
source/libs/executor/src/executor.c
浏览文件 @
a4a7022c
...
...
@@ -529,7 +529,6 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
cleanUpUdfs
();
qDebug
(
"%s task abort due to error/cancel occurs, code:%s"
,
GET_TASKID
(
pTaskInfo
),
tstrerror
(
pTaskInfo
->
code
));
atomic_store_64
(
&
pTaskInfo
->
owner
,
0
);
return
pTaskInfo
->
code
;
}
...
...
source/libs/executor/src/tsimplehash.c
浏览文件 @
a4a7022c
...
...
@@ -62,7 +62,7 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn, size_t keyLen, size_t
}
SSHashObj
*
pHashObj
=
(
SSHashObj
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SSHashObj
));
if
(
pHashObj
==
NULL
)
{
if
(
!
pHashObj
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
...
...
@@ -78,7 +78,7 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn, size_t keyLen, size_t
pHashObj
->
dataLen
=
dataLen
;
pHashObj
->
hashList
=
(
SHNode
**
)
taosMemoryCalloc
(
pHashObj
->
capacity
,
sizeof
(
void
*
));
if
(
pHashObj
->
hashList
==
NULL
)
{
if
(
!
pHashObj
->
hashList
)
{
taosMemoryFree
(
pHashObj
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
...
...
@@ -87,7 +87,7 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn, size_t keyLen, size_t
}
int32_t
tSimpleHashGetSize
(
const
SSHashObj
*
pHashObj
)
{
if
(
pHashObj
==
NULL
)
{
if
(
!
pHashObj
)
{
return
0
;
}
return
(
int32_t
)
atomic_load_64
((
int64_t
*
)
&
pHashObj
->
size
);
...
...
@@ -95,7 +95,7 @@ int32_t tSimpleHashGetSize(const SSHashObj *pHashObj) {
static
SHNode
*
doCreateHashNode
(
const
void
*
key
,
size_t
keyLen
,
const
void
*
pData
,
size_t
dsize
,
uint32_t
hashVal
)
{
SHNode
*
pNewNode
=
taosMemoryMalloc
(
sizeof
(
SHNode
)
+
keyLen
+
dsize
);
if
(
pNewNode
==
NULL
)
{
if
(
!
pNewNode
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
...
...
@@ -120,7 +120,7 @@ static void taosHashTableResize(SSHashObj *pHashObj) {
int64_t
st
=
taosGetTimestampUs
();
void
*
pNewEntryList
=
taosMemoryRealloc
(
pHashObj
->
hashList
,
sizeof
(
void
*
)
*
newCapacity
);
if
(
pNewEntryList
==
NULL
)
{
if
(
!
pNewEntryList
)
{
// qWarn("hash resize failed due to out of memory, capacity remain:%zu", pHashObj->capacity);
return
;
}
...
...
@@ -133,22 +133,21 @@ static void taosHashTableResize(SSHashObj *pHashObj) {
for
(
int32_t
idx
=
0
;
idx
<
pHashObj
->
capacity
;
++
idx
)
{
SHNode
*
pNode
=
pHashObj
->
hashList
[
idx
];
if
(
pNode
==
NULL
)
{
if
(
!
pNode
)
{
continue
;
}
SHNode
*
pNext
;
SHNode
*
pNext
=
NULL
;
SHNode
*
pPrev
=
NULL
;
while
(
pNode
!=
NULL
)
{
void
*
key
=
GET_SHASH_NODE_KEY
(
pNode
,
pHashObj
->
dataLen
);
uint32_t
hashVal
=
(
*
pHashObj
->
hashFp
)(
key
,
(
uint32_t
)
pHashObj
->
data
Len
);
uint32_t
hashVal
=
(
*
pHashObj
->
hashFp
)(
key
,
(
uint32_t
)
pHashObj
->
key
Len
);
int32_t
newIdx
=
HASH_INDEX
(
hashVal
,
pHashObj
->
capacity
);
pNext
=
pNode
->
next
;
if
(
newIdx
!=
idx
)
{
if
(
pPrev
==
NULL
)
{
if
(
!
pPrev
)
{
pHashObj
->
hashList
[
idx
]
=
pNext
;
}
else
{
pPrev
->
next
=
pNext
;
...
...
@@ -172,7 +171,7 @@ static void taosHashTableResize(SSHashObj *pHashObj) {
}
int32_t
tSimpleHashPut
(
SSHashObj
*
pHashObj
,
const
void
*
key
,
const
void
*
data
)
{
if
(
pHashObj
==
NULL
||
key
==
NULL
)
{
if
(
!
pHashObj
||
!
key
)
{
return
-
1
;
}
...
...
@@ -186,13 +185,14 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data) {
int32_t
slot
=
HASH_INDEX
(
hashVal
,
pHashObj
->
capacity
);
SHNode
*
pNode
=
pHashObj
->
hashList
[
slot
];
if
(
pNode
==
NULL
)
{
SHNode
*
pNewNode
=
doCreateHashNode
(
key
,
pHashObj
->
keyLen
,
data
,
pHashObj
->
size
,
hashVal
);
if
(
pNewNode
==
NULL
)
{
if
(
!
pNode
)
{
SHNode
*
pNewNode
=
doCreateHashNode
(
key
,
pHashObj
->
keyLen
,
data
,
pHashObj
->
dataLen
,
hashVal
);
if
(
!
pNewNode
)
{
return
-
1
;
}
pHashObj
->
hashList
[
slot
]
=
pNewNode
;
atomic_add_fetch_64
(
&
pHashObj
->
size
,
1
);
return
0
;
}
...
...
@@ -203,9 +203,9 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data) {
pNode
=
pNode
->
next
;
}
if
(
pNode
==
NULL
)
{
SHNode
*
pNewNode
=
doCreateHashNode
(
key
,
pHashObj
->
keyLen
,
data
,
pHashObj
->
size
,
hashVal
);
if
(
pNewNode
==
NULL
)
{
if
(
!
pNode
)
{
SHNode
*
pNewNode
=
doCreateHashNode
(
key
,
pHashObj
->
keyLen
,
data
,
pHashObj
->
dataLen
,
hashVal
);
if
(
!
pNewNode
)
{
return
-
1
;
}
pNewNode
->
next
=
pHashObj
->
hashList
[
slot
];
...
...
@@ -234,7 +234,7 @@ static FORCE_INLINE SHNode *doSearchInEntryList(SSHashObj *pHashObj, const void
static
FORCE_INLINE
bool
taosHashTableEmpty
(
const
SSHashObj
*
pHashObj
)
{
return
tSimpleHashGetSize
(
pHashObj
)
==
0
;
}
void
*
tSimpleHashGet
(
SSHashObj
*
pHashObj
,
const
void
*
key
)
{
if
(
pHashObj
==
NULL
||
taosHashTableEmpty
(
pHashObj
)
||
key
==
NULL
)
{
if
(
!
pHashObj
||
taosHashTableEmpty
(
pHashObj
)
||
!
key
)
{
return
NULL
;
}
...
...
@@ -242,7 +242,7 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key) {
int32_t
slot
=
HASH_INDEX
(
hashVal
,
pHashObj
->
capacity
);
SHNode
*
pNode
=
pHashObj
->
hashList
[
slot
];
if
(
pNode
==
NULL
)
{
if
(
!
pNode
)
{
return
NULL
;
}
...
...
@@ -256,19 +256,43 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key) {
}
int32_t
tSimpleHashRemove
(
SSHashObj
*
pHashObj
,
const
void
*
key
)
{
// todo
if
(
!
pHashObj
||
!
key
)
{
return
TSDB_CODE_FAILED
;
}
uint32_t
hashVal
=
(
*
pHashObj
->
hashFp
)(
key
,
(
uint32_t
)
pHashObj
->
keyLen
);
int32_t
slot
=
HASH_INDEX
(
hashVal
,
pHashObj
->
capacity
);
SHNode
*
pNode
=
pHashObj
->
hashList
[
slot
];
SHNode
*
pPrev
=
NULL
;
while
(
pNode
)
{
if
((
*
(
pHashObj
->
equalFp
))(
GET_SHASH_NODE_KEY
(
pNode
,
pHashObj
->
dataLen
),
key
,
pHashObj
->
keyLen
)
==
0
)
{
if
(
!
pPrev
)
{
pHashObj
->
hashList
[
slot
]
=
pNode
->
next
;
}
else
{
pPrev
->
next
=
pNode
->
next
;
}
FREE_HASH_NODE
(
pNode
);
atomic_sub_fetch_64
(
&
pHashObj
->
size
,
1
);
break
;
}
pPrev
=
pNode
;
pNode
=
pNode
->
next
;
}
return
TSDB_CODE_SUCCESS
;
}
void
tSimpleHashClear
(
SSHashObj
*
pHashObj
)
{
if
(
pHashObj
==
NULL
)
{
if
(
!
pHashObj
||
taosHashTableEmpty
(
pHashObj
)
)
{
return
;
}
SHNode
*
pNode
,
*
pNext
;
SHNode
*
pNode
=
NULL
,
*
pNext
=
NULL
;
for
(
int32_t
i
=
0
;
i
<
pHashObj
->
capacity
;
++
i
)
{
pNode
=
pHashObj
->
hashList
[
i
];
if
(
pNode
==
NULL
)
{
if
(
!
pNode
)
{
continue
;
}
...
...
@@ -278,11 +302,11 @@ void tSimpleHashClear(SSHashObj *pHashObj) {
pNode
=
pNext
;
}
}
pHashObj
->
size
=
0
;
atomic_store_64
(
&
pHashObj
->
size
,
0
)
;
}
void
tSimpleHashCleanup
(
SSHashObj
*
pHashObj
)
{
if
(
pHashObj
==
NULL
)
{
if
(
!
pHashObj
)
{
return
;
}
...
...
@@ -291,7 +315,7 @@ void tSimpleHashCleanup(SSHashObj *pHashObj) {
}
size_t
tSimpleHashGetMemSize
(
const
SSHashObj
*
pHashObj
)
{
if
(
pHashObj
==
NULL
)
{
if
(
!
pHashObj
)
{
return
0
;
}
...
...
@@ -299,11 +323,58 @@ size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj) {
}
void
*
tSimpleHashGetKey
(
const
SSHashObj
*
pHashObj
,
void
*
data
,
size_t
*
keyLen
)
{
#if 0
int32_t offset = offsetof(SHNode, data);
SHNode *node = ((SHNode *)(char *)data - offset);
if
(
keyLen
!=
NULL
)
{
if (keyLen) {
*keyLen = pHashObj->keyLen;
}
return POINTER_SHIFT(data, pHashObj->dataLen);
return GET_SHASH_NODE_KEY(node, pHashObj->dataLen);
#endif
if
(
keyLen
)
{
*
keyLen
=
pHashObj
->
keyLen
;
}
return
POINTER_SHIFT
(
data
,
pHashObj
->
dataLen
);
}
void
*
tSimpleHashIterate
(
const
SSHashObj
*
pHashObj
,
void
*
data
,
int32_t
*
iter
)
{
if
(
!
pHashObj
)
{
return
NULL
;
}
SHNode
*
pNode
=
NULL
;
if
(
!
data
)
{
for
(
int32_t
i
=
0
;
i
<
pHashObj
->
capacity
;
++
i
)
{
pNode
=
pHashObj
->
hashList
[
i
];
if
(
!
pNode
)
{
continue
;
}
*
iter
=
i
;
return
GET_SHASH_NODE_DATA
(
pNode
);
}
return
NULL
;
}
pNode
=
(
SHNode
*
)((
char
*
)
data
-
offsetof
(
SHNode
,
data
));
if
(
pNode
->
next
)
{
return
GET_SHASH_NODE_DATA
(
pNode
->
next
);
}
++
(
*
iter
);
for
(
int32_t
i
=
*
iter
;
i
<
pHashObj
->
capacity
;
++
i
)
{
pNode
=
pHashObj
->
hashList
[
i
];
if
(
!
pNode
)
{
continue
;
}
*
iter
=
i
;
return
GET_SHASH_NODE_DATA
(
pNode
);
}
return
NULL
;
}
\ No newline at end of file
source/libs/executor/test/CMakeLists.txt
浏览文件 @
a4a7022c
...
...
@@ -17,4 +17,19 @@ IF(NOT TD_DARWIN)
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/executor/"
PRIVATE
"
${
TD_SOURCE_DIR
}
/source/libs/executor/inc"
)
ENDIF
()
\ No newline at end of file
ENDIF
()
# SET(CMAKE_CXX_STANDARD 11)
# AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
# ADD_EXECUTABLE(tSimpleHashTest tSimpleHashTests.cpp)
# TARGET_LINK_LIBRARIES(
# tSimpleHashTest
# PRIVATE os util common executor gtest_main
# )
# TARGET_INCLUDE_DIRECTORIES(
# tSimpleHashTest
# PUBLIC "${TD_SOURCE_DIR}/include/common"
# PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
# )
\ No newline at end of file
source/libs/executor/test/tSimpleHashTests.cpp
0 → 100644
浏览文件 @
a4a7022c
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <iostream>
#include "taos.h"
#include "thash.h"
#include "tsimplehash.h"
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
// int main(int argc, char **argv) {
// testing::InitGoogleTest(&argc, argv);
// return RUN_ALL_TESTS();
// }
TEST
(
testCase
,
tSimpleHashTest
)
{
SSHashObj
*
pHashObj
=
tSimpleHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
sizeof
(
int64_t
),
sizeof
(
int64_t
));
assert
(
pHashObj
!=
nullptr
);
ASSERT_EQ
(
0
,
tSimpleHashGetSize
(
pHashObj
));
int64_t
originKeySum
=
0
;
for
(
int64_t
i
=
1
;
i
<=
100
;
++
i
)
{
originKeySum
+=
i
;
tSimpleHashPut
(
pHashObj
,
(
const
void
*
)
&
i
,
(
const
void
*
)
&
i
);
ASSERT_EQ
(
i
,
tSimpleHashGetSize
(
pHashObj
));
}
for
(
int64_t
i
=
1
;
i
<=
100
;
++
i
)
{
void
*
data
=
tSimpleHashGet
(
pHashObj
,
(
const
void
*
)
&
i
);
ASSERT_EQ
(
i
,
*
(
int64_t
*
)
data
);
}
void
*
data
=
NULL
;
int32_t
iter
=
0
;
int64_t
keySum
=
0
;
int64_t
dataSum
=
0
;
while
((
data
=
tSimpleHashIterate
(
pHashObj
,
data
,
&
iter
)))
{
void
*
key
=
tSimpleHashGetKey
(
pHashObj
,
data
,
NULL
);
keySum
+=
*
(
int64_t
*
)
key
;
dataSum
+=
*
(
int64_t
*
)
data
;
}
ASSERT_EQ
(
keySum
,
dataSum
);
ASSERT_EQ
(
keySum
,
originKeySum
);
for
(
int64_t
i
=
1
;
i
<=
100
;
++
i
)
{
tSimpleHashRemove
(
pHashObj
,
(
const
void
*
)
&
i
);
ASSERT_EQ
(
100
-
i
,
tSimpleHashGetSize
(
pHashObj
));
}
tSimpleHashCleanup
(
pHashObj
);
}
#pragma GCC diagnostic pop
\ No newline at end of file
tests/script/tsim/sma/tsmaCreateInsertQuery.sim
浏览文件 @
a4a7022c
...
...
@@ -61,6 +61,7 @@ endi
print =============== select * from stb from memory in designated vgroup
sql select _wstart, _wend, min(c1),max(c2),max(c1) from stb interval(5m,10s) sliding(5m);
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
if $rows != 1 then
print rows $rows != 1
return -1
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录