Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d6cc88ed
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1193
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d6cc88ed
编写于
8月 04, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor(stream): exec scheduler
上级
e43d3d88
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
83 addition
and
131 deletion
+83
-131
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+8
-6
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+1
-1
source/dnode/vnode/src/tq/tqPush.c
source/dnode/vnode/src/tq/tqPush.c
+0
-25
source/libs/stream/src/stream.c
source/libs/stream/src/stream.c
+28
-7
source/libs/stream/src/streamDispatch.c
source/libs/stream/src/streamDispatch.c
+2
-16
source/libs/stream/src/streamExec.c
source/libs/stream/src/streamExec.c
+38
-71
source/libs/stream/src/streamQueue.c
source/libs/stream/src/streamQueue.c
+3
-2
source/libs/stream/src/streamTask.c
source/libs/stream/src/streamTask.c
+3
-3
未找到文件。
include/libs/stream/tstream.h
浏览文件 @
d6cc88ed
...
@@ -46,9 +46,10 @@ enum {
...
@@ -46,9 +46,10 @@ enum {
};
};
enum
{
enum
{
TASK_EXEC_STATUS__IDLE
=
1
,
TASK_SCHED_STATUS__INACTIVE
=
1
,
TASK_EXEC_STATUS__EXECUTING
,
TASK_SCHED_STATUS__WAITING
,
TASK_EXEC_STATUS__CLOSING
,
TASK_SCHED_STATUS__ACTIVE
,
TASK_SCHED_STATUS__FAILED
,
};
};
enum
{
enum
{
...
@@ -204,13 +205,11 @@ typedef struct {
...
@@ -204,13 +205,11 @@ typedef struct {
enum
{
enum
{
TASK_SOURCE__SCAN
=
1
,
TASK_SOURCE__SCAN
=
1
,
TASK_SOURCE__PIPE
,
TASK_SOURCE__PIPE
,
TASK_SOURCE__MERGE
,
};
};
enum
{
enum
{
TASK_EXEC__NONE
=
1
,
TASK_EXEC__NONE
=
1
,
TASK_EXEC__PIPE
,
TASK_EXEC__PIPE
,
TASK_EXEC__MERGE
,
};
};
enum
{
enum
{
...
@@ -256,7 +255,7 @@ typedef struct SStreamTask {
...
@@ -256,7 +255,7 @@ typedef struct SStreamTask {
int16_t
dispatchMsgType
;
int16_t
dispatchMsgType
;
int8_t
taskStatus
;
int8_t
taskStatus
;
int8_t
exec
Status
;
int8_t
sched
Status
;
// node info
// node info
int32_t
selfChildId
;
int32_t
selfChildId
;
...
@@ -487,6 +486,9 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp)
...
@@ -487,6 +486,9 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp)
int32_t
streamProcessRetrieveReq
(
SStreamTask
*
pTask
,
SStreamRetrieveReq
*
pReq
,
SRpcMsg
*
pMsg
);
int32_t
streamProcessRetrieveReq
(
SStreamTask
*
pTask
,
SStreamRetrieveReq
*
pReq
,
SRpcMsg
*
pMsg
);
int32_t
streamProcessRetrieveRsp
(
SStreamTask
*
pTask
,
SStreamRetrieveRsp
*
pRsp
);
int32_t
streamProcessRetrieveRsp
(
SStreamTask
*
pTask
,
SStreamRetrieveRsp
*
pRsp
);
int32_t
streamTryExec
(
SStreamTask
*
pTask
);
int32_t
streamSchedExec
(
SStreamTask
*
pTask
);
typedef
struct
SStreamMeta
SStreamMeta
;
typedef
struct
SStreamMeta
SStreamMeta
;
SStreamMeta
*
streamMetaOpen
();
SStreamMeta
*
streamMetaOpen
();
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
d6cc88ed
...
@@ -604,7 +604,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
...
@@ -604,7 +604,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
ASSERT
(
taosArrayGetSize
(
pTask
->
childEpInfo
)
!=
0
);
ASSERT
(
taosArrayGetSize
(
pTask
->
childEpInfo
)
!=
0
);
}
}
pTask
->
execStatus
=
TASK_EXEC_STATUS__IDL
E
;
pTask
->
schedStatus
=
TASK_SCHED_STATUS__INACTIV
E
;
pTask
->
inputQueue
=
streamQueueOpen
();
pTask
->
inputQueue
=
streamQueueOpen
();
pTask
->
outputQueue
=
streamQueueOpen
();
pTask
->
outputQueue
=
streamQueueOpen
();
...
...
source/dnode/vnode/src/tq/tqPush.c
浏览文件 @
d6cc88ed
...
@@ -136,31 +136,6 @@ int32_t tqSendExecReq(STQ* pTq, STqHandle* pHandle) {
...
@@ -136,31 +136,6 @@ int32_t tqSendExecReq(STQ* pTq, STqHandle* pHandle) {
return 0;
return 0;
}
}
int32_t tqEnqueueAll(STQ* pTq, SSubmitReq* pReq) {
void* pIter = NULL;
SStreamDataSubmit* pSubmit = streamDataSubmitNew(pReq);
if (pSubmit == NULL) {
return -1;
}
while (1) {
pIter = taosHashIterate(pTq->handles, pIter);
if (pIter == NULL) break;
STqHandle* pHandle = (STqHandle*)pIter;
if (tqEnqueue(pHandle, pSubmit) < 0) {
continue;
}
int8_t execStatus = atomic_load_8(&pHandle->pushHandle.execStatus);
if (execStatus == TASK_EXEC_STATUS__IDLE || execStatus == TASK_EXEC_STATUS__CLOSING) {
tqSendExecReq(pTq, pHandle);
}
}
streamDataSubmitRefDec(pSubmit);
return 0;
}
int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver, SRpcHandleInfo handleInfo) {
int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver, SRpcHandleInfo handleInfo) {
if (msgType != TDMT_VND_SUBMIT) return 0;
if (msgType != TDMT_VND_SUBMIT) return 0;
void* pIter = NULL;
void* pIter = NULL;
...
...
source/libs/stream/src/stream.c
浏览文件 @
d6cc88ed
...
@@ -83,8 +83,8 @@ int32_t streamSetupTrigger(SStreamTask* pTask) {
...
@@ -83,8 +83,8 @@ int32_t streamSetupTrigger(SStreamTask* pTask) {
}
}
int32_t
streamLaunchByWrite
(
SStreamTask
*
pTask
,
int32_t
vgId
)
{
int32_t
streamLaunchByWrite
(
SStreamTask
*
pTask
,
int32_t
vgId
)
{
int8_t
execStatus
=
atomic_load_8
(
&
pTask
->
exec
Status
);
int8_t
schedStatus
=
atomic_load_8
(
&
pTask
->
sched
Status
);
if
(
execStatus
==
TASK_EXEC_STATUS__IDLE
||
execStatus
==
TASK_EXEC_STATUS__CLOSING
)
{
if
(
schedStatus
==
TASK_SCHED_STATUS__INACTIVE
)
{
SStreamTaskRunReq
*
pRunReq
=
rpcMallocCont
(
sizeof
(
SStreamTaskRunReq
));
SStreamTaskRunReq
*
pRunReq
=
rpcMallocCont
(
sizeof
(
SStreamTaskRunReq
));
if
(
pRunReq
==
NULL
)
return
-
1
;
if
(
pRunReq
==
NULL
)
return
-
1
;
...
@@ -102,6 +102,28 @@ int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId) {
...
@@ -102,6 +102,28 @@ int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId) {
return
0
;
return
0
;
}
}
int32_t
streamSchedExec
(
SStreamTask
*
pTask
)
{
int8_t
schedStatus
=
atomic_val_compare_exchange_8
(
&
pTask
->
schedStatus
,
TASK_SCHED_STATUS__INACTIVE
,
TASK_SCHED_STATUS__WAITING
);
if
(
schedStatus
==
TASK_SCHED_STATUS__INACTIVE
)
{
SStreamTaskRunReq
*
pRunReq
=
rpcMallocCont
(
sizeof
(
SStreamTaskRunReq
));
if
(
pRunReq
==
NULL
)
{
atomic_store_8
(
&
pTask
->
schedStatus
,
TASK_SCHED_STATUS__INACTIVE
);
return
-
1
;
}
pRunReq
->
head
.
vgId
=
pTask
->
nodeId
;
pRunReq
->
streamId
=
pTask
->
streamId
;
pRunReq
->
taskId
=
pTask
->
taskId
;
SRpcMsg
msg
=
{
.
msgType
=
TDMT_STREAM_TASK_RUN
,
.
pCont
=
pRunReq
,
.
contLen
=
sizeof
(
SStreamTaskRunReq
),
};
tmsgPutToQueue
(
pTask
->
pMsgCb
,
STREAM_QUEUE
,
&
msg
);
}
return
0
;
}
int32_t
streamTaskEnqueue
(
SStreamTask
*
pTask
,
SStreamDispatchReq
*
pReq
,
SRpcMsg
*
pRsp
)
{
int32_t
streamTaskEnqueue
(
SStreamTask
*
pTask
,
SStreamDispatchReq
*
pReq
,
SRpcMsg
*
pRsp
)
{
SStreamDataBlock
*
pData
=
taosAllocateQitem
(
sizeof
(
SStreamDataBlock
),
DEF_QITEM
);
SStreamDataBlock
*
pData
=
taosAllocateQitem
(
sizeof
(
SStreamDataBlock
),
DEF_QITEM
);
int8_t
status
;
int8_t
status
;
...
@@ -182,14 +204,13 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
...
@@ -182,14 +204,13 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
streamTaskEnqueue
(
pTask
,
pReq
,
pRsp
);
streamTaskEnqueue
(
pTask
,
pReq
,
pRsp
);
if
(
exec
)
{
if
(
exec
)
{
streamExec
(
pTask
);
stream
Try
Exec
(
pTask
);
if
(
pTask
->
dispatchType
!=
TASK_DISPATCH__NONE
)
{
if
(
pTask
->
dispatchType
!=
TASK_DISPATCH__NONE
)
{
ASSERT
(
pTask
->
sinkType
==
TASK_SINK__NONE
);
streamDispatch
(
pTask
);
streamDispatch
(
pTask
);
}
}
}
else
{
}
else
{
stream
LaunchByWrite
(
pTask
,
pTask
->
nodeId
);
stream
SchedExec
(
pTask
);
}
}
return
0
;
return
0
;
...
@@ -219,7 +240,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
...
@@ -219,7 +240,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
}
}
int32_t
streamProcessRunReq
(
SStreamTask
*
pTask
)
{
int32_t
streamProcessRunReq
(
SStreamTask
*
pTask
)
{
streamExec
(
pTask
);
stream
Try
Exec
(
pTask
);
if
(
pTask
->
dispatchType
!=
TASK_DISPATCH__NONE
)
{
if
(
pTask
->
dispatchType
!=
TASK_DISPATCH__NONE
)
{
streamDispatch
(
pTask
);
streamDispatch
(
pTask
);
...
@@ -272,7 +293,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S
...
@@ -272,7 +293,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S
streamTaskEnqueueRetrieve
(
pTask
,
pReq
,
pRsp
);
streamTaskEnqueueRetrieve
(
pTask
,
pReq
,
pRsp
);
ASSERT
(
pTask
->
execType
!=
TASK_EXEC__NONE
);
ASSERT
(
pTask
->
execType
!=
TASK_EXEC__NONE
);
streamExec
(
pTask
);
stream
Try
Exec
(
pTask
);
ASSERT
(
pTask
->
dispatchType
!=
TASK_DISPATCH__NONE
);
ASSERT
(
pTask
->
dispatchType
!=
TASK_DISPATCH__NONE
);
streamDispatch
(
pTask
);
streamDispatch
(
pTask
);
...
...
source/libs/stream/src/streamDispatch.c
浏览文件 @
d6cc88ed
...
@@ -440,13 +440,13 @@ FAIL:
...
@@ -440,13 +440,13 @@ FAIL:
int32_t
streamDispatch
(
SStreamTask
*
pTask
)
{
int32_t
streamDispatch
(
SStreamTask
*
pTask
)
{
ASSERT
(
pTask
->
dispatchType
!=
TASK_DISPATCH__NONE
);
ASSERT
(
pTask
->
dispatchType
!=
TASK_DISPATCH__NONE
);
#if 1
ASSERT
(
pTask
->
sinkType
==
TASK_SINK__NONE
);
int8_t
old
=
int8_t
old
=
atomic_val_compare_exchange_8
(
&
pTask
->
outputStatus
,
TASK_OUTPUT_STATUS__NORMAL
,
TASK_OUTPUT_STATUS__WAIT
);
atomic_val_compare_exchange_8
(
&
pTask
->
outputStatus
,
TASK_OUTPUT_STATUS__NORMAL
,
TASK_OUTPUT_STATUS__WAIT
);
if
(
old
!=
TASK_OUTPUT_STATUS__NORMAL
)
{
if
(
old
!=
TASK_OUTPUT_STATUS__NORMAL
)
{
return
0
;
return
0
;
}
}
#endif
SStreamDataBlock
*
pBlock
=
streamQueueNextItem
(
pTask
->
outputQueue
);
SStreamDataBlock
*
pBlock
=
streamQueueNextItem
(
pTask
->
outputQueue
);
if
(
pBlock
==
NULL
)
{
if
(
pBlock
==
NULL
)
{
...
@@ -466,22 +466,8 @@ int32_t streamDispatch(SStreamTask* pTask) {
...
@@ -466,22 +466,8 @@ int32_t streamDispatch(SStreamTask* pTask) {
atomic_store_8
(
&
pTask
->
outputStatus
,
TASK_OUTPUT_STATUS__NORMAL
);
atomic_store_8
(
&
pTask
->
outputStatus
,
TASK_OUTPUT_STATUS__NORMAL
);
goto
FREE
;
goto
FREE
;
}
}
/*atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);*/
FREE:
FREE:
taosArrayDestroyEx
(
pBlock
->
blocks
,
(
FDelete
)
blockDataFreeRes
);
taosArrayDestroyEx
(
pBlock
->
blocks
,
(
FDelete
)
blockDataFreeRes
);
taosFreeQitem
(
pBlock
);
taosFreeQitem
(
pBlock
);
#if 0
SRpcMsg dispatchMsg = {0};
SEpSet* pEpSet = NULL;
if (streamBuildDispatchMsg(pTask, pBlock, &dispatchMsg, &pEpSet) < 0) {
ASSERT(0);
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
return -1;
}
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(pBlock);
tmsgSendReq(pEpSet, &dispatchMsg);
#endif
return
code
;
return
code
;
}
}
source/libs/stream/src/streamExec.c
浏览文件 @
d6cc88ed
...
@@ -147,24 +147,26 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
...
@@ -147,24 +147,26 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
return
0
;
return
0
;
}
}
// TODO: handle version
static
SArray
*
streamExecForQall
(
SStreamTask
*
pTask
,
SArray
*
pRes
)
{
int32_t
streamExecForAll
(
SStreamTask
*
pTask
)
{
while
(
1
)
{
while
(
1
)
{
int32_t
cnt
=
1
;
int32_t
cnt
=
1
;
void
*
data
=
NULL
;
void
*
data
=
NULL
;
while
(
1
)
{
while
(
1
)
{
SStreamQueueItem
*
qItem
=
streamQueueNextItem
(
pTask
->
inputQueue
);
SStreamQueueItem
*
qItem
=
streamQueueNextItem
(
pTask
->
inputQueue
);
if
(
qItem
==
NULL
)
{
if
(
qItem
==
NULL
)
{
qDebug
(
"stream
exec over, queue empty"
);
qDebug
(
"stream
task exec over, queue empty, task: %d"
,
pTask
->
taskId
);
break
;
break
;
}
}
if
(
data
==
NULL
)
{
if
(
data
==
NULL
)
{
data
=
qItem
;
data
=
qItem
;
streamQueueProcessSuccess
(
pTask
->
inputQueue
);
streamQueueProcessSuccess
(
pTask
->
inputQueue
);
if
(
pTask
->
execType
==
TASK_EXEC__NONE
)
break
;
if
(
pTask
->
execType
==
TASK_EXEC__NONE
)
{
/*if (qItem->type == STREAM_INPUT__DATA_BLOCK) {*/
ASSERT
(((
SStreamQueueItem
*
)
data
)
->
type
==
STREAM_INPUT__DATA_BLOCK
);
/*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
streamTaskOutput
(
pTask
,
data
);
/*}*/
data
=
NULL
;
continue
;
}
}
else
{
}
else
{
void
*
newRet
;
void
*
newRet
;
if
((
newRet
=
streamAppendQueueItem
(
data
,
qItem
))
==
NULL
)
{
if
((
newRet
=
streamAppendQueueItem
(
data
,
qItem
))
==
NULL
)
{
...
@@ -181,18 +183,15 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
...
@@ -181,18 +183,15 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
if
(
pTask
->
taskStatus
==
TASK_STATUS__DROPPING
)
{
if
(
pTask
->
taskStatus
==
TASK_STATUS__DROPPING
)
{
if
(
data
)
streamFreeQitem
(
data
);
if
(
data
)
streamFreeQitem
(
data
);
taosArrayDestroyEx
(
pRes
,
(
FDelete
)
blockDataFreeRes
);
return
0
;
return
NULL
;
}
}
if
(
data
==
NULL
)
break
;
if
(
data
==
NULL
)
{
return
0
;
if
(
pTask
->
execType
==
TASK_EXEC__NONE
)
{
ASSERT
(((
SStreamQueueItem
*
)
data
)
->
type
==
STREAM_INPUT__DATA_BLOCK
);
streamTaskOutput
(
pTask
,
data
);
continue
;
}
}
SArray
*
pRes
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
qDebug
(
"stream task %d exec begin, msg batch: %d"
,
pTask
->
taskId
,
cnt
);
qDebug
(
"stream task %d exec begin, msg batch: %d"
,
pTask
->
taskId
,
cnt
);
streamTaskExecImpl
(
pTask
,
data
,
pRes
);
streamTaskExecImpl
(
pTask
,
data
,
pRes
);
qDebug
(
"stream task %d exec end"
,
pTask
->
taskId
);
qDebug
(
"stream task %d exec end"
,
pTask
->
taskId
);
...
@@ -203,76 +202,44 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
...
@@ -203,76 +202,44 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
// TODO log failed ver
// TODO log failed ver
streamQueueProcessFail
(
pTask
->
inputQueue
);
streamQueueProcessFail
(
pTask
->
inputQueue
);
taosArrayDestroy
(
pRes
);
taosArrayDestroy
(
pRes
);
return
NULL
;
return
-
1
;
}
}
qRes
->
type
=
STREAM_INPUT__DATA_BLOCK
;
qRes
->
type
=
STREAM_INPUT__DATA_BLOCK
;
qRes
->
blocks
=
pRes
;
qRes
->
blocks
=
pRes
;
if
(
streamTaskOutput
(
pTask
,
qRes
)
<
0
)
{
// TODO log failed ver
/*streamQueueProcessFail(pTask->inputQueue);*/
taosArrayDestroyEx
(
pRes
,
(
FDelete
)
blockDataFreeRes
);
taosFreeQitem
(
qRes
);
return
NULL
;
}
if
(((
SStreamQueueItem
*
)
data
)
->
type
==
STREAM_INPUT__DATA_SUBMIT
)
{
if
(((
SStreamQueueItem
*
)
data
)
->
type
==
STREAM_INPUT__DATA_SUBMIT
)
{
SStreamDataSubmit
*
pSubmit
=
(
SStreamDataSubmit
*
)
data
;
SStreamDataSubmit
*
pSubmit
=
(
SStreamDataSubmit
*
)
data
;
qRes
->
childId
=
pTask
->
selfChildId
;
qRes
->
childId
=
pTask
->
selfChildId
;
qRes
->
sourceVer
=
pSubmit
->
ver
;
qRes
->
sourceVer
=
pSubmit
->
ver
;
}
}
if
(
streamTaskOutput
(
pTask
,
qRes
)
<
0
)
{
// TODO save failed ver
/*streamQueueProcessFail(pTask->inputQueue);*/
taosArrayDestroyEx
(
pRes
,
(
FDelete
)
blockDataFreeRes
);
taosFreeQitem
(
qRes
);
return
-
1
;
}
/*streamQueueProcessSuccess(pTask->inputQueue);*/
/*streamQueueProcessSuccess(pTask->inputQueue);*/
pRes
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
}
}
streamFreeQitem
(
data
);
}
}
return
pRes
;
return
0
;
}
}
// TODO: handle version
int32_t
streamTryExec
(
SStreamTask
*
pTask
)
{
int32_t
streamExec
(
SStreamTask
*
pTask
)
{
int8_t
schedStatus
=
SArray
*
pRes
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
atomic_val_compare_exchange_8
(
&
pTask
->
schedStatus
,
TASK_SCHED_STATUS__WAITING
,
TASK_SCHED_STATUS__ACTIVE
);
if
(
pRes
==
NULL
)
return
-
1
;
if
(
schedStatus
==
TASK_SCHED_STATUS__WAITING
)
{
while
(
1
)
{
int32_t
code
=
streamExecForAll
(
pTask
);
int8_t
execStatus
=
if
(
code
<
0
)
{
atomic_val_compare_exchange_8
(
&
pTask
->
execStatus
,
TASK_EXEC_STATUS__IDLE
,
TASK_EXEC_STATUS__EXECUTING
);
atomic_store_8
(
&
pTask
->
schedStatus
,
TASK_SCHED_STATUS__FAILED
);
if
(
execStatus
==
TASK_EXEC_STATUS__IDLE
)
{
return
-
1
;
// first run
}
qDebug
(
"stream exec, enter exec status"
);
atomic_store_8
(
&
pTask
->
schedStatus
,
TASK_SCHED_STATUS__INACTIVE
);
pRes
=
streamExecForQall
(
pTask
,
pRes
);
if
(
pRes
==
NULL
)
goto
FAIL
;
// temporarily disable status closing, since it runs out of threads
#if 0
// set status closing
atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__CLOSING);
// second run, make sure inputQ and qall are cleared
qDebug("stream exec, enter closing status");
pRes = streamExecForQall(pTask, pRes);
if (pRes == NULL) goto FAIL;
#endif
taosArrayDestroyEx
(
pRes
,
(
FDelete
)
blockDataFreeRes
);
if
(
!
taosQueueEmpty
(
pTask
->
inputQueue
->
queue
))
{
atomic_store_8
(
&
pTask
->
execStatus
,
TASK_EXEC_STATUS__IDLE
);
streamSchedExec
(
pTask
);
qDebug
(
"stream exec, return result"
);
return
0
;
}
else
if
(
execStatus
==
TASK_EXEC_STATUS__CLOSING
)
{
continue
;
}
else
if
(
execStatus
==
TASK_EXEC_STATUS__EXECUTING
)
{
ASSERT
(
taosArrayGetSize
(
pRes
)
==
0
);
taosArrayDestroyEx
(
pRes
,
(
FDelete
)
blockDataFreeRes
);
return
0
;
}
else
{
ASSERT
(
0
);
}
}
}
}
FAIL:
return
0
;
if
(
pRes
)
taosArrayDestroy
(
pRes
);
if
(
pTask
->
taskStatus
==
TASK_STATUS__DROPPING
)
{
tFreeSStreamTask
(
pTask
);
return
0
;
}
else
{
atomic_store_8
(
&
pTask
->
execStatus
,
TASK_EXEC_STATUS__IDLE
);
return
-
1
;
}
}
}
source/libs/stream/src/streamQueue.c
浏览文件 @
d6cc88ed
...
@@ -35,9 +35,10 @@ FAIL:
...
@@ -35,9 +35,10 @@ FAIL:
void
streamQueueClose
(
SStreamQueue
*
queue
)
{
void
streamQueueClose
(
SStreamQueue
*
queue
)
{
while
(
1
)
{
while
(
1
)
{
void
*
qItem
=
streamQueueNextItem
(
queue
);
void
*
qItem
=
streamQueueNextItem
(
queue
);
if
(
qItem
)
if
(
qItem
)
{
taosFreeQitem
(
qItem
);
taosFreeQitem
(
qItem
);
else
}
else
{
return
;
return
;
}
}
}
}
}
source/libs/stream/src/streamTask.c
浏览文件 @
d6cc88ed
...
@@ -23,7 +23,7 @@ SStreamTask* tNewSStreamTask(int64_t streamId) {
...
@@ -23,7 +23,7 @@ SStreamTask* tNewSStreamTask(int64_t streamId) {
}
}
pTask
->
taskId
=
tGenIdPI32
();
pTask
->
taskId
=
tGenIdPI32
();
pTask
->
streamId
=
streamId
;
pTask
->
streamId
=
streamId
;
pTask
->
execStatus
=
TASK_EXEC_STATUS__IDL
E
;
pTask
->
schedStatus
=
TASK_SCHED_STATUS__INACTIV
E
;
pTask
->
inputStatus
=
TASK_INPUT_STATUS__NORMAL
;
pTask
->
inputStatus
=
TASK_INPUT_STATUS__NORMAL
;
pTask
->
outputStatus
=
TASK_OUTPUT_STATUS__NORMAL
;
pTask
->
outputStatus
=
TASK_OUTPUT_STATUS__NORMAL
;
...
@@ -59,7 +59,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
...
@@ -59,7 +59,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if
(
tEncodeI16
(
pEncoder
,
pTask
->
dispatchMsgType
)
<
0
)
return
-
1
;
if
(
tEncodeI16
(
pEncoder
,
pTask
->
dispatchMsgType
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
taskStatus
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
taskStatus
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
exec
Status
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
sched
Status
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
selfChildId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
selfChildId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
nodeId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
nodeId
)
<
0
)
return
-
1
;
...
@@ -114,7 +114,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
...
@@ -114,7 +114,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if
(
tDecodeI16
(
pDecoder
,
&
pTask
->
dispatchMsgType
)
<
0
)
return
-
1
;
if
(
tDecodeI16
(
pDecoder
,
&
pTask
->
dispatchMsgType
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
taskStatus
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
taskStatus
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
exec
Status
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
sched
Status
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
selfChildId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
selfChildId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
nodeId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
nodeId
)
<
0
)
return
-
1
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录