mndScheduler.c 17.1 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "mndScheduler.h"
#include "mndDb.h"
L
Liu Jicong 已提交
18
#include "mndSnode.h"
L
Liu Jicong 已提交
19
#include "mndVgroup.h"
X
Xiaoyu Wang 已提交
20
#include "parser.h"
L
Liu Jicong 已提交
21 22
#include "tcompare.h"
#include "tname.h"
L
Liu Jicong 已提交
23 24
#include "tuuid.h"

L
Liu Jicong 已提交
25
extern bool tsDeployOnSnode;
L
Liu Jicong 已提交
26

27 28 29 30
static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup);
static void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask);

static int32_t mndAddToTaskset(SArray* pArray, SStreamTask* pTask) {
L
Liu Jicong 已提交
31
  int32_t childId = taosArrayGetSize(pArray);
L
Liu Jicong 已提交
32
  pTask->selfChildId = childId;
L
Liu Jicong 已提交
33 34 35 36
  taosArrayPush(pArray, &pTask);
  return 0;
}

37
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
38
                           int64_t watermark, int64_t deleteMark) {
L
Liu Jicong 已提交
39 40 41 42 43 44 45 46 47
  SNode*      pAst = NULL;
  SQueryPlan* pPlan = NULL;
  terrno = TSDB_CODE_SUCCESS;

  if (nodesStringToNode(ast, &pAst) < 0) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }

48
  if (qSetSTableIdForRsma(pAst, uid) < 0) {
X
Xiaoyu Wang 已提交
49 50 51 52
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }

L
Liu Jicong 已提交
53 54 55 56 57
  SPlanContext cxt = {
      .pAstRoot = pAst,
      .topicQuery = false,
      .streamQuery = true,
      .rSmaQuery = true,
L
Liu Jicong 已提交
58
      .triggerType = triggerType,
L
Liu Jicong 已提交
59
      .watermark = watermark,
60
      .deleteMark = deleteMark,
L
Liu Jicong 已提交
61
  };
L
Liu Jicong 已提交
62

L
Liu Jicong 已提交
63 64 65 66 67 68 69 70 71 72
  if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }

  int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
  if (levelNum != 1) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }
73
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
L
Liu Jicong 已提交
74 75 76 77 78 79 80

  int32_t opNum = LIST_LENGTH(inner->pNodeList);
  if (opNum != 1) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }

81 82
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
  if (qSubPlanToString(plan, pDst, pDstLen) < 0) {
L
Liu Jicong 已提交
83 84 85 86 87 88
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }

END:
  if (pAst) nodesDestroyNode(pAst);
89
  if (pPlan) nodesDestroyNode((SNode*)pPlan);
L
Liu Jicong 已提交
90 91 92
  return terrno;
}

93
int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) {
L
Liu Jicong 已提交
94
  if (pStream->smaId != 0) {
95
    pTask->outputType = TASK_OUTPUT__SMA;
L
Liu Jicong 已提交
96 97
    pTask->smaSink.smaId = pStream->smaId;
  } else {
98
    pTask->outputType = TASK_OUTPUT__TABLE;
L
Liu Jicong 已提交
99 100 101
    pTask->tbSink.stbUid = pStream->targetStbUid;
    memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
    pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
102 103 104
    if (pTask->tbSink.pSchemaWrapper == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
L
Liu Jicong 已提交
105
  }
106

L
Liu Jicong 已提交
107 108 109
  return 0;
}

110 111
#define SINK_NODE_LEVEL (0)

112
int32_t mndAddDispatcherForInnerTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) {
L
Liu Jicong 已提交
113 114
  bool isShuffle = false;

L
Liu Jicong 已提交
115 116
  if (pStream->fixedSinkVgId == 0) {
    SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
wmmhello's avatar
wmmhello 已提交
117
    if (pDb != NULL && pDb->cfg.numOfVgroups > 1) {
118

L
Liu Jicong 已提交
119
      isShuffle = true;
120
      pTask->outputType = TASK_OUTPUT__SHUFFLE_DISPATCH;
L
Liu Jicong 已提交
121
      pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
L
Liu Jicong 已提交
122 123 124
      if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
        return -1;
      }
125
    }
L
Liu Jicong 已提交
126

127
    sdbRelease(pMnode->pSdb, pDb);
L
Liu Jicong 已提交
128
  }
129

130 131 132
  SArray* pSinkNodeList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL);
  int32_t numOfSinkNodes = taosArrayGetSize(pSinkNodeList);

L
Liu Jicong 已提交
133
  if (isShuffle) {
L
Liu Jicong 已提交
134
    memcpy(pTask->shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
135
    SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
136 137 138

    int32_t numOfVgroups = taosArrayGetSize(pVgs);
    for (int32_t i = 0; i < numOfVgroups; i++) {
139
      SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
140 141

      for (int32_t j = 0; j < numOfSinkNodes; j++) {
142
        SStreamTask* pSinkTask = taosArrayGetP(pSinkNodeList, j);
143 144
        if (pSinkTask->nodeId == pVgInfo->vgId) {
          pVgInfo->taskId = pSinkTask->id.taskId;
145
          break;
L
Liu Jicong 已提交
146 147 148
        }
      }
    }
149
  } else {
150 151
    SStreamTask* pOneSinkTask = taosArrayGetP(pSinkNodeList, 0);
    setFixedDownstreamEpInfo(pTask, pOneSinkTask);
L
Liu Jicong 已提交
152
  }
153

L
Liu Jicong 已提交
154 155 156
  return 0;
}

157
int32_t mndAssignStreamTaskToVgroup(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
L
Liu Jicong 已提交
158
  int32_t msgLen;
159

L
Liu Jicong 已提交
160 161 162
  pTask->nodeId = pVgroup->vgId;
  pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);

163
  plan->execNode.nodeId = pTask->nodeId;
L
Liu Jicong 已提交
164
  plan->execNode.epSet = pTask->epSet;
L
Liu Jicong 已提交
165
  if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
L
Liu Jicong 已提交
166 167 168
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }
169

L
Liu Jicong 已提交
170 171 172
  return 0;
}

L
Liu Jicong 已提交
173
SSnodeObj* mndSchedFetchOneSnode(SMnode* pMnode) {
L
Liu Jicong 已提交
174
  SSnodeObj* pObj = NULL;
L
Liu Jicong 已提交
175 176 177
  void*      pIter = NULL;
  // TODO random fetch
  pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void**)&pObj);
L
Liu Jicong 已提交
178 179 180
  return pObj;
}

181
int32_t mndAssignTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SSnodeObj* pSnode) {
L
Liu Jicong 已提交
182
  int32_t msgLen;
L
Liu Jicong 已提交
183

L
Liu Jicong 已提交
184
  pTask->nodeId = SNODE_HANDLE;
L
Liu Jicong 已提交
185 186
  pTask->epSet = mndAcquireEpFromSnode(pMnode, pSnode);

L
Liu Jicong 已提交
187
  plan->execNode.nodeId = SNODE_HANDLE;
L
Liu Jicong 已提交
188
  plan->execNode.epSet = pTask->epSet;
L
Liu Jicong 已提交
189

L
Liu Jicong 已提交
190
  if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
L
Liu Jicong 已提交
191 192 193
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }
L
Liu Jicong 已提交
194 195 196
  return 0;
}

L
Liu Jicong 已提交
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
  void*   pIter = NULL;
  SVgObj* pVgroup = NULL;
  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) break;
    if (pVgroup->dbUid != dbUid) {
      sdbRelease(pMnode->pSdb, pVgroup);
      continue;
    }
    return pVgroup;
  }
  return pVgroup;
}

212
// create sink node for each vgroup.
213
int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) {
L
Liu Jicong 已提交
214 215 216 217
  SSdb*   pSdb = pMnode->pSdb;
  void*   pIter = NULL;

  while (1) {
218
    SVgObj* pVgroup = NULL;
L
Liu Jicong 已提交
219
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
220 221 222 223
    if (pIter == NULL) {
      break;
    }

S
Shengliang Guan 已提交
224
    if (!mndVgroupInDb(pVgroup, pStream->targetDbUid)) {
L
Liu Jicong 已提交
225 226 227
      sdbRelease(pSdb, pVgroup);
      continue;
    }
S
Shengliang Guan 已提交
228

229
    mndAddSinkTaskToStream(pStream, pMnode, pVgroup->vgId, pVgroup);
230
    sdbRelease(pSdb, pVgroup);
L
Liu Jicong 已提交
231
  }
232

L
Liu Jicong 已提交
233 234 235
  return 0;
}

236 237 238
int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup) {
  SArray* pTaskList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL);

239
  SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, pStream->fillHistory);
L
Liu Jicong 已提交
240 241 242 243 244
  if (pTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

245
  mndAddToTaskset(pTaskList, pTask);
246

247
  pTask->nodeId = vgId;
L
Liu Jicong 已提交
248
  pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
249 250 251
  mndSetSinkTaskInfo(pStream, pTask);
  return 0;
}
L
Liu Jicong 已提交
252

253 254 255
static int32_t mndScheduleFillHistoryStreamTask(SMnode* pMnode, SStreamObj* pStream) {
  return 0;
}
L
Liu Jicong 已提交
256

257 258 259 260 261 262 263 264 265 266 267 268 269 270
static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SStreamObj* pStream,
                                   SSubplan* plan, uint64_t uid, int8_t taskLevel, int8_t fillHistory,
                                   bool hasExtraSink) {
  SStreamTask* pTask = tNewStreamTask(uid, taskLevel, fillHistory);
  if (pTask == NULL) {
    return terrno;
  }

  mndAddToTaskset(pTaskList, pTask);
  pTask->triggerParam = pStream->triggerParam; // trigger

  // sink or dispatch
  if (hasExtraSink) {
    mndAddDispatcherForInnerTask(pMnode, pStream, pTask);
L
Liu Jicong 已提交
271
  } else {
272
    mndSetSinkTaskInfo(pStream, pTask);
L
Liu Jicong 已提交
273
  }
L
Liu Jicong 已提交
274

275 276 277 278 279
  if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
280 281
}

282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301
static SStreamChildEpInfo* createStreamTaskEpInfo(SStreamTask* pTask) {
  SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo));
  if (pEpInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pEpInfo->childId = pTask->selfChildId;
  pEpInfo->epSet = pTask->epSet;
  pEpInfo->nodeId = pTask->nodeId;
  pEpInfo->taskId = pTask->id.taskId;

  return pEpInfo;
}

void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask) {
  STaskDispatcherFixedEp* pDispatcher = &pDstTask->fixedEpDispatcher;
  pDispatcher->taskId = pTask->id.taskId;
  pDispatcher->nodeId = pTask->nodeId;
  pDispatcher->epSet = pTask->epSet;
302

303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319
  pDstTask->outputType = TASK_OUTPUT__FIXED_DISPATCH;
  pDstTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
}

int32_t appendToUpstream(SStreamTask* pTask, SStreamTask* pUpstream) {
  SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pTask);
  if (pEpInfo == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if(pUpstream->childEpInfo == NULL) {
    pUpstream->childEpInfo = taosArrayInit(4, POINTER_BYTES);
  }
  
  taosArrayPush(pUpstream->childEpInfo, &pEpInfo);
  return TSDB_CODE_SUCCESS;
}
320

321
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
322 323
  SSdb* pSdb = pMnode->pSdb;

L
Liu Jicong 已提交
324 325 326 327 328
  SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
  if (pPlan == NULL) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }
L
Liu Jicong 已提交
329

330 331
  int32_t planTotLevel = LIST_LENGTH(pPlan->pSubplans);
  pStream->tasks = taosArrayInit(planTotLevel, POINTER_BYTES);
L
Liu Jicong 已提交
332

333 334 335
  bool    hasExtraSink = false;
  bool    externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
  SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb);
L
Liu Jicong 已提交
336 337 338 339
  if (pDbObj == NULL) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }
340

341
  bool multiTarget = (pDbObj->cfg.numOfVgroups > 1);
L
Liu Jicong 已提交
342
  sdbRelease(pSdb, pDbObj);
343

344
  if (planTotLevel == 2 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) {
345
    SArray* taskOneLevel = taosArrayInit(0, POINTER_BYTES);
L
Liu Jicong 已提交
346
    taosArrayPush(pStream->tasks, &taskOneLevel);
347

L
Liu Jicong 已提交
348 349 350
    // add extra sink
    hasExtraSink = true;
    if (pStream->fixedSinkVgId == 0) {
351 352 353 354
      if (mndAddShuffleSinkTasksToStream(pMnode, pStream) < 0) {
        // TODO free
        return -1;
      }
L
Liu Jicong 已提交
355
    } else {
356
      if (mndAddSinkTaskToStream(pStream, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg) < 0) {
357 358 359
        // TODO free
        return -1;
      }
L
Liu Jicong 已提交
360 361
    }
  }
362

L
Liu Jicong 已提交
363
  pStream->totalLevel = planTotLevel + hasExtraSink;
364

L
Liu Jicong 已提交
365
  if (planTotLevel > 1) {
L
Liu Jicong 已提交
366 367
    SStreamTask* pInnerTask;
    // inner level
L
Liu Jicong 已提交
368
    {
369
      SArray* taskInnerLevel = taosArrayInit(0, POINTER_BYTES);
L
Liu Jicong 已提交
370 371
      taosArrayPush(pStream->tasks, &taskInnerLevel);

372 373
      SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
      SSubplan*      plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
L
Liu Jicong 已提交
374 375 376 377
      if (plan->subplanType != SUBPLAN_TYPE_MERGE) {
        terrno = TSDB_CODE_QRY_INVALID_INPUT;
        return -1;
      }
L
Liu Jicong 已提交
378

379
      pInnerTask = tNewStreamTask(pStream->uid, TASK_LEVEL__AGG, pStream->fillHistory);
380 381 382 383 384
      if (pInnerTask == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        qDestroyQueryPlan(pPlan);
        return -1;
      }
385

386
      mndAddToTaskset(taskInnerLevel, pInnerTask);
387
      pInnerTask->triggerParam = pStream->triggerParam;      // trigger
388

L
Liu Jicong 已提交
389
      // dispatch
390
      if (mndAddDispatcherForInnerTask(pMnode, pStream, pInnerTask) < 0) {
391 392 393
        qDestroyQueryPlan(pPlan);
        return -1;
      }
L
Liu Jicong 已提交
394

L
Liu Jicong 已提交
395
      if (tsDeployOnSnode) {
L
Liu Jicong 已提交
396 397 398
        SSnodeObj* pSnode = mndSchedFetchOneSnode(pMnode);
        if (pSnode == NULL) {
          SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
399
          if (mndAssignStreamTaskToVgroup(pMnode, pInnerTask, plan, pVgroup) < 0) {
L
Liu Jicong 已提交
400 401 402 403
            sdbRelease(pSdb, pVgroup);
            qDestroyQueryPlan(pPlan);
            return -1;
          }
404
          sdbRelease(pSdb, pVgroup);
L
Liu Jicong 已提交
405
        } else {
406
          if (mndAssignTaskToSnode(pMnode, pInnerTask, plan, pSnode) < 0) {
L
Liu Jicong 已提交
407 408 409 410 411 412 413
            sdbRelease(pSdb, pSnode);
            qDestroyQueryPlan(pPlan);
            return -1;
          }
        }
      } else {
        SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
414
        if (mndAssignStreamTaskToVgroup(pMnode, pInnerTask, plan, pVgroup) < 0) {
L
Liu Jicong 已提交
415 416 417 418
          sdbRelease(pSdb, pVgroup);
          qDestroyQueryPlan(pPlan);
          return -1;
        }
419

420
        sdbRelease(pSdb, pVgroup);
L
Liu Jicong 已提交
421 422 423
      }
    }

L
Liu Jicong 已提交
424
    // source level
425
    SArray* taskSourceLevel = taosArrayInit(0, POINTER_BYTES);
L
Liu Jicong 已提交
426 427
    taosArrayPush(pStream->tasks, &taskSourceLevel);

428 429
    SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 1);
    SSubplan*      plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
L
Liu Jicong 已提交
430 431 432 433
    if (plan->subplanType != SUBPLAN_TYPE_SCAN) {
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      return -1;
    }
L
Liu Jicong 已提交
434 435 436 437 438

    void* pIter = NULL;
    while (1) {
      SVgObj* pVgroup;
      pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
439 440 441 442
      if (pIter == NULL) {
        break;
      }

S
Shengliang Guan 已提交
443
      if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) {
L
Liu Jicong 已提交
444 445 446
        sdbRelease(pSdb, pVgroup);
        continue;
      }
S
Shengliang Guan 已提交
447

448
      SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SOURCE, pStream->fillHistory);
L
Liu Jicong 已提交
449
      if (pTask == NULL) {
450 451 452 453 454
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        sdbRelease(pSdb, pVgroup);
        qDestroyQueryPlan(pPlan);
        return -1;
      }
L
Liu Jicong 已提交
455

456
      mndAddToTaskset(taskSourceLevel, pTask);
L
Liu Jicong 已提交
457

458 459 460
      // all the source tasks dispatch result to a single agg node.
      setFixedDownstreamEpInfo(pTask, pInnerTask);
      pTask->triggerParam = 0;
L
Liu Jicong 已提交
461

462
      if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) {
L
Liu Jicong 已提交
463 464 465 466
        sdbRelease(pSdb, pVgroup);
        qDestroyQueryPlan(pPlan);
        return -1;
      }
L
Liu Jicong 已提交
467

468 469 470 471 472
      int32_t code = appendToUpstream(pTask, pInnerTask);
      sdbRelease(pSdb, pVgroup);

      if (code != TSDB_CODE_SUCCESS) {
        terrno = code;
L
Liu Jicong 已提交
473 474 475
        qDestroyQueryPlan(pPlan);
        return -1;
      }
L
Liu Jicong 已提交
476
    }
477 478
  } else if (planTotLevel == 1) {
    // create exec stream task, since only one level, the exec task is also the source task
479 480
    SArray* pTaskList = taosArrayInit(0, POINTER_BYTES);
    taosArrayPush(pStream->tasks, &pTaskList);
L
Liu Jicong 已提交
481

482
    SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
L
Liu Jicong 已提交
483 484 485 486
    if (LIST_LENGTH(inner->pNodeList) != 1) {
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      return -1;
    }
487

488
    SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
L
Liu Jicong 已提交
489 490 491 492
    if (plan->subplanType != SUBPLAN_TYPE_SCAN) {
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      return -1;
    }
L
Liu Jicong 已提交
493 494 495 496 497

    void* pIter = NULL;
    while (1) {
      SVgObj* pVgroup;
      pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
498 499 500 501
      if (pIter == NULL) {
        break;
      }

S
Shengliang Guan 已提交
502
      if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) {
L
Liu Jicong 已提交
503 504 505
        sdbRelease(pSdb, pVgroup);
        continue;
      }
S
Shengliang Guan 已提交
506

507
      // new stream task
508 509
      int32_t code = addSourceStreamTask(pMnode, pVgroup, pTaskList, pStream, plan, pStream->uid, TASK_LEVEL__SOURCE, pStream->fillHistory, hasExtraSink);
      sdbRelease(pSdb, pVgroup);
L
Liu Jicong 已提交
510

511
      if (code != TSDB_CODE_SUCCESS) {
L
Liu Jicong 已提交
512 513 514 515 516
        qDestroyQueryPlan(pPlan);
        return -1;
      }
    }
  }
517

L
Liu Jicong 已提交
518
  qDestroyQueryPlan(pPlan);
L
Liu Jicong 已提交
519 520
  return 0;
}
L
Liu Jicong 已提交
521 522

int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
L
Liu Jicong 已提交
523 524
  SSdb*       pSdb = pMnode->pSdb;
  SVgObj*     pVgroup = NULL;
L
Liu Jicong 已提交
525
  SQueryPlan* pPlan = NULL;
526
  SSubplan*   pSubplan = NULL;
L
Liu Jicong 已提交
527

L
Liu Jicong 已提交
528
  if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
529 530 531 532 533
    pPlan = qStringToQueryPlan(pTopic->physicalPlan);
    if (pPlan == NULL) {
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      return -1;
    }
L
Liu Jicong 已提交
534

L
Liu Jicong 已提交
535 536 537
    int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
    if (levelNum != 1) {
      qDestroyQueryPlan(pPlan);
L
Liu Jicong 已提交
538
      terrno = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
L
Liu Jicong 已提交
539 540
      return -1;
    }
L
Liu Jicong 已提交
541

542
    SNodeListNode* pNodeListNode = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
L
Liu Jicong 已提交
543

544
    int32_t opNum = LIST_LENGTH(pNodeListNode->pNodeList);
L
Liu Jicong 已提交
545 546
    if (opNum != 1) {
      qDestroyQueryPlan(pPlan);
L
Liu Jicong 已提交
547
      terrno = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
L
Liu Jicong 已提交
548 549
      return -1;
    }
550 551

    pSubplan = (SSubplan*)nodesListGetNode(pNodeListNode->pNodeList, 0);
L
Liu Jicong 已提交
552
  }
L
Liu Jicong 已提交
553 554 555 556

  void* pIter = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
557 558 559 560
    if (pIter == NULL) {
      break;
    }

S
Shengliang Guan 已提交
561
    if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
L
Liu Jicong 已提交
562 563 564 565 566 567
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    pSub->vgNum++;

568
    SMqVgEp* pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
L
Liu Jicong 已提交
569 570
    pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
    pVgEp->vgId = pVgroup->vgId;
571
    taosArrayPush(pSub->unassignedVgs, &pVgEp);
572

573
    mDebug("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId);
L
Liu Jicong 已提交
574

L
Liu Jicong 已提交
575
    if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
576 577
      int32_t msgLen;

578 579
      pSubplan->execNode.epSet = pVgEp->epSet;
      pSubplan->execNode.nodeId = pVgEp->vgId;
L
Liu Jicong 已提交
580

581
      if (qSubPlanToString(pSubplan, &pVgEp->qmsg, &msgLen) < 0) {
L
Liu Jicong 已提交
582 583 584 585 586 587
        sdbRelease(pSdb, pVgroup);
        qDestroyQueryPlan(pPlan);
        terrno = TSDB_CODE_QRY_INVALID_INPUT;
        return -1;
      }
    } else {
588
      pVgEp->qmsg = taosStrdup("");
L
Liu Jicong 已提交
589
    }
S
Shengliang Guan 已提交
590 591

    sdbRelease(pSdb, pVgroup);
L
Liu Jicong 已提交
592 593
  }

X
Xiaoyu Wang 已提交
594
  qDestroyQueryPlan(pPlan);
L
Liu Jicong 已提交
595 596
  return 0;
}