提交 f2887278 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 8f61e2d6
...@@ -78,11 +78,11 @@ enum { ...@@ -78,11 +78,11 @@ enum {
TASK_TRIGGER_STATUS__ACTIVE, TASK_TRIGGER_STATUS__ACTIVE,
}; };
enum { typedef enum {
TASK_LEVEL__SOURCE = 1, TASK_LEVEL__SOURCE = 1,
TASK_LEVEL__AGG, TASK_LEVEL__AGG,
TASK_LEVEL__SINK, TASK_LEVEL__SINK,
}; } ETASK_LEVEL;
enum { enum {
TASK_OUTPUT__FIXED_DISPATCH = 1, TASK_OUTPUT__FIXED_DISPATCH = 1,
...@@ -284,13 +284,13 @@ struct SStreamTask { ...@@ -284,13 +284,13 @@ struct SStreamTask {
int16_t dispatchMsgType; int16_t dispatchMsgType;
SStreamStatus status; SStreamStatus status;
int32_t selfChildId; int32_t selfChildId;
int32_t nodeId; int32_t nodeId; // vgroup id
SEpSet epSet; SEpSet epSet;
SCheckpointInfo chkInfo; SCheckpointInfo chkInfo;
STaskExec exec; STaskExec exec;
int8_t fillHistory; // fill history
// fill history int64_t ekey; // end ts key
int8_t fillHistory; int64_t endVer; // end version
// children info // children info
SArray* childEpInfo; // SArray<SStreamChildEpInfo*> SArray* childEpInfo; // SArray<SStreamChildEpInfo*>
...@@ -351,7 +351,7 @@ typedef struct SStreamMeta { ...@@ -351,7 +351,7 @@ typedef struct SStreamMeta {
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo); int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);
SStreamTask* tNewStreamTask(int64_t streamId); SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory);
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask); int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeStreamTask(SStreamTask* pTask); void tFreeStreamTask(SStreamTask* pTask);
......
...@@ -110,6 +110,8 @@ int32_t mndAddSinkToTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask ...@@ -110,6 +110,8 @@ int32_t mndAddSinkToTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask
return 0; return 0;
} }
#define SINK_NODE_LEVEL (0)
int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) { int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) {
bool isShuffle = false; bool isShuffle = false;
...@@ -130,15 +132,18 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream ...@@ -130,15 +132,18 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream
if (isShuffle) { if (isShuffle) {
memcpy(pTask->shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); memcpy(pTask->shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t sz = taosArrayGetSize(pVgs);
SArray* sinkLv = taosArrayGetP(pStream->tasks, 0); SArray* pSinkNodes = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL);
int32_t sinkLvSize = taosArrayGetSize(sinkLv); int32_t numOfSinkNodes = taosArrayGetSize(pSinkNodes);
for (int32_t i = 0; i < sz; i++) {
int32_t numOfVgroups = taosArrayGetSize(pVgs);
for (int32_t i = 0; i < numOfVgroups; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i); SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
for (int32_t j = 0; j < sinkLvSize; j++) {
SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j); for (int32_t j = 0; j < numOfSinkNodes; j++) {
if (pLastLevelTask->nodeId == pVgInfo->vgId) { SStreamTask* pSinkTask = taosArrayGetP(pSinkNodes, j);
pVgInfo->taskId = pLastLevelTask->id.taskId; if (pSinkTask->nodeId == pVgInfo->vgId) {
pVgInfo->taskId = pSinkTask->id.taskId;
break; break;
} }
} }
...@@ -146,28 +151,33 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream ...@@ -146,28 +151,33 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream
} else { } else {
pTask->outputType = TASK_OUTPUT__FIXED_DISPATCH; pTask->outputType = TASK_OUTPUT__FIXED_DISPATCH;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
SArray* pArray = taosArrayGetP(pStream->tasks, 0); SArray* pSinkNodeList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL);
// one sink only // one sink only
SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0); SStreamTask* lastLevelTask = taosArrayGetP(pSinkNodeList, 0);
pTask->fixedEpDispatcher.taskId = lastLevelTask->id.taskId; STaskDispatcherFixedEp* pDispatcher = &pTask->fixedEpDispatcher;
pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet; pDispatcher->taskId = lastLevelTask->id.taskId;
pDispatcher->nodeId = lastLevelTask->nodeId;
pDispatcher->epSet = lastLevelTask->epSet;
} }
return 0; return 0;
} }
int32_t mndAssignTaskToVg(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) { int32_t mndAssignStreamTaskToVgroup(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
int32_t msgLen; int32_t msgLen;
pTask->nodeId = pVgroup->vgId; pTask->nodeId = pVgroup->vgId;
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
plan->execNode.nodeId = pVgroup->vgId; plan->execNode.nodeId = pTask->nodeId;
plan->execNode.epSet = pTask->epSet; plan->execNode.epSet = pTask->epSet;
if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) { if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
terrno = TSDB_CODE_QRY_INVALID_INPUT; terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1; return -1;
} }
return 0; return 0;
} }
...@@ -210,35 +220,35 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) { ...@@ -210,35 +220,35 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
return pVgroup; return pVgroup;
} }
// create sink node for each vgroup.
int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) { int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) {
SSdb* pSdb = pMnode->pSdb; SSdb* pSdb = pMnode->pSdb;
void* pIter = NULL; void* pIter = NULL;
SArray* tasks = taosArrayGetP(pStream->tasks, 0); SArray* pTaskList = taosArrayGetP(pStream->tasks, 0);
while (1) { while (1) {
SVgObj* pVgroup = NULL; SVgObj* pVgroup = NULL;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break; if (pIter == NULL) {
break;
}
if (!mndVgroupInDb(pVgroup, pStream->targetDbUid)) { if (!mndVgroupInDb(pVgroup, pStream->targetDbUid)) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
continue; continue;
} }
SStreamTask* pTask = tNewStreamTask(pStream->uid); SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, pStream->fillHistory);
if (pTask == NULL) { if (pTask == NULL) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
pTask->fillHistory = pStream->fillHistory;
mndAddTaskToTaskSet(tasks, pTask);
mndAddTaskToTaskSet(pTaskList, pTask);
pTask->nodeId = pVgroup->vgId; pTask->nodeId = pVgroup->vgId;
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
// type
pTask->taskLevel = TASK_LEVEL__SINK;
// sink // sink
if (pStream->smaId != 0) { if (pStream->smaId != 0) {
pTask->outputType = TASK_OUTPUT__SMA; pTask->outputType = TASK_OUTPUT__SMA;
...@@ -253,6 +263,7 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -253,6 +263,7 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) {
return -1; return -1;
} }
} }
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
} }
return 0; return 0;
...@@ -260,15 +271,15 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -260,15 +271,15 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) {
int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) { int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) {
SArray* tasks = taosArrayGetP(pStream->tasks, 0); SArray* tasks = taosArrayGetP(pStream->tasks, 0);
SStreamTask* pTask = tNewStreamTask(pStream->uid); SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, pStream->fillHistory);
if (pTask == NULL) { if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
pTask->fillHistory = pStream->fillHistory;
mndAddTaskToTaskSet(tasks, pTask);
mndAddTaskToTaskSet(tasks, pTask);
pTask->nodeId = pStream->fixedSinkVgId; pTask->nodeId = pStream->fixedSinkVgId;
#if 0 #if 0
SVgObj* pVgroup = mndAcquireVgroup(pMnode, pStream->fixedSinkVgId); SVgObj* pVgroup = mndAcquireVgroup(pMnode, pStream->fixedSinkVgId);
if (pVgroup == NULL) { if (pVgroup == NULL) {
...@@ -276,9 +287,8 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -276,9 +287,8 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) {
} }
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
#endif #endif
pTask->epSet = mndGetVgroupEpset(pMnode, &pStream->fixedSinkVg);
pTask->taskLevel = TASK_LEVEL__SINK; pTask->epSet = mndGetVgroupEpset(pMnode, &pStream->fixedSinkVg);
// sink // sink
if (pStream->smaId != 0) { if (pStream->smaId != 0) {
...@@ -294,16 +304,19 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -294,16 +304,19 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) {
return 0; return 0;
} }
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
SSdb* pSdb = pMnode->pSdb; SSdb* pSdb = pMnode->pSdb;
SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan); SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
if (pPlan == NULL) { if (pPlan == NULL) {
terrno = TSDB_CODE_QRY_INVALID_INPUT; terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1; return -1;
} }
int32_t planTotLevel = LIST_LENGTH(pPlan->pSubplans);
pStream->tasks = taosArrayInit(planTotLevel, sizeof(void*)); int32_t planTotLevel = LIST_LENGTH(pPlan->pSubplans);
pStream->tasks = taosArrayInit(planTotLevel, POINTER_BYTES);
bool hasExtraSink = false; bool hasExtraSink = false;
bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0; bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
...@@ -313,13 +326,13 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -313,13 +326,13 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
return -1; return -1;
} }
bool multiTarget = pDbObj->cfg.numOfVgroups > 1; bool multiTarget = (pDbObj->cfg.numOfVgroups > 1);
sdbRelease(pSdb, pDbObj); sdbRelease(pSdb, pDbObj);
if (planTotLevel == 2 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) { if (planTotLevel == 2 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) {
/*if (true) {*/ SArray* taskOneLevel = taosArrayInit(0, POINTER_BYTES);
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
taosArrayPush(pStream->tasks, &taskOneLevel); taosArrayPush(pStream->tasks, &taskOneLevel);
// add extra sink // add extra sink
hasExtraSink = true; hasExtraSink = true;
if (pStream->fixedSinkVgId == 0) { if (pStream->fixedSinkVgId == 0) {
...@@ -334,6 +347,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -334,6 +347,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
} }
} }
} }
pStream->totalLevel = planTotLevel + hasExtraSink; pStream->totalLevel = planTotLevel + hasExtraSink;
if (planTotLevel > 1) { if (planTotLevel > 1) {
...@@ -350,22 +364,16 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -350,22 +364,16 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
return -1; return -1;
} }
pInnerTask = tNewStreamTask(pStream->uid); pInnerTask = tNewStreamTask(pStream->uid, TASK_LEVEL__AGG, pStream->fillHistory);
if (pInnerTask == NULL) { if (pInnerTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
return -1; return -1;
} }
pInnerTask->fillHistory = pStream->fillHistory;
mndAddTaskToTaskSet(taskInnerLevel, pInnerTask); mndAddTaskToTaskSet(taskInnerLevel, pInnerTask);
pInnerTask->childEpInfo = taosArrayInit(0, POINTER_BYTES);
pInnerTask->childEpInfo = taosArrayInit(0, sizeof(void*)); pInnerTask->triggerParam = pStream->triggerParam; // trigger
pInnerTask->taskLevel = TASK_LEVEL__AGG;
// trigger
pInnerTask->triggerParam = pStream->triggerParam;
// dispatch // dispatch
if (mndAddDispatcherToInnerTask(pMnode, pStream, pInnerTask) < 0) { if (mndAddDispatcherToInnerTask(pMnode, pStream, pInnerTask) < 0) {
...@@ -377,7 +385,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -377,7 +385,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
SSnodeObj* pSnode = mndSchedFetchOneSnode(pMnode); SSnodeObj* pSnode = mndSchedFetchOneSnode(pMnode);
if (pSnode == NULL) { if (pSnode == NULL) {
SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid); SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
if (mndAssignTaskToVg(pMnode, pInnerTask, plan, pVgroup) < 0) { if (mndAssignStreamTaskToVgroup(pMnode, pInnerTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
return -1; return -1;
...@@ -392,11 +400,12 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -392,11 +400,12 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
} }
} else { } else {
SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid); SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
if (mndAssignTaskToVg(pMnode, pInnerTask, plan, pVgroup) < 0) { if (mndAssignStreamTaskToVgroup(pMnode, pInnerTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
return -1; return -1;
} }
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
} }
} }
...@@ -422,30 +431,25 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -422,30 +431,25 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
continue; continue;
} }
SStreamTask* pTask = tNewStreamTask(pStream->uid); SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SOURCE, pStream->fillHistory);
if (pTask == NULL) { if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
return -1; return -1;
} }
pTask->fillHistory = pStream->fillHistory;
mndAddTaskToTaskSet(taskSourceLevel, pTask);
mndAddTaskToTaskSet(taskSourceLevel, pTask);
pTask->triggerParam = 0; pTask->triggerParam = 0;
// source pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; // add fixed vg dispatch
pTask->taskLevel = TASK_LEVEL__SOURCE;
// add fixed vg dispatch
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
pTask->outputType = TASK_OUTPUT__FIXED_DISPATCH; pTask->outputType = TASK_OUTPUT__FIXED_DISPATCH;
pTask->fixedEpDispatcher.taskId = pInnerTask->id.taskId; pTask->fixedEpDispatcher.taskId = pInnerTask->id.taskId;
pTask->fixedEpDispatcher.nodeId = pInnerTask->nodeId; pTask->fixedEpDispatcher.nodeId = pInnerTask->nodeId;
pTask->fixedEpDispatcher.epSet = pInnerTask->epSet; pTask->fixedEpDispatcher.epSet = pInnerTask->epSet;
if (mndAssignTaskToVg(pMnode, pTask, plan, pVgroup) < 0) { if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
return -1; return -1;
...@@ -458,6 +462,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -458,6 +462,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
return -1; return -1;
} }
pEpInfo->childId = pTask->selfChildId; pEpInfo->childId = pTask->selfChildId;
pEpInfo->epSet = pTask->epSet; pEpInfo->epSet = pTask->epSet;
pEpInfo->nodeId = pTask->nodeId; pEpInfo->nodeId = pTask->nodeId;
...@@ -465,10 +470,9 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -465,10 +470,9 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
taosArrayPush(pInnerTask->childEpInfo, &pEpInfo); taosArrayPush(pInnerTask->childEpInfo, &pEpInfo);
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
} }
} } else if (planTotLevel == 1) {
// create exec stream task, since only one level, the exec task is also the source task
if (planTotLevel == 1) { SArray* taskOneLevel = taosArrayInit(0, POINTER_BYTES);
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
taosArrayPush(pStream->tasks, &taskOneLevel); taosArrayPush(pStream->tasks, &taskOneLevel);
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0); SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
...@@ -476,6 +480,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -476,6 +480,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
terrno = TSDB_CODE_QRY_INVALID_INPUT; terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1; return -1;
} }
SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0); SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
if (plan->subplanType != SUBPLAN_TYPE_SCAN) { if (plan->subplanType != SUBPLAN_TYPE_SCAN) {
terrno = TSDB_CODE_QRY_INVALID_INPUT; terrno = TSDB_CODE_QRY_INVALID_INPUT;
...@@ -486,26 +491,25 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -486,26 +491,25 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
while (1) { while (1) {
SVgObj* pVgroup; SVgObj* pVgroup;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break; if (pIter == NULL) {
break;
}
if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) { if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
continue; continue;
} }
SStreamTask* pTask = tNewStreamTask(pStream->uid); // new stream task
SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SOURCE, pStream->fillHistory);
if (pTask == NULL) { if (pTask == NULL) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
return -1; return -1;
} }
pTask->fillHistory = pStream->fillHistory;
mndAddTaskToTaskSet(taskOneLevel, pTask);
// source
pTask->taskLevel = TASK_LEVEL__SOURCE;
// trigger mndAddTaskToTaskSet(taskOneLevel, pTask);
pTask->triggerParam = pStream->triggerParam; pTask->triggerParam = pStream->triggerParam; // trigger
// sink or dispatch // sink or dispatch
if (hasExtraSink) { if (hasExtraSink) {
...@@ -514,14 +518,16 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -514,14 +518,16 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
mndAddSinkToTask(pMnode, pStream, pTask); mndAddSinkToTask(pMnode, pStream, pTask);
} }
if (mndAssignTaskToVg(pMnode, pTask, plan, pVgroup) < 0) { if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
return -1; return -1;
} }
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
} }
} }
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
return 0; return 0;
} }
......
...@@ -700,6 +700,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { ...@@ -700,6 +700,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
if (pStream->sourceDbUid == streamObj.sourceDbUid) { if (pStream->sourceDbUid == streamObj.sourceDbUid) {
++numOfStream; ++numOfStream;
} }
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
if (numOfStream > MND_STREAM_MAX_NUM) { if (numOfStream > MND_STREAM_MAX_NUM) {
mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM); mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM);
...@@ -723,6 +724,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { ...@@ -723,6 +724,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
pDb = NULL; pDb = NULL;
goto _OVER; goto _OVER;
} }
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stream"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stream");
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
#include "tstream.h" #include "tstream.h"
#include "wal.h" #include "wal.h"
SStreamTask* tNewStreamTask(int64_t streamId) { SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory) {
SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask)); SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) { if (pTask == NULL) {
return NULL; return NULL;
...@@ -25,6 +25,8 @@ SStreamTask* tNewStreamTask(int64_t streamId) { ...@@ -25,6 +25,8 @@ SStreamTask* tNewStreamTask(int64_t streamId) {
pTask->id.taskId = tGenIdPI32(); pTask->id.taskId = tGenIdPI32();
pTask->id.streamId = streamId; pTask->id.streamId = streamId;
pTask->taskLevel = taskLevel;
pTask->fillHistory = fillHistory;
char buf[128] = {0}; char buf[128] = {0};
sprintf(buf, "0x%" PRIx64 "-%d", pTask->id.streamId, pTask->id.taskId); sprintf(buf, "0x%" PRIx64 "-%d", pTask->id.streamId, pTask->id.taskId);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册