提交 16d7707b 编写于 作者: H Haojun Liao

fix(stream): align the scan real time data for stream task.

上级 d0da988b
...@@ -336,6 +336,7 @@ struct SStreamTask { ...@@ -336,6 +336,7 @@ struct SStreamTask {
void* launchTaskTimer; void* launchTaskTimer;
SMsgCb* pMsgCb; // msg handle SMsgCb* pMsgCb; // msg handle
SStreamState* pState; // state backend SStreamState* pState; // state backend
SArray* pRspMsgList;
// the followings attributes don't be serialized // the followings attributes don't be serialized
int32_t notReadyTasks; int32_t notReadyTasks;
...@@ -457,7 +458,9 @@ typedef struct { ...@@ -457,7 +458,9 @@ typedef struct {
typedef struct { typedef struct {
int64_t streamId; int64_t streamId;
int32_t taskId; int32_t upstreamTaskId;
int32_t downstreamTaskId;
int32_t upstreamNodeId;
int32_t childId; int32_t childId;
} SStreamScanHistoryFinishReq, SStreamTransferReq; } SStreamScanHistoryFinishReq, SStreamTransferReq;
...@@ -518,6 +521,17 @@ int32_t tDecodeSStreamCheckpointReq(SDecoder* pDecoder, SStreamCheckpointReq* pR ...@@ -518,6 +521,17 @@ int32_t tDecodeSStreamCheckpointReq(SDecoder* pDecoder, SStreamCheckpointReq* pR
int32_t tEncodeSStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRsp* pRsp); int32_t tEncodeSStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRsp* pRsp);
int32_t tDecodeSStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRsp); int32_t tDecodeSStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRsp);
typedef struct {
int64_t streamId;
int32_t upstreamTaskId;
int32_t upstreamNodeId;
int32_t downstreamId;
int32_t downstreamNode;
} SStreamCompleteHistoryMsg;
int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq);
int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pReq);
typedef struct { typedef struct {
int64_t streamId; int64_t streamId;
int32_t downstreamTaskId; int32_t downstreamTaskId;
...@@ -567,6 +581,7 @@ bool streamTaskShouldStop(const SStreamStatus* pStatus); ...@@ -567,6 +581,7 @@ bool streamTaskShouldStop(const SStreamStatus* pStatus);
bool streamTaskShouldPause(const SStreamStatus* pStatus); bool streamTaskShouldPause(const SStreamStatus* pStatus);
bool streamTaskIsIdle(const SStreamTask* pTask); bool streamTaskIsIdle(const SStreamTask* pTask);
SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
...@@ -607,8 +622,9 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); ...@@ -607,8 +622,9 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
int32_t streamDispatchTransferStateMsg(SStreamTask* pTask); int32_t streamDispatchTransferStateMsg(SStreamTask* pTask);
// agg level // agg level
int32_t streamAggScanHistoryPrepare(SStreamTask* pTask); int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask);
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId); int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq *pReq, SRpcHandleInfo* pRpcInfo);
int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask);
// stream task meta // stream task meta
void streamMetaInit(); void streamMetaInit();
......
...@@ -79,6 +79,7 @@ SArray *smGetMsgHandles() { ...@@ -79,6 +79,7 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
code = 0; code = 0;
_OVER: _OVER:
......
...@@ -740,6 +740,7 @@ SArray *vmGetMsgHandles() { ...@@ -740,6 +740,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRANSFER_STATE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRANSFER_STATE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
......
...@@ -274,7 +274,7 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) { ...@@ -274,7 +274,7 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
return 0; return 0;
} }
int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) { int32_t sndProcessStreamTaskScanHistoryFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
char *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); char *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
...@@ -287,12 +287,12 @@ int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) { ...@@ -287,12 +287,12 @@ int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
tDecoderClear(&decoder); tDecoderClear(&decoder);
// find task // find task
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.taskId); SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.downstreamTaskId);
if (pTask == NULL) { if (pTask == NULL) {
return -1; return -1;
} }
// do process request // do process request
if (streamProcessScanHistoryFinishReq(pTask, req.taskId, req.childId) < 0) { if (streamProcessScanHistoryFinishReq(pTask, &req, &pMsg->info) < 0) {
streamMetaReleaseTask(pSnode->pMeta, pTask); streamMetaReleaseTask(pSnode->pMeta, pTask);
return -1; return -1;
} }
...@@ -415,7 +415,7 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { ...@@ -415,7 +415,7 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
case TDMT_STREAM_RETRIEVE_RSP: case TDMT_STREAM_RETRIEVE_RSP:
return sndProcessTaskRetrieveRsp(pSnode, pMsg); return sndProcessTaskRetrieveRsp(pSnode, pMsg);
case TDMT_STREAM_SCAN_HISTORY_FINISH: case TDMT_STREAM_SCAN_HISTORY_FINISH:
return sndProcessTaskRecoverFinishReq(pSnode, pMsg); return sndProcessStreamTaskScanHistoryFinishReq(pSnode, pMsg);
case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP: case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP:
return sndProcessTaskRecoverFinishRsp(pSnode, pMsg); return sndProcessTaskRecoverFinishRsp(pSnode, pMsg);
case TDMT_STREAM_TASK_CHECK: case TDMT_STREAM_TASK_CHECK:
......
...@@ -250,8 +250,8 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); ...@@ -250,8 +250,8 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqCheckLogInWal(STQ* pTq, int64_t version); int32_t tqCheckLogInWal(STQ* pTq, int64_t version);
// sma // sma
......
...@@ -1084,14 +1084,18 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1084,14 +1084,18 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("s-task:%s start history data scan stage(step 1), status:%s", id, pStatus); tqDebug("s-task:%s start history data scan stage(step 1), status:%s", id, pStatus);
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
// we have to continue retrying to successfully execute the scan history task.
while (1) {
int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE, int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE,
TASK_SCHED_STATUS__WAITING); TASK_SCHED_STATUS__WAITING);
if (schedStatus != TASK_SCHED_STATUS__INACTIVE) { if (schedStatus == TASK_SCHED_STATUS__INACTIVE) {
tqDebug("s-task:%s failed to launch scan history data in current time window, unexpected sched status:%d", id, break;
schedStatus); }
streamMetaReleaseTask(pMeta, pTask); tqError("s-task:%s failed to start scan history in current time window, unexpected sched-status:%d, retry in 100ms",
return 0; id, schedStatus);
taosMsleep(100);
} }
if (!streamTaskRecoverScanStep1Finished(pTask)) { if (!streamTaskRecoverScanStep1Finished(pTask)) {
...@@ -1195,12 +1199,12 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1195,12 +1199,12 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
if (pTask->historyTaskId.taskId == 0) { if (pTask->historyTaskId.taskId == 0) {
*pWindow = (STimeWindow){INT64_MIN, INT64_MAX}; *pWindow = (STimeWindow){INT64_MIN, INT64_MAX};
tqDebug( tqDebug(
"s-task:%s scan history in current time window completed, no related fill history task, reset the time " "s-task:%s scan history in stream time window completed, no related fill history task, reset the time "
"window:%" PRId64 " - %" PRId64, "window:%" PRId64 " - %" PRId64,
id, pWindow->skey, pWindow->ekey); id, pWindow->skey, pWindow->ekey);
} else { } else {
tqDebug( tqDebug(
"s-task:%s scan history in current time window completed, now start to handle data from WAL, start " "s-task:%s scan history in stream time window completed, now start to handle data from WAL, start "
"ver:%" PRId64 ", window:%" PRId64 " - %" PRId64, "ver:%" PRId64 ", window:%" PRId64 " - %" PRId64,
id, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey); id, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey);
} }
...@@ -1209,11 +1213,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1209,11 +1213,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
code = streamTaskScanHistoryDataComplete(pTask); code = streamTaskScanHistoryDataComplete(pTask);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
// let's start the stream task by extracting data from wal // when all source task complete to scan history data in stream time window, they are allowed to handle stream data
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { // at the same time.
tqStartStreamTasks(pTq);
}
return code; return code;
} }
...@@ -1232,17 +1233,17 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1232,17 +1233,17 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req); int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req);
tDecoderClear(&decoder); tDecoderClear(&decoder);
tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId, req.taskId); tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId, req.downstreamTaskId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.downstreamTaskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("failed to find task:0x%x, it may have been dropped already. process transfer state failed", req.taskId); tqError("failed to find task:0x%x, it may have been dropped already. process transfer state failed", req.downstreamTaskId);
return -1; return -1;
} }
int32_t remain = streamAlignTransferState(pTask); int32_t remain = streamAlignTransferState(pTask);
if (remain > 0) { if (remain > 0) {
tqDebug("s-task:%s receive transfer state msg, remain:%d", pTask->id.idStr, remain); tqDebug("s-task:%s receive upstream transfer state msg, remain:%d", pTask->id.idStr, remain);
return 0; return 0;
} }
...@@ -1257,7 +1258,7 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1257,7 +1258,7 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) {
return 0; return 0;
} }
int32_t tqProcessStreamTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) {
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
...@@ -1269,20 +1270,49 @@ int32_t tqProcessStreamTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1269,20 +1270,49 @@ int32_t tqProcessStreamTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) {
tDecodeStreamScanHistoryFinishReq(&decoder, &req); tDecodeStreamScanHistoryFinishReq(&decoder, &req);
tDecoderClear(&decoder); tDecoderClear(&decoder);
// find task SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.downstreamTaskId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("failed to find task:0x%x, it may be destroyed, vgId:%d", req.taskId, pTq->pStreamMeta->vgId); tqError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed",
pTq->pStreamMeta->vgId, req.downstreamTaskId);
return -1; return -1;
} }
int32_t code = streamProcessScanHistoryFinishReq(pTask, req.taskId, req.childId); tqDebug("s-task:%s receive scan-history finish msg from task:0x%x", pTask->id.idStr, req.upstreamTaskId);
int32_t code = streamProcessScanHistoryFinishReq(pTask, &req, &pMsg->info);
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return code; return code;
} }
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
// char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
// deserialize
SStreamCompleteHistoryMsg req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
tDecodeCompleteHistoryDataMsg(&decoder, &req);
tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.upstreamTaskId);
if (pTask == NULL) {
tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed",
pTq->pStreamMeta->vgId, req.upstreamTaskId);
return -1;
}
tqDebug("s-task:%s scan-history finish rsp received from task:0x%x", pTask->id.idStr, req.downstreamId);
int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1);
if (remain > 0) {
tqDebug("s-task:%s remain:%d not send finish rsp", pTask->id.idStr, remain);
} else {
streamProcessScanHistoryFinishRsp(pTask);
}
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0; return 0;
} }
......
...@@ -664,9 +664,9 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) ...@@ -664,9 +664,9 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
case TDMT_STREAM_TRANSFER_STATE: case TDMT_STREAM_TRANSFER_STATE:
return tqProcessTaskTransferStateReq(pVnode->pTq, pMsg); return tqProcessTaskTransferStateReq(pVnode->pTq, pMsg);
case TDMT_STREAM_SCAN_HISTORY_FINISH: case TDMT_STREAM_SCAN_HISTORY_FINISH:
return tqProcessStreamTaskScanHistoryFinishReq(pVnode->pTq, pMsg); return tqProcessTaskScanHistoryFinishReq(pVnode->pTq, pMsg);
case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP: case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP:
return tqProcessTaskRecoverFinishRsp(pVnode->pTq, pMsg); return tqProcessTaskScanHistoryFinishRsp(pVnode->pTq, pMsg);
default: default:
vError("unknown msg type:%d in stream queue", pMsg->msgType); vError("unknown msg type:%d in stream queue", pMsg->msgType);
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
......
...@@ -54,6 +54,9 @@ int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamSc ...@@ -54,6 +54,9 @@ int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamSc
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq);
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
extern int32_t streamBackendId; extern int32_t streamBackendId;
extern int32_t streamBackendCfWrapperId; extern int32_t streamBackendCfWrapperId;
......
...@@ -420,3 +420,15 @@ void* streamQueueNextItem(SStreamQueue* pQueue) { ...@@ -420,3 +420,15 @@ void* streamQueueNextItem(SStreamQueue* pQueue) {
} }
void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); } void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); }
SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) {
int32_t num = taosArrayGetSize(pTask->pUpstreamEpInfoList);
for(int32_t i = 0; i < num; ++i) {
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamEpInfoList, i);
if (pInfo->taskId == taskId) {
return pInfo;
}
}
return NULL;
}
\ No newline at end of file
...@@ -25,6 +25,12 @@ typedef struct SBlockName { ...@@ -25,6 +25,12 @@ typedef struct SBlockName {
char parTbName[TSDB_TABLE_NAME_LEN]; char parTbName[TSDB_TABLE_NAME_LEN];
} SBlockName; } SBlockName;
static void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) {
pMsg->msgType = msgType;
pMsg->pCont = pCont;
pMsg->contLen = contLen;
}
static int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) { static int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1; if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
...@@ -311,13 +317,12 @@ int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamSc ...@@ -311,13 +317,12 @@ int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamSc
msg.contLen = tlen + sizeof(SMsgHead); msg.contLen = tlen + sizeof(SMsgHead);
msg.pCont = buf; msg.pCont = buf;
msg.msgType = TDMT_STREAM_SCAN_HISTORY_FINISH; msg.msgType = TDMT_STREAM_SCAN_HISTORY_FINISH;
msg.info.noResp = 1;
tmsgSendReq(pEpSet, &msg); tmsgSendReq(pEpSet, &msg);
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
qDebug("s-task:%s status:%s dispatch scan-history-data finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pStatus, qDebug("s-task:%s status:%s dispatch scan-history finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pStatus,
pReq->taskId, vgId); pReq->downstreamTaskId, vgId);
return 0; return 0;
} }
...@@ -620,3 +625,99 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { ...@@ -620,3 +625,99 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
// this block can not be deleted until it has been sent to downstream task successfully. // this block can not be deleted until it has been sent to downstream task successfully.
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->downstreamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->downstreamNode) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pRsp) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->downstreamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->downstreamNode) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
typedef struct {
SEpSet epset;
int32_t taskId;
SRpcMsg msg;
} SStreamContinueExecInfo;
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq) {
int32_t len = 0;
int32_t code = 0;
SEncoder encoder;
SStreamCompleteHistoryMsg msg = {
.streamId = pReq->streamId,
.upstreamTaskId = pReq->upstreamTaskId,
.upstreamNodeId = pReq->upstreamNodeId,
.downstreamId = pReq->downstreamTaskId,
.downstreamNode = pTask->pMeta->vgId,
};
tEncodeSize(tEncodeCompleteHistoryDataMsg, &msg, len, code);
if (code < 0) {
return code;
}
void* pBuf = rpcMallocCont(sizeof(SMsgHead) + len);
if (pBuf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
((SMsgHead*)pBuf)->vgId = htonl(pReq->upstreamNodeId);
void* abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
tEncoderInit(&encoder, (uint8_t*)abuf, len);
tEncodeCompleteHistoryDataMsg(&encoder, &msg);
tEncoderClear(&encoder);
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId);
SStreamContinueExecInfo info = {.taskId = pReq->upstreamTaskId, .epset = pInfo->epSet};
initRpcMsg(&info.msg, 0, pBuf, sizeof(SMsgHead) + len);
info.msg.info = *pRpcInfo;
// todo: fix race condition here
if (pTask->pRspMsgList == NULL) {
pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo));
}
taosArrayPush(pTask->pRspMsgList, &info);
int32_t num = taosArrayGetSize(pTask->pRspMsgList);
qDebug("s-task:%s add scan history finish rsp msg for task:0x%x, total:%d", pTask->id.idStr, pReq->upstreamTaskId,
num);
return TSDB_CODE_SUCCESS;
}
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG || pTask->info.taskLevel == TASK_LEVEL__SINK);
int32_t num = taosArrayGetSize(pTask->pRspMsgList);
for (int32_t i = 0; i < num; ++i) {
SStreamContinueExecInfo* pInfo = taosArrayGet(pTask->pRspMsgList, i);
tmsgSendRsp(&pInfo->msg);
qDebug("s-task:%s level:%d notify upstream:0x%x to continue process data from WAL", pTask->id.idStr, pTask->info.taskLevel,
pInfo->taskId);
}
taosArrayClear(pTask->pRspMsgList);
qDebug("s-task:%s level:%d checkpoint ready msg sent to all %d upstreams", pTask->id.idStr, pTask->info.taskLevel,
num);
return 0;
}
...@@ -72,9 +72,7 @@ static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) { ...@@ -72,9 +72,7 @@ static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
int32_t code = doLaunchScanHistoryTask(pTask); return doLaunchScanHistoryTask(pTask);
streamTaskEnablePause(pTask);
return code;
} else { } else {
ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL); ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
qDebug("s-task:%s no need to scan-history-data, status:%s, sched-status:%d, ver:%" PRId64, pTask->id.idStr, qDebug("s-task:%s no need to scan-history-data, status:%s, sched-status:%d, ver:%" PRId64, pTask->id.idStr,
...@@ -83,12 +81,11 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { ...@@ -83,12 +81,11 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
} }
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
streamSetParamForScanHistory(pTask); streamSetParamForScanHistory(pTask);
streamAggScanHistoryPrepare(pTask); streamTaskScanHistoryPrepare(pTask);
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr); qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr);
streamTaskScanHistoryPrepare(pTask);
} }
streamTaskEnablePause(pTask);
return 0; return 0;
} }
...@@ -143,6 +140,12 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) { ...@@ -143,6 +140,12 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) {
streamTaskSetForReady(pTask, 0); streamTaskSetForReady(pTask, 0);
streamTaskSetRangeStreamCalc(pTask); streamTaskSetRangeStreamCalc(pTask);
streamTaskLaunchScanHistory(pTask); streamTaskLaunchScanHistory(pTask);
// enable pause when init completed.
if (pTask->historyTaskId.taskId == 0) {
streamTaskEnablePause(pTask);
}
launchFillHistoryTask(pTask); launchFillHistoryTask(pTask);
} }
...@@ -195,14 +198,14 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { ...@@ -195,14 +198,14 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
streamTaskSetRangeStreamCalc(pTask); streamTaskSetRangeStreamCalc(pTask);
if (status == TASK_STATUS__SCAN_HISTORY) { if (status == TASK_STATUS__SCAN_HISTORY) {
qDebug("s-task:%s enter into scan-history-data stage, status:%s", id, str); qDebug("s-task:%s enter into scan-history data stage, status:%s", id, str);
streamTaskLaunchScanHistory(pTask); streamTaskLaunchScanHistory(pTask);
} else { } else {
qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str); qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str);
} }
// enable pause when init completed. // enable pause when init completed.
if (pTask->historyTaskId.taskId == 0 && pTask->info.fillHistory == 0) { if (pTask->historyTaskId.taskId == 0) {
streamTaskEnablePause(pTask); streamTaskEnablePause(pTask);
} }
...@@ -296,7 +299,7 @@ int32_t streamSetParamForScanHistory(SStreamTask* pTask) { ...@@ -296,7 +299,7 @@ int32_t streamSetParamForScanHistory(SStreamTask* pTask) {
} }
int32_t streamRestoreParam(SStreamTask* pTask) { int32_t streamRestoreParam(SStreamTask* pTask) {
qDebug("s-task:%s restore operator param after scan-history-data", pTask->id.idStr); qDebug("s-task:%s restore operator param after scan-history", pTask->id.idStr);
return qRestoreStreamOperatorOption(pTask->exec.pExecutor); return qRestoreStreamOperatorOption(pTask->exec.pExecutor);
} }
...@@ -334,23 +337,33 @@ int32_t streamSourceScanHistoryData(SStreamTask* pTask) { ...@@ -334,23 +337,33 @@ int32_t streamSourceScanHistoryData(SStreamTask* pTask) {
} }
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) { int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
SStreamScanHistoryFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId }; SStreamScanHistoryFinishReq req = {
.streamId = pTask->id.streamId,
.childId = pTask->info.selfChildId,
.upstreamTaskId = pTask->id.taskId,
.upstreamNodeId = pTask->pMeta->vgId,
};
// serialize // serialize
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
req.taskId = pTask->fixedEpDispatcher.taskId; req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
pTask->notReadyTasks = 1;
streamDoDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); streamDoDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgs = taosArrayGetSize(vgInfo); int32_t numOfVgs = taosArrayGetSize(vgInfo);
pTask->notReadyTasks = numOfVgs;
qDebug("s-task:%s send scan-history-data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s", pTask->id.idStr, qDebug("s-task:%s send scan-history-data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s", pTask->id.idStr,
numOfVgs, streamGetTaskStatusStr(pTask->status.taskStatus)); numOfVgs, streamGetTaskStatusStr(pTask->status.taskStatus));
for (int32_t i = 0; i < numOfVgs; i++) { for (int32_t i = 0; i < numOfVgs; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
req.taskId = pVgInfo->taskId; req.downstreamTaskId = pVgInfo->taskId;
streamDoDispatchScanHistoryFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); streamDoDispatchScanHistoryFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
} }
} else {
qDebug("s-task:%s no downstream tasks, invoke history finish rsp directly", pTask->id.idStr);
streamProcessScanHistoryFinishRsp(pTask);
} }
return 0; return 0;
...@@ -394,7 +407,7 @@ static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferRe ...@@ -394,7 +407,7 @@ static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferRe
tmsgSendReq(pEpSet, &msg); tmsgSendReq(pEpSet, &msg);
qDebug("s-task:%s level:%d, status:%s dispatch transfer state msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, qDebug("s-task:%s level:%d, status:%s dispatch transfer state msg to taskId:0x%x (vgId:%d)", pTask->id.idStr,
pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), pReq->taskId, vgId); pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), pReq->downstreamTaskId, vgId);
return 0; return 0;
} }
...@@ -404,7 +417,7 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) { ...@@ -404,7 +417,7 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) {
// serialize // serialize
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
req.taskId = pTask->fixedEpDispatcher.taskId; req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
doDispatchTransferMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); doDispatchTransferMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
...@@ -412,7 +425,7 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) { ...@@ -412,7 +425,7 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) {
int32_t numOfVgs = taosArrayGetSize(vgInfo); int32_t numOfVgs = taosArrayGetSize(vgInfo);
for (int32_t i = 0; i < numOfVgs; i++) { for (int32_t i = 0; i < numOfVgs; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
req.taskId = pVgInfo->taskId; req.downstreamTaskId = pVgInfo->taskId;
doDispatchTransferMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); doDispatchTransferMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
} }
} }
...@@ -421,10 +434,11 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) { ...@@ -421,10 +434,11 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) {
} }
// agg // agg
int32_t streamAggScanHistoryPrepare(SStreamTask* pTask) { int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask) {
pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList); pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList);
qDebug("s-task:%s agg task wait for %d upstream tasks complete scan-history procedure, status:%s", pTask->id.idStr, qDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s",
pTask->numOfWaitingUpstream, streamGetTaskStatusStr(pTask->status.taskStatus)); pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream,
streamGetTaskStatusStr(pTask->status.taskStatus));
return 0; return 0;
} }
...@@ -440,25 +454,54 @@ int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) { ...@@ -440,25 +454,54 @@ int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) {
return 0; return 0;
} }
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId) { int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq,
if (pTask->info.taskLevel == TASK_LEVEL__AGG) { SRpcHandleInfo* pRpcInfo) {
int32_t taskLevel = pTask->info.taskLevel;
ASSERT(taskLevel == TASK_LEVEL__AGG || taskLevel == TASK_LEVEL__SINK);
// sink node do not send end of scan history msg to its upstream, which is agg task.
streamAddEndScanHistoryMsg(pTask, pRpcInfo, pReq);
int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1); int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1);
ASSERT(left >= 0); ASSERT(left >= 0);
if (left == 0) { if (left == 0) {
int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamEpInfoList); int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamEpInfoList);
qDebug("s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data", qDebug(
"s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data and send "
"rsp to all upstream tasks",
pTask->id.idStr, numOfTasks); pTask->id.idStr, numOfTasks);
if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
streamAggUpstreamScanHistoryFinish(pTask); streamAggUpstreamScanHistoryFinish(pTask);
}
streamNotifyUpstreamContinue(pTask);
} else { } else {
qDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d", qDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d",
pTask->id.idStr, taskId, childId, left); pTask->id.idStr, pReq->upstreamTaskId, pReq->childId, left);
} }
return 0;
}
int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY);
SStreamMeta* pMeta = pTask->pMeta;
// execute in the scan history complete call back msg, ready to process data from inputQ
streamSetStatusNormal(pTask);
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
taosWLockLatch(&pMeta->lock);
streamMetaSaveTask(pMeta, pTask);
taosWUnLockLatch(&pMeta->lock);
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
streamSchedExec(pTask);
} }
return 0; return TSDB_CODE_SUCCESS;
} }
static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) { static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) {
...@@ -579,7 +622,6 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { ...@@ -579,7 +622,6 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
} }
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) { int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
return 0; return 0;
} }
...@@ -596,16 +638,6 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) { ...@@ -596,16 +638,6 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
return -1; return -1;
} }
ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY);
// ready to process data from inputQ
streamSetStatusNormal(pTask);
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
taosWLockLatch(&pMeta->lock);
streamMetaSaveTask(pMeta, pTask);
taosWUnLockLatch(&pMeta->lock);
return 0; return 0;
} }
...@@ -702,15 +734,20 @@ int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) ...@@ -702,15 +734,20 @@ int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp)
int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq) { int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1; if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
tEndEncode(pEncoder); tEndEncode(pEncoder);
return pEncoder->pos; return pEncoder->pos;
} }
int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq) { int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1; if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1;
tEndDecode(pDecoder); tEndDecode(pDecoder);
return 0; return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册