mndScheduler.c 17.2 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "mndScheduler.h"
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndOffset.h"
#include "mndShow.h"
L
Liu Jicong 已提交
23
#include "mndSnode.h"
L
Liu Jicong 已提交
24
#include "mndStb.h"
L
Liu Jicong 已提交
25
#include "mndStream.h"
L
Liu Jicong 已提交
26 27 28 29 30
#include "mndSubscribe.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
X
Xiaoyu Wang 已提交
31
#include "parser.h"
L
Liu Jicong 已提交
32 33
#include "tcompare.h"
#include "tname.h"
L
Liu Jicong 已提交
34 35
#include "tuuid.h"

L
Liu Jicong 已提交
36 37
extern bool tsStreamSchedV;

L
Liu Jicong 已提交
38 39 40 41 42 43 44
/*
 * Append pTask to the task set pArray and number it.
 *
 * The task's childId is set to the number of tasks the set held before the
 * append, i.e. its index inside the set.
 *
 * Returns 0 on success. Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when
 * the array did not accept the new element (taosArrayPush returned NULL).
 */
static int32_t mndAddTaskToTaskSet(SArray* pArray, SStreamTask* pTask) {
  pTask->childId = (int32_t)taosArrayGetSize(pArray);
  if (taosArrayPush(pArray, &pTask) == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

45 46
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
                           int64_t watermark, double filesFactor) {
L
Liu Jicong 已提交
47 48 49 50 51 52 53 54 55
  SNode*      pAst = NULL;
  SQueryPlan* pPlan = NULL;
  terrno = TSDB_CODE_SUCCESS;

  if (nodesStringToNode(ast, &pAst) < 0) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }

56
  if (qSetSTableIdForRsma(pAst, uid) < 0) {
X
Xiaoyu Wang 已提交
57 58 59 60
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }

L
Liu Jicong 已提交
61 62 63 64 65
  SPlanContext cxt = {
      .pAstRoot = pAst,
      .topicQuery = false,
      .streamQuery = true,
      .rSmaQuery = true,
L
Liu Jicong 已提交
66
      .triggerType = triggerType,
L
Liu Jicong 已提交
67 68
      .watermark = watermark,
  };
L
Liu Jicong 已提交
69

L
Liu Jicong 已提交
70 71 72 73 74 75 76 77 78 79
  if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }

  int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
  if (levelNum != 1) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }
80
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
L
Liu Jicong 已提交
81 82 83 84 85 86 87

  int32_t opNum = LIST_LENGTH(inner->pNodeList);
  if (opNum != 1) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }

88 89
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
  if (qSubPlanToString(plan, pDst, pDstLen) < 0) {
L
Liu Jicong 已提交
90 91 92 93 94 95
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }

END:
  if (pAst) nodesDestroyNode(pAst);
96
  if (pPlan) nodesDestroyNode((SNode*)pPlan);
L
Liu Jicong 已提交
97 98 99
  return terrno;
}

L
Liu Jicong 已提交
100
/*
 * Serialize pTask into a deploy message and append it to pTrans as a redo
 * action addressed to pEpSet.
 *
 * The message is an SMsgHead (vgId = nodeId in network byte order) followed
 * by the encoded task. The task is encoded twice: first with no buffer to
 * learn the encoded length, then into the allocated message.
 *
 * Returns 0 on success. Returns -1 if the message buffer cannot be allocated
 * (terrno = TSDB_CODE_OUT_OF_MEMORY) or if the action cannot be appended, in
 * which case the buffer is freed here.
 */
int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type, int32_t nodeId) {
  SEncoder coder;

  // pass 1: measure the encoded task
  tEncoderInit(&coder, NULL, 0);
  tEncodeSStreamTask(&coder, pTask);
  int32_t bodyLen = coder.pos;
  int32_t msgLen = sizeof(SMsgHead) + bodyLen;
  tEncoderClear(&coder);

  void* pMsg = taosMemoryCalloc(1, msgLen);
  if (pMsg == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SMsgHead* pHead = (SMsgHead*)pMsg;
  pHead->vgId = htonl(nodeId);

  // pass 2: encode the task right behind the header
  void* pBody = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
  tEncoderInit(&coder, pBody, bodyLen);
  tEncodeSStreamTask(&coder, pTask);
  tEncoderClear(&coder);

  STransAction redo = {0};
  memcpy(&redo.epSet, pEpSet, sizeof(SEpSet));
  redo.msgType = type;
  redo.contLen = msgLen;
  redo.pCont = pMsg;

  if (mndTransAppendRedoAction(pTrans, &redo) == 0) {
    return 0;
  }

  taosMemoryFree(pMsg);
  return -1;
}

L
Liu Jicong 已提交
130 131 132
/*
 * Configure pTask to write the stream result itself instead of dispatching
 * it to another task.
 *
 * dispatchType is set to TASK_DISPATCH__NONE. When the stream has a non-zero
 * smaId the task gets an SMA sink; otherwise it gets a table sink carrying
 * the stream's target super table uid, full name and a clone of its output
 * schema. pMnode and pTrans are not used.
 *
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the
 * output schema could not be cloned.
 */
int32_t mndAddSinkToTask(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, SStreamTask* pTask) {
  pTask->dispatchType = TASK_DISPATCH__NONE;

  if (pStream->smaId != 0) {
    pTask->sinkType = TASK_SINK__SMA;
    pTask->smaSink.smaId = pStream->smaId;
    return 0;
  }

  pTask->sinkType = TASK_SINK__TABLE;
  pTask->tbSink.stbUid = pStream->targetStbUid;
  memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
  pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
  if (pTask->tbSink.pSchemaWrapper == NULL) {
    // a table sink without a schema is unusable; report it instead of deploying it
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

/*
 * Configure pTask to forward its output to the sink-level tasks of pStream
 * (the task array stored at index 0 of pStream->tasks).
 *
 * - fixedSinkVgId == 0: shuffle dispatch. The target db's vgroup info is
 *   extracted into the task, and every vgroup entry whose vgId equals the
 *   nodeId of a sink-level task receives that task's taskId.
 * - otherwise: fixed dispatch to the single sink-level task.
 *
 * pTrans is not used.
 *
 * Returns 0 on success, -1 when the target db cannot be acquired or its
 * vgroup info cannot be extracted. The db reference taken here is released
 * on every path.
 */
int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, SStreamTask* pTask) {
  pTask->sinkType = TASK_SINK__NONE;
  pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;

  SArray* sinkLv = taosArrayGetP(pStream->tasks, 0);

  if (pStream->fixedSinkVgId != 0) {
    pTask->dispatchType = TASK_DISPATCH__FIXED;
    // one sink only
    ASSERT(taosArrayGetSize(sinkLv) == 1);
    SStreamTask* lastLevelTask = taosArrayGetP(sinkLv, 0);
    pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
    pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
    pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
    return 0;
  }

  pTask->dispatchType = TASK_DISPATCH__SHUFFLE;

  SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
  if (pDb == NULL) {
    // NOTE(review): terrno is presumably set by mndAcquireDb - confirm
    return -1;
  }

  int32_t code = mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL);
  // release before inspecting the result so the failure path does not leak the reference
  sdbRelease(pMnode->pSdb, pDb);
  if (code < 0) {
    return -1;
  }

  memcpy(pTask->shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);

  SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
  int32_t vgNum = taosArrayGetSize(pVgs);
  int32_t sinkLvSize = taosArrayGetSize(sinkLv);
  for (int32_t i = 0; i < vgNum; i++) {
    SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
    for (int32_t j = 0; j < sinkLvSize; j++) {
      SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j);
      if (pLastLevelTask->nodeId != pVgInfo->vgId) continue;

      ASSERT(pVgInfo->vgId > 0);
      pVgInfo->taskId = pLastLevelTask->taskId;
      ASSERT(pVgInfo->taskId != 0);
      break;
    }
  }
  return 0;
}

L
Liu Jicong 已提交
190 191
/*
 * Bind pTask to the vnode group pVgroup and queue its deployment.
 *
 * The task and the subplan's exec node both receive the vgroup id and its
 * endpoint set; the subplan is then serialized into pTask->exec.qmsg and a
 * TDMT_STREAM_TASK_DEPLOY redo action is appended to pTrans.
 *
 * The task must already have a sink or a dispatcher configured.
 *
 * Returns 0 on success. Returns -1 when the subplan cannot be serialized
 * (terrno = TSDB_CODE_QRY_INVALID_INPUT) or when the deploy request cannot
 * be added to the transaction.
 */
int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
  int32_t msgLen = 0;

  pTask->nodeId = pVgroup->vgId;
  pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);

  plan->execNode.nodeId = pVgroup->vgId;
  plan->execNode.epSet = pTask->epSet;

  if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }
  ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE || pTask->sinkType != TASK_SINK__NONE);

  // a task whose deploy request never reached the transaction must not be reported as assigned
  if (mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_STREAM_TASK_DEPLOY, pVgroup->vgId) < 0) {
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
207 208 209 210 211 212
/*
 * Return the first snode object found in the sdb, or NULL when there is none.
 *
 * sdbFetch hands the object back through its last argument and returns the
 * iteration cursor; the cursor must not be stored in the object pointer.
 *
 * NOTE(review): the iteration is abandoned after the first row without being
 * cancelled - confirm whether sdb requires an explicit cancel here. The
 * caller presumably has to sdbRelease() the returned object, as is done for
 * other objects obtained from sdbFetch in this file.
 */
SSnodeObj* mndSchedFetchSnode(SMnode* pMnode) {
  SSnodeObj* pObj = NULL;
  void*      pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, NULL, (void**)&pObj);
  if (pIter == NULL) {
    return NULL;
  }
  return pObj;
}

L
Liu Jicong 已提交
213 214
/*
 * Bind pTask to the stream node pSnode and queue its deployment.
 *
 * The task and the subplan's exec node both get nodeId 0 and the endpoint
 * set obtained from the snode; the subplan is serialized into
 * pTask->exec.qmsg and a TDMT_STREAM_TASK_DEPLOY redo action (node id 0) is
 * appended to pTrans.
 *
 * Returns 0 on success. Returns -1 when the subplan cannot be serialized
 * (terrno = TSDB_CODE_QRY_INVALID_INPUT) or when the deploy request cannot
 * be added to the transaction.
 */
int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan,
                             const SSnodeObj* pSnode) {
  int32_t msgLen = 0;

  pTask->nodeId = 0;
  pTask->epSet = mndAcquireEpFromSnode(pMnode, pSnode);

  plan->execNode.nodeId = 0;
  plan->execNode.epSet = pTask->epSet;

  if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }

  // propagate a failed persist instead of reporting success
  if (mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_STREAM_TASK_DEPLOY, 0) < 0) {
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245
/*
 * Return the first vgroup whose dbUid equals `dbUid`, or NULL when the sdb
 * holds no such vgroup.
 *
 * Vgroups that do not match are released as the scan passes over them. The
 * matching vgroup is returned still referenced; the caller is expected to
 * sdbRelease() it.
 *
 * NOTE(review): on a match the iteration is abandoned without being
 * cancelled - confirm whether sdb requires an explicit cancel here.
 */
SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
  void* pIter = NULL;
  while (1) {
    SVgObj* pVgroup = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) break;
    if (pVgroup->dbUid == dbUid) {
      return pVgroup;
    }
    sdbRelease(pMnode->pSdb, pVgroup);
  }
  // never hand back a pointer to a vgroup that was already released above
  return NULL;
}

L
Liu Jicong 已提交
246
/*
 * Create one sink-only task for every vgroup whose dbName equals the
 * stream's targetDb and queue each task's deployment in pTrans.
 *
 * The tasks are added to the task array at index 0 of pStream->tasks, which
 * must be the only level present when this is called. Each task takes data
 * blocks as input, runs no executor, does not dispatch, and writes either to
 * the stream's SMA (smaId != 0) or to its target super table.
 *
 * Every vgroup handed out by the scan is released again, on success and on
 * failure.
 *
 * Returns 0 on success. Returns -1 when a task cannot be allocated
 * (terrno = TSDB_CODE_OUT_OF_MEMORY) or when a deploy request cannot be
 * added to the transaction.
 */
int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
  SSdb*   pSdb = pMnode->pSdb;
  void*   pIter = NULL;
  SArray* tasks = taosArrayGetP(pStream->tasks, 0);

  ASSERT(taosArrayGetSize(pStream->tasks) == 1);

  while (1) {
    SVgObj* pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) break;
    if (strcmp(pVgroup->dbName, pStream->targetDb) != 0) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    SStreamTask* pTask = tNewSStreamTask(pStream->uid);
    if (pTask == NULL) {
      sdbRelease(pSdb, pVgroup);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
    mndAddTaskToTaskSet(tasks, pTask);

    pTask->nodeId = pVgroup->vgId;
    pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);

    // source
    pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;

    // exec
    pTask->execType = TASK_EXEC__NONE;

    // sink
    if (pStream->smaId != 0) {
      pTask->sinkType = TASK_SINK__SMA;
      pTask->smaSink.smaId = pStream->smaId;
    } else {
      pTask->sinkType = TASK_SINK__TABLE;
      pTask->tbSink.stbUid = pStream->targetStbUid;
      memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
      pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
      ASSERT(pTask->tbSink.pSchemaWrapper);
    }

    // dispatch
    pTask->dispatchType = TASK_DISPATCH__NONE;

    int32_t code = mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_STREAM_TASK_DEPLOY, pVgroup->vgId);

    // the task keeps its own copy of the id and ep set, so the vgroup is no longer needed
    sdbRelease(pSdb, pVgroup);
    if (code < 0) {
      return -1;
    }
  }
  return 0;
}

L
Liu Jicong 已提交
297
/*
 * Create the single sink-only task of a stream that writes to one fixed
 * vgroup (pStream->fixedSinkVgId != 0) and queue its deployment in pTrans.
 *
 * The task is added to the task array at index 0 of pStream->tasks. It is
 * placed on pStream->fixedSinkVg, takes data blocks as input, runs no
 * executor, does not dispatch, and writes either to the stream's SMA
 * (smaId != 0) or to its target super table.
 *
 * Returns 0 on success. Returns -1 when the task or its sink schema cannot
 * be allocated (terrno = TSDB_CODE_OUT_OF_MEMORY) or when the deploy request
 * cannot be added to the transaction.
 */
int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
  ASSERT(pStream->fixedSinkVgId != 0);
  SArray*      tasks = taosArrayGetP(pStream->tasks, 0);
  SStreamTask* pTask = tNewSStreamTask(pStream->uid);
  if (pTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  mndAddTaskToTaskSet(tasks, pTask);

  pTask->nodeId = pStream->fixedSinkVgId;
  pTask->epSet = mndGetVgroupEpset(pMnode, &pStream->fixedSinkVg);

  // source
  pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;

  // exec
  pTask->execType = TASK_EXEC__NONE;

  // sink
  if (pStream->smaId != 0) {
    pTask->sinkType = TASK_SINK__SMA;
    pTask->smaSink.smaId = pStream->smaId;
  } else {
    pTask->sinkType = TASK_SINK__TABLE;
    pTask->tbSink.stbUid = pStream->targetStbUid;
    memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
    pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
    if (pTask->tbSink.pSchemaWrapper == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  // dispatch
  pTask->dispatchType = TASK_DISPATCH__NONE;

  // report a failed persist to the caller instead of swallowing it
  if (mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_STREAM_TASK_DEPLOY, pStream->fixedSinkVg.vgId) < 0) {
    return -1;
  }

  return 0;
}

341
/*
 * Turn the stream's physical plan into deployable tasks and queue their
 * deployment in pTrans.
 *
 * pStream->tasks becomes an array of task levels:
 *   - an extra sink level (added first, so it sits at index 0) when the plan
 *     has two levels, the target db differs from the source db, or the
 *     target db has more than one vgroup;
 *   - for a two-level plan: one merge task placed on one vgroup of the
 *     source db, followed by one scan task per source-db vgroup, each
 *     dispatching to the merge task;
 *   - for a one-level plan: one scan task per source-db vgroup, which either
 *     dispatches to the extra sink level or sinks the data itself.
 *
 * Every sdb object acquired here (target db, vgroups) is released before the
 * function returns, and nothing is read from an object after its release.
 *
 * Returns 0 on success and -1 on failure; the parsed plan is destroyed in
 * both cases. Tasks already added to pStream->tasks are left in place on
 * failure.
 */
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
  SSdb*       pSdb = pMnode->pSdb;
  SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
  if (pPlan == NULL) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }
  int32_t totLevel = LIST_LENGTH(pPlan->pSubplans);
  ASSERT(totLevel <= 2);
  pStream->tasks = taosArrayInit(totLevel, sizeof(void*));

  bool    hasExtraSink = false;
  bool    externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
  SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb);
  if (pDbObj == NULL) {
    // NOTE(review): terrno is presumably set by mndAcquireDb - confirm
    goto FAIL;
  }
  // read the config while the reference is still held
  bool multiTarget = pDbObj->cfg.numOfVgroups > 1;
  sdbRelease(pSdb, pDbObj);

  if (totLevel == 2 || externalTargetDB || multiTarget) {
    SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
    taosArrayPush(pStream->tasks, &taskOneLevel);
    // add extra sink
    hasExtraSink = true;
    int32_t code;
    if (pStream->fixedSinkVgId == 0) {
      code = mndAddShuffleSinkTasksToStream(pMnode, pTrans, pStream);
    } else {
      code = mndAddFixedSinkTaskToStream(pMnode, pTrans, pStream);
    }
    if (code < 0) goto FAIL;
  }

  if (totLevel > 1) {
    SStreamTask* pFinalTask;
    // inner plan
    {
      SArray* taskInnerLevel = taosArrayInit(0, sizeof(void*));
      taosArrayPush(pStream->tasks, &taskInnerLevel);

      SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
      SSubplan*      plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
      ASSERT(plan->subplanType == SUBPLAN_TYPE_MERGE);

      pFinalTask = tNewSStreamTask(pStream->uid);
      if (pFinalTask == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        goto FAIL;
      }
      mndAddTaskToTaskSet(taskInnerLevel, pFinalTask);
      // input
      pFinalTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;

      // trigger
      pFinalTask->triggerParam = pStream->triggerParam;

      // dispatch
      if (mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pFinalTask) < 0) {
        goto FAIL;
      }

      // exec
      pFinalTask->execType = TASK_EXEC__PIPE;
      SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
      if (pVgroup == NULL) {
        // no vgroup to host the merge task; closest error code already used in this file
        terrno = TSDB_CODE_QRY_INVALID_INPUT;
        goto FAIL;
      }
      int32_t code = mndAssignTaskToVg(pMnode, pTrans, pFinalTask, plan, pVgroup);
      // the task holds copies of the vgroup id and ep set; drop the reference on both outcomes
      sdbRelease(pSdb, pVgroup);
      if (code < 0) goto FAIL;
    }

    // source plan
    SArray* taskSourceLevel = taosArrayInit(0, sizeof(void*));
    taosArrayPush(pStream->tasks, &taskSourceLevel);

    SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 1);
    SSubplan*      plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
    ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN);

    void* pIter = NULL;
    while (1) {
      SVgObj* pVgroup = NULL;
      pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
      if (pIter == NULL) break;
      if (pVgroup->dbUid != pStream->sourceDbUid) {
        sdbRelease(pSdb, pVgroup);
        continue;
      }
      SStreamTask* pTask = tNewSStreamTask(pStream->uid);
      if (pTask == NULL) {
        sdbRelease(pSdb, pVgroup);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        goto FAIL;
      }
      mndAddTaskToTaskSet(taskSourceLevel, pTask);

      pTask->dataScan = 1;

      // input
      pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK;

      // add fixed vg dispatch
      pTask->sinkType = TASK_SINK__NONE;
      pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
      pTask->dispatchType = TASK_DISPATCH__FIXED;

      pTask->fixedEpDispatcher.taskId = pFinalTask->taskId;
      pTask->fixedEpDispatcher.nodeId = pFinalTask->nodeId;
      pTask->fixedEpDispatcher.epSet = pFinalTask->epSet;

      // exec
      pTask->execType = TASK_EXEC__PIPE;
      int32_t code = mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup);
      sdbRelease(pSdb, pVgroup);
      if (code < 0) goto FAIL;
    }
  }

  if (totLevel == 1) {
    SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
    taosArrayPush(pStream->tasks, &taskOneLevel);

    SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
    ASSERT(LIST_LENGTH(inner->pNodeList) == 1);
    SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
    ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN);

    void* pIter = NULL;
    while (1) {
      SVgObj* pVgroup = NULL;
      pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
      if (pIter == NULL) break;
      if (pVgroup->dbUid != pStream->sourceDbUid) {
        sdbRelease(pSdb, pVgroup);
        continue;
      }
      SStreamTask* pTask = tNewSStreamTask(pStream->uid);
      if (pTask == NULL) {
        sdbRelease(pSdb, pVgroup);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        goto FAIL;
      }
      mndAddTaskToTaskSet(taskOneLevel, pTask);

      pTask->dataScan = 1;

      // input
      pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK;

      // sink or dispatch
      int32_t code;
      if (hasExtraSink) {
        code = mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pTask);
      } else {
        code = mndAddSinkToTask(pMnode, pTrans, pStream, pTask);
      }

      // exec
      if (code >= 0) {
        pTask->execType = TASK_EXEC__PIPE;
        code = mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup);
      }
      sdbRelease(pSdb, pVgroup);
      if (code < 0) goto FAIL;
    }
  }

  qDestroyQueryPlan(pPlan);
  return 0;

FAIL:
  qDestroyQueryPlan(pPlan);
  return -1;
}
L
Liu Jicong 已提交
496 497

/*
 * Fill a new subscription with one unassigned vgroup endpoint per vgroup of
 * the topic's database.
 *
 * For a column-subscribed topic the topic's physical plan must consist of
 * exactly one level with exactly one subplan; that subplan is re-targeted at
 * each vgroup and serialized into the endpoint's qmsg. For other topic types
 * qmsg is an empty string.
 *
 * pSub->unassignedVgs must already exist and pSub->consumerHash must be
 * empty. Each endpoint is zero-initialized on allocation, and every vgroup
 * handed out by the scan is released once its id and ep set were copied.
 *
 * Returns 0 on success. Returns -1 with terrno set when the plan cannot be
 * parsed (TSDB_CODE_QRY_INVALID_INPUT), has the wrong shape
 * (TSDB_CODE_MND_INVALID_TOPIC_QUERY), an endpoint cannot be allocated
 * (TSDB_CODE_OUT_OF_MEMORY), or a subplan cannot be serialized
 * (TSDB_CODE_QRY_INVALID_INPUT).
 */
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
  SSdb*       pSdb = pMnode->pSdb;
  SVgObj*     pVgroup = NULL;
  SQueryPlan* pPlan = NULL;
  SSubplan*   plan = NULL;

  if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
    pPlan = qStringToQueryPlan(pTopic->physicalPlan);
    if (pPlan == NULL) {
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      return -1;
    }

    int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
    if (levelNum != 1) {
      qDestroyQueryPlan(pPlan);
      terrno = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
      return -1;
    }

    SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);

    int32_t opNum = LIST_LENGTH(inner->pNodeList);
    if (opNum != 1) {
      qDestroyQueryPlan(pPlan);
      terrno = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
      return -1;
    }
    plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
  }

  ASSERT(pSub->unassignedVgs);
  ASSERT(taosHashGetSize(pSub->consumerHash) == 0);

  void* pIter = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) break;
    if (pVgroup->dbUid != pTopic->dbUid) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    // zero-initialized so qmsg is NULL until it is actually produced
    SMqVgEp* pVgEp = taosMemoryCalloc(1, sizeof(SMqVgEp));
    if (pVgEp == NULL) {
      sdbRelease(pSdb, pVgroup);
      qDestroyQueryPlan(pPlan);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
    pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
    pVgEp->vgId = pVgroup->vgId;
    // everything needed from the vgroup has been copied into pVgEp
    sdbRelease(pSdb, pVgroup);

    pSub->vgNum++;
    taosArrayPush(pSub->unassignedVgs, &pVgEp);

    mDebug("init subscription %s, assign vg: %d", pSub->key, pVgEp->vgId);

    if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
      int32_t msgLen = 0;

      plan->execNode.epSet = pVgEp->epSet;
      plan->execNode.nodeId = pVgEp->vgId;

      if (qSubPlanToString(plan, &pVgEp->qmsg, &msgLen) < 0) {
        qDestroyQueryPlan(pPlan);
        terrno = TSDB_CODE_QRY_INVALID_INPUT;
        return -1;
      }
    } else {
      pVgEp->qmsg = strdup("");
    }
  }

  ASSERT(pSub->unassignedVgs->size > 0);

  ASSERT(taosHashGetSize(pSub->consumerHash) == 0);

  qDestroyQueryPlan(pPlan);

  return 0;
}