From 2977a4d07f18ffb1ef9bfc3280a64b7d04e659a6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 May 2023 09:41:19 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/mnode/impl/inc/mndDef.h | 23 +++-- source/dnode/mnode/impl/src/mndDef.c | 20 ++--- source/dnode/mnode/impl/src/mndDump.c | 8 +- source/dnode/mnode/impl/src/mndScheduler.c | 95 ++++++++++---------- source/dnode/mnode/impl/src/mndSma.c | 20 ++--- source/dnode/mnode/impl/src/mndStream.c | 25 +++--- source/libs/stream/src/streamTask.c | 100 +++++++++++++++++++++ 7 files changed, 199 insertions(+), 92 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 82b714e6eb..5c288ec9e5 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -637,6 +637,14 @@ typedef struct { SMqSubActionLogEntry* pLogEntry; } SMqRebOutputObj; +typedef struct SStreamConf { + int8_t igExpired; + int8_t trigger; + int8_t fillHistory; + int64_t triggerParam; + int64_t watermark; +} SStreamConf; + typedef struct { char name[TSDB_STREAM_FNAME_LEN]; // ctl @@ -650,12 +658,7 @@ typedef struct { // info int64_t uid; int8_t status; - // config - int8_t igExpired; - int8_t trigger; - int8_t fillHistory; - int64_t triggerParam; - int64_t watermark; + SStreamConf conf; // source and target int64_t sourceDbUid; int64_t targetDbUid; @@ -665,14 +668,18 @@ typedef struct { int64_t targetStbUid; // fixedSinkVg is not applicable for encode and decode - SVgObj fixedSinkVg; + SVgObj fixedSinkVg; int32_t fixedSinkVgId; // 0 for shuffle // transformation char* sql; char* ast; char* physicalPlan; - SArray* tasks; // SArray> + SArray* tasks; // SArray> + + SArray* pBatchTask; // generate the results for already stored ts data + int64_t batchTaskUid; // stream task for history ts data + SSchemaWrapper outputSchema; SSchemaWrapper tagSchema; diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 6dab018236..589d35afb8 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -30,11 +30,11 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1; if (tEncodeI8(pEncoder, pObj->status) < 0) return -1; - if (tEncodeI8(pEncoder, pObj->igExpired) < 0) return -1; - if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1; - if (tEncodeI8(pEncoder, pObj->fillHistory) < 0) return -1; - if (tEncodeI64(pEncoder, pObj->triggerParam) < 0) return -1; - if (tEncodeI64(pEncoder, pObj->watermark) < 0) return -1; + if (tEncodeI8(pEncoder, pObj->conf.igExpired) < 0) return -1; + if (tEncodeI8(pEncoder, pObj->conf.trigger) < 0) return -1; + if (tEncodeI8(pEncoder, pObj->conf.fillHistory) < 0) return -1; + if (tEncodeI64(pEncoder, pObj->conf.triggerParam) < 0) return -1; + if (tEncodeI64(pEncoder, pObj->conf.watermark) < 0) return -1; if (tEncodeI64(pEncoder, pObj->sourceDbUid) < 0) return -1; if (tEncodeI64(pEncoder, pObj->targetDbUid) < 0) return -1; @@ -97,11 +97,11 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) { if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1; - if (tDecodeI8(pDecoder, &pObj->igExpired) < 0) return -1; - if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1; - if (tDecodeI8(pDecoder, &pObj->fillHistory) < 0) return -1; - if (tDecodeI64(pDecoder, &pObj->triggerParam) < 0) return -1; - if (tDecodeI64(pDecoder, &pObj->watermark) < 0) return -1; + if (tDecodeI8(pDecoder, &pObj->conf.igExpired) < 0) return -1; + if (tDecodeI8(pDecoder, &pObj->conf.trigger) < 0) return -1; + if (tDecodeI8(pDecoder, &pObj->conf.fillHistory) < 0) return -1; + if (tDecodeI64(pDecoder, &pObj->conf.triggerParam) < 0) return -1; + if (tDecodeI64(pDecoder, &pObj->conf.watermark) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->sourceDbUid) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->targetDbUid) < 0) return -1; diff --git a/source/dnode/mnode/impl/src/mndDump.c b/source/dnode/mnode/impl/src/mndDump.c index d57053bb5b..62b5cb00e6 100644 --- a/source/dnode/mnode/impl/src/mndDump.c +++ b/source/dnode/mnode/impl/src/mndDump.c @@ -367,10 +367,10 @@ void dumpStream(SSdb *pSdb, SJson *json) { tjsonAddStringToObject(item, "smaId", i642str(pObj->smaId)); tjsonAddStringToObject(item, "uid", i642str(pObj->uid)); tjsonAddStringToObject(item, "status", i642str(pObj->status)); - tjsonAddStringToObject(item, "igExpired", i642str(pObj->igExpired)); - tjsonAddStringToObject(item, "trigger", i642str(pObj->trigger)); - tjsonAddStringToObject(item, "triggerParam", i642str(pObj->triggerParam)); - tjsonAddStringToObject(item, "watermark", i642str(pObj->watermark)); + tjsonAddStringToObject(item, "igExpired", i642str(pObj->conf.igExpired)); + tjsonAddStringToObject(item, "trigger", i642str(pObj->conf.trigger)); + tjsonAddStringToObject(item, "triggerParam", i642str(pObj->conf.triggerParam)); + tjsonAddStringToObject(item, "watermark", i642str(pObj->conf.watermark)); tjsonAddStringToObject(item, "sourceDbUid", i642str(pObj->sourceDbUid)); tjsonAddStringToObject(item, "targetDbUid", i642str(pObj->targetDbUid)); tjsonAddStringToObject(item, "sourceDb", mndGetDbStr(pObj->sourceDb)); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index df8c11a6f6..a6a9c9b6e2 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -22,9 +22,10 @@ #include "tname.h" #include "tuuid.h" +#define SINK_NODE_LEVEL (0) extern bool tsDeployOnSnode; -static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup); +static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup); static void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask); int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType, @@ -100,9 +101,7 @@ int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) { return 0; } -#define SINK_NODE_LEVEL (0) - -int32_t mndAddDispatcherForInnerTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) { +int32_t mndAddDispatcherForInnerTask(SMnode* pMnode, SStreamObj* pStream, SArray* pSinkNodeList, SStreamTask* pTask) { bool isShuffle = false; if (pStream->fixedSinkVgId == 0) { @@ -120,7 +119,6 @@ int32_t mndAddDispatcherForInnerTask(SMnode* pMnode, SStreamObj* pStream, SStrea sdbRelease(pMnode->pSdb, pDb); } - SArray* pSinkNodeList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL); int32_t numOfSinkNodes = taosArrayGetSize(pSinkNodeList); if (isShuffle) { @@ -187,6 +185,7 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, return 0; } +// todo random choose a node to do compute SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) { void* pIter = NULL; SVgObj* pVgroup = NULL; @@ -203,7 +202,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) { } // create sink node for each vgroup. -int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) { +int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStreamObj* pStream) { SSdb* pSdb = pMnode->pSdb; void* pIter = NULL; @@ -219,17 +218,15 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) { continue; } - mndAddSinkTaskToStream(pStream, pMnode, pVgroup->vgId, pVgroup); + mndAddSinkTaskToStream(pStream, pTaskList, pMnode, pVgroup->vgId, pVgroup); sdbRelease(pSdb, pVgroup); } return 0; } -int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup) { - SArray* pTaskList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL); - - SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, pStream->fillHistory, 0, pTaskList); +int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup) { + SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, pStream->conf.fillHistory, 0, pTaskList); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -245,17 +242,18 @@ static int32_t mndScheduleFillHistoryStreamTask(SMnode* pMnode, SStreamObj* pStr return 0; } -static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SStreamObj* pStream, +static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SArray* pSinkTaskList, SStreamObj* pStream, SSubplan* plan, uint64_t uid, int8_t taskLevel, int8_t fillHistory, bool hasExtraSink) { - SStreamTask* pTask = tNewStreamTask(uid, taskLevel, fillHistory, pStream->triggerParam, pTaskList); + SStreamTask* pTask = tNewStreamTask(uid, taskLevel, fillHistory, pStream->conf.triggerParam, pTaskList); if (pTask == NULL) { return terrno; } // sink or dispatch if (hasExtraSink) { - mndAddDispatcherForInnerTask(pMnode, pStream, pTask); + + mndAddDispatcherForInnerTask(pMnode, pStream, pSinkTaskList, pTask); } else { mndSetSinkTaskInfo(pStream, pTask); } @@ -292,31 +290,23 @@ void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask) { pDstTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; } -int32_t appendToUpstream(SStreamTask* pTask, SStreamTask* pUpstream) { +int32_t appendToDownstream(SStreamTask* pTask, SStreamTask* pDownstream) { SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pTask); if (pEpInfo == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - if(pUpstream->childEpInfo == NULL) { - pUpstream->childEpInfo = taosArrayInit(4, POINTER_BYTES); + if(pDownstream->childEpInfo == NULL) { + pDownstream->childEpInfo = taosArrayInit(4, POINTER_BYTES); } - taosArrayPush(pUpstream->childEpInfo, &pEpInfo); + taosArrayPush(pDownstream->childEpInfo, &pEpInfo); return TSDB_CODE_SUCCESS; } -int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { +static int32_t doScheduleStream(uint64_t uid, SArray* pTasksList, SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan) { SSdb* pSdb = pMnode->pSdb; - - SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan); - if (pPlan == NULL) { - terrno = TSDB_CODE_QRY_INVALID_INPUT; - return -1; - } - int32_t planTotLevel = LIST_LENGTH(pPlan->pSubplans); - pStream->tasks = taosArrayInit(planTotLevel, POINTER_BYTES); bool hasExtraSink = false; bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0; @@ -331,17 +321,17 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { if (planTotLevel == 2 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) { SArray* taskOneLevel = taosArrayInit(0, POINTER_BYTES); - taosArrayPush(pStream->tasks, &taskOneLevel); + taosArrayPush(pTasksList, &taskOneLevel); // add extra sink hasExtraSink = true; if (pStream->fixedSinkVgId == 0) { - if (mndAddShuffleSinkTasksToStream(pMnode, pStream) < 0) { + if (mndAddShuffleSinkTasksToStream(pMnode, taskOneLevel, pStream) < 0) { // TODO free return -1; } } else { - if (mndAddSinkTaskToStream(pStream, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg) < 0) { + if (mndAddSinkTaskToStream(pStream, taskOneLevel, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg) < 0) { // TODO free return -1; } @@ -355,7 +345,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { // inner level { SArray* taskInnerLevel = taosArrayInit(0, POINTER_BYTES); - taosArrayPush(pStream->tasks, &taskInnerLevel); + taosArrayPush(pTasksList, &taskInnerLevel); SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0); SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0); @@ -364,16 +354,15 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { return -1; } - pInnerTask = tNewStreamTask(pStream->uid, TASK_LEVEL__AGG, pStream->fillHistory, pStream->triggerParam, taskInnerLevel); + pInnerTask = tNewStreamTask(uid, TASK_LEVEL__AGG, pStream->conf.fillHistory, pStream->conf.triggerParam, taskInnerLevel); if (pInnerTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - qDestroyQueryPlan(pPlan); return -1; } // dispatch - if (mndAddDispatcherForInnerTask(pMnode, pStream, pInnerTask) < 0) { - qDestroyQueryPlan(pPlan); + SArray* pSinkNodeList = taosArrayGet(pTasksList, SINK_NODE_LEVEL); + if (mndAddDispatcherForInnerTask(pMnode, pStream, pSinkNodeList, pInnerTask) < 0) { return -1; } @@ -383,14 +372,12 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid); if (mndAssignStreamTaskToVgroup(pMnode, pInnerTask, plan, pVgroup) < 0) { sdbRelease(pSdb, pVgroup); - qDestroyQueryPlan(pPlan); return -1; } sdbRelease(pSdb, pVgroup); } else { if (mndAssignTaskToSnode(pMnode, pInnerTask, plan, pSnode) < 0) { sdbRelease(pSdb, pSnode); - qDestroyQueryPlan(pPlan); return -1; } } @@ -398,7 +385,6 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid); if (mndAssignStreamTaskToVgroup(pMnode, pInnerTask, plan, pVgroup) < 0) { sdbRelease(pSdb, pVgroup); - qDestroyQueryPlan(pPlan); return -1; } @@ -408,7 +394,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { // source level SArray* taskSourceLevel = taosArrayInit(0, POINTER_BYTES); - taosArrayPush(pStream->tasks, &taskSourceLevel); + taosArrayPush(pTasksList, &taskSourceLevel); SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 1); SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0); @@ -430,11 +416,10 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { continue; } - SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SOURCE, pStream->fillHistory, 0, taskSourceLevel); + SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, pStream->conf.fillHistory, 0, taskSourceLevel); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; sdbRelease(pSdb, pVgroup); - qDestroyQueryPlan(pPlan); return -1; } @@ -443,23 +428,21 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) { sdbRelease(pSdb, pVgroup); - qDestroyQueryPlan(pPlan); return -1; } - int32_t code = appendToUpstream(pTask, pInnerTask); + int32_t code = appendToDownstream(pTask, pInnerTask); sdbRelease(pSdb, pVgroup); if (code != TSDB_CODE_SUCCESS) { terrno = code; - qDestroyQueryPlan(pPlan); return -1; } } } else if (planTotLevel == 1) { // create exec stream task, since only one level, the exec task is also the source task SArray* pTaskList = taosArrayInit(0, POINTER_BYTES); - taosArrayPush(pStream->tasks, &pTaskList); + taosArrayPush(pTasksList, &pTaskList); SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0); if (LIST_LENGTH(inner->pNodeList) != 1) { @@ -487,20 +470,36 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { } // new stream task - int32_t code = addSourceStreamTask(pMnode, pVgroup, pTaskList, pStream, plan, pStream->uid, TASK_LEVEL__SOURCE, pStream->fillHistory, hasExtraSink); + SArray* pSinkNodeTaskList = taosArrayGet(pTasksList, SINK_NODE_LEVEL); + int32_t code = addSourceStreamTask(pMnode, pVgroup, pTaskList, pSinkNodeTaskList, pStream, plan, uid, TASK_LEVEL__SOURCE, pStream->conf.fillHistory, hasExtraSink); sdbRelease(pSdb, pVgroup); if (code != TSDB_CODE_SUCCESS) { - qDestroyQueryPlan(pPlan); return -1; } } } - qDestroyQueryPlan(pPlan); return 0; } +int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { + SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan); + if (pPlan == NULL) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; + } + + int32_t code = doScheduleStream(pStream->uid, pStream->tasks, pStream, pMnode, pPlan); + + if (code == TSDB_CODE_SUCCESS) { + code = doScheduleStream(pStream->batchTaskUid, pStream->pBatchTask, pStream, pMnode, pPlan); + } + + qDestroyQueryPlan(pPlan); + return code; +} + int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) { SSdb* pSdb = pMnode->pSdb; SVgObj* pVgroup = NULL; diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 42ad9e24d5..42325cb926 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -555,20 +555,20 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea streamObj.version = 1; streamObj.sql = taosStrdup(pCreate->sql); streamObj.smaId = smaObj.uid; - streamObj.watermark = pCreate->watermark; + streamObj.conf.watermark = pCreate->watermark; streamObj.deleteMark = pCreate->deleteMark; - streamObj.fillHistory = STREAM_FILL_HISTORY_ON; - streamObj.trigger = STREAM_TRIGGER_WINDOW_CLOSE; - streamObj.triggerParam = pCreate->maxDelay; + streamObj.conf.fillHistory = STREAM_FILL_HISTORY_ON; + streamObj.conf.trigger = STREAM_TRIGGER_WINDOW_CLOSE; + streamObj.conf.triggerParam = pCreate->maxDelay; streamObj.ast = taosStrdup(smaObj.ast); // check the maxDelay - if (streamObj.triggerParam < TSDB_MIN_ROLLUP_MAX_DELAY) { + if (streamObj.conf.triggerParam < TSDB_MIN_ROLLUP_MAX_DELAY) { int64_t msInterval = convertTimeFromPrecisionToUnit(pCreate->interval, pDb->cfg.precision, TIME_UNIT_MILLISECOND); - streamObj.triggerParam = msInterval > TSDB_MIN_ROLLUP_MAX_DELAY ? msInterval : TSDB_MIN_ROLLUP_MAX_DELAY; + streamObj.conf.triggerParam = msInterval > TSDB_MIN_ROLLUP_MAX_DELAY ? msInterval : TSDB_MIN_ROLLUP_MAX_DELAY; } - if (streamObj.triggerParam > TSDB_MAX_ROLLUP_MAX_DELAY) { - streamObj.triggerParam = TSDB_MAX_ROLLUP_MAX_DELAY; + if (streamObj.conf.triggerParam > TSDB_MAX_ROLLUP_MAX_DELAY) { + streamObj.conf.triggerParam = TSDB_MAX_ROLLUP_MAX_DELAY; } if (mndAllocSmaVgroup(pMnode, pDb, &streamObj.fixedSinkVg) != 0) { @@ -597,8 +597,8 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea .pAstRoot = pAst, .topicQuery = false, .streamQuery = true, - .triggerType = streamObj.trigger, - .watermark = streamObj.watermark, + .triggerType = streamObj.conf.trigger, + .watermark = streamObj.conf.watermark, .deleteMark = streamObj.deleteMark, }; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 39a1fa223f..f5cd673225 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -239,7 +239,7 @@ static void mndShowStreamStatus(char *dst, SStreamObj *pStream) { } static void mndShowStreamTrigger(char *dst, SStreamObj *pStream) { - int8_t trigger = pStream->trigger; + int8_t trigger = pStream->conf.trigger; if (trigger == STREAM_TRIGGER_AT_ONCE) { strcpy(dst, "at once"); } else if (trigger == STREAM_TRIGGER_WINDOW_CLOSE) { @@ -301,11 +301,11 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name)); pObj->status = 0; - pObj->igExpired = pCreate->igExpired; - pObj->trigger = pCreate->triggerType; - pObj->triggerParam = pCreate->maxDelay; - pObj->watermark = pCreate->watermark; - pObj->fillHistory = pCreate->fillHistory; + pObj->conf.igExpired = pCreate->igExpired; + pObj->conf.trigger = pCreate->triggerType; + pObj->conf.triggerParam = pCreate->maxDelay; + pObj->conf.watermark = pCreate->watermark; + pObj->conf.fillHistory = pCreate->fillHistory; pObj->deleteMark = pCreate->deleteMark; pObj->igCheckUpdate = pCreate->igUpdate; @@ -387,9 +387,9 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, .pAstRoot = pAst, .topicQuery = false, .streamQuery = true, - .triggerType = pObj->trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->trigger, - .watermark = pObj->watermark, - .igExpired = pObj->igExpired, + .triggerType = pObj->conf.trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->conf.trigger, + .watermark = pObj->conf.watermark, + .igExpired = pObj->conf.igExpired, .deleteMark = pObj->deleteMark, .igCheckUpdate = pObj->igCheckUpdate, }; @@ -459,8 +459,9 @@ int32_t mndPersistStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStrea int32_t level = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < level; i++) { SArray *pLevel = taosArrayGetP(pStream->tasks, i); - int32_t sz = taosArrayGetSize(pLevel); - for (int32_t j = 0; j < sz; j++) { + + int32_t numOfTasks = taosArrayGetSize(pLevel); + for (int32_t j = 0; j < numOfTasks; j++) { SStreamTask *pTask = taosArrayGetP(pLevel, j); if (mndPersistTaskDeployReq(pTrans, pTask) < 0) { return -1; @@ -1157,7 +1158,7 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB } pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->watermark, false); + colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->conf.watermark, false); char trigger[20 + VARSTR_HEADER_SIZE] = {0}; char trigger2[20] = {0}; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index a0caffd41f..6d8ec11f44 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -49,6 +49,106 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto return pTask; } +SStreamTask* streamTaskClone(SStreamTask* pTask) { + SStreamTask* pDst = taosMemoryCalloc(1, sizeof(SStreamTask)); + /* pDst-> + + SStreamId id; + int32_t totalLevel; + int8_t taskLevel; + int8_t outputType; + int16_t dispatchMsgType; + SStreamStatus status; + int32_t selfChildId; + int32_t nodeId; // vgroup id + SEpSet epSet; + SCheckpointInfo chkInfo; + STaskExec exec; + int8_t fillHistory; // fill history + int64_t ekey; // end ts key + int64_t endVer; // end version + + // children info + SArray* childEpInfo; // SArray + int32_t nextCheckId; + SArray* checkpointInfo; // SArray + + // output + union { + STaskDispatcherFixedEp fixedEpDispatcher; + STaskDispatcherShuffle shuffleDispatcher; + STaskSinkTb tbSink; + STaskSinkSma smaSink; + STaskSinkFetch fetchSink; + }; + + int8_t inputStatus; + int8_t outputStatus; + SStreamQueue* inputQueue; + SStreamQueue* outputQueue; + + // trigger + int8_t triggerStatus; + int64_t triggerParam; + void* timer; + SMsgCb* pMsgCb; // msg handle + SStreamState* pState; // state backend + + // the followings attributes don't be serialized + int32_t recoverTryingDownstream; + int32_t recoverWaitingUpstream; + int64_t checkReqId; + SArray* checkReqIds; // shuffle + int32_t refCnt; + int64_t checkpointingId; + int32_t checkpointAlignCnt; + struct SStreamMeta* pMeta; + + int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus)); + if (pTask->inputQueue) { + streamQueueClose(pTask->inputQueue); + } + if (pTask->outputQueue) { + streamQueueClose(pTask->outputQueue); + } + if (pTask->exec.qmsg) { + taosMemoryFree(pTask->exec.qmsg); + } + + if (pTask->exec.pExecutor) { + qDestroyTask(pTask->exec.pExecutor); + pTask->exec.pExecutor = NULL; + } + + if (pTask->exec.pWalReader != NULL) { + walCloseReader(pTask->exec.pWalReader); + } + + taosArrayDestroyP(pTask->childEpInfo, taosMemoryFree); + if (pTask->outputType == TASK_OUTPUT__TABLE) { + tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper); + taosMemoryFree(pTask->tbSink.pTSchema); + tSimpleHashCleanup(pTask->tbSink.pTblInfo); + } + + if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); + taosArrayDestroy(pTask->checkReqIds); + pTask->checkReqIds = NULL; + } + + if (pTask->pState) { + streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING); + } + + if (pTask->id.idStr != NULL) { + taosMemoryFree((void*)pTask->id.idStr); + } + + taosMemoryFree(pTask);*/ + return NULL; +} + int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo) { if (tEncodeI32(pEncoder, pInfo->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1; -- GitLab