Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9394e338
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
9394e338
编写于
7月 15, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat(stream): recover from failure
上级
e01297d0
变更
20
隐藏空白更改
内联
并排
Showing
20 changed file
with
423 addition
and
43 deletion
+423
-43
include/common/tcommon.h
include/common/tcommon.h
+5
-5
include/common/tmsg.h
include/common/tmsg.h
+24
-5
include/common/tmsgdef.h
include/common/tmsgdef.h
+1
-0
include/libs/executor/executor.h
include/libs/executor/executor.h
+2
-0
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+47
-2
source/common/src/tmsg.c
source/common/src/tmsg.c
+34
-5
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+18
-0
source/dnode/mnode/impl/inc/mndTrans.h
source/dnode/mnode/impl/inc/mndTrans.h
+1
-0
source/dnode/mnode/impl/src/mndDef.c
source/dnode/mnode/impl/src/mndDef.c
+2
-0
source/dnode/mnode/impl/src/mndScheduler.c
source/dnode/mnode/impl/src/mndScheduler.c
+1
-0
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+65
-1
source/dnode/mnode/sdb/inc/sdb.h
source/dnode/mnode/sdb/inc/sdb.h
+13
-12
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+10
-0
source/libs/executor/src/executorMain.c
source/libs/executor/src/executorMain.c
+9
-0
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+25
-8
source/libs/stream/src/stream.c
source/libs/stream/src/stream.c
+7
-2
source/libs/stream/src/streamData.c
source/libs/stream/src/streamData.c
+1
-1
source/libs/stream/src/streamExec.c
source/libs/stream/src/streamExec.c
+25
-2
source/libs/stream/src/streamRecover.c
source/libs/stream/src/streamRecover.c
+131
-0
source/libs/stream/src/streamTask.c
source/libs/stream/src/streamTask.c
+2
-0
未找到文件。
include/common/tcommon.h
浏览文件 @
9394e338
...
...
@@ -57,7 +57,7 @@ enum {
// STREAM_INPUT__TABLE_SCAN,
STREAM_INPUT__TQ_SCAN
,
STREAM_INPUT__DATA_RETRIEVE
,
STREAM_INPUT__
TRIGGER
,
STREAM_INPUT__
GET_RES
,
STREAM_INPUT__CHECKPOINT
,
STREAM_INPUT__DROP
,
};
...
...
@@ -155,10 +155,10 @@ typedef struct SQueryTableDataCond {
int32_t
numOfCols
;
SColumnInfo
*
colList
;
int32_t
type
;
// data block load type:
// int32_t numOfTWindows;
STimeWindow
twindows
;
int64_t
startVersion
;
int64_t
endVersion
;
// int32_t numOfTWindows;
STimeWindow
twindows
;
int64_t
startVersion
;
int64_t
endVersion
;
}
SQueryTableDataCond
;
int32_t
tEncodeDataBlock
(
void
**
buf
,
const
SSDataBlock
*
pBlock
);
...
...
include/common/tmsg.h
浏览文件 @
9394e338
...
...
@@ -1968,7 +1968,7 @@ typedef struct SVCreateTbReq {
int8_t
type
;
union
{
struct
{
char
*
name
;
// super table name
char
*
name
;
// super table name
tb_uid_t
suid
;
SArray
*
tagName
;
uint8_t
*
pTag
;
...
...
@@ -2437,9 +2437,6 @@ typedef struct {
int8_t
igNotExists
;
}
SMDropStreamReq
;
int32_t
tSerializeSMDropStreamReq
(
void
*
buf
,
int32_t
bufLen
,
const
SMDropStreamReq
*
pReq
);
int32_t
tDeserializeSMDropStreamReq
(
void
*
buf
,
int32_t
bufLen
,
SMDropStreamReq
*
pReq
);
typedef
struct
{
int8_t
reserved
;
}
SMDropStreamRsp
;
...
...
@@ -2454,6 +2451,27 @@ typedef struct {
int8_t
reserved
;
}
SVDropStreamTaskRsp
;
int32_t
tSerializeSMDropStreamReq
(
void
*
buf
,
int32_t
bufLen
,
const
SMDropStreamReq
*
pReq
);
int32_t
tDeserializeSMDropStreamReq
(
void
*
buf
,
int32_t
bufLen
,
SMDropStreamReq
*
pReq
);
typedef
struct
{
char
name
[
TSDB_STREAM_FNAME_LEN
];
int8_t
igNotExists
;
}
SMRecoverStreamReq
;
typedef
struct
{
int8_t
reserved
;
}
SMRecoverStreamRsp
;
typedef
struct
{
int64_t
recoverObjUid
;
int32_t
taskId
;
int32_t
hasCheckPoint
;
}
SMVStreamGatherInfoReq
;
int32_t
tSerializeSMRecoverStreamReq
(
void
*
buf
,
int32_t
bufLen
,
const
SMRecoverStreamReq
*
pReq
);
int32_t
tDeserializeSMRecoverStreamReq
(
void
*
buf
,
int32_t
bufLen
,
SMRecoverStreamReq
*
pReq
);
typedef
struct
{
int64_t
leftForVer
;
int32_t
vgId
;
...
...
@@ -2876,7 +2894,8 @@ static FORCE_INLINE int32_t tEncodeSMqMetaRsp(void** buf, const SMqMetaRsp* pRsp
}
static
FORCE_INLINE
void
*
tDecodeSMqMetaRsp
(
const
void
*
buf
,
SMqMetaRsp
*
pRsp
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
reqOffset
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
rspOffset
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
reqOffset
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
rspOffset
);
buf
=
taosDecodeFixedI16
(
buf
,
&
pRsp
->
resMsgType
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pRsp
->
metaRspLen
);
buf
=
taosDecodeBinary
(
buf
,
&
pRsp
->
metaRsp
,
pRsp
->
metaRspLen
);
...
...
include/common/tmsgdef.h
浏览文件 @
9394e338
...
...
@@ -131,6 +131,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_MND_CREATE_STREAM
,
"create-stream"
,
SCMCreateStreamReq
,
SCMCreateStreamRsp
)
TD_DEF_MSG_TYPE
(
TDMT_MND_ALTER_STREAM
,
"alter-stream"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_DROP_STREAM
,
"drop-stream"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_RECOVER_STREAM
,
"recover-stream"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_CREATE_INDEX
,
"create-index"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_DROP_INDEX
,
"drop-index"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_GET_INDEX
,
"get-index"
,
NULL
,
NULL
)
...
...
include/libs/executor/executor.h
浏览文件 @
9394e338
...
...
@@ -192,6 +192,8 @@ int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner);
int32_t
qStreamInput
(
qTaskInfo_t
tinfo
,
void
*
pItem
);
int32_t
qStreamPrepareRecover
(
qTaskInfo_t
tinfo
,
int64_t
startVer
,
int64_t
endVer
);
#ifdef __cplusplus
}
#endif
...
...
include/libs/stream/tstream.h
浏览文件 @
9394e338
...
...
@@ -34,6 +34,10 @@ typedef struct SStreamTask SStreamTask;
enum
{
TASK_STATUS__NORMAL
=
0
,
TASK_STATUS__DROPPING
,
TASK_STATUS__FAIL
,
TASK_STATUS__STOP
,
TASK_STATUS__PREPARE_RECOVER
,
TASK_STATUS__RECOVERING
,
};
enum
{
...
...
@@ -72,6 +76,7 @@ typedef struct {
int8_t
type
;
int32_t
srcVgId
;
int32_t
childId
;
int64_t
sourceVer
;
SArray
*
blocks
;
// SArray<SSDataBlock*>
...
...
@@ -222,6 +227,8 @@ typedef struct {
int32_t
nodeId
;
int32_t
childId
;
int32_t
taskId
;
int64_t
checkpointVer
;
int64_t
processedVer
;
SEpSet
epSet
;
}
SStreamChildEpInfo
;
...
...
@@ -232,6 +239,7 @@ typedef struct SStreamTask {
int8_t
execType
;
int8_t
sinkType
;
int8_t
dispatchType
;
int8_t
isStreamDistributed
;
int16_t
dispatchMsgType
;
int8_t
taskStatus
;
...
...
@@ -242,6 +250,13 @@ typedef struct SStreamTask {
int32_t
nodeId
;
SEpSet
epSet
;
// used for semi or single task,
// while final task should have processedVer for each child
int64_t
recoverSnapVer
;
int64_t
startVer
;
int64_t
checkpointVer
;
int64_t
processedVer
;
// children info
SArray
*
childEpInfo
;
// SArray<SStreamChildEpInfo*>
...
...
@@ -316,12 +331,12 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
}
else
if
(
pItem
->
type
==
STREAM_INPUT__CHECKPOINT
)
{
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
// qStreamInput(pTask->exec.executor, pItem);
}
else
if
(
pItem
->
type
==
STREAM_INPUT__
TRIGGER
)
{
}
else
if
(
pItem
->
type
==
STREAM_INPUT__
GET_RES
)
{
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
// qStreamInput(pTask->exec.executor, pItem);
}
if
(
pItem
->
type
!=
STREAM_INPUT__
TRIGGER
&&
pItem
->
type
!=
STREAM_INPUT__CHECKPOINT
&&
pTask
->
triggerParam
!=
0
)
{
if
(
pItem
->
type
!=
STREAM_INPUT__
GET_RES
&&
pItem
->
type
!=
STREAM_INPUT__CHECKPOINT
&&
pTask
->
triggerParam
!=
0
)
{
atomic_val_compare_exchange_8
(
&
pTask
->
triggerStatus
,
TASK_TRIGGER_STATUS__IN_ACTIVE
,
TASK_TRIGGER_STATUS__ACTIVE
);
}
...
...
@@ -420,6 +435,36 @@ typedef struct {
int8_t
inputStatus
;
}
SStreamTaskRecoverRsp
;
int32_t
tEncodeStreamTaskRecoverReq
(
SEncoder
*
pEncoder
,
const
SStreamTaskRecoverReq
*
pReq
);
int32_t
tDecodeStreamTaskRecoverReq
(
SDecoder
*
pDecoder
,
SStreamTaskRecoverReq
*
pReq
);
int32_t
tEncodeStreamTaskRecoverRsp
(
SEncoder
*
pEncoder
,
const
SStreamTaskRecoverRsp
*
pRsp
);
int32_t
tDecodeStreamTaskRecoverRsp
(
SDecoder
*
pDecoder
,
SStreamTaskRecoverRsp
*
pRsp
);
typedef
struct
{
int64_t
streamId
;
int32_t
taskId
;
}
SMStreamTaskRecoverReq
;
typedef
struct
{
int64_t
streamId
;
int32_t
taskId
;
}
SMStreamTaskRecoverRsp
;
int32_t
tEncodeSMStreamTaskRecoverReq
(
SEncoder
*
pEncoder
,
const
SMStreamTaskRecoverReq
*
pReq
);
int32_t
tDecodeSMStreamTaskRecoverReq
(
SDecoder
*
pDecoder
,
SMStreamTaskRecoverReq
*
pReq
);
int32_t
tEncodeSMStreamTaskRecoverRsp
(
SEncoder
*
pEncoder
,
const
SMStreamTaskRecoverRsp
*
pRsp
);
int32_t
tDecodeSMStreamTaskRecoverRsp
(
SDecoder
*
pDecoder
,
SMStreamTaskRecoverRsp
*
pRsp
);
typedef
struct
{
int64_t
streamId
;
}
SPStreamTaskRecoverReq
;
typedef
struct
{
int8_t
reserved
;
}
SPStreamTaskRecoverRsp
;
int32_t
tDecodeStreamDispatchReq
(
SDecoder
*
pDecoder
,
SStreamDispatchReq
*
pReq
);
int32_t
tDecodeStreamRetrieveReq
(
SDecoder
*
pDecoder
,
SStreamRetrieveReq
*
pReq
);
...
...
source/common/src/tmsg.c
浏览文件 @
9394e338
...
...
@@ -4823,6 +4823,35 @@ int32_t tDeserializeSMDropStreamReq(void *buf, int32_t bufLen, SMDropStreamReq *
return
0
;
}
int32_t
tSerializeSMRecoverStreamReq
(
void
*
buf
,
int32_t
bufLen
,
const
SMRecoverStreamReq
*
pReq
)
{
SEncoder
encoder
=
{
0
};
tEncoderInit
(
&
encoder
,
buf
,
bufLen
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
name
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
igNotExists
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tEncoderClear
(
&
encoder
);
return
tlen
;
}
int32_t
tDeserializeSMRecoverStreamReq
(
void
*
buf
,
int32_t
bufLen
,
SMRecoverStreamReq
*
pReq
)
{
SDecoder
decoder
=
{
0
};
tDecoderInit
(
&
decoder
,
buf
,
bufLen
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
name
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
igNotExists
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
tDecoderClear
(
&
decoder
);
return
0
;
}
void
tFreeSCMCreateStreamReq
(
SCMCreateStreamReq
*
pReq
)
{
taosMemoryFreeClear
(
pReq
->
sql
);
taosMemoryFreeClear
(
pReq
->
ast
);
...
...
@@ -4945,8 +4974,8 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) {
if
(
tEncodeTag
(
pCoder
,
(
const
STag
*
)
pReq
->
ctb
.
pTag
)
<
0
)
return
-
1
;
int32_t
len
=
taosArrayGetSize
(
pReq
->
ctb
.
tagName
);
if
(
tEncodeI32
(
pCoder
,
len
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
len
;
i
++
){
char
*
name
=
taosArrayGet
(
pReq
->
ctb
.
tagName
,
i
);
for
(
int32_t
i
=
0
;
i
<
len
;
i
++
)
{
char
*
name
=
taosArrayGet
(
pReq
->
ctb
.
tagName
,
i
);
if
(
tEncodeCStr
(
pCoder
,
name
)
<
0
)
return
-
1
;
}
}
else
if
(
pReq
->
type
==
TSDB_NORMAL_TABLE
)
{
...
...
@@ -4982,9 +5011,9 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) {
int32_t
len
=
0
;
if
(
tDecodeI32
(
pCoder
,
&
len
)
<
0
)
return
-
1
;
pReq
->
ctb
.
tagName
=
taosArrayInit
(
len
,
TSDB_COL_NAME_LEN
);
if
(
pReq
->
ctb
.
tagName
==
NULL
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
len
;
i
++
){
char
name
[
TSDB_COL_NAME_LEN
]
=
{
0
};
if
(
pReq
->
ctb
.
tagName
==
NULL
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
len
;
i
++
)
{
char
name
[
TSDB_COL_NAME_LEN
]
=
{
0
};
char
*
tmp
=
NULL
;
if
(
tDecodeCStr
(
pCoder
,
&
tmp
)
<
0
)
return
-
1
;
strcpy
(
name
,
tmp
);
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
9394e338
...
...
@@ -559,6 +559,7 @@ typedef struct {
// info
int64_t
uid
;
int8_t
status
;
int8_t
isDistributed
;
// config
int8_t
igExpired
;
int8_t
trigger
;
...
...
@@ -586,6 +587,23 @@ typedef struct {
int32_t
tEncodeSStreamObj
(
SEncoder
*
pEncoder
,
const
SStreamObj
*
pObj
);
int32_t
tDecodeSStreamObj
(
SDecoder
*
pDecoder
,
SStreamObj
*
pObj
);
typedef
struct
{
char
streamName
[
TSDB_STREAM_FNAME_LEN
];
int64_t
uid
;
int64_t
streamUid
;
SArray
*
childInfo
;
// SArray<SStreamChildEpInfo>
}
SStreamCheckpointObj
;
#if 0
typedef struct {
int64_t uid;
int64_t streamId;
int8_t isDistributed;
int8_t status;
int8_t stage;
} SStreamRecoverObj;
#endif
#ifdef __cplusplus
}
#endif
...
...
source/dnode/mnode/impl/inc/mndTrans.h
浏览文件 @
9394e338
...
...
@@ -27,6 +27,7 @@ typedef enum {
TRANS_STOP_FUNC_TEST
=
2
,
TRANS_START_FUNC_MQ_REB
=
3
,
TRANS_STOP_FUNC_MQ_REB
=
4
,
TRANS_FUNC_RECOVER_STREAM_STEP_NEXT
=
5
,
}
ETrnFunc
;
typedef
enum
{
...
...
source/dnode/mnode/impl/src/mndDef.c
浏览文件 @
9394e338
...
...
@@ -27,6 +27,7 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
if
(
tEncodeI64
(
pEncoder
,
pObj
->
uid
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pObj
->
status
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pObj
->
isDistributed
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pObj
->
igExpired
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pObj
->
trigger
)
<
0
)
return
-
1
;
...
...
@@ -72,6 +73,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
if
(
tDecodeI64
(
pDecoder
,
&
pObj
->
uid
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pObj
->
status
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pObj
->
isDistributed
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pObj
->
igExpired
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pObj
->
trigger
)
<
0
)
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndScheduler.c
浏览文件 @
9394e338
...
...
@@ -319,6 +319,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
int32_t
totLevel
=
LIST_LENGTH
(
pPlan
->
pSubplans
);
ASSERT
(
totLevel
<=
2
);
pStream
->
tasks
=
taosArrayInit
(
totLevel
,
sizeof
(
void
*
));
pStream
->
isDistributed
=
totLevel
==
2
;
bool
hasExtraSink
=
false
;
bool
externalTargetDB
=
strcmp
(
pStream
->
sourceDb
,
pStream
->
targetDb
)
!=
0
;
...
...
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
9394e338
...
...
@@ -36,7 +36,7 @@ static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
static
int32_t
mndStreamActionUpdate
(
SSdb
*
pSdb
,
SStreamObj
*
pStream
,
SStreamObj
*
pNewStream
);
static
int32_t
mndProcessCreateStreamReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessDropStreamReq
(
SRpcMsg
*
pReq
);
/*static int32_t mndProcessDropStreamInRsp(SRpcMsg *pRsp);*/
static
int32_t
mndProcessRecoverStreamReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessStreamMetaReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndGetStreamMeta
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
STableMetaRsp
*
pMeta
);
static
int32_t
mndRetrieveStream
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
...
...
@@ -55,6 +55,7 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle
(
pMnode
,
TDMT_MND_CREATE_STREAM
,
mndProcessCreateStreamReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_DROP_STREAM
,
mndProcessDropStreamReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_RECOVER_STREAM
,
mndProcessRecoverStreamReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_STREAM_TASK_DEPLOY_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_STREAM_TASK_DROP_RSP
,
mndTransProcessRsp
);
...
...
@@ -672,6 +673,69 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
return
TSDB_CODE_ACTION_IN_PROGRESS
;
}
static
int32_t
mndProcessRecoverStreamReq
(
SRpcMsg
*
pReq
)
{
SMnode
*
pMnode
=
pReq
->
info
.
node
;
SStreamObj
*
pStream
=
NULL
;
/*SDbObj *pDb = NULL;*/
/*SUserObj *pUser = NULL;*/
SMRecoverStreamReq
recoverReq
=
{
0
};
if
(
tDeserializeSMRecoverStreamReq
(
pReq
->
pCont
,
pReq
->
contLen
,
&
recoverReq
)
<
0
)
{
ASSERT
(
0
);
terrno
=
TSDB_CODE_INVALID_MSG
;
return
-
1
;
}
pStream
=
mndAcquireStream
(
pMnode
,
recoverReq
.
name
);
if
(
pStream
==
NULL
)
{
if
(
recoverReq
.
igNotExists
)
{
mDebug
(
"stream:%s, not exist, ignore not exist is set"
,
recoverReq
.
name
);
sdbRelease
(
pMnode
->
pSdb
,
pStream
);
return
0
;
}
else
{
terrno
=
TSDB_CODE_MND_STREAM_NOT_EXIST
;
return
-
1
;
}
}
if
(
mndCheckDbPrivilegeByName
(
pMnode
,
pReq
->
info
.
conn
.
user
,
MND_OPER_WRITE_DB
,
pStream
->
targetDb
)
!=
0
)
{
return
-
1
;
}
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
{
mError
(
"stream:%s, failed to recover since %s"
,
recoverReq
.
name
,
terrstr
());
sdbRelease
(
pMnode
->
pSdb
,
pStream
);
return
-
1
;
}
mDebug
(
"trans:%d, used to drop stream:%s"
,
pTrans
->
id
,
recoverReq
.
name
);
// broadcast to recover all tasks
if
(
mndDropStreamTasks
(
pMnode
,
pTrans
,
pStream
)
<
0
)
{
mError
(
"stream:%s, failed to recover task since %s"
,
recoverReq
.
name
,
terrstr
());
sdbRelease
(
pMnode
->
pSdb
,
pStream
);
return
-
1
;
}
// update stream status
if
(
mndPersistDropStreamLog
(
pMnode
,
pTrans
,
pStream
)
<
0
)
{
sdbRelease
(
pMnode
->
pSdb
,
pStream
);
return
-
1
;
}
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare recover stream trans since %s"
,
pTrans
->
id
,
terrstr
());
sdbRelease
(
pMnode
->
pSdb
,
pStream
);
mndTransDrop
(
pTrans
);
return
-
1
;
}
sdbRelease
(
pMnode
->
pSdb
,
pStream
);
return
TSDB_CODE_ACTION_IN_PROGRESS
;
}
int32_t
mndDropStreamByDb
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
void
*
pIter
=
NULL
;
...
...
source/dnode/mnode/sdb/inc/sdb.h
浏览文件 @
9394e338
...
...
@@ -137,17 +137,18 @@ typedef enum {
SDB_USER
=
7
,
SDB_AUTH
=
8
,
SDB_ACCT
=
9
,
SDB_STREAM
=
10
,
SDB_OFFSET
=
11
,
SDB_SUBSCRIBE
=
12
,
SDB_CONSUMER
=
13
,
SDB_TOPIC
=
14
,
SDB_VGROUP
=
15
,
SDB_SMA
=
16
,
SDB_STB
=
17
,
SDB_DB
=
18
,
SDB_FUNC
=
19
,
SDB_MAX
=
20
SDB_STREAM_CK
=
10
,
SDB_STREAM
=
11
,
SDB_OFFSET
=
12
,
SDB_SUBSCRIBE
=
13
,
SDB_CONSUMER
=
14
,
SDB_TOPIC
=
15
,
SDB_VGROUP
=
16
,
SDB_SMA
=
17
,
SDB_STB
=
18
,
SDB_DB
=
19
,
SDB_FUNC
=
20
,
SDB_MAX
=
21
}
ESdbType
;
typedef
struct
SSdbRaw
{
...
...
@@ -309,7 +310,7 @@ void sdbRelease(SSdb *pSdb, void *pObj);
* @return void* The next iterator of the table.
*/
void
*
sdbFetch
(
SSdb
*
pSdb
,
ESdbType
type
,
void
*
pIter
,
void
**
ppObj
);
void
*
sdbFetchAll
(
SSdb
*
pSdb
,
ESdbType
type
,
void
*
pIter
,
void
**
ppObj
,
ESdbStatus
*
status
)
;
void
*
sdbFetchAll
(
SSdb
*
pSdb
,
ESdbType
type
,
void
*
pIter
,
void
**
ppObj
,
ESdbStatus
*
status
);
/**
* @brief Cancel a traversal
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
9394e338
...
...
@@ -139,6 +139,12 @@ typedef struct STaskIdInfo {
char
*
str
;
}
STaskIdInfo
;
enum
{
STREAM_RECOVER_STEP__NONE
=
0
,
STREAM_RECOVER_STEP__PREPARE
,
STREAM_RECOVER_STEP__SCAN
,
};
typedef
struct
{
//TODO remove prepareStatus
STqOffsetVal
prepareStatus
;
// for tmq
...
...
@@ -147,6 +153,10 @@ typedef struct {
SSDataBlock
*
pullOverBlk
;
// for streaming
SWalFilterCond
cond
;
int64_t
lastScanUid
;
int8_t
recoverStep
;
SQueryTableDataCond
tableCond
;
int64_t
recoverStartVer
;
int64_t
recoverEndVer
;
}
SStreamTaskInfo
;
typedef
struct
SExecTaskInfo
{
...
...
source/libs/executor/src/executorMain.c
浏览文件 @
9394e338
...
...
@@ -261,6 +261,15 @@ int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem) {
}
#endif
int32_t
qStreamPrepareRecover
(
qTaskInfo_t
tinfo
,
int64_t
startVer
,
int64_t
endVer
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
ASSERT
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_STREAM
);
pTaskInfo
->
streamInfo
.
recoverStartVer
=
startVer
;
pTaskInfo
->
streamInfo
.
recoverEndVer
=
endVer
;
pTaskInfo
->
streamInfo
.
recoverStep
=
STREAM_RECOVER_STEP__PREPARE
;
return
0
;
}
void
*
qExtractReaderFromStreamScanner
(
void
*
scanner
)
{
SStreamScanInfo
*
pInfo
=
scanner
;
return
(
void
*
)
pInfo
->
tqReader
;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
9394e338
...
...
@@ -517,14 +517,12 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
SArray
*
tableList
=
taosArrayGetP
(
pTaskInfo
->
tableqinfoList
.
pGroupList
,
pInfo
->
currentGroupId
);
int32_t
code
;
if
(
pInfo
->
dataReader
!=
NULL
)
{
code
=
tsdbReaderReset
(
pInfo
->
dataReader
,
&
pInfo
->
cond
);
ASSERT
(
code
==
0
);
}
else
{
code
=
tsdbReaderOpen
(
pInfo
->
readHandle
.
vnode
,
&
pInfo
->
cond
,
tableList
,
(
STsdbReader
**
)
&
pInfo
->
dataReader
,
GET_TASKID
(
pTaskInfo
));
ASSERT
(
code
==
0
);
tsdbReaderClose
(
pInfo
->
dataReader
);
int32_t
code
=
tsdbReaderOpen
(
pInfo
->
readHandle
.
vnode
,
&
pInfo
->
cond
,
tableList
,
(
STsdbReader
**
)
&
pInfo
->
dataReader
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
0
)
{
// TODO
}
}
...
...
@@ -1264,6 +1262,24 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
return
NULL
;
}
if
(
pTaskInfo
->
streamInfo
.
recoverStep
==
STREAM_RECOVER_STEP__PREPARE
)
{
STableScanInfo
*
pTSInfo
=
pInfo
->
pTableScanOp
->
info
;
memcpy
(
&
pTSInfo
->
cond
,
&
pTaskInfo
->
streamInfo
.
tableCond
,
sizeof
(
SQueryTableDataCond
));
pTSInfo
->
scanTimes
=
0
;
pTSInfo
->
currentGroupId
=
-
1
;
pTaskInfo
->
streamInfo
.
recoverStep
=
STREAM_RECOVER_STEP__SCAN
;
}
if
(
pTaskInfo
->
streamInfo
.
recoverStep
==
STREAM_RECOVER_STEP__SCAN
)
{
SSDataBlock
*
pBlock
=
doTableScan
(
pInfo
->
pTableScanOp
);
if
(
pBlock
!=
NULL
)
{
return
pBlock
;
}
// TODO fill in bloom filter
pTaskInfo
->
streamInfo
.
recoverStep
=
STREAM_RECOVER_STEP__NONE
;
return
NULL
;
}
size_t
total
=
taosArrayGetSize
(
pInfo
->
pBlockLists
);
// TODO: refactor
if
(
pInfo
->
blockType
==
STREAM_INPUT__DATA_BLOCK
)
{
...
...
@@ -1556,6 +1572,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
goto
_error
;
}
taosArrayDestroy
(
tableIdList
);
memcpy
(
&
pTaskInfo
->
streamInfo
.
tableCond
,
&
pTSInfo
->
cond
,
sizeof
(
SQueryTableDataCond
));
}
// create the pseduo columns info
...
...
source/libs/stream/src/stream.c
浏览文件 @
9394e338
...
...
@@ -57,7 +57,7 @@ void streamTriggerByTimer(void* param, void* tmrId) {
if
(
atomic_load_8
(
&
pTask
->
triggerStatus
)
==
TASK_TRIGGER_STATUS__ACTIVE
)
{
SStreamTrigger
*
trigger
=
taosAllocateQitem
(
sizeof
(
SStreamTrigger
),
DEF_QITEM
);
if
(
trigger
==
NULL
)
return
;
trigger
->
type
=
STREAM_INPUT__
TRIGGER
;
trigger
->
type
=
STREAM_INPUT__
GET_RES
;
trigger
->
pBlock
=
taosMemoryCalloc
(
1
,
sizeof
(
SSDataBlock
));
if
(
trigger
->
pBlock
==
NULL
)
{
taosFreeQitem
(
trigger
);
...
...
@@ -183,8 +183,11 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
// 2.1. idle: exec
// 2.2. executing: return
// 2.3. closing: keep trying
#if 0
if (pTask->execType != TASK_EXEC__NONE) {
streamExec
(
pTask
,
pTask
->
pMsgCb
);
#endif
streamExec
(
pTask
,
pTask
->
pMsgCb
);
#if 0
} else {
ASSERT(pTask->sinkType != TASK_SINK__NONE);
while (1) {
...
...
@@ -195,11 +198,13 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
}
}
}
#endif
// 3. handle output
// 3.1 check and set status
// 3.2 dispatch / sink
if
(
pTask
->
dispatchType
!=
TASK_DISPATCH__NONE
)
{
ASSERT
(
pTask
->
sinkType
==
TASK_SINK__NONE
);
streamDispatch
(
pTask
,
pTask
->
pMsgCb
);
}
...
...
source/libs/stream/src/streamData.c
浏览文件 @
9394e338
...
...
@@ -112,7 +112,7 @@ int32_t streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) {
void
streamFreeQitem
(
SStreamQueueItem
*
data
)
{
int8_t
type
=
data
->
type
;
if
(
type
==
STREAM_INPUT__
TRIGGER
)
{
if
(
type
==
STREAM_INPUT__
GET_RES
)
{
blockDataDestroy
(((
SStreamTrigger
*
)
data
)
->
pBlock
);
taosFreeQitem
(
data
);
}
else
if
(
type
==
STREAM_INPUT__DATA_BLOCK
||
type
==
STREAM_INPUT__DATA_RETRIEVE
)
{
...
...
source/libs/stream/src/streamExec.c
浏览文件 @
9394e338
...
...
@@ -20,7 +20,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
// set input
SStreamQueueItem
*
pItem
=
(
SStreamQueueItem
*
)
data
;
if
(
pItem
->
type
==
STREAM_INPUT__
TRIGGER
)
{
if
(
pItem
->
type
==
STREAM_INPUT__
GET_RES
)
{
SStreamTrigger
*
pTrigger
=
(
SStreamTrigger
*
)
data
;
qSetMultiStreamInput
(
exec
,
pTrigger
->
pBlock
,
1
,
STREAM_INPUT__DATA_BLOCK
,
false
);
}
else
if
(
pItem
->
type
==
STREAM_INPUT__DATA_SUBMIT
)
{
...
...
@@ -73,6 +73,15 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
return
0
;
}
static
FORCE_INLINE
int32_t
streamUpdateVer
(
SStreamTask
*
pTask
,
SStreamDataBlock
*
pBlock
)
{
ASSERT
(
pBlock
->
type
==
STREAM_INPUT__DATA_BLOCK
);
int32_t
childId
=
pBlock
->
childId
;
int64_t
ver
=
pBlock
->
sourceVer
;
SStreamChildEpInfo
*
pChildInfo
=
taosArrayGetP
(
pTask
->
childEpInfo
,
childId
);
pChildInfo
->
processedVer
=
ver
;
return
0
;
}
static
SArray
*
streamExecForQall
(
SStreamTask
*
pTask
,
SArray
*
pRes
)
{
int32_t
cnt
=
0
;
void
*
data
=
NULL
;
...
...
@@ -84,14 +93,17 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
}
if
(
data
==
NULL
)
{
data
=
qItem
;
if
(
qItem
->
type
==
STREAM_INPUT__DATA_BLOCK
)
{
/*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
}
streamQueueProcessSuccess
(
pTask
->
inputQueue
);
continue
;
}
else
{
if
(
streamAppendQueueItem
(
data
,
qItem
)
<
0
)
{
streamQueueProcessFail
(
pTask
->
inputQueue
);
break
;
}
else
{
cnt
++
;
/*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
streamQueueProcessSuccess
(
pTask
->
inputQueue
);
taosArrayDestroy
(((
SStreamDataBlock
*
)
qItem
)
->
blocks
);
taosFreeQitem
(
qItem
);
...
...
@@ -106,6 +118,12 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
if
(
data
==
NULL
)
return
pRes
;
if
(
pTask
->
execType
==
TASK_EXEC__NONE
)
{
ASSERT
(((
SStreamQueueItem
*
)
data
)
->
type
==
STREAM_INPUT__DATA_BLOCK
);
streamTaskOutput
(
pTask
,
data
);
return
pRes
;
}
qDebug
(
"stream task %d exec begin, msg batch: %d"
,
pTask
->
taskId
,
cnt
);
streamTaskExecImpl
(
pTask
,
data
,
pRes
);
qDebug
(
"stream task %d exec end"
,
pTask
->
taskId
);
...
...
@@ -125,6 +143,11 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
taosFreeQitem
(
qRes
);
return
NULL
;
}
if
(((
SStreamQueueItem
*
)
data
)
->
type
==
STREAM_INPUT__DATA_SUBMIT
)
{
SStreamDataSubmit
*
pSubmit
=
(
SStreamDataSubmit
*
)
data
;
qRes
->
childId
=
pTask
->
selfChildId
;
qRes
->
sourceVer
=
pSubmit
->
ver
;
}
/*streamQueueProcessSuccess(pTask->inputQueue);*/
pRes
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
}
...
...
source/libs/stream/src/streamRecover.c
0 → 100644
浏览文件 @
9394e338
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "streamInc.h"
int32_t
tEncodeStreamTaskRecoverReq
(
SEncoder
*
pEncoder
,
const
SStreamTaskRecoverReq
*
pReq
)
{
if
(
tStartEncode
(
pEncoder
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pReq
->
streamId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pReq
->
taskId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pReq
->
sourceTaskId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pReq
->
sourceVg
)
<
0
)
return
-
1
;
tEndEncode
(
pEncoder
);
return
pEncoder
->
pos
;
}
int32_t
tDecodeStreamTaskRecoverReq
(
SDecoder
*
pDecoder
,
SStreamTaskRecoverReq
*
pReq
)
{
if
(
tStartDecode
(
pDecoder
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pReq
->
streamId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pReq
->
taskId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pReq
->
sourceTaskId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pReq
->
sourceVg
)
<
0
)
return
-
1
;
tEndDecode
(
pDecoder
);
return
0
;
}
int32_t
tEncodeStreamTaskRecoverRsp
(
SEncoder
*
pEncoder
,
const
SStreamTaskRecoverRsp
*
pRsp
)
{
if
(
tStartEncode
(
pEncoder
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pRsp
->
streamId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pRsp
->
taskId
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pRsp
->
inputStatus
)
<
0
)
return
-
1
;
tEndEncode
(
pEncoder
);
return
pEncoder
->
pos
;
}
int32_t
tDecodeStreamTaskRecoverRsp
(
SDecoder
*
pDecoder
,
SStreamTaskRecoverRsp
*
pReq
)
{
if
(
tStartDecode
(
pDecoder
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pReq
->
streamId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pReq
->
taskId
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pReq
->
inputStatus
)
<
0
)
return
-
1
;
tEndDecode
(
pDecoder
);
return
0
;
}
int32_t
tEncodeSMStreamTaskRecoverReq
(
SEncoder
*
pEncoder
,
const
SMStreamTaskRecoverReq
*
pReq
)
{
if
(
tStartEncode
(
pEncoder
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pReq
->
streamId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pReq
->
taskId
)
<
0
)
return
-
1
;
tEndEncode
(
pEncoder
);
return
pEncoder
->
pos
;
}
int32_t
tDecodeSMStreamTaskRecoverReq
(
SDecoder
*
pDecoder
,
SMStreamTaskRecoverReq
*
pReq
)
{
if
(
tStartDecode
(
pDecoder
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pReq
->
streamId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pReq
->
taskId
)
<
0
)
return
-
1
;
tEndDecode
(
pDecoder
);
return
0
;
}
int32_t
tEncodeSMStreamTaskRecoverRsp
(
SEncoder
*
pEncoder
,
const
SMStreamTaskRecoverRsp
*
pRsp
)
{
if
(
tStartEncode
(
pEncoder
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pRsp
->
streamId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pRsp
->
taskId
)
<
0
)
return
-
1
;
tEndEncode
(
pEncoder
);
return
pEncoder
->
pos
;
}
int32_t
tDecodeSMStreamTaskRecoverRsp
(
SDecoder
*
pDecoder
,
SMStreamTaskRecoverRsp
*
pReq
)
{
if
(
tStartDecode
(
pDecoder
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pReq
->
streamId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pReq
->
taskId
)
<
0
)
return
-
1
;
tEndDecode
(
pDecoder
);
return
0
;
}
int32_t
streamProcessFailRecoverReq
(
SStreamTask
*
pTask
,
SMStreamTaskRecoverReq
*
pReq
,
SRpcMsg
*
pRsp
)
{
if
(
pTask
->
taskStatus
!=
TASK_STATUS__FAIL
)
{
return
0
;
}
if
(
pTask
->
isStreamDistributed
)
{
if
(
pTask
->
isDataScan
)
{
pTask
->
taskStatus
=
TASK_STATUS__PREPARE_RECOVER
;
}
else
if
(
pTask
->
execType
!=
TASK_EXEC__NONE
)
{
pTask
->
taskStatus
=
TASK_STATUS__PREPARE_RECOVER
;
bool
hasCheckpoint
=
false
;
int32_t
childSz
=
taosArrayGetSize
(
pTask
->
childEpInfo
);
for
(
int32_t
i
=
0
;
i
<
childSz
;
i
++
)
{
SStreamChildEpInfo
*
pEpInfo
=
taosArrayGetP
(
pTask
->
childEpInfo
,
i
);
if
(
pEpInfo
->
checkpointVer
==
-
1
)
{
hasCheckpoint
=
true
;
break
;
}
}
if
(
hasCheckpoint
)
{
// load from checkpoint
}
else
{
// recover child
}
}
}
else
{
if
(
pTask
->
isDataScan
)
{
if
(
pTask
->
checkpointVer
!=
-
1
)
{
// load from checkpoint
}
else
{
// reset stream query task info
// TODO get snapshot ver
pTask
->
recoverSnapVer
=
-
1
;
qStreamPrepareRecover
(
pTask
->
exec
.
executor
,
pTask
->
startVer
,
pTask
->
recoverSnapVer
);
pTask
->
taskStatus
=
TASK_STATUS__RECOVERING
;
}
}
}
if
(
pTask
->
taskStatus
==
TASK_STATUS__RECOVERING
)
{
streamProcessRunReq
(
pTask
);
}
return
0
;
}
source/libs/stream/src/streamTask.c
浏览文件 @
9394e338
...
...
@@ -34,6 +34,7 @@ int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo)
if
(
tEncodeI32
(
pEncoder
,
pInfo
->
taskId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pInfo
->
nodeId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pInfo
->
childId
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pInfo
->
processedVer
)
<
0
)
return
-
1
;
if
(
tEncodeSEpSet
(
pEncoder
,
&
pInfo
->
epSet
)
<
0
)
return
-
1
;
return
0
;
}
...
...
@@ -42,6 +43,7 @@ int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) {
if
(
tDecodeI32
(
pDecoder
,
&
pInfo
->
taskId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pInfo
->
nodeId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pInfo
->
childId
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pInfo
->
processedVer
)
<
0
)
return
-
1
;
if
(
tDecodeSEpSet
(
pDecoder
,
&
pInfo
->
epSet
)
<
0
)
return
-
1
;
return
0
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录