提交 8f9de93c 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 30c2a9c6
...@@ -251,6 +251,7 @@ typedef struct SStreamChildEpInfo { ...@@ -251,6 +251,7 @@ typedef struct SStreamChildEpInfo {
int32_t nodeId; int32_t nodeId;
int32_t childId; int32_t childId;
int32_t taskId; int32_t taskId;
int8_t dataAllowed;
SEpSet epSet; SEpSet epSet;
} SStreamChildEpInfo; } SStreamChildEpInfo;
...@@ -400,6 +401,7 @@ typedef struct { ...@@ -400,6 +401,7 @@ typedef struct {
typedef struct { typedef struct {
int64_t streamId; int64_t streamId;
int32_t type;
int32_t taskId; int32_t taskId;
int32_t dataSrcVgId; int32_t dataSrcVgId;
int32_t upstreamTaskId; int32_t upstreamTaskId;
......
...@@ -239,21 +239,61 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S ...@@ -239,21 +239,61 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr, qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr,
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen); pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);
// todo add the input queue buffer limitation int32_t status = 0;
streamTaskEnqueueBlocks(pTask, pReq, pRsp);
tDeleteStreamDispatchReq(pReq);
if (exec) { SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId);
if (streamTryExec(pTask) < 0) { ASSERT(pInfo != NULL);
return -1;
} if (!pInfo->dataAllowed) {
qWarn("s-task:%s data from task:0x%x is denied, since inputQ is closed for it", pTask->id.idStr, pReq->upstreamTaskId);
status = TASK_INPUT_STATUS__BLOCKED;
} else { } else {
streamSchedExec(pTask); // Current task has received the checkpoint req from the upstream task, from which the message should all be blocked
if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
qDebug("s-task:%s close inputQ for upstream:0x%x", pTask->id.idStr, pReq->upstreamTaskId);
}
status = streamTaskAppendInputBlocks(pTask, pReq);
} }
{
// do send response with the input status
int32_t code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont);
if (code != TSDB_CODE_SUCCESS) {
// todo handle failure
return code;
}
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
tmsgSendRsp(pRsp);
}
tDeleteStreamDispatchReq(pReq);
streamSchedExec(pTask);
return 0; return 0;
} }
//int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
// qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr,
// pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);
//
// // todo add the input queue buffer limitation
// streamTaskEnqueueBlocks(pTask, pReq, pRsp);
// tDeleteStreamDispatchReq(pReq);
//
// if (exec) {
// if (streamTryExec(pTask) < 0) {
// return -1;
// }
// } else {
// streamSchedExec(pTask);
// }
//
// return 0;
//}
// todo record the idle time for dispatch data // todo record the idle time for dispatch data
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) { int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册