diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 4e24ff5b377f93d0f0351925e09b082d45c4558d..e98453f57179bc5ac6771bce5a8dbdfc4b40f9a0 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -253,7 +253,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_STREAM_UNUSED1, "stream-unused1", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_SCAN_HISTORY, "stream-scan-history", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_STREAM_RECOVER_FINISH, "stream-recover-finish", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_SCAN_HISTORY_FINISH, "stream-scan-history-finish", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TRANSFER_STATE, "stream-transfer-state", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECK, "stream-task-check", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECKPOINT, "stream-checkpoint", NULL, NULL) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 09583572edf1b895b9314152f23b324db855e863..62a4dcc50d3971d4f0342fe44d49a345858c48c2 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -568,12 +568,15 @@ bool streamTaskIsIdle(const SStreamTask* pTask); int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); +char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); + // recover and fill history +void streamPrepareNdoCheckDownstream(SStreamTask* pTask); int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask); int32_t streamTaskLaunchScanHistory(SStreamTask* pTask); int32_t streamTaskCheckStatus(SStreamTask* pTask); int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp); -int32_t streamTaskStartHistoryTask(SStreamTask* pTask); +int32_t streamCheckHistoryTaskDownstrem(SStreamTask* pTask); int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated); diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index c098d546b693b88eee078dcdc6a67a6a606663d0..b2fb7243ff28a116dbe6bf8b97a57e16aee5207b 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -76,6 +76,9 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; code = 0; _OVER: diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 7267bd5ebf3803056c38b190c45098fc439c1b49..86681903c0baaa102824989aaca96b102ed3b75c 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -730,7 +730,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_RECOVER_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRANSFER_STATE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 58532cf94c755321ace358854daef20b2c722825..f347b4d036a243e8ba5ecd278cb778cfba105253 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -177,6 +177,7 @@ int32_t mndAssignStreamTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan->execNode.nodeId = SNODE_HANDLE; plan->execNode.epSet = pTask->info.epSet; + mDebug("s-task:0x%x set the agg task to snode:%d", pTask->id.taskId, SNODE_HANDLE); if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) { terrno = TSDB_CODE_QRY_INVALID_INPUT; @@ -240,7 +241,7 @@ int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* p static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SArray* pSinkTaskList, SStreamObj* pStream, SSubplan* plan, uint64_t uid, int8_t fillHistory, - bool hasExtraSink) { + bool hasExtraSink, int64_t firstWindowSkey) { SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, fillHistory, pStream->conf.triggerParam, pTaskList); if (pTask == NULL) { return terrno; @@ -249,6 +250,7 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas // todo set the correct ts, which should be last key of queried table. pTask->dataRange.window.skey = INT64_MIN; pTask->dataRange.window.ekey = 1685959190000;//taosGetTimestampMs(); +// pTask->dataRange.window.ekey = firstWindowSkey - 1;//taosGetTimestampMs(); mDebug("add source task 0x%x window:%" PRId64 " - %" PRId64, pTask->id.taskId, pTask->dataRange.window.skey, pTask->dataRange.window.ekey); @@ -330,7 +332,7 @@ static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) { } static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* pPlan, SStreamObj* pStream, - bool hasExtraSink) { + bool hasExtraSink, int64_t lastTs) { // create exec stream task, since only one level, the exec task is also the source task SArray* pTaskList = addNewTaskList(pStream->tasks); @@ -368,7 +370,7 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* // new stream task SArray** pSinkTaskList = taosArrayGet(pStream->tasks, SINK_NODE_LEVEL); int32_t code = addSourceStreamTask(pMnode, pVgroup, pTaskList, *pSinkTaskList, pStream, plan, pStream->uid, - 0, hasExtraSink); + 0, hasExtraSink, lastTs); if (code != TSDB_CODE_SUCCESS) { sdbRelease(pSdb, pVgroup); return -1; @@ -377,7 +379,7 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* if (pStream->conf.fillHistory) { SArray** pHSinkTaskList = taosArrayGet(pStream->pHTasksList, SINK_NODE_LEVEL); code = addSourceStreamTask(pMnode, pVgroup, pHTaskList, *pHSinkTaskList, pStream, plan, pStream->hTaskUid, - pStream->conf.fillHistory, hasExtraSink); + pStream->conf.fillHistory, hasExtraSink, lastTs); setHTasksId(pTaskList, pHTaskList); } @@ -402,7 +404,8 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui pTask->dataRange.window.skey = INT64_MIN; pTask->dataRange.window.ekey = 1685959190000;//taosGetTimestampMs(); - mDebug("s-task:0x%x set time window:%"PRId64" - %"PRId64, pTask->id.taskId, pTask->dataRange.window.skey, pTask->dataRange.window.ekey); + mDebug("s-task:0x%x level:%d set time window:%" PRId64 " - %" PRId64, pTask->id.taskId, pTask->info.taskLevel, + pTask->dataRange.window.skey, pTask->dataRange.window.ekey); // all the source tasks dispatch result to a single agg node. setFixedDownstreamEpInfo(pTask, pDownstreamTask); @@ -570,7 +573,7 @@ static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStr return TSDB_CODE_SUCCESS; } -static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan) { +static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t lastTs) { SSdb* pSdb = pMnode->pSdb; int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans); @@ -624,7 +627,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* // source level return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask); } else if (numOfPlanLevel == 1) { - return addSourceTasksForOneLevelStream(pMnode, pPlan, pStream, hasExtraSink); + return addSourceTasksForOneLevelStream(pMnode, pPlan, pStream, hasExtraSink, lastTs); } return 0; @@ -637,7 +640,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { return -1; } - int32_t code = doScheduleStream(pStream, pMnode, pPlan); + int32_t code = doScheduleStream(pStream, pMnode, pPlan, 0); qDestroyQueryPlan(pPlan); return code; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 42bb606c4e194f9eee31541df70a0bab8c41e5b6..ad57bdbdc63333e6580433d84cb01e65e8c8ce59 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -52,10 +52,7 @@ void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) { FAIL: if (pMsg->info.handle == NULL) return; - SRpcMsg rsp = { - .code = code, - .info = pMsg->info, - }; + SRpcMsg rsp = { .code = code, .info = pMsg->info}; tmsgSendRsp(&rsp); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); @@ -65,10 +62,11 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->pUpstreamEpInfoList) != 0); pTask->refCnt = 1; - pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; + pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId); - pTask->inputQueue = streamQueueOpen(0); - pTask->outputQueue = streamQueueOpen(0); + pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; + pTask->inputQueue = streamQueueOpen(512 << 10); + pTask->outputQueue = streamQueueOpen(512 << 10); if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) { return -1; @@ -93,6 +91,10 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { ASSERT(pTask->exec.pExecutor); streamSetupTrigger(pTask); + + qDebug("snode:%d expand stream task on snode, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", SNODE_HANDLE, + pTask->id.idStr, pTask->chkInfo.version, pTask->info.selfChildId, pTask->info.taskLevel); + return 0; } @@ -149,6 +151,7 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { taosMemoryFree(pTask); return -1; } + tDecoderClear(&decoder); ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG); @@ -161,19 +164,20 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { return -1; } + int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta); taosWUnLockLatch(&pSnode->pMeta->lock); - // 3.go through recover steps to fill history - if (pTask->info.fillHistory) { - streamSetParamForScanHistoryData(pTask); - streamAggRecoverPrepare(pTask); - } + streamPrepareNdoCheckDownstream(pTask); + qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE, pTask->id.idStr, + streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks); return 0; } int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) { SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg; + qDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId); + streamMetaRemoveTask(pSnode->pMeta, pReq->taskId); return 0; } @@ -255,13 +259,15 @@ int32_t sndProcessTaskRetrieveRsp(SSnode *pSnode, SRpcMsg *pMsg) { } int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) { - void *pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t len = pMsg->contLen - sizeof(SMsgHead); switch (pMsg->msgType) { - case TDMT_STREAM_TASK_DEPLOY: + case TDMT_STREAM_TASK_DEPLOY: { + void *pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); return sndProcessTaskDeployReq(pSnode, pReq, len); + } + case TDMT_STREAM_TASK_DROP: - return sndProcessTaskDropReq(pSnode, pReq, len); + return sndProcessTaskDropReq(pSnode, pMsg->pCont, pMsg->contLen); default: ASSERT(0); } @@ -300,6 +306,102 @@ int32_t sndProcessTaskRecoverFinishRsp(SSnode *pSnode, SRpcMsg *pMsg) { return 0; } +int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) { + char *msgStr = pMsg->pCont; + char *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + + SStreamTaskCheckReq req; + SDecoder decoder; + + tDecoderInit(&decoder, (uint8_t *)msgBody, msgLen); + tDecodeStreamTaskCheckReq(&decoder, &req); + tDecoderClear(&decoder); + + int32_t taskId = req.downstreamTaskId; + + SStreamTaskCheckRsp rsp = { + .reqId = req.reqId, + .streamId = req.streamId, + .childId = req.childId, + .downstreamNodeId = req.downstreamNodeId, + .downstreamTaskId = req.downstreamTaskId, + .upstreamNodeId = req.upstreamNodeId, + .upstreamTaskId = req.upstreamTaskId, + }; + + SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); + + if (pTask != NULL) { + rsp.status = streamTaskCheckStatus(pTask); + streamMetaReleaseTask(pSnode->pMeta, pTask); + + qDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), status:%s, rsp status %d", + pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, + streamGetTaskStatusStr(pTask->status.taskStatus), rsp.status); + } else { + rsp.status = 0; + qDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 + ") from task:0x%x (vgId:%d), rsp status %d", + taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); + } + + SEncoder encoder; + int32_t code; + int32_t len; + + tEncodeSize(tEncodeStreamTaskCheckRsp, &rsp, len, code); + if (code < 0) { + qError("vgId:%d failed to encode task check rsp, task:0x%x", pSnode->pMeta->vgId, taskId); + return -1; + } + + void *buf = rpcMallocCont(sizeof(SMsgHead) + len); + ((SMsgHead *)buf)->vgId = htonl(req.upstreamNodeId); + + void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + tEncoderInit(&encoder, (uint8_t *)abuf, len); + tEncodeStreamTaskCheckRsp(&encoder, &rsp); + tEncoderClear(&encoder); + + SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info}; + + tmsgSendRsp(&rspMsg); + return 0; +} + +int32_t sndProcessStreamTaskCheckRsp(SSnode* pSnode, SRpcMsg* pMsg) { + char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + + int32_t code; + SStreamTaskCheckRsp rsp; + + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)pReq, len); + code = tDecodeStreamTaskCheckRsp(&decoder, &rsp); + + if (code < 0) { + tDecoderClear(&decoder); + return -1; + } + + tDecoderClear(&decoder); + qDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", + rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); + + SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, rsp.upstreamTaskId); + if (pTask == NULL) { + qError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId, + pSnode->pMeta->vgId); + return -1; + } + + code = streamProcessCheckRsp(pTask, &rsp); + streamMetaReleaseTask(pSnode->pMeta, pTask); + return code; +} + int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { switch (pMsg->msgType) { case TDMT_STREAM_TASK_RUN: @@ -312,10 +414,14 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { return sndProcessTaskRetrieveReq(pSnode, pMsg); case TDMT_STREAM_RETRIEVE_RSP: return sndProcessTaskRetrieveRsp(pSnode, pMsg); - case TDMT_STREAM_RECOVER_FINISH: + case TDMT_STREAM_SCAN_HISTORY_FINISH: return sndProcessTaskRecoverFinishReq(pSnode, pMsg); - case TDMT_STREAM_RECOVER_FINISH_RSP: + case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP: return sndProcessTaskRecoverFinishRsp(pSnode, pMsg); + case TDMT_STREAM_TASK_CHECK: + return sndProcessStreamTaskCheckReq(pSnode, pMsg); + case TDMT_STREAM_TASK_CHECK_RSP: + return sndProcessStreamTaskCheckRsp(pSnode, pMsg); default: ASSERT(0); } diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 22baea736ddaab02476cad881478fca0f16931c8..7f95e48c419d38656bb2ab1de4b6f521d97e0195 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -171,7 +171,6 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq); // tq util int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock); -char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem); int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg); int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId, diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 0eb2817e22bd1cee7f065dcbe6c0b9af907869e3..d85b78b80cf3f41b1c97a8ab2d8205ef24683d0c 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -218,7 +218,7 @@ int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg); int tqUnregisterPushHandle(STQ* pTq, void* pHandle); int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. -int tqCheckforStreamStatus(STQ* pTq); +int tqCheckStreamStatus(STQ* pTq); int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); @@ -240,7 +240,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen); +int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, SRpcMsg* pMsg); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec); int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e389f77a30ed283ad20f319ee1e6dea07b44a5bd..b4eb5957c71c988db1fb7b4a48fa56e7c2b579f0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -920,12 +920,15 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { return 0; } -int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { +int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) { + char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + int32_t code; SStreamTaskCheckRsp rsp; SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msg, msgLen); + tDecoderInit(&decoder, (uint8_t*)pReq, len); code = tDecodeStreamTaskCheckRsp(&decoder, &rsp); if (code < 0) { @@ -993,37 +996,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms taosWUnLockLatch(&pStreamMeta->lock); // 3. It's an fill history task, do nothing. wait for the main task to start it - if (pTask->info.fillHistory) { - tqDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr); - } else { - // calculate the correct start time window, and start the handle the history data for the main task. - if (pTask->historyTaskId.taskId != 0) { - // launch the history fill stream task - streamTaskStartHistoryTask(pTask); - - // launch current task - SHistDataRange* pRange = &pTask->dataRange; - int64_t ekey = pRange->window.ekey; - int64_t ver = pRange->range.minVer; - - pRange->window.skey = ekey; - pRange->window.ekey = INT64_MAX; - pRange->range.minVer = 0; - pRange->range.maxVer = ver; - - tqDebug("s-task:%s fill-history task exists, update stream time window:%" PRId64 " - %" PRId64 - ", ver range:%" PRId64 " - %" PRId64, - pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); - } else { - SHistDataRange* pRange = &pTask->dataRange; - tqDebug("s-task:%s no associated task, stream time window:%" PRId64 " - %" PRId64 ", ver range:%" PRId64 - " - %" PRId64, - pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); - } - - ASSERT(pTask->status.checkDownstream == 0); - streamTaskCheckDownstreamTasks(pTask); - } + streamPrepareNdoCheckDownstream(pTask); tqDebug("vgId:%d s-task:%s is deployed and add into meta, status:%s, numOfTasks:%d", vgId, pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks); @@ -1160,9 +1133,16 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // todo update the chkInfo version for current task. // this task has an associated history stream task, so we need to scan wal from the end version of // history scan. The current version of chkInfo.current is not updated during the history scan - tqDebug("s-task:%s history data scan completed, now start to scan data from wal, start ver:%" PRId64 - ", window:%" PRId64 " - %" PRId64, - pTask->id.idStr, pTask->chkInfo.currentVer, pTask->dataRange.window.skey, pTask->dataRange.window.ekey); + if (pTask->historyTaskId.taskId == 0) { + pTask->dataRange.window.ekey = INT64_MAX; + pTask->dataRange.window.skey = INT64_MIN; + tqDebug("s-task:%s without associated stream task, reset the time window:%"PRId64" - %"PRId64, pTask->id.idStr, + pTask->dataRange.window.skey, pTask->dataRange.window.ekey); + } else { + tqDebug("s-task:%s history data scan completed, now start to scan data from wal, start ver:%" PRId64 + ", window:%" PRId64 " - %" PRId64, + pTask->id.idStr, pTask->chkInfo.currentVer, pTask->dataRange.window.skey, pTask->dataRange.window.ekey); + } code = streamTaskScanHistoryDataComplete(pTask); streamMetaReleaseTask(pMeta, pTask); diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 6fbc4197ee2959e018fb5d49c5f651a24fc8e74e..5c0e735a8124d6ff781d775e656c24a69e550993 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -87,7 +87,7 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq) { return 0; } -int32_t tqCheckforStreamStatus(STQ* pTq) { +int32_t tqCheckStreamStatus(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index a301d82c30a603f6c45ccff200e90b6b3f87494b..723e844346e4bc4b16fac38a2eb0f288e45f8853 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -20,12 +20,6 @@ static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp, int32_t vgId); -char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { - char buf[128] = {0}; - sprintf(buf, "0x%" PRIx64 "-0x%x", streamId, taskId); - return taosStrdup(buf); -} - int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem) { int32_t code = tAppendDataToInputQueue(pTask, pQueueItem); if (code < 0) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index a099f3123e2e300c6e916856b454718266d257b9..0f7bcd25a03b4a837d4658815f9970d3637e2d87 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -579,11 +579,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_CHECK: return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg); - case TDMT_STREAM_TASK_CHECK_RSP: { - char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t len = pMsg->contLen - sizeof(SMsgHead); - return tqProcessStreamTaskCheckRsp(pVnode->pTq, 0, pReq, len); - } + case TDMT_STREAM_TASK_CHECK_RSP: + return tqProcessStreamTaskCheckRsp(pVnode->pTq, 0, pMsg); case TDMT_STREAM_RETRIEVE: return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg); case TDMT_STREAM_RETRIEVE_RSP: @@ -595,9 +592,9 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) int32_t len = pMsg->contLen - sizeof(SMsgHead); return tqProcessTaskTransferStateReq(pVnode->pTq, 0, pReq, len); } - case TDMT_STREAM_RECOVER_FINISH: + case TDMT_STREAM_SCAN_HISTORY_FINISH: return tqProcessTaskRecoverFinishReq(pVnode->pTq, pMsg); - case TDMT_STREAM_RECOVER_FINISH_RSP: + case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP: return tqProcessTaskRecoverFinishRsp(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in stream queue", pMsg->msgType); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 3e4655ff759fd55f555fa69667abb06781b3621e..360da41482c43cc13236b104052220c87bd216cf 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -554,7 +554,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", pVnode->config.vgId); } else { vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId); - tqCheckforStreamStatus(pVnode->pTq); + tqCheckStreamStatus(pVnode->pTq); } } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index e93c280d62ee2671021a71e2826f0e439383ae55..504348613571c766ff91e771dcc3b025710e79c8 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -55,6 +55,12 @@ void streamCleanUp() { } } +char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { + char buf[128] = {0}; + sprintf(buf, "0x%" PRIx64 "-0x%x", streamId, taskId); + return taosStrdup(buf); +} + void streamSchedByTimer(void* param, void* tmrId) { SStreamTask* pTask = (void*)param; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index d48bfbb1890bae3993fecdfd266b6d0d616e5b75..7ce85b4e6514ea28a4c1b9baae241f071b2af72c 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -308,7 +308,7 @@ int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamRe msg.contLen = tlen + sizeof(SMsgHead); msg.pCont = buf; - msg.msgType = TDMT_STREAM_RECOVER_FINISH; + msg.msgType = TDMT_STREAM_SCAN_HISTORY_FINISH; msg.info.noResp = 1; tmsgSendReq(pEpSet, &msg); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index f041b17d2e79cd4c178647d3cd1aae9dfc8f9518..577022bbef8468c845fa5181edc7e081e6f433cf 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -98,7 +98,6 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) { // serialize if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { - req.reqId = tGenIdPI64(); req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId; req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; @@ -126,8 +125,9 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) { } } else { pTask->status.checkDownstream = 1; - qDebug("s-task:%s (vgId:%d) set downstream task checked for task without downstream tasks, try to launch scan-history-data, status:%s", + qDebug("s-task:%s (vgId:%d) no downstream tasks, set downstream checked, try to launch scan-history-data, status:%s", pTask->id.idStr, pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus)); + streamTaskLaunchScanHistory(pTask); } @@ -460,11 +460,8 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { // todo fix the bug: 2. race condition // an fill history task needs to be started. -int32_t streamTaskStartHistoryTask(SStreamTask* pTask) { +int32_t streamCheckHistoryTaskDownstrem(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; - if (pTask->historyTaskId.taskId == 0) { - return TSDB_CODE_SUCCESS; - } // Set the execute conditions, including the query time window and the version range SStreamTask** pHTask = taosHashGet(pMeta->pTasks, &pTask->historyTaskId.taskId, sizeof(pTask->historyTaskId.taskId)); @@ -585,3 +582,40 @@ int32_t tDecodeStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishRe tEndDecode(pDecoder); return 0; } + +void streamPrepareNdoCheckDownstream(SStreamTask* pTask) { + if (pTask->info.fillHistory) { + qDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr); + } else { + // calculate the correct start time window, and start the handle the history data for the main task. + if (pTask->historyTaskId.taskId != 0) { + // check downstream tasks for associated scan-history-data tasks + streamCheckHistoryTaskDownstrem(pTask); + + // launch current task + SHistDataRange* pRange = &pTask->dataRange; + int64_t ekey = pRange->window.ekey; + int64_t ver = pRange->range.minVer; + + pRange->window.skey = ekey; + pRange->window.ekey = INT64_MAX; + pRange->range.minVer = 0; + pRange->range.maxVer = ver; + + qDebug("s-task:%s level:%d fill-history task exists, update stream time window:%" PRId64 " - %" PRId64 + ", ver range:%" PRId64 " - %" PRId64, + pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, + pRange->range.maxVer); + } else { + SHistDataRange* pRange = &pTask->dataRange; + qDebug("s-task:%s no associated scan-history task, stream time window:%" PRId64 " - %" PRId64 ", ver range:%" PRId64 + " - %" PRId64, + pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); + } + + ASSERT(pTask->status.checkDownstream == 0); + + // check downstream tasks for itself + streamTaskCheckDownstreamTasks(pTask); + } +} diff --git a/tests/script/tsim/stream/partitionby.sim b/tests/script/tsim/stream/partitionby.sim index df4b60314fc8b0b52fa2f7075262cea2cd0106ce..9a660741e7eab7242564225377ba2fc77691604a 100644 --- a/tests/script/tsim/stream/partitionby.sim +++ b/tests/script/tsim/stream/partitionby.sim @@ -14,6 +14,7 @@ sql create table ts3 using st tags(3,2,2); sql create table ts4 using st tags(4,2,2); sql create stream stream_t1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into test0.streamtST1 as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from st partition by ta,tb,tc interval(10s); +sleep 500 sql insert into ts1 values(1648791213001,1,12,3,1.0); sql insert into ts2 values(1648791213001,1,12,3,1.0);