提交 8c791578 编写于 作者: H Haojun Liao

fix(stream): close the inputQ if checkpoint msg received.

上级 2b85ea62
...@@ -143,7 +143,26 @@ int32_t streamSchedExec(SStreamTask* pTask) { ...@@ -143,7 +143,26 @@ int32_t streamSchedExec(SStreamTask* pTask) {
return 0; return 0;
} }
static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) { static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t status, void** pBuf) {
*pBuf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
if (*pBuf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
((SMsgHead*)(*pBuf))->vgId = htonl(pReq->upstreamNodeId);
SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT((*pBuf), sizeof(SMsgHead));
pDispatchRsp->inputStatus = status;
pDispatchRsp->streamId = htobe64(pReq->streamId);
pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId);
pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId);
pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId);
pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId);
return TSDB_CODE_SUCCESS;
}
static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq) {
int8_t status = 0; int8_t status = 0;
SStreamDataBlock* pBlock = createStreamBlockFromDispatchMsg(pReq, pReq->type, pReq->srcVgId); SStreamDataBlock* pBlock = createStreamBlockFromDispatchMsg(pReq, pReq->type, pReq->srcVgId);
...@@ -158,23 +177,7 @@ static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDisp ...@@ -158,23 +177,7 @@ static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDisp
status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED; status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED;
} }
// rsp by input status return status;
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);
SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT(buf, sizeof(SMsgHead));
pDispatchRsp->inputStatus = status;
pDispatchRsp->streamId = htobe64(pReq->streamId);
pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId);
pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId);
pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId);
pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId);
pRsp->pCont = buf;
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
tmsgSendRsp(pRsp);
return (status == TASK_INPUT_STATUS__NORMAL) ? 0 : -1;
} }
int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) { int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
...@@ -239,22 +242,36 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S ...@@ -239,22 +242,36 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr, qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr,
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen); pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);
// Current task has received the checkpoint req from the upstream task, from which the message should all be blocked int32_t status = 0;
if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
}
streamTaskAppendInputBlocks(pTask, pReq, pRsp); SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId);
tDeleteStreamDispatchReq(pReq); if (!pInfo->dataAllowed) {
qWarn("s-task:%s data from task:0x%x is denied", pTask->id.idStr, pReq->upstreamTaskId);
status = TASK_INPUT_STATUS__BLOCKED;
} else {
// Current task has received the checkpoint req from the upstream task, from which the message should all be blocked
if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
qDebug("s-task:%s close inputQ for upstream:0x%x", pTask->id.idStr, pReq->upstreamTaskId);
}
if (exec) { status = streamTaskAppendInputBlocks(pTask, pReq);
if (streamTryExec(pTask) < 0) { }
return -1;
{
// do send response with the input status
int32_t code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont);
if (code != TSDB_CODE_SUCCESS) {
// todo handle failure
return code;
} }
} else {
streamSchedExec(pTask); pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
tmsgSendRsp(pRsp);
} }
tDeleteStreamDispatchReq(pReq);
streamSchedExec(pTask);
return 0; return 0;
} }
...@@ -262,10 +279,6 @@ int32_t streamProcessRunReq(SStreamTask* pTask) { ...@@ -262,10 +279,6 @@ int32_t streamProcessRunReq(SStreamTask* pTask) {
if (streamTryExec(pTask) < 0) { if (streamTryExec(pTask) < 0) {
return -1; return -1;
} }
/*if (pTask->dispatchType == TASK_OUTPUT__FIXED_DISPATCH || pTask->dispatchType == TASK_OUTPUT__SHUFFLE_DISPATCH) {*/
/*streamDispatchStreamBlock(pTask);*/
/*}*/
return 0; return 0;
} }
...@@ -380,7 +393,7 @@ void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) { ...@@ -380,7 +393,7 @@ void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
} }
} }
SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) { SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) {
int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList); int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList);
for(int32_t i = 0; i < num; ++i) { for(int32_t i = 0; i < num; ++i) {
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i); SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i);
......
...@@ -259,7 +259,9 @@ int32_t streamSaveTasks(SStreamMeta* pMeta, int64_t checkpointId) { ...@@ -259,7 +259,9 @@ int32_t streamSaveTasks(SStreamMeta* pMeta, int64_t checkpointId) {
int8_t prev = p->status.taskStatus; int8_t prev = p->status.taskStatus;
p->status.taskStatus = TASK_STATUS__NORMAL; p->status.taskStatus = TASK_STATUS__NORMAL;
// save the task
streamMetaSaveTask(pMeta, p); streamMetaSaveTask(pMeta, p);
streamTaskOpenAllUpstreamInput(p); // open inputQ for all upstream tasks
qDebug("vgId:%d s-task:%s commit task status after checkpoint completed, checkpointId:%" PRId64 ", ver:%" PRId64 qDebug("vgId:%d s-task:%s commit task status after checkpoint completed, checkpointId:%" PRId64 ", ver:%" PRId64
" currentVer:%" PRId64 ", status to be normal, prev:%s", " currentVer:%" PRId64 ", status to be normal, prev:%s",
pMeta->vgId, p->id.idStr, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.currentVer, pMeta->vgId, p->id.idStr, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.currentVer,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册