From be8fd9e48bb25aaf49008aeab8c6f39186a081cd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 May 2023 14:47:10 +0800 Subject: [PATCH] enh(stream): create additional task for history data processing. --- include/libs/stream/tstream.h | 3 +- source/dnode/mnode/impl/inc/mndDef.h | 4 +- source/dnode/mnode/impl/src/mndScheduler.c | 409 ++++++++++++++------- source/dnode/mnode/impl/src/mndStream.c | 30 ++ 4 files changed, 301 insertions(+), 145 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 1d4bbf073e..6d54790b2f 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -289,9 +289,10 @@ struct SStreamTask { SCheckpointInfo chkInfo; STaskExec exec; int8_t fillHistory; // fill history + int64_t ekey; // end ts key int64_t endVer; // end version - + SStreamId historyTaskId; // children info SArray* childEpInfo; // SArray int32_t nextCheckId; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 5c288ec9e5..0343e91ac2 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -677,8 +677,8 @@ typedef struct { char* physicalPlan; SArray* tasks; // SArray> - SArray* pBatchTask; // generate the results for already stored ts data - int64_t batchTaskUid; // stream task for history ts data + SArray* pHTasksList; // generate the results for already stored ts data + int64_t hTaskUid; // stream task for history ts data SSchemaWrapper outputSchema; SSchemaWrapper tagSchema; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index a6a9c9b6e2..e8cb8604ea 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -169,7 +169,7 @@ SSnodeObj* mndSchedFetchOneSnode(SMnode* pMnode) { return pObj; } -int32_t mndAssignTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SSnodeObj* pSnode) { +int32_t mndAssignStreamTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SSnodeObj* pSnode) { int32_t msgLen; pTask->nodeId = SNODE_HANDLE; @@ -242,17 +242,16 @@ static int32_t mndScheduleFillHistoryStreamTask(SMnode* pMnode, SStreamObj* pStr return 0; } -static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SArray* pSinkTaskList, SStreamObj* pStream, - SSubplan* plan, uint64_t uid, int8_t taskLevel, int8_t fillHistory, +static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SArray* pSinkTaskList, + SStreamObj* pStream, SSubplan* plan, uint64_t uid, int8_t fillHistory, bool hasExtraSink) { - SStreamTask* pTask = tNewStreamTask(uid, taskLevel, fillHistory, pStream->conf.triggerParam, pTaskList); + SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, fillHistory, pStream->conf.triggerParam, pTaskList); if (pTask == NULL) { return terrno; } // sink or dispatch if (hasExtraSink) { - mndAddDispatcherForInnerTask(pMnode, pStream, pSinkTaskList, pTask); } else { mndSetSinkTaskInfo(pStream, pTask); @@ -290,7 +289,7 @@ void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask) { pDstTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; } -int32_t appendToDownstream(SStreamTask* pTask, SStreamTask* pDownstream) { +int32_t setEpToDownstreamTask(SStreamTask* pTask, SStreamTask* pDownstream) { SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pTask); if (pEpInfo == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -304,182 +303,312 @@ int32_t appendToDownstream(SStreamTask* pTask, SStreamTask* pDownstream) { return TSDB_CODE_SUCCESS; } -static int32_t doScheduleStream(uint64_t uid, SArray* pTasksList, SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan) { +static SArray* addNewTaskList(SArray* pTasksList) { + SArray* pTaskList = taosArrayInit(0, POINTER_BYTES); + taosArrayPush(pTasksList, &pTaskList); + return pTaskList; +} + +// set the history task id +static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) { + for(int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) { + SStreamTask* pStreamTask = taosArrayGet(pTaskList, i); + SStreamTask* pHTask = taosArrayGet(pHTaskList, i); + + pStreamTask->historyTaskId.taskId = pHTask->id.taskId; + pStreamTask->historyTaskId.streamId = pHTask->id.streamId; + } +} + +static int32_t addSourceTasksForSingleLevelStream(SMnode* pMnode, const SQueryPlan* pPlan, SStreamObj* pStream, + bool hasExtraSink) { + // create exec stream task, since only one level, the exec task is also the source task + SArray* pTaskList = addNewTaskList(pStream->tasks); + + SArray* pHTaskList = NULL; + if (pStream->conf.fillHistory) { + pHTaskList = addNewTaskList(pStream->pHTasksList); + } + SSdb* pSdb = pMnode->pSdb; - int32_t planTotLevel = LIST_LENGTH(pPlan->pSubplans); + SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0); + if (LIST_LENGTH(inner->pNodeList) != 1) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; + } - bool hasExtraSink = false; - bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0; - SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb); - if (pDbObj == NULL) { + SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0); + if (plan->subplanType != SUBPLAN_TYPE_SCAN) { terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } - bool multiTarget = (pDbObj->cfg.numOfVgroups > 1); - sdbRelease(pSdb, pDbObj); + void* pIter = NULL; + while (1) { + SVgObj* pVgroup; + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); + if (pIter == NULL) { + break; + } - if (planTotLevel == 2 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) { - SArray* taskOneLevel = taosArrayInit(0, POINTER_BYTES); - taosArrayPush(pTasksList, &taskOneLevel); + if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) { + sdbRelease(pSdb, pVgroup); + continue; + } - // add extra sink - hasExtraSink = true; - if (pStream->fixedSinkVgId == 0) { - if (mndAddShuffleSinkTasksToStream(pMnode, taskOneLevel, pStream) < 0) { - // TODO free - return -1; - } - } else { - if (mndAddSinkTaskToStream(pStream, taskOneLevel, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg) < 0) { - // TODO free - return -1; - } + // new stream task + SArray** pSinkTaskList = taosArrayGet(pStream->tasks, SINK_NODE_LEVEL); + int32_t code = addSourceStreamTask(pMnode, pVgroup, pTaskList, *pSinkTaskList, pStream, plan, pStream->uid, + pStream->conf.fillHistory, hasExtraSink); + if (code != TSDB_CODE_SUCCESS) { + sdbRelease(pSdb, pVgroup); + return -1; + } + + if (pStream->conf.fillHistory) { + SArray** pHSinkTaskList = taosArrayGet(pStream->pHTasksList, SINK_NODE_LEVEL); + code = addSourceStreamTask(pMnode, pVgroup, pHTaskList, *pHSinkTaskList, pStream, plan, pStream->hTaskUid, 0, + hasExtraSink); + setHTasksId(pTaskList, pHTaskList); } + + sdbRelease(pSdb, pVgroup); + if (code != TSDB_CODE_SUCCESS) { + return -1; + } + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t uid, SStreamTask* pDownstreamTask, SMnode* pMnode, + SSubplan* pPlan, SVgObj* pVgroup) { + SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, fillHistory, 0, pTaskList); + if (pTask == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; } - pStream->totalLevel = planTotLevel + hasExtraSink; + // all the source tasks dispatch result to a single agg node. + setFixedDownstreamEpInfo(pTask, pDownstreamTask); + if (mndAssignStreamTaskToVgroup(pMnode, pTask, pPlan, pVgroup) < 0) { + return -1; + } - if (planTotLevel > 1) { - SStreamTask* pInnerTask; - // inner level - { - SArray* taskInnerLevel = taosArrayInit(0, POINTER_BYTES); - taosArrayPush(pTasksList, &taskInnerLevel); - - SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0); - SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0); - if (plan->subplanType != SUBPLAN_TYPE_MERGE) { - terrno = TSDB_CODE_QRY_INVALID_INPUT; - return -1; - } + return setEpToDownstreamTask(pTask, pDownstreamTask); +} - pInnerTask = tNewStreamTask(uid, TASK_LEVEL__AGG, pStream->conf.fillHistory, pStream->conf.triggerParam, taskInnerLevel); - if (pInnerTask == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } +static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream, + SStreamTask** pAggTask) { + *pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, pStream->conf.fillHistory, + pStream->conf.triggerParam, pTaskList); + if (*pAggTask == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } - // dispatch - SArray* pSinkNodeList = taosArrayGet(pTasksList, SINK_NODE_LEVEL); - if (mndAddDispatcherForInnerTask(pMnode, pStream, pSinkNodeList, pInnerTask) < 0) { - return -1; - } + // dispatch + if (mndAddDispatcherForInnerTask(pMnode, pStream, pSinkNodeList, *pAggTask) < 0) { + return -1; + } - if (tsDeployOnSnode) { - SSnodeObj* pSnode = mndSchedFetchOneSnode(pMnode); - if (pSnode == NULL) { - SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid); - if (mndAssignStreamTaskToVgroup(pMnode, pInnerTask, plan, pVgroup) < 0) { - sdbRelease(pSdb, pVgroup); - return -1; - } - sdbRelease(pSdb, pVgroup); - } else { - if (mndAssignTaskToSnode(pMnode, pInnerTask, plan, pSnode) < 0) { - sdbRelease(pSdb, pSnode); - return -1; - } - } - } else { - SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid); - if (mndAssignStreamTaskToVgroup(pMnode, pInnerTask, plan, pVgroup) < 0) { - sdbRelease(pSdb, pVgroup); - return -1; - } + return 0; +} - sdbRelease(pSdb, pVgroup); - } - } +static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan) { + SArray* pAggTaskList = addNewTaskList(pStream->tasks); + SSdb* pSdb = pMnode->pSdb; - // source level - SArray* taskSourceLevel = taosArrayInit(0, POINTER_BYTES); - taosArrayPush(pTasksList, &taskSourceLevel); + SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0); + SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0); + if (plan->subplanType != SUBPLAN_TYPE_MERGE) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; + } - SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 1); - SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0); - if (plan->subplanType != SUBPLAN_TYPE_SCAN) { - terrno = TSDB_CODE_QRY_INVALID_INPUT; - return -1; + SStreamTask* pAggTask = NULL; + SArray* pSinkNodeList = taosArrayGet(pStream->tasks, SINK_NODE_LEVEL); + + int32_t code = doAddAggTask(pStream->uid, pAggTaskList, pSinkNodeList, pMnode, pStream, &pAggTask); + if (code != TSDB_CODE_SUCCESS) { + return -1; + } + + SVgObj* pVgroup = NULL; + SSnodeObj* pSnode = NULL; + + if (tsDeployOnSnode) { + pSnode = mndSchedFetchOneSnode(pMnode); + if (pSnode == NULL) { + pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid); } + } else { + pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid); + } - void* pIter = NULL; - while (1) { - SVgObj* pVgroup; - pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); - if (pIter == NULL) { - break; - } + if (pSnode != NULL) { + code = mndAssignStreamTaskToSnode(pMnode, pAggTask, plan, pSnode); + } else { + code = mndAssignStreamTaskToVgroup(pMnode, pAggTask, plan, pVgroup); + } - if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) { - sdbRelease(pSdb, pVgroup); - continue; - } + if (pStream->conf.fillHistory) { + SArray* pHAggTaskList = addNewTaskList(pStream->pHTasksList); - SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, pStream->conf.fillHistory, 0, taskSourceLevel); - if (pTask == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + SStreamTask* pHAggTask = NULL; + code = doAddAggTask(pStream->uid, pAggTaskList, pSinkNodeList, pMnode, pStream, &pHAggTask); + if (code != TSDB_CODE_SUCCESS) { + if (pSnode != NULL) { + sdbRelease(pSdb, pSnode); + } else { sdbRelease(pSdb, pVgroup); - return -1; } + return code; + } - // all the source tasks dispatch result to a single agg node. - setFixedDownstreamEpInfo(pTask, pInnerTask); + if (pSnode != NULL) { + code = mndAssignStreamTaskToSnode(pMnode, pHAggTask, plan, pSnode); + } else { + code = mndAssignStreamTaskToVgroup(pMnode, pHAggTask, plan, pVgroup); + } - if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) { - sdbRelease(pSdb, pVgroup); - return -1; - } + setHTasksId(pAggTaskList, pHAggTaskList); + } + + if (pSnode != NULL) { + sdbRelease(pSdb, pSnode); + } else { + sdbRelease(pSdb, pVgroup); + } + + return code; +} + +static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPlan, SStreamObj* pStream, + SStreamTask* pDownstreamTask, SStreamTask* pHDownstreamTask) { + SArray* pSourceTaskList = addNewTaskList(pStream->tasks); + + SArray* pHSourceTaskList = NULL; + if (pStream->conf.fillHistory) { + pHSourceTaskList = addNewTaskList(pStream->pHTasksList); + } + + SSdb* pSdb = pMnode->pSdb; + SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 1); + SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0); + if (plan->subplanType != SUBPLAN_TYPE_SCAN) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; + } + + void* pIter = NULL; + while (1) { + SVgObj* pVgroup; + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); + if (pIter == NULL) { + break; + } - int32_t code = appendToDownstream(pTask, pInnerTask); + if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) { + sdbRelease(pSdb, pVgroup); + continue; + } + + int32_t code = doAddSourceTask(pSourceTaskList, pStream->conf.fillHistory, pStream->uid, pDownstreamTask, pMnode, plan, pVgroup); + if (code != TSDB_CODE_SUCCESS) { + sdbRelease(pSdb, pVgroup); + terrno = code; + return -1; + } + + if (pStream->conf.fillHistory) { + code = doAddSourceTask(pHSourceTaskList, 0, pStream->hTaskUid, pHDownstreamTask, pMnode, plan, pVgroup); sdbRelease(pSdb, pVgroup); if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return -1; + return code; } + + setHTasksId(pSourceTaskList, pHSourceTaskList); } - } else if (planTotLevel == 1) { - // create exec stream task, since only one level, the exec task is also the source task - SArray* pTaskList = taosArrayInit(0, POINTER_BYTES); - taosArrayPush(pTasksList, &pTaskList); + } - SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0); - if (LIST_LENGTH(inner->pNodeList) != 1) { - terrno = TSDB_CODE_QRY_INVALID_INPUT; + return TSDB_CODE_SUCCESS; +} + +static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStream, SArray** pCreatedTaskList) { + SArray* pSinkTaskList = addNewTaskList(pTasksList); + if (pStream->fixedSinkVgId == 0) { + if (mndAddShuffleSinkTasksToStream(pMnode, pSinkTaskList, pStream) < 0) { + // TODO free return -1; } - - SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0); - if (plan->subplanType != SUBPLAN_TYPE_SCAN) { - terrno = TSDB_CODE_QRY_INVALID_INPUT; + } else { + if (mndAddSinkTaskToStream(pStream, pSinkTaskList, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg) < 0) { + // TODO free return -1; } + } - void* pIter = NULL; - while (1) { - SVgObj* pVgroup; - pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); - if (pIter == NULL) { - break; - } + *pCreatedTaskList = pSinkTaskList; + return TSDB_CODE_SUCCESS; +} - if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) { - sdbRelease(pSdb, pVgroup); - continue; - } +static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan) { + SSdb* pSdb = pMnode->pSdb; + int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans); - // new stream task - SArray* pSinkNodeTaskList = taosArrayGet(pTasksList, SINK_NODE_LEVEL); - int32_t code = addSourceStreamTask(pMnode, pVgroup, pTaskList, pSinkNodeTaskList, pStream, plan, uid, TASK_LEVEL__SOURCE, pStream->conf.fillHistory, hasExtraSink); - sdbRelease(pSdb, pVgroup); + bool hasExtraSink = false; + bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0; + SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb); + if (pDbObj == NULL) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; + } + bool multiTarget = (pDbObj->cfg.numOfVgroups > 1); + sdbRelease(pSdb, pDbObj); + + pStream->tasks = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES); + pStream->pHTasksList = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES); + + if (numOfPlanLevel == 2 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) { + // add extra sink + hasExtraSink = true; + + SArray* pSinkTaskList = NULL; + int32_t code = addSinkTasks(pStream->tasks, pMnode, pStream, &pSinkTaskList); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + // check for fill history + if (pStream->conf.fillHistory) { + SArray* pHSinkTaskList = NULL; + code = addSinkTasks(pStream->pHTasksList, pMnode, pStream, &pHSinkTaskList); if (code != TSDB_CODE_SUCCESS) { - return -1; + return code; } + + setHTasksId(pSinkTaskList, pHSinkTaskList); } } + pStream->totalLevel = numOfPlanLevel + hasExtraSink; + + if (numOfPlanLevel > 1) { + SStreamTask* pInnerTask; + int32_t code = addAggTask(pStream, pMnode, pPlan); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + // source level + return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pInnerTask, NULL); + } else if (numOfPlanLevel == 1) { + return addSourceTasksForSingleLevelStream(pMnode, pPlan, pStream, hasExtraSink); + } + return 0; } @@ -490,13 +619,9 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { return -1; } - int32_t code = doScheduleStream(pStream->uid, pStream->tasks, pStream, pMnode, pPlan); - - if (code == TSDB_CODE_SUCCESS) { - code = doScheduleStream(pStream->batchTaskUid, pStream->pBatchTask, pStream, pMnode, pPlan); - } - + int32_t code = doScheduleStream(pStream, pMnode, pPlan); qDestroyQueryPlan(pPlan); + return code; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index f5cd673225..81b985f515 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -428,17 +428,22 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) { SEncoder encoder; tEncoderInit(&encoder, NULL, 0); tEncodeStreamTask(&encoder, pTask); + int32_t size = encoder.pos; int32_t tlen = sizeof(SMsgHead) + size; tEncoderClear(&encoder); + void *buf = taosMemoryCalloc(1, tlen); if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } + ((SMsgHead *)buf)->vgId = htonl(pTask->nodeId); + void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); tEncoderInit(&encoder, abuf, size); + tEncodeStreamTask(&encoder, pTask); tEncoderClear(&encoder); @@ -448,10 +453,12 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) { action.pCont = buf; action.contLen = tlen; action.msgType = TDMT_STREAM_TASK_DEPLOY; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(buf); return -1; } + return 0; } @@ -468,6 +475,25 @@ int32_t mndPersistStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStrea } } } + + // persistent stream task for history data + if (pStream->conf.fillHistory) { + level = taosArrayGetSize(pStream->pHTasksList); + + for (int32_t i = 0; i < level; i++) { + SArray *pLevel = taosArrayGetP(pStream->pHTasksList, i); + + int32_t numOfTasks = taosArrayGetSize(pLevel); + for (int32_t j = 0; j < numOfTasks; j++) { + SStreamTask *pTask = taosArrayGetP(pLevel, j); + if (mndPersistTaskDeployReq(pTrans, pTask) < 0) { + return -1; + } + } + } + } + + return 0; } @@ -475,11 +501,13 @@ int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { if (mndPersistStreamTasks(pMnode, pTrans, pStream) < 0) { return -1; } + SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); return -1; } + (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); return 0; } @@ -491,6 +519,7 @@ int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStr mndTransDrop(pTrans); return -1; } + (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); return 0; } @@ -733,6 +762,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); goto _OVER; } + mInfo("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name); mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb); -- GitLab