未验证 提交 01ac99a6 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #22413 from taosdata/fix/liaohj

refactor: refactor the transfer state procedure.
......@@ -152,6 +152,8 @@ enum {
STREAM_INPUT__DATA_RETRIEVE,
STREAM_INPUT__GET_RES,
STREAM_INPUT__CHECKPOINT,
STREAM_INPUT__CHECKPOINT_TRIGGER,
STREAM_INPUT__TRANS_STATE,
STREAM_INPUT__REF_DATA_BLOCK,
STREAM_INPUT__DESTROY,
};
......@@ -168,7 +170,9 @@ typedef enum EStreamType {
STREAM_PULL_DATA,
STREAM_PULL_OVER,
STREAM_FILL_OVER,
STREAM_CHECKPOINT,
STREAM_CREATE_CHILD_TABLE,
STREAM_TRANS_STATE,
} EStreamType;
#pragma pack(push, 1)
......
......@@ -254,7 +254,6 @@ enum {
TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_SCAN_HISTORY, "stream-scan-history", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_SCAN_HISTORY_FINISH, "stream-scan-history-finish", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TRANSFER_STATE, "stream-transfer-state", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECK, "stream-task-check", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECKPOINT, "stream-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_REPORT_CHECKPOINT, "stream-report-checkpoint", NULL, NULL)
......
......@@ -122,6 +122,7 @@ typedef struct {
int8_t type;
int32_t srcVgId;
int32_t srcTaskId;
int32_t childId;
int64_t sourceVer;
int64_t reqId;
......@@ -251,6 +252,7 @@ typedef struct SStreamChildEpInfo {
int32_t nodeId;
int32_t childId;
int32_t taskId;
int8_t dataAllowed;
SEpSet epSet;
} SStreamChildEpInfo;
......@@ -272,6 +274,7 @@ typedef struct SStreamStatus {
int8_t schedStatus;
int8_t keepTaskStatus;
bool transferState;
bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it
int8_t timerActive; // timer is active
int8_t pauseAllowed; // allowed task status to be set to be paused
} SStreamStatus;
......@@ -399,8 +402,9 @@ typedef struct {
typedef struct {
int64_t streamId;
int32_t type;
int32_t taskId;
int32_t dataSrcVgId;
int32_t srcVgId;
int32_t upstreamTaskId;
int32_t upstreamChildId;
int32_t upstreamNodeId;
......@@ -570,8 +574,6 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq);
int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks,
int64_t dstTaskId);
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq);
int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
......@@ -579,6 +581,8 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
int32_t streamProcessRunReq(SStreamTask* pTask);
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec);
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
......@@ -626,7 +630,7 @@ int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* p
int32_t streamSourceScanHistoryData(SStreamTask* pTask);
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
int32_t streamDispatchTransferStateMsg(SStreamTask* pTask);
int32_t appendTranstateIntoInputQ(SStreamTask* pTask);
// agg level
int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask);
......
......@@ -742,7 +742,6 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRANSFER_STATE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
......
......@@ -77,6 +77,8 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
pTask->chkInfo.version = ver;
pTask->pMeta = pSnode->pMeta;
streamTaskOpenAllUpstreamInput(pTask);
pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);
if (pTask->pState == NULL) {
return -1;
......
......@@ -250,7 +250,6 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqCheckLogInWal(STQ* pTq, int64_t version);
......
......@@ -926,6 +926,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->pMsgCb = &pTq->pVnode->msgCb;
pTask->pMeta = pTq->pStreamMeta;
streamTaskOpenAllUpstreamInput(pTask);
// backup the initial status, and set it to be TASK_STATUS__INIT
pTask->chkInfo.version = ver;
pTask->chkInfo.currentVer = ver;
......@@ -1270,7 +1272,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
if (done) {
pTask->tsInfo.step2Start = taosGetTimestampMs();
streamTaskEndScanWAL(pTask);
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0);
appendTranstateIntoInputQ(pTask);
} else {
STimeWindow* pWindow = &pTask->dataRange.window;
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
......@@ -1335,44 +1338,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
return 0;
}
// notify the downstream tasks to transfer executor state after handle all history blocks.
int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) {
char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
SStreamTransferReq req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)pReq, len);
int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req);
tDecoderClear(&decoder);
tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId, req.downstreamTaskId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.downstreamTaskId);
if (pTask == NULL) {
tqError("failed to find task:0x%x, it may have been dropped already. process transfer state failed", req.downstreamTaskId);
return -1;
}
int32_t remain = streamAlignTransferState(pTask);
if (remain > 0) {
tqDebug("s-task:%s receive upstream transfer state msg, remain:%d", pTask->id.idStr, remain);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
}
// transfer the ownership of executor state
tqDebug("s-task:%s all upstream tasks send transfer msg, open transfer state flag", pTask->id.idStr);
ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1);
pTask->status.transferState = true;
streamSchedExec(pTask);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
}
int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) {
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
......@@ -1704,6 +1669,8 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
STQ* pTq = pVnode->pTq;
int32_t vgId = pVnode->config.vgId;
SMsgHead* msgStr = pMsg->pCont;
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
......@@ -1720,7 +1687,9 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
tDecoderClear(&decoder);
int32_t taskId = req.taskId;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.taskId);
tqDebug("vgId:%d receive dispatch msg to s-task:0x%"PRIx64"-0x%x", vgId, req.streamId, taskId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, taskId);
if (pTask != NULL) {
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
streamProcessDispatchMsg(pTask, &req, &rsp, false);
......@@ -1737,7 +1706,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
FAIL:
if (pMsg->info.handle == NULL) {
tqError("s-task:0x%x vgId:%d msg handle is null, abort enqueue dispatch msg", pTq->pStreamMeta->vgId, taskId);
tqError("s-task:0x%x vgId:%d msg handle is null, abort enqueue dispatch msg", vgId, taskId);
return -1;
}
......
......@@ -210,13 +210,22 @@ int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) {
}
static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) {
if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) {
qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64
", not scan wal anymore, set the transfer state flag",
pTask->id.idStr, ver, pTask->dataRange.range.maxVer);
pTask->status.transferState = true;
const char* id = pTask->id.idStr;
/*int32_t code = */streamSchedExec(pTask);
if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) {
if (!pTask->status.appendTranstateBlock) {
qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64
", not scan wal anymore, add transfer-state block into inputQ",
id, ver, pTask->dataRange.range.maxVer);
double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0;
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
appendTranstateIntoInputQ(pTask);
/*int32_t code = */streamSchedExec(pTask);
} else {
qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 ", not scan wal",
id, ver, pTask->dataRange.range.maxVer);
}
}
}
......@@ -262,7 +271,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue;
}
if ((pTask->info.fillHistory == 1) && pTask->status.transferState) {
if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) {
ASSERT(status == TASK_STATUS__NORMAL);
// the maximum version of data in the WAL has reached already, the step2 is done
tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr,
......
......@@ -661,8 +661,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_SCAN_HISTORY:
return tqProcessTaskScanHistory(pVnode->pTq, pMsg);
case TDMT_STREAM_TRANSFER_STATE:
return tqProcessTaskTransferStateReq(pVnode->pTq, pMsg);
case TDMT_STREAM_SCAN_HISTORY_FINISH:
return tqProcessTaskScanHistoryFinishReq(pVnode->pTq, pMsg);
case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP:
......
......@@ -52,7 +52,6 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData);
int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId,
......@@ -63,6 +62,7 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem*
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq);
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
extern int32_t streamBackendId;
extern int32_t streamBackendCfWrapperId;
......
......@@ -142,40 +142,6 @@ int32_t streamSchedExec(SStreamTask* pTask) {
return 0;
}
int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
int8_t status = 0;
SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, STREAM_INPUT__DATA_BLOCK, pReq->dataSrcVgId);
if (pBlock == NULL) {
streamTaskInputFail(pTask);
status = TASK_INPUT_STATUS__FAILED;
qError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId,
pTask->id.idStr);
} else {
int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock);
// input queue is full, upstream is blocked now
status = (code == TSDB_CODE_SUCCESS)? TASK_INPUT_STATUS__NORMAL:TASK_INPUT_STATUS__BLOCKED;
}
// rsp by input status
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);
SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT(buf, sizeof(SMsgHead));
pDispatchRsp->inputStatus = status;
pDispatchRsp->streamId = htobe64(pReq->streamId);
pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId);
pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId);
pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId);
pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId);
pRsp->pCont = buf;
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
tmsgSendRsp(pRsp);
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
}
int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
int8_t status = TASK_INPUT_STATUS__NORMAL;
......@@ -235,90 +201,115 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
return 0;
}
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr,
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);
// todo add the input queue buffer limitation
streamTaskEnqueueBlocks(pTask, pReq, pRsp);
tDeleteStreamDispatchReq(pReq);
if (exec) {
if (streamTryExec(pTask) < 0) {
return -1;
}
static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq) {
int8_t status = 0;
SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, pReq->type, pReq->srcVgId);
if (pBlock == NULL) {
streamTaskInputFail(pTask);
status = TASK_INPUT_STATUS__FAILED;
qError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId,
pTask->id.idStr);
} else {
streamSchedExec(pTask);
if (pBlock->type == STREAM_INPUT__TRANS_STATE) {
pTask->status.appendTranstateBlock = true;
}
int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock);
// input queue is full, upstream is blocked now
status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED;
}
return 0;
return status;
}
// todo record the idle time for dispatch data
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
if (code != TSDB_CODE_SUCCESS) {
// dispatch message failed: network error, or node not available.
// in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp>inputStatus will be set
// flag. here we need to retry dispatch this message to downstream task immediately. handle the case the failure
// happened too fast. todo handle the shuffle dispatch failure
if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) {
qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, no-retry", pTask->id.idStr,
pRsp->downstreamTaskId, tstrerror(code));
return code;
} else {
qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", pTask->id.idStr,
pRsp->downstreamTaskId, tstrerror(code), ++pTask->msgInfo.retryCount);
return streamDispatchAllBlocks(pTask, pTask->msgInfo.pData);
}
static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t status, void** pBuf) {
*pBuf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
if (*pBuf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
qDebug("s-task:%s receive dispatch rsp, output status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, code);
((SMsgHead*)(*pBuf))->vgId = htonl(pReq->upstreamNodeId);
SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT((*pBuf), sizeof(SMsgHead));
// there are other dispatch message not response yet
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
qDebug("s-task:%s is shuffle, left waiting rsp %d", pTask->id.idStr, leftRsp);
if (leftRsp > 0) {
return 0;
}
pDispatchRsp->inputStatus = status;
pDispatchRsp->streamId = htobe64(pReq->streamId);
pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId);
pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId);
pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId);
pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId);
return TSDB_CODE_SUCCESS;
}
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId);
if (pInfo != NULL) {
pInfo->dataAllowed = false;
}
}
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr,
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);
int32_t status = 0;
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId);
ASSERT(pInfo != NULL);
pTask->msgInfo.retryCount = 0;
ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);
qDebug("s-task:%s output status is set to:%d", pTask->id.idStr, pTask->outputInfo.status);
// the input queue of the (down stream) task that receive the output data is full,
// so the TASK_INPUT_STATUS_BLOCKED is rsp
// todo blocking the output status
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time
int32_t waitDuration = 300; // 300 ms
qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 "wait for %dms and retry dispatch data",
pTask->id.idStr, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, waitDuration);
streamRetryDispatchStreamBlock(pTask, waitDuration);
} else { // pipeline send data in output queue
// this message has been sent successfully, let's try next one.
destroyStreamDataBlock(pTask->msgInfo.pData);
pTask->msgInfo.pData = NULL;
if (pTask->msgInfo.blockingTs != 0) {
int64_t el = taosGetTimestampMs() - pTask->msgInfo.blockingTs;
qDebug("s-task:%s resume to normal from inputQ blocking, idle time:%"PRId64"ms", pTask->id.idStr, el);
pTask->msgInfo.blockingTs = 0;
if (!pInfo->dataAllowed) {
qWarn("s-task:%s data from task:0x%x is denied, since inputQ is closed for it", pTask->id.idStr, pReq->upstreamTaskId);
status = TASK_INPUT_STATUS__BLOCKED;
} else {
// Current task has received the checkpoint req from the upstream task, from which the message should all be blocked
if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
qDebug("s-task:%s close inputQ for upstream:0x%x", pTask->id.idStr, pReq->upstreamTaskId);
}
// now ready for next data output
atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL);
status = streamTaskAppendInputBlocks(pTask, pReq);
}
// otherwise, continue dispatch the first block to down stream task in pipeline
streamDispatchStreamBlock(pTask);
{
// do send response with the input status
int32_t code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont);
if (code != TSDB_CODE_SUCCESS) {
// todo handle failure
return code;
}
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
tmsgSendRsp(pRsp);
}
tDeleteStreamDispatchReq(pReq);
streamSchedExec(pTask);
return 0;
}
//int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
// qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr,
// pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);
//
// // todo add the input queue buffer limitation
// streamTaskEnqueueBlocks(pTask, pReq, pRsp);
// tDeleteStreamDispatchReq(pReq);
//
// if (exec) {
// if (streamTryExec(pTask) < 0) {
// return -1;
// }
// } else {
// streamSchedExec(pTask);
// }
//
// return 0;
//}
int32_t streamProcessRunReq(SStreamTask* pTask) {
if (streamTryExec(pTask) < 0) {
return -1;
......@@ -385,12 +376,15 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
destroyStreamDataBlock((SStreamDataBlock*) pItem);
return code;
}
} else if (type == STREAM_INPUT__CHECKPOINT) {
} else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__TRANS_STATE) {
taosWriteQitem(pTask->inputQueue->queue, pItem);
qDebug("s-task:%s trans-state blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
} else if (type == STREAM_INPUT__GET_RES) {
// use the default memory limit, refactor later.
taosWriteQitem(pTask->inputQueue->queue, pItem);
qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
} else {
ASSERT(0);
}
if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
......@@ -433,4 +427,16 @@ SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t
}
return NULL;
}
\ No newline at end of file
}
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
int32_t num = taosArrayGetSize(pTask->pUpstreamEpInfoList);
if (num == 0) {
return;
}
for(int32_t i = 0; i < num; ++i) {
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamEpInfoList, i);
pInfo->dataAllowed = true;
}
}
......@@ -204,7 +204,7 @@ void streamFreeQitem(SStreamQueueItem* data) {
if (type == STREAM_INPUT__GET_RES) {
blockDataDestroy(((SStreamTrigger*)data)->pBlock);
taosFreeQitem(data);
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE) {
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__TRANS_STATE) {
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(data);
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
......
......@@ -25,6 +25,9 @@ typedef struct SBlockName {
char parTbName[TSDB_TABLE_NAME_LEN];
} SBlockName;
static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId,
int32_t numOfBlocks, int64_t dstTaskId, int32_t type);
static void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) {
pMsg->msgType = msgType;
pMsg->pCont = pCont;
......@@ -35,8 +38,9 @@ static int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatc
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->type) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->dataSrcVgId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->srcVgId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1;
......@@ -88,8 +92,9 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->type) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->dataSrcVgId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->srcVgId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1;
......@@ -113,14 +118,15 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
}
int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks,
int64_t dstTaskId) {
int64_t dstTaskId, int32_t type) {
pReq->streamId = pTask->id.streamId;
pReq->dataSrcVgId = vgId;
pReq->srcVgId = vgId;
pReq->upstreamTaskId = pTask->id.taskId;
pReq->upstreamChildId = pTask->info.selfChildId;
pReq->upstreamNodeId = pTask->info.nodeId;
pReq->blockNum = numOfBlocks;
pReq->taskId = dstTaskId;
pReq->type = type;
pReq->data = taosArrayInit(numOfBlocks, POINTER_BYTES);
pReq->dataLen = taosArrayInit(numOfBlocks, sizeof(int32_t));
......@@ -436,9 +442,8 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
return 0;
}
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
static int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
int32_t code = 0;
int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
ASSERT(numOfBlocks != 0);
......@@ -446,15 +451,15 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
SStreamDispatchReq req = {0};
int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId;
code = tInitStreamDispatchReq(&req, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId);
code = tInitStreamDispatchReq(&req, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
for (int32_t i = 0; i < numOfBlocks; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
code = streamAddBlockIntoDispatchMsg(pDataBlock, &req);
code = streamAddBlockIntoDispatchMsg(pDataBlock, &req);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroyP(req.data, taosMemoryFree);
taosArrayDestroy(req.dataLen);
......@@ -487,7 +492,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
for (int32_t i = 0; i < vgSz; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId);
code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId, pData->type);
if (code != TSDB_CODE_SUCCESS) {
goto FAIL_SHUFFLE_DISPATCH;
}
......@@ -497,8 +502,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
// TODO: do not use broadcast
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
if (pDataBlock->info.type == STREAM_DELETE_RESULT || pDataBlock->info.type == STREAM_CHECKPOINT || pDataBlock->info.type == STREAM_TRANS_STATE) {
for (int32_t j = 0; j < vgSz; j++) {
if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
goto FAIL_SHUFFLE_DISPATCH;
......@@ -518,14 +522,14 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
}
}
qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr, pTask->info.selfChildId,
numOfBlocks, vgSz);
qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr,
pTask->info.selfChildId, numOfBlocks, vgSz);
for (int32_t i = 0; i < vgSz; i++) {
if (pReqs[i].blockNum > 0) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, pTask->info.selfChildId,
pReqs[i].blockNum, pVgInfo->vgId);
qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr,
pTask->info.selfChildId, pReqs[i].blockNum, pVgInfo->vgId);
code = doSendDispatchMsg(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet);
if (code < 0) {
......@@ -536,7 +540,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
code = 0;
FAIL_SHUFFLE_DISPATCH:
FAIL_SHUFFLE_DISPATCH:
for (int32_t i = 0; i < vgSz; i++) {
taosArrayDestroyP(pReqs[i].data, taosMemoryFree);
taosArrayDestroy(pReqs[i].dataLen);
......@@ -552,7 +556,7 @@ static void doRetryDispatchData(void* param, void* tmrId) {
SStreamTask* pTask = param;
ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);
int32_t code = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData);
int32_t code = doDispatchAllBlocks(pTask, pTask->msgInfo.pData);
if (code != TSDB_CODE_SUCCESS) {
qDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
......@@ -593,12 +597,13 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
}
pTask->msgInfo.pData = pBlock;
ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK);
ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK || pBlock->type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
pBlock->type == STREAM_INPUT__TRANS_STATE);
int32_t retryCount = 0;
while (1) {
int32_t code = streamDispatchAllBlocks(pTask, pBlock);
int32_t code = doDispatchAllBlocks(pTask, pBlock);
if (code == TSDB_CODE_SUCCESS) {
break;
}
......@@ -715,3 +720,84 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) {
num);
return 0;
}
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
const char* id = pTask->id.idStr;
if (code != TSDB_CODE_SUCCESS) {
// dispatch message failed: network error, or node not available.
// in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp>inputStatus will be set
// flag. here we need to retry dispatch this message to downstream task immediately. handle the case the failure
// happened too fast.
// todo handle the shuffle dispatch failure
qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", id, pRsp->downstreamTaskId,
tstrerror(code), ++pTask->msgInfo.retryCount);
int32_t ret = doDispatchAllBlocks(pTask, pTask->msgInfo.pData);
if (ret != TSDB_CODE_SUCCESS) {
}
return TSDB_CODE_SUCCESS;
}
qDebug("s-task:%s recv dispatch rsp from 0x%x, downstream task input status:%d code:%d", id, pRsp->downstreamTaskId,
pRsp->inputStatus, code);
// there are other dispatch message not response yet
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
qDebug("s-task:%s is shuffle, left waiting rsp %d", id, leftRsp);
if (leftRsp > 0) {
return 0;
}
}
// transtate msg has been sent to downstream successfully. let's transfer the fill-history task state
SStreamDataBlock* p = pTask->msgInfo.pData;
if (p->type == STREAM_INPUT__TRANS_STATE) {
qDebug("s-task:%s dispatch transtate msg to downstream successfully, start to transfer state", id);
ASSERT(pTask->info.fillHistory == 1);
code = streamTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
return code;
}
}
pTask->msgInfo.retryCount = 0;
ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);
qDebug("s-task:%s output status is set to:%d", id, pTask->outputInfo.status);
// the input queue of the (down stream) task that receive the output data is full,
// so the TASK_INPUT_STATUS_BLOCKED is rsp
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
pTask->inputStatus = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream
pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time
qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data",
id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS);
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
} else { // pipeline send data in output queue
// this message has been sent successfully, let's try next one.
destroyStreamDataBlock(pTask->msgInfo.pData);
pTask->msgInfo.pData = NULL;
if (pTask->msgInfo.blockingTs != 0) {
int64_t el = taosGetTimestampMs() - pTask->msgInfo.blockingTs;
qDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms", id,
pRsp->downstreamTaskId, el);
pTask->msgInfo.blockingTs = 0;
// put data into inputQ of current task is also allowed
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
}
// now ready for next data output
atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL);
// otherwise, continue dispatch the first block to down stream task in pipeline
streamDispatchStreamBlock(pTask);
}
return 0;
}
......@@ -287,7 +287,7 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
}
}
static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
......@@ -301,7 +301,7 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
pStreamTask->id.idStr);
}
ASSERT(pStreamTask->historyTaskId.taskId == pTask->id.taskId && pTask->status.transferState == true);
ASSERT(pStreamTask->historyTaskId.taskId == pTask->id.taskId && pTask->status.appendTranstateBlock == true);
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
......@@ -334,6 +334,9 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
qDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr);
}
// todo check the output queue for fill-history task, and wait for it complete
// 1. expand the query time window for stream task of WAL scanner
pTimeWindow->skey = INT64_MIN;
qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor);
......@@ -380,17 +383,18 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS;
}
static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
int32_t code = TSDB_CODE_SUCCESS;
if (!pTask->status.transferState) {
return code;
}
ASSERT(pTask->status.appendTranstateBlock == 1);
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SOURCE) {
streamTaskFillHistoryFinished(pTask);
streamTaskEndScanWAL(pTask);
} else if (level == TASK_LEVEL__AGG) { // do transfer task operator states.
code = streamDoTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo handle this
return code;
}
} else if (level == TASK_LEVEL__AGG) { // do transfer task operator states.
code = streamDoTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo handle this
return code;
......@@ -400,14 +404,41 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
return code;
}
static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
const char* id) {
int32_t retryTimes = 0;
int32_t MAX_RETRY_TIMES = 5;
static int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks) {
int32_t retryTimes = 0;
int32_t MAX_RETRY_TIMES = 5;
const char* id = pTask->id.idStr;
if (pTask->info.taskLevel == TASK_LEVEL__SINK) { // extract block from inputQ, one-by-one
while (1) {
if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) {
qDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks);
return TSDB_CODE_SUCCESS;
}
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
if (qItem == NULL) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) {
taosMsleep(10);
qDebug("===stream===try again batchSize:%d, retry:%d, %s", *numOfBlocks, retryTimes, id);
continue;
}
qDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id);
return TSDB_CODE_SUCCESS;
}
qDebug("s-task:%s sink task handle result block one-by-one", id);
*numOfBlocks = 1;
*pInput = qItem;
return TSDB_CODE_SUCCESS;
}
}
// non sink task
while (1) {
if (streamTaskShouldPause(&pTask->status)) {
qDebug("s-task:%s task should pause, input blocks:%d", pTask->id.idStr, *numOfBlocks);
if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) {
qDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks);
return TSDB_CODE_SUCCESS;
}
......@@ -415,49 +446,104 @@ static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
if (qItem == NULL) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) {
taosMsleep(10);
qDebug("===stream===try again batchSize:%d, retry:%d", *numOfBlocks, retryTimes);
qDebug("===stream===try again batchSize:%d, retry:%d, %s", *numOfBlocks, retryTimes, id);
continue;
}
qDebug("===stream===break batchSize:%d", *numOfBlocks);
return TSDB_CODE_SUCCESS;
}
// do not merge blocks for sink node
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
*numOfBlocks = 1;
*pInput = qItem;
qDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id);
return TSDB_CODE_SUCCESS;
}
if (*pInput == NULL) {
ASSERT((*numOfBlocks) == 0);
*pInput = qItem;
// do not merge blocks for sink node and check point data block
if (qItem->type == STREAM_INPUT__CHECKPOINT || qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
qItem->type == STREAM_INPUT__TRANS_STATE) {
if (*pInput == NULL) {
qDebug("s-task:%s checkpoint/transtate msg extracted, start to process immediately", id);
*numOfBlocks = 1;
*pInput = qItem;
return TSDB_CODE_SUCCESS;
} else {
// previous existed blocks needs to be handle, before handle the checkpoint msg block
qDebug("s-task:%s checkpoint/transtate msg extracted, handle previous block first, numOfBlocks:%d", id,
*numOfBlocks);
streamQueueProcessFail(pTask->inputQueue);
return TSDB_CODE_SUCCESS;
}
} else {
// todo we need to sort the data block, instead of just appending into the array list.
void* newRet = streamMergeQueueItem(*pInput, qItem);
if (newRet == NULL) {
if (terrno == 0) {
qDebug("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks);
} else {
qDebug("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks,
tstrerror(terrno));
if (*pInput == NULL) {
ASSERT((*numOfBlocks) == 0);
*pInput = qItem;
} else {
// todo we need to sort the data block, instead of just appending into the array list.
void* newRet = streamMergeQueueItem(*pInput, qItem);
if (newRet == NULL) {
qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks);
streamQueueProcessFail(pTask->inputQueue);
return TSDB_CODE_SUCCESS;
}
streamQueueProcessFail(pTask->inputQueue);
*pInput = newRet;
}
*numOfBlocks += 1;
streamQueueProcessSuccess(pTask->inputQueue);
if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
return TSDB_CODE_SUCCESS;
}
}
}
}
int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
const char* id = pTask->id.idStr;
int32_t code = TSDB_CODE_SUCCESS;
*pInput = newRet;
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SINK) {
int32_t remain = streamAlignTransferState(pTask);
if (remain > 0) {
streamFreeQitem((SStreamQueueItem*)pBlock);
qDebug("s-task:%s receive upstream transfer state msg, remain:%d", id, remain);
return 0;
}
}
*numOfBlocks += 1;
streamQueueProcessSuccess(pTask->inputQueue);
// dispatch the tran-state block to downstream task immediately
int32_t type = pTask->outputInfo.type;
if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
return TSDB_CODE_SUCCESS;
// transfer the ownership of executor state
if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
if (level == TASK_LEVEL__SOURCE) {
qDebug("s-task:%s add transfer-state block into outputQ", id);
} else {
qDebug("s-task:%s all upstream tasks send transfer-state block, add transfer-state block into outputQ", id);
ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1);
}
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) {
pBlock->srcVgId = pTask->pMeta->vgId;
code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock);
if (code == 0) {
streamDispatchStreamBlock(pTask);
} else {
streamFreeQitem((SStreamQueueItem*)pBlock);
}
}
} else { // non-dispatch task, do task state transfer directly
qDebug("s-task:%s non-dispatch task, start to transfer state directly", id);
streamFreeQitem((SStreamQueueItem*)pBlock);
ASSERT(pTask->info.fillHistory == 1);
code = streamTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
}
}
return code;
}
/**
......@@ -478,12 +564,17 @@ int32_t streamExecForAll(SStreamTask* pTask) {
// merge multiple input data if possible in the input queue.
qDebug("s-task:%s start to extract data block from inputQ", id);
/*int32_t code = */extractMsgFromInputQ(pTask, &pInput, &batchSize, id);
/*int32_t code = */extractBlocksFromInputQ(pTask, &pInput, &batchSize);
if (pInput == NULL) {
ASSERT(batchSize == 0);
break;
}
if (pInput->type == STREAM_INPUT__TRANS_STATE) {
streamProcessTranstateBlock(pTask, (SStreamDataBlock*)pInput);
return 0;
}
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize);
......@@ -557,18 +648,7 @@ int32_t streamTaskEndScanWAL(SStreamTask* pTask) {
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
// 1. notify all downstream tasks to transfer executor state after handle all history blocks.
int32_t code = streamDispatchTransferStateMsg(pTask);
if (code != TSDB_CODE_SUCCESS) {
// todo handle error
}
// 2. do transfer stream task operator states.
pTask->status.transferState = true;
code = streamDoTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo handle error
return code;
}
appendTranstateIntoInputQ(pTask);
return TSDB_CODE_SUCCESS;
}
......@@ -587,35 +667,36 @@ int32_t streamTryExec(SStreamTask* pTask) {
}
// todo the task should be commit here
if (taosQueueEmpty(pTask->inputQueue->queue)) {
// if (taosQueueEmpty(pTask->inputQueue->queue)) {
// fill-history WAL scan has completed
if (pTask->status.transferState) {
code = streamTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
return code;
}
// if (pTask->status.transferState) {
// code = streamTransferStateToStreamTask(pTask);
// if (code != TSDB_CODE_SUCCESS) {
// atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
// return code;
// }
// the schedStatus == TASK_SCHED_STATUS__ACTIVE, streamSchedExec cannot be executed, so execute once again by
// call this function (streamExecForAll) directly.
code = streamExecForAll(pTask);
if (code < 0) {
// do nothing
}
}
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id,
streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
} else {
// code = streamExecForAll(pTask);
// if (code < 0) {
// do nothing
// }
// }
// atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
// qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id,
// streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
// } else {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->status.schedStatus);
if ((!streamTaskShouldStop(&pTask->status)) && (!streamTaskShouldPause(&pTask->status))) {
if (!(taosQueueEmpty(pTask->inputQueue->queue) || streamTaskShouldStop(&pTask->status) ||
streamTaskShouldPause(&pTask->status))) {
streamSchedExec(pTask);
}
}
// }
} else {
qDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id,
streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
......
......@@ -372,68 +372,40 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
return 0;
}
static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferReq* pReq, int32_t vgId, SEpSet* pEpSet) {
void* buf = NULL;
int32_t code = -1;
SRpcMsg msg = {0};
int32_t tlen;
tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code);
if (code < 0) {
return -1;
int32_t appendTranstateIntoInputQ(SStreamTask* pTask) {
SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock));
if (pTranstate == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
((SMsgHead*)buf)->vgId = htonl(vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq)) < 0) {
if (buf) {
rpcFreeCont(buf);
}
return code;
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (pBlock == NULL) {
taosFreeQitem(pTranstate);
return TSDB_CODE_OUT_OF_MEMORY;
}
tEncoderClear(&encoder);
pTranstate->type = STREAM_INPUT__TRANS_STATE;
msg.contLen = tlen + sizeof(SMsgHead);
msg.pCont = buf;
msg.msgType = TDMT_STREAM_TRANSFER_STATE;
msg.info.noResp = 1;
pBlock->info.type = STREAM_TRANS_STATE;
pBlock->info.rows = 1;
pBlock->info.childId = pTask->info.selfChildId;
tmsgSendReq(pEpSet, &msg);
qDebug("s-task:%s level:%d, status:%s dispatch transfer state msg to taskId:0x%x (vgId:%d)", pTask->id.idStr,
pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), pReq->downstreamTaskId, vgId);
return 0;
}
pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock));//pBlock;
taosArrayPush(pTranstate->blocks, pBlock);
int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) {
SStreamTransferReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId };
taosMemoryFree(pBlock);
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pTranstate) < 0) {
taosFreeQitem(pTranstate);
return TSDB_CODE_OUT_OF_MEMORY;
}
// serialize
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
doDispatchTransferMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
pTask->status.appendTranstateBlock = true;
int32_t numOfVgs = taosArrayGetSize(vgInfo);
for (int32_t i = 0; i < numOfVgs; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
req.downstreamTaskId = pVgInfo->taskId;
doDispatchTransferMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
}
}
qDebug("s-task:%s set sched-status:%d, prev:%d", pTask->id.idStr, TASK_SCHED_STATUS__INACTIVE, pTask->status.schedStatus);
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
streamSchedExec(pTask);
return 0;
return TSDB_CODE_SUCCESS;
}
// agg
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册