mndScheduler.c 17.0 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "mndScheduler.h"
#include "mndDb.h"
L
Liu Jicong 已提交
18
#include "mndSnode.h"
L
Liu Jicong 已提交
19
#include "mndVgroup.h"
X
Xiaoyu Wang 已提交
20
#include "parser.h"
L
Liu Jicong 已提交
21 22
#include "tcompare.h"
#include "tname.h"
L
Liu Jicong 已提交
23 24
#include "tuuid.h"

L
Liu Jicong 已提交
25
// Defined outside this file. mndScheduleStream reads it to decide whether the
// aggregation task should first be offered to an snode before falling back to a vgroup.
extern bool tsDeployOnSnode;
L
Liu Jicong 已提交
26

27 28
// Forward declarations for helpers that are used before their definitions below.
static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup);
static void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask);
L
Liu Jicong 已提交
29

30
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
31
                           int64_t watermark, int64_t deleteMark) {
L
Liu Jicong 已提交
32 33 34 35 36 37 38 39 40
  SNode*      pAst = NULL;
  SQueryPlan* pPlan = NULL;
  terrno = TSDB_CODE_SUCCESS;

  if (nodesStringToNode(ast, &pAst) < 0) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }

41
  if (qSetSTableIdForRsma(pAst, uid) < 0) {
X
Xiaoyu Wang 已提交
42 43 44 45
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }

L
Liu Jicong 已提交
46 47 48 49 50
  SPlanContext cxt = {
      .pAstRoot = pAst,
      .topicQuery = false,
      .streamQuery = true,
      .rSmaQuery = true,
L
Liu Jicong 已提交
51
      .triggerType = triggerType,
L
Liu Jicong 已提交
52
      .watermark = watermark,
53
      .deleteMark = deleteMark,
L
Liu Jicong 已提交
54
  };
L
Liu Jicong 已提交
55

L
Liu Jicong 已提交
56 57 58 59 60 61 62 63 64 65
  if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }

  int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
  if (levelNum != 1) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }
66
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
L
Liu Jicong 已提交
67 68 69 70 71 72 73

  int32_t opNum = LIST_LENGTH(inner->pNodeList);
  if (opNum != 1) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }

74 75
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
  if (qSubPlanToString(plan, pDst, pDstLen) < 0) {
L
Liu Jicong 已提交
76 77 78 79 80 81
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }

END:
  if (pAst) nodesDestroyNode(pAst);
82
  if (pPlan) nodesDestroyNode((SNode*)pPlan);
L
Liu Jicong 已提交
83 84 85
  return terrno;
}

86
// Fills in the output side of a sink task from the stream definition.
//
// A stream with a non-zero smaId writes to an sma sink; otherwise the task
// writes to the target super table and receives its own clone of the stream's
// output schema.
//
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the schema clone fails.
int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) {
  if (pStream->smaId != 0) {
    pTask->outputType = TASK_OUTPUT__SMA;
    pTask->smaSink.smaId = pStream->smaId;
    return 0;
  }

  pTask->outputType = TASK_OUTPUT__TABLE;
  pTask->tbSink.stbUid = pStream->targetStbUid;
  memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);

  pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
  return (pTask->tbSink.pSchemaWrapper == NULL) ? TSDB_CODE_OUT_OF_MEMORY : 0;
}

103 104
// Index into SStreamObj::tasks of the task list that holds the sink tasks.
#define SINK_NODE_LEVEL (0)

105
// Configures how pTask forwards its output to the sink level of the stream.
//
// When the stream has no fixed sink vgroup and the target db has more than one
// vgroup, the task becomes a shuffle dispatcher: the db's vgroup info is
// extracted and each vgroup entry is tagged with the id of the sink task that
// runs on it. Otherwise the task dispatches to the first sink task only.
//
// Returns 0 on success, -1 on failure.
int32_t mndAddDispatcherForInnerTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) {
  bool isShuffle = false;

  if (pStream->fixedSinkVgId == 0) {
    SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
    if (pDb != NULL && pDb->cfg.numOfVgroups > 1) {
      isShuffle = true;
      pTask->outputType = TASK_OUTPUT__SHUFFLE_DISPATCH;
      pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
      if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
        // drop the db reference on the failure path as well
        sdbRelease(pMnode->pSdb, pDb);
        return -1;
      }
    }

    if (pDb != NULL) {
      sdbRelease(pMnode->pSdb, pDb);
    }
  }

  SArray* pSinkNodeList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL);
  int32_t numOfSinkNodes = taosArrayGetSize(pSinkNodeList);

  if (isShuffle) {
    memcpy(pTask->shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
    SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;

    // match every vgroup of the target db with the sink task placed on it
    int32_t numOfVgroups = taosArrayGetSize(pVgs);
    for (int32_t i = 0; i < numOfVgroups; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);

      for (int32_t j = 0; j < numOfSinkNodes; j++) {
        SStreamTask* pSinkTask = taosArrayGetP(pSinkNodeList, j);
        if (pSinkTask->nodeId == pVgInfo->vgId) {
          pVgInfo->taskId = pSinkTask->id.taskId;
          break;
        }
      }
    }
  } else {
    // a fixed dispatcher needs at least one sink task to point at
    if (numOfSinkNodes <= 0) {
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      return -1;
    }

    SStreamTask* pOneSinkTask = taosArrayGetP(pSinkNodeList, 0);
    setFixedDownstreamEpInfo(pTask, pOneSinkTask);
  }

  return 0;
}

150
// Binds a stream task to a vgroup: the task and the subplan's exec node both
// receive the vgroup id and its ep set, then the subplan is serialized into
// pTask->exec.qmsg.
//
// Returns 0 on success; -1 with terrno = TSDB_CODE_QRY_INVALID_INPUT when the
// subplan cannot be serialized.
int32_t mndAssignStreamTaskToVgroup(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
  int32_t qmsgLen;

  // placement of the task itself
  pTask->nodeId = pVgroup->vgId;
  pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);

  // mirror the placement into the plan before it is serialized
  plan->execNode.nodeId = pTask->nodeId;
  plan->execNode.epSet = pTask->epSet;

  if (qSubPlanToString(plan, &pTask->exec.qmsg, &qmsgLen) >= 0) {
    return 0;
  }

  terrno = TSDB_CODE_QRY_INVALID_INPUT;
  return -1;
}

L
Liu Jicong 已提交
166
// Returns the first snode found in the sdb, or NULL when there is none.
//
// The object comes from sdbFetch; callers in this file hand it back with
// sdbRelease when they are done with it.
SSnodeObj* mndSchedFetchOneSnode(SMnode* pMnode) {
  SSnodeObj* pObj = NULL;
  void*      pIter = NULL;

  // TODO random fetch
  pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void**)&pObj);
  if (pIter != NULL) {
    // only the first row is wanted, so close the iteration instead of abandoning it
    sdbCancelFetch(pMnode->pSdb, pIter);
  }

  return pObj;
}

174
// Binds a stream task to an snode: the task and the subplan's exec node are
// both tagged with SNODE_HANDLE and the snode's ep set, then the subplan is
// serialized into pTask->exec.qmsg.
//
// Returns 0 on success; -1 with terrno = TSDB_CODE_QRY_INVALID_INPUT when the
// subplan cannot be serialized.
int32_t mndAssignTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SSnodeObj* pSnode) {
  int32_t qmsgLen;

  // node id first, for both the task and its plan
  pTask->nodeId = SNODE_HANDLE;
  plan->execNode.nodeId = SNODE_HANDLE;

  // then the endpoint set
  pTask->epSet = mndAcquireEpFromSnode(pMnode, pSnode);
  plan->execNode.epSet = pTask->epSet;

  if (qSubPlanToString(plan, &pTask->exec.qmsg, &qmsgLen) >= 0) {
    return 0;
  }

  terrno = TSDB_CODE_QRY_INVALID_INPUT;
  return -1;
}

L
Liu Jicong 已提交
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204
// Returns the first vgroup whose dbUid equals the given one, or NULL when no
// vgroup matches.
//
// The returned object comes from sdbFetch; callers in this file hand it back
// with sdbRelease when they are done with it.
SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
  void*   pIter = NULL;
  SVgObj* pVgroup = NULL;

  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    if (pVgroup->dbUid != dbUid) {
      sdbRelease(pMnode->pSdb, pVgroup);
      continue;
    }

    // leaving the loop early: close the iteration instead of abandoning it
    sdbCancelFetch(pMnode->pSdb, pIter);
    return pVgroup;
  }

  // nothing matched; every visited vgroup has already been released
  return NULL;
}

205
// create sink node for each vgroup.
206
int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) {
L
Liu Jicong 已提交
207 208 209 210
  SSdb*   pSdb = pMnode->pSdb;
  void*   pIter = NULL;

  while (1) {
211
    SVgObj* pVgroup = NULL;
L
Liu Jicong 已提交
212
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
213 214
    if (pIter == NULL) {
      break;
L
Liu Jicong 已提交
215
    }
S
Shengliang Guan 已提交
216 217

    if (!mndVgroupInDb(pVgroup, pStream->targetDbUid)) {
218
      sdbRelease(pSdb, pVgroup);
L
Liu Jicong 已提交
219 220 221
      continue;
    }

222
    mndAddSinkTaskToStream(pStream, pMnode, pVgroup->vgId, pVgroup);
223
    sdbRelease(pSdb, pVgroup);
L
Liu Jicong 已提交
224
  }
225

L
Liu Jicong 已提交
226 227 228
  return 0;
}

229 230 231
// Creates one sink task for the given vgroup and fills in its sink settings.
//
// The task is created with tNewStreamTask against the stream's sink-level task
// list, placed on vgId with the ep set of pVgroup, and configured through
// mndSetSinkTaskInfo.
//
// Returns 0 on success, -1 with terrno set on failure.
int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup) {
  SArray* pTaskList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL);

  SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, pStream->fillHistory, 0, pTaskList);
  if (pTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pTask->nodeId = vgId;
  pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);

  // mndSetSinkTaskInfo reports a failed schema clone through its return value
  int32_t code = mndSetSinkTaskInfo(pStream, pTask);
  if (code != 0) {
    terrno = code;
    return -1;
  }

  return 0;
}
L
Liu Jicong 已提交
243

244 245 246
// Currently a no-op: both arguments are ignored and 0 is always returned.
static int32_t mndScheduleFillHistoryStreamTask(SMnode* pMnode, SStreamObj* pStream) {
  return 0;
}
L
Liu Jicong 已提交
247

248 249 250
// Creates one source task for pVgroup and wires up its output.
//
// The task is created with tNewStreamTask against pTaskList. With hasExtraSink
// the task dispatches to the separate sink level (mndAddDispatcherForInnerTask);
// otherwise it acts as its own sink (mndSetSinkTaskInfo). Finally the task and
// the subplan are bound to pVgroup.
//
// Returns TSDB_CODE_SUCCESS on success and a non-zero code on failure.
static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SStreamObj* pStream,
                                   SSubplan* plan, uint64_t uid, int8_t taskLevel, int8_t fillHistory,
                                   bool hasExtraSink) {
  SStreamTask* pTask = tNewStreamTask(uid, taskLevel, fillHistory, pStream->triggerParam, pTaskList);
  if (pTask == NULL) {
    // never report success for a task that was not created
    if (terrno == TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
    }
    return terrno;
  }

  // sink or dispatch
  if (hasExtraSink) {
    if (mndAddDispatcherForInnerTask(pMnode, pStream, pTask) < 0) {
      return (terrno != TSDB_CODE_SUCCESS) ? terrno : -1;
    }
  } else {
    int32_t code = mndSetSinkTaskInfo(pStream, pTask);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return code;
    }
  }

  if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) {
    return (terrno != TSDB_CODE_SUCCESS) ? terrno : -1;
  }

  return TSDB_CODE_SUCCESS;
}

270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289
// Allocates an SStreamChildEpInfo and copies the task's child id, ep set, node
// id and task id into it.
//
// Returns the new object, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when
// the allocation fails.
static SStreamChildEpInfo* createStreamTaskEpInfo(SStreamTask* pTask) {
  SStreamChildEpInfo* pInfo = taosMemoryMalloc(sizeof(*pInfo));
  if (pInfo != NULL) {
    pInfo->taskId = pTask->id.taskId;
    pInfo->nodeId = pTask->nodeId;
    pInfo->epSet = pTask->epSet;
    pInfo->childId = pTask->selfChildId;
    return pInfo;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}

// Makes pDstTask a fixed dispatcher whose only downstream is pTask: the output
// type and dispatch message type are set, and pTask's task id, node id and ep
// set are copied into the fixed-ep dispatcher.
void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask) {
  pDstTask->outputType = TASK_OUTPUT__FIXED_DISPATCH;
  pDstTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;

  STaskDispatcherFixedEp* pFixed = &pDstTask->fixedEpDispatcher;
  pFixed->epSet = pTask->epSet;
  pFixed->nodeId = pTask->nodeId;
  pFixed->taskId = pTask->id.taskId;
}

// Records pTask in pUpstream->childEpInfo, creating that array on first use.
//
// Returns TSDB_CODE_SUCCESS on success, TSDB_CODE_OUT_OF_MEMORY when the array
// or the ep info cannot be allocated or the entry cannot be stored.
int32_t appendToUpstream(SStreamTask* pTask, SStreamTask* pUpstream) {
  // make sure the container exists before allocating the entry, so that a
  // failure here leaves nothing to free
  if (pUpstream->childEpInfo == NULL) {
    pUpstream->childEpInfo = taosArrayInit(4, POINTER_BYTES);
    if (pUpstream->childEpInfo == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pTask);
  if (pEpInfo == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (taosArrayPush(pUpstream->childEpInfo, &pEpInfo) == NULL) {
    // the array did not take the pointer, so it is still owned here
    taosMemoryFree(pEpInfo);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

309
// Builds all task levels of a stream from its serialized physical plan and
// stores them in pStream->tasks.
//
// Level layout of pStream->tasks, in push order:
//   - an extra sink level, added when the plan has two levels, the target db
//     differs from the source db, the target db has more than one vgroup, or a
//     fixed sink vgroup is configured;
//   - for a multi-level plan: one aggregation task (placed on an snode when
//     tsDeployOnSnode is set and an snode exists, otherwise on a vgroup of the
//     source db) followed by one source task per vgroup of the source db, each
//     dispatching to the aggregation task;
//   - for a single-level plan: one source task per vgroup of the source db.
// pStream->totalLevel is set to the plan level count plus the extra sink level.
//
// Returns 0 on success, -1 on failure. The query plan is destroyed on every
// path once it has been parsed. Task levels already stored in pStream->tasks
// are not torn down on failure.
//
// NOTE(review): the early exits inside the vgroup loops leave the sdbFetch
// iteration unfinished, as the original code did - confirm whether the
// iterator has to be cancelled on those paths.
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
  SSdb*       pSdb = pMnode->pSdb;
  SQueryPlan* pPlan = NULL;
  SDbObj*     pDbObj = NULL;
  int32_t     planTotLevel = 0;
  int32_t     ret = -1;
  bool        hasExtraSink = false;
  bool        multiTarget = false;
  bool        externalTargetDB = false;

  pPlan = qStringToQueryPlan(pStream->physicalPlan);
  if (pPlan == NULL) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }

  planTotLevel = LIST_LENGTH(pPlan->pSubplans);
  pStream->tasks = taosArrayInit(planTotLevel, POINTER_BYTES);
  if (pStream->tasks == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto END;
  }

  externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;

  pDbObj = mndAcquireDb(pMnode, pStream->targetDb);
  if (pDbObj == NULL) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }
  multiTarget = (pDbObj->cfg.numOfVgroups > 1);
  sdbRelease(pSdb, pDbObj);

  if (planTotLevel == 2 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) {
    SArray* taskOneLevel = taosArrayInit(0, POINTER_BYTES);
    if (taskOneLevel == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto END;
    }
    taosArrayPush(pStream->tasks, &taskOneLevel);

    // add extra sink
    hasExtraSink = true;
    if (pStream->fixedSinkVgId == 0) {
      if (mndAddShuffleSinkTasksToStream(pMnode, pStream) < 0) {
        goto END;
      }
    } else {
      if (mndAddSinkTaskToStream(pStream, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg) < 0) {
        goto END;
      }
    }
  }

  pStream->totalLevel = planTotLevel + hasExtraSink;

  if (planTotLevel > 1) {
    // inner (aggregation) level
    SArray* taskInnerLevel = taosArrayInit(0, POINTER_BYTES);
    if (taskInnerLevel == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto END;
    }
    taosArrayPush(pStream->tasks, &taskInnerLevel);

    SNodeListNode* pAggLevel = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
    SSubplan*      pAggPlan = (SSubplan*)nodesListGetNode(pAggLevel->pNodeList, 0);
    if (pAggPlan->subplanType != SUBPLAN_TYPE_MERGE) {
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      goto END;
    }

    SStreamTask* pInnerTask =
        tNewStreamTask(pStream->uid, TASK_LEVEL__AGG, pStream->fillHistory, pStream->triggerParam, taskInnerLevel);
    if (pInnerTask == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto END;
    }

    // dispatch
    if (mndAddDispatcherForInnerTask(pMnode, pStream, pInnerTask) < 0) {
      goto END;
    }

    // placement: an snode is tried only when tsDeployOnSnode is set; without
    // one the task goes to a vgroup of the source db
    SSnodeObj* pSnode = tsDeployOnSnode ? mndSchedFetchOneSnode(pMnode) : NULL;
    if (pSnode != NULL) {
      int32_t rc = mndAssignTaskToSnode(pMnode, pInnerTask, pAggPlan, pSnode);
      // the snode reference is dropped on success as well as on failure
      sdbRelease(pSdb, pSnode);
      if (rc < 0) {
        goto END;
      }
    } else {
      SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
      if (pVgroup == NULL) {
        // no vgroup in the source db to host the aggregation task
        terrno = TSDB_CODE_QRY_INVALID_INPUT;
        goto END;
      }

      int32_t rc = mndAssignStreamTaskToVgroup(pMnode, pInnerTask, pAggPlan, pVgroup);
      sdbRelease(pSdb, pVgroup);
      if (rc < 0) {
        goto END;
      }
    }

    // source level
    SArray* taskSourceLevel = taosArrayInit(0, POINTER_BYTES);
    if (taskSourceLevel == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto END;
    }
    taosArrayPush(pStream->tasks, &taskSourceLevel);

    SNodeListNode* pScanLevel = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 1);
    SSubplan*      pScanPlan = (SSubplan*)nodesListGetNode(pScanLevel->pNodeList, 0);
    if (pScanPlan->subplanType != SUBPLAN_TYPE_SCAN) {
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      goto END;
    }

    void* pIter = NULL;
    while (1) {
      SVgObj* pVgroup = NULL;
      pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
      if (pIter == NULL) {
        break;
      }

      if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) {
        sdbRelease(pSdb, pVgroup);
        continue;
      }

      SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SOURCE, pStream->fillHistory, 0, taskSourceLevel);
      if (pTask == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        sdbRelease(pSdb, pVgroup);
        goto END;
      }

      // all the source tasks dispatch result to a single agg node.
      setFixedDownstreamEpInfo(pTask, pInnerTask);

      if (mndAssignStreamTaskToVgroup(pMnode, pTask, pScanPlan, pVgroup) < 0) {
        sdbRelease(pSdb, pVgroup);
        goto END;
      }

      int32_t code = appendToUpstream(pTask, pInnerTask);
      sdbRelease(pSdb, pVgroup);

      if (code != TSDB_CODE_SUCCESS) {
        terrno = code;
        goto END;
      }
    }
  } else if (planTotLevel == 1) {
    // create exec stream task, since only one level, the exec task is also the source task
    SArray* pTaskList = taosArrayInit(0, POINTER_BYTES);
    if (pTaskList == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto END;
    }
    taosArrayPush(pStream->tasks, &pTaskList);

    SNodeListNode* pLevel = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
    if (LIST_LENGTH(pLevel->pNodeList) != 1) {
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      goto END;
    }

    SSubplan* pScanPlan = (SSubplan*)nodesListGetNode(pLevel->pNodeList, 0);
    if (pScanPlan->subplanType != SUBPLAN_TYPE_SCAN) {
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      goto END;
    }

    void* pIter = NULL;
    while (1) {
      SVgObj* pVgroup = NULL;
      pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
      if (pIter == NULL) {
        break;
      }

      if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) {
        sdbRelease(pSdb, pVgroup);
        continue;
      }

      // new stream task
      int32_t code = addSourceStreamTask(pMnode, pVgroup, pTaskList, pStream, pScanPlan, pStream->uid,
                                         TASK_LEVEL__SOURCE, pStream->fillHistory, hasExtraSink);
      sdbRelease(pSdb, pVgroup);

      if (code != TSDB_CODE_SUCCESS) {
        goto END;
      }
    }
  }

  ret = 0;

END:
  // single exit: the parsed plan is released on success and on every failure
  qDestroyQueryPlan(pPlan);
  return ret;
}
L
Liu Jicong 已提交
503 504

// Initializes a subscription for a topic: every vgroup of the topic's db is
// added to pSub->unassignedVgs (and counted in pSub->vgNum), and pSub->qmsg is
// set to the serialized subplan of the topic query, or to an empty string when
// the topic has no query plan.
//
// For a column subscription the plan is parsed from pTopic->physicalPlan; for a
// table subscription with an AST it is created from pTopic->ast. Such a plan
// must contain exactly one level with exactly one subplan.
//
// Returns 0 on success, -1 on failure.
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
  SSdb*       pSdb = pMnode->pSdb;
  SVgObj*     pVgroup = NULL;
  SQueryPlan* pPlan = NULL;
  SSubplan*   pSubplan = NULL;
  bool        outOfMemory = false;

  if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
    pPlan = qStringToQueryPlan(pTopic->physicalPlan);
    if (pPlan == NULL) {
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      return -1;
    }
  } else if (pTopic->subType == TOPIC_SUB_TYPE__TABLE && pTopic->ast != NULL) {
    SNode* pAst = NULL;
    if (nodesStringToNode(pTopic->ast, &pAst) != 0) {
      mError("topic:%s, failed to create since %s", pTopic->name, terrstr());
      return -1;
    }

    SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
    if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) {
      mError("failed to create topic:%s since %s", pTopic->name, terrstr());
      nodesDestroyNode(pAst);
      return -1;
    }
    nodesDestroyNode(pAst);
  }

  if (pPlan) {
    int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
    if (levelNum != 1) {
      qDestroyQueryPlan(pPlan);
      terrno = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
      return -1;
    }

    SNodeListNode* pNodeListNode = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);

    int32_t opNum = LIST_LENGTH(pNodeListNode->pNodeList);
    if (opNum != 1) {
      qDestroyQueryPlan(pPlan);
      terrno = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
      return -1;
    }

    pSubplan = (SSubplan*)nodesListGetNode(pNodeListNode->pNodeList, 0);
  }

  void* pIter = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    // After an allocation failure the loop keeps running only to release the
    // remaining vgroups and let the iteration reach its natural end.
    if (!outOfMemory && mndVgroupInDb(pVgroup, pTopic->dbUid)) {
      SMqVgEp* pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
      if (pVgEp == NULL) {
        outOfMemory = true;
      } else {
        pSub->vgNum++;

        pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
        pVgEp->vgId = pVgroup->vgId;
        taosArrayPush(pSub->unassignedVgs, &pVgEp);

        mDebug("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId);
      }
    }

    sdbRelease(pSdb, pVgroup);
  }

  if (outOfMemory) {
    qDestroyQueryPlan(pPlan);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (pSubplan) {
    int32_t msgLen;

    if (qSubPlanToString(pSubplan, &pSub->qmsg, &msgLen) < 0) {
      qDestroyQueryPlan(pPlan);
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      return -1;
    }
  } else {
    pSub->qmsg = taosStrdup("");
    if (pSub->qmsg == NULL) {
      qDestroyQueryPlan(pPlan);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  qDestroyQueryPlan(pPlan);
  return 0;
}