Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
bb0b1fda
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
bb0b1fda
编写于
9月 08, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into refact/tsdb_new_snapshot
上级
255f42e0
f9d73f69
变更
16
显示空白变更内容
内联
并排
Showing
16 changed file
with
856 addition
and
95 deletion
+856
-95
docs/zh/05-get-started/01-docker.md
docs/zh/05-get-started/01-docker.md
+1
-1
docs/zh/28-releases/02-tools.md
docs/zh/28-releases/02-tools.md
+0
-4
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+7
-6
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+5
-1
source/dnode/vnode/src/tsdb/tsdbCache.c
source/dnode/vnode/src/tsdb/tsdbCache.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbMergeTree.c
source/dnode/vnode/src/tsdb/tsdbMergeTree.c
+31
-15
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+29
-24
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+1
-0
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+44
-43
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+6
-0
tests/script/tsim/column/table.sim
tests/script/tsim/column/table.sim
+1
-0
tests/script/tsim/stream/deleteSession.sim
tests/script/tsim/stream/deleteSession.sim
+532
-0
tests/script/tsim/stream/deleteState.sim
tests/script/tsim/stream/deleteState.sim
+198
-0
tests/script/tsim/stream/partitionbyColumnInterval.sim
tests/script/tsim/stream/partitionbyColumnInterval.sim
+0
-0
tests/script/tsim/stream/partitionbyColumnSession.sim
tests/script/tsim/stream/partitionbyColumnSession.sim
+0
-0
tests/script/tsim/stream/partitionbyColumnState.sim
tests/script/tsim/stream/partitionbyColumnState.sim
+0
-0
未找到文件。
docs/zh/05-get-started/01-docker.md
浏览文件 @
bb0b1fda
...
@@ -62,7 +62,7 @@ taos>
...
@@ -62,7 +62,7 @@ taos>
## 体验查询
## 体验查询
使用上述 taosBenchmark 插入数据后,可以在 TDengine CLI 输入查询命令,体验查询速度。
。
使用上述 taosBenchmark 插入数据后,可以在 TDengine CLI 输入查询命令,体验查询速度。
查询超级表下记录总条数:
查询超级表下记录总条数:
...
...
docs/zh/28-releases/02-tools.md
浏览文件 @
bb0b1fda
...
@@ -9,7 +9,3 @@ import Release from "/components/ReleaseV3";
...
@@ -9,7 +9,3 @@ import Release from "/components/ReleaseV3";
## 2.1.3
## 2.1.3
<Release
type=
"tools"
version=
"2.1.3"
/>
<Release
type=
"tools"
version=
"2.1.3"
/>
## 2.1.2
<Release
type=
"tools"
version=
"2.1.2"
/>
\ No newline at end of file
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
bb0b1fda
...
@@ -15,6 +15,7 @@
...
@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE
#define _DEFAULT_SOURCE
#include "mndDb.h"
#include "mndDb.h"
#include "mndCluster.h"
#include "mndDnode.h"
#include "mndDnode.h"
#include "mndOffset.h"
#include "mndOffset.h"
#include "mndPrivilege.h"
#include "mndPrivilege.h"
...
@@ -1714,18 +1715,18 @@ static void mndDumpDbInfoData(SMnode *pMnode, SSDataBlock *pBlock, SDbObj *pDb,
...
@@ -1714,18 +1715,18 @@ static void mndDumpDbInfoData(SMnode *pMnode, SSDataBlock *pBlock, SDbObj *pDb,
taosMemoryFree
(
buf
);
taosMemoryFree
(
buf
);
}
}
static
void
setInformationSchemaDbCfg
(
SDbObj
*
pDbObj
)
{
static
void
setInformationSchemaDbCfg
(
S
Mnode
*
pMnode
,
S
DbObj
*
pDbObj
)
{
tstrncpy
(
pDbObj
->
name
,
TSDB_INFORMATION_SCHEMA_DB
,
tListLen
(
pDbObj
->
name
));
tstrncpy
(
pDbObj
->
name
,
TSDB_INFORMATION_SCHEMA_DB
,
tListLen
(
pDbObj
->
name
));
pDbObj
->
createdTime
=
0
;
pDbObj
->
createdTime
=
mndGetClusterCreateTime
(
pMnode
)
;
pDbObj
->
cfg
.
numOfVgroups
=
0
;
pDbObj
->
cfg
.
numOfVgroups
=
0
;
pDbObj
->
cfg
.
strict
=
1
;
pDbObj
->
cfg
.
strict
=
1
;
pDbObj
->
cfg
.
replications
=
1
;
pDbObj
->
cfg
.
replications
=
1
;
pDbObj
->
cfg
.
precision
=
TSDB_TIME_PRECISION_MILLI
;
pDbObj
->
cfg
.
precision
=
TSDB_TIME_PRECISION_MILLI
;
}
}
static
void
setPerfSchemaDbCfg
(
SDbObj
*
pDbObj
)
{
static
void
setPerfSchemaDbCfg
(
S
Mnode
*
pMnode
,
S
DbObj
*
pDbObj
)
{
tstrncpy
(
pDbObj
->
name
,
TSDB_PERFORMANCE_SCHEMA_DB
,
tListLen
(
pDbObj
->
name
));
tstrncpy
(
pDbObj
->
name
,
TSDB_PERFORMANCE_SCHEMA_DB
,
tListLen
(
pDbObj
->
name
));
pDbObj
->
createdTime
=
0
;
pDbObj
->
createdTime
=
mndGetClusterCreateTime
(
pMnode
)
;
pDbObj
->
cfg
.
numOfVgroups
=
0
;
pDbObj
->
cfg
.
numOfVgroups
=
0
;
pDbObj
->
cfg
.
strict
=
1
;
pDbObj
->
cfg
.
strict
=
1
;
pDbObj
->
cfg
.
replications
=
1
;
pDbObj
->
cfg
.
replications
=
1
;
...
@@ -1756,7 +1757,7 @@ static int32_t mndRetrieveDbs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
...
@@ -1756,7 +1757,7 @@ static int32_t mndRetrieveDbs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
// Append the information_schema database into the result.
// Append the information_schema database into the result.
if
(
!
pShow
->
sysDbRsp
)
{
if
(
!
pShow
->
sysDbRsp
)
{
SDbObj
infoschemaDb
=
{
0
};
SDbObj
infoschemaDb
=
{
0
};
setInformationSchemaDbCfg
(
&
infoschemaDb
);
setInformationSchemaDbCfg
(
pMnode
,
&
infoschemaDb
);
size_t
numOfTables
=
0
;
size_t
numOfTables
=
0
;
getVisibleInfosTablesNum
(
sysinfo
,
&
numOfTables
);
getVisibleInfosTablesNum
(
sysinfo
,
&
numOfTables
);
mndDumpDbInfoData
(
pMnode
,
pBlock
,
&
infoschemaDb
,
pShow
,
numOfRows
,
numOfTables
,
true
,
0
,
1
);
mndDumpDbInfoData
(
pMnode
,
pBlock
,
&
infoschemaDb
,
pShow
,
numOfRows
,
numOfTables
,
true
,
0
,
1
);
...
@@ -1764,7 +1765,7 @@ static int32_t mndRetrieveDbs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
...
@@ -1764,7 +1765,7 @@ static int32_t mndRetrieveDbs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
numOfRows
+=
1
;
numOfRows
+=
1
;
SDbObj
perfschemaDb
=
{
0
};
SDbObj
perfschemaDb
=
{
0
};
setPerfSchemaDbCfg
(
&
perfschemaDb
);
setPerfSchemaDbCfg
(
pMnode
,
&
perfschemaDb
);
numOfTables
=
0
;
numOfTables
=
0
;
getPerfDbMeta
(
NULL
,
&
numOfTables
);
getPerfDbMeta
(
NULL
,
&
numOfTables
);
mndDumpDbInfoData
(
pMnode
,
pBlock
,
&
perfschemaDb
,
pShow
,
numOfRows
,
numOfTables
,
true
,
0
,
1
);
mndDumpDbInfoData
(
pMnode
,
pBlock
,
&
perfschemaDb
,
pShow
,
numOfRows
,
numOfTables
,
true
,
0
,
1
);
...
...
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
bb0b1fda
...
@@ -650,6 +650,8 @@ typedef struct SSttBlockLoadInfo {
...
@@ -650,6 +650,8 @@ typedef struct SSttBlockLoadInfo {
SArray
*
aSttBlk
;
SArray
*
aSttBlk
;
int32_t
blockIndex
[
2
];
// to denote the loaded block in the corresponding position.
int32_t
blockIndex
[
2
];
// to denote the loaded block in the corresponding position.
int32_t
currentLoadBlockIndex
;
int32_t
currentLoadBlockIndex
;
int32_t
loadBlocks
;
double
elapsedTime
;
}
SSttBlockLoadInfo
;
}
SSttBlockLoadInfo
;
typedef
struct
SMergeTree
{
typedef
struct
SMergeTree
{
...
@@ -659,6 +661,7 @@ typedef struct SMergeTree {
...
@@ -659,6 +661,7 @@ typedef struct SMergeTree {
SLDataIter
*
pIter
;
SLDataIter
*
pIter
;
bool
destroyLoadInfo
;
bool
destroyLoadInfo
;
SSttBlockLoadInfo
*
pLoadInfo
;
SSttBlockLoadInfo
*
pLoadInfo
;
const
char
*
idStr
;
}
SMergeTree
;
}
SMergeTree
;
typedef
struct
{
typedef
struct
{
...
@@ -668,7 +671,7 @@ typedef struct {
...
@@ -668,7 +671,7 @@ typedef struct {
}
SSkmInfo
;
}
SSkmInfo
;
int32_t
tMergeTreeOpen
(
SMergeTree
*
pMTree
,
int8_t
backward
,
SDataFReader
*
pFReader
,
uint64_t
suid
,
uint64_t
uid
,
int32_t
tMergeTreeOpen
(
SMergeTree
*
pMTree
,
int8_t
backward
,
SDataFReader
*
pFReader
,
uint64_t
suid
,
uint64_t
uid
,
STimeWindow
*
pTimeWindow
,
SVersionRange
*
pVerRange
,
void
*
pLoadInfo
);
STimeWindow
*
pTimeWindow
,
SVersionRange
*
pVerRange
,
void
*
pLoadInfo
,
const
char
*
idStr
);
void
tMergeTreeAddIter
(
SMergeTree
*
pMTree
,
SLDataIter
*
pIter
);
void
tMergeTreeAddIter
(
SMergeTree
*
pMTree
,
SLDataIter
*
pIter
);
bool
tMergeTreeNext
(
SMergeTree
*
pMTree
);
bool
tMergeTreeNext
(
SMergeTree
*
pMTree
);
TSDBROW
tMergeTreeGetRow
(
SMergeTree
*
pMTree
);
TSDBROW
tMergeTreeGetRow
(
SMergeTree
*
pMTree
);
...
@@ -676,6 +679,7 @@ void tMergeTreeClose(SMergeTree *pMTree);
...
@@ -676,6 +679,7 @@ void tMergeTreeClose(SMergeTree *pMTree);
SSttBlockLoadInfo
*
tCreateLastBlockLoadInfo
();
SSttBlockLoadInfo
*
tCreateLastBlockLoadInfo
();
void
resetLastBlockLoadInfo
(
SSttBlockLoadInfo
*
pLoadInfo
);
void
resetLastBlockLoadInfo
(
SSttBlockLoadInfo
*
pLoadInfo
);
void
getLastBlockLoadInfo
(
SSttBlockLoadInfo
*
pLoadInfo
,
int64_t
*
blocks
,
double
*
el
);
void
*
destroyLastBlockLoadInfo
(
SSttBlockLoadInfo
*
pLoadInfo
);
void
*
destroyLastBlockLoadInfo
(
SSttBlockLoadInfo
*
pLoadInfo
);
// ========== inline functions ==========
// ========== inline functions ==========
...
...
source/dnode/vnode/src/tsdb/tsdbCache.c
浏览文件 @
bb0b1fda
...
@@ -457,7 +457,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
...
@@ -457,7 +457,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
tMergeTreeOpen
(
&
state
->
mergeTree
,
1
,
state
->
pDataFReader
,
state
->
suid
,
state
->
uid
,
tMergeTreeOpen
(
&
state
->
mergeTree
,
1
,
state
->
pDataFReader
,
state
->
suid
,
state
->
uid
,
&
(
STimeWindow
){.
skey
=
TSKEY_MIN
,
.
ekey
=
TSKEY_MAX
},
&
(
STimeWindow
){.
skey
=
TSKEY_MIN
,
.
ekey
=
TSKEY_MAX
},
&
(
SVersionRange
){.
minVer
=
0
,
.
maxVer
=
UINT64_MAX
},
NULL
);
&
(
SVersionRange
){.
minVer
=
0
,
.
maxVer
=
UINT64_MAX
},
NULL
,
NULL
);
bool
hasVal
=
tMergeTreeNext
(
&
state
->
mergeTree
);
bool
hasVal
=
tMergeTreeNext
(
&
state
->
mergeTree
);
if
(
!
hasVal
)
{
if
(
!
hasVal
)
{
state
->
state
=
SFSLASTNEXTROW_FILESET
;
state
->
state
=
SFSLASTNEXTROW_FILESET
;
...
...
source/dnode/vnode/src/tsdb/tsdbMergeTree.c
浏览文件 @
bb0b1fda
...
@@ -67,6 +67,16 @@ void resetLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo) {
...
@@ -67,6 +67,16 @@ void resetLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo) {
pLoadInfo
[
i
].
blockIndex
[
1
]
=
-
1
;
pLoadInfo
[
i
].
blockIndex
[
1
]
=
-
1
;
taosArrayClear
(
pLoadInfo
[
i
].
aSttBlk
);
taosArrayClear
(
pLoadInfo
[
i
].
aSttBlk
);
pLoadInfo
[
i
].
elapsedTime
=
0
;
pLoadInfo
[
i
].
loadBlocks
=
0
;
}
}
void
getLastBlockLoadInfo
(
SSttBlockLoadInfo
*
pLoadInfo
,
int64_t
*
blocks
,
double
*
el
)
{
for
(
int32_t
i
=
0
;
i
<
TSDB_DEFAULT_STT_FILE
;
++
i
)
{
*
el
+=
pLoadInfo
[
i
].
elapsedTime
;
*
blocks
+=
pLoadInfo
[
i
].
loadBlocks
;
}
}
}
}
...
@@ -86,7 +96,7 @@ void* destroyLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo) {
...
@@ -86,7 +96,7 @@ void* destroyLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo) {
return
NULL
;
return
NULL
;
}
}
static
SBlockData
*
load
BlockIfMissing
(
SLDataIter
*
pIte
r
)
{
static
SBlockData
*
load
LastBlock
(
SLDataIter
*
pIter
,
const
char
*
idSt
r
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
SSttBlockLoadInfo
*
pInfo
=
pIter
->
pBlockLoadInfo
;
SSttBlockLoadInfo
*
pInfo
=
pIter
->
pBlockLoadInfo
;
...
@@ -100,8 +110,13 @@ static SBlockData* loadBlockIfMissing(SLDataIter *pIter) {
...
@@ -100,8 +110,13 @@ static SBlockData* loadBlockIfMissing(SLDataIter *pIter) {
pInfo
->
currentLoadBlockIndex
^=
1
;
pInfo
->
currentLoadBlockIndex
^=
1
;
if
(
pIter
->
pSttBlk
!=
NULL
)
{
// current block not loaded yet
if
(
pIter
->
pSttBlk
!=
NULL
)
{
// current block not loaded yet
int64_t
st
=
taosGetTimestampUs
();
code
=
tsdbReadSttBlock
(
pIter
->
pReader
,
pIter
->
iStt
,
pIter
->
pSttBlk
,
&
pInfo
->
blockData
[
pInfo
->
currentLoadBlockIndex
]);
code
=
tsdbReadSttBlock
(
pIter
->
pReader
,
pIter
->
iStt
,
pIter
->
pSttBlk
,
&
pInfo
->
blockData
[
pInfo
->
currentLoadBlockIndex
]);
tsdbDebug
(
"read last block, index:%d, last file index:%d"
,
pIter
->
iSttBlk
,
pIter
->
iStt
);
double
el
=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
pInfo
->
elapsedTime
+=
el
;
pInfo
->
loadBlocks
+=
1
;
tsdbDebug
(
"read last block, index:%d, last file index:%d, elapsed time:%.2f ms, %s"
,
pIter
->
iSttBlk
,
pIter
->
iStt
,
el
,
idStr
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_exit
;
goto
_exit
;
}
}
...
@@ -245,9 +260,8 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
...
@@ -245,9 +260,8 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
size_t
size
=
taosArrayGetSize
(
pBlockLoadInfo
->
aSttBlk
);
size_t
size
=
taosArrayGetSize
(
pBlockLoadInfo
->
aSttBlk
);
// find the start block
// find the start block
int32_t
index
=
binarySearchForStartBlock
(
pBlockLoadInfo
->
aSttBlk
->
pData
,
size
,
uid
,
backward
);
(
*
pIter
)
->
iSttBlk
=
binarySearchForStartBlock
(
pBlockLoadInfo
->
aSttBlk
->
pData
,
size
,
uid
,
backward
);
(
*
pIter
)
->
iSttBlk
=
index
;
if
((
*
pIter
)
->
iSttBlk
!=
-
1
)
{
if
(
index
!=
-
1
)
{
(
*
pIter
)
->
pSttBlk
=
taosArrayGet
(
pBlockLoadInfo
->
aSttBlk
,
(
*
pIter
)
->
iSttBlk
);
(
*
pIter
)
->
pSttBlk
=
taosArrayGet
(
pBlockLoadInfo
->
aSttBlk
,
(
*
pIter
)
->
iSttBlk
);
(
*
pIter
)
->
iRow
=
((
*
pIter
)
->
backward
)
?
(
*
pIter
)
->
pSttBlk
->
nRow
:
-
1
;
(
*
pIter
)
->
iRow
=
((
*
pIter
)
->
backward
)
?
(
*
pIter
)
->
pSttBlk
->
nRow
:
-
1
;
}
}
...
@@ -265,7 +279,7 @@ void tLDataIterNextBlock(SLDataIter *pIter) {
...
@@ -265,7 +279,7 @@ void tLDataIterNextBlock(SLDataIter *pIter) {
pIter
->
iSttBlk
+=
step
;
pIter
->
iSttBlk
+=
step
;
int32_t
index
=
-
1
;
int32_t
index
=
-
1
;
size_t
size
=
pIter
->
pBlockLoadInfo
->
aSttBlk
->
size
;
//taosArrayGetSize(pIter->pBlockLoadInfo->aSttBlk);
size_t
size
=
pIter
->
pBlockLoadInfo
->
aSttBlk
->
size
;
for
(
int32_t
i
=
pIter
->
iSttBlk
;
i
<
size
&&
i
>=
0
;
i
+=
step
)
{
for
(
int32_t
i
=
pIter
->
iSttBlk
;
i
<
size
&&
i
>=
0
;
i
+=
step
)
{
SSttBlk
*
p
=
taosArrayGet
(
pIter
->
pBlockLoadInfo
->
aSttBlk
,
i
);
SSttBlk
*
p
=
taosArrayGet
(
pIter
->
pBlockLoadInfo
->
aSttBlk
,
i
);
if
((
!
pIter
->
backward
)
&&
p
->
minUid
>
pIter
->
uid
)
{
if
((
!
pIter
->
backward
)
&&
p
->
minUid
>
pIter
->
uid
)
{
...
@@ -310,13 +324,13 @@ void tLDataIterNextBlock(SLDataIter *pIter) {
...
@@ -310,13 +324,13 @@ void tLDataIterNextBlock(SLDataIter *pIter) {
}
}
}
}
static
void
findNextValidRow
(
SLDataIter
*
pIter
)
{
static
void
findNextValidRow
(
SLDataIter
*
pIter
,
const
char
*
idStr
)
{
int32_t
step
=
pIter
->
backward
?
-
1
:
1
;
int32_t
step
=
pIter
->
backward
?
-
1
:
1
;
bool
hasVal
=
false
;
bool
hasVal
=
false
;
int32_t
i
=
pIter
->
iRow
;
int32_t
i
=
pIter
->
iRow
;
SBlockData
*
pBlockData
=
load
BlockIfMissing
(
pIte
r
);
SBlockData
*
pBlockData
=
load
LastBlock
(
pIter
,
idSt
r
);
// mostly we only need to find the start position for a given table
// mostly we only need to find the start position for a given table
if
((((
i
==
0
)
&&
(
!
pIter
->
backward
))
||
(
i
==
pBlockData
->
nRow
-
1
&&
pIter
->
backward
))
&&
pBlockData
->
aUid
!=
NULL
)
{
if
((((
i
==
0
)
&&
(
!
pIter
->
backward
))
||
(
i
==
pBlockData
->
nRow
-
1
&&
pIter
->
backward
))
&&
pBlockData
->
aUid
!=
NULL
)
{
...
@@ -376,7 +390,7 @@ static void findNextValidRow(SLDataIter *pIter) {
...
@@ -376,7 +390,7 @@ static void findNextValidRow(SLDataIter *pIter) {
pIter
->
iRow
=
(
hasVal
)
?
i
:
-
1
;
pIter
->
iRow
=
(
hasVal
)
?
i
:
-
1
;
}
}
bool
tLDataIterNextRow
(
SLDataIter
*
pIter
)
{
bool
tLDataIterNextRow
(
SLDataIter
*
pIter
,
const
char
*
idStr
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
int32_t
step
=
pIter
->
backward
?
-
1
:
1
;
int32_t
step
=
pIter
->
backward
?
-
1
:
1
;
...
@@ -386,11 +400,11 @@ bool tLDataIterNextRow(SLDataIter *pIter) {
...
@@ -386,11 +400,11 @@ bool tLDataIterNextRow(SLDataIter *pIter) {
}
}
int32_t
iBlockL
=
pIter
->
iSttBlk
;
int32_t
iBlockL
=
pIter
->
iSttBlk
;
SBlockData
*
pBlockData
=
load
BlockIfMissing
(
pIte
r
);
SBlockData
*
pBlockData
=
load
LastBlock
(
pIter
,
idSt
r
);
pIter
->
iRow
+=
step
;
pIter
->
iRow
+=
step
;
while
(
1
)
{
while
(
1
)
{
findNextValidRow
(
pIter
);
findNextValidRow
(
pIter
,
idStr
);
if
(
pIter
->
iRow
>=
pBlockData
->
nRow
||
pIter
->
iRow
<
0
)
{
if
(
pIter
->
iRow
>=
pBlockData
->
nRow
||
pIter
->
iRow
<
0
)
{
tLDataIterNextBlock
(
pIter
);
tLDataIterNextBlock
(
pIter
);
...
@@ -402,7 +416,7 @@ bool tLDataIterNextRow(SLDataIter *pIter) {
...
@@ -402,7 +416,7 @@ bool tLDataIterNextRow(SLDataIter *pIter) {
}
}
if
(
iBlockL
!=
pIter
->
iSttBlk
)
{
if
(
iBlockL
!=
pIter
->
iSttBlk
)
{
pBlockData
=
load
BlockIfMissing
(
pIte
r
);
pBlockData
=
load
LastBlock
(
pIter
,
idSt
r
);
pIter
->
iRow
+=
step
;
pIter
->
iRow
+=
step
;
}
}
}
}
...
@@ -445,7 +459,7 @@ static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) {
...
@@ -445,7 +459,7 @@ static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) {
}
}
int32_t
tMergeTreeOpen
(
SMergeTree
*
pMTree
,
int8_t
backward
,
SDataFReader
*
pFReader
,
uint64_t
suid
,
uint64_t
uid
,
int32_t
tMergeTreeOpen
(
SMergeTree
*
pMTree
,
int8_t
backward
,
SDataFReader
*
pFReader
,
uint64_t
suid
,
uint64_t
uid
,
STimeWindow
*
pTimeWindow
,
SVersionRange
*
pVerRange
,
void
*
pBlockLoadInfo
)
{
STimeWindow
*
pTimeWindow
,
SVersionRange
*
pVerRange
,
void
*
pBlockLoadInfo
,
const
char
*
idStr
)
{
pMTree
->
backward
=
backward
;
pMTree
->
backward
=
backward
;
pMTree
->
pIter
=
NULL
;
pMTree
->
pIter
=
NULL
;
pMTree
->
pIterList
=
taosArrayInit
(
4
,
POINTER_BYTES
);
pMTree
->
pIterList
=
taosArrayInit
(
4
,
POINTER_BYTES
);
...
@@ -453,6 +467,8 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
...
@@ -453,6 +467,8 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
return
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
pMTree
->
idStr
=
idStr
;
tRBTreeCreate
(
&
pMTree
->
rbt
,
tLDataIterCmprFn
);
tRBTreeCreate
(
&
pMTree
->
rbt
,
tLDataIterCmprFn
);
int32_t
code
=
TSDB_CODE_SUCCESS
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
...
@@ -475,7 +491,7 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
...
@@ -475,7 +491,7 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
goto
_end
;
goto
_end
;
}
}
bool
hasVal
=
tLDataIterNextRow
(
pIter
);
bool
hasVal
=
tLDataIterNextRow
(
pIter
,
pMTree
->
idStr
);
if
(
hasVal
)
{
if
(
hasVal
)
{
taosArrayPush
(
pMTree
->
pIterList
,
&
pIter
);
taosArrayPush
(
pMTree
->
pIterList
,
&
pIter
);
tMergeTreeAddIter
(
pMTree
,
pIter
);
tMergeTreeAddIter
(
pMTree
,
pIter
);
...
@@ -498,7 +514,7 @@ bool tMergeTreeNext(SMergeTree *pMTree) {
...
@@ -498,7 +514,7 @@ bool tMergeTreeNext(SMergeTree *pMTree) {
if
(
pMTree
->
pIter
)
{
if
(
pMTree
->
pIter
)
{
SLDataIter
*
pIter
=
pMTree
->
pIter
;
SLDataIter
*
pIter
=
pMTree
->
pIter
;
bool
hasVal
=
tLDataIterNextRow
(
pIter
);
bool
hasVal
=
tLDataIterNextRow
(
pIter
,
pMTree
->
idStr
);
if
(
!
hasVal
)
{
if
(
!
hasVal
)
{
pMTree
->
pIter
=
NULL
;
pMTree
->
pIter
=
NULL
;
}
}
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
bb0b1fda
...
@@ -70,6 +70,8 @@ typedef struct SIOCostSummary {
...
@@ -70,6 +70,8 @@ typedef struct SIOCostSummary {
double
smaLoadTime
;
double
smaLoadTime
;
int64_t
lastBlockLoad
;
int64_t
lastBlockLoad
;
double
lastBlockLoadTime
;
double
lastBlockLoadTime
;
int64_t
composedBlocks
;
double
buildComposedBlockTime
;
}
SIOCostSummary
;
}
SIOCostSummary
;
typedef
struct
SBlockLoadSuppInfo
{
typedef
struct
SBlockLoadSuppInfo
{
...
@@ -365,6 +367,9 @@ static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
...
@@ -365,6 +367,9 @@ static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
return
false
;
return
false
;
}
}
SIOCostSummary
*
pSum
=
&
pReader
->
cost
;
getLastBlockLoadInfo
(
pIter
->
pLastBlockReader
->
pInfo
,
&
pSum
->
lastBlockLoad
,
&
pReader
->
cost
.
lastBlockLoadTime
);
pIter
->
pLastBlockReader
->
uid
=
0
;
pIter
->
pLastBlockReader
->
uid
=
0
;
tMergeTreeClose
(
&
pIter
->
pLastBlockReader
->
mergeTree
);
tMergeTreeClose
(
&
pIter
->
pLastBlockReader
->
mergeTree
);
resetLastBlockLoadInfo
(
pIter
->
pLastBlockReader
->
pInfo
);
resetLastBlockLoadInfo
(
pIter
->
pLastBlockReader
->
pInfo
);
...
@@ -1434,11 +1439,6 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
...
@@ -1434,11 +1439,6 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
tRowMerge
(
&
merge
,
&
fRow1
);
tRowMerge
(
&
merge
,
&
fRow1
);
doMergeRowsInLastBlock
(
pLastBlockReader
,
pBlockScanInfo
,
tsLastBlock
,
&
merge
);
doMergeRowsInLastBlock
(
pLastBlockReader
,
pBlockScanInfo
,
tsLastBlock
,
&
merge
);
// merge with block data if ts == key
if
(
mergeBlockData
&&
(
tsLastBlock
==
pBlockData
->
aTSKEY
[
pDumpInfo
->
rowIndex
]))
{
doMergeRowsInFileBlocks
(
pBlockData
,
pBlockScanInfo
,
pReader
,
&
merge
);
}
int32_t
code
=
tRowMergerGetRow
(
&
merge
,
&
pTSRow
);
int32_t
code
=
tRowMergerGetRow
(
&
merge
,
&
pTSRow
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
return
code
;
...
@@ -1452,9 +1452,10 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
...
@@ -1452,9 +1452,10 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
}
else
{
// not merge block data
}
else
{
// not merge block data
tRowMergerInit
(
&
merge
,
&
fRow
,
pReader
->
pSchema
);
tRowMergerInit
(
&
merge
,
&
fRow
,
pReader
->
pSchema
);
doMergeRowsInLastBlock
(
pLastBlockReader
,
pBlockScanInfo
,
tsLastBlock
,
&
merge
);
doMergeRowsInLastBlock
(
pLastBlockReader
,
pBlockScanInfo
,
tsLastBlock
,
&
merge
);
ASSERT
(
mergeBlockData
);
// merge with block data if ts == key
// merge with block data if ts == key
if
(
mergeBlockData
&&
(
tsLastBlock
==
pBlockData
->
aTSKEY
[
pDumpInfo
->
rowIndex
])
)
{
if
(
tsLastBlock
==
pBlockData
->
aTSKEY
[
pDumpInfo
->
rowIndex
]
)
{
doMergeRowsInFileBlocks
(
pBlockData
,
pBlockScanInfo
,
pReader
,
&
merge
);
doMergeRowsInFileBlocks
(
pBlockData
,
pBlockScanInfo
,
pReader
,
&
merge
);
}
}
...
@@ -1942,7 +1943,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
...
@@ -1942,7 +1943,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
int32_t
code
=
int32_t
code
=
tMergeTreeOpen
(
&
pLBlockReader
->
mergeTree
,
(
pLBlockReader
->
order
==
TSDB_ORDER_DESC
),
pReader
->
pFileReader
,
tMergeTreeOpen
(
&
pLBlockReader
->
mergeTree
,
(
pLBlockReader
->
order
==
TSDB_ORDER_DESC
),
pReader
->
pFileReader
,
pReader
->
suid
,
pScanInfo
->
uid
,
&
w
,
&
pLBlockReader
->
verRange
,
pLBlockReader
->
pInfo
);
pReader
->
suid
,
pScanInfo
->
uid
,
&
w
,
&
pLBlockReader
->
verRange
,
pLBlockReader
->
pInfo
,
pReader
->
idStr
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
false
;
return
false
;
}
}
...
@@ -1982,8 +1983,6 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
...
@@ -1982,8 +1983,6 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
tRowMergerClear
(
&
merge
);
tRowMergerClear
(
&
merge
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
return
TSDB_CODE_SUCCESS
;
}
}
static
int32_t
buildComposedDataBlockImpl
(
STsdbReader
*
pReader
,
STableBlockScanInfo
*
pBlockScanInfo
,
static
int32_t
buildComposedDataBlockImpl
(
STsdbReader
*
pReader
,
STableBlockScanInfo
*
pBlockScanInfo
,
...
@@ -2076,13 +2075,16 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
...
@@ -2076,13 +2075,16 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
blockDataUpdateTsWindow
(
pResBlock
,
0
);
blockDataUpdateTsWindow
(
pResBlock
,
0
);
setComposedBlockFlag
(
pReader
,
true
);
setComposedBlockFlag
(
pReader
,
true
);
int64_t
et
=
taosGetTimestampUs
();
double
el
=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
pReader
->
cost
.
composedBlocks
+=
1
;
pReader
->
cost
.
buildComposedBlockTime
+=
el
;
if
(
pResBlock
->
info
.
rows
>
0
)
{
if
(
pResBlock
->
info
.
rows
>
0
)
{
tsdbDebug
(
"%p uid:%"
PRIu64
", composed data block created, brange:%"
PRIu64
"-%"
PRIu64
tsdbDebug
(
"%p uid:%"
PRIu64
", composed data block created, brange:%"
PRIu64
"-%"
PRIu64
" rows:%d, elapsed time:%.2f ms %s"
,
" rows:%d, elapsed time:%.2f ms %s"
,
pReader
,
pBlockScanInfo
->
uid
,
pResBlock
->
info
.
window
.
skey
,
pResBlock
->
info
.
window
.
ekey
,
pReader
,
pBlockScanInfo
->
uid
,
pResBlock
->
info
.
window
.
skey
,
pResBlock
->
info
.
window
.
ekey
,
pResBlock
->
info
.
rows
,
(
et
-
st
)
/
1000
.
0
,
pReader
->
idStr
);
pResBlock
->
info
.
rows
,
el
,
pReader
->
idStr
);
}
}
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
...
@@ -3364,24 +3366,27 @@ void tsdbReaderClose(STsdbReader* pReader) {
...
@@ -3364,24 +3366,27 @@ void tsdbReaderClose(STsdbReader* pReader) {
tsdbUntakeReadSnap
(
pReader
->
pTsdb
,
pReader
->
pReadSnap
);
tsdbUntakeReadSnap
(
pReader
->
pTsdb
,
pReader
->
pReadSnap
);
taosMemoryFree
(
pReader
->
status
.
uidCheckInfo
.
tableUidList
);
taosMemoryFree
(
pReader
->
status
.
uidCheckInfo
.
tableUidList
);
SIOCostSummary
*
pCost
=
&
pReader
->
cost
;
SFilesetIter
*
pFilesetIter
=
&
pReader
->
status
.
fileIter
;
SFilesetIter
*
pFilesetIter
=
&
pReader
->
status
.
fileIter
;
if
(
pFilesetIter
->
pLastBlockReader
!=
NULL
)
{
if
(
pFilesetIter
->
pLastBlockReader
!=
NULL
)
{
tMergeTreeClose
(
&
pFilesetIter
->
pLastBlockReader
->
mergeTree
);
SLastBlockReader
*
pLReader
=
pFilesetIter
->
pLastBlockReader
;
pFilesetIter
->
pLastBlockReader
->
pInfo
=
destroyLastBlockLoadInfo
(
pFilesetIter
->
pLastBlockReader
->
pInfo
);
tMergeTreeClose
(
&
pLReader
->
mergeTree
);
taosMemoryFree
(
pFilesetIter
->
pLastBlockReader
);
}
SIOCostSummary
*
pCost
=
&
pReader
->
cost
;
getLastBlockLoadInfo
(
pLReader
->
pInfo
,
&
pCost
->
lastBlockLoad
,
&
pCost
->
lastBlockLoadTime
);
pLReader
->
pInfo
=
destroyLastBlockLoadInfo
(
pLReader
->
pInfo
);
taosMemoryFree
(
pLReader
);
}
tsdbDebug
(
"%p :io-cost summary: head-file:%"
PRIu64
", head-file time:%.2f ms, SMA:%"
PRId64
tsdbDebug
(
" SMA-time:%.2f ms, fileBlocks
:%"
PRId64
"%p :io-cost summary: head-file:%"
PRIu64
", head-file time:%.2f ms, SMA
:%"
PRId64
", fileBlocks
-time:%.2f ms, "
" SMA-time:%.2f ms, fileBlocks:%"
PRId64
", fileBlocks-load
-time:%.2f ms, "
"build in-memory-block-time:%.2f ms, lastB
locks:%"
PRId64
"build in-memory-block-time:%.2f ms, lastBlocks:%"
PRId64
", lastBlocks-time:%.2f ms, composed-b
locks:%"
PRId64
", lastBlocks-time:%.2f
ms, STableBlockScanInfo size:%.2f Kb %s"
,
", composed-blocks-time:%.2f
ms, STableBlockScanInfo size:%.2f Kb %s"
,
pReader
,
pCost
->
headFileLoad
,
pCost
->
headFileLoadTime
,
pCost
->
smaDataLoad
,
pCost
->
smaLoadTime
,
pReader
,
pCost
->
headFileLoad
,
pCost
->
headFileLoadTime
,
pCost
->
smaDataLoad
,
pCost
->
smaLoadTime
,
pCost
->
numOfBlocks
,
pCost
->
numOfBlocks
,
pCost
->
blockLoadTime
,
pCost
->
buildmemBlock
,
pCost
->
lastBlockLoad
,
pCost
->
blockLoadTime
,
pCost
->
buildmemBlock
,
pCost
->
lastBlockLoad
,
pCost
->
lastBlockLoadTime
,
pCost
->
composedBlocks
,
pCost
->
lastBlockLoad
Time
,
numOfTables
*
sizeof
(
STableBlockScanInfo
)
/
1000
.
0
,
pReader
->
idStr
);
pCost
->
buildComposedBlock
Time
,
numOfTables
*
sizeof
(
STableBlockScanInfo
)
/
1000
.
0
,
pReader
->
idStr
);
taosMemoryFree
(
pReader
->
idStr
);
taosMemoryFree
(
pReader
->
idStr
);
taosMemoryFree
(
pReader
->
pSchema
);
taosMemoryFree
(
pReader
->
pSchema
);
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
bb0b1fda
...
@@ -932,6 +932,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
...
@@ -932,6 +932,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
case
STREAM_DELETE_DATA
:
{
case
STREAM_DELETE_DATA
:
{
copyDataBlock
(
pInfo
->
pDelRes
,
pBlock
);
copyDataBlock
(
pInfo
->
pDelRes
,
pBlock
);
pInfo
->
pDelRes
->
info
.
type
=
STREAM_DELETE_RESULT
;
pInfo
->
pDelRes
->
info
.
type
=
STREAM_DELETE_RESULT
;
return
pInfo
->
pDelRes
;
}
break
;
}
break
;
default:
default:
return
pBlock
;
return
pBlock
;
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
bb0b1fda
...
@@ -1424,7 +1424,7 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock,
...
@@ -1424,7 +1424,7 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock,
taosHashRemove
(
pUpdatedMap
,
&
winRes
,
sizeof
(
SWinKey
));
taosHashRemove
(
pUpdatedMap
,
&
winRes
,
sizeof
(
SWinKey
));
}
}
getNextTimeWindow
(
pInterval
,
pInterval
->
precision
,
TSDB_ORDER_ASC
,
&
win
);
getNextTimeWindow
(
pInterval
,
pInterval
->
precision
,
TSDB_ORDER_ASC
,
&
win
);
}
while
(
win
.
skey
<
tsEnds
[
i
]);
}
while
(
win
.
skey
<
=
tsEnds
[
i
]);
}
}
}
}
...
@@ -3595,6 +3595,7 @@ SArray* getWinInfos(SStreamAggSupporter* pAggSup, uint64_t groupId) {
...
@@ -3595,6 +3595,7 @@ SArray* getWinInfos(SStreamAggSupporter* pAggSup, uint64_t groupId) {
// don't add new window
// don't add new window
SResultWindowInfo
*
getCurSessionWindow
(
SStreamAggSupporter
*
pAggSup
,
TSKEY
startTs
,
TSKEY
endTs
,
uint64_t
groupId
,
SResultWindowInfo
*
getCurSessionWindow
(
SStreamAggSupporter
*
pAggSup
,
TSKEY
startTs
,
TSKEY
endTs
,
uint64_t
groupId
,
int64_t
gap
,
int32_t
*
pIndex
)
{
int64_t
gap
,
int32_t
*
pIndex
)
{
STimeWindow
searchWin
=
{.
skey
=
startTs
,
.
ekey
=
endTs
};
SArray
*
pWinInfos
=
getWinInfos
(
pAggSup
,
groupId
);
SArray
*
pWinInfos
=
getWinInfos
(
pAggSup
,
groupId
);
pAggSup
->
pCurWins
=
pWinInfos
;
pAggSup
->
pCurWins
=
pWinInfos
;
...
@@ -3607,7 +3608,7 @@ SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY start
...
@@ -3607,7 +3608,7 @@ SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY start
SResultWindowInfo
*
pWin
=
NULL
;
SResultWindowInfo
*
pWin
=
NULL
;
if
(
index
>=
0
)
{
if
(
index
>=
0
)
{
pWin
=
taosArrayGet
(
pWinInfos
,
index
);
pWin
=
taosArrayGet
(
pWinInfos
,
index
);
if
(
isInWindow
(
pWin
,
startTs
,
gap
))
{
if
(
isInWindow
(
pWin
,
startTs
,
gap
)
||
isInTimeWindow
(
&
searchWin
,
pWin
->
win
.
skey
,
gap
)
)
{
*
pIndex
=
index
;
*
pIndex
=
index
;
return
pWin
;
return
pWin
;
}
}
...
@@ -3615,7 +3616,7 @@ SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY start
...
@@ -3615,7 +3616,7 @@ SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY start
if
(
index
+
1
<
size
)
{
if
(
index
+
1
<
size
)
{
pWin
=
taosArrayGet
(
pWinInfos
,
index
+
1
);
pWin
=
taosArrayGet
(
pWinInfos
,
index
+
1
);
if
(
isInWindow
(
pWin
,
startTs
,
gap
))
{
if
(
isInWindow
(
pWin
,
startTs
,
gap
)
||
isInTimeWindow
(
&
searchWin
,
pWin
->
win
.
skey
,
gap
)
)
{
*
pIndex
=
index
+
1
;
*
pIndex
=
index
+
1
;
return
pWin
;
return
pWin
;
}
else
if
(
endTs
!=
INT64_MIN
&&
isInWindow
(
pWin
,
endTs
,
gap
))
{
}
else
if
(
endTs
!=
INT64_MIN
&&
isInWindow
(
pWin
,
endTs
,
gap
))
{
...
@@ -3793,7 +3794,7 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex,
...
@@ -3793,7 +3794,7 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex,
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
pCurWin
->
win
,
true
);
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
pCurWin
->
win
,
true
);
compactFunctions
(
pSup
->
pCtx
,
pInfo
->
pDummyCtx
,
numOfOutput
,
pTaskInfo
,
&
pInfo
->
twAggSup
.
timeWindowData
);
compactFunctions
(
pSup
->
pCtx
,
pInfo
->
pDummyCtx
,
numOfOutput
,
pTaskInfo
,
&
pInfo
->
twAggSup
.
timeWindowData
);
taosHashRemove
(
pStUpdated
,
&
pWinInfo
->
pos
,
sizeof
(
SResultRowPosition
));
taosHashRemove
(
pStUpdated
,
&
pWinInfo
->
pos
,
sizeof
(
SResultRowPosition
));
if
(
pWinInfo
->
isOutput
)
{
if
(
pWinInfo
->
isOutput
&&
pStDeleted
)
{
SWinKey
res
=
{.
ts
=
pWinInfo
->
win
.
skey
,
.
groupId
=
groupId
};
SWinKey
res
=
{.
ts
=
pWinInfo
->
win
.
skey
,
.
groupId
=
groupId
};
taosHashPut
(
pStDeleted
,
&
res
,
sizeof
(
SWinKey
),
&
res
,
sizeof
(
SWinKey
));
taosHashPut
(
pStDeleted
,
&
res
,
sizeof
(
SWinKey
),
&
res
,
sizeof
(
SWinKey
));
pWinInfo
->
isOutput
=
false
;
pWinInfo
->
isOutput
=
false
;
...
@@ -3887,18 +3888,23 @@ static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBloc
...
@@ -3887,18 +3888,23 @@ static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBloc
uint64_t
*
gpDatas
=
(
uint64_t
*
)
pGroupCol
->
pData
;
uint64_t
*
gpDatas
=
(
uint64_t
*
)
pGroupCol
->
pData
;
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
i
++
)
{
int32_t
winIndex
=
0
;
int32_t
winIndex
=
0
;
while
(
1
)
{
SResultWindowInfo
*
pCurWin
=
getCurSessionWindow
(
pAggSup
,
startDatas
[
i
],
endDatas
[
i
],
gpDatas
[
i
],
gap
,
&
winIndex
);
SResultWindowInfo
*
pCurWin
=
getCurSessionWindow
(
pAggSup
,
startDatas
[
i
],
endDatas
[
i
],
gpDatas
[
i
],
gap
,
&
winIndex
);
if
(
!
pCurWin
)
{
if
(
!
pCurWin
)
{
break
;
continue
;
}
}
do
{
SResultWindowInfo
delWin
=
*
pCurWin
;
SResultWindowInfo
delWin
=
*
pCurWin
;
deleteWindow
(
pAggSup
->
pCurWins
,
winIndex
,
fp
);
deleteWindow
(
pAggSup
->
pCurWins
,
winIndex
,
fp
);
if
(
result
)
{
if
(
result
)
{
delWin
.
groupId
=
gpDatas
[
i
];
delWin
.
groupId
=
gpDatas
[
i
];
taosArrayPush
(
result
,
&
delWin
);
taosArrayPush
(
result
,
&
delWin
);
}
}
if
(
winIndex
>=
taosArrayGetSize
(
pAggSup
->
pCurWins
))
{
break
;
}
}
pCurWin
=
taosArrayGet
(
pAggSup
->
pCurWins
,
winIndex
);
}
while
(
pCurWin
->
win
.
skey
<=
endDatas
[
i
]);
}
}
}
}
...
@@ -3979,26 +3985,16 @@ void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** It
...
@@ -3979,26 +3985,16 @@ void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** It
}
}
static
void
rebuildTimeWindow
(
SStreamSessionAggOperatorInfo
*
pInfo
,
SArray
*
pWinArray
,
int32_t
numOfOutput
,
static
void
rebuildTimeWindow
(
SStreamSessionAggOperatorInfo
*
pInfo
,
SArray
*
pWinArray
,
int32_t
numOfOutput
,
SOperatorInfo
*
pOperator
,
SHashObj
*
pStUpdated
,
bool
needCreate
)
{
SOperatorInfo
*
pOperator
,
SHashObj
*
pStUpdated
)
{
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
int32_t
size
=
taosArrayGetSize
(
pWinArray
);
int32_t
size
=
taosArrayGetSize
(
pWinArray
);
ASSERT
(
pInfo
->
pChildren
);
ASSERT
(
pInfo
->
pChildren
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
SResultWindowInfo
*
pParentWin
=
taosArrayGet
(
pWinArray
,
i
);
SResultWindowInfo
*
pParentWin
=
taosArrayGet
(
pWinArray
,
i
);
SResultRow
*
pCurResult
=
NULL
;
uint64_t
groupId
=
pParentWin
->
groupId
;
uint64_t
groupId
=
pParentWin
->
groupId
;
int32_t
winIndex
=
0
;
if
(
needCreate
)
{
pParentWin
=
getSessionTimeWindow
(
&
pInfo
->
streamAggSup
,
pParentWin
->
win
.
skey
,
pParentWin
->
win
.
ekey
,
groupId
,
0
,
&
winIndex
);
}
setWindowOutputBuf
(
pParentWin
,
&
pCurResult
,
pSup
->
pCtx
,
groupId
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
streamAggSup
,
pTaskInfo
);
int32_t
numOfChildren
=
taosArrayGetSize
(
pInfo
->
pChildren
);
int32_t
numOfChildren
=
taosArrayGetSize
(
pInfo
->
pChildren
);
int32_t
num
=
0
;
for
(
int32_t
j
=
0
;
j
<
numOfChildren
;
j
++
)
{
for
(
int32_t
j
=
0
;
j
<
numOfChildren
;
j
++
)
{
SOperatorInfo
*
pChild
=
taosArrayGetP
(
pInfo
->
pChildren
,
j
);
SOperatorInfo
*
pChild
=
taosArrayGetP
(
pInfo
->
pChildren
,
j
);
SStreamSessionAggOperatorInfo
*
pChInfo
=
pChild
->
info
;
SStreamSessionAggOperatorInfo
*
pChInfo
=
pChild
->
info
;
...
@@ -4011,31 +4007,36 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin
...
@@ -4011,31 +4007,36 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin
for
(
int32_t
k
=
index
;
k
<
chWinSize
;
k
++
)
{
for
(
int32_t
k
=
index
;
k
<
chWinSize
;
k
++
)
{
SResultWindowInfo
*
pChWin
=
taosArrayGet
(
pChWins
,
k
);
SResultWindowInfo
*
pChWin
=
taosArrayGet
(
pChWins
,
k
);
if
(
pParentWin
->
win
.
skey
<=
pChWin
->
win
.
skey
&&
pChWin
->
win
.
ekey
<=
pParentWin
->
win
.
ekey
)
{
if
(
pParentWin
->
win
.
skey
<=
pChWin
->
win
.
skey
&&
pChWin
->
win
.
ekey
<=
pParentWin
->
win
.
ekey
)
{
int32_t
winIndex
=
0
;
SResultWindowInfo
*
pNewParWin
=
getSessionTimeWindow
(
&
pInfo
->
streamAggSup
,
pChWin
->
win
.
skey
,
pChWin
->
win
.
ekey
,
groupId
,
0
,
&
winIndex
);
SResultRow
*
pPareResult
=
NULL
;
setWindowOutputBuf
(
pNewParWin
,
&
pPareResult
,
pSup
->
pCtx
,
groupId
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
streamAggSup
,
pTaskInfo
);
SResultRow
*
pChResult
=
NULL
;
SResultRow
*
pChResult
=
NULL
;
setWindowOutputBuf
(
pChWin
,
&
pChResult
,
pChild
->
exprSupp
.
pCtx
,
groupId
,
numOfOutput
,
setWindowOutputBuf
(
pChWin
,
&
pChResult
,
pChild
->
exprSupp
.
pCtx
,
groupId
,
numOfOutput
,
pChild
->
exprSupp
.
rowEntryInfoOffset
,
&
pChInfo
->
streamAggSup
,
pTaskInfo
);
pChild
->
exprSupp
.
rowEntryInfoOffset
,
&
pChInfo
->
streamAggSup
,
pTaskInfo
);
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
p
Ch
Win
->
win
,
true
);
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
p
NewPar
Win
->
win
,
true
);
compactFunctions
(
pSup
->
pCtx
,
pChild
->
exprSupp
.
pCtx
,
numOfOutput
,
pTaskInfo
,
&
pInfo
->
twAggSup
.
timeWindowData
);
compactFunctions
(
pSup
->
pCtx
,
pChild
->
exprSupp
.
pCtx
,
numOfOutput
,
pTaskInfo
,
&
pInfo
->
twAggSup
.
timeWindowData
);
int32_t
winNum
=
getNumCompactWindow
(
pInfo
->
streamAggSup
.
pCurWins
,
winIndex
,
pInfo
->
gap
);
if
(
winNum
>
0
)
{
compactTimeWindow
(
pInfo
,
winIndex
,
winNum
,
groupId
,
numOfOutput
,
pStUpdated
,
NULL
,
pOperator
);
}
SFilePage
*
bufPage
=
getBufPage
(
pChInfo
->
streamAggSup
.
pResultBuf
,
pChWin
->
pos
.
pageId
);
SFilePage
*
bufPage
=
getBufPage
(
pChInfo
->
streamAggSup
.
pResultBuf
,
pChWin
->
pos
.
pageId
);
releaseBufPage
(
pChInfo
->
streamAggSup
.
pResultBuf
,
bufPage
);
releaseBufPage
(
pChInfo
->
streamAggSup
.
pResultBuf
,
bufPage
);
num
++
;
continue
;
bufPage
=
getBufPage
(
pInfo
->
streamAggSup
.
pResultBuf
,
pNewParWin
->
pos
.
pageId
);
setBufPageDirty
(
bufPage
,
true
);
releaseBufPage
(
pInfo
->
streamAggSup
.
pResultBuf
,
bufPage
);
SWinKey
value
=
{.
ts
=
pNewParWin
->
win
.
skey
,
.
groupId
=
groupId
};
taosHashPut
(
pStUpdated
,
&
pNewParWin
->
pos
,
sizeof
(
SResultRowPosition
),
&
value
,
sizeof
(
SWinKey
));
}
else
if
(
!
pChWin
->
isClosed
)
{
}
else
if
(
!
pChWin
->
isClosed
)
{
break
;
break
;
}
}
}
}
}
}
if
(
num
==
0
&&
needCreate
)
{
deleteWindow
(
pInfo
->
streamAggSup
.
pCurWins
,
winIndex
,
NULL
);
}
if
(
pStUpdated
&&
num
>
0
)
{
SWinKey
value
=
{.
ts
=
pParentWin
->
win
.
skey
,
.
groupId
=
groupId
};
taosHashPut
(
pStUpdated
,
&
pParentWin
->
pos
,
sizeof
(
SResultRowPosition
),
&
value
,
sizeof
(
SWinKey
));
}
SFilePage
*
bufPage
=
getBufPage
(
pInfo
->
streamAggSup
.
pResultBuf
,
pParentWin
->
pos
.
pageId
);
ASSERT
(
size
>
0
);
setBufPageDirty
(
bufPage
,
true
);
releaseBufPage
(
pInfo
->
streamAggSup
.
pResultBuf
,
bufPage
);
}
}
}
}
...
@@ -4196,7 +4197,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
...
@@ -4196,7 +4197,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo
*
pChildInfo
=
pChildOp
->
info
;
SStreamSessionAggOperatorInfo
*
pChildInfo
=
pChildOp
->
info
;
doClearSessionWindows
(
&
pChildInfo
->
streamAggSup
,
&
pChildOp
->
exprSupp
,
pBlock
,
START_TS_COLUMN_INDEX
,
doClearSessionWindows
(
&
pChildInfo
->
streamAggSup
,
&
pChildOp
->
exprSupp
,
pBlock
,
START_TS_COLUMN_INDEX
,
pChildOp
->
exprSupp
.
numOfExprs
,
0
,
NULL
);
pChildOp
->
exprSupp
.
numOfExprs
,
0
,
NULL
);
rebuildTimeWindow
(
pInfo
,
pWins
,
pOperator
->
exprSupp
.
numOfExprs
,
pOperator
,
NULL
,
false
);
rebuildTimeWindow
(
pInfo
,
pWins
,
pOperator
->
exprSupp
.
numOfExprs
,
pOperator
,
pStUpdated
);
}
}
taosArrayDestroy
(
pWins
);
taosArrayDestroy
(
pWins
);
continue
;
continue
;
...
@@ -4210,7 +4211,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
...
@@ -4210,7 +4211,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo
*
pChildInfo
=
pChildOp
->
info
;
SStreamSessionAggOperatorInfo
*
pChildInfo
=
pChildOp
->
info
;
// gap must be 0
// gap must be 0
doDeleteTimeWindows
(
&
pChildInfo
->
streamAggSup
,
pBlock
,
0
,
NULL
,
NULL
);
doDeleteTimeWindows
(
&
pChildInfo
->
streamAggSup
,
pBlock
,
0
,
NULL
,
NULL
);
rebuildTimeWindow
(
pInfo
,
pWins
,
pOperator
->
exprSupp
.
numOfExprs
,
pOperator
,
pStUpdated
,
true
);
rebuildTimeWindow
(
pInfo
,
pWins
,
pOperator
->
exprSupp
.
numOfExprs
,
pOperator
,
pStUpdated
);
}
}
copyDeleteWindowInfo
(
pWins
,
pInfo
->
pStDeleted
);
copyDeleteWindowInfo
(
pWins
,
pInfo
->
pStDeleted
);
removeSessionResults
(
pStUpdated
,
pWins
);
removeSessionResults
(
pStUpdated
,
pWins
);
...
@@ -4747,7 +4748,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
...
@@ -4747,7 +4748,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
if
(
pBlock
->
info
.
type
==
STREAM_CLEAR
)
{
if
(
pBlock
->
info
.
type
==
STREAM_CLEAR
)
{
doClearStateWindows
(
&
pInfo
->
streamAggSup
,
pBlock
,
pSeUpdated
,
pInfo
->
pSeDeleted
);
doClearStateWindows
(
&
pInfo
->
streamAggSup
,
pBlock
,
pSeUpdated
,
pInfo
->
pSeDeleted
);
continue
;
continue
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
)
{
}
else
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
||
pBlock
->
info
.
type
==
STREAM_DELETE_RESULT
)
{
SArray
*
pWins
=
taosArrayInit
(
16
,
sizeof
(
SResultWindowInfo
));
SArray
*
pWins
=
taosArrayInit
(
16
,
sizeof
(
SResultWindowInfo
));
doDeleteTimeWindows
(
&
pInfo
->
streamAggSup
,
pBlock
,
0
,
pWins
,
destroyStateWinInfo
);
doDeleteTimeWindows
(
&
pInfo
->
streamAggSup
,
pBlock
,
0
,
pWins
,
destroyStateWinInfo
);
copyDeleteWindowInfo
(
pWins
,
pInfo
->
pSeDeleted
);
copyDeleteWindowInfo
(
pWins
,
pInfo
->
pSeDeleted
);
...
@@ -5674,7 +5675,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -5674,7 +5675,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
NULL
);
NULL
);
qDebug
(
"%s clear existed time window results for updates checked"
,
GET_TASKID
(
pTaskInfo
));
qDebug
(
"%s clear existed time window results for updates checked"
,
GET_TASKID
(
pTaskInfo
));
continue
;
continue
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
)
{
}
else
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
||
pBlock
->
info
.
type
==
STREAM_DELETE_RESULT
)
{
doDeleteSpecifyIntervalWindow
(
&
pInfo
->
aggSup
,
pBlock
,
pInfo
->
pDelWins
,
&
pInfo
->
interval
,
pUpdatedMap
);
doDeleteSpecifyIntervalWindow
(
&
pInfo
->
aggSup
,
pBlock
,
pInfo
->
pDelWins
,
&
pInfo
->
interval
,
pUpdatedMap
);
continue
;
continue
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_GET_ALL
)
{
}
else
if
(
pBlock
->
info
.
type
==
STREAM_GET_ALL
)
{
...
...
tests/script/jenkins/basic.txt
浏览文件 @
bb0b1fda
...
@@ -248,6 +248,12 @@
...
@@ -248,6 +248,12 @@
./test.sh -f tsim/stream/windowClose.sim
./test.sh -f tsim/stream/windowClose.sim
./test.sh -f tsim/stream/ignoreExpiredData.sim
./test.sh -f tsim/stream/ignoreExpiredData.sim
./test.sh -f tsim/stream/sliding.sim
./test.sh -f tsim/stream/sliding.sim
#./test.sh -f tsim/stream/partitionbyColumnInterval.sim
#./test.sh -f tsim/stream/partitionbyColumnSession.sim
#./test.sh -f tsim/stream/partitionbyColumnState.sim
#./test.sh -f tsim/stream/deleteInterval.sim
#./test.sh -f tsim/stream/deleteSession.sim
#./test.sh -f tsim/stream/deleteState.sim
# ---- transaction ----
# ---- transaction ----
./test.sh -f tsim/trans/lossdata1.sim
./test.sh -f tsim/trans/lossdata1.sim
...
...
tests/script/tsim/column/table.sim
浏览文件 @
bb0b1fda
...
@@ -159,6 +159,7 @@ if $data01 != 10 then
...
@@ -159,6 +159,7 @@ if $data01 != 10 then
return -1
return -1
endi
endi
if $data02 != 4.500000000 then
if $data02 != 4.500000000 then
print expect 4.500000000, actual: $data02
return -1
return -1
endi
endi
if $data03 != 4.500000000 then
if $data03 != 4.500000000 then
...
...
tests/script/tsim/stream/deleteSession.sim
0 → 100644
浏览文件 @
bb0b1fda
$loop_all = 0
looptest:
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 200
sql connect
sql drop stream if exists streams0;
sql drop stream if exists streams1;
sql drop stream if exists streams2;
sql drop stream if exists streams3;
sql drop stream if exists streams4;
sql drop database if exists test;
sql create database test vgroups 1;
sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams0 trigger at_once into streamt as select _wstart c1, count(*) c2, max(a) c3 from t1 session(ts, 5s);
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
sleep 200
sql delete from t1 where ts = 1648791213000;
$loop_count = 0
loop0:
sleep 200
sql select * from streamt order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 0 then
print =====rows=$rows
goto loop0
endi
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
$loop_count = 0
loop1:
sleep 200
sql select * from streamt order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop1
endi
if $data02 != NULL then
print =====data02=$data02
goto loop1
endi
sql insert into t1 values(1648791213000,1,1,1,1.0);
sql insert into t1 values(1648791213001,2,2,2,2.0);
sql insert into t1 values(1648791213002,3,3,3,3.0);
sql insert into t1 values(1648791213003,4,4,4,4.0);
sleep 200
sql delete from t1 where ts >= 1648791213001 and ts <= 1648791213002;
$loop_count = 0
loop3:
sleep 200
sql select * from streamt order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop3
endi
if $data02 != 4 then
print =====data02=$data02
goto loop3
endi
sql insert into t1 values(1648791223000,1,2,3,1.0);
sql insert into t1 values(1648791223001,1,2,3,1.0);
sql insert into t1 values(1648791223002,3,2,3,1.0);
sql insert into t1 values(1648791223003,3,2,3,1.0);
$loop_count = 0
loop4:
sleep 200
sql select * from streamt order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows
goto loop4
endi
sleep 200
sql delete from t1 where ts >= 1648791223000 and ts <= 1648791223003;
$loop_count = 0
loop5:
sleep 200
sql select * from streamt order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop5
endi
if $data02 != 4 then
print =====data02=$data02
goto loop5
endi
sql insert into t1 values(1648791213000,1,1,1,1.0);
sql insert into t1 values(1648791213005,2,2,2,2.0);
sql insert into t1 values(1648791213006,3,3,3,3.0);
sql insert into t1 values(1648791213007,4,4,4,4.0);
sql insert into t1 values(1648791223000,1,1,1,1.0);
sql insert into t1 values(1648791223001,2,2,2,2.0);
sql insert into t1 values(1648791223002,3,3,3,3.0);
sql insert into t1 values(1648791223003,4,4,4,4.0);
sql insert into t1 values(1648791233000,1,1,1,1.0);
sql insert into t1 values(1648791233001,2,2,2,2.0);
sql insert into t1 values(1648791233008,3,3,3,3.0);
sql insert into t1 values(1648791233009,4,4,4,4.0);
sql delete from t1 where ts >= 1648791213001 and ts <= 1648791233005;
$loop_count = 0
loop6:
sleep 200
sql select * from streamt order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop6
endi
if $data02 != 1 then
print =====data02=$data02
goto loop6
endi
if $data11 != 2 then
print =====data11=$data11
goto loop6
endi
if $data12 != 4 then
print =====data12=$data12
goto loop6
endi
sql drop stream if exists streams2;
sql drop database if exists test2;
sql create database test2 vgroups 4;
sql use test2;
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams2 trigger at_once into test.streamt2 as select _wstart c1, count(*) c2, max(a) c3 from st session(ts,5s);
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
sql insert into t2 values(1648791213000,NULL,NULL,NULL,NULL);
$loop_count = 0
loop7:
sleep 200
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 1 then
print =====rows=$rows
goto loop7
endi
sleep 200
sql delete from t1 where ts = 1648791213000;
$loop_count = 0
loop8:
sleep 200
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop8
endi
if $data02 != NULL then
print =====data02=$data02
goto loop8
endi
sql insert into t1 values(1648791223000,1,2,3,1.0);
sql insert into t1 values(1648791223001,1,2,3,1.0);
sql insert into t1 values(1648791223002,3,2,3,1.0);
sql insert into t1 values(1648791223003,3,2,3,1.0);
sql insert into t2 values(1648791223000,1,2,3,1.0);
sql insert into t2 values(1648791223001,1,2,3,1.0);
sql insert into t2 values(1648791223002,3,2,3,1.0);
sql insert into t2 values(1648791223003,3,2,3,1.0);
sleep 200
sql delete from t2 where ts >= 1648791223000 and ts <= 1648791223001;
$loop_count = 0
loop11:
sleep 200
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop11
endi
if $data02 != NULL then
print =====data02=$data02
goto loop11
endi
if $data11 != 6 then
print =====data11=$data11
goto loop11
endi
if $data12 != 3 then
print =====data12=$data12
goto loop11
endi
sleep 200
sql delete from st where ts >= 1648791223000 and ts <= 1648791223003;
$loop_count = 0
loop12:
sleep 200
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 1 then
print =====rows=$rows
goto loop12
endi
if $data01 != 1 then
print =====data01=$data01
goto loop12
endi
if $data02 != NULL then
print =====data02=$data02
goto loop12
endi
sql insert into t1 values(1648791213004,3,2,3,1.0);
sql insert into t1 values(1648791213005,3,2,3,1.0);
sql insert into t1 values(1648791213006,3,2,3,1.0);
sql insert into t1 values(1648791223004,1,2,3,1.0);
sql insert into t2 values(1648791213004,3,2,3,1.0);
sql insert into t2 values(1648791213005,3,2,3,1.0);
sql insert into t2 values(1648791213006,3,2,3,1.0);
sql insert into t2 values(1648791223004,1,2,3,1.0);
sleep 200
sql delete from t2 where ts >= 1648791213004 and ts <= 1648791213006;
$loop_count = 0
loop13:
sleep 200
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows
goto loop13
endi
if $data01 != 4 then
print =====data01=$data01
goto loop13
endi
if $data02 != 3 then
print =====data02=$data02
goto loop13
endi
if $data11 != 2 then
print =====data11=$data11
goto loop13
endi
if $data12 != 1 then
print =====data12=$data12
goto loop13
endi
sql insert into t1 values(1648791223005,1,2,3,1.0);
sql insert into t1 values(1648791223006,1,2,3,1.0);
sql insert into t2 values(1648791223005,1,2,3,1.0);
sql insert into t2 values(1648791223006,1,2,3,1.0);
sql insert into t1 values(1648791233005,4,2,3,1.0);
sql insert into t1 values(1648791233006,2,2,3,1.0);
sql insert into t2 values(1648791233005,5,2,3,1.0);
sql insert into t2 values(1648791233006,3,2,3,1.0);
sleep 200
sql delete from st where ts >= 1648791213001 and ts <= 1648791233005;
$loop_count = 0
loop14:
sleep 200
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows
goto loop14
endi
if $data01 != 1 then
print =====data01=$data01
goto loop14
endi
if $data02 != NULL then
print =====data02=$data02
goto loop14
endi
if $data11 != 2 then
print =====data11=$data11
goto loop14
endi
if $data12 != 3 then
print =====data12=$data12
goto loop14
endi
sql drop stream if exists streams1;
sql drop stream if exists streams2;
sql drop stream if exists streams3;
sql drop database if exists test3;
sql drop database if exists test;
sql create database test3 vgroups 4;
sql create database test vgroups 1;
sql use test3;
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams3 trigger at_once into test.streamt3 as select _wstart c1, count(*) c2, max(a) c3 from st session(ts,5s);
sql insert into t1 values(1648791210000,1,1,1,NULL);
sql insert into t1 values(1648791210001,2,2,2,NULL);
sql insert into t2 values(1648791213001,3,3,3,NULL);
sql insert into t2 values(1648791213003,4,4,4,NULL);
sql insert into t1 values(1648791216000,5,5,5,NULL);
sql insert into t1 values(1648791216002,6,6,6,NULL);
sql insert into t1 values(1648791216004,7,7,7,NULL);
sql insert into t2 values(1648791218001,8,8,8,NULL);
sql insert into t2 values(1648791218003,9,9,9,NULL);
sql insert into t1 values(1648791222000,10,10,10,NULL);
sql insert into t1 values(1648791222003,11,11,11,NULL);
sql insert into t1 values(1648791222005,12,12,12,NULL);
sql insert into t1 values(1648791232005,13,13,13,NULL);
sql insert into t2 values(1648791242005,14,14,14,NULL);
$loop_count = 0
loop19:
sleep 200
sql select * from test.streamt3 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 3 then
print =====rows=$rows
goto loop19
endi
sql delete from t2 where ts >= 1648791213001 and ts <= 1648791218003;
$loop_count = 0
loop20:
sleep 200
sql select * from test.streamt3 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 5 then
print =====rows=$rows
goto loop20
endi
if $data01 != 2 then
print =====data01=$data01
goto loop20
endi
if $data02 != 2 then
print =====data02=$data02
goto loop20
endi
if $data11 != 3 then
print =====data11=$data11
goto loop20
endi
if $data12 != 7 then
print =====data12=$data12
goto loop20
endi
if $data21 != 3 then
print =====data21=$data21
goto loop20
endi
if $data22 != 12 then
print =====data22=$data22
goto loop20
endi
if $data31 != 1 then
print =====data31=$data31
goto loop20
endi
if $data32 != 13 then
print =====data32=$data32
goto loop20
endi
if $data41 != 1 then
print =====data41=$data41
goto loop20
endi
if $data42 != 14 then
print =====data42=$data42
goto loop20
endi
$loop_all = $loop_all + 1
print ============loop_all=$loop_all
system sh/stop_dnodes.sh
#goto looptest
\ No newline at end of file
tests/script/tsim/stream/deleteState.sim
0 → 100644
浏览文件 @
bb0b1fda
$loop_all = 0
looptest:
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 200
sql connect
sql drop stream if exists streams0;
sql drop stream if exists streams1;
sql drop stream if exists streams2;
sql drop stream if exists streams3;
sql drop stream if exists streams4;
sql drop database if exists test;
sql create database test vgroups 1;
sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams0 trigger at_once into streamt as select _wstart c1, count(*) c2, max(b) c3 from t1 state_window(a);
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
sleep 200
sql delete from t1 where ts = 1648791213000;
$loop_count = 0
loop0:
sleep 200
sql select * from streamt order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 0 then
print =====rows=$rows
goto loop0
endi
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
$loop_count = 0
loop1:
sleep 200
sql select * from streamt order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop1
endi
if $data02 != NULL then
print =====data02=$data02
goto loop1
endi
sql insert into t1 values(1648791213000,1,1,1,1.0);
sql insert into t1 values(1648791213001,1,2,2,2.0);
sql insert into t1 values(1648791213002,1,3,3,3.0);
sql insert into t1 values(1648791213003,1,4,4,4.0);
sleep 200
sql delete from t1 where ts >= 1648791213001 and ts <= 1648791213002;
$loop_count = 0
loop3:
sleep 200
sql select * from streamt order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop3
endi
if $data02 != 4 then
print =====data02=$data02
goto loop3
endi
sql insert into t1 values(1648791223000,2,2,3,1.0);
sql insert into t1 values(1648791223001,2,2,3,1.0);
sql insert into t1 values(1648791223002,2,2,3,1.0);
sql insert into t1 values(1648791223003,2,2,3,1.0);
$loop_count = 0
loop4:
sleep 200
sql select * from streamt order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows
goto loop4
endi
sleep 200
sql delete from t1 where ts >= 1648791223000 and ts <= 1648791223003;
$loop_count = 0
loop5:
sleep 200
sql select * from streamt order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop5
endi
if $data02 != 4 then
print =====data02=$data02
goto loop5
endi
sql insert into t1 values(1648791213000,1,1,1,1.0);
sql insert into t1 values(1648791213005,1,2,2,2.0);
sql insert into t1 values(1648791213006,1,3,3,3.0);
sql insert into t1 values(1648791213007,1,4,4,4.0);
sql insert into t1 values(1648791223000,2,1,1,1.0);
sql insert into t1 values(1648791223001,2,2,2,2.0);
sql insert into t1 values(1648791223002,2,3,3,3.0);
sql insert into t1 values(1648791223003,2,4,4,4.0);
sql insert into t1 values(1648791233000,3,1,1,1.0);
sql insert into t1 values(1648791233001,3,2,2,2.0);
sql insert into t1 values(1648791233008,3,3,3,3.0);
sql insert into t1 values(1648791233009,3,4,4,4.0);
sql delete from t1 where ts >= 1648791213001 and ts <= 1648791233005;
$loop_count = 0
loop6:
sleep 200
sql select * from streamt order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows
goto loop6
endi
if $data01 != 1 then
print =====data01=$data01
goto loop6
endi
if $data02 != 1 then
print =====data02=$data02
goto loop6
endi
if $data11 != 2 then
print =====data11=$data11
goto loop6
endi
if $data12 != 4 then
print =====data12=$data12
goto loop6
endi
$loop_all = $loop_all + 1
print ============loop_all=$loop_all
system sh/stop_dnodes.sh
#goto looptest
\ No newline at end of file
tests/script/tsim/stream/partitionbyColumn
0
.sim
→
tests/script/tsim/stream/partitionbyColumn
Interval
.sim
浏览文件 @
bb0b1fda
文件已移动
tests/script/tsim/stream/partitionbyColumn
1
.sim
→
tests/script/tsim/stream/partitionbyColumn
Session
.sim
浏览文件 @
bb0b1fda
文件已移动
tests/script/tsim/stream/partitionbyColumn
2
.sim
→
tests/script/tsim/stream/partitionbyColumn
State
.sim
浏览文件 @
bb0b1fda
文件已移动
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录