Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4473d236
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
4473d236
编写于
4月 14, 2022
作者:
C
Cary Xu
提交者:
GitHub
4月 14, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #11465 from taosdata/feature/TD-11463-3.0
feat: add version for tsma expired window updating
上级
9fed9752
401d1fe8
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
22 addition
and
34 deletion
+22
-34
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+3
-2
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbSma.c
source/dnode/vnode/src/tsdb/tsdbSma.c
+17
-30
source/dnode/vnode/test/tsdbSmaTest.cpp
source/dnode/vnode/test/tsdbSmaTest.cpp
+1
-1
未找到文件。
source/dnode/vnode/inc/vnode.h
浏览文件 @
4473d236
...
...
@@ -393,10 +393,11 @@ int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg);
* @brief When submit msg received, update the relative expired window synchronously.
*
* @param pTsdb
* @param msg
* @param pMsg
* @param version
* @return int32_t
*/
int32_t
tsdbUpdateSmaWindow
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
);
int32_t
tsdbUpdateSmaWindow
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
,
int64_t
version
);
/**
* @brief Insert tSma(Time-range-wise SMA) data from stream computing engine
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
4473d236
...
...
@@ -82,7 +82,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t versi
memcpy
(
data
,
msg
,
msgLen
);
if
(
msgType
==
TDMT_VND_SUBMIT
)
{
if
(
tsdbUpdateSmaWindow
(
pTq
->
pVnode
->
pTsdb
,
msg
)
!=
0
)
{
if
(
tsdbUpdateSmaWindow
(
pTq
->
pVnode
->
pTsdb
,
msg
,
version
)
!=
0
)
{
return
-
1
;
}
}
...
...
source/dnode/vnode/src/tsdb/tsdbSma.c
浏览文件 @
4473d236
...
...
@@ -105,8 +105,8 @@ struct SSmaStat {
// declaration of static functions
// expired window
static
int32_t
tsdbUpdateExpiredWindowImpl
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
);
static
int32_t
tsdbSetExpiredWindow
(
STsdb
*
pTsdb
,
SHashObj
*
pItemsHash
,
int64_t
indexUid
,
int64_t
winSKey
);
static
int32_t
tsdbUpdateExpiredWindowImpl
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
,
int64_t
version
);
static
int32_t
tsdbSetExpiredWindow
(
STsdb
*
pTsdb
,
SHashObj
*
pItemsHash
,
int64_t
indexUid
,
int64_t
winSKey
,
int64_t
version
);
static
int32_t
tsdbInitSmaStat
(
SSmaStat
**
pSmaStat
);
static
void
*
tsdbFreeSmaStatItem
(
SSmaStatItem
*
pSmaStatItem
);
static
int32_t
tsdbDestroySmaState
(
SSmaStat
*
pSmaStat
);
...
...
@@ -544,7 +544,7 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
return
TSDB_CODE_SUCCESS
;
};
static
int32_t
tsdbSetExpiredWindow
(
STsdb
*
pTsdb
,
SHashObj
*
pItemsHash
,
int64_t
indexUid
,
int64_t
winSKey
)
{
static
int32_t
tsdbSetExpiredWindow
(
STsdb
*
pTsdb
,
SHashObj
*
pItemsHash
,
int64_t
indexUid
,
int64_t
winSKey
,
int64_t
version
)
{
SSmaStatItem
*
pItem
=
taosHashGet
(
pItemsHash
,
&
indexUid
,
sizeof
(
indexUid
));
if
(
pItem
==
NULL
)
{
// TODO: use TSDB_SMA_STAT_EXPIRED and update by stream computing later
...
...
@@ -578,8 +578,7 @@ static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t
return
TSDB_CODE_FAILED
;
}
int8_t
state
=
TSDB_SMA_STAT_EXPIRED
;
if
(
taosHashPut
(
pItem
->
expiredWindows
,
&
winSKey
,
sizeof
(
TSKEY
),
&
state
,
sizeof
(
state
))
!=
0
)
{
if
(
taosHashPut
(
pItem
->
expiredWindows
,
&
winSKey
,
sizeof
(
TSKEY
),
&
version
,
sizeof
(
version
))
!=
0
)
{
// If error occurs during taosHashPut expired windows, remove the smaIndex from pTsdb->pSmaStat, thus TSDB would
// tell query module to query raw TS data.
// N.B.
...
...
@@ -606,7 +605,8 @@ static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t
* @param msg SSubmitReq
* @return int32_t
*/
int32_t
tsdbUpdateExpiredWindowImpl
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
)
{
int32_t
tsdbUpdateExpiredWindowImpl
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
,
int64_t
version
)
{
// no time-range-sma, just return success
if
(
atomic_load_16
(
&
REPO_TSMA_NUM
(
pTsdb
))
<=
0
)
{
tsdbTrace
(
"vgId:%d not update expire window since no tSma"
,
REPO_ID
(
pTsdb
));
return
TSDB_CODE_SUCCESS
;
...
...
@@ -621,20 +621,6 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg) {
return
TSDB_CODE_FAILED
;
}
// TODO: decode the msg from Stream Computing module => start
#ifdef TSDB_SMA_TESTx
int64_t
indexUid
=
SMA_TEST_INDEX_UID
;
const
int32_t
SMA_TEST_EXPIRED_WINDOW_SIZE
=
10
;
TSKEY
expiredWindows
[
SMA_TEST_EXPIRED_WINDOW_SIZE
];
TSKEY
skey1
=
1646987196
*
1e3
;
for
(
int32_t
i
=
0
;
i
<
SMA_TEST_EXPIRED_WINDOW_SIZE
;
++
i
)
{
expiredWindows
[
i
]
=
skey1
+
i
;
}
#else
#endif
// TODO: decode the msg <= end
if
(
tsdbCheckAndInitSmaEnv
(
pTsdb
,
TSDB_SMA_TYPE_TIME_RANGE
)
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
TSDB_CODE_TDB_INIT_FAILED
;
return
TSDB_CODE_FAILED
;
...
...
@@ -700,7 +686,7 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg) {
TSKEY
winSKey
=
taosTimeTruncate
(
TD_ROW_KEY
(
row
),
&
interval
,
interval
.
precision
);
tsdbSetExpiredWindow
(
pTsdb
,
pItemsHash
,
pTSma
->
indexUid
,
winSKey
);
tsdbSetExpiredWindow
(
pTsdb
,
pItemsHash
,
pTSma
->
indexUid
,
winSKey
,
version
);
// TODO: release only when suid changes.
tdDestroyTSmaWrapper
(
pSW
);
...
...
@@ -975,7 +961,7 @@ static int tsdbSmaBeginCommit(SSmaEnv *pEnv) {
// start a new txn
tdbTxnOpen
(
pTxn
,
0
,
poolMalloc
,
poolFree
,
pEnv
->
pPool
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
);
if
(
tdbBegin
(
pEnv
->
dbEnv
,
pTxn
)
!=
0
)
{
tsdbWarn
(
"tsdbSma tdb
restart txn
fail"
);
tsdbWarn
(
"tsdbSma tdb
begin commit
fail"
);
return
-
1
;
}
return
0
;
...
...
@@ -986,7 +972,7 @@ static int tsdbSmaEndCommit(SSmaEnv *pEnv) {
// Commit current txn
if
(
tdbCommit
(
pEnv
->
dbEnv
,
pTxn
)
!=
0
)
{
tsdbWarn
(
"tsdbSma tdb commit fail"
);
tsdbWarn
(
"tsdbSma tdb
end
commit fail"
);
return
-
1
;
}
tdbTxnClose
(
pTxn
);
...
...
@@ -1009,12 +995,12 @@ static int tsdbSmaEndCommit(SSmaEnv *pEnv) {
static
int32_t
tsdbInsertTSmaDataImpl
(
STsdb
*
pTsdb
,
int64_t
indexUid
,
const
char
*
msg
)
{
STsdbCfg
*
pCfg
=
REPO_CFG
(
pTsdb
);
const
SArray
*
pDataBlocks
=
(
const
SArray
*
)
msg
;
SSmaEnv
*
pEnv
=
atomic_load_ptr
(
&
REPO_TSMA_ENV
(
pTsdb
));
if
(
pEnv
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
tsdbWarn
(
"vgId:%d insert tSma data failed since pTSmaEnv is NULL"
,
REPO_ID
(
pTsdb
));
return
terrno
;
// For super table aggregation, the sma data is stored in vgroup calculated from the hash value of stable name. Thus
// the sma data would arrive ahead of the update-expired-window msg.
if
(
tsdbCheckAndInitSmaEnv
(
pTsdb
,
TSDB_SMA_TYPE_TIME_RANGE
)
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
TSDB_CODE_TDB_INIT_FAILED
;
return
TSDB_CODE_FAILED
;
}
if
(
pDataBlocks
==
NULL
)
{
...
...
@@ -1029,6 +1015,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char
return
TSDB_CODE_FAILED
;
}
SSmaEnv
*
pEnv
=
REPO_TSMA_ENV
(
pTsdb
);
SSmaStat
*
pStat
=
SMA_ENV_STAT
(
pEnv
);
SSmaStatItem
*
pItem
=
NULL
;
...
...
@@ -1683,9 +1670,9 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg) {
return
code
;
}
int32_t
tsdbUpdateSmaWindow
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
)
{
int32_t
tsdbUpdateSmaWindow
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
,
int64_t
version
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
((
code
=
tsdbUpdateExpiredWindowImpl
(
pTsdb
,
pMsg
))
<
0
)
{
if
((
code
=
tsdbUpdateExpiredWindowImpl
(
pTsdb
,
pMsg
,
version
))
<
0
)
{
tsdbWarn
(
"vgId:%d update expired sma window failed since %s"
,
REPO_ID
(
pTsdb
),
tstrerror
(
terrno
));
}
return
code
;
...
...
source/dnode/vnode/test/tsdbSmaTest.cpp
浏览文件 @
4473d236
...
...
@@ -409,7 +409,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
EXPECT_EQ
(
tdScanAndConvertSubmitMsg
(
pMsg
),
TSDB_CODE_SUCCESS
);
EXPECT_EQ
(
tsdbUpdateSmaWindow
(
pTsdb
,
pMsg
),
0
);
EXPECT_EQ
(
tsdbUpdateSmaWindow
(
pTsdb
,
pMsg
,
0
),
0
);
// init
const
int32_t
tSmaGroupSize
=
4
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录