Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
03d170b7
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
03d170b7
编写于
3月 26, 2022
作者:
C
Cary Xu
提交者:
GitHub
3月 26, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #11000 from taosdata/feature/TD-11463-3.0
Feature/td 11463 3.0
上级
e06edeaf
2dfecab0
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
117 addition
and
54 deletion
+117
-54
source/dnode/vnode/inc/tsdb.h
source/dnode/vnode/inc/tsdb.h
+2
-0
source/dnode/vnode/src/tsdb/tsdbMemTable.c
source/dnode/vnode/src/tsdb/tsdbMemTable.c
+29
-1
source/dnode/vnode/src/tsdb/tsdbSma.c
source/dnode/vnode/src/tsdb/tsdbSma.c
+19
-21
source/dnode/vnode/test/tsdbSmaTest.cpp
source/dnode/vnode/test/tsdbSmaTest.cpp
+67
-32
未找到文件。
source/dnode/vnode/inc/tsdb.h
浏览文件 @
03d170b7
...
...
@@ -276,6 +276,8 @@ int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGro
*/
void
tsdbCleanupReadHandle
(
tsdbReaderT
queryHandle
);
int32_t
tdScanAndConvertSubmitMsg
(
SSubmitReq
*
pMsg
);
#ifdef __cplusplus
}
#endif
...
...
source/dnode/vnode/src/tsdb/tsdbMemTable.c
浏览文件 @
03d170b7
...
...
@@ -227,7 +227,35 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
return
0
;
}
static
int
tsdbScanAndConvertSubmitMsg
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
)
{
int32_t
tdScanAndConvertSubmitMsg
(
SSubmitReq
*
pMsg
)
{
ASSERT
(
pMsg
!=
NULL
);
SSubmitMsgIter
msgIter
=
{
0
};
SSubmitBlk
*
pBlock
=
NULL
;
SSubmitBlkIter
blkIter
=
{
0
};
STSRow
*
row
=
NULL
;
terrno
=
TSDB_CODE_SUCCESS
;
pMsg
->
length
=
htonl
(
pMsg
->
length
);
pMsg
->
numOfBlocks
=
htonl
(
pMsg
->
numOfBlocks
);
if
(
tInitSubmitMsgIter
(
pMsg
,
&
msgIter
)
<
0
)
return
-
1
;
while
(
true
)
{
if
(
tGetSubmitMsgNext
(
&
msgIter
,
&
pBlock
)
<
0
)
return
-
1
;
if
(
pBlock
==
NULL
)
break
;
pBlock
->
uid
=
htobe64
(
pBlock
->
uid
);
pBlock
->
suid
=
htobe64
(
pBlock
->
suid
);
pBlock
->
sversion
=
htonl
(
pBlock
->
sversion
);
pBlock
->
dataLen
=
htonl
(
pBlock
->
dataLen
);
pBlock
->
schemaLen
=
htonl
(
pBlock
->
schemaLen
);
pBlock
->
numOfRows
=
htons
(
pBlock
->
numOfRows
);
}
if
(
terrno
!=
TSDB_CODE_SUCCESS
)
return
-
1
;
return
0
;
}
int
tsdbScanAndConvertSubmitMsg
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
)
{
ASSERT
(
pMsg
!=
NULL
);
// STsdbMeta * pMeta = pTsdb->tsdbMeta;
SSubmitMsgIter
msgIter
=
{
0
};
...
...
source/dnode/vnode/src/tsdb/tsdbSma.c
浏览文件 @
03d170b7
...
...
@@ -81,6 +81,7 @@ struct SSmaStat {
// expired window
static
int32_t
tsdbUpdateExpiredWindowImpl
(
STsdb
*
pTsdb
,
const
char
*
msg
);
static
int32_t
tsdbSetExpiredWindow
(
STsdb
*
pTsdb
,
SHashObj
*
pItemsHash
,
int64_t
indexUid
,
int64_t
winSKey
);
static
int32_t
tsdbInitSmaStat
(
SSmaStat
**
pSmaStat
);
static
void
*
tsdbFreeSmaStatItem
(
SSmaStatItem
*
pSmaStatItem
);
static
int32_t
tsdbDestroySmaState
(
SSmaStat
*
pSmaStat
);
...
...
@@ -384,17 +385,12 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
return
TSDB_CODE_SUCCESS
;
};
static
STimeWindow
getActiveTimeWindowX
(
int64_t
ts
,
SInterval
*
pInterval
)
{
STimeWindow
tw
=
{
0
};
tw
.
skey
=
100
;
tw
.
ekey
=
1000
;
return
tw
;
}
static
int32_t
tsdbSetExpiredWindow
(
STsdb
*
pTsdb
,
SHashObj
*
pItemsHash
,
int64_t
indexUid
,
int64_t
winSKey
)
{
SSmaStatItem
*
pItem
=
taosHashGet
(
pItemsHash
,
&
indexUid
,
sizeof
(
indexUid
));
if
(
pItem
==
NULL
)
{
pItem
=
tsdbNewSmaStatItem
(
TSDB_SMA_STAT_EXPIRED
);
// TODO use the real state
// TODO: use TSDB_SMA_STAT_EXPIRED and update by stream computing later
pItem
=
tsdbNewSmaStatItem
(
TSDB_SMA_STAT_OK
);
// TODO use the real state
if
(
pItem
==
NULL
)
{
// Response to stream computing: OOM
// For query, if the indexUid not found, the TSDB should tell query module to query raw TS data.
...
...
@@ -419,6 +415,9 @@ static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t
taosMemoryFree
(
pItem
);
return
TSDB_CODE_FAILED
;
}
}
else
if
((
pItem
=
*
(
SSmaStatItem
**
)
pItem
)
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
return
TSDB_CODE_FAILED
;
}
int8_t
state
=
TSDB_SMA_STAT_EXPIRED
;
...
...
@@ -491,41 +490,39 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, const char *msg) {
TASSERT
(
pEnv
!=
NULL
&&
pStat
!=
NULL
&&
pItemsHash
!=
NULL
);
// basic procedure
// TODO: optimization
tsdbRefSmaStat
(
pTsdb
,
pStat
);
SSubmitMsgIter
msgIter
=
{
0
};
SSubmitBlk
*
pBlock
=
NULL
;
SInterval
interval
=
{
0
};
if
(
tInitSubmitMsgIter
(
pMsg
,
&
msgIter
)
!=
TSDB_CODE_SUCCESS
)
{
return
TSDB_CODE_FAILED
;
}
// basic procedure
// TODO: optimization
tsdbRefSmaStat
(
pTsdb
,
pStat
);
while
(
true
)
{
tGetSubmitMsgNext
(
&
msgIter
,
&
pBlock
);
if
(
pBlock
==
NULL
)
break
;
int64_t
suid
=
htobe64
(
pBlock
->
uid
);
STSmaWrapper
*
pSW
=
NULL
;
STSma
*
pTSma
=
NULL
;
SSubmitBlkIter
blkIter
=
{
0
};
if
(
tInitSubmitBlkIter
(
pBlock
,
&
blkIter
)
!=
TSDB_CODE_SUCCESS
)
{
tdFreeTSmaWrapper
(
pSW
);
break
;
}
while
(
true
)
{
SSubmitBlkIter
blkIter
=
{
0
};
if
(
tInitSubmitBlkIter
(
pBlock
,
&
blkIter
)
!=
TSDB_CODE_SUCCESS
)
{
tdFreeTSmaWrapper
(
pSW
);
break
;
}
STSRow
*
row
=
tGetSubmitBlkNext
(
&
blkIter
);
if
(
row
==
NULL
)
{
tdFreeTSmaWrapper
(
pSW
);
break
;
}
if
(
pSW
==
NULL
)
{
if
((
pSW
=
metaGetSmaInfoByTable
(
REPO_META
(
pTsdb
),
suid
))
==
NULL
)
{
if
((
pSW
=
metaGetSmaInfoByTable
(
REPO_META
(
pTsdb
),
pBlock
->
suid
))
==
NULL
)
{
break
;
}
if
((
pSW
->
number
)
<=
0
||
(
pSW
->
tSma
==
NULL
))
{
...
...
@@ -542,8 +539,9 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, const char *msg) {
interval
.
sliding
=
pTSma
->
sliding
;
interval
.
slidingUnit
=
pTSma
->
slidingUnit
;
STimeWindow
tw
=
getActiveTimeWindowX
(
TD_ROW_KEY
(
row
),
&
interval
);
tsdbSetExpiredWindow
(
pTsdb
,
pItemsHash
,
pTSma
->
indexUid
,
TD_ROW_KEY
(
row
));
TSKEY
winSKey
=
taosTimeTruncate
(
TD_ROW_KEY
(
row
),
&
interval
,
interval
.
precision
);
tsdbSetExpiredWindow
(
pTsdb
,
pItemsHash
,
pTSma
->
indexUid
,
winSKey
);
}
}
...
...
source/dnode/vnode/test/tsdbSmaTest.cpp
浏览文件 @
03d170b7
...
...
@@ -43,7 +43,7 @@ TEST(testCase, unionEncodeDecodeTest) {
};
};
col_id_t
nBSmaCols
;
col_id_t
*
pBSmaCols
;
col_id_t
*
pBSmaCols
;
}
SUnionTest
;
SUnionTest
sut
=
{
0
};
...
...
@@ -51,13 +51,13 @@ TEST(testCase, unionEncodeDecodeTest) {
sut
.
type
=
1
;
sut
.
nBSmaCols
=
2
;
sut
.
pBSmaCols
=
(
col_id_t
*
)
taosMemoryMalloc
(
sut
.
nBSmaCols
*
sizeof
(
col_id_t
));
sut
.
pBSmaCols
=
(
col_id_t
*
)
taosMemoryMalloc
(
sut
.
nBSmaCols
*
sizeof
(
col_id_t
));
for
(
col_id_t
i
=
0
;
i
<
sut
.
nBSmaCols
;
++
i
)
{
sut
.
pBSmaCols
[
i
]
=
i
+
100
;
}
void
*
buf
=
taosMemoryMalloc
(
1024
);
void
*
pBuf
=
buf
;
void
*
buf
=
taosMemoryMalloc
(
1024
);
void
*
pBuf
=
buf
;
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedU8
(
&
buf
,
sut
.
info
);
tlen
+=
taosEncodeFixedI16
(
&
buf
,
sut
.
nBSmaCols
);
...
...
@@ -68,9 +68,9 @@ TEST(testCase, unionEncodeDecodeTest) {
SUnionTest
dut
=
{
0
};
pBuf
=
taosDecodeFixedU8
(
pBuf
,
&
dut
.
info
);
pBuf
=
taosDecodeFixedI16
(
pBuf
,
&
dut
.
nBSmaCols
);
if
(
dut
.
nBSmaCols
>
0
)
{
dut
.
pBSmaCols
=
(
col_id_t
*
)
taosMemoryMalloc
(
dut
.
nBSmaCols
*
sizeof
(
col_id_t
));
for
(
col_id_t
i
=
0
;
i
<
dut
.
nBSmaCols
;
++
i
)
{
if
(
dut
.
nBSmaCols
>
0
)
{
dut
.
pBSmaCols
=
(
col_id_t
*
)
taosMemoryMalloc
(
dut
.
nBSmaCols
*
sizeof
(
col_id_t
));
for
(
col_id_t
i
=
0
;
i
<
dut
.
nBSmaCols
;
++
i
)
{
pBuf
=
taosDecodeFixedI16
(
pBuf
,
dut
.
pBSmaCols
+
i
);
}
}
else
{
...
...
@@ -83,9 +83,9 @@ TEST(testCase, unionEncodeDecodeTest) {
ASSERT_EQ
(
sut
.
rollup
,
dut
.
rollup
);
ASSERT_EQ
(
sut
.
type
,
dut
.
type
);
ASSERT_EQ
(
sut
.
nBSmaCols
,
dut
.
nBSmaCols
);
for
(
col_id_t
i
=
0
;
i
<
sut
.
nBSmaCols
;
++
i
)
{
ASSERT_EQ
(
*
(
col_id_t
*
)(
sut
.
pBSmaCols
+
i
),
sut
.
pBSmaCols
[
i
]);
ASSERT_EQ
(
*
(
col_id_t
*
)(
sut
.
pBSmaCols
+
i
),
dut
.
pBSmaCols
[
i
]);
for
(
col_id_t
i
=
0
;
i
<
sut
.
nBSmaCols
;
++
i
)
{
ASSERT_EQ
(
*
(
col_id_t
*
)(
sut
.
pBSmaCols
+
i
),
sut
.
pBSmaCols
[
i
]);
ASSERT_EQ
(
*
(
col_id_t
*
)(
sut
.
pBSmaCols
+
i
),
dut
.
pBSmaCols
[
i
]);
}
}
#if 1
...
...
@@ -115,7 +115,7 @@ TEST(testCase, tSma_Meta_Encode_Decode_Test) {
// decode
STSmaWrapper
dstTSmaWrapper
=
{
0
};
void
*
result
=
tDecodeTSmaWrapper
(
pSW
,
&
dstTSmaWrapper
);
void
*
result
=
tDecodeTSmaWrapper
(
pSW
,
&
dstTSmaWrapper
);
ASSERT_NE
(
result
,
nullptr
);
ASSERT_EQ
(
tSmaWrapper
.
number
,
dstTSmaWrapper
.
number
);
...
...
@@ -148,12 +148,12 @@ TEST(testCase, tSma_Meta_Encode_Decode_Test) {
#if 1
TEST
(
testCase
,
tSma_metaDB_Put_Get_Del_Test
)
{
const
char
*
smaIndexName1
=
"sma_index_test_1"
;
const
char
*
smaIndexName2
=
"sma_index_test_2"
;
const
char
*
smaIndexName1
=
"sma_index_test_1"
;
const
char
*
smaIndexName2
=
"sma_index_test_2"
;
int8_t
timezone
=
8
;
const
char
*
expr
=
"select count(a,b, top 20), from table interval 1d, sliding 1h;"
;
const
char
*
tagsFilter
=
"I'm tags filter"
;
const
char
*
smaTestDir
=
"./smaTest"
;
const
char
*
expr
=
"select count(a,b, top 20), from table interval 1d, sliding 1h;"
;
const
char
*
tagsFilter
=
"I'm tags filter"
;
const
char
*
smaTestDir
=
"./smaTest"
;
const
tb_uid_t
tbUid
=
1234567890
;
const
int64_t
indexUid1
=
2000000001
;
const
int64_t
indexUid2
=
2000000002
;
...
...
@@ -180,8 +180,8 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
ASSERT_NE
(
tSma
.
tagsFilter
,
nullptr
);
tstrncpy
(
tSma
.
tagsFilter
,
tagsFilter
,
tSma
.
tagsFilterLen
+
1
);
SMeta
*
pMeta
=
NULL
;
STSma
*
pSmaCfg
=
&
tSma
;
SMeta
*
pMeta
=
NULL
;
STSma
*
pSmaCfg
=
&
tSma
;
const
SMetaCfg
*
pMetaCfg
=
&
defaultMetaOptions
;
taosRemoveDir
(
smaTestDir
);
...
...
@@ -283,11 +283,11 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
#if 1
TEST
(
testCase
,
tSma_Data_Insert_Query_Test
)
{
// step 1: prepare meta
const
char
*
smaIndexName1
=
"sma_index_test_1"
;
const
char
*
smaIndexName1
=
"sma_index_test_1"
;
const
int8_t
timezone
=
8
;
const
char
*
expr
=
"select count(a,b, top 20), from table interval 1d, sliding 1h;"
;
const
char
*
tagsFilter
=
"where tags.location='Beijing' and tags.district='ChaoYang'"
;
const
char
*
smaTestDir
=
"./smaTest"
;
const
char
*
expr
=
"select count(a,b, top 20), from table interval 1d, sliding 1h;"
;
const
char
*
tagsFilter
=
"where tags.location='Beijing' and tags.district='ChaoYang'"
;
const
char
*
smaTestDir
=
"./smaTest"
;
const
tb_uid_t
tbUid
=
1234567890
;
const
int64_t
indexUid1
=
2000000001
;
const
int64_t
interval1
=
1
;
...
...
@@ -302,7 +302,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
tSma
.
intervalUnit
=
TIME_UNIT_DAY
;
tSma
.
interval
=
1
;
tSma
.
slidingUnit
=
TIME_UNIT_HOUR
;
tSma
.
sliding
=
0
;
tSma
.
sliding
=
1
;
// sliding = interval when it's convert window
tSma
.
indexUid
=
indexUid1
;
tstrncpy
(
tSma
.
indexName
,
smaIndexName1
,
TSDB_INDEX_NAME_LEN
);
tSma
.
timezoneInt
=
timezone
;
...
...
@@ -318,8 +318,8 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
ASSERT_NE
(
tSma
.
tagsFilter
,
nullptr
);
tstrncpy
(
tSma
.
tagsFilter
,
tagsFilter
,
tSma
.
tagsFilterLen
+
1
);
SMeta
*
pMeta
=
NULL
;
STSma
*
pSmaCfg
=
&
tSma
;
SMeta
*
pMeta
=
NULL
;
STSma
*
pSmaCfg
=
&
tSma
;
const
SMetaCfg
*
pMetaCfg
=
&
defaultMetaOptions
;
taosRemoveDir
(
smaTestDir
);
...
...
@@ -331,8 +331,8 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
// step 2: insert data
STSmaDataWrapper
*
pSmaData
=
NULL
;
STsdb
*
pTsdb
=
(
STsdb
*
)
taosMemoryCalloc
(
1
,
sizeof
(
STsdb
));
STsdbCfg
*
pCfg
=
&
pTsdb
->
config
;
STsdb
*
pTsdb
=
(
STsdb
*
)
taosMemoryCalloc
(
1
,
sizeof
(
STsdb
));
STsdbCfg
*
pCfg
=
&
pTsdb
->
config
;
pTsdb
->
pMeta
=
pMeta
;
pTsdb
->
vgId
=
2
;
...
...
@@ -367,15 +367,49 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
pTsdb
->
pTfs
=
tfsOpen
(
&
pDisks
,
numOfDisks
);
ASSERT_NE
(
pTsdb
->
pTfs
,
nullptr
);
char
*
msg
=
(
char
*
)
taosMemoryCalloc
(
1
,
100
);
ASSERT_NE
(
msg
,
nullptr
);
ASSERT_EQ
(
tsdbUpdateSmaWindow
(
pTsdb
,
msg
),
0
);
// generate SSubmitReq msg and update expired window
int16_t
schemaVer
=
0
;
uint32_t
mockRowLen
=
sizeof
(
STSRow
);
uint32_t
mockRowNum
=
2
;
uint32_t
mockBlkNum
=
2
;
uint32_t
msgLen
=
sizeof
(
SSubmitReq
)
+
mockBlkNum
*
sizeof
(
SSubmitBlk
)
+
mockBlkNum
*
mockRowNum
*
mockRowLen
;
SSubmitReq
*
pMsg
=
(
SSubmitReq
*
)
taosMemoryCalloc
(
1
,
msgLen
);
ASSERT_NE
(
pMsg
,
nullptr
);
pMsg
->
version
=
htobe64
(
schemaVer
);
pMsg
->
numOfBlocks
=
htonl
(
mockBlkNum
);
pMsg
->
length
=
htonl
(
msgLen
);
SSubmitBlk
*
pBlk
=
NULL
;
STSRow
*
pRow
=
NULL
;
TSKEY
now
=
taosGetTimestamp
(
pTsdb
->
config
.
precision
);
for
(
uint32_t
b
=
0
;
b
<
mockBlkNum
;
++
b
)
{
pBlk
=
(
SSubmitBlk
*
)
POINTER_SHIFT
(
pMsg
,
sizeof
(
SSubmitReq
)
+
b
*
(
sizeof
(
SSubmitBlk
)
+
mockRowNum
*
mockRowLen
));
pBlk
->
uid
=
htobe64
(
tbUid
);
pBlk
->
suid
=
htobe64
(
tbUid
);
pBlk
->
sversion
=
htonl
(
schemaVer
);
pBlk
->
padding
=
htonl
(
0
);
pBlk
->
schemaLen
=
htonl
(
0
);
pBlk
->
numOfRows
=
htons
(
mockRowNum
);
pBlk
->
dataLen
=
htonl
(
mockRowNum
*
mockRowLen
);
for
(
uint32_t
r
=
0
;
r
<
mockRowNum
;
++
r
)
{
pRow
=
(
STSRow
*
)
POINTER_SHIFT
(
pBlk
,
sizeof
(
SSubmitBlk
)
+
r
*
mockRowLen
);
pRow
->
len
=
mockRowLen
;
pRow
->
ts
=
now
+
b
*
1000
+
r
*
1000
;
pRow
->
sver
=
schemaVer
;
}
}
ASSERT_EQ
(
tdScanAndConvertSubmitMsg
(
pMsg
),
TSDB_CODE_SUCCESS
);
ASSERT_EQ
(
tsdbUpdateSmaWindow
(
pTsdb
,
(
const
char
*
)
pMsg
),
0
);
// init
int32_t
allocCnt
=
0
;
int32_t
allocStep
=
16384
;
int32_t
buffer
=
1024
;
void
*
buf
=
NULL
;
void
*
buf
=
NULL
;
ASSERT_EQ
(
tsdbMakeRoom
(
&
buf
,
allocStep
),
0
);
int32_t
bufSize
=
taosTSizeof
(
buf
);
int32_t
numOfTables
=
10
;
...
...
@@ -443,7 +477,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
printf
(
"%s:%d The sma data check count for insert and query is %"
PRIu32
"
\n
"
,
__FILE__
,
__LINE__
,
checkDataCnt
);
// release data
taosMemoryFreeClear
(
m
sg
);
taosMemoryFreeClear
(
pM
sg
);
taosTZfree
(
buf
);
// release meta
tdDestroyTSma
(
&
tSma
);
...
...
@@ -451,6 +485,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
tsdbClose
(
pTsdb
);
metaClose
(
pMeta
);
}
#endif
#pragma GCC diagnostic pop
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录