提交 0e189f70 编写于 作者: H Haojun Liao

refactor

上级 03c26a9d
...@@ -306,71 +306,6 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S ...@@ -306,71 +306,6 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
// return 0; // return 0;
//} //}
// todo record the idle time for dispatch data
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
if (code != TSDB_CODE_SUCCESS) {
// dispatch message failed: network error, or node not available.
// in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp>inputStatus will be set
// flag. here we need to retry dispatch this message to downstream task immediately. handle the case the failure
// happened too fast. todo handle the shuffle dispatch failure
if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) {
qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, no-retry", pTask->id.idStr,
pRsp->downstreamTaskId, tstrerror(code));
return code;
} else {
qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", pTask->id.idStr,
pRsp->downstreamTaskId, tstrerror(code), ++pTask->msgInfo.retryCount);
return doDispatchAllBlocks(pTask, pTask->msgInfo.pData);
}
}
qDebug("s-task:%s receive dispatch rsp, output status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, code);
// there are other dispatch message not response yet
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
qDebug("s-task:%s is shuffle, left waiting rsp %d", pTask->id.idStr, leftRsp);
if (leftRsp > 0) {
return 0;
}
}
pTask->msgInfo.retryCount = 0;
ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);
qDebug("s-task:%s output status is set to:%d", pTask->id.idStr, pTask->outputInfo.status);
// the input queue of the (down stream) task that receive the output data is full,
// so the TASK_INPUT_STATUS_BLOCKED is rsp
// todo blocking the output status
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time
int32_t waitDuration = 300; // 300 ms
qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 "wait for %dms and retry dispatch data",
pTask->id.idStr, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, waitDuration);
streamRetryDispatchStreamBlock(pTask, waitDuration);
} else { // pipeline send data in output queue
// this message has been sent successfully, let's try next one.
destroyStreamDataBlock(pTask->msgInfo.pData);
pTask->msgInfo.pData = NULL;
if (pTask->msgInfo.blockingTs != 0) {
int64_t el = taosGetTimestampMs() - pTask->msgInfo.blockingTs;
qDebug("s-task:%s resume to normal from inputQ blocking, idle time:%"PRId64"ms", pTask->id.idStr, el);
pTask->msgInfo.blockingTs = 0;
}
// now ready for next data output
atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL);
// otherwise, continue dispatch the first block to down stream task in pipeline
streamDispatchStreamBlock(pTask);
}
return 0;
}
int32_t streamProcessRunReq(SStreamTask* pTask) { int32_t streamProcessRunReq(SStreamTask* pTask) {
if (streamTryExec(pTask) < 0) { if (streamTryExec(pTask) < 0) {
return -1; return -1;
......
...@@ -718,3 +718,69 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) { ...@@ -718,3 +718,69 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) {
num); num);
return 0; return 0;
} }
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
if (code != TSDB_CODE_SUCCESS) {
// dispatch message failed: network error, or node not available.
// in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp>inputStatus will be set
// flag. here we need to retry dispatch this message to downstream task immediately. handle the case the failure
// happened too fast.
// todo handle the shuffle dispatch failure
qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", pTask->id.idStr,
pRsp->downstreamTaskId, tstrerror(code), ++pTask->msgInfo.retryCount);
int32_t ret = doDispatchAllBlocks(pTask, pTask->msgInfo.pData);
if (ret != TSDB_CODE_SUCCESS) {
}
return TSDB_CODE_SUCCESS;
}
qDebug("s-task:%s recv dispatch rsp, downstream task input status:%d code:%d", pTask->id.idStr, pRsp->inputStatus,
code);
// there are other dispatch message not response yet
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
qDebug("s-task:%s is shuffle, left waiting rsp %d", pTask->id.idStr, leftRsp);
if (leftRsp > 0) {
return 0;
}
}
pTask->msgInfo.retryCount = 0;
ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);
qDebug("s-task:%s output status is set to:%d", pTask->id.idStr, pTask->outputInfo.status);
// the input queue of the (down stream) task that receive the output data is full,
// so the TASK_INPUT_STATUS_BLOCKED is rsp
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
pTask->inputStatus = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream
pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time
qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 "wait for %dms and retry dispatch data",
pTask->id.idStr, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS);
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
} else { // pipeline send data in output queue
// this message has been sent successfully, let's try next one.
destroyStreamDataBlock(pTask->msgInfo.pData);
pTask->msgInfo.pData = NULL;
if (pTask->msgInfo.blockingTs != 0) {
int64_t el = taosGetTimestampMs() - pTask->msgInfo.blockingTs;
qDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms",
pTask->id.idStr, pRsp->downstreamTaskId, el);
pTask->msgInfo.blockingTs = 0;
// put data into inputQ of current task is also allowed
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
}
// now ready for next data output
atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL);
// otherwise, continue dispatch the first block to down stream task in pipeline
streamDispatchStreamBlock(pTask);
}
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册