mndScheduler.c 17.7 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "mndScheduler.h"
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndShow.h"
L
Liu Jicong 已提交
22
#include "mndSnode.h"
L
Liu Jicong 已提交
23
#include "mndStb.h"
L
Liu Jicong 已提交
24
#include "mndStream.h"
L
Liu Jicong 已提交
25 26 27 28 29
#include "mndSubscribe.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
X
Xiaoyu Wang 已提交
30
#include "parser.h"
L
Liu Jicong 已提交
31 32
#include "tcompare.h"
#include "tname.h"
L
Liu Jicong 已提交
33 34
#include "tuuid.h"

L
Liu Jicong 已提交
35
// Flag defined in another translation unit. mndScheduleStream reads it to decide
// whether the aggregation task should first be offered to an snode.
extern bool tsDeployOnSnode;
L
Liu Jicong 已提交
36

L
Liu Jicong 已提交
37 38
/**
 * Append a task to one level of a stream's task list.
 *
 * The task's selfChildId is set to its index inside pArray, i.e. the number of
 * tasks that were already stored in the level before this call.
 *
 * @param pArray array of SStreamTask* representing one task level
 * @param pTask  task to append; the array stores the pointer itself
 * @return 0 on success; -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the
 *         array cannot store the pointer
 */
static int32_t mndAddTaskToTaskSet(SArray* pArray, SStreamTask* pTask) {
  int32_t childId = taosArrayGetSize(pArray);
  pTask->selfChildId = childId;

  // taosArrayPush hands back NULL when it cannot grow the array.
  if (taosArrayPush(pArray, &pTask) == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

44
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
45
                           int64_t watermark, int64_t deleteMark) {
L
Liu Jicong 已提交
46 47 48 49 50 51 52 53 54
  SNode*      pAst = NULL;
  SQueryPlan* pPlan = NULL;
  terrno = TSDB_CODE_SUCCESS;

  if (nodesStringToNode(ast, &pAst) < 0) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }

55
  if (qSetSTableIdForRsma(pAst, uid) < 0) {
X
Xiaoyu Wang 已提交
56 57 58 59
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }

L
Liu Jicong 已提交
60 61 62 63 64
  SPlanContext cxt = {
      .pAstRoot = pAst,
      .topicQuery = false,
      .streamQuery = true,
      .rSmaQuery = true,
L
Liu Jicong 已提交
65
      .triggerType = triggerType,
L
Liu Jicong 已提交
66
      .watermark = watermark,
67
      .deleteMark = deleteMark,
L
Liu Jicong 已提交
68
  };
L
Liu Jicong 已提交
69

L
Liu Jicong 已提交
70 71 72 73 74 75 76 77 78 79
  if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }

  int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
  if (levelNum != 1) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }
80
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
L
Liu Jicong 已提交
81 82 83 84 85 86 87

  int32_t opNum = LIST_LENGTH(inner->pNodeList);
  if (opNum != 1) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }

88 89
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
  if (qSubPlanToString(plan, pDst, pDstLen) < 0) {
L
Liu Jicong 已提交
90 91 92 93 94 95
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }

END:
  if (pAst) nodesDestroyNode(pAst);
96
  if (pPlan) nodesDestroyNode((SNode*)pPlan);
L
Liu Jicong 已提交
97 98 99
  return terrno;
}

100
/**
 * Configure the output side of a task so that it writes results itself.
 *
 * When the stream has a non-zero smaId the task becomes an SMA sink; otherwise
 * it becomes a table sink targeting the stream's target super table and gets
 * its own clone of the stream's output schema.
 *
 * @param pMnode  unused here; kept for interface compatibility
 * @param pStream stream the task belongs to
 * @param pTask   task to configure
 * @return 0 on success; -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the
 *         output schema cannot be cloned
 */
int32_t mndAddSinkToTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) {
  if (pStream->smaId != 0) {
    pTask->outputType = TASK_OUTPUT__SMA;
    pTask->smaSink.smaId = pStream->smaId;
    return 0;
  }

  pTask->outputType = TASK_OUTPUT__TABLE;
  pTask->tbSink.stbUid = pStream->targetStbUid;
  memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
  pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
  // Same check mndAddShuffleSinkTasksToStream performs on its clone.
  if (pTask->tbSink.pSchemaWrapper == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

113 114
// Index into SStreamObj.tasks of the level that holds the sink tasks;
// mndAddDispatcherToInnerTask looks up its dispatch targets at this level.
#define SINK_NODE_LEVEL (0)

115
/**
 * Configure a task to dispatch its output to the stream's sink tasks.
 *
 * Shuffle dispatch is chosen when the stream has no fixed sink vgroup
 * (fixedSinkVgId == 0) and the target db can be acquired and has more than one
 * vgroup. In that case the db's vgroup info is extracted into the task and each
 * vgroup entry is tagged with the id of the sink task living on that vgroup.
 *
 * In every other case (fixed sink vgroup, target db not found, or a single
 * vgroup) the task dispatches to the first task of the sink level.
 *
 * The sink level (pStream->tasks[SINK_NODE_LEVEL]) must already be populated.
 *
 * @param pMnode  mnode handle
 * @param pStream stream the task belongs to
 * @param pTask   task whose dispatcher is configured
 * @return 0 on success, -1 when mndExtractDbInfo fails
 */
int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) {
  bool isShuffle = false;

  if (pStream->fixedSinkVgId == 0) {
    SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
    if (pDb != NULL && pDb->cfg.numOfVgroups > 1) {
      isShuffle = true;
      pTask->outputType = TASK_OUTPUT__SHUFFLE_DISPATCH;
      pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
      if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
        // Drop the db reference before bailing out; it used to leak here.
        sdbRelease(pMnode->pSdb, pDb);
        return -1;
      }
    }

    sdbRelease(pMnode->pSdb, pDb);
  }

  if (isShuffle) {
    memcpy(pTask->shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
    SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;

    SArray* pSinkNodes = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL);
    int32_t numOfSinkNodes = taosArrayGetSize(pSinkNodes);

    // For every vgroup of the target db, record the sink task placed on it.
    int32_t numOfVgroups = taosArrayGetSize(pVgs);
    for (int32_t i = 0; i < numOfVgroups; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);

      for (int32_t j = 0; j < numOfSinkNodes; j++) {
        SStreamTask* pSinkTask = taosArrayGetP(pSinkNodes, j);
        if (pSinkTask->nodeId == pVgInfo->vgId) {
          pVgInfo->taskId = pSinkTask->id.taskId;
          break;
        }
      }
    }
  } else {
    pTask->outputType = TASK_OUTPUT__FIXED_DISPATCH;
    pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
    SArray* pSinkNodeList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL);

    // one sink only
    SStreamTask*            lastLevelTask = taosArrayGetP(pSinkNodeList, 0);
    STaskDispatcherFixedEp* pDispatcher = &pTask->fixedEpDispatcher;

    pDispatcher->taskId = lastLevelTask->id.taskId;
    pDispatcher->nodeId = lastLevelTask->nodeId;
    pDispatcher->epSet = lastLevelTask->epSet;
  }

  return 0;
}

168
/**
 * Place a task on a vgroup and serialize its sub-plan for that location.
 *
 * The task's nodeId/epSet are taken from the vgroup, mirrored into the plan's
 * execNode, and the plan is then serialized into pTask->exec.qmsg.
 *
 * @param pMnode  mnode handle
 * @param pTask   task to place
 * @param plan    sub-plan executed by the task; its execNode is overwritten
 * @param pVgroup target vgroup; NULL is rejected because mndSchedFetchOneVg
 *                returns NULL when no vgroup matches and callers forward that
 *                result directly
 * @return 0 on success; -1 with terrno = TSDB_CODE_QRY_INVALID_INPUT when
 *         pVgroup is NULL or the plan cannot be serialized
 */
int32_t mndAssignStreamTaskToVgroup(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
  int32_t msgLen = 0;

  if (pVgroup == NULL) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }

  pTask->nodeId = pVgroup->vgId;
  pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);

  plan->execNode.nodeId = pTask->nodeId;
  plan->execNode.epSet = pTask->epSet;
  if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }

  return 0;
}

L
Liu Jicong 已提交
184
/**
 * Fetch the first snode found in the sdb.
 *
 * @param pMnode mnode handle
 * @return the first snode object returned by sdbFetch, or NULL when there is
 *         none. The object is obtained through sdbFetch, so the caller is
 *         expected to hand it back with sdbRelease when done.
 */
SSnodeObj* mndSchedFetchOneSnode(SMnode* pMnode) {
  SSnodeObj* pObj = NULL;

  // TODO random fetch
  void* pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, NULL, (void**)&pObj);
  if (pIter != NULL) {
    // Only the first entry is wanted: close the traversal instead of
    // abandoning the iterator half-way.
    sdbCancelFetch(pMnode->pSdb, pIter);
  }
  return pObj;
}

192
/**
 * Place a task on an snode and serialize its sub-plan for that location.
 *
 * The task and the plan's execNode both get SNODE_HANDLE as node id and the
 * endpoint set obtained from the snode; the plan is then serialized into
 * pTask->exec.qmsg.
 *
 * @param pMnode mnode handle
 * @param pTask  task to place
 * @param plan   sub-plan executed by the task; its execNode is overwritten
 * @param pSnode snode that will host the task
 * @return 0 on success; -1 with terrno = TSDB_CODE_QRY_INVALID_INPUT when the
 *         plan cannot be serialized
 */
int32_t mndAssignTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SSnodeObj* pSnode) {
  // Location of the task itself.
  pTask->nodeId = SNODE_HANDLE;
  pTask->epSet = mndAcquireEpFromSnode(pMnode, pSnode);

  // The plan runs where the task runs.
  plan->execNode.nodeId = SNODE_HANDLE;
  plan->execNode.epSet = pTask->epSet;

  int32_t qmsgLen = 0;
  int32_t rc = qSubPlanToString(plan, &pTask->exec.qmsg, &qmsgLen);
  if (rc >= 0) {
    return 0;
  }

  terrno = TSDB_CODE_QRY_INVALID_INPUT;
  return -1;
}

L
Liu Jicong 已提交
208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
/**
 * Find the first vgroup whose dbUid equals the given db uid.
 *
 * Vgroups that do not match are released as the traversal moves on.
 *
 * @param pMnode mnode handle
 * @param dbUid  uid of the database to look for
 * @return the matching vgroup (obtained through sdbFetch; callers in this file
 *         give it back with sdbRelease), or NULL when no vgroup matches
 */
SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
  SSdb*   pSdb = pMnode->pSdb;
  void*   pIter = NULL;
  SVgObj* pVgroup = NULL;

  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) break;

    if (pVgroup->dbUid == dbUid) {
      // Leaving the loop early: close the traversal explicitly.
      sdbCancelFetch(pSdb, pIter);
      return pVgroup;
    }
    sdbRelease(pSdb, pVgroup);
  }

  // Nothing matched. Return NULL explicitly rather than whatever the last
  // fetch left behind in pVgroup.
  return NULL;
}

223
// create sink node for each vgroup.
224
int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) {
L
Liu Jicong 已提交
225 226
  SSdb*   pSdb = pMnode->pSdb;
  void*   pIter = NULL;
227
  SArray* pTaskList = taosArrayGetP(pStream->tasks, 0);
L
Liu Jicong 已提交
228 229

  while (1) {
230
    SVgObj* pVgroup = NULL;
L
Liu Jicong 已提交
231
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
232 233 234 235
    if (pIter == NULL) {
      break;
    }

S
Shengliang Guan 已提交
236
    if (!mndVgroupInDb(pVgroup, pStream->targetDbUid)) {
L
Liu Jicong 已提交
237 238 239
      sdbRelease(pSdb, pVgroup);
      continue;
    }
S
Shengliang Guan 已提交
240

241
    SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, pStream->fillHistory);
L
Liu Jicong 已提交
242
    if (pTask == NULL) {
243
      sdbRelease(pSdb, pVgroup);
L
Liu Jicong 已提交
244 245 246 247
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

248
    mndAddTaskToTaskSet(pTaskList, pTask);
L
Liu Jicong 已提交
249 250 251 252
    pTask->nodeId = pVgroup->vgId;
    pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);

    // sink
L
Liu Jicong 已提交
253
    if (pStream->smaId != 0) {
254
      pTask->outputType = TASK_OUTPUT__SMA;
L
Liu Jicong 已提交
255
      pTask->smaSink.smaId = pStream->smaId;
L
Liu Jicong 已提交
256
    } else {
257
      pTask->outputType = TASK_OUTPUT__TABLE;
L
Liu Jicong 已提交
258
      pTask->tbSink.stbUid = pStream->targetStbUid;
L
Liu Jicong 已提交
259
      memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
L
Liu Jicong 已提交
260
      pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
L
Liu Jicong 已提交
261 262 263 264
      if (pTask->tbSink.pSchemaWrapper == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return -1;
      }
L
Liu Jicong 已提交
265
    }
266

267
    sdbRelease(pSdb, pVgroup);
L
Liu Jicong 已提交
268 269 270 271
  }
  return 0;
}

272
/**
 * Create the single sink task of a stream that writes to a fixed vgroup.
 *
 * The task is appended to level 0 of pStream->tasks (which must already
 * exist), located on pStream->fixedSinkVgId using the endpoint set derived
 * from pStream->fixedSinkVg, and configured either as an SMA sink
 * (pStream->smaId != 0) or as a table sink with its own clone of the stream's
 * output schema.
 *
 * @param pMnode  mnode handle
 * @param pStream stream being scheduled
 * @return 0 on success; -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the task
 *         or the schema clone cannot be allocated
 */
int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) {
  SArray*      tasks = taosArrayGetP(pStream->tasks, 0);
  SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, pStream->fillHistory);
  if (pTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  mndAddTaskToTaskSet(tasks, pTask);
  pTask->nodeId = pStream->fixedSinkVgId;
  pTask->epSet = mndGetVgroupEpset(pMnode, &pStream->fixedSinkVg);

  // sink
  if (pStream->smaId != 0) {
    pTask->outputType = TASK_OUTPUT__SMA;
    pTask->smaSink.smaId = pStream->smaId;
  } else {
    pTask->outputType = TASK_OUTPUT__TABLE;
    pTask->tbSink.stbUid = pStream->targetStbUid;
    memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
    pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
    // Same check the shuffle variant performs on its clone.
    if (pTask->tbSink.pSchemaWrapper == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  return 0;
}

307 308


309
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
310 311
  SSdb* pSdb = pMnode->pSdb;

L
Liu Jicong 已提交
312 313 314 315 316
  SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
  if (pPlan == NULL) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }
L
Liu Jicong 已提交
317

318 319
  int32_t planTotLevel = LIST_LENGTH(pPlan->pSubplans);
  pStream->tasks = taosArrayInit(planTotLevel, POINTER_BYTES);
L
Liu Jicong 已提交
320

321 322 323
  bool    hasExtraSink = false;
  bool    externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
  SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb);
L
Liu Jicong 已提交
324 325 326 327
  if (pDbObj == NULL) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }
328

329
  bool multiTarget = (pDbObj->cfg.numOfVgroups > 1);
L
Liu Jicong 已提交
330
  sdbRelease(pSdb, pDbObj);
331

332
  if (planTotLevel == 2 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) {
333
    SArray* taskOneLevel = taosArrayInit(0, POINTER_BYTES);
L
Liu Jicong 已提交
334
    taosArrayPush(pStream->tasks, &taskOneLevel);
335

L
Liu Jicong 已提交
336 337 338
    // add extra sink
    hasExtraSink = true;
    if (pStream->fixedSinkVgId == 0) {
339 340 341 342
      if (mndAddShuffleSinkTasksToStream(pMnode, pStream) < 0) {
        // TODO free
        return -1;
      }
L
Liu Jicong 已提交
343
    } else {
344 345 346 347
      if (mndAddFixedSinkTaskToStream(pMnode, pStream) < 0) {
        // TODO free
        return -1;
      }
L
Liu Jicong 已提交
348 349
    }
  }
350

L
Liu Jicong 已提交
351
  pStream->totalLevel = planTotLevel + hasExtraSink;
352

L
Liu Jicong 已提交
353
  if (planTotLevel > 1) {
L
Liu Jicong 已提交
354 355
    SStreamTask* pInnerTask;
    // inner level
L
Liu Jicong 已提交
356 357 358 359
    {
      SArray* taskInnerLevel = taosArrayInit(0, sizeof(void*));
      taosArrayPush(pStream->tasks, &taskInnerLevel);

360 361
      SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
      SSubplan*      plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
L
Liu Jicong 已提交
362 363 364 365
      if (plan->subplanType != SUBPLAN_TYPE_MERGE) {
        terrno = TSDB_CODE_QRY_INVALID_INPUT;
        return -1;
      }
L
Liu Jicong 已提交
366

367
      pInnerTask = tNewStreamTask(pStream->uid, TASK_LEVEL__AGG, pStream->fillHistory);
368 369 370 371 372
      if (pInnerTask == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        qDestroyQueryPlan(pPlan);
        return -1;
      }
373

L
Liu Jicong 已提交
374
      mndAddTaskToTaskSet(taskInnerLevel, pInnerTask);
375 376
      pInnerTask->childEpInfo = taosArrayInit(0, POINTER_BYTES);
      pInnerTask->triggerParam = pStream->triggerParam;      // trigger
377

L
Liu Jicong 已提交
378
      // dispatch
379
      if (mndAddDispatcherToInnerTask(pMnode, pStream, pInnerTask) < 0) {
380 381 382
        qDestroyQueryPlan(pPlan);
        return -1;
      }
L
Liu Jicong 已提交
383

L
Liu Jicong 已提交
384
      if (tsDeployOnSnode) {
L
Liu Jicong 已提交
385 386 387
        SSnodeObj* pSnode = mndSchedFetchOneSnode(pMnode);
        if (pSnode == NULL) {
          SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
388
          if (mndAssignStreamTaskToVgroup(pMnode, pInnerTask, plan, pVgroup) < 0) {
L
Liu Jicong 已提交
389 390 391 392
            sdbRelease(pSdb, pVgroup);
            qDestroyQueryPlan(pPlan);
            return -1;
          }
393
          sdbRelease(pSdb, pVgroup);
L
Liu Jicong 已提交
394
        } else {
395
          if (mndAssignTaskToSnode(pMnode, pInnerTask, plan, pSnode) < 0) {
L
Liu Jicong 已提交
396 397 398 399 400 401 402
            sdbRelease(pSdb, pSnode);
            qDestroyQueryPlan(pPlan);
            return -1;
          }
        }
      } else {
        SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
403
        if (mndAssignStreamTaskToVgroup(pMnode, pInnerTask, plan, pVgroup) < 0) {
L
Liu Jicong 已提交
404 405 406 407
          sdbRelease(pSdb, pVgroup);
          qDestroyQueryPlan(pPlan);
          return -1;
        }
408

409
        sdbRelease(pSdb, pVgroup);
L
Liu Jicong 已提交
410 411 412
      }
    }

L
Liu Jicong 已提交
413
    // source level
L
Liu Jicong 已提交
414 415 416
    SArray* taskSourceLevel = taosArrayInit(0, sizeof(void*));
    taosArrayPush(pStream->tasks, &taskSourceLevel);

417 418
    SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 1);
    SSubplan*      plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
L
Liu Jicong 已提交
419 420 421 422
    if (plan->subplanType != SUBPLAN_TYPE_SCAN) {
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      return -1;
    }
L
Liu Jicong 已提交
423 424 425 426 427 428

    void* pIter = NULL;
    while (1) {
      SVgObj* pVgroup;
      pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
      if (pIter == NULL) break;
S
Shengliang Guan 已提交
429
      if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) {
L
Liu Jicong 已提交
430 431 432
        sdbRelease(pSdb, pVgroup);
        continue;
      }
S
Shengliang Guan 已提交
433

434
      SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SOURCE, pStream->fillHistory);
L
Liu Jicong 已提交
435
      if (pTask == NULL) {
436 437 438 439 440
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        sdbRelease(pSdb, pVgroup);
        qDestroyQueryPlan(pPlan);
        return -1;
      }
L
Liu Jicong 已提交
441

442
      mndAddTaskToTaskSet(taskSourceLevel, pTask);
L
Liu Jicong 已提交
443 444
      pTask->triggerParam = 0;

445
      pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; // add fixed vg dispatch
446
      pTask->outputType = TASK_OUTPUT__FIXED_DISPATCH;
L
Liu Jicong 已提交
447

448
      pTask->fixedEpDispatcher.taskId = pInnerTask->id.taskId;
L
Liu Jicong 已提交
449 450
      pTask->fixedEpDispatcher.nodeId = pInnerTask->nodeId;
      pTask->fixedEpDispatcher.epSet = pInnerTask->epSet;
L
Liu Jicong 已提交
451

452
      if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) {
L
Liu Jicong 已提交
453 454 455 456
        sdbRelease(pSdb, pVgroup);
        qDestroyQueryPlan(pPlan);
        return -1;
      }
L
Liu Jicong 已提交
457 458 459 460 461 462 463 464

      SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo));
      if (pEpInfo == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        sdbRelease(pSdb, pVgroup);
        qDestroyQueryPlan(pPlan);
        return -1;
      }
465

L
Liu Jicong 已提交
466 467 468
      pEpInfo->childId = pTask->selfChildId;
      pEpInfo->epSet = pTask->epSet;
      pEpInfo->nodeId = pTask->nodeId;
469
      pEpInfo->taskId = pTask->id.taskId;
L
Liu Jicong 已提交
470
      taosArrayPush(pInnerTask->childEpInfo, &pEpInfo);
471
      sdbRelease(pSdb, pVgroup);
L
Liu Jicong 已提交
472
    }
473 474 475
  } else if (planTotLevel == 1) {
    // create exec stream task, since only one level, the exec task is also the source task
    SArray* taskOneLevel = taosArrayInit(0, POINTER_BYTES);
L
Liu Jicong 已提交
476 477
    taosArrayPush(pStream->tasks, &taskOneLevel);

478
    SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
L
Liu Jicong 已提交
479 480 481 482
    if (LIST_LENGTH(inner->pNodeList) != 1) {
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      return -1;
    }
483

484
    SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
L
Liu Jicong 已提交
485 486 487 488
    if (plan->subplanType != SUBPLAN_TYPE_SCAN) {
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      return -1;
    }
L
Liu Jicong 已提交
489 490 491 492 493

    void* pIter = NULL;
    while (1) {
      SVgObj* pVgroup;
      pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
494 495 496 497
      if (pIter == NULL) {
        break;
      }

S
Shengliang Guan 已提交
498
      if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) {
L
Liu Jicong 已提交
499 500 501
        sdbRelease(pSdb, pVgroup);
        continue;
      }
S
Shengliang Guan 已提交
502

503 504
      // new stream task
      SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SOURCE, pStream->fillHistory);
505 506 507 508 509
      if (pTask == NULL) {
        sdbRelease(pSdb, pVgroup);
        qDestroyQueryPlan(pPlan);
        return -1;
      }
L
Liu Jicong 已提交
510

511 512
      mndAddTaskToTaskSet(taskOneLevel, pTask);
      pTask->triggerParam = pStream->triggerParam; // trigger
513

L
Liu Jicong 已提交
514 515
      // sink or dispatch
      if (hasExtraSink) {
516
        mndAddDispatcherToInnerTask(pMnode, pStream, pTask);
L
Liu Jicong 已提交
517
      } else {
518
        mndAddSinkToTask(pMnode, pStream, pTask);
L
Liu Jicong 已提交
519 520
      }

521
      if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) {
L
Liu Jicong 已提交
522 523 524 525
        sdbRelease(pSdb, pVgroup);
        qDestroyQueryPlan(pPlan);
        return -1;
      }
526

527
      sdbRelease(pSdb, pVgroup);
L
Liu Jicong 已提交
528 529
    }
  }
530

L
Liu Jicong 已提交
531
  qDestroyQueryPlan(pPlan);
L
Liu Jicong 已提交
532 533
  return 0;
}
L
Liu Jicong 已提交
534 535

/**
 * Populate a subscription with one unassigned vgroup endpoint for each vgroup
 * of the topic's database.
 *
 * For column-type topics the topic's physical plan must consist of exactly one
 * level holding exactly one sub-plan; that sub-plan is re-targeted to each
 * vgroup and serialized into the endpoint's qmsg. For other topic types qmsg
 * is an empty string.
 *
 * An endpoint is added to pSub->unassignedVgs (and counted in pSub->vgNum) only
 * once it is fully initialised.
 *
 * @param pMnode mnode handle
 * @param pTopic topic being subscribed to
 * @param pSub   subscription to fill
 * @return 0 on success, -1 on failure with terrno set
 *         (TSDB_CODE_QRY_INVALID_INPUT, TSDB_CODE_MND_INVALID_TOPIC_QUERY or
 *         TSDB_CODE_OUT_OF_MEMORY)
 */
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
  SSdb*       pSdb = pMnode->pSdb;
  SVgObj*     pVgroup = NULL;
  SQueryPlan* pPlan = NULL;
  SSubplan*   pSubplan = NULL;
  void*       pIter = NULL;
  int32_t     code = TSDB_CODE_SUCCESS;

  if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
    pPlan = qStringToQueryPlan(pTopic->physicalPlan);
    if (pPlan == NULL) {
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      return -1;
    }

    int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
    if (levelNum != 1) {
      qDestroyQueryPlan(pPlan);
      terrno = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
      return -1;
    }

    SNodeListNode* pNodeListNode = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);

    int32_t opNum = LIST_LENGTH(pNodeListNode->pNodeList);
    if (opNum != 1) {
      qDestroyQueryPlan(pPlan);
      terrno = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
      return -1;
    }

    pSubplan = (SSubplan*)nodesListGetNode(pNodeListNode->pNodeList, 0);
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    SMqVgEp* pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
    if (pVgEp == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _fail;
    }
    pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
    pVgEp->vgId = pVgroup->vgId;
    pVgEp->qmsg = NULL;  // malloc'ed memory: never leave this pointer undefined

    mDebug("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId);

    if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
      int32_t msgLen = 0;

      pSubplan->execNode.epSet = pVgEp->epSet;
      pSubplan->execNode.nodeId = pVgEp->vgId;

      if (qSubPlanToString(pSubplan, &pVgEp->qmsg, &msgLen) < 0) {
        taosMemoryFree(pVgEp);
        code = TSDB_CODE_QRY_INVALID_INPUT;
        goto _fail;
      }
    } else {
      pVgEp->qmsg = taosStrdup("");
      if (pVgEp->qmsg == NULL) {
        taosMemoryFree(pVgEp);
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _fail;
      }
    }

    // Publish the endpoint only now that it is complete.
    if (taosArrayPush(pSub->unassignedVgs, &pVgEp) == NULL) {
      taosMemoryFree(pVgEp->qmsg);
      taosMemoryFree(pVgEp);
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _fail;
    }
    pSub->vgNum++;

    sdbRelease(pSdb, pVgroup);
  }

  qDestroyQueryPlan(pPlan);
  return 0;

_fail:
  // Reached only from inside the loop: a vgroup is held and the traversal is
  // still open.
  sdbRelease(pSdb, pVgroup);
  sdbCancelFetch(pSdb, pIter);
  qDestroyQueryPlan(pPlan);
  terrno = code;
  return -1;
}