提交 2977a4d0 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 3628888e
......@@ -637,6 +637,14 @@ typedef struct {
SMqSubActionLogEntry* pLogEntry;
} SMqRebOutputObj;
typedef struct SStreamConf {
int8_t igExpired;
int8_t trigger;
int8_t fillHistory;
int64_t triggerParam;
int64_t watermark;
} SStreamConf;
typedef struct {
char name[TSDB_STREAM_FNAME_LEN];
// ctl
......@@ -650,12 +658,7 @@ typedef struct {
// info
int64_t uid;
int8_t status;
// config
int8_t igExpired;
int8_t trigger;
int8_t fillHistory;
int64_t triggerParam;
int64_t watermark;
SStreamConf conf;
// source and target
int64_t sourceDbUid;
int64_t targetDbUid;
......@@ -665,14 +668,18 @@ typedef struct {
int64_t targetStbUid;
// fixedSinkVg is not applicable for encode and decode
SVgObj fixedSinkVg;
SVgObj fixedSinkVg;
int32_t fixedSinkVgId; // 0 for shuffle
// transformation
char* sql;
char* ast;
char* physicalPlan;
SArray* tasks; // SArray<SArray<SStreamTask>>
SArray* tasks; // SArray<SArray<SStreamTask>>
SArray* pBatchTask; // generate the results for already stored ts data
int64_t batchTaskUid; // stream task for history ts data
SSchemaWrapper outputSchema;
SSchemaWrapper tagSchema;
......
......@@ -30,11 +30,11 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->igExpired) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->fillHistory) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->triggerParam) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->watermark) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->conf.igExpired) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->conf.trigger) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->conf.fillHistory) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->conf.triggerParam) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->conf.watermark) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->sourceDbUid) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->targetDbUid) < 0) return -1;
......@@ -97,11 +97,11 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->igExpired) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->fillHistory) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->triggerParam) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->watermark) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->conf.igExpired) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->conf.trigger) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->conf.fillHistory) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->conf.triggerParam) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->conf.watermark) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->sourceDbUid) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->targetDbUid) < 0) return -1;
......
......@@ -367,10 +367,10 @@ void dumpStream(SSdb *pSdb, SJson *json) {
tjsonAddStringToObject(item, "smaId", i642str(pObj->smaId));
tjsonAddStringToObject(item, "uid", i642str(pObj->uid));
tjsonAddStringToObject(item, "status", i642str(pObj->status));
tjsonAddStringToObject(item, "igExpired", i642str(pObj->igExpired));
tjsonAddStringToObject(item, "trigger", i642str(pObj->trigger));
tjsonAddStringToObject(item, "triggerParam", i642str(pObj->triggerParam));
tjsonAddStringToObject(item, "watermark", i642str(pObj->watermark));
tjsonAddStringToObject(item, "igExpired", i642str(pObj->conf.igExpired));
tjsonAddStringToObject(item, "trigger", i642str(pObj->conf.trigger));
tjsonAddStringToObject(item, "triggerParam", i642str(pObj->conf.triggerParam));
tjsonAddStringToObject(item, "watermark", i642str(pObj->conf.watermark));
tjsonAddStringToObject(item, "sourceDbUid", i642str(pObj->sourceDbUid));
tjsonAddStringToObject(item, "targetDbUid", i642str(pObj->targetDbUid));
tjsonAddStringToObject(item, "sourceDb", mndGetDbStr(pObj->sourceDb));
......
......@@ -22,9 +22,10 @@
#include "tname.h"
#include "tuuid.h"
#define SINK_NODE_LEVEL (0)
extern bool tsDeployOnSnode;
static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup);
static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup);
static void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask);
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
......@@ -100,9 +101,7 @@ int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) {
return 0;
}
#define SINK_NODE_LEVEL (0)
int32_t mndAddDispatcherForInnerTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) {
int32_t mndAddDispatcherForInnerTask(SMnode* pMnode, SStreamObj* pStream, SArray* pSinkNodeList, SStreamTask* pTask) {
bool isShuffle = false;
if (pStream->fixedSinkVgId == 0) {
......@@ -120,7 +119,6 @@ int32_t mndAddDispatcherForInnerTask(SMnode* pMnode, SStreamObj* pStream, SStrea
sdbRelease(pMnode->pSdb, pDb);
}
SArray* pSinkNodeList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL);
int32_t numOfSinkNodes = taosArrayGetSize(pSinkNodeList);
if (isShuffle) {
......@@ -187,6 +185,7 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan,
return 0;
}
// todo random choose a node to do compute
SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
void* pIter = NULL;
SVgObj* pVgroup = NULL;
......@@ -203,7 +202,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
}
// create sink node for each vgroup.
int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) {
int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStreamObj* pStream) {
SSdb* pSdb = pMnode->pSdb;
void* pIter = NULL;
......@@ -219,17 +218,15 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) {
continue;
}
mndAddSinkTaskToStream(pStream, pMnode, pVgroup->vgId, pVgroup);
mndAddSinkTaskToStream(pStream, pTaskList, pMnode, pVgroup->vgId, pVgroup);
sdbRelease(pSdb, pVgroup);
}
return 0;
}
int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup) {
SArray* pTaskList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL);
SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, pStream->fillHistory, 0, pTaskList);
int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup) {
SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, pStream->conf.fillHistory, 0, pTaskList);
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
......@@ -245,17 +242,18 @@ static int32_t mndScheduleFillHistoryStreamTask(SMnode* pMnode, SStreamObj* pStr
return 0;
}
static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SStreamObj* pStream,
static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SArray* pSinkTaskList, SStreamObj* pStream,
SSubplan* plan, uint64_t uid, int8_t taskLevel, int8_t fillHistory,
bool hasExtraSink) {
SStreamTask* pTask = tNewStreamTask(uid, taskLevel, fillHistory, pStream->triggerParam, pTaskList);
SStreamTask* pTask = tNewStreamTask(uid, taskLevel, fillHistory, pStream->conf.triggerParam, pTaskList);
if (pTask == NULL) {
return terrno;
}
// sink or dispatch
if (hasExtraSink) {
mndAddDispatcherForInnerTask(pMnode, pStream, pTask);
mndAddDispatcherForInnerTask(pMnode, pStream, pSinkTaskList, pTask);
} else {
mndSetSinkTaskInfo(pStream, pTask);
}
......@@ -292,31 +290,23 @@ void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask) {
pDstTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
}
int32_t appendToUpstream(SStreamTask* pTask, SStreamTask* pUpstream) {
int32_t appendToDownstream(SStreamTask* pTask, SStreamTask* pDownstream) {
SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pTask);
if (pEpInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if(pUpstream->childEpInfo == NULL) {
pUpstream->childEpInfo = taosArrayInit(4, POINTER_BYTES);
if(pDownstream->childEpInfo == NULL) {
pDownstream->childEpInfo = taosArrayInit(4, POINTER_BYTES);
}
taosArrayPush(pUpstream->childEpInfo, &pEpInfo);
taosArrayPush(pDownstream->childEpInfo, &pEpInfo);
return TSDB_CODE_SUCCESS;
}
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
static int32_t doScheduleStream(uint64_t uid, SArray* pTasksList, SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan) {
SSdb* pSdb = pMnode->pSdb;
SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
if (pPlan == NULL) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
int32_t planTotLevel = LIST_LENGTH(pPlan->pSubplans);
pStream->tasks = taosArrayInit(planTotLevel, POINTER_BYTES);
bool hasExtraSink = false;
bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
......@@ -331,17 +321,17 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
if (planTotLevel == 2 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) {
SArray* taskOneLevel = taosArrayInit(0, POINTER_BYTES);
taosArrayPush(pStream->tasks, &taskOneLevel);
taosArrayPush(pTasksList, &taskOneLevel);
// add extra sink
hasExtraSink = true;
if (pStream->fixedSinkVgId == 0) {
if (mndAddShuffleSinkTasksToStream(pMnode, pStream) < 0) {
if (mndAddShuffleSinkTasksToStream(pMnode, taskOneLevel, pStream) < 0) {
// TODO free
return -1;
}
} else {
if (mndAddSinkTaskToStream(pStream, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg) < 0) {
if (mndAddSinkTaskToStream(pStream, taskOneLevel, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg) < 0) {
// TODO free
return -1;
}
......@@ -355,7 +345,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
// inner level
{
SArray* taskInnerLevel = taosArrayInit(0, POINTER_BYTES);
taosArrayPush(pStream->tasks, &taskInnerLevel);
taosArrayPush(pTasksList, &taskInnerLevel);
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
......@@ -364,16 +354,15 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
return -1;
}
pInnerTask = tNewStreamTask(pStream->uid, TASK_LEVEL__AGG, pStream->fillHistory, pStream->triggerParam, taskInnerLevel);
pInnerTask = tNewStreamTask(uid, TASK_LEVEL__AGG, pStream->conf.fillHistory, pStream->conf.triggerParam, taskInnerLevel);
if (pInnerTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
qDestroyQueryPlan(pPlan);
return -1;
}
// dispatch
if (mndAddDispatcherForInnerTask(pMnode, pStream, pInnerTask) < 0) {
qDestroyQueryPlan(pPlan);
SArray* pSinkNodeList = taosArrayGet(pTasksList, SINK_NODE_LEVEL);
if (mndAddDispatcherForInnerTask(pMnode, pStream, pSinkNodeList, pInnerTask) < 0) {
return -1;
}
......@@ -383,14 +372,12 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
if (mndAssignStreamTaskToVgroup(pMnode, pInnerTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
return -1;
}
sdbRelease(pSdb, pVgroup);
} else {
if (mndAssignTaskToSnode(pMnode, pInnerTask, plan, pSnode) < 0) {
sdbRelease(pSdb, pSnode);
qDestroyQueryPlan(pPlan);
return -1;
}
}
......@@ -398,7 +385,6 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
if (mndAssignStreamTaskToVgroup(pMnode, pInnerTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
return -1;
}
......@@ -408,7 +394,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
// source level
SArray* taskSourceLevel = taosArrayInit(0, POINTER_BYTES);
taosArrayPush(pStream->tasks, &taskSourceLevel);
taosArrayPush(pTasksList, &taskSourceLevel);
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 1);
SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
......@@ -430,11 +416,10 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
continue;
}
SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SOURCE, pStream->fillHistory, 0, taskSourceLevel);
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, pStream->conf.fillHistory, 0, taskSourceLevel);
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
return -1;
}
......@@ -443,23 +428,21 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
return -1;
}
int32_t code = appendToUpstream(pTask, pInnerTask);
int32_t code = appendToDownstream(pTask, pInnerTask);
sdbRelease(pSdb, pVgroup);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
qDestroyQueryPlan(pPlan);
return -1;
}
}
} else if (planTotLevel == 1) {
// create exec stream task, since only one level, the exec task is also the source task
SArray* pTaskList = taosArrayInit(0, POINTER_BYTES);
taosArrayPush(pStream->tasks, &pTaskList);
taosArrayPush(pTasksList, &pTaskList);
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
if (LIST_LENGTH(inner->pNodeList) != 1) {
......@@ -487,20 +470,36 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
}
// new stream task
int32_t code = addSourceStreamTask(pMnode, pVgroup, pTaskList, pStream, plan, pStream->uid, TASK_LEVEL__SOURCE, pStream->fillHistory, hasExtraSink);
SArray* pSinkNodeTaskList = taosArrayGet(pTasksList, SINK_NODE_LEVEL);
int32_t code = addSourceStreamTask(pMnode, pVgroup, pTaskList, pSinkNodeTaskList, pStream, plan, uid, TASK_LEVEL__SOURCE, pStream->conf.fillHistory, hasExtraSink);
sdbRelease(pSdb, pVgroup);
if (code != TSDB_CODE_SUCCESS) {
qDestroyQueryPlan(pPlan);
return -1;
}
}
}
qDestroyQueryPlan(pPlan);
return 0;
}
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
if (pPlan == NULL) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
int32_t code = doScheduleStream(pStream->uid, pStream->tasks, pStream, pMnode, pPlan);
if (code == TSDB_CODE_SUCCESS) {
code = doScheduleStream(pStream->batchTaskUid, pStream->pBatchTask, pStream, pMnode, pPlan);
}
qDestroyQueryPlan(pPlan);
return code;
}
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
SSdb* pSdb = pMnode->pSdb;
SVgObj* pVgroup = NULL;
......
......@@ -555,20 +555,20 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
streamObj.version = 1;
streamObj.sql = taosStrdup(pCreate->sql);
streamObj.smaId = smaObj.uid;
streamObj.watermark = pCreate->watermark;
streamObj.conf.watermark = pCreate->watermark;
streamObj.deleteMark = pCreate->deleteMark;
streamObj.fillHistory = STREAM_FILL_HISTORY_ON;
streamObj.trigger = STREAM_TRIGGER_WINDOW_CLOSE;
streamObj.triggerParam = pCreate->maxDelay;
streamObj.conf.fillHistory = STREAM_FILL_HISTORY_ON;
streamObj.conf.trigger = STREAM_TRIGGER_WINDOW_CLOSE;
streamObj.conf.triggerParam = pCreate->maxDelay;
streamObj.ast = taosStrdup(smaObj.ast);
// check the maxDelay
if (streamObj.triggerParam < TSDB_MIN_ROLLUP_MAX_DELAY) {
if (streamObj.conf.triggerParam < TSDB_MIN_ROLLUP_MAX_DELAY) {
int64_t msInterval = convertTimeFromPrecisionToUnit(pCreate->interval, pDb->cfg.precision, TIME_UNIT_MILLISECOND);
streamObj.triggerParam = msInterval > TSDB_MIN_ROLLUP_MAX_DELAY ? msInterval : TSDB_MIN_ROLLUP_MAX_DELAY;
streamObj.conf.triggerParam = msInterval > TSDB_MIN_ROLLUP_MAX_DELAY ? msInterval : TSDB_MIN_ROLLUP_MAX_DELAY;
}
if (streamObj.triggerParam > TSDB_MAX_ROLLUP_MAX_DELAY) {
streamObj.triggerParam = TSDB_MAX_ROLLUP_MAX_DELAY;
if (streamObj.conf.triggerParam > TSDB_MAX_ROLLUP_MAX_DELAY) {
streamObj.conf.triggerParam = TSDB_MAX_ROLLUP_MAX_DELAY;
}
if (mndAllocSmaVgroup(pMnode, pDb, &streamObj.fixedSinkVg) != 0) {
......@@ -597,8 +597,8 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
.pAstRoot = pAst,
.topicQuery = false,
.streamQuery = true,
.triggerType = streamObj.trigger,
.watermark = streamObj.watermark,
.triggerType = streamObj.conf.trigger,
.watermark = streamObj.conf.watermark,
.deleteMark = streamObj.deleteMark,
};
......
......@@ -239,7 +239,7 @@ static void mndShowStreamStatus(char *dst, SStreamObj *pStream) {
}
static void mndShowStreamTrigger(char *dst, SStreamObj *pStream) {
int8_t trigger = pStream->trigger;
int8_t trigger = pStream->conf.trigger;
if (trigger == STREAM_TRIGGER_AT_ONCE) {
strcpy(dst, "at once");
} else if (trigger == STREAM_TRIGGER_WINDOW_CLOSE) {
......@@ -301,11 +301,11 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));
pObj->status = 0;
pObj->igExpired = pCreate->igExpired;
pObj->trigger = pCreate->triggerType;
pObj->triggerParam = pCreate->maxDelay;
pObj->watermark = pCreate->watermark;
pObj->fillHistory = pCreate->fillHistory;
pObj->conf.igExpired = pCreate->igExpired;
pObj->conf.trigger = pCreate->triggerType;
pObj->conf.triggerParam = pCreate->maxDelay;
pObj->conf.watermark = pCreate->watermark;
pObj->conf.fillHistory = pCreate->fillHistory;
pObj->deleteMark = pCreate->deleteMark;
pObj->igCheckUpdate = pCreate->igUpdate;
......@@ -387,9 +387,9 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
.pAstRoot = pAst,
.topicQuery = false,
.streamQuery = true,
.triggerType = pObj->trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->trigger,
.watermark = pObj->watermark,
.igExpired = pObj->igExpired,
.triggerType = pObj->conf.trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->conf.trigger,
.watermark = pObj->conf.watermark,
.igExpired = pObj->conf.igExpired,
.deleteMark = pObj->deleteMark,
.igCheckUpdate = pObj->igCheckUpdate,
};
......@@ -459,8 +459,9 @@ int32_t mndPersistStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStrea
int32_t level = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < level; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
int32_t sz = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < sz; j++) {
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < numOfTasks; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
if (mndPersistTaskDeployReq(pTrans, pTask) < 0) {
return -1;
......@@ -1157,7 +1158,7 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
}
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->watermark, false);
colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->conf.watermark, false);
char trigger[20 + VARSTR_HEADER_SIZE] = {0};
char trigger2[20] = {0};
......
......@@ -49,6 +49,106 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto
return pTask;
}
SStreamTask* streamTaskClone(SStreamTask* pTask) {
SStreamTask* pDst = taosMemoryCalloc(1, sizeof(SStreamTask));
/* pDst->
SStreamId id;
int32_t totalLevel;
int8_t taskLevel;
int8_t outputType;
int16_t dispatchMsgType;
SStreamStatus status;
int32_t selfChildId;
int32_t nodeId; // vgroup id
SEpSet epSet;
SCheckpointInfo chkInfo;
STaskExec exec;
int8_t fillHistory; // fill history
int64_t ekey; // end ts key
int64_t endVer; // end version
// children info
SArray* childEpInfo; // SArray<SStreamChildEpInfo*>
int32_t nextCheckId;
SArray* checkpointInfo; // SArray<SStreamCheckpointInfo>
// output
union {
STaskDispatcherFixedEp fixedEpDispatcher;
STaskDispatcherShuffle shuffleDispatcher;
STaskSinkTb tbSink;
STaskSinkSma smaSink;
STaskSinkFetch fetchSink;
};
int8_t inputStatus;
int8_t outputStatus;
SStreamQueue* inputQueue;
SStreamQueue* outputQueue;
// trigger
int8_t triggerStatus;
int64_t triggerParam;
void* timer;
SMsgCb* pMsgCb; // msg handle
SStreamState* pState; // state backend
// the followings attributes don't be serialized
int32_t recoverTryingDownstream;
int32_t recoverWaitingUpstream;
int64_t checkReqId;
SArray* checkReqIds; // shuffle
int32_t refCnt;
int64_t checkpointingId;
int32_t checkpointAlignCnt;
struct SStreamMeta* pMeta;
int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));
if (pTask->inputQueue) {
streamQueueClose(pTask->inputQueue);
}
if (pTask->outputQueue) {
streamQueueClose(pTask->outputQueue);
}
if (pTask->exec.qmsg) {
taosMemoryFree(pTask->exec.qmsg);
}
if (pTask->exec.pExecutor) {
qDestroyTask(pTask->exec.pExecutor);
pTask->exec.pExecutor = NULL;
}
if (pTask->exec.pWalReader != NULL) {
walCloseReader(pTask->exec.pWalReader);
}
taosArrayDestroyP(pTask->childEpInfo, taosMemoryFree);
if (pTask->outputType == TASK_OUTPUT__TABLE) {
tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper);
taosMemoryFree(pTask->tbSink.pTSchema);
tSimpleHashCleanup(pTask->tbSink.pTblInfo);
}
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
taosArrayDestroy(pTask->checkReqIds);
pTask->checkReqIds = NULL;
}
if (pTask->pState) {
streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING);
}
if (pTask->id.idStr != NULL) {
taosMemoryFree((void*)pTask->id.idStr);
}
taosMemoryFree(pTask);*/
return NULL;
}
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo) {
if (tEncodeI32(pEncoder, pInfo->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册