diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 8e7dd0bb0d0dca1f8465cc07eb5f7d9695fed267..7008c0ac40178eeeed8d9290a9a49a9eb8511e21 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -78,11 +78,11 @@ enum { TASK_TRIGGER_STATUS__ACTIVE, }; -enum { +typedef enum { TASK_LEVEL__SOURCE = 1, TASK_LEVEL__AGG, TASK_LEVEL__SINK, -}; +} ETASK_LEVEL; enum { TASK_OUTPUT__FIXED_DISPATCH = 1, @@ -284,13 +284,13 @@ struct SStreamTask { int16_t dispatchMsgType; SStreamStatus status; int32_t selfChildId; - int32_t nodeId; + int32_t nodeId; // vgroup id SEpSet epSet; SCheckpointInfo chkInfo; STaskExec exec; - - // fill history - int8_t fillHistory; + int8_t fillHistory; // fill history + int64_t ekey; // end ts key + int64_t endVer; // end version // children info SArray* childEpInfo; // SArray @@ -351,7 +351,7 @@ typedef struct SStreamMeta { int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo); -SStreamTask* tNewStreamTask(int64_t streamId); +SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory); int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask); void tFreeStreamTask(SStreamTask* pTask); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 734f624be0e19c942c10244f28263570d6ea4504..18f10e92f9e10aa326e17396074692099f27be26 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -110,6 +110,8 @@ int32_t mndAddSinkToTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask return 0; } +#define SINK_NODE_LEVEL (0) + int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) { bool isShuffle = false; @@ -130,15 +132,18 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream if (isShuffle) { memcpy(pTask->shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; - int32_t sz = taosArrayGetSize(pVgs); - SArray* sinkLv = taosArrayGetP(pStream->tasks, 0); - int32_t sinkLvSize = taosArrayGetSize(sinkLv); - for (int32_t i = 0; i < sz; i++) { + + SArray* pSinkNodes = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL); + int32_t numOfSinkNodes = taosArrayGetSize(pSinkNodes); + + int32_t numOfVgroups = taosArrayGetSize(pVgs); + for (int32_t i = 0; i < numOfVgroups; i++) { SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i); - for (int32_t j = 0; j < sinkLvSize; j++) { - SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j); - if (pLastLevelTask->nodeId == pVgInfo->vgId) { - pVgInfo->taskId = pLastLevelTask->id.taskId; + + for (int32_t j = 0; j < numOfSinkNodes; j++) { + SStreamTask* pSinkTask = taosArrayGetP(pSinkNodes, j); + if (pSinkTask->nodeId == pVgInfo->vgId) { + pVgInfo->taskId = pSinkTask->id.taskId; break; } } @@ -146,28 +151,33 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream } else { pTask->outputType = TASK_OUTPUT__FIXED_DISPATCH; pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; - SArray* pArray = taosArrayGetP(pStream->tasks, 0); + SArray* pSinkNodeList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL); + // one sink only - SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0); - pTask->fixedEpDispatcher.taskId = lastLevelTask->id.taskId; - pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId; - pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet; + SStreamTask* lastLevelTask = taosArrayGetP(pSinkNodeList, 0); + STaskDispatcherFixedEp* pDispatcher = &pTask->fixedEpDispatcher; + + pDispatcher->taskId = lastLevelTask->id.taskId; + pDispatcher->nodeId = lastLevelTask->nodeId; + pDispatcher->epSet = lastLevelTask->epSet; } + return 0; } -int32_t mndAssignTaskToVg(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) { +int32_t mndAssignStreamTaskToVgroup(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) { int32_t msgLen; + pTask->nodeId = pVgroup->vgId; pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); - plan->execNode.nodeId = pVgroup->vgId; + plan->execNode.nodeId = pTask->nodeId; plan->execNode.epSet = pTask->epSet; - if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) { terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } + return 0; } @@ -210,35 +220,35 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) { return pVgroup; } +// create sink node for each vgroup. int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) { SSdb* pSdb = pMnode->pSdb; void* pIter = NULL; - SArray* tasks = taosArrayGetP(pStream->tasks, 0); + SArray* pTaskList = taosArrayGetP(pStream->tasks, 0); while (1) { SVgObj* pVgroup = NULL; pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); - if (pIter == NULL) break; + if (pIter == NULL) { + break; + } + if (!mndVgroupInDb(pVgroup, pStream->targetDbUid)) { sdbRelease(pSdb, pVgroup); continue; } - SStreamTask* pTask = tNewStreamTask(pStream->uid); + SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, pStream->fillHistory); if (pTask == NULL) { sdbRelease(pSdb, pVgroup); terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - pTask->fillHistory = pStream->fillHistory; - mndAddTaskToTaskSet(tasks, pTask); + mndAddTaskToTaskSet(pTaskList, pTask); pTask->nodeId = pVgroup->vgId; pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); - // type - pTask->taskLevel = TASK_LEVEL__SINK; - // sink if (pStream->smaId != 0) { pTask->outputType = TASK_OUTPUT__SMA; @@ -253,6 +263,7 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) { return -1; } } + sdbRelease(pSdb, pVgroup); } return 0; @@ -260,15 +271,15 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) { int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) { SArray* tasks = taosArrayGetP(pStream->tasks, 0); - SStreamTask* pTask = tNewStreamTask(pStream->uid); + SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, pStream->fillHistory); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - pTask->fillHistory = pStream->fillHistory; - mndAddTaskToTaskSet(tasks, pTask); + mndAddTaskToTaskSet(tasks, pTask); pTask->nodeId = pStream->fixedSinkVgId; + #if 0 SVgObj* pVgroup = mndAcquireVgroup(pMnode, pStream->fixedSinkVgId); if (pVgroup == NULL) { @@ -276,9 +287,8 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) { } pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); #endif - pTask->epSet = mndGetVgroupEpset(pMnode, &pStream->fixedSinkVg); - pTask->taskLevel = TASK_LEVEL__SINK; + pTask->epSet = mndGetVgroupEpset(pMnode, &pStream->fixedSinkVg); // sink if (pStream->smaId != 0) { @@ -294,16 +304,19 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) { return 0; } + + int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { - SSdb* pSdb = pMnode->pSdb; + SSdb* pSdb = pMnode->pSdb; + SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan); if (pPlan == NULL) { terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } - int32_t planTotLevel = LIST_LENGTH(pPlan->pSubplans); - pStream->tasks = taosArrayInit(planTotLevel, sizeof(void*)); + int32_t planTotLevel = LIST_LENGTH(pPlan->pSubplans); + pStream->tasks = taosArrayInit(planTotLevel, POINTER_BYTES); bool hasExtraSink = false; bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0; @@ -313,13 +326,13 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { return -1; } - bool multiTarget = pDbObj->cfg.numOfVgroups > 1; + bool multiTarget = (pDbObj->cfg.numOfVgroups > 1); sdbRelease(pSdb, pDbObj); if (planTotLevel == 2 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) { - /*if (true) {*/ - SArray* taskOneLevel = taosArrayInit(0, sizeof(void*)); + SArray* taskOneLevel = taosArrayInit(0, POINTER_BYTES); taosArrayPush(pStream->tasks, &taskOneLevel); + // add extra sink hasExtraSink = true; if (pStream->fixedSinkVgId == 0) { @@ -334,6 +347,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { } } } + pStream->totalLevel = planTotLevel + hasExtraSink; if (planTotLevel > 1) { @@ -350,22 +364,16 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { return -1; } - pInnerTask = tNewStreamTask(pStream->uid); + pInnerTask = tNewStreamTask(pStream->uid, TASK_LEVEL__AGG, pStream->fillHistory); if (pInnerTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; qDestroyQueryPlan(pPlan); return -1; } - pInnerTask->fillHistory = pStream->fillHistory; mndAddTaskToTaskSet(taskInnerLevel, pInnerTask); - - pInnerTask->childEpInfo = taosArrayInit(0, sizeof(void*)); - - pInnerTask->taskLevel = TASK_LEVEL__AGG; - - // trigger - pInnerTask->triggerParam = pStream->triggerParam; + pInnerTask->childEpInfo = taosArrayInit(0, POINTER_BYTES); + pInnerTask->triggerParam = pStream->triggerParam; // trigger // dispatch if (mndAddDispatcherToInnerTask(pMnode, pStream, pInnerTask) < 0) { @@ -377,7 +385,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { SSnodeObj* pSnode = mndSchedFetchOneSnode(pMnode); if (pSnode == NULL) { SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid); - if (mndAssignTaskToVg(pMnode, pInnerTask, plan, pVgroup) < 0) { + if (mndAssignStreamTaskToVgroup(pMnode, pInnerTask, plan, pVgroup) < 0) { sdbRelease(pSdb, pVgroup); qDestroyQueryPlan(pPlan); return -1; @@ -392,11 +400,12 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { } } else { SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid); - if (mndAssignTaskToVg(pMnode, pInnerTask, plan, pVgroup) < 0) { + if (mndAssignStreamTaskToVgroup(pMnode, pInnerTask, plan, pVgroup) < 0) { sdbRelease(pSdb, pVgroup); qDestroyQueryPlan(pPlan); return -1; } + sdbRelease(pSdb, pVgroup); } } @@ -422,30 +431,25 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { continue; } - SStreamTask* pTask = tNewStreamTask(pStream->uid); + SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SOURCE, pStream->fillHistory); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; sdbRelease(pSdb, pVgroup); qDestroyQueryPlan(pPlan); return -1; } - pTask->fillHistory = pStream->fillHistory; - mndAddTaskToTaskSet(taskSourceLevel, pTask); + mndAddTaskToTaskSet(taskSourceLevel, pTask); pTask->triggerParam = 0; - // source - pTask->taskLevel = TASK_LEVEL__SOURCE; - - // add fixed vg dispatch - pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; + pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; // add fixed vg dispatch pTask->outputType = TASK_OUTPUT__FIXED_DISPATCH; pTask->fixedEpDispatcher.taskId = pInnerTask->id.taskId; pTask->fixedEpDispatcher.nodeId = pInnerTask->nodeId; pTask->fixedEpDispatcher.epSet = pInnerTask->epSet; - if (mndAssignTaskToVg(pMnode, pTask, plan, pVgroup) < 0) { + if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) { sdbRelease(pSdb, pVgroup); qDestroyQueryPlan(pPlan); return -1; @@ -458,6 +462,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { qDestroyQueryPlan(pPlan); return -1; } + pEpInfo->childId = pTask->selfChildId; pEpInfo->epSet = pTask->epSet; pEpInfo->nodeId = pTask->nodeId; @@ -465,10 +470,9 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { taosArrayPush(pInnerTask->childEpInfo, &pEpInfo); sdbRelease(pSdb, pVgroup); } - } - - if (planTotLevel == 1) { - SArray* taskOneLevel = taosArrayInit(0, sizeof(void*)); + } else if (planTotLevel == 1) { + // create exec stream task, since only one level, the exec task is also the source task + SArray* taskOneLevel = taosArrayInit(0, POINTER_BYTES); taosArrayPush(pStream->tasks, &taskOneLevel); SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0); @@ -476,6 +480,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } + SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0); if (plan->subplanType != SUBPLAN_TYPE_SCAN) { terrno = TSDB_CODE_QRY_INVALID_INPUT; @@ -486,26 +491,25 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { while (1) { SVgObj* pVgroup; pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); - if (pIter == NULL) break; + if (pIter == NULL) { + break; + } + if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) { sdbRelease(pSdb, pVgroup); continue; } - SStreamTask* pTask = tNewStreamTask(pStream->uid); + // new stream task + SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SOURCE, pStream->fillHistory); if (pTask == NULL) { sdbRelease(pSdb, pVgroup); qDestroyQueryPlan(pPlan); return -1; } - pTask->fillHistory = pStream->fillHistory; - mndAddTaskToTaskSet(taskOneLevel, pTask); - - // source - pTask->taskLevel = TASK_LEVEL__SOURCE; - // trigger - pTask->triggerParam = pStream->triggerParam; + mndAddTaskToTaskSet(taskOneLevel, pTask); + pTask->triggerParam = pStream->triggerParam; // trigger // sink or dispatch if (hasExtraSink) { @@ -514,14 +518,16 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { mndAddSinkToTask(pMnode, pStream, pTask); } - if (mndAssignTaskToVg(pMnode, pTask, plan, pVgroup) < 0) { + if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) { sdbRelease(pSdb, pVgroup); qDestroyQueryPlan(pPlan); return -1; } + sdbRelease(pSdb, pVgroup); } } + qDestroyQueryPlan(pPlan); return 0; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 0713150b486d953ccb42eb6acd5e907251d268d6..39a1fa223f13c1ca7137172b7f55ee9e09f82817 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -700,6 +700,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { if (pStream->sourceDbUid == streamObj.sourceDbUid) { ++numOfStream; } + sdbRelease(pMnode->pSdb, pStream); if (numOfStream > MND_STREAM_MAX_NUM) { mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM); @@ -723,6 +724,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { pDb = NULL; goto _OVER; } + mndReleaseDb(pMnode, pDb); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stream"); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 8a038969785b9a4f2a64436cca1953d95805d128..c57015bc5f47313eafb603632c368b930a218a37 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -17,7 +17,7 @@ #include "tstream.h" #include "wal.h" -SStreamTask* tNewStreamTask(int64_t streamId) { +SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory) { SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { return NULL; @@ -25,6 +25,8 @@ SStreamTask* tNewStreamTask(int64_t streamId) { pTask->id.taskId = tGenIdPI32(); pTask->id.streamId = streamId; + pTask->taskLevel = taskLevel; + pTask->fillHistory = fillHistory; char buf[128] = {0}; sprintf(buf, "0x%" PRIx64 "-%d", pTask->id.streamId, pTask->id.taskId);