提交 3628888e 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 cd2d225c
......@@ -351,7 +351,7 @@ typedef struct SStreamMeta {
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory);
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory, int64_t triggerParam, SArray* pTaskList);
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeStreamTask(SStreamTask* pTask);
......
......@@ -27,13 +27,6 @@ extern bool tsDeployOnSnode;
static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup);
static void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask);
static int32_t mndAddToTaskset(SArray* pArray, SStreamTask* pTask) {
int32_t childId = taosArrayGetSize(pArray);
pTask->selfChildId = childId;
taosArrayPush(pArray, &pTask);
return 0;
}
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
int64_t watermark, int64_t deleteMark) {
SNode* pAst = NULL;
......@@ -236,14 +229,12 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) {
int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup) {
SArray* pTaskList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL);
SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, pStream->fillHistory);
SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, pStream->fillHistory, 0, pTaskList);
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
mndAddToTaskset(pTaskList, pTask);
pTask->nodeId = vgId;
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
mndSetSinkTaskInfo(pStream, pTask);
......@@ -257,14 +248,11 @@ static int32_t mndScheduleFillHistoryStreamTask(SMnode* pMnode, SStreamObj* pStr
static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SStreamObj* pStream,
SSubplan* plan, uint64_t uid, int8_t taskLevel, int8_t fillHistory,
bool hasExtraSink) {
SStreamTask* pTask = tNewStreamTask(uid, taskLevel, fillHistory);
SStreamTask* pTask = tNewStreamTask(uid, taskLevel, fillHistory, pStream->triggerParam, pTaskList);
if (pTask == NULL) {
return terrno;
}
mndAddToTaskset(pTaskList, pTask);
pTask->triggerParam = pStream->triggerParam; // trigger
// sink or dispatch
if (hasExtraSink) {
mndAddDispatcherForInnerTask(pMnode, pStream, pTask);
......@@ -376,16 +364,13 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
return -1;
}
pInnerTask = tNewStreamTask(pStream->uid, TASK_LEVEL__AGG, pStream->fillHistory);
pInnerTask = tNewStreamTask(pStream->uid, TASK_LEVEL__AGG, pStream->fillHistory, pStream->triggerParam, taskInnerLevel);
if (pInnerTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
qDestroyQueryPlan(pPlan);
return -1;
}
mndAddToTaskset(taskInnerLevel, pInnerTask);
pInnerTask->triggerParam = pStream->triggerParam; // trigger
// dispatch
if (mndAddDispatcherForInnerTask(pMnode, pStream, pInnerTask) < 0) {
qDestroyQueryPlan(pPlan);
......@@ -445,7 +430,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
continue;
}
SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SOURCE, pStream->fillHistory);
SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SOURCE, pStream->fillHistory, 0, taskSourceLevel);
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
sdbRelease(pSdb, pVgroup);
......@@ -453,11 +438,8 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
return -1;
}
mndAddToTaskset(taskSourceLevel, pTask);
// all the source tasks dispatch result to a single agg node.
setFixedDownstreamEpInfo(pTask, pInnerTask);
pTask->triggerParam = 0;
if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup);
......
......@@ -17,7 +17,14 @@
#include "tstream.h"
#include "wal.h"
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory) {
static int32_t mndAddToTaskset(SArray* pArray, SStreamTask* pTask) {
int32_t childId = taosArrayGetSize(pArray);
pTask->selfChildId = childId;
taosArrayPush(pArray, &pTask);
return 0;
}
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory, int64_t triggerParam, SArray* pTaskList) {
SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -28,6 +35,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto
pTask->id.streamId = streamId;
pTask->taskLevel = taskLevel;
pTask->fillHistory = fillHistory;
pTask->triggerParam = triggerParam;
char buf[128] = {0};
sprintf(buf, "0x%" PRIx64 "-%d", pTask->id.streamId, pTask->id.taskId);
......@@ -37,6 +45,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
mndAddToTaskset(pTaskList, pTask);
return pTask;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册