提交 d392bb60 编写于 作者: H Haojun Liao

refactor(stream): set the initial version from the checkpoint.

上级 a049e310
...@@ -41,23 +41,22 @@ typedef struct { ...@@ -41,23 +41,22 @@ typedef struct {
} SLocalFetch; } SLocalFetch;
typedef struct { typedef struct {
void* tqReader; void* tqReader;
void* config; void* config;
void* vnode; void* vnode;
void* mnd; void* mnd;
SMsgCb* pMsgCb; SMsgCb* pMsgCb;
int64_t version; int64_t version;
bool initMetaReader; bool initMetaReader;
bool initTableReader; bool initTableReader;
bool initTqReader; bool initTqReader;
int32_t numOfVgroups; int32_t numOfVgroups;
void* sContext; // SSnapContext* void* sContext; // SSnapContext*
void* pStateBackend;
void* pStateBackend; int8_t fillHistory;
struct SStorageAPI api; STimeWindow winRange;
int8_t fillHistory; struct SStorageAPI api;
STimeWindow winRange;
} SReadHandle; } SReadHandle;
// in queue mode, data streams are separated by msg // in queue mode, data streams are separated by msg
......
...@@ -757,14 +757,22 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -757,14 +757,22 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->pMsgCb = &pTq->pVnode->msgCb; pTask->pMsgCb = &pTq->pVnode->msgCb;
pTask->pMeta = pTq->pStreamMeta; pTask->pMeta = pTq->pStreamMeta;
pTask->chkInfo.currentVer = ver; // checkpoint exists, restore from the last checkpoint
if (pTask->chkInfo.keptCheckpointId != 0) {
pTask->dataRange.range.maxVer = ver; ASSERT(pTask->chkInfo.version > 0);
pTask->dataRange.range.minVer = ver; pTask->chkInfo.currentVer = pTask->chkInfo.version;
pTask->dataRange.range.maxVer = pTask->chkInfo.version;
pTask->dataRange.range.minVer = pTask->chkInfo.version;
pTask->chkInfo.currentVer = pTask->chkInfo.version;
} else {
pTask->chkInfo.currentVer = ver;
pTask->dataRange.range.maxVer = ver;
pTask->dataRange.range.minVer = ver;
}
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
SStreamTask* pSateTask = pTask; SStreamTask* pSateTask = pTask;
SStreamTask task = {0}; SStreamTask task = {0};
if (pTask->info.fillHistory) { if (pTask->info.fillHistory) {
task.id = pTask->streamTaskId; task.id = pTask->streamTaskId;
task.pMeta = pTask->pMeta; task.pMeta = pTask->pMeta;
...@@ -777,12 +785,14 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -777,12 +785,14 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
} }
SReadHandle handle = { SReadHandle handle = {
.version = pTask->chkInfo.currentVer,
.vnode = pTq->pVnode, .vnode = pTq->pVnode,
.initTqReader = 1, .initTqReader = 1,
.pStateBackend = pTask->pState, .pStateBackend = pTask->pState,
.fillHistory = pTask->info.fillHistory, .fillHistory = pTask->info.fillHistory,
.winRange = pTask->dataRange.window, .winRange = pTask->dataRange.window,
}; };
initStorageAPI(&handle.api); initStorageAPI(&handle.api);
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId); pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
...@@ -793,12 +803,13 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -793,12 +803,13 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
SStreamTask* pSateTask = pTask; SStreamTask* pSateTask = pTask;
SStreamTask task = {0}; SStreamTask task = {0};
if (pTask->info.fillHistory) { if (pTask->info.fillHistory) {
task.id = pTask->streamTaskId; task.id = pTask->streamTaskId;
task.pMeta = pTask->pMeta; task.pMeta = pTask->pMeta;
pSateTask = &task; pSateTask = &task;
} }
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pSateTask, false, -1, -1); pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pSateTask, false, -1, -1);
if (pTask->pState == NULL) { if (pTask->pState == NULL) {
return -1; return -1;
...@@ -806,6 +817,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -806,6 +817,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->pUpstreamEpInfoList); int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->pUpstreamEpInfoList);
SReadHandle handle = { SReadHandle handle = {
.version = pTask->chkInfo.currentVer,
.vnode = NULL, .vnode = NULL,
.numOfVgroups = numOfVgroups, .numOfVgroups = numOfVgroups,
.pStateBackend = pTask->pState, .pStateBackend = pTask->pState,
...@@ -844,6 +856,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -844,6 +856,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
if (pTask->tbSink.pTSchema == NULL) { if (pTask->tbSink.pTSchema == NULL) {
return -1; return -1;
} }
pTask->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); pTask->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
tSimpleHashSetFreeFp(pTask->tbSink.pTblInfo, freePtr); tSimpleHashSetFreeFp(pTask->tbSink.pTblInfo, freePtr);
} }
...@@ -861,6 +874,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -861,6 +874,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
vgId, pTask->id.idStr, pChkInfo->keptCheckpointId, pChkInfo->version, pChkInfo->currentVer, vgId, pTask->id.idStr, pChkInfo->keptCheckpointId, pChkInfo->version, pChkInfo->currentVer,
pTask->info.selfChildId, pTask->info.taskLevel, pTask->info.fillHistory, pTask->triggerParam); pTask->info.selfChildId, pTask->info.taskLevel, pTask->info.fillHistory, pTask->triggerParam);
if (pTask->chkInfo.keptCheckpointId != 0) {
tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
pChkInfo->keptCheckpointId, pChkInfo->version, pChkInfo->currentVer);
}
return 0; return 0;
} }
...@@ -1283,14 +1301,17 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { ...@@ -1283,14 +1301,17 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
tDecodeStreamDispatchReq(&decoder, &req); tDecodeStreamDispatchReq(&decoder, &req);
tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
if (pTask) { if (pTask != NULL) {
SRpcMsg rsp = {.info = pMsg->info, .code = 0}; SRpcMsg rsp = {.info = pMsg->info, .code = 0};
streamProcessDispatchMsg(pTask, &req, &rsp, exec); streamProcessDispatchMsg(pTask, &req, &rsp, exec);
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0; return 0;
} else { } else {
tqError("vgId:%d failed to find task:0x%x to handle the dispatch req, it may have been destroyed already",
pTq->pStreamMeta->vgId, req.taskId);
tDeleteStreamDispatchReq(&req); tDeleteStreamDispatchReq(&req);
return -1; return -1;
} }
...@@ -1565,27 +1586,25 @@ int32_t tqProcessStreamCheckPointReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1565,27 +1586,25 @@ int32_t tqProcessStreamCheckPointReq(STQ* pTq, SRpcMsg* pMsg) {
if (tDecodeStreamCheckpointReq(&decoder, &req) < 0) { if (tDecodeStreamCheckpointReq(&decoder, &req) < 0) {
code = TSDB_CODE_MSG_DECODE_ERROR; code = TSDB_CODE_MSG_DECODE_ERROR;
tDecoderClear(&decoder); tDecoderClear(&decoder);
goto FAIL; return code;
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.downstreamTaskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.downstreamTaskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("vgId:%d failed to find s-task:0x%x , it may have been destroyed already", vgId, req.downstreamTaskId); tqError("vgId:%d failed to find s-task:0x%x , it may have been destroyed already", vgId, req.downstreamTaskId);
goto FAIL; return TSDB_CODE_SUCCESS;
} }
code = streamAddCheckpointRspMsg(&req, &pMsg->info, pTask); code = streamAddCheckpointRspMsg(&req, &pMsg->info, pTask);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto FAIL; streamMetaReleaseTask(pMeta, pTask);
return code;
} }
streamProcessCheckpointReq(pTask, &req); streamProcessCheckpointReq(pTask, &req);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return code; return code;
FAIL:
return code;
} }
// downstream task has completed the stream task checkpoint procedure // downstream task has completed the stream task checkpoint procedure
...@@ -1605,14 +1624,14 @@ int32_t tqProcessStreamCheckPointRsp(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1605,14 +1624,14 @@ int32_t tqProcessStreamCheckPointRsp(STQ* pTq, SRpcMsg* pMsg) {
if (tDecodeStreamCheckpointRsp(&decoder, &req) < 0) { if (tDecodeStreamCheckpointRsp(&decoder, &req) < 0) {
code = TSDB_CODE_MSG_DECODE_ERROR; code = TSDB_CODE_MSG_DECODE_ERROR;
tDecoderClear(&decoder); tDecoderClear(&decoder);
goto FAIL; return code;
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.upstreamTaskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.upstreamTaskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("vgId:%d failed to find s-task:0x%x , it may have been destroyed already", vgId, req.downstreamTaskId); tqError("vgId:%d failed to find s-task:0x%x , it may have been destroyed already", vgId, req.downstreamTaskId);
goto FAIL; return code;
} }
tqDebug("vgId:%d s-task:%s received the checkpoint rsp, handle it", vgId, pTask->id.idStr); tqDebug("vgId:%d s-task:%s received the checkpoint rsp, handle it", vgId, pTask->id.idStr);
...@@ -1620,7 +1639,4 @@ int32_t tqProcessStreamCheckPointRsp(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1620,7 +1639,4 @@ int32_t tqProcessStreamCheckPointRsp(STQ* pTq, SRpcMsg* pMsg) {
streamProcessCheckpointRsp(pMeta, pTask); streamProcessCheckpointRsp(pMeta, pTask);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return code; return code;
FAIL:
return code;
} }
...@@ -48,12 +48,12 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) ...@@ -48,12 +48,12 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq); int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData); int32_t streamSaveTasks(SStreamMeta* pMeta, int64_t checkpointId);
int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamCheckpointReq* pReq, int32_t nodeId, SEpSet* pEpSet); int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamCheckpointReq* pReq, int32_t nodeId, SEpSet* pEpSet);
int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask); int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask);
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask); int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
int32_t streamSaveTasks(SStreamMeta* pMeta, int64_t checkpointId); int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, const char* id); int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, const char* id);
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
......
...@@ -174,7 +174,7 @@ int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pR ...@@ -174,7 +174,7 @@ int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pR
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
tmsgSendRsp(pRsp); tmsgSendRsp(pRsp);
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; return (status == TASK_INPUT_STATUS__NORMAL) ? 0 : -1;
} }
int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) { int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
...@@ -239,7 +239,8 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S ...@@ -239,7 +239,8 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr, qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr,
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen); pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);
// todo add the input queue buffer limitation // if current task has received the checkpoint req from the upstream t#1, the msg from t#1 should all blocked
streamTaskEnqueueBlocks(pTask, pReq, pRsp); streamTaskEnqueueBlocks(pTask, pReq, pRsp);
tDeleteStreamDispatchReq(pReq); tDeleteStreamDispatchReq(pReq);
...@@ -254,69 +255,6 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S ...@@ -254,69 +255,6 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
return 0; return 0;
} }
// todo record the idle time for dispatch data
/**
 * Handle the response to a dispatch message that pTask previously sent downstream.
 *
 * @param pTask  task that issued the dispatch; its msgInfo, outputStatus and shuffleDispatcher are updated here
 * @param pRsp   response of the downstream task; downstreamTaskId and inputStatus are read
 * @param code   result of the dispatch; any value other than TSDB_CODE_SUCCESS triggers an immediate re-dispatch
 * @return TSDB_CODE_SUCCESS on the failure/retry path, 0 on every other path. A failed retry is logged only,
 *         it is not propagated to the caller.
 */
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
  if (code != TSDB_CODE_SUCCESS) {
    // Dispatch message failed: network error, or node not available.
    // A full downstream input queue is not reported through this path: in that case the code is
    // TSDB_CODE_SUCCESS and pRsp->inputStatus carries the flag. Here the message is re-dispatched to the
    // downstream task immediately.
    // todo handle the case the failure happened too fast. todo handle the shuffle dispatch failure

    // Bump the counter as a statement of its own, so the count never depends on how the logging macro
    // evaluates (or skips) its arguments.
    ++pTask->msgInfo.retryCount;
    qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", pTask->id.idStr,
           pRsp->downstreamTaskId, tstrerror(code), pTask->msgInfo.retryCount);

    int32_t ret = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData);
    if (ret != TSDB_CODE_SUCCESS) {
      // Best effort: the return value stays TSDB_CODE_SUCCESS as before, but the failure is no longer silent.
      qError("s-task:%s failed to retry dispatch msg to task:0x%x, ret:%d", pTask->id.idStr,
             pRsp->downstreamTaskId, ret);
    }

    return TSDB_CODE_SUCCESS;
  }

  qDebug("s-task:%s receive dispatch rsp, output status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, code);

  // there are other dispatch message not response yet
  if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
    qDebug("s-task:%s is shuffle, left waiting rsp %d", pTask->id.idStr, leftRsp);
    if (leftRsp > 0) {
      return 0;
    }
  }

  pTask->msgInfo.retryCount = 0;
  ASSERT(pTask->outputStatus == TASK_OUTPUT_STATUS__WAIT);

  qDebug("s-task:%s output status is set to:%d", pTask->id.idStr, pTask->outputStatus);

  // the input queue of the (down stream) task that receive the output data is full,
  // so the TASK_INPUT_STATUS_BLOCKED is rsp
  // todo blocking the output status
  if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
    pTask->msgInfo.blockingTs = taosGetTimestampMs();  // record the blocking start time

    int32_t waitDuration = 300;  // 300 ms
    qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data",
           pTask->id.idStr, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, waitDuration);
    streamRetryDispatchStreamBlock(pTask, waitDuration);
  } else {  // pipeline send data in output queue
    // this message has been sent successfully, let's try next one.
    destroyStreamDataBlock(pTask->msgInfo.pData);
    pTask->msgInfo.pData = NULL;

    // a non-zero blockingTs means the task was waiting on a blocked downstream queue; report the idle time once
    if (pTask->msgInfo.blockingTs != 0) {
      int64_t el = taosGetTimestampMs() - pTask->msgInfo.blockingTs;
      qDebug("s-task:%s resume to normal from inputQ blocking, idle time:%" PRId64 "ms", pTask->id.idStr, el);
      pTask->msgInfo.blockingTs = 0;
    }

    // now ready for next data output
    atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);

    // otherwise, continue dispatch the first block to down stream task in pipeline
    streamDispatchStreamBlock(pTask);
  }

  return 0;
}
int32_t streamProcessRunReq(SStreamTask* pTask) { int32_t streamProcessRunReq(SStreamTask* pTask) {
if (streamTryExec(pTask) < 0) { if (streamTryExec(pTask) < 0) {
return -1; return -1;
......
...@@ -219,7 +219,7 @@ int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pRe ...@@ -219,7 +219,7 @@ int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pRe
// anymore // anymore
ASSERT(taosArrayGetSize(pTask->pUpstreamEpInfoList) > 0); ASSERT(taosArrayGetSize(pTask->pUpstreamEpInfoList) > 0);
// there are still some upstream tasks not send checkpoint request // there are still some upstream tasks not send checkpoint request, do nothing and wait for them
int32_t notReady = streamAlignCheckpoint(pTask, checkpointId, childId); int32_t notReady = streamAlignCheckpoint(pTask, checkpointId, childId);
if (notReady > 0) { if (notReady > 0) {
int32_t num = taosArrayGetSize(pTask->pUpstreamEpInfoList); int32_t num = taosArrayGetSize(pTask->pUpstreamEpInfoList);
...@@ -230,12 +230,13 @@ int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pRe ...@@ -230,12 +230,13 @@ int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pRe
qDebug("s-task:%s received checkpoint req, all upstream sent checkpoint msg, dispatch checkpoint msg to downstream", qDebug("s-task:%s received checkpoint req, all upstream sent checkpoint msg, dispatch checkpoint msg to downstream",
pTask->id.idStr); pTask->id.idStr);
pTask->checkpointNotReadyTasks = (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH)
? 1 // set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this node
: taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); // can start local checkpoint procedure
pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
// if all upstreams are ready for generating checkpoint, set the status to be TASK_STATUS__CK_READY // if all upstreams are ready for generating checkpoint, set the status to be TASK_STATUS__CK_READY
// 2. dispatch check point msg to all downstream tasks // dispatch check point msg to all downstream tasks
streamTaskDispatchCheckpointMsg(pTask, checkpointId); streamTaskDispatchCheckpointMsg(pTask, checkpointId);
} }
...@@ -257,7 +258,8 @@ int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask) { ...@@ -257,7 +258,8 @@ int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask) {
appendCheckpointIntoInputQ(pTask); appendCheckpointIntoInputQ(pTask);
streamSchedExec(pTask); streamSchedExec(pTask);
} else { } else {
qDebug("s-task:%s %d downstream tasks are not ready, wait", pTask->id.idStr, notReady); int32_t total = streamTaskGetNumOfDownstream(pTask);
qDebug("s-task:%s %d/%d downstream tasks are not ready, wait", pTask->id.idStr, notReady, total);
} }
return 0; return 0;
......
...@@ -238,7 +238,7 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR ...@@ -238,7 +238,7 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR
return 0; return 0;
} }
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) { static int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
int32_t code = 0; int32_t code = 0;
int32_t numOfBlocks = taosArrayGetSize(pData->blocks); int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
...@@ -807,3 +807,66 @@ int32_t streamAddCheckpointRspMsg(SStreamCheckpointReq* pReq, SRpcHandleInfo* pR ...@@ -807,3 +807,66 @@ int32_t streamAddCheckpointRspMsg(SStreamCheckpointReq* pReq, SRpcHandleInfo* pR
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// todo record the idle time for dispatch data
/**
 * Handle the response to a dispatch message that pTask previously sent downstream.
 *
 * @param pTask  task that issued the dispatch; its msgInfo, outputStatus and shuffleDispatcher are updated here
 * @param pRsp   response of the downstream task; downstreamTaskId and inputStatus are read
 * @param code   result of the dispatch; any value other than TSDB_CODE_SUCCESS triggers an immediate re-dispatch
 * @return TSDB_CODE_SUCCESS on the failure/retry path, 0 on every other path. A failed retry is logged only,
 *         it is not propagated to the caller.
 */
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
  if (code != TSDB_CODE_SUCCESS) {
    // Dispatch message failed: network error, or node not available.
    // A full downstream input queue is not reported through this path: in that case the code is
    // TSDB_CODE_SUCCESS and pRsp->inputStatus carries the flag. Here the message is re-dispatched to the
    // downstream task immediately.
    // todo handle the case the failure happened too fast. todo handle the shuffle dispatch failure

    // Bump the counter as a statement of its own, so the count never depends on how the logging macro
    // evaluates (or skips) its arguments.
    ++pTask->msgInfo.retryCount;
    qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", pTask->id.idStr,
           pRsp->downstreamTaskId, tstrerror(code), pTask->msgInfo.retryCount);

    int32_t ret = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData);
    if (ret != TSDB_CODE_SUCCESS) {
      // Best effort: the return value stays TSDB_CODE_SUCCESS as before, but the failure is no longer silent.
      qError("s-task:%s failed to retry dispatch msg to task:0x%x, ret:%d", pTask->id.idStr,
             pRsp->downstreamTaskId, ret);
    }

    return TSDB_CODE_SUCCESS;
  }

  qDebug("s-task:%s receive dispatch rsp, output status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, code);

  // there are other dispatch message not response yet
  if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
    qDebug("s-task:%s is shuffle, left waiting rsp %d", pTask->id.idStr, leftRsp);
    if (leftRsp > 0) {
      return 0;
    }
  }

  pTask->msgInfo.retryCount = 0;
  ASSERT(pTask->outputStatus == TASK_OUTPUT_STATUS__WAIT);

  qDebug("s-task:%s output status is set to:%d", pTask->id.idStr, pTask->outputStatus);

  // the input queue of the (down stream) task that receive the output data is full,
  // so the TASK_INPUT_STATUS_BLOCKED is rsp
  // todo blocking the output status
  if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
    pTask->msgInfo.blockingTs = taosGetTimestampMs();  // record the blocking start time

    int32_t waitDuration = 300;  // 300 ms
    qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data",
           pTask->id.idStr, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, waitDuration);
    streamRetryDispatchStreamBlock(pTask, waitDuration);
  } else {  // pipeline send data in output queue
    // this message has been sent successfully, let's try next one.
    destroyStreamDataBlock(pTask->msgInfo.pData);
    pTask->msgInfo.pData = NULL;

    // a non-zero blockingTs means the task was waiting on a blocked downstream queue; report the idle time once
    if (pTask->msgInfo.blockingTs != 0) {
      int64_t el = taosGetTimestampMs() - pTask->msgInfo.blockingTs;
      qDebug("s-task:%s resume to normal from inputQ blocking, idle time:%" PRId64 "ms", pTask->id.idStr, el);
      pTask->msgInfo.blockingTs = 0;
    }

    // now ready for next data output
    atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);

    // otherwise, continue dispatch the first block to down stream task in pipeline
    streamDispatchStreamBlock(pTask);
  }

  return 0;
}
\ No newline at end of file
...@@ -254,3 +254,18 @@ void tFreeStreamTask(SStreamTask* pTask) { ...@@ -254,3 +254,18 @@ void tFreeStreamTask(SStreamTask* pTask) {
taosMemoryFree(pTask); taosMemoryFree(pTask);
} }
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) {
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
return 0;
} else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
return 1;
} else {
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
return 1;
} else {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
return taosArrayGetSize(vgInfo);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册