diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 7d707f4cbaa4021223e8e8ed59342453b8b87584..82b714e6eb6e3a29cc3ea16d1fb4d0c1bd9d6a6d 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -663,9 +663,10 @@ typedef struct { char targetDb[TSDB_DB_FNAME_LEN]; char targetSTbName[TSDB_TABLE_FNAME_LEN]; int64_t targetStbUid; - int32_t fixedSinkVgId; // 0 for shuffle + // fixedSinkVg is not applicable for encode and decode SVgObj fixedSinkVg; + int32_t fixedSinkVgId; // 0 for shuffle // transformation char* sql; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 18f10e92f9e10aa326e17396074692099f27be26..b68565b14668f99c5e881be9ce98ecdacaab0f4e 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -14,18 +14,8 @@ */ #include "mndScheduler.h" -#include "mndConsumer.h" #include "mndDb.h" -#include "mndDnode.h" -#include "mndMnode.h" -#include "mndShow.h" #include "mndSnode.h" -#include "mndStb.h" -#include "mndStream.h" -#include "mndSubscribe.h" -#include "mndTopic.h" -#include "mndTrans.h" -#include "mndUser.h" #include "mndVgroup.h" #include "parser.h" #include "tcompare.h" @@ -34,7 +24,10 @@ extern bool tsDeployOnSnode; -static int32_t mndAddTaskToTaskSet(SArray* pArray, SStreamTask* pTask) { +static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup); +static void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask); + +static int32_t mndAddToTaskset(SArray* pArray, SStreamTask* pTask) { int32_t childId = taosArrayGetSize(pArray); pTask->selfChildId = childId; taosArrayPush(pArray, &pTask); @@ -97,7 +90,7 @@ END: return terrno; } -int32_t mndAddSinkToTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) { +int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) { if (pStream->smaId != 0) { pTask->outputType = TASK_OUTPUT__SMA; pTask->smaSink.smaId = pStream->smaId; @@ -106,18 +99,23 @@ int32_t mndAddSinkToTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask pTask->tbSink.stbUid = pStream->targetStbUid; memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); + if (pTask->tbSink.pSchemaWrapper == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } + return 0; } #define SINK_NODE_LEVEL (0) -int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) { +int32_t mndAddDispatcherForInnerTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) { bool isShuffle = false; if (pStream->fixedSinkVgId == 0) { SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb); if (pDb != NULL && pDb->cfg.numOfVgroups > 1) { + isShuffle = true; pTask->outputType = TASK_OUTPUT__SHUFFLE_DISPATCH; pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; @@ -129,19 +127,19 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream sdbRelease(pMnode->pSdb, pDb); } + SArray* pSinkNodeList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL); + int32_t numOfSinkNodes = taosArrayGetSize(pSinkNodeList); + if (isShuffle) { memcpy(pTask->shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; - SArray* pSinkNodes = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL); - int32_t numOfSinkNodes = taosArrayGetSize(pSinkNodes); - int32_t numOfVgroups = taosArrayGetSize(pVgs); for (int32_t i = 0; i < numOfVgroups; i++) { SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i); for (int32_t j = 0; j < numOfSinkNodes; j++) { - SStreamTask* pSinkTask = taosArrayGetP(pSinkNodes, j); + SStreamTask* pSinkTask = taosArrayGetP(pSinkNodeList, j); if (pSinkTask->nodeId == pVgInfo->vgId) { pVgInfo->taskId = pSinkTask->id.taskId; break; @@ -149,17 +147,8 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream } } } else { - pTask->outputType = TASK_OUTPUT__FIXED_DISPATCH; - pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; - SArray* pSinkNodeList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL); - - // one sink only - SStreamTask* lastLevelTask = taosArrayGetP(pSinkNodeList, 0); - STaskDispatcherFixedEp* pDispatcher = &pTask->fixedEpDispatcher; - - pDispatcher->taskId = lastLevelTask->id.taskId; - pDispatcher->nodeId = lastLevelTask->nodeId; - pDispatcher->epSet = lastLevelTask->epSet; + SStreamTask* pOneSinkTask = taosArrayGetP(pSinkNodeList, 0); + setFixedDownstreamEpInfo(pTask, pOneSinkTask); } return 0; @@ -224,7 +213,6 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) { int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) { SSdb* pSdb = pMnode->pSdb; void* pIter = NULL; - SArray* pTaskList = taosArrayGetP(pStream->tasks, 0); while (1) { SVgObj* pVgroup = NULL; @@ -238,73 +226,97 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) { continue; } - SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, pStream->fillHistory); - if (pTask == NULL) { - sdbRelease(pSdb, pVgroup); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - mndAddTaskToTaskSet(pTaskList, pTask); - pTask->nodeId = pVgroup->vgId; - pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); - - // sink - if (pStream->smaId != 0) { - pTask->outputType = TASK_OUTPUT__SMA; - pTask->smaSink.smaId = pStream->smaId; - } else { - pTask->outputType = TASK_OUTPUT__TABLE; - pTask->tbSink.stbUid = pStream->targetStbUid; - memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); - pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); - if (pTask->tbSink.pSchemaWrapper == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - } - + mndAddSinkTaskToStream(pStream, pMnode, pVgroup->vgId, pVgroup); sdbRelease(pSdb, pVgroup); } + return 0; } -int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) { - SArray* tasks = taosArrayGetP(pStream->tasks, 0); +int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup) { + SArray* pTaskList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL); + SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, pStream->fillHistory); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - mndAddTaskToTaskSet(tasks, pTask); - pTask->nodeId = pStream->fixedSinkVgId; + mndAddToTaskset(pTaskList, pTask); -#if 0 - SVgObj* pVgroup = mndAcquireVgroup(pMnode, pStream->fixedSinkVgId); - if (pVgroup == NULL) { - return -1; - } + pTask->nodeId = vgId; pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); -#endif + mndSetSinkTaskInfo(pStream, pTask); + return 0; +} - pTask->epSet = mndGetVgroupEpset(pMnode, &pStream->fixedSinkVg); +static int32_t mndScheduleFillHistoryStreamTask(SMnode* pMnode, SStreamObj* pStream) { + return 0; +} - // sink - if (pStream->smaId != 0) { - pTask->outputType = TASK_OUTPUT__SMA; - pTask->smaSink.smaId = pStream->smaId; +static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SStreamObj* pStream, + SSubplan* plan, uint64_t uid, int8_t taskLevel, int8_t fillHistory, + bool hasExtraSink) { + SStreamTask* pTask = tNewStreamTask(uid, taskLevel, fillHistory); + if (pTask == NULL) { + return terrno; + } + + mndAddToTaskset(pTaskList, pTask); + pTask->triggerParam = pStream->triggerParam; // trigger + + // sink or dispatch + if (hasExtraSink) { + mndAddDispatcherForInnerTask(pMnode, pStream, pTask); } else { - pTask->outputType = TASK_OUTPUT__TABLE; - pTask->tbSink.stbUid = pStream->targetStbUid; - memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); - pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); + mndSetSinkTaskInfo(pStream, pTask); } - return 0; + if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) { + return terrno; + } + + return TSDB_CODE_SUCCESS; } +static SStreamChildEpInfo* createStreamTaskEpInfo(SStreamTask* pTask) { + SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo)); + if (pEpInfo == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pEpInfo->childId = pTask->selfChildId; + pEpInfo->epSet = pTask->epSet; + pEpInfo->nodeId = pTask->nodeId; + pEpInfo->taskId = pTask->id.taskId; + + return pEpInfo; +} + +void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask) { + STaskDispatcherFixedEp* pDispatcher = &pDstTask->fixedEpDispatcher; + pDispatcher->taskId = pTask->id.taskId; + pDispatcher->nodeId = pTask->nodeId; + pDispatcher->epSet = pTask->epSet; + pDstTask->outputType = TASK_OUTPUT__FIXED_DISPATCH; + pDstTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; +} + +int32_t appendToUpstream(SStreamTask* pTask, SStreamTask* pUpstream) { + SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pTask); + if (pEpInfo == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + if(pUpstream->childEpInfo == NULL) { + pUpstream->childEpInfo = taosArrayInit(4, POINTER_BYTES); + } + + taosArrayPush(pUpstream->childEpInfo, &pEpInfo); + return TSDB_CODE_SUCCESS; +} int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { SSdb* pSdb = pMnode->pSdb; @@ -341,7 +353,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { return -1; } } else { - if (mndAddFixedSinkTaskToStream(pMnode, pStream) < 0) { + if (mndAddSinkTaskToStream(pStream, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg) < 0) { // TODO free return -1; } @@ -354,7 +366,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { SStreamTask* pInnerTask; // inner level { - SArray* taskInnerLevel = taosArrayInit(0, sizeof(void*)); + SArray* taskInnerLevel = taosArrayInit(0, POINTER_BYTES); taosArrayPush(pStream->tasks, &taskInnerLevel); SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0); @@ -371,12 +383,11 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { return -1; } - mndAddTaskToTaskSet(taskInnerLevel, pInnerTask); - pInnerTask->childEpInfo = taosArrayInit(0, POINTER_BYTES); + mndAddToTaskset(taskInnerLevel, pInnerTask); pInnerTask->triggerParam = pStream->triggerParam; // trigger // dispatch - if (mndAddDispatcherToInnerTask(pMnode, pStream, pInnerTask) < 0) { + if (mndAddDispatcherForInnerTask(pMnode, pStream, pInnerTask) < 0) { qDestroyQueryPlan(pPlan); return -1; } @@ -411,7 +422,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { } // source level - SArray* taskSourceLevel = taosArrayInit(0, sizeof(void*)); + SArray* taskSourceLevel = taosArrayInit(0, POINTER_BYTES); taosArrayPush(pStream->tasks, &taskSourceLevel); SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 1); @@ -425,7 +436,10 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { while (1) { SVgObj* pVgroup; pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); - if (pIter == NULL) break; + if (pIter == NULL) { + break; + } + if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) { sdbRelease(pSdb, pVgroup); continue; @@ -439,15 +453,11 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { return -1; } - mndAddTaskToTaskSet(taskSourceLevel, pTask); - pTask->triggerParam = 0; - - pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; // add fixed vg dispatch - pTask->outputType = TASK_OUTPUT__FIXED_DISPATCH; + mndAddToTaskset(taskSourceLevel, pTask); - pTask->fixedEpDispatcher.taskId = pInnerTask->id.taskId; - pTask->fixedEpDispatcher.nodeId = pInnerTask->nodeId; - pTask->fixedEpDispatcher.epSet = pInnerTask->epSet; + // all the source tasks dispatch result to a single agg node. + setFixedDownstreamEpInfo(pTask, pInnerTask); + pTask->triggerParam = 0; if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) { sdbRelease(pSdb, pVgroup); @@ -455,25 +465,19 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { return -1; } - SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo)); - if (pEpInfo == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - sdbRelease(pSdb, pVgroup); + int32_t code = appendToUpstream(pTask, pInnerTask); + sdbRelease(pSdb, pVgroup); + + if (code != TSDB_CODE_SUCCESS) { + terrno = code; qDestroyQueryPlan(pPlan); return -1; } - - pEpInfo->childId = pTask->selfChildId; - pEpInfo->epSet = pTask->epSet; - pEpInfo->nodeId = pTask->nodeId; - pEpInfo->taskId = pTask->id.taskId; - taosArrayPush(pInnerTask->childEpInfo, &pEpInfo); - sdbRelease(pSdb, pVgroup); } } else if (planTotLevel == 1) { // create exec stream task, since only one level, the exec task is also the source task - SArray* taskOneLevel = taosArrayInit(0, POINTER_BYTES); - taosArrayPush(pStream->tasks, &taskOneLevel); + SArray* pTaskList = taosArrayInit(0, POINTER_BYTES); + taosArrayPush(pStream->tasks, &pTaskList); SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0); if (LIST_LENGTH(inner->pNodeList) != 1) { @@ -501,30 +505,13 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { } // new stream task - SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SOURCE, pStream->fillHistory); - if (pTask == NULL) { - sdbRelease(pSdb, pVgroup); - qDestroyQueryPlan(pPlan); - return -1; - } - - mndAddTaskToTaskSet(taskOneLevel, pTask); - pTask->triggerParam = pStream->triggerParam; // trigger - - // sink or dispatch - if (hasExtraSink) { - mndAddDispatcherToInnerTask(pMnode, pStream, pTask); - } else { - mndAddSinkToTask(pMnode, pStream, pTask); - } + int32_t code = addSourceStreamTask(pMnode, pVgroup, pTaskList, pStream, plan, pStream->uid, TASK_LEVEL__SOURCE, pStream->fillHistory, hasExtraSink); + sdbRelease(pSdb, pVgroup); - if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) { - sdbRelease(pSdb, pVgroup); + if (code != TSDB_CODE_SUCCESS) { qDestroyQueryPlan(pPlan); return -1; } - - sdbRelease(pSdb, pVgroup); } } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index c57015bc5f47313eafb603632c368b930a218a37..5152e43dbdbad6b60830167600d30eeaffe0d0a9 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -20,6 +20,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory) { SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; }