Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4c287f60
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
4c287f60
编写于
8月 24, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
8月 24, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #16352 from taosdata/feature/stream
feat(stream): support tdb state backend
上级
f7014e09
c064bc38
变更
16
隐藏空白更改
内联
并排
Showing
16 changed file
with
292 addition
and
41 deletion
+292
-41
docs/zh/12-taos-sql/14-stream.md
docs/zh/12-taos-sql/14-stream.md
+2
-2
examples/c/stream_demo.c
examples/c/stream_demo.c
+2
-6
include/libs/executor/executor.h
include/libs/executor/executor.h
+5
-4
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+43
-1
source/dnode/mnode/impl/test/sma/CMakeLists.txt
source/dnode/mnode/impl/test/sma/CMakeLists.txt
+6
-4
source/dnode/mnode/impl/test/stb/CMakeLists.txt
source/dnode/mnode/impl/test/stb/CMakeLists.txt
+6
-4
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+9
-0
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-0
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+9
-5
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+2
-2
source/libs/stream/src/streamExec.c
source/libs/stream/src/streamExec.c
+0
-1
source/libs/stream/src/streamMeta.c
source/libs/stream/src/streamMeta.c
+12
-9
source/libs/stream/src/streamRecover.c
source/libs/stream/src/streamRecover.c
+4
-1
source/libs/stream/src/streamState.c
source/libs/stream/src/streamState.c
+187
-0
source/libs/stream/src/streamTask.c
source/libs/stream/src/streamTask.c
+3
-0
source/libs/wal/src/walMeta.c
source/libs/wal/src/walMeta.c
+1
-2
未找到文件。
docs/zh/12-taos-sql/14-stream.md
浏览文件 @
4c287f60
...
@@ -18,7 +18,7 @@ stream_options: {
...
@@ -18,7 +18,7 @@ stream_options: {
其中 subquery 是 select 普通查询语法的子集:
其中 subquery 是 select 普通查询语法的子集:
```
sql
```
sql
subquery
:
SELECT
[
DISTINCT
]
select_list
subquery
:
SELECT
select_list
from_clause
from_clause
[
WHERE
condition
]
[
WHERE
condition
]
[
PARTITION
BY
tag_list
]
[
PARTITION
BY
tag_list
]
...
@@ -37,7 +37,7 @@ window_clause: {
...
@@ -37,7 +37,7 @@ window_clause: {
其中,SESSION 是会话窗口,tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val,则自动开启下一个窗口。
其中,SESSION 是会话窗口,tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val,则自动开启下一个窗口。
窗口的定义与时序数据特色查询中的定义完全相同
。
窗口的定义与时序数据特色查询中的定义完全相同
,详见
[
TDengine 特色查询
](
../distinguished
)
例如,如下语句创建流式计算,同时自动创建名为 avg_vol 的超级表,此流计算以一分钟为时间窗口、30 秒为前向增量统计这些电表的平均电压,并将来自 meters 表的数据的计算结果写入 avg_vol 表,不同 partition 的数据会分别创建子表并写入不同子表。
例如,如下语句创建流式计算,同时自动创建名为 avg_vol 的超级表,此流计算以一分钟为时间窗口、30 秒为前向增量统计这些电表的平均电压,并将来自 meters 表的数据的计算结果写入 avg_vol 表,不同 partition 的数据会分别创建子表并写入不同子表。
...
...
examples/c/stream_demo.c
浏览文件 @
4c287f60
...
@@ -13,6 +13,7 @@
...
@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
// clang-format off
#include <assert.h>
#include <assert.h>
#include <stdio.h>
#include <stdio.h>
#include <string.h>
#include <string.h>
...
@@ -94,13 +95,8 @@ int32_t create_stream() {
...
@@ -94,13 +95,8 @@ int32_t create_stream() {
}
}
taos_free_result
(
pRes
);
taos_free_result
(
pRes
);
/*const char* sql = "select min(k), max(k), sum(k) from tu1";*/
/*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
pRes
=
taos_query
(
pConn
,
pRes
=
taos_query
(
pConn
,
"create stream stream1 trigger max_delay 10s watermark 10s into outstb as select _wstart start, "
"create stream stream1 trigger at_once watermark 10s into outstb as select _wstart start, k from st1 partition by tbname state_window(k)"
);
"count(k) from st1 partition by tbname interval(20s) "
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create stream stream1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
printf
(
"failed to create stream stream1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
return
-
1
;
...
...
include/libs/executor/executor.h
浏览文件 @
4c287f60
...
@@ -29,7 +29,7 @@ typedef void* DataSinkHandle;
...
@@ -29,7 +29,7 @@ typedef void* DataSinkHandle;
struct
SRpcMsg
;
struct
SRpcMsg
;
struct
SSubplan
;
struct
SSubplan
;
typedef
struct
SReadHandle
{
typedef
struct
{
void
*
tqReader
;
void
*
tqReader
;
void
*
meta
;
void
*
meta
;
void
*
config
;
void
*
config
;
...
@@ -41,6 +41,7 @@ typedef struct SReadHandle {
...
@@ -41,6 +41,7 @@ typedef struct SReadHandle {
bool
initTableReader
;
bool
initTableReader
;
bool
initTqReader
;
bool
initTqReader
;
int32_t
numOfVgroups
;
int32_t
numOfVgroups
;
void
*
pStateBackend
;
}
SReadHandle
;
}
SReadHandle
;
// in queue mode, data streams are seperated by msg
// in queue mode, data streams are seperated by msg
...
@@ -78,8 +79,8 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
...
@@ -78,8 +79,8 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
/**
/**
* @brief Cleanup SSDataBlock for StreamScanInfo
* @brief Cleanup SSDataBlock for StreamScanInfo
*
*
* @param tinfo
* @param tinfo
*/
*/
void
tdCleanupStreamInputDataBlock
(
qTaskInfo_t
tinfo
);
void
tdCleanupStreamInputDataBlock
(
qTaskInfo_t
tinfo
);
...
@@ -163,7 +164,7 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t
...
@@ -163,7 +164,7 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t
void
qProcessRspMsg
(
void
*
parent
,
struct
SRpcMsg
*
pMsg
,
struct
SEpSet
*
pEpSet
);
void
qProcessRspMsg
(
void
*
parent
,
struct
SRpcMsg
*
pMsg
,
struct
SEpSet
*
pEpSet
);
int32_t
qGetExplainExecInfo
(
qTaskInfo_t
tinfo
,
SArray
*
pExecInfoList
/*,int32_t* resNum, SExplainExecInfo** pRes*/
);
int32_t
qGetExplainExecInfo
(
qTaskInfo_t
tinfo
,
SArray
*
pExecInfoList
/*,int32_t* resNum, SExplainExecInfo** pRes*/
);
int32_t
qSerializeTaskStatus
(
qTaskInfo_t
tinfo
,
char
**
pOutput
,
int32_t
*
len
);
int32_t
qSerializeTaskStatus
(
qTaskInfo_t
tinfo
,
char
**
pOutput
,
int32_t
*
len
);
...
...
include/libs/stream/tstream.h
浏览文件 @
4c287f60
...
@@ -263,6 +263,14 @@ typedef struct {
...
@@ -263,6 +263,14 @@ typedef struct {
SArray
*
checkpointVer
;
SArray
*
checkpointVer
;
}
SStreamRecoveringState
;
}
SStreamRecoveringState
;
// incremental state storage
typedef
struct
{
SStreamTask
*
pOwner
;
TDB
*
db
;
TTB
*
pStateDb
;
TXN
txn
;
}
SStreamState
;
typedef
struct
SStreamTask
{
typedef
struct
SStreamTask
{
int64_t
streamId
;
int64_t
streamId
;
int32_t
taskId
;
int32_t
taskId
;
...
@@ -312,6 +320,10 @@ typedef struct SStreamTask {
...
@@ -312,6 +320,10 @@ typedef struct SStreamTask {
// msg handle
// msg handle
SMsgCb
*
pMsgCb
;
SMsgCb
*
pMsgCb
;
// state backend
SStreamState
*
pState
;
}
SStreamTask
;
}
SStreamTask
;
int32_t
tEncodeStreamEpInfo
(
SEncoder
*
pEncoder
,
const
SStreamChildEpInfo
*
pInfo
);
int32_t
tEncodeStreamEpInfo
(
SEncoder
*
pEncoder
,
const
SStreamChildEpInfo
*
pInfo
);
...
@@ -507,7 +519,7 @@ typedef struct SStreamMeta {
...
@@ -507,7 +519,7 @@ typedef struct SStreamMeta {
char
*
path
;
char
*
path
;
TDB
*
db
;
TDB
*
db
;
TTB
*
pTaskDb
;
TTB
*
pTaskDb
;
TTB
*
p
State
Db
;
TTB
*
p
Checkpoint
Db
;
SHashObj
*
pTasks
;
SHashObj
*
pTasks
;
SHashObj
*
pRecoverStatus
;
SHashObj
*
pRecoverStatus
;
void
*
ahandle
;
void
*
ahandle
;
...
@@ -528,6 +540,36 @@ int32_t streamMetaCommit(SStreamMeta* pMeta);
...
@@ -528,6 +540,36 @@ int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t
streamMetaRollBack
(
SStreamMeta
*
pMeta
);
int32_t
streamMetaRollBack
(
SStreamMeta
*
pMeta
);
int32_t
streamLoadTasks
(
SStreamMeta
*
pMeta
);
int32_t
streamLoadTasks
(
SStreamMeta
*
pMeta
);
SStreamState
*
streamStateOpen
(
char
*
path
,
SStreamTask
*
pTask
);
void
streamStateClose
(
SStreamState
*
pState
);
int32_t
streamStateBegin
(
SStreamState
*
pState
);
int32_t
streamStateCommit
(
SStreamState
*
pState
);
int32_t
streamStateAbort
(
SStreamState
*
pState
);
typedef
struct
{
TBC
*
pCur
;
}
SStreamStateCur
;
#if 1
int32_t
streamStatePut
(
SStreamState
*
pState
,
const
void
*
key
,
int32_t
kLen
,
const
void
*
value
,
int32_t
vLen
);
int32_t
streamStateGet
(
SStreamState
*
pState
,
const
void
*
key
,
int32_t
kLen
,
void
**
pVal
,
int32_t
*
pVLen
);
int32_t
streamStateDel
(
SStreamState
*
pState
,
const
void
*
key
,
int32_t
kLen
);
SStreamStateCur
*
streamStateGetCur
(
SStreamState
*
pState
,
const
void
*
key
,
int32_t
kLen
);
SStreamStateCur
*
streamStateSeekKeyNext
(
SStreamState
*
pState
,
const
void
*
key
,
int32_t
kLen
);
SStreamStateCur
*
streamStateSeekKeyPrev
(
SStreamState
*
pState
,
const
void
*
key
,
int32_t
kLen
);
void
streamStateFreeCur
(
SStreamStateCur
*
pCur
);
int32_t
streamGetKVByCur
(
SStreamStateCur
*
pCur
,
void
**
pKey
,
int32_t
*
pKLen
,
void
**
pVal
,
int32_t
*
pVLen
);
int32_t
streamStateSeekFirst
(
SStreamState
*
pState
,
SStreamStateCur
*
pCur
);
int32_t
streamStateSeekLast
(
SStreamState
*
pState
,
SStreamStateCur
*
pCur
);
int32_t
streamStateCurNext
(
SStreamState
*
pState
,
SStreamStateCur
*
pCur
);
int32_t
streamStateCurPrev
(
SStreamState
*
pState
,
SStreamStateCur
*
pCur
);
#endif
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
...
...
source/dnode/mnode/impl/test/sma/CMakeLists.txt
浏览文件 @
4c287f60
...
@@ -5,7 +5,9 @@ target_link_libraries(
...
@@ -5,7 +5,9 @@ target_link_libraries(
PUBLIC sut
PUBLIC sut
)
)
add_test
(
if
(
NOT
${
TD_WINDOWS
}
)
NAME smaTest
add_test
(
COMMAND smaTest
NAME smaTest
)
COMMAND smaTest
)
endif
(
NOT
${
TD_WINDOWS
}
)
source/dnode/mnode/impl/test/stb/CMakeLists.txt
浏览文件 @
4c287f60
...
@@ -5,7 +5,9 @@ target_link_libraries(
...
@@ -5,7 +5,9 @@ target_link_libraries(
PUBLIC sut
PUBLIC sut
)
)
add_test
(
if
(
NOT
${
TD_WINDOWS
}
)
NAME stbTest
add_test
(
COMMAND stbTest
NAME stbTest
)
COMMAND stbTest
\ No newline at end of file
)
endif
(
NOT
${
TD_WINDOWS
}
)
\ No newline at end of file
source/dnode/vnode/src/tq/tq.c
浏览文件 @
4c287f60
...
@@ -79,6 +79,10 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
...
@@ -79,6 +79,10 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
ASSERT
(
0
);
ASSERT
(
0
);
}
}
if
(
streamLoadTasks
(
pTq
->
pStreamMeta
)
<
0
)
{
ASSERT
(
0
);
}
return
pTq
;
return
pTq
;
}
}
...
@@ -664,6 +668,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
...
@@ -664,6 +668,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
ASSERT
(
pTask
->
exec
.
executor
);
ASSERT
(
pTask
->
exec
.
executor
);
}
}
pTask
->
pState
=
streamStateOpen
(
pTq
->
pStreamMeta
->
path
,
pTask
);
if
(
pTask
->
pState
==
NULL
)
{
return
-
1
;
}
// sink
// sink
/*pTask->ahandle = pTq->pVnode;*/
/*pTask->ahandle = pTq->pVnode;*/
if
(
pTask
->
outputType
==
TASK_OUTPUT__SMA
)
{
if
(
pTask
->
outputType
==
TASK_OUTPUT__SMA
)
{
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
4c287f60
...
@@ -150,6 +150,7 @@ typedef struct {
...
@@ -150,6 +150,7 @@ typedef struct {
SQueryTableDataCond
tableCond
;
SQueryTableDataCond
tableCond
;
int64_t
recoverStartVer
;
int64_t
recoverStartVer
;
int64_t
recoverEndVer
;
int64_t
recoverEndVer
;
SStreamState
*
pState
;
}
SStreamTaskInfo
;
}
SStreamTaskInfo
;
typedef
struct
{
typedef
struct
{
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
4c287f60
...
@@ -392,7 +392,7 @@ static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
...
@@ -392,7 +392,7 @@ static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
static
void
functionCtxRestore
(
SqlFunctionCtx
*
pCtx
,
SFunctionCtxStatus
*
pStatus
)
{
static
void
functionCtxRestore
(
SqlFunctionCtx
*
pCtx
,
SFunctionCtxStatus
*
pStatus
)
{
pCtx
->
input
.
colDataAggIsSet
=
pStatus
->
hasAgg
;
pCtx
->
input
.
colDataAggIsSet
=
pStatus
->
hasAgg
;
pCtx
->
input
.
numOfRows
=
pStatus
->
numOfRows
;
pCtx
->
input
.
numOfRows
=
pStatus
->
numOfRows
;
pCtx
->
input
.
startRowIndex
=
pStatus
->
startOffset
;
pCtx
->
input
.
startRowIndex
=
pStatus
->
startOffset
;
}
}
...
@@ -3715,7 +3715,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
...
@@ -3715,7 +3715,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
const
char
*
id
,
SInterval
*
pInterval
,
int32_t
fillType
,
int32_t
order
)
{
const
char
*
id
,
SInterval
*
pInterval
,
int32_t
fillType
,
int32_t
order
)
{
SFillColInfo
*
pColInfo
=
createFillColInfo
(
pExpr
,
numOfCols
,
pNotFillExpr
,
numOfNotFillCols
,
pValNode
);
SFillColInfo
*
pColInfo
=
createFillColInfo
(
pExpr
,
numOfCols
,
pNotFillExpr
,
numOfNotFillCols
,
pValNode
);
int64_t
startKey
=
(
order
==
TSDB_ORDER_ASC
)
?
win
.
skey
:
win
.
ekey
;
int64_t
startKey
=
(
order
==
TSDB_ORDER_ASC
)
?
win
.
skey
:
win
.
ekey
;
STimeWindow
w
=
getAlignQueryTimeWindow
(
pInterval
,
pInterval
->
precision
,
startKey
);
STimeWindow
w
=
getAlignQueryTimeWindow
(
pInterval
,
pInterval
->
precision
,
startKey
);
w
=
getFirstQualifiedTimeWindow
(
startKey
,
&
w
,
pInterval
,
order
);
w
=
getFirstQualifiedTimeWindow
(
startKey
,
&
w
,
pInterval
,
order
);
...
@@ -3988,15 +3988,15 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
...
@@ -3988,15 +3988,15 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
bool
assignUid
=
groupbyTbname
(
group
);
bool
assignUid
=
groupbyTbname
(
group
);
size_t
numOfTables
=
taosArrayGetSize
(
pTableListInfo
->
pTableList
);
size_t
numOfTables
=
taosArrayGetSize
(
pTableListInfo
->
pTableList
);
if
(
assignUid
)
{
if
(
assignUid
)
{
for
(
int32_t
i
=
0
;
i
<
numOfTables
;
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
numOfTables
;
i
++
)
{
STableKeyInfo
*
info
=
taosArrayGet
(
pTableListInfo
->
pTableList
,
i
);
STableKeyInfo
*
info
=
taosArrayGet
(
pTableListInfo
->
pTableList
,
i
);
info
->
groupId
=
info
->
uid
;
info
->
groupId
=
info
->
uid
;
taosHashPut
(
pTableListInfo
->
map
,
&
(
info
->
uid
),
sizeof
(
uint64_t
),
&
info
->
groupId
,
sizeof
(
uint64_t
));
taosHashPut
(
pTableListInfo
->
map
,
&
(
info
->
uid
),
sizeof
(
uint64_t
),
&
info
->
groupId
,
sizeof
(
uint64_t
));
}
}
}
else
{
}
else
{
int32_t
code
=
getColInfoResultForGroupby
(
pHandle
->
meta
,
group
,
pTableListInfo
);
int32_t
code
=
getColInfoResultForGroupby
(
pHandle
->
meta
,
group
,
pTableListInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
return
code
;
...
@@ -4615,6 +4615,10 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
...
@@ -4615,6 +4615,10 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
goto
_complete
;
goto
_complete
;
}
}
if
(
pHandle
&&
pHandle
->
pStateBackend
)
{
(
*
pTaskInfo
)
->
streamInfo
.
pState
=
pHandle
->
pStateBackend
;
}
(
*
pTaskInfo
)
->
sql
=
sql
;
(
*
pTaskInfo
)
->
sql
=
sql
;
sql
=
NULL
;
sql
=
NULL
;
(
*
pTaskInfo
)
->
pSubplan
=
pPlan
;
(
*
pTaskInfo
)
->
pSubplan
=
pPlan
;
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
4c287f60
...
@@ -3128,8 +3128,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -3128,8 +3128,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
maxTs
=
TMAX
(
maxTs
,
pBlock
->
info
.
window
.
ekey
);
maxTs
=
TMAX
(
maxTs
,
pBlock
->
info
.
window
.
ekey
);
maxTs
=
TMAX
(
maxTs
,
pBlock
->
info
.
watermark
);
maxTs
=
TMAX
(
maxTs
,
pBlock
->
info
.
watermark
);
if
(
pBlock
->
info
.
type
==
STREAM_NORMAL
||
pBlock
->
info
.
type
==
STREAM_PULL_DATA
||
ASSERT
(
pBlock
->
info
.
type
!=
STREAM_INVERT
);
pBlock
->
info
.
type
==
STREAM_INVALID
)
{
if
(
pBlock
->
info
.
type
==
STREAM_NORMAL
||
pBlock
->
info
.
type
==
STREAM_PULL_DATA
)
{
pInfo
->
binfo
.
pRes
->
info
.
type
=
pBlock
->
info
.
type
;
pInfo
->
binfo
.
pRes
->
info
.
type
=
pBlock
->
info
.
type
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_CLEAR
)
{
}
else
if
(
pBlock
->
info
.
type
==
STREAM_CLEAR
)
{
SArray
*
pUpWins
=
taosArrayInit
(
8
,
sizeof
(
SWinRes
));
SArray
*
pUpWins
=
taosArrayInit
(
8
,
sizeof
(
SWinRes
));
...
...
source/libs/stream/src/streamExec.c
浏览文件 @
4c287f60
...
@@ -140,7 +140,6 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch)
...
@@ -140,7 +140,6 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch)
return
0
;
return
0
;
}
}
// TODO: handle version
int32_t
streamExecForAll
(
SStreamTask
*
pTask
)
{
int32_t
streamExecForAll
(
SStreamTask
*
pTask
)
{
while
(
1
)
{
while
(
1
)
{
int32_t
batchCnt
=
1
;
int32_t
batchCnt
=
1
;
...
...
source/libs/stream/src/streamMeta.c
浏览文件 @
4c287f60
...
@@ -14,7 +14,7 @@
...
@@ -14,7 +14,7 @@
*/
*/
#include "executor.h"
#include "executor.h"
#include "
tstream
.h"
#include "
streamInc
.h"
#include "ttimer.h"
#include "ttimer.h"
SStreamMeta
*
streamMetaOpen
(
const
char
*
path
,
void
*
ahandle
,
FTaskExpand
expandFunc
)
{
SStreamMeta
*
streamMetaOpen
(
const
char
*
path
,
void
*
ahandle
,
FTaskExpand
expandFunc
)
{
...
@@ -23,17 +23,23 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
...
@@ -23,17 +23,23 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
return
NULL
;
}
}
pMeta
->
path
=
strdup
(
path
);
int32_t
len
=
strlen
(
path
)
+
20
;
char
*
streamPath
=
taosMemoryCalloc
(
1
,
len
);
sprintf
(
streamPath
,
"%s/%s"
,
path
,
"stream"
);
pMeta
->
path
=
strdup
(
streamPath
);
if
(
tdbOpen
(
pMeta
->
path
,
16
*
1024
,
1
,
&
pMeta
->
db
)
<
0
)
{
if
(
tdbOpen
(
pMeta
->
path
,
16
*
1024
,
1
,
&
pMeta
->
db
)
<
0
)
{
goto
_err
;
goto
_err
;
}
}
sprintf
(
streamPath
,
"%s/%s"
,
pMeta
->
path
,
"checkpoints"
);
mkdir
(
streamPath
,
0755
);
taosMemoryFree
(
streamPath
);
if
(
tdbTbOpen
(
"task.db"
,
sizeof
(
int32_t
),
-
1
,
NULL
,
pMeta
->
db
,
&
pMeta
->
pTaskDb
)
<
0
)
{
if
(
tdbTbOpen
(
"task.db"
,
sizeof
(
int32_t
),
-
1
,
NULL
,
pMeta
->
db
,
&
pMeta
->
pTaskDb
)
<
0
)
{
goto
_err
;
goto
_err
;
}
}
// open state storage backend
if
(
tdbTbOpen
(
"checkpoint.db"
,
sizeof
(
int32_t
),
-
1
,
NULL
,
pMeta
->
db
,
&
pMeta
->
pCheckpointDb
)
<
0
)
{
if
(
tdbTbOpen
(
"state.db"
,
sizeof
(
int32_t
),
-
1
,
NULL
,
pMeta
->
db
,
&
pMeta
->
pStateDb
)
<
0
)
{
goto
_err
;
goto
_err
;
}
}
...
@@ -49,16 +55,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
...
@@ -49,16 +55,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta
->
ahandle
=
ahandle
;
pMeta
->
ahandle
=
ahandle
;
pMeta
->
expandFunc
=
expandFunc
;
pMeta
->
expandFunc
=
expandFunc
;
if
(
streamLoadTasks
(
pMeta
)
<
0
)
{
goto
_err
;
}
return
pMeta
;
return
pMeta
;
_err:
_err:
if
(
pMeta
->
path
)
taosMemoryFree
(
pMeta
->
path
);
if
(
pMeta
->
path
)
taosMemoryFree
(
pMeta
->
path
);
if
(
pMeta
->
pTasks
)
taosHashCleanup
(
pMeta
->
pTasks
);
if
(
pMeta
->
pTasks
)
taosHashCleanup
(
pMeta
->
pTasks
);
if
(
pMeta
->
pStateDb
)
tdbTbClose
(
pMeta
->
pStateDb
);
if
(
pMeta
->
pTaskDb
)
tdbTbClose
(
pMeta
->
pTaskDb
);
if
(
pMeta
->
pTaskDb
)
tdbTbClose
(
pMeta
->
pTaskDb
);
if
(
pMeta
->
pCheckpointDb
)
tdbTbClose
(
pMeta
->
pCheckpointDb
);
if
(
pMeta
->
db
)
tdbClose
(
pMeta
->
db
);
if
(
pMeta
->
db
)
tdbClose
(
pMeta
->
db
);
taosMemoryFree
(
pMeta
);
taosMemoryFree
(
pMeta
);
return
NULL
;
return
NULL
;
...
@@ -67,7 +70,7 @@ _err:
...
@@ -67,7 +70,7 @@ _err:
void
streamMetaClose
(
SStreamMeta
*
pMeta
)
{
void
streamMetaClose
(
SStreamMeta
*
pMeta
)
{
tdbCommit
(
pMeta
->
db
,
&
pMeta
->
txn
);
tdbCommit
(
pMeta
->
db
,
&
pMeta
->
txn
);
tdbTbClose
(
pMeta
->
pTaskDb
);
tdbTbClose
(
pMeta
->
pTaskDb
);
tdbTbClose
(
pMeta
->
p
State
Db
);
tdbTbClose
(
pMeta
->
p
Checkpoint
Db
);
tdbClose
(
pMeta
->
db
);
tdbClose
(
pMeta
->
db
);
void
*
pIter
=
NULL
;
void
*
pIter
=
NULL
;
...
...
source/libs/stream/src/streamRecover.c
浏览文件 @
4c287f60
...
@@ -176,6 +176,7 @@ int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstrea
...
@@ -176,6 +176,7 @@ int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstrea
}
}
int32_t
streamSaveStateInfo
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
)
{
int32_t
streamSaveStateInfo
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
)
{
#if 0
void* buf = NULL;
void* buf = NULL;
ASSERT(pTask->taskLevel == TASK_LEVEL__SINK);
ASSERT(pTask->taskLevel == TASK_LEVEL__SINK);
...
@@ -224,10 +225,12 @@ int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
...
@@ -224,10 +225,12 @@ int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
FAIL:
FAIL:
if (buf) taosMemoryFree(buf);
if (buf) taosMemoryFree(buf);
return -1;
return -1;
#endif
return
0
;
return
0
;
}
}
int32_t
streamLoadStateInfo
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
)
{
int32_t
streamLoadStateInfo
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
)
{
#if 0
void* pVal = NULL;
void* pVal = NULL;
int32_t vLen = 0;
int32_t vLen = 0;
if (tdbTbGet(pMeta->pStateDb, &pTask->taskId, sizeof(void*), &pVal, &vLen) < 0) {
if (tdbTbGet(pMeta->pStateDb, &pTask->taskId, sizeof(void*), &pVal, &vLen) < 0) {
...
@@ -241,7 +244,7 @@ int32_t streamLoadStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
...
@@ -241,7 +244,7 @@ int32_t streamLoadStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
pTask->nextCheckId = aggCheckpoint.checkpointId + 1;
pTask->nextCheckId = aggCheckpoint.checkpointId + 1;
pTask->checkpointInfo = aggCheckpoint.checkpointVer;
pTask->checkpointInfo = aggCheckpoint.checkpointVer;
#endif
return
0
;
return
0
;
}
}
...
...
source/libs/stream/src/streamState.c
0 → 100644
浏览文件 @
4c287f60
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executor.h"
#include "streamInc.h"
#include "ttimer.h"
SStreamState
*
streamStateOpen
(
char
*
path
,
SStreamTask
*
pTask
)
{
SStreamState
*
pState
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamState
));
if
(
pState
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
char
statePath
[
200
];
sprintf
(
statePath
,
"%s/%d"
,
path
,
pTask
->
taskId
);
if
(
tdbOpen
(
statePath
,
16
*
1024
,
1
,
&
pState
->
db
)
<
0
)
{
goto
_err
;
}
// open state storage backend
if
(
tdbTbOpen
(
"state.db"
,
sizeof
(
int32_t
),
-
1
,
NULL
,
pState
->
db
,
&
pState
->
pStateDb
)
<
0
)
{
goto
_err
;
}
pState
->
pOwner
=
pTask
;
return
pState
;
_err:
if
(
pState
->
pStateDb
)
tdbTbClose
(
pState
->
pStateDb
);
if
(
pState
->
db
)
tdbClose
(
pState
->
db
);
taosMemoryFree
(
pState
);
return
NULL
;
}
void
streamStateClose
(
SStreamState
*
pState
)
{
tdbCommit
(
pState
->
db
,
&
pState
->
txn
);
tdbTbClose
(
pState
->
pStateDb
);
tdbClose
(
pState
->
db
);
taosMemoryFree
(
pState
);
}
int32_t
streamStateBegin
(
SStreamState
*
pState
)
{
if
(
tdbTxnOpen
(
&
pState
->
txn
,
0
,
tdbDefaultMalloc
,
tdbDefaultFree
,
NULL
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
)
<
0
)
{
return
-
1
;
}
if
(
tdbBegin
(
pState
->
db
,
&
pState
->
txn
)
<
0
)
{
return
-
1
;
}
return
0
;
}
int32_t
streamStateCommit
(
SStreamState
*
pState
)
{
if
(
tdbCommit
(
pState
->
db
,
&
pState
->
txn
)
<
0
)
{
return
-
1
;
}
memset
(
&
pState
->
txn
,
0
,
sizeof
(
TXN
));
if
(
tdbTxnOpen
(
&
pState
->
txn
,
0
,
tdbDefaultMalloc
,
tdbDefaultFree
,
NULL
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
)
<
0
)
{
return
-
1
;
}
if
(
tdbBegin
(
pState
->
db
,
&
pState
->
txn
)
<
0
)
{
return
-
1
;
}
return
0
;
}
int32_t
streamStateAbort
(
SStreamState
*
pState
)
{
if
(
tdbAbort
(
pState
->
db
,
&
pState
->
txn
)
<
0
)
{
return
-
1
;
}
memset
(
&
pState
->
txn
,
0
,
sizeof
(
TXN
));
if
(
tdbTxnOpen
(
&
pState
->
txn
,
0
,
tdbDefaultMalloc
,
tdbDefaultFree
,
NULL
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
)
<
0
)
{
return
-
1
;
}
if
(
tdbBegin
(
pState
->
db
,
&
pState
->
txn
)
<
0
)
{
return
-
1
;
}
return
0
;
}
int32_t
streamStatePut
(
SStreamState
*
pState
,
const
void
*
key
,
int32_t
kLen
,
const
void
*
value
,
int32_t
vLen
)
{
return
tdbTbUpsert
(
pState
->
pStateDb
,
key
,
kLen
,
value
,
vLen
,
&
pState
->
txn
);
}
int32_t
streamStateGet
(
SStreamState
*
pState
,
const
void
*
key
,
int32_t
kLen
,
void
**
pVal
,
int32_t
*
pVLen
)
{
return
tdbTbGet
(
pState
->
pStateDb
,
key
,
kLen
,
pVal
,
pVLen
);
}
int32_t
streamStateDel
(
SStreamState
*
pState
,
const
void
*
key
,
int32_t
kLen
)
{
return
tdbTbDelete
(
pState
->
pStateDb
,
key
,
kLen
,
&
pState
->
txn
);
}
SStreamStateCur
*
streamStateGetCur
(
SStreamState
*
pState
,
const
void
*
key
,
int32_t
kLen
)
{
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
if
(
pCur
==
NULL
)
return
NULL
;
tdbTbcOpen
(
pState
->
pStateDb
,
&
pCur
->
pCur
,
NULL
);
int32_t
c
;
tdbTbcMoveTo
(
pCur
->
pCur
,
key
,
kLen
,
&
c
);
if
(
c
!=
0
)
{
taosMemoryFree
(
pCur
);
return
NULL
;
}
return
0
;
}
int32_t
streamGetKVByCur
(
SStreamStateCur
*
pCur
,
void
**
pKey
,
int32_t
*
pKLen
,
void
**
pVal
,
int32_t
*
pVLen
)
{
return
tdbTbcGet
(
pCur
->
pCur
,
(
const
void
**
)
pKey
,
pKLen
,
(
const
void
**
)
pVal
,
pVLen
);
}
int32_t
streamStateSeekFirst
(
SStreamState
*
pState
,
SStreamStateCur
*
pCur
)
{
//
return
tdbTbcMoveToFirst
(
pCur
->
pCur
);
}
int32_t
streamStateSeekLast
(
SStreamState
*
pState
,
SStreamStateCur
*
pCur
)
{
//
return
tdbTbcMoveToLast
(
pCur
->
pCur
);
}
SStreamStateCur
*
streamStateSeekKeyNext
(
SStreamState
*
pState
,
const
void
*
key
,
int32_t
kLen
)
{
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
if
(
pCur
==
NULL
)
{
return
NULL
;
}
int32_t
c
;
if
(
tdbTbcMoveTo
(
pCur
->
pCur
,
key
,
kLen
,
&
c
)
<
0
)
{
taosMemoryFree
(
pCur
);
return
NULL
;
}
if
(
c
>
0
)
return
pCur
;
if
(
tdbTbcMoveToNext
(
pCur
->
pCur
)
<
0
)
{
taosMemoryFree
(
pCur
);
return
NULL
;
}
return
pCur
;
}
SStreamStateCur
*
streamStateSeekKeyPrev
(
SStreamState
*
pState
,
const
void
*
key
,
int32_t
kLen
)
{
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
if
(
pCur
==
NULL
)
{
return
NULL
;
}
int32_t
c
;
if
(
tdbTbcMoveTo
(
pCur
->
pCur
,
key
,
kLen
,
&
c
)
<
0
)
{
taosMemoryFree
(
pCur
);
return
NULL
;
}
if
(
c
<
0
)
return
pCur
;
if
(
tdbTbcMoveToPrev
(
pCur
->
pCur
)
<
0
)
{
taosMemoryFree
(
pCur
);
return
NULL
;
}
return
pCur
;
}
int32_t
streamStateCurNext
(
SStreamState
*
pState
,
SStreamStateCur
*
pCur
)
{
//
return
tdbTbcMoveToNext
(
pCur
->
pCur
);
}
int32_t
streamStateCurPrev
(
SStreamState
*
pState
,
SStreamStateCur
*
pCur
)
{
//
return
tdbTbcMoveToPrev
(
pCur
->
pCur
);
}
source/libs/stream/src/streamTask.c
浏览文件 @
4c287f60
...
@@ -165,5 +165,8 @@ void tFreeSStreamTask(SStreamTask* pTask) {
...
@@ -165,5 +165,8 @@ void tFreeSStreamTask(SStreamTask* pTask) {
if
(
pTask
->
outputType
==
TASK_OUTPUT__SHUFFLE_DISPATCH
)
{
if
(
pTask
->
outputType
==
TASK_OUTPUT__SHUFFLE_DISPATCH
)
{
taosArrayDestroy
(
pTask
->
shuffleDispatcher
.
dbInfo
.
pVgroupInfos
);
taosArrayDestroy
(
pTask
->
shuffleDispatcher
.
dbInfo
.
pVgroupInfos
);
}
}
if
(
pTask
->
pState
)
streamStateClose
(
pTask
->
pState
);
taosMemoryFree
(
pTask
);
taosMemoryFree
(
pTask
);
}
}
source/libs/wal/src/walMeta.c
浏览文件 @
4c287f60
...
@@ -121,7 +121,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
...
@@ -121,7 +121,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
if
(
found
==
NULL
)
{
if
(
found
==
NULL
)
{
// file corrupted, no complete log
// file corrupted, no complete log
// TODO delete and search in previous files
// TODO delete and search in previous files
ASSERT
(
0
);
/*ASSERT(0);*/
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
return
-
1
;
return
-
1
;
}
}
...
@@ -221,7 +221,6 @@ int walCheckAndRepairMeta(SWal* pWal) {
...
@@ -221,7 +221,6 @@ int walCheckAndRepairMeta(SWal* pWal) {
int
code
=
walSaveMeta
(
pWal
);
int
code
=
walSaveMeta
(
pWal
);
if
(
code
<
0
)
{
if
(
code
<
0
)
{
taosArrayDestroy
(
actualLog
);
return
-
1
;
return
-
1
;
}
}
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录