Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d755ad67
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
d755ad67
编写于
8月 04, 2022
作者:
L
Liu Jicong
提交者:
GitHub
8月 04, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #15745 from taosdata/feature/stream
refactor(stream): exec scheduler
上级
3b0b5007
03141400
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
91 addition
and
132 deletion
+91
-132
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+9
-7
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+4
-3
source/dnode/vnode/src/tq/tqPush.c
source/dnode/vnode/src/tq/tqPush.c
+0
-25
source/libs/stream/src/stream.c
source/libs/stream/src/stream.c
+34
-10
source/libs/stream/src/streamDispatch.c
source/libs/stream/src/streamDispatch.c
+2
-16
source/libs/stream/src/streamExec.c
source/libs/stream/src/streamExec.c
+36
-66
source/libs/stream/src/streamQueue.c
source/libs/stream/src/streamQueue.c
+3
-2
source/libs/stream/src/streamTask.c
source/libs/stream/src/streamTask.c
+3
-3
未找到文件。
include/libs/stream/tstream.h
浏览文件 @
d755ad67
...
@@ -46,9 +46,10 @@ enum {
...
@@ -46,9 +46,10 @@ enum {
};
};
enum
{
enum
{
TASK_EXEC_STATUS__IDLE
=
1
,
TASK_SCHED_STATUS__INACTIVE
=
1
,
TASK_EXEC_STATUS__EXECUTING
,
TASK_SCHED_STATUS__WAITING
,
TASK_EXEC_STATUS__CLOSING
,
TASK_SCHED_STATUS__ACTIVE
,
TASK_SCHED_STATUS__FAILED
,
};
};
enum
{
enum
{
...
@@ -204,13 +205,11 @@ typedef struct {
...
@@ -204,13 +205,11 @@ typedef struct {
enum
{
enum
{
TASK_SOURCE__SCAN
=
1
,
TASK_SOURCE__SCAN
=
1
,
TASK_SOURCE__PIPE
,
TASK_SOURCE__PIPE
,
TASK_SOURCE__MERGE
,
};
};
enum
{
enum
{
TASK_EXEC__NONE
=
1
,
TASK_EXEC__NONE
=
1
,
TASK_EXEC__PIPE
,
TASK_EXEC__PIPE
,
TASK_EXEC__MERGE
,
};
};
enum
{
enum
{
...
@@ -256,7 +255,7 @@ typedef struct SStreamTask {
...
@@ -256,7 +255,7 @@ typedef struct SStreamTask {
int16_t
dispatchMsgType
;
int16_t
dispatchMsgType
;
int8_t
taskStatus
;
int8_t
taskStatus
;
int8_t
exec
Status
;
int8_t
sched
Status
;
// node info
// node info
int32_t
selfChildId
;
int32_t
selfChildId
;
...
@@ -475,7 +474,7 @@ typedef struct {
...
@@ -475,7 +474,7 @@ typedef struct {
int32_t
tDecodeStreamDispatchReq
(
SDecoder
*
pDecoder
,
SStreamDispatchReq
*
pReq
);
int32_t
tDecodeStreamDispatchReq
(
SDecoder
*
pDecoder
,
SStreamDispatchReq
*
pReq
);
int32_t
tDecodeStreamRetrieveReq
(
SDecoder
*
pDecoder
,
SStreamRetrieveReq
*
pReq
);
int32_t
tDecodeStreamRetrieveReq
(
SDecoder
*
pDecoder
,
SStreamRetrieveReq
*
pReq
);
int32_t
streamLaunchByWrite
(
SStreamTask
*
pTask
,
int32_t
vgId
);
//
int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId);
int32_t
streamSetupTrigger
(
SStreamTask
*
pTask
);
int32_t
streamSetupTrigger
(
SStreamTask
*
pTask
);
int32_t
streamProcessRunReq
(
SStreamTask
*
pTask
);
int32_t
streamProcessRunReq
(
SStreamTask
*
pTask
);
...
@@ -487,6 +486,9 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp)
...
@@ -487,6 +486,9 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp)
int32_t
streamProcessRetrieveReq
(
SStreamTask
*
pTask
,
SStreamRetrieveReq
*
pReq
,
SRpcMsg
*
pMsg
);
int32_t
streamProcessRetrieveReq
(
SStreamTask
*
pTask
,
SStreamRetrieveReq
*
pReq
,
SRpcMsg
*
pMsg
);
int32_t
streamProcessRetrieveRsp
(
SStreamTask
*
pTask
,
SStreamRetrieveRsp
*
pRsp
);
int32_t
streamProcessRetrieveRsp
(
SStreamTask
*
pTask
,
SStreamRetrieveRsp
*
pRsp
);
int32_t
streamTryExec
(
SStreamTask
*
pTask
);
int32_t
streamSchedExec
(
SStreamTask
*
pTask
);
typedef
struct
SStreamMeta
SStreamMeta
;
typedef
struct
SStreamMeta
SStreamMeta
;
SStreamMeta
*
streamMetaOpen
();
SStreamMeta
*
streamMetaOpen
();
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
d755ad67
...
@@ -604,7 +604,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
...
@@ -604,7 +604,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
ASSERT
(
taosArrayGetSize
(
pTask
->
childEpInfo
)
!=
0
);
ASSERT
(
taosArrayGetSize
(
pTask
->
childEpInfo
)
!=
0
);
}
}
pTask
->
execStatus
=
TASK_EXEC_STATUS__IDL
E
;
pTask
->
schedStatus
=
TASK_SCHED_STATUS__INACTIV
E
;
pTask
->
inputQueue
=
streamQueueOpen
();
pTask
->
inputQueue
=
streamQueueOpen
();
pTask
->
outputQueue
=
streamQueueOpen
();
pTask
->
outputQueue
=
streamQueueOpen
();
...
@@ -720,7 +720,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
...
@@ -720,7 +720,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
continue
;
continue
;
}
}
if
(
stream
LaunchByWrite
(
pTask
,
TD_VID
(
pTq
->
pVnode
)
)
<
0
)
{
if
(
stream
SchedExec
(
pTask
)
<
0
)
{
qError
(
"stream task launch failed, task id %d"
,
pTask
->
taskId
);
qError
(
"stream task launch failed, task id %d"
,
pTask
->
taskId
);
continue
;
continue
;
}
}
...
@@ -751,12 +751,13 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
...
@@ -751,12 +751,13 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
}
}
int32_t
tqProcessTaskDispatchReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
,
bool
exec
)
{
int32_t
tqProcessTaskDispatchReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
,
bool
exec
)
{
ASSERT
(
0
);
char
*
msgStr
=
pMsg
->
pCont
;
char
*
msgStr
=
pMsg
->
pCont
;
char
*
msgBody
=
POINTER_SHIFT
(
msgStr
,
sizeof
(
SMsgHead
));
char
*
msgBody
=
POINTER_SHIFT
(
msgStr
,
sizeof
(
SMsgHead
));
int32_t
msgLen
=
pMsg
->
contLen
-
sizeof
(
SMsgHead
);
int32_t
msgLen
=
pMsg
->
contLen
-
sizeof
(
SMsgHead
);
SStreamDispatchReq
req
;
SStreamDispatchReq
req
;
SDecoder
decoder
;
SDecoder
decoder
;
tDecoderInit
(
&
decoder
,
msgBody
,
msgLen
);
tDecoderInit
(
&
decoder
,
(
uint8_t
*
)
msgBody
,
msgLen
);
tDecodeStreamDispatchReq
(
&
decoder
,
&
req
);
tDecodeStreamDispatchReq
(
&
decoder
,
&
req
);
int32_t
taskId
=
req
.
taskId
;
int32_t
taskId
=
req
.
taskId
;
SStreamTask
**
ppTask
=
(
SStreamTask
**
)
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
SStreamTask
**
ppTask
=
(
SStreamTask
**
)
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
...
...
source/dnode/vnode/src/tq/tqPush.c
浏览文件 @
d755ad67
...
@@ -136,31 +136,6 @@ int32_t tqSendExecReq(STQ* pTq, STqHandle* pHandle) {
...
@@ -136,31 +136,6 @@ int32_t tqSendExecReq(STQ* pTq, STqHandle* pHandle) {
return 0;
return 0;
}
}
int32_t tqEnqueueAll(STQ* pTq, SSubmitReq* pReq) {
void* pIter = NULL;
SStreamDataSubmit* pSubmit = streamDataSubmitNew(pReq);
if (pSubmit == NULL) {
return -1;
}
while (1) {
pIter = taosHashIterate(pTq->handles, pIter);
if (pIter == NULL) break;
STqHandle* pHandle = (STqHandle*)pIter;
if (tqEnqueue(pHandle, pSubmit) < 0) {
continue;
}
int8_t execStatus = atomic_load_8(&pHandle->pushHandle.execStatus);
if (execStatus == TASK_EXEC_STATUS__IDLE || execStatus == TASK_EXEC_STATUS__CLOSING) {
tqSendExecReq(pTq, pHandle);
}
}
streamDataSubmitRefDec(pSubmit);
return 0;
}
int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver, SRpcHandleInfo handleInfo) {
int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver, SRpcHandleInfo handleInfo) {
if (msgType != TDMT_VND_SUBMIT) return 0;
if (msgType != TDMT_VND_SUBMIT) return 0;
void* pIter = NULL;
void* pIter = NULL;
...
...
source/libs/stream/src/stream.c
浏览文件 @
d755ad67
...
@@ -68,7 +68,7 @@ void streamTriggerByTimer(void* param, void* tmrId) {
...
@@ -68,7 +68,7 @@ void streamTriggerByTimer(void* param, void* tmrId) {
atomic_store_8
(
&
pTask
->
triggerStatus
,
TASK_TRIGGER_STATUS__IN_ACTIVE
);
atomic_store_8
(
&
pTask
->
triggerStatus
,
TASK_TRIGGER_STATUS__IN_ACTIVE
);
streamTaskInput
(
pTask
,
(
SStreamQueueItem
*
)
trigger
);
streamTaskInput
(
pTask
,
(
SStreamQueueItem
*
)
trigger
);
stream
LaunchByWrite
(
pTask
,
pTask
->
nodeId
);
stream
SchedExec
(
pTask
);
}
}
taosTmrReset
(
streamTriggerByTimer
,
(
int32_t
)
pTask
->
triggerParam
,
pTask
,
streamEnv
.
timer
,
&
pTask
->
timer
);
taosTmrReset
(
streamTriggerByTimer
,
(
int32_t
)
pTask
->
triggerParam
,
pTask
,
streamEnv
.
timer
,
&
pTask
->
timer
);
...
@@ -82,9 +82,10 @@ int32_t streamSetupTrigger(SStreamTask* pTask) {
...
@@ -82,9 +82,10 @@ int32_t streamSetupTrigger(SStreamTask* pTask) {
return
0
;
return
0
;
}
}
#if 0
int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId) {
int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId) {
int8_t
execStatus
=
atomic_load_8
(
&
pTask
->
exec
Status
);
int8_t
schedStatus = atomic_load_8(&pTask->sched
Status);
if
(
execStatus
==
TASK_EXEC_STATUS__IDLE
||
execStatus
==
TASK_EXEC_STATUS__CLOSING
)
{
if (
schedStatus == TASK_SCHED_STATUS__INACTIVE
) {
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) return -1;
if (pRunReq == NULL) return -1;
...
@@ -101,6 +102,29 @@ int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId) {
...
@@ -101,6 +102,29 @@ int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId) {
}
}
return 0;
return 0;
}
}
#endif
int32_t
streamSchedExec
(
SStreamTask
*
pTask
)
{
int8_t
schedStatus
=
atomic_val_compare_exchange_8
(
&
pTask
->
schedStatus
,
TASK_SCHED_STATUS__INACTIVE
,
TASK_SCHED_STATUS__WAITING
);
if
(
schedStatus
==
TASK_SCHED_STATUS__INACTIVE
)
{
SStreamTaskRunReq
*
pRunReq
=
rpcMallocCont
(
sizeof
(
SStreamTaskRunReq
));
if
(
pRunReq
==
NULL
)
{
atomic_store_8
(
&
pTask
->
schedStatus
,
TASK_SCHED_STATUS__INACTIVE
);
return
-
1
;
}
pRunReq
->
head
.
vgId
=
pTask
->
nodeId
;
pRunReq
->
streamId
=
pTask
->
streamId
;
pRunReq
->
taskId
=
pTask
->
taskId
;
SRpcMsg
msg
=
{
.
msgType
=
TDMT_STREAM_TASK_RUN
,
.
pCont
=
pRunReq
,
.
contLen
=
sizeof
(
SStreamTaskRunReq
),
};
tmsgPutToQueue
(
pTask
->
pMsgCb
,
STREAM_QUEUE
,
&
msg
);
}
return
0
;
}
int32_t
streamTaskEnqueue
(
SStreamTask
*
pTask
,
SStreamDispatchReq
*
pReq
,
SRpcMsg
*
pRsp
)
{
int32_t
streamTaskEnqueue
(
SStreamTask
*
pTask
,
SStreamDispatchReq
*
pReq
,
SRpcMsg
*
pRsp
)
{
SStreamDataBlock
*
pData
=
taosAllocateQitem
(
sizeof
(
SStreamDataBlock
),
DEF_QITEM
);
SStreamDataBlock
*
pData
=
taosAllocateQitem
(
sizeof
(
SStreamDataBlock
),
DEF_QITEM
);
...
@@ -182,14 +206,13 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
...
@@ -182,14 +206,13 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
streamTaskEnqueue
(
pTask
,
pReq
,
pRsp
);
streamTaskEnqueue
(
pTask
,
pReq
,
pRsp
);
if
(
exec
)
{
if
(
exec
)
{
streamExec
(
pTask
);
stream
Try
Exec
(
pTask
);
if
(
pTask
->
dispatchType
!=
TASK_DISPATCH__NONE
)
{
if
(
pTask
->
dispatchType
!=
TASK_DISPATCH__NONE
)
{
ASSERT
(
pTask
->
sinkType
==
TASK_SINK__NONE
);
streamDispatch
(
pTask
);
streamDispatch
(
pTask
);
}
}
}
else
{
}
else
{
stream
LaunchByWrite
(
pTask
,
pTask
->
nodeId
);
stream
SchedExec
(
pTask
);
}
}
return
0
;
return
0
;
...
@@ -219,7 +242,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
...
@@ -219,7 +242,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
}
}
int32_t
streamProcessRunReq
(
SStreamTask
*
pTask
)
{
int32_t
streamProcessRunReq
(
SStreamTask
*
pTask
)
{
streamExec
(
pTask
);
stream
Try
Exec
(
pTask
);
if
(
pTask
->
dispatchType
!=
TASK_DISPATCH__NONE
)
{
if
(
pTask
->
dispatchType
!=
TASK_DISPATCH__NONE
)
{
streamDispatch
(
pTask
);
streamDispatch
(
pTask
);
...
@@ -272,10 +295,11 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S
...
@@ -272,10 +295,11 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S
streamTaskEnqueueRetrieve
(
pTask
,
pReq
,
pRsp
);
streamTaskEnqueueRetrieve
(
pTask
,
pReq
,
pRsp
);
ASSERT
(
pTask
->
execType
!=
TASK_EXEC__NONE
);
ASSERT
(
pTask
->
execType
!=
TASK_EXEC__NONE
);
streamExec
(
pTask
);
streamSchedExec
(
pTask
);
/*streamTryExec(pTask);*/
ASSERT
(
pTask
->
dispatchType
!=
TASK_DISPATCH__NONE
);
/*ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);*/
streamDispatch
(
pTask
);
/*streamDispatch(pTask);*/
return
0
;
return
0
;
}
}
...
...
source/libs/stream/src/streamDispatch.c
浏览文件 @
d755ad67
...
@@ -440,13 +440,13 @@ FAIL:
...
@@ -440,13 +440,13 @@ FAIL:
int32_t
streamDispatch
(
SStreamTask
*
pTask
)
{
int32_t
streamDispatch
(
SStreamTask
*
pTask
)
{
ASSERT
(
pTask
->
dispatchType
!=
TASK_DISPATCH__NONE
);
ASSERT
(
pTask
->
dispatchType
!=
TASK_DISPATCH__NONE
);
#if 1
ASSERT
(
pTask
->
sinkType
==
TASK_SINK__NONE
);
int8_t
old
=
int8_t
old
=
atomic_val_compare_exchange_8
(
&
pTask
->
outputStatus
,
TASK_OUTPUT_STATUS__NORMAL
,
TASK_OUTPUT_STATUS__WAIT
);
atomic_val_compare_exchange_8
(
&
pTask
->
outputStatus
,
TASK_OUTPUT_STATUS__NORMAL
,
TASK_OUTPUT_STATUS__WAIT
);
if
(
old
!=
TASK_OUTPUT_STATUS__NORMAL
)
{
if
(
old
!=
TASK_OUTPUT_STATUS__NORMAL
)
{
return
0
;
return
0
;
}
}
#endif
SStreamDataBlock
*
pBlock
=
streamQueueNextItem
(
pTask
->
outputQueue
);
SStreamDataBlock
*
pBlock
=
streamQueueNextItem
(
pTask
->
outputQueue
);
if
(
pBlock
==
NULL
)
{
if
(
pBlock
==
NULL
)
{
...
@@ -466,22 +466,8 @@ int32_t streamDispatch(SStreamTask* pTask) {
...
@@ -466,22 +466,8 @@ int32_t streamDispatch(SStreamTask* pTask) {
atomic_store_8
(
&
pTask
->
outputStatus
,
TASK_OUTPUT_STATUS__NORMAL
);
atomic_store_8
(
&
pTask
->
outputStatus
,
TASK_OUTPUT_STATUS__NORMAL
);
goto
FREE
;
goto
FREE
;
}
}
/*atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);*/
FREE:
FREE:
taosArrayDestroyEx
(
pBlock
->
blocks
,
(
FDelete
)
blockDataFreeRes
);
taosArrayDestroyEx
(
pBlock
->
blocks
,
(
FDelete
)
blockDataFreeRes
);
taosFreeQitem
(
pBlock
);
taosFreeQitem
(
pBlock
);
#if 0
SRpcMsg dispatchMsg = {0};
SEpSet* pEpSet = NULL;
if (streamBuildDispatchMsg(pTask, pBlock, &dispatchMsg, &pEpSet) < 0) {
ASSERT(0);
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
return -1;
}
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(pBlock);
tmsgSendReq(pEpSet, &dispatchMsg);
#endif
return
code
;
return
code
;
}
}
source/libs/stream/src/streamExec.c
浏览文件 @
d755ad67
...
@@ -147,24 +147,23 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
...
@@ -147,24 +147,23 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
return
0
;
return
0
;
}
}
// TODO: handle version
static
SArray
*
streamExecForQall
(
SStreamTask
*
pTask
,
SArray
*
pRes
)
{
int32_t
streamExecForAll
(
SStreamTask
*
pTask
)
{
while
(
1
)
{
while
(
1
)
{
int32_t
cnt
=
1
;
int32_t
cnt
=
1
;
void
*
data
=
NULL
;
void
*
data
=
NULL
;
while
(
1
)
{
while
(
1
)
{
SStreamQueueItem
*
qItem
=
streamQueueNextItem
(
pTask
->
inputQueue
);
SStreamQueueItem
*
qItem
=
streamQueueNextItem
(
pTask
->
inputQueue
);
if
(
qItem
==
NULL
)
{
if
(
qItem
==
NULL
)
{
qDebug
(
"stream
exec over, queue empty"
);
qDebug
(
"stream
task exec over, queue empty, task: %d"
,
pTask
->
taskId
);
break
;
break
;
}
}
if
(
data
==
NULL
)
{
if
(
data
==
NULL
)
{
data
=
qItem
;
data
=
qItem
;
streamQueueProcessSuccess
(
pTask
->
inputQueue
);
streamQueueProcessSuccess
(
pTask
->
inputQueue
);
if
(
pTask
->
execType
==
TASK_EXEC__NONE
)
break
;
if
(
pTask
->
execType
==
TASK_EXEC__NONE
)
{
/*if (qItem->type == STREAM_INPUT__DATA_BLOCK) {*/
break
;
/*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
}
/*}*/
}
else
{
}
else
{
void
*
newRet
;
void
*
newRet
;
if
((
newRet
=
streamAppendQueueItem
(
data
,
qItem
))
==
NULL
)
{
if
((
newRet
=
streamAppendQueueItem
(
data
,
qItem
))
==
NULL
)
{
...
@@ -181,11 +180,12 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
...
@@ -181,11 +180,12 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
if
(
pTask
->
taskStatus
==
TASK_STATUS__DROPPING
)
{
if
(
pTask
->
taskStatus
==
TASK_STATUS__DROPPING
)
{
if
(
data
)
streamFreeQitem
(
data
);
if
(
data
)
streamFreeQitem
(
data
);
taosArrayDestroyEx
(
pRes
,
(
FDelete
)
blockDataFreeRes
);
return
0
;
return
NULL
;
}
}
if
(
data
==
NULL
)
break
;
if
(
data
==
NULL
)
{
break
;
}
if
(
pTask
->
execType
==
TASK_EXEC__NONE
)
{
if
(
pTask
->
execType
==
TASK_EXEC__NONE
)
{
ASSERT
(((
SStreamQueueItem
*
)
data
)
->
type
==
STREAM_INPUT__DATA_BLOCK
);
ASSERT
(((
SStreamQueueItem
*
)
data
)
->
type
==
STREAM_INPUT__DATA_BLOCK
);
...
@@ -193,6 +193,8 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
...
@@ -193,6 +193,8 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
continue
;
continue
;
}
}
SArray
*
pRes
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
qDebug
(
"stream task %d exec begin, msg batch: %d"
,
pTask
->
taskId
,
cnt
);
qDebug
(
"stream task %d exec begin, msg batch: %d"
,
pTask
->
taskId
,
cnt
);
streamTaskExecImpl
(
pTask
,
data
,
pRes
);
streamTaskExecImpl
(
pTask
,
data
,
pRes
);
qDebug
(
"stream task %d exec end"
,
pTask
->
taskId
);
qDebug
(
"stream task %d exec end"
,
pTask
->
taskId
);
...
@@ -203,76 +205,44 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
...
@@ -203,76 +205,44 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
// TODO log failed ver
// TODO log failed ver
streamQueueProcessFail
(
pTask
->
inputQueue
);
streamQueueProcessFail
(
pTask
->
inputQueue
);
taosArrayDestroy
(
pRes
);
taosArrayDestroy
(
pRes
);
return
NULL
;
return
-
1
;
}
}
qRes
->
type
=
STREAM_INPUT__DATA_BLOCK
;
qRes
->
type
=
STREAM_INPUT__DATA_BLOCK
;
qRes
->
blocks
=
pRes
;
qRes
->
blocks
=
pRes
;
if
(
streamTaskOutput
(
pTask
,
qRes
)
<
0
)
{
// TODO log failed ver
/*streamQueueProcessFail(pTask->inputQueue);*/
taosArrayDestroyEx
(
pRes
,
(
FDelete
)
blockDataFreeRes
);
taosFreeQitem
(
qRes
);
return
NULL
;
}
if
(((
SStreamQueueItem
*
)
data
)
->
type
==
STREAM_INPUT__DATA_SUBMIT
)
{
if
(((
SStreamQueueItem
*
)
data
)
->
type
==
STREAM_INPUT__DATA_SUBMIT
)
{
SStreamDataSubmit
*
pSubmit
=
(
SStreamDataSubmit
*
)
data
;
SStreamDataSubmit
*
pSubmit
=
(
SStreamDataSubmit
*
)
data
;
qRes
->
childId
=
pTask
->
selfChildId
;
qRes
->
childId
=
pTask
->
selfChildId
;
qRes
->
sourceVer
=
pSubmit
->
ver
;
qRes
->
sourceVer
=
pSubmit
->
ver
;
}
}
if
(
streamTaskOutput
(
pTask
,
qRes
)
<
0
)
{
// TODO save failed ver
/*streamQueueProcessFail(pTask->inputQueue);*/
taosArrayDestroyEx
(
pRes
,
(
FDelete
)
blockDataFreeRes
);
taosFreeQitem
(
qRes
);
return
-
1
;
}
/*streamQueueProcessSuccess(pTask->inputQueue);*/
/*streamQueueProcessSuccess(pTask->inputQueue);*/
pRes
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
}
}
streamFreeQitem
(
data
);
}
}
return
pRes
;
return
0
;
}
}
// TODO: handle version
int32_t
streamTryExec
(
SStreamTask
*
pTask
)
{
int32_t
streamExec
(
SStreamTask
*
pTask
)
{
int8_t
schedStatus
=
SArray
*
pRes
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
atomic_val_compare_exchange_8
(
&
pTask
->
schedStatus
,
TASK_SCHED_STATUS__WAITING
,
TASK_SCHED_STATUS__ACTIVE
);
if
(
pRes
==
NULL
)
return
-
1
;
if
(
schedStatus
==
TASK_SCHED_STATUS__WAITING
)
{
while
(
1
)
{
int32_t
code
=
streamExecForAll
(
pTask
);
int8_t
execStatus
=
if
(
code
<
0
)
{
atomic_val_compare_exchange_8
(
&
pTask
->
execStatus
,
TASK_EXEC_STATUS__IDLE
,
TASK_EXEC_STATUS__EXECUTING
);
atomic_store_8
(
&
pTask
->
schedStatus
,
TASK_SCHED_STATUS__FAILED
);
if
(
execStatus
==
TASK_EXEC_STATUS__IDLE
)
{
return
-
1
;
// first run
}
qDebug
(
"stream exec, enter exec status"
);
atomic_store_8
(
&
pTask
->
schedStatus
,
TASK_SCHED_STATUS__INACTIVE
);
pRes
=
streamExecForQall
(
pTask
,
pRes
);
if
(
pRes
==
NULL
)
goto
FAIL
;
// temporarily disable status closing, since it runs out of threads
#if 0
// set status closing
atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__CLOSING);
// second run, make sure inputQ and qall are cleared
qDebug("stream exec, enter closing status");
pRes = streamExecForQall(pTask, pRes);
if (pRes == NULL) goto FAIL;
#endif
taosArrayDestroyEx
(
pRes
,
(
FDelete
)
blockDataFreeRes
);
if
(
!
taosQueueEmpty
(
pTask
->
inputQueue
->
queue
))
{
atomic_store_8
(
&
pTask
->
execStatus
,
TASK_EXEC_STATUS__IDLE
);
streamSchedExec
(
pTask
);
qDebug
(
"stream exec, return result"
);
return
0
;
}
else
if
(
execStatus
==
TASK_EXEC_STATUS__CLOSING
)
{
continue
;
}
else
if
(
execStatus
==
TASK_EXEC_STATUS__EXECUTING
)
{
ASSERT
(
taosArrayGetSize
(
pRes
)
==
0
);
taosArrayDestroyEx
(
pRes
,
(
FDelete
)
blockDataFreeRes
);
return
0
;
}
else
{
ASSERT
(
0
);
}
}
}
}
FAIL:
return
0
;
if
(
pRes
)
taosArrayDestroy
(
pRes
);
if
(
pTask
->
taskStatus
==
TASK_STATUS__DROPPING
)
{
tFreeSStreamTask
(
pTask
);
return
0
;
}
else
{
atomic_store_8
(
&
pTask
->
execStatus
,
TASK_EXEC_STATUS__IDLE
);
return
-
1
;
}
}
}
source/libs/stream/src/streamQueue.c
浏览文件 @
d755ad67
...
@@ -35,9 +35,10 @@ FAIL:
...
@@ -35,9 +35,10 @@ FAIL:
void
streamQueueClose
(
SStreamQueue
*
queue
)
{
void
streamQueueClose
(
SStreamQueue
*
queue
)
{
while
(
1
)
{
while
(
1
)
{
void
*
qItem
=
streamQueueNextItem
(
queue
);
void
*
qItem
=
streamQueueNextItem
(
queue
);
if
(
qItem
)
if
(
qItem
)
{
taosFreeQitem
(
qItem
);
taosFreeQitem
(
qItem
);
else
}
else
{
return
;
return
;
}
}
}
}
}
source/libs/stream/src/streamTask.c
浏览文件 @
d755ad67
...
@@ -23,7 +23,7 @@ SStreamTask* tNewSStreamTask(int64_t streamId) {
...
@@ -23,7 +23,7 @@ SStreamTask* tNewSStreamTask(int64_t streamId) {
}
}
pTask
->
taskId
=
tGenIdPI32
();
pTask
->
taskId
=
tGenIdPI32
();
pTask
->
streamId
=
streamId
;
pTask
->
streamId
=
streamId
;
pTask
->
execStatus
=
TASK_EXEC_STATUS__IDL
E
;
pTask
->
schedStatus
=
TASK_SCHED_STATUS__INACTIV
E
;
pTask
->
inputStatus
=
TASK_INPUT_STATUS__NORMAL
;
pTask
->
inputStatus
=
TASK_INPUT_STATUS__NORMAL
;
pTask
->
outputStatus
=
TASK_OUTPUT_STATUS__NORMAL
;
pTask
->
outputStatus
=
TASK_OUTPUT_STATUS__NORMAL
;
...
@@ -59,7 +59,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
...
@@ -59,7 +59,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if
(
tEncodeI16
(
pEncoder
,
pTask
->
dispatchMsgType
)
<
0
)
return
-
1
;
if
(
tEncodeI16
(
pEncoder
,
pTask
->
dispatchMsgType
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
taskStatus
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
taskStatus
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
exec
Status
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
sched
Status
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
selfChildId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
selfChildId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
nodeId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
nodeId
)
<
0
)
return
-
1
;
...
@@ -114,7 +114,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
...
@@ -114,7 +114,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if
(
tDecodeI16
(
pDecoder
,
&
pTask
->
dispatchMsgType
)
<
0
)
return
-
1
;
if
(
tDecodeI16
(
pDecoder
,
&
pTask
->
dispatchMsgType
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
taskStatus
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
taskStatus
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
exec
Status
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
sched
Status
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
selfChildId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
selfChildId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
nodeId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
nodeId
)
<
0
)
return
-
1
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录