Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e435bcc6
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
e435bcc6
编写于
5月 27, 2022
作者:
C
Cary Xu
提交者:
GitHub
5月 27, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #13104 from taosdata/feature/TD-11274-3.0
feat: build submit blk by group id in rsma result
上级
38d69cc1
1051277a
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
72 addition
and
26 deletion
+72
-26
include/common/tdatablock.h
include/common/tdatablock.h
+1
-2
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+10
-11
source/dnode/vnode/src/sma/smaRollup.c
source/dnode/vnode/src/sma/smaRollup.c
+8
-10
source/dnode/vnode/src/tsdb/tsdbSma.c
source/dnode/vnode/src/tsdb/tsdbSma.c
+2
-2
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+1
-1
tests/script/tsim/sma/rsmaCreateInsertQuery.sim
tests/script/tsim/sma/rsmaCreateInsertQuery.sim
+43
-0
tests/script/tsim/sma/tsmaCreateInsertData.sim
tests/script/tsim/sma/tsmaCreateInsertData.sim
+7
-0
未找到文件。
include/common/tdatablock.h
浏览文件 @
e435bcc6
...
...
@@ -230,7 +230,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
void
blockDebugShowData
(
const
SArray
*
dataBlocks
);
int32_t
buildSubmitReqFromDataBlock
(
SSubmitReq
**
pReq
,
const
SArray
*
pDataBlocks
,
STSchema
*
pTSchema
,
int32_t
vgId
,
tb_uid_t
uid
,
tb_uid_t
suid
);
tb_uid_t
suid
);
SSubmitReq
*
tdBlockToSubmit
(
const
SArray
*
pBlocks
,
const
STSchema
*
pSchema
,
bool
createTb
,
int64_t
suid
,
const
char
*
stbFullName
,
int32_t
vgId
);
...
...
@@ -299,4 +299,3 @@ static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* da
#endif
#endif
/*_TD_COMMON_EP_H_*/
source/common/src/tdatablock.c
浏览文件 @
e435bcc6
...
...
@@ -1508,14 +1508,11 @@ void blockDebugShowData(const SArray* dataBlocks) {
* @param pReq
* @param pDataBlocks
* @param vgId
* @param uid set as parameter temporarily // TODO: remove this parameter, and the executor should set uid in
* SDataBlock->info.uid
* @param suid // TODO: check with Liao whether suid response is reasonable
*
* TODO: colId should be set
*/
int32_t
buildSubmitReqFromDataBlock
(
SSubmitReq
**
pReq
,
const
SArray
*
pDataBlocks
,
STSchema
*
pTSchema
,
int32_t
vgId
,
tb_uid_t
uid
,
tb_uid_t
suid
)
{
int32_t
buildSubmitReqFromDataBlock
(
SSubmitReq
**
pReq
,
const
SArray
*
pDataBlocks
,
STSchema
*
pTSchema
,
int32_t
vgId
,
tb_uid_t
suid
)
{
int32_t
sz
=
taosArrayGetSize
(
pDataBlocks
);
int32_t
bufSize
=
sizeof
(
SSubmitReq
);
for
(
int32_t
i
=
0
;
i
<
sz
;
++
i
)
{
...
...
@@ -1551,7 +1548,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
SSubmitBlk
*
pSubmitBlk
=
POINTER_SHIFT
(
pDataBuf
,
msgLen
);
pSubmitBlk
->
suid
=
suid
;
pSubmitBlk
->
uid
=
ui
d
;
pSubmitBlk
->
uid
=
pDataBlock
->
info
.
groupI
d
;
pSubmitBlk
->
numOfRows
=
rows
;
++
numOfBlks
;
...
...
@@ -1562,6 +1559,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
tdSRowResetBuf
(
&
rb
,
POINTER_SHIFT
(
pDataBuf
,
msgLen
));
// set row buf
printf
(
"|"
);
bool
isStartKey
=
false
;
int32_t
offset
=
0
;
for
(
int32_t
k
=
0
;
k
<
colNum
;
++
k
)
{
// iterate by column
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
k
);
void
*
var
=
POINTER_SHIFT
(
pColInfoData
->
pData
,
j
*
pColInfoData
->
info
.
bytes
);
...
...
@@ -1570,18 +1568,18 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
if
(
!
isStartKey
)
{
isStartKey
=
true
;
tdAppendColValToRow
(
&
rb
,
PRIMARYKEY_TIMESTAMP_COL_ID
,
TSDB_DATA_TYPE_TIMESTAMP
,
TD_VTYPE_NORM
,
var
,
true
,
0
,
0
);
offset
,
k
);
}
else
{
tdAppendColValToRow
(
&
rb
,
2
,
TSDB_DATA_TYPE_TIMESTAMP
,
TD_VTYPE_NORM
,
var
,
true
,
8
,
k
);
break
;
tdAppendColValToRow
(
&
rb
,
2
,
TSDB_DATA_TYPE_TIMESTAMP
,
TD_VTYPE_NORM
,
var
,
true
,
offset
,
k
);
}
break
;
case
TSDB_DATA_TYPE_NCHAR
:
{
tdAppendColValToRow
(
&
rb
,
2
,
TSDB_DATA_TYPE_NCHAR
,
TD_VTYPE_NORM
,
var
,
true
,
8
,
k
);
tdAppendColValToRow
(
&
rb
,
2
,
TSDB_DATA_TYPE_NCHAR
,
TD_VTYPE_NORM
,
var
,
true
,
offset
,
k
);
break
;
}
case
TSDB_DATA_TYPE_VARCHAR
:
{
// TSDB_DATA_TYPE_BINARY
tdAppendColValToRow
(
&
rb
,
2
,
TSDB_DATA_TYPE_VARCHAR
,
TD_VTYPE_NORM
,
var
,
true
,
8
,
k
);
tdAppendColValToRow
(
&
rb
,
2
,
TSDB_DATA_TYPE_VARCHAR
,
TD_VTYPE_NORM
,
var
,
true
,
offset
,
k
);
break
;
}
case
TSDB_DATA_TYPE_VARBINARY
:
...
...
@@ -1593,13 +1591,14 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
break
;
default:
if
(
pColInfoData
->
info
.
type
<
TSDB_DATA_TYPE_MAX
&&
pColInfoData
->
info
.
type
>
TSDB_DATA_TYPE_NULL
)
{
tdAppendColValToRow
(
&
rb
,
2
,
pColInfoData
->
info
.
type
,
TD_VTYPE_NORM
,
var
,
true
,
8
,
k
);
tdAppendColValToRow
(
&
rb
,
2
,
pColInfoData
->
info
.
type
,
TD_VTYPE_NORM
,
var
,
true
,
offset
,
k
);
}
else
{
printf
(
"the column type %"
PRIi16
" is undefined
\n
"
,
pColInfoData
->
info
.
type
);
TASSERT
(
0
);
}
break
;
}
offset
+=
TYPE_BYTES
[
pColInfoData
->
info
.
type
];
}
dataLen
+=
TD_ROW_LEN
(
rb
.
pBuf
);
}
...
...
source/dnode/vnode/src/sma/smaRollup.c
浏览文件 @
e435bcc6
...
...
@@ -18,7 +18,7 @@
static
FORCE_INLINE
int32_t
tdUidStorePut
(
STbUidStore
*
pStore
,
tb_uid_t
suid
,
tb_uid_t
*
uid
);
static
FORCE_INLINE
int32_t
tdUpdateTbUidListImpl
(
SSma
*
pSma
,
tb_uid_t
*
suid
,
SArray
*
tbUids
);
static
FORCE_INLINE
int32_t
tdExecuteRSmaImpl
(
SSma
*
pSma
,
const
void
*
pMsg
,
int32_t
inputType
,
qTaskInfo_t
*
taskInfo
,
STSchema
*
pTSchema
,
tb_uid_t
suid
,
tb_uid_t
uid
,
int8_t
level
);
STSchema
*
pTSchema
,
tb_uid_t
suid
,
int8_t
level
);
struct
SRSmaInfo
{
void
*
taskInfo
[
TSDB_RETENTION_L2
];
// qTaskInfo_t
...
...
@@ -364,7 +364,7 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
}
static
FORCE_INLINE
int32_t
tdExecuteRSmaImpl
(
SSma
*
pSma
,
const
void
*
pMsg
,
int32_t
inputType
,
qTaskInfo_t
*
taskInfo
,
STSchema
*
pTSchema
,
tb_uid_t
suid
,
tb_uid_t
uid
,
int8_t
level
)
{
STSchema
*
pTSchema
,
tb_uid_t
suid
,
int8_t
level
)
{
SArray
*
pResult
=
NULL
;
if
(
!
taskInfo
)
{
...
...
@@ -399,7 +399,7 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3
blockDebugShowData
(
pResult
);
STsdb
*
sinkTsdb
=
(
level
==
TSDB_RETENTION_L1
?
pSma
->
pRSmaTsdb1
:
pSma
->
pRSmaTsdb2
);
SSubmitReq
*
pReq
=
NULL
;
if
(
buildSubmitReqFromDataBlock
(
&
pReq
,
pResult
,
pTSchema
,
SMA_VID
(
pSma
),
uid
,
suid
)
!=
0
)
{
if
(
buildSubmitReqFromDataBlock
(
&
pReq
,
pResult
,
pTSchema
,
SMA_VID
(
pSma
),
suid
)
!=
0
)
{
taosArrayDestroy
(
pResult
);
return
TSDB_CODE_FAILED
;
}
...
...
@@ -418,15 +418,13 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
tdExecuteRSma
(
SSma
*
pSma
,
const
void
*
pMsg
,
int32_t
inputType
,
tb_uid_t
suid
,
tb_uid_t
uid
)
{
static
int32_t
tdExecuteRSma
(
SSma
*
pSma
,
const
void
*
pMsg
,
int32_t
inputType
,
tb_uid_t
suid
)
{
SSmaEnv
*
pEnv
=
SMA_RSMA_ENV
(
pSma
);
if
(
!
pEnv
)
{
// only applicable when rsma env exists
return
TSDB_CODE_SUCCESS
;
}
ASSERT
(
uid
!=
0
);
// TODO: remove later
SSmaStat
*
pStat
=
SMA_ENV_STAT
(
pEnv
);
SRSmaInfo
*
pRSmaInfo
=
NULL
;
...
...
@@ -448,8 +446,8 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb
terrno
=
TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION
;
return
TSDB_CODE_FAILED
;
}
tdExecuteRSmaImpl
(
pSma
,
pMsg
,
inputType
,
pRSmaInfo
->
taskInfo
[
0
],
pTSchema
,
suid
,
uid
,
TSDB_RETENTION_L1
);
tdExecuteRSmaImpl
(
pSma
,
pMsg
,
inputType
,
pRSmaInfo
->
taskInfo
[
1
],
pTSchema
,
suid
,
uid
,
TSDB_RETENTION_L2
);
tdExecuteRSmaImpl
(
pSma
,
pMsg
,
inputType
,
pRSmaInfo
->
taskInfo
[
0
],
pTSchema
,
suid
,
TSDB_RETENTION_L1
);
tdExecuteRSmaImpl
(
pSma
,
pMsg
,
inputType
,
pRSmaInfo
->
taskInfo
[
1
],
pTSchema
,
suid
,
TSDB_RETENTION_L2
);
taosMemoryFree
(
pTSchema
);
}
...
...
@@ -468,12 +466,12 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
tdFetchSubmitReqSuids
(
pMsg
,
&
uidStore
);
if
(
uidStore
.
suid
!=
0
)
{
tdExecuteRSma
(
pSma
,
pMsg
,
inputType
,
uidStore
.
suid
,
uidStore
.
uid
);
tdExecuteRSma
(
pSma
,
pMsg
,
inputType
,
uidStore
.
suid
);
void
*
pIter
=
taosHashIterate
(
uidStore
.
uidHash
,
NULL
);
while
(
pIter
)
{
tb_uid_t
*
pTbSuid
=
(
tb_uid_t
*
)
taosHashGetKey
(
pIter
,
NULL
);
tdExecuteRSma
(
pSma
,
pMsg
,
inputType
,
*
pTbSuid
,
0
);
tdExecuteRSma
(
pSma
,
pMsg
,
inputType
,
*
pTbSuid
);
pIter
=
taosHashIterate
(
uidStore
.
uidHash
,
pIter
);
}
...
...
source/dnode/vnode/src/tsdb/tsdbSma.c
浏览文件 @
e435bcc6
...
...
@@ -2040,7 +2040,7 @@ static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg,
blockDebugShowData
(
pResult
);
STsdb
*
sinkTsdb
=
(
level
==
TSDB_RETENTION_L1
?
pTsdb
->
pVnode
->
pRSma1
:
pTsdb
->
pVnode
->
pRSma2
);
SSubmitReq
*
pReq
=
NULL
;
if
(
buildSubmitReqFromDataBlock
(
&
pReq
,
pResult
,
pTSchema
,
TD_VID
(
pTsdb
->
pVnode
),
uid
,
suid
)
!=
0
)
{
if
(
buildSubmitReqFromDataBlock
(
&
pReq
,
pResult
,
pTSchema
,
TD_VID
(
pTsdb
->
pVnode
),
suid
)
!=
0
)
{
taosArrayDestroy
(
pResult
);
return
TSDB_CODE_FAILED
;
}
...
...
@@ -2083,7 +2083,7 @@ static int32_t tsdbExecuteRSma(STsdb *pTsdb, const void *pMsg, int32_t inputType
}
if
(
inputType
==
STREAM_DATA_TYPE_SUBMIT_BLOCK
)
{
// TODO: use the proper schema instead of
0
, and cache STSchema in cache
// TODO: use the proper schema instead of
1
, and cache STSchema in cache
STSchema
*
pTSchema
=
metaGetTbTSchema
(
pTsdb
->
pVnode
->
pMeta
,
suid
,
1
);
if
(
!
pTSchema
)
{
terrno
=
TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION
;
...
...
tests/script/jenkins/basic.txt
浏览文件 @
e435bcc6
...
...
@@ -118,7 +118,7 @@
#./test.sh -f tsim/mnode/basic1.sim -m
# --- sma
./test.sh -f tsim/sma/tsmaCreateInsertData.sim
#
./test.sh -f tsim/sma/tsmaCreateInsertData.sim
./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
# --- valgrind
...
...
tests/script/tsim/sma/rsmaCreateInsertQuery.sim
浏览文件 @
e435bcc6
...
...
@@ -37,6 +37,15 @@ if $rows > 2 then
print retention level 2 file rows $rows > 2
return -1
endi
if $data01 != 1 then
if $data01 != 10 then
print retention level 2 file result $data01 != 1 or 10
return -1
endi
endi
print =============== select * from retention level 1 from memory
sql select * from ct1 where ts > now-8d;
print $data00 $data01
...
...
@@ -44,15 +53,30 @@ if $rows > 2 then
print retention level 1 file rows $rows > 2
return -1
endi
if $data01 != 1 then
if $data01 != 10 then
print retention level 1 file result $data01 != 1 or 10
return -1
endi
endi
print =============== select * from retention level 0 from memory
sql select * from ct1 where ts > now-3d;
print $data00 $data01
print $data10 $data11
print $data20 $data21
if $rows < 1 then
print retention level 0 file rows $rows < 1
return -1
endi
if $data01 != 10 then
print retention level 0 file result $data01 != 10
return -1
endi
#===================================================================
...
...
@@ -68,6 +92,13 @@ if $rows > 2 then
return -1
endi
if $data01 != 1 then
if $data01 != 10 then
print retention level 2 file result $data01 != 1 or 10
return -1
endi
endi
print =============== select * from retention level 1 from file
sql select * from ct1 where ts > now-8d;
print $data00 $data01
...
...
@@ -76,6 +107,13 @@ if $rows > 2 then
return -1
endi
if $data01 != 1 then
if $data01 != 10 then
print retention level 1 file result $data01 != 1 or 10
return -1
endi
endi
print =============== select * from retention level 0 from file
sql select * from ct1 where ts > now-3d;
print $data00 $data01
...
...
@@ -86,4 +124,9 @@ if $rows < 1 then
return -1
endi
if $data01 != 10 then
print retention level 0 file result $data01 != 10
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
tests/script/tsim/sma/tsmaCreateInsertData.sim
浏览文件 @
e435bcc6
...
...
@@ -37,5 +37,12 @@ print =============== trigger stream to execute sma aggr task and insert sma dat
sql insert into ct1 values(now+5s, 20, 20.0, 30.0)
#===================================================================
print =============== select * from ct1 from memory
sql select * from ct1;
print $data00 $data01
if $rows != 5 then
print rows $rows != 5
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录