Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e71375f1
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
e71375f1
编写于
7月 28, 2022
作者:
L
Liu Jicong
提交者:
GitHub
7月 28, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #15516 from taosdata/feature/stream
enh(stream): recover
上级
c402a18a
a9d11b58
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
60 addition
and
40 deletion
+60
-40
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+4
-15
source/dnode/snode/src/snode.c
source/dnode/snode/src/snode.c
+1
-1
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+1
-1
source/libs/stream/inc/streamInc.h
source/libs/stream/inc/streamInc.h
+2
-2
source/libs/stream/src/stream.c
source/libs/stream/src/stream.c
+39
-10
source/libs/stream/src/streamDispatch.c
source/libs/stream/src/streamDispatch.c
+1
-1
source/libs/stream/src/streamExec.c
source/libs/stream/src/streamExec.c
+2
-2
source/libs/stream/src/streamRecover.c
source/libs/stream/src/streamRecover.c
+9
-7
source/util/src/tarray.c
source/util/src/tarray.c
+1
-1
未找到文件。
include/libs/stream/tstream.h
浏览文件 @
e71375f1
...
...
@@ -55,7 +55,6 @@ enum {
TASK_INPUT_STATUS__NORMAL
=
1
,
TASK_INPUT_STATUS__BLOCKED
,
TASK_INPUT_STATUS__RECOVER
,
TASK_INPUT_STATUS__PROCESSING
,
TASK_INPUT_STATUS__STOP
,
TASK_INPUT_STATUS__FAILED
,
};
...
...
@@ -320,17 +319,6 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void
tFreeSStreamTask
(
SStreamTask
*
pTask
);
static
FORCE_INLINE
int32_t
streamTaskInput
(
SStreamTask
*
pTask
,
SStreamQueueItem
*
pItem
)
{
#if 0
while (1) {
int8_t inputStatus =
atomic_val_compare_exchange_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL, TASK_INPUT_STATUS__PROCESSING);
if (inputStatus == TASK_INPUT_STATUS__NORMAL) {
break;
}
ASSERT(0);
}
#endif
if
(
pItem
->
type
==
STREAM_INPUT__DATA_SUBMIT
)
{
SStreamDataSubmit
*
pSubmitClone
=
streamSubmitRefClone
((
SStreamDataSubmit
*
)
pItem
);
if
(
pSubmitClone
==
NULL
)
{
...
...
@@ -443,13 +431,14 @@ typedef struct {
typedef
struct
{
int64_t
streamId
;
int32_t
taskId
;
int32_t
source
TaskId
;
int32_t
sourceVg
;
int32_t
upstream
TaskId
;
int32_t
upstreamNodeId
;
}
SStreamTaskRecoverReq
;
typedef
struct
{
int64_t
streamId
;
int32_t
taskId
;
int32_t
rspTaskId
;
int32_t
reqTaskId
;
int8_t
inputStatus
;
}
SStreamTaskRecoverRsp
;
...
...
source/dnode/snode/src/snode.c
浏览文件 @
e71375f1
...
...
@@ -179,7 +179,7 @@ static int32_t sndProcessTaskRecoverRsp(SSnode *pNode, SRpcMsg *pMsg) {
SStreamMeta
*
pMeta
=
pNode
->
pMeta
;
SStreamTaskRecoverRsp
*
pRsp
=
pMsg
->
pCont
;
int32_t
taskId
=
pRsp
->
t
askId
;
int32_t
taskId
=
pRsp
->
rspT
askId
;
SStreamTask
*
pTask
=
*
(
SStreamTask
**
)
taosHashGet
(
pMeta
->
pHash
,
&
taskId
,
sizeof
(
int32_t
));
streamProcessRecoverRsp
(
pTask
,
pRsp
);
return
0
;
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
e71375f1
...
...
@@ -796,7 +796,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t
tqProcessTaskRecoverRsp
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
)
{
SStreamTaskRecoverRsp
*
pRsp
=
pMsg
->
pCont
;
int32_t
taskId
=
pRsp
->
t
askId
;
int32_t
taskId
=
pRsp
->
rspT
askId
;
SStreamTask
**
ppTask
=
(
SStreamTask
**
)
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
if
(
ppTask
)
{
streamProcessRecoverRsp
(
*
ppTask
,
pRsp
);
...
...
source/libs/stream/inc/streamInc.h
浏览文件 @
e71375f1
...
...
@@ -32,10 +32,10 @@ typedef struct {
static
SStreamGlobalEnv
streamEnv
;
int32_t
streamExec
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
);
int32_t
streamExec
(
SStreamTask
*
pTask
);
int32_t
streamPipelineExec
(
SStreamTask
*
pTask
,
int32_t
batchNum
);
int32_t
streamDispatch
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
);
int32_t
streamDispatch
(
SStreamTask
*
pTask
);
int32_t
streamDispatchReqToData
(
const
SStreamDispatchReq
*
pReq
,
SStreamDataBlock
*
pData
);
int32_t
streamRetrieveReqToData
(
const
SStreamRetrieveReq
*
pReq
,
SStreamDataBlock
*
pData
);
int32_t
streamDispatchAllBlocks
(
SStreamTask
*
pTask
,
const
SStreamDataBlock
*
data
);
...
...
source/libs/stream/src/stream.c
浏览文件 @
e71375f1
...
...
@@ -189,7 +189,7 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
#if 0
if (pTask->execType != TASK_EXEC__NONE) {
#endif
streamExec
(
pTask
,
pTask
->
pMsgCb
);
streamExec
(
pTask
);
#if 0
} else {
ASSERT(pTask->sinkType != TASK_SINK__NONE);
...
...
@@ -208,7 +208,7 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
// 3.2 dispatch / sink
if
(
pTask
->
dispatchType
!=
TASK_DISPATCH__NONE
)
{
ASSERT
(
pTask
->
sinkType
==
TASK_SINK__NONE
);
streamDispatch
(
pTask
,
pTask
->
pMsgCb
);
streamDispatch
(
pTask
);
}
return
0
;
...
...
@@ -233,26 +233,55 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
return
0
;
}
// continue dispatch
streamDispatch
(
pTask
,
pTask
->
pMsgCb
);
streamDispatch
(
pTask
);
return
0
;
}
int32_t
streamProcessRunReq
(
SStreamTask
*
pTask
)
{
streamExec
(
pTask
,
pTask
->
pMsgCb
);
streamExec
(
pTask
);
if
(
pTask
->
dispatchType
!=
TASK_DISPATCH__NONE
)
{
streamDispatch
(
pTask
,
pTask
->
pMsgCb
);
streamDispatch
(
pTask
);
}
return
0
;
}
int32_t
streamProcessRecoverReq
(
SStreamTask
*
pTask
,
SStreamTaskRecoverReq
*
pReq
,
SRpcMsg
*
pMsg
)
{
//
int32_t
streamProcessRecoverReq
(
SStreamTask
*
pTask
,
SStreamTaskRecoverReq
*
pReq
,
SRpcMsg
*
pRsp
)
{
void
*
buf
=
rpcMallocCont
(
sizeof
(
SMsgHead
)
+
sizeof
(
SStreamTaskRecoverRsp
));
((
SMsgHead
*
)
buf
)
->
vgId
=
htonl
(
pReq
->
upstreamNodeId
);
SStreamTaskRecoverRsp
*
pCont
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMsgHead
));
pCont
->
inputStatus
=
pTask
->
inputStatus
;
pCont
->
streamId
=
pTask
->
streamId
;
pCont
->
reqTaskId
=
pTask
->
taskId
;
pCont
->
rspTaskId
=
pReq
->
upstreamTaskId
;
pRsp
->
pCont
=
buf
;
pRsp
->
contLen
=
sizeof
(
SMsgHead
)
+
sizeof
(
SStreamTaskRecoverRsp
);
tmsgSendRsp
(
pRsp
);
return
0
;
}
int32_t
streamProcessRecoverRsp
(
SStreamTask
*
pTask
,
SStreamTaskRecoverRsp
*
pRsp
)
{
//
if
(
pRsp
->
inputStatus
==
TASK_INPUT_STATUS__NORMAL
)
{
pTask
->
outputStatus
=
TASK_OUTPUT_STATUS__NORMAL
;
streamProcessRunReq
(
pTask
);
if
(
pTask
->
isDataScan
)
{
// scan data to recover
pTask
->
inputStatus
=
TASK_INPUT_STATUS__RECOVER
;
pTask
->
taskStatus
=
TASK_STATUS__RECOVERING
;
qStreamPrepareRecover
(
pTask
->
exec
.
executor
,
pTask
->
startVer
,
pTask
->
recoverSnapVer
);
if
(
streamPipelineExec
(
pTask
,
100
)
<
0
)
{
return
-
1
;
}
}
else
{
pTask
->
inputStatus
=
TASK_INPUT_STATUS__NORMAL
;
pTask
->
taskStatus
=
TASK_STATUS__NORMAL
;
}
}
return
0
;
}
...
...
@@ -262,10 +291,10 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S
streamTaskEnqueueRetrieve
(
pTask
,
pReq
,
pRsp
);
ASSERT
(
pTask
->
execType
!=
TASK_EXEC__NONE
);
streamExec
(
pTask
,
pTask
->
pMsgCb
);
streamExec
(
pTask
);
ASSERT
(
pTask
->
dispatchType
!=
TASK_DISPATCH__NONE
);
streamDispatch
(
pTask
,
pTask
->
pMsgCb
);
streamDispatch
(
pTask
);
return
0
;
}
...
...
source/libs/stream/src/streamDispatch.c
浏览文件 @
e71375f1
...
...
@@ -438,7 +438,7 @@ FAIL:
return
code
;
}
int32_t
streamDispatch
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
)
{
int32_t
streamDispatch
(
SStreamTask
*
pTask
)
{
ASSERT
(
pTask
->
dispatchType
!=
TASK_DISPATCH__NONE
);
#if 1
int8_t
old
=
...
...
source/libs/stream/src/streamExec.c
浏览文件 @
e71375f1
...
...
@@ -141,7 +141,7 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
if
(
pTask
->
dispatchType
!=
TASK_DISPATCH__NONE
)
{
ASSERT
(
pTask
->
sinkType
==
TASK_SINK__NONE
);
streamDispatch
(
pTask
,
pTask
->
pMsgCb
);
streamDispatch
(
pTask
);
}
}
...
...
@@ -229,7 +229,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
}
// TODO: handle version
int32_t
streamExec
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
)
{
int32_t
streamExec
(
SStreamTask
*
pTask
)
{
SArray
*
pRes
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
if
(
pRes
==
NULL
)
return
-
1
;
while
(
1
)
{
...
...
source/libs/stream/src/streamRecover.c
浏览文件 @
e71375f1
...
...
@@ -19,8 +19,8 @@ int32_t tEncodeStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamTaskRecover
if
(
tStartEncode
(
pEncoder
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pReq
->
streamId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pReq
->
taskId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pReq
->
source
TaskId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pReq
->
sourceVg
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pReq
->
upstream
TaskId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pReq
->
upstreamNodeId
)
<
0
)
return
-
1
;
tEndEncode
(
pEncoder
);
return
pEncoder
->
pos
;
}
...
...
@@ -29,8 +29,8 @@ int32_t tDecodeStreamTaskRecoverReq(SDecoder* pDecoder, SStreamTaskRecoverReq* p
if
(
tStartDecode
(
pDecoder
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pReq
->
streamId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pReq
->
taskId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pReq
->
source
TaskId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pReq
->
sourceVg
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pReq
->
upstream
TaskId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pReq
->
upstreamNodeId
)
<
0
)
return
-
1
;
tEndDecode
(
pDecoder
);
return
0
;
}
...
...
@@ -38,7 +38,8 @@ int32_t tDecodeStreamTaskRecoverReq(SDecoder* pDecoder, SStreamTaskRecoverReq* p
int32_t
tEncodeStreamTaskRecoverRsp
(
SEncoder
*
pEncoder
,
const
SStreamTaskRecoverRsp
*
pRsp
)
{
if
(
tStartEncode
(
pEncoder
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pRsp
->
streamId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pRsp
->
taskId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pRsp
->
reqTaskId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pRsp
->
rspTaskId
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pRsp
->
inputStatus
)
<
0
)
return
-
1
;
tEndEncode
(
pEncoder
);
return
pEncoder
->
pos
;
...
...
@@ -47,7 +48,8 @@ int32_t tEncodeStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamTaskRecover
int32_t
tDecodeStreamTaskRecoverRsp
(
SDecoder
*
pDecoder
,
SStreamTaskRecoverRsp
*
pReq
)
{
if
(
tStartDecode
(
pDecoder
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pReq
->
streamId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pReq
->
taskId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pReq
->
reqTaskId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pReq
->
rspTaskId
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pReq
->
inputStatus
)
<
0
)
return
-
1
;
tEndDecode
(
pDecoder
);
return
0
;
...
...
@@ -125,7 +127,7 @@ int32_t streamProcessFailRecoverReq(SStreamTask* pTask, SMStreamTaskRecoverReq*
}
if
(
pTask
->
taskStatus
==
TASK_STATUS__RECOVERING
)
{
if
(
streamPipelineExec
(
pTask
,
10
)
<
0
)
{
if
(
streamPipelineExec
(
pTask
,
10
0
)
<
0
)
{
// set fail
return
-
1
;
}
...
...
source/util/src/tarray.c
浏览文件 @
e71375f1
...
...
@@ -294,7 +294,7 @@ void taosArraySet(SArray* pArray, size_t index, void* pData) {
void
taosArrayPopFrontBatch
(
SArray
*
pArray
,
size_t
cnt
)
{
assert
(
cnt
<=
pArray
->
size
);
pArray
->
size
=
pArray
->
size
-
cnt
;
if
(
pArray
->
size
==
0
)
{
if
(
pArray
->
size
==
0
||
cnt
==
0
)
{
return
;
}
memmove
(
pArray
->
pData
,
(
char
*
)
pArray
->
pData
+
cnt
*
pArray
->
elemSize
,
pArray
->
size
*
pArray
->
elemSize
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录