Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
863d96ea
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
863d96ea
编写于
7月 17, 2023
作者:
dengyihao
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'fix/newCheckpoint' into enh/triggerCheckPoint2
上级
40eb5067
d45adeaa
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
277 addition
and
23 deletion
+277
-23
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+99
-23
tests/script/tsim/stream/checkpointState0.sim
tests/script/tsim/stream/checkpointState0.sim
+178
-0
未找到文件。
source/libs/executor/src/timewindowoperator.c
浏览文件 @
863d96ea
...
@@ -18,6 +18,7 @@
...
@@ -18,6 +18,7 @@
#include "functionMgt.h"
#include "functionMgt.h"
#include "operator.h"
#include "operator.h"
#include "querytask.h"
#include "querytask.h"
#include "tchecksum.h"
#include "tcommon.h"
#include "tcommon.h"
#include "tcompare.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tdatablock.h"
...
@@ -2567,12 +2568,14 @@ void* decodeSPullWindowInfoArray(void *buf, SArray* pPullInfos) {
...
@@ -2567,12 +2568,14 @@ void* decodeSPullWindowInfoArray(void *buf, SArray* pPullInfos) {
return
buf
;
return
buf
;
}
}
int32_t
doStreamIntervalEncodeOpState
(
void
**
buf
,
SOperatorInfo
*
pOperator
)
{
int32_t
doStreamIntervalEncodeOpState
(
void
**
buf
,
int32_t
len
,
SOperatorInfo
*
pOperator
)
{
SStreamIntervalOperatorInfo
*
pInfo
=
pOperator
->
info
;
SStreamIntervalOperatorInfo
*
pInfo
=
pOperator
->
info
;
if
(
!
pInfo
)
{
if
(
!
pInfo
)
{
return
0
;
return
0
;
}
}
void
*
pData
=
(
buf
==
NULL
)
?
NULL
:
*
buf
;
// 1.pResultRowHashTable
// 1.pResultRowHashTable
int32_t
tlen
=
0
;
int32_t
tlen
=
0
;
int32_t
mapSize
=
tSimpleHashGetSize
(
pInfo
->
aggSup
.
pResultRowHashTable
);
int32_t
mapSize
=
tSimpleHashGetSize
(
pInfo
->
aggSup
.
pResultRowHashTable
);
...
@@ -2613,15 +2616,32 @@ int32_t doStreamIntervalEncodeOpState(void **buf, SOperatorInfo* pOperator) {
...
@@ -2613,15 +2616,32 @@ int32_t doStreamIntervalEncodeOpState(void **buf, SOperatorInfo* pOperator) {
// 5.dataVersion
// 5.dataVersion
tlen
+=
taosEncodeFixedI64
(
buf
,
pInfo
->
dataVersion
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pInfo
->
dataVersion
);
// 6.checksum
if
(
buf
)
{
uint32_t
cksum
=
taosCalcChecksum
(
0
,
pData
,
len
-
sizeof
(
uint32_t
));
tlen
+=
taosEncodeFixedU32
(
buf
,
cksum
);
}
else
{
tlen
+=
sizeof
(
uint32_t
);
}
return
tlen
;
return
tlen
;
}
}
void
doStreamIntervalDecodeOpState
(
void
*
buf
,
SOperatorInfo
*
pOperator
)
{
void
doStreamIntervalDecodeOpState
(
void
*
buf
,
int32_t
len
,
SOperatorInfo
*
pOperator
)
{
SStreamIntervalOperatorInfo
*
pInfo
=
pOperator
->
info
;
SStreamIntervalOperatorInfo
*
pInfo
=
pOperator
->
info
;
if
(
!
pInfo
)
{
if
(
!
pInfo
)
{
return
;
return
;
}
}
// 6.checksum
int32_t
dataLen
=
len
-
sizeof
(
uint32_t
);
void
*
pCksum
=
POINTER_SHIFT
(
buf
,
dataLen
);
if
(
taosCheckChecksum
(
buf
,
dataLen
,
*
(
uint32_t
*
)
pCksum
)
!=
TSDB_CODE_SUCCESS
)
{
ASSERT
(
0
);
// debug
qError
(
"stream interval state is invalid"
);
return
;
}
// 1.pResultRowHashTable
// 1.pResultRowHashTable
int32_t
mapSize
=
0
;
int32_t
mapSize
=
0
;
buf
=
taosDecodeFixedI32
(
buf
,
&
mapSize
);
buf
=
taosDecodeFixedI32
(
buf
,
&
mapSize
);
...
@@ -2662,10 +2682,10 @@ void doStreamIntervalDecodeOpState(void* buf, SOperatorInfo* pOperator) {
...
@@ -2662,10 +2682,10 @@ void doStreamIntervalDecodeOpState(void* buf, SOperatorInfo* pOperator) {
void
doStreamIntervalSaveCheckpoint
(
SOperatorInfo
*
pOperator
)
{
void
doStreamIntervalSaveCheckpoint
(
SOperatorInfo
*
pOperator
)
{
SStreamIntervalOperatorInfo
*
pInfo
=
pOperator
->
info
;
SStreamIntervalOperatorInfo
*
pInfo
=
pOperator
->
info
;
int32_t
len
=
doStreamIntervalEncodeOpState
(
NULL
,
pOperator
);
int32_t
len
=
doStreamIntervalEncodeOpState
(
NULL
,
0
,
pOperator
);
void
*
buf
=
taosMemoryCalloc
(
1
,
len
);
void
*
buf
=
taosMemoryCalloc
(
1
,
len
);
void
*
pBuf
=
buf
;
void
*
pBuf
=
buf
;
len
=
doStreamIntervalEncodeOpState
(
&
pBuf
,
pOperator
);
len
=
doStreamIntervalEncodeOpState
(
&
pBuf
,
len
,
pOperator
);
pInfo
->
stateStore
.
streamStateSaveInfo
(
pInfo
->
pState
,
STREAM_INTERVAL_OP_CHECKPOINT_NAME
,
pInfo
->
stateStore
.
streamStateSaveInfo
(
pInfo
->
pState
,
STREAM_INTERVAL_OP_CHECKPOINT_NAME
,
strlen
(
STREAM_INTERVAL_OP_CHECKPOINT_NAME
),
buf
,
len
);
strlen
(
STREAM_INTERVAL_OP_CHECKPOINT_NAME
),
buf
,
len
);
taosMemoryFree
(
buf
);
taosMemoryFree
(
buf
);
...
@@ -2816,8 +2836,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -2816,8 +2836,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
}
else
if
(
pBlock
->
info
.
type
==
STREAM_CREATE_CHILD_TABLE
)
{
}
else
if
(
pBlock
->
info
.
type
==
STREAM_CREATE_CHILD_TABLE
)
{
return
pBlock
;
return
pBlock
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_CHECKPOINT
)
{
}
else
if
(
pBlock
->
info
.
type
==
STREAM_CHECKPOINT
)
{
doStreamIntervalSaveCheckpoint
(
pOperator
);
pAPI
->
stateStore
.
streamStateCommit
(
pInfo
->
pState
);
pAPI
->
stateStore
.
streamStateCommit
(
pInfo
->
pState
);
doStreamIntervalSaveCheckpoint
(
pOperator
);
copyDataBlock
(
pInfo
->
pCheckpointRes
,
pBlock
);
copyDataBlock
(
pInfo
->
pCheckpointRes
,
pBlock
);
continue
;
continue
;
}
else
{
}
else
{
...
@@ -3075,7 +3095,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
...
@@ -3075,7 +3095,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
int32_t
len
=
0
;
int32_t
len
=
0
;
int32_t
res
=
pAPI
->
stateStore
.
streamStateGetInfo
(
pInfo
->
pState
,
STREAM_INTERVAL_OP_CHECKPOINT_NAME
,
strlen
(
STREAM_INTERVAL_OP_CHECKPOINT_NAME
),
&
buff
,
&
len
);
int32_t
res
=
pAPI
->
stateStore
.
streamStateGetInfo
(
pInfo
->
pState
,
STREAM_INTERVAL_OP_CHECKPOINT_NAME
,
strlen
(
STREAM_INTERVAL_OP_CHECKPOINT_NAME
),
&
buff
,
&
len
);
if
(
res
==
TSDB_CODE_SUCCESS
)
{
if
(
res
==
TSDB_CODE_SUCCESS
)
{
doStreamIntervalDecodeOpState
(
buff
,
pOperator
);
doStreamIntervalDecodeOpState
(
buff
,
len
,
pOperator
);
taosMemoryFree
(
buff
);
taosMemoryFree
(
buff
);
}
}
...
@@ -3794,12 +3814,14 @@ void* decodeSResultWindowInfo(void *buf, SResultWindowInfo* key, int32_t outLen)
...
@@ -3794,12 +3814,14 @@ void* decodeSResultWindowInfo(void *buf, SResultWindowInfo* key, int32_t outLen)
return
buf
;
return
buf
;
}
}
int32_t
doStreamSessionEncodeOpState
(
void
**
buf
,
SOperatorInfo
*
pOperator
)
{
int32_t
doStreamSessionEncodeOpState
(
void
**
buf
,
int32_t
len
,
SOperatorInfo
*
pOperator
,
bool
isParent
)
{
SStreamSessionAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
SStreamSessionAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
if
(
!
pInfo
)
{
if
(
!
pInfo
)
{
return
0
;
return
0
;
}
}
void
*
pData
=
(
buf
==
NULL
)
?
NULL
:
*
buf
;
// 1.streamAggSup.pResultRows
// 1.streamAggSup.pResultRows
int32_t
tlen
=
0
;
int32_t
tlen
=
0
;
int32_t
mapSize
=
tSimpleHashGetSize
(
pInfo
->
streamAggSup
.
pResultRows
);
int32_t
mapSize
=
tSimpleHashGetSize
(
pInfo
->
streamAggSup
.
pResultRows
);
...
@@ -3821,21 +3843,42 @@ int32_t doStreamSessionEncodeOpState(void **buf, SOperatorInfo* pOperator) {
...
@@ -3821,21 +3843,42 @@ int32_t doStreamSessionEncodeOpState(void **buf, SOperatorInfo* pOperator) {
tlen
+=
taosEncodeFixedI32
(
buf
,
size
);
tlen
+=
taosEncodeFixedI32
(
buf
,
size
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
SOperatorInfo
*
pChOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
i
);
SOperatorInfo
*
pChOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
i
);
tlen
+=
doStreamSessionEncodeOpState
(
buf
,
pChOp
);
tlen
+=
doStreamSessionEncodeOpState
(
buf
,
0
,
pChOp
,
false
);
}
}
// 4.dataVersion
// 4.dataVersion
tlen
+=
taosEncodeFixedI32
(
buf
,
pInfo
->
dataVersion
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pInfo
->
dataVersion
);
// 5.checksum
if
(
isParent
)
{
if
(
buf
)
{
uint32_t
cksum
=
taosCalcChecksum
(
0
,
pData
,
len
-
sizeof
(
uint32_t
));
tlen
+=
taosEncodeFixedU32
(
buf
,
cksum
);
}
else
{
tlen
+=
sizeof
(
uint32_t
);
}
}
return
tlen
;
return
tlen
;
}
}
void
*
doStreamSessionDecodeOpState
(
void
*
buf
,
SOperatorInfo
*
pOperator
)
{
void
*
doStreamSessionDecodeOpState
(
void
*
buf
,
int32_t
len
,
SOperatorInfo
*
pOperator
,
bool
isParent
)
{
SStreamSessionAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
SStreamSessionAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
if
(
!
pInfo
)
{
if
(
!
pInfo
)
{
return
buf
;
return
buf
;
}
}
// 5.checksum
if
(
isParent
)
{
int32_t
dataLen
=
len
-
sizeof
(
uint32_t
);
void
*
pCksum
=
POINTER_SHIFT
(
buf
,
dataLen
);
if
(
taosCheckChecksum
(
buf
,
dataLen
,
*
(
uint32_t
*
)
pCksum
)
!=
TSDB_CODE_SUCCESS
)
{
ASSERT
(
0
);
// debug
qError
(
"stream interval state is invalid"
);
return
buf
;
}
}
// 1.streamAggSup.pResultRows
// 1.streamAggSup.pResultRows
int32_t
mapSize
=
0
;
int32_t
mapSize
=
0
;
buf
=
taosDecodeFixedI32
(
buf
,
&
mapSize
);
buf
=
taosDecodeFixedI32
(
buf
,
&
mapSize
);
...
@@ -3856,7 +3899,7 @@ void* doStreamSessionDecodeOpState(void* buf, SOperatorInfo* pOperator) {
...
@@ -3856,7 +3899,7 @@ void* doStreamSessionDecodeOpState(void* buf, SOperatorInfo* pOperator) {
ASSERT
(
size
<=
taosArrayGetSize
(
pInfo
->
pChildren
));
ASSERT
(
size
<=
taosArrayGetSize
(
pInfo
->
pChildren
));
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
SOperatorInfo
*
pChOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
i
);
SOperatorInfo
*
pChOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
i
);
buf
=
doStreamSessionDecodeOpState
(
buf
,
pChOp
);
buf
=
doStreamSessionDecodeOpState
(
buf
,
0
,
pChOp
,
false
);
}
}
// 4.dataVersion
// 4.dataVersion
...
@@ -3866,10 +3909,10 @@ void* doStreamSessionDecodeOpState(void* buf, SOperatorInfo* pOperator) {
...
@@ -3866,10 +3909,10 @@ void* doStreamSessionDecodeOpState(void* buf, SOperatorInfo* pOperator) {
void
doStreamSessionSaveCheckpoint
(
SOperatorInfo
*
pOperator
)
{
void
doStreamSessionSaveCheckpoint
(
SOperatorInfo
*
pOperator
)
{
SStreamSessionAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
SStreamSessionAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
int32_t
len
=
doStreamSessionEncodeOpState
(
NULL
,
pOperator
);
int32_t
len
=
doStreamSessionEncodeOpState
(
NULL
,
0
,
pOperator
,
true
);
void
*
buf
=
taosMemoryCalloc
(
1
,
len
);
void
*
buf
=
taosMemoryCalloc
(
1
,
len
);
void
*
pBuf
=
buf
;
void
*
pBuf
=
buf
;
len
=
doStreamSessionEncodeOpState
(
&
pBuf
,
pOperator
);
len
=
doStreamSessionEncodeOpState
(
&
pBuf
,
len
,
pOperator
,
true
);
pInfo
->
streamAggSup
.
stateStore
.
streamStateSaveInfo
(
pInfo
->
streamAggSup
.
pState
,
STREAM_SESSION_OP_CHECKPOINT_NAME
,
pInfo
->
streamAggSup
.
stateStore
.
streamStateSaveInfo
(
pInfo
->
streamAggSup
.
pState
,
STREAM_SESSION_OP_CHECKPOINT_NAME
,
strlen
(
STREAM_SESSION_OP_CHECKPOINT_NAME
),
buf
,
len
);
strlen
(
STREAM_SESSION_OP_CHECKPOINT_NAME
),
buf
,
len
);
}
}
...
@@ -3941,8 +3984,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
...
@@ -3941,8 +3984,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
}
else
if
(
pBlock
->
info
.
type
==
STREAM_CREATE_CHILD_TABLE
)
{
}
else
if
(
pBlock
->
info
.
type
==
STREAM_CREATE_CHILD_TABLE
)
{
return
pBlock
;
return
pBlock
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_CHECKPOINT
)
{
}
else
if
(
pBlock
->
info
.
type
==
STREAM_CHECKPOINT
)
{
doStreamSessionSaveCheckpoint
(
pOperator
);
pAggSup
->
stateStore
.
streamStateCommit
(
pAggSup
->
pState
);
pAggSup
->
stateStore
.
streamStateCommit
(
pAggSup
->
pState
);
doStreamSessionSaveCheckpoint
(
pOperator
);
copyDataBlock
(
pInfo
->
pCheckpointRes
,
pBlock
);
copyDataBlock
(
pInfo
->
pCheckpointRes
,
pBlock
);
continue
;
continue
;
}
else
{
}
else
{
...
@@ -4141,7 +4184,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
...
@@ -4141,7 +4184,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
int32_t
len
=
0
;
int32_t
len
=
0
;
int32_t
res
=
pInfo
->
streamAggSup
.
stateStore
.
streamStateGetInfo
(
pInfo
->
streamAggSup
.
pState
,
STREAM_SESSION_OP_CHECKPOINT_NAME
,
strlen
(
STREAM_SESSION_OP_CHECKPOINT_NAME
),
&
buff
,
&
len
);
int32_t
res
=
pInfo
->
streamAggSup
.
stateStore
.
streamStateGetInfo
(
pInfo
->
streamAggSup
.
pState
,
STREAM_SESSION_OP_CHECKPOINT_NAME
,
strlen
(
STREAM_SESSION_OP_CHECKPOINT_NAME
),
&
buff
,
&
len
);
if
(
res
==
TSDB_CODE_SUCCESS
)
{
if
(
res
==
TSDB_CODE_SUCCESS
)
{
doStreamSessionDecodeOpState
(
buff
,
pOperator
);
doStreamSessionDecodeOpState
(
buff
,
len
,
pOperator
,
true
);
taosMemoryFree
(
buff
);
taosMemoryFree
(
buff
);
}
}
...
@@ -4242,8 +4285,8 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
...
@@ -4242,8 +4285,8 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
}
else
if
(
pBlock
->
info
.
type
==
STREAM_CREATE_CHILD_TABLE
)
{
}
else
if
(
pBlock
->
info
.
type
==
STREAM_CREATE_CHILD_TABLE
)
{
return
pBlock
;
return
pBlock
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_CHECKPOINT
)
{
}
else
if
(
pBlock
->
info
.
type
==
STREAM_CHECKPOINT
)
{
doStreamSessionSaveCheckpoint
(
pOperator
);
pAggSup
->
stateStore
.
streamStateCommit
(
pAggSup
->
pState
);
pAggSup
->
stateStore
.
streamStateCommit
(
pAggSup
->
pState
);
doStreamSessionSaveCheckpoint
(
pOperator
);
continue
;
continue
;
}
else
{
}
else
{
ASSERTS
(
pBlock
->
info
.
type
==
STREAM_NORMAL
||
pBlock
->
info
.
type
==
STREAM_INVALID
,
"invalid SSDataBlock type"
);
ASSERTS
(
pBlock
->
info
.
type
==
STREAM_NORMAL
||
pBlock
->
info
.
type
==
STREAM_INVALID
,
"invalid SSDataBlock type"
);
...
@@ -4541,12 +4584,14 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
...
@@ -4541,12 +4584,14 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
}
}
}
}
int32_t
doStreamStateEncodeOpState
(
void
**
buf
,
SOperatorInfo
*
pOperator
)
{
int32_t
doStreamStateEncodeOpState
(
void
**
buf
,
int32_t
len
,
SOperatorInfo
*
pOperator
,
bool
isParent
)
{
SStreamStateAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
SStreamStateAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
if
(
!
pInfo
)
{
if
(
!
pInfo
)
{
return
0
;
return
0
;
}
}
void
*
pData
=
(
buf
==
NULL
)
?
NULL
:
*
buf
;
// 1.streamAggSup.pResultRows
// 1.streamAggSup.pResultRows
int32_t
tlen
=
0
;
int32_t
tlen
=
0
;
int32_t
mapSize
=
tSimpleHashGetSize
(
pInfo
->
streamAggSup
.
pResultRows
);
int32_t
mapSize
=
tSimpleHashGetSize
(
pInfo
->
streamAggSup
.
pResultRows
);
...
@@ -4568,21 +4613,42 @@ int32_t doStreamStateEncodeOpState(void **buf, SOperatorInfo* pOperator) {
...
@@ -4568,21 +4613,42 @@ int32_t doStreamStateEncodeOpState(void **buf, SOperatorInfo* pOperator) {
tlen
+=
taosEncodeFixedI32
(
buf
,
size
);
tlen
+=
taosEncodeFixedI32
(
buf
,
size
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
SOperatorInfo
*
pChOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
i
);
SOperatorInfo
*
pChOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
i
);
tlen
+=
doStreamS
essionEncodeOpState
(
buf
,
pChOp
);
tlen
+=
doStreamS
tateEncodeOpState
(
buf
,
0
,
pChOp
,
false
);
}
}
// 4.dataVersion
// 4.dataVersion
tlen
+=
taosEncodeFixedI32
(
buf
,
pInfo
->
dataVersion
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pInfo
->
dataVersion
);
// 5.checksum
if
(
isParent
)
{
if
(
buf
)
{
uint32_t
cksum
=
taosCalcChecksum
(
0
,
pData
,
len
-
sizeof
(
uint32_t
));
tlen
+=
taosEncodeFixedU32
(
buf
,
cksum
);
}
else
{
tlen
+=
sizeof
(
uint32_t
);
}
}
return
tlen
;
return
tlen
;
}
}
void
*
doStreamStateDecodeOpState
(
void
*
buf
,
SOperatorInfo
*
pOperator
)
{
void
*
doStreamStateDecodeOpState
(
void
*
buf
,
int32_t
len
,
SOperatorInfo
*
pOperator
,
bool
isParent
)
{
SStreamStateAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
SStreamStateAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
if
(
!
pInfo
)
{
if
(
!
pInfo
)
{
return
buf
;
return
buf
;
}
}
// 5.checksum
if
(
isParent
)
{
int32_t
dataLen
=
len
-
sizeof
(
uint32_t
);
void
*
pCksum
=
POINTER_SHIFT
(
buf
,
dataLen
);
if
(
taosCheckChecksum
(
buf
,
dataLen
,
*
(
uint32_t
*
)
pCksum
)
!=
TSDB_CODE_SUCCESS
)
{
ASSERT
(
0
);
// debug
qError
(
"stream interval state is invalid"
);
return
buf
;
}
}
// 1.streamAggSup.pResultRows
// 1.streamAggSup.pResultRows
int32_t
mapSize
=
0
;
int32_t
mapSize
=
0
;
buf
=
taosDecodeFixedI32
(
buf
,
&
mapSize
);
buf
=
taosDecodeFixedI32
(
buf
,
&
mapSize
);
...
@@ -4603,7 +4669,7 @@ void* doStreamStateDecodeOpState(void* buf, SOperatorInfo* pOperator) {
...
@@ -4603,7 +4669,7 @@ void* doStreamStateDecodeOpState(void* buf, SOperatorInfo* pOperator) {
ASSERT
(
size
<=
taosArrayGetSize
(
pInfo
->
pChildren
));
ASSERT
(
size
<=
taosArrayGetSize
(
pInfo
->
pChildren
));
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
SOperatorInfo
*
pChOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
i
);
SOperatorInfo
*
pChOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
i
);
buf
=
doStreamStateDecodeOpState
(
buf
,
pChOp
);
buf
=
doStreamStateDecodeOpState
(
buf
,
0
,
pChOp
,
false
);
}
}
// 4.dataVersion
// 4.dataVersion
...
@@ -4611,6 +4677,16 @@ void* doStreamStateDecodeOpState(void* buf, SOperatorInfo* pOperator) {
...
@@ -4611,6 +4677,16 @@ void* doStreamStateDecodeOpState(void* buf, SOperatorInfo* pOperator) {
return
buf
;
return
buf
;
}
}
void
doStreamStateSaveCheckpoint
(
SOperatorInfo
*
pOperator
)
{
SStreamStateAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
int32_t
len
=
doStreamStateEncodeOpState
(
NULL
,
0
,
pOperator
,
true
);
void
*
buf
=
taosMemoryCalloc
(
1
,
len
);
void
*
pBuf
=
buf
;
len
=
doStreamStateEncodeOpState
(
&
pBuf
,
len
,
pOperator
,
true
);
pInfo
->
streamAggSup
.
stateStore
.
streamStateSaveInfo
(
pInfo
->
streamAggSup
.
pState
,
STREAM_STATE_OP_CHECKPOINT_NAME
,
strlen
(
STREAM_STATE_OP_CHECKPOINT_NAME
),
buf
,
len
);
}
static
SSDataBlock
*
doStreamStateAgg
(
SOperatorInfo
*
pOperator
)
{
static
SSDataBlock
*
doStreamStateAgg
(
SOperatorInfo
*
pOperator
)
{
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
return
NULL
;
...
@@ -4665,8 +4741,8 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
...
@@ -4665,8 +4741,8 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
}
else
if
(
pBlock
->
info
.
type
==
STREAM_CREATE_CHILD_TABLE
)
{
}
else
if
(
pBlock
->
info
.
type
==
STREAM_CREATE_CHILD_TABLE
)
{
return
pBlock
;
return
pBlock
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_CHECKPOINT
)
{
}
else
if
(
pBlock
->
info
.
type
==
STREAM_CHECKPOINT
)
{
doStreamSessionSaveCheckpoint
(
pOperator
);
pInfo
->
streamAggSup
.
stateStore
.
streamStateCommit
(
pInfo
->
streamAggSup
.
pState
);
pInfo
->
streamAggSup
.
stateStore
.
streamStateCommit
(
pInfo
->
streamAggSup
.
pState
);
doStreamSessionSaveCheckpoint
(
pOperator
);
copyDataBlock
(
pInfo
->
pCheckpointRes
,
pBlock
);
copyDataBlock
(
pInfo
->
pCheckpointRes
,
pBlock
);
continue
;
continue
;
}
else
{
}
else
{
...
@@ -4861,7 +4937,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
...
@@ -4861,7 +4937,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
int32_t
len
=
0
;
int32_t
len
=
0
;
int32_t
res
=
pInfo
->
streamAggSup
.
stateStore
.
streamStateGetInfo
(
pInfo
->
streamAggSup
.
pState
,
STREAM_STATE_OP_CHECKPOINT_NAME
,
strlen
(
STREAM_STATE_OP_CHECKPOINT_NAME
),
&
buff
,
&
len
);
int32_t
res
=
pInfo
->
streamAggSup
.
stateStore
.
streamStateGetInfo
(
pInfo
->
streamAggSup
.
pState
,
STREAM_STATE_OP_CHECKPOINT_NAME
,
strlen
(
STREAM_STATE_OP_CHECKPOINT_NAME
),
&
buff
,
&
len
);
if
(
res
==
TSDB_CODE_SUCCESS
)
{
if
(
res
==
TSDB_CODE_SUCCESS
)
{
doStreamStateDecodeOpState
(
buff
,
pOperator
);
doStreamStateDecodeOpState
(
buff
,
len
,
pOperator
,
true
);
taosMemoryFree
(
buff
);
taosMemoryFree
(
buff
);
}
}
...
@@ -5530,8 +5606,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -5530,8 +5606,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
printDataBlock
(
pBlock
,
"single interval"
);
printDataBlock
(
pBlock
,
"single interval"
);
return
pBlock
;
return
pBlock
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_CHECKPOINT
)
{
}
else
if
(
pBlock
->
info
.
type
==
STREAM_CHECKPOINT
)
{
doStreamIntervalSaveCheckpoint
(
pOperator
);
pAPI
->
stateStore
.
streamStateCommit
(
pInfo
->
pState
);
pAPI
->
stateStore
.
streamStateCommit
(
pInfo
->
pState
);
doStreamIntervalSaveCheckpoint
(
pOperator
);
pInfo
->
reCkBlock
=
true
;
pInfo
->
reCkBlock
=
true
;
copyDataBlock
(
pInfo
->
pCheckpointRes
,
pBlock
);
copyDataBlock
(
pInfo
->
pCheckpointRes
,
pBlock
);
continue
;
continue
;
...
@@ -5714,7 +5790,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
...
@@ -5714,7 +5790,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
int32_t
len
=
0
;
int32_t
len
=
0
;
int32_t
res
=
pAPI
->
stateStore
.
streamStateGetInfo
(
pInfo
->
pState
,
STREAM_INTERVAL_OP_CHECKPOINT_NAME
,
strlen
(
STREAM_INTERVAL_OP_CHECKPOINT_NAME
),
&
buff
,
&
len
);
int32_t
res
=
pAPI
->
stateStore
.
streamStateGetInfo
(
pInfo
->
pState
,
STREAM_INTERVAL_OP_CHECKPOINT_NAME
,
strlen
(
STREAM_INTERVAL_OP_CHECKPOINT_NAME
),
&
buff
,
&
len
);
if
(
res
==
TSDB_CODE_SUCCESS
)
{
if
(
res
==
TSDB_CODE_SUCCESS
)
{
doStreamIntervalDecodeOpState
(
buff
,
pOperator
);
doStreamIntervalDecodeOpState
(
buff
,
len
,
pOperator
);
taosMemoryFree
(
buff
);
taosMemoryFree
(
buff
);
}
}
...
...
tests/script/tsim/stream/checkpointState0.sim
0 → 100644
浏览文件 @
863d96ea
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1 -v debugFlag 135
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print step 1
print =============== create database
sql create database test vgroups 1;
sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from t1 state_window(b);
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t1 values(1648791213001,2,2,3,1.1);
$loop_count = 0
loop0:
sleep 1000
sql select * from streamt;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 1 then
print =====rows=$rows expect 1
goto loop0
endi
# row 0
if $data01 != 2 then
print =====data01=$data01
goto loop0
endi
if $data02 != 3 then
print =====data02=$data02
goto loop0
endi
print waiting for checkpoint generation 1 ......
sleep 25000
print restart taosd 01 ......
system sh/stop_dnodes.sh
system sh/exec.sh -n dnode1 -s start
sql insert into t1 values(1648791213002,3,2,3,1.1);
$loop_count = 0
loop1:
sleep 1000
sql select * from streamt;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 1 then
print =====rows=$rows expect 1
goto loop1
endi
# row 0
if $data01 != 3 then
print =====data01=$data01
goto loop1
endi
if $data02 != 6 then
print =====data02=$data02
goto loop1
endi
sql insert into t1 values(1648791233003,4,3,3,1.1);
$loop_count = 0
loop2:
sleep 1000
sql select * from streamt;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows expect 2
goto loop2
endi
# row 0
if $data01 != 3 then
print =====data01=$data01
goto loop2
endi
if $data02 != 6 then
print =====data02=$data02
goto loop2
endi
# row 1
if $data11 != 1 then
print =====data11=$data11
goto loop2
endi
if $data12 != 4 then
print =====data12=$data12
goto loop2
endi
print step 2
print restart taosd 02 ......
system sh/stop_dnodes.sh
system sh/exec.sh -n dnode1 -s start
sql insert into t1 values(1648791233004,5,3,3,1.1);
loop20:
sleep 1000
sql select * from streamt;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows expect 2
goto loop20
endi
# row 0
if $data01 != 3 then
print =====data01=$data01
goto loop20
endi
if $data02 != 6 then
print =====data02=$data02
goto loop20
endi
# row 1
if $data11 != 2 then
print =====data11=$data11
goto loop20
endi
if $data12 != 9 then
print =====data12=$data12
goto loop20
endi
print end---------------------------------
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录