mndScheduler.c 17.6 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "mndScheduler.h"
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndShow.h"
L
Liu Jicong 已提交
22
#include "mndSnode.h"
L
Liu Jicong 已提交
23
#include "mndStb.h"
L
Liu Jicong 已提交
24
#include "mndStream.h"
L
Liu Jicong 已提交
25 26 27 28 29
#include "mndSubscribe.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
X
Xiaoyu Wang 已提交
30
#include "parser.h"
L
Liu Jicong 已提交
31 32
#include "tcompare.h"
#include "tname.h"
L
Liu Jicong 已提交
33 34
#include "tuuid.h"

L
Liu Jicong 已提交
35
// Defined outside this file. When true, mndScheduleStream first tries to place
// the aggregation-level task on an snode and only falls back to a vgroup when
// no snode can be fetched.
extern bool tsDeployOnSnode;
L
Liu Jicong 已提交
36

L
Liu Jicong 已提交
37 38
/*
 * Append pTask to the task array of one level and record the index it gets
 * there as the task's selfChildId.
 *
 * pArray holds SStreamTask* elements; the pointer value itself is copied in.
 *
 * Returns 0 on success. Returns -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY
 * when the array rejects the push, so the failure is no longer reported as
 * success.
 */
static int32_t mndAddTaskToTaskSet(SArray* pArray, SStreamTask* pTask) {
  // The child id is the slot the task occupies once it has been appended.
  int32_t childId = taosArrayGetSize(pArray);
  pTask->selfChildId = childId;

  if (taosArrayPush(pArray, &pTask) == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

44
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
45
                           int64_t watermark) {
L
Liu Jicong 已提交
46 47 48 49 50 51 52 53 54
  SNode*      pAst = NULL;
  SQueryPlan* pPlan = NULL;
  terrno = TSDB_CODE_SUCCESS;

  if (nodesStringToNode(ast, &pAst) < 0) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }

55
  if (qSetSTableIdForRsma(pAst, uid) < 0) {
X
Xiaoyu Wang 已提交
56 57 58 59
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }

L
Liu Jicong 已提交
60 61 62 63 64
  SPlanContext cxt = {
      .pAstRoot = pAst,
      .topicQuery = false,
      .streamQuery = true,
      .rSmaQuery = true,
L
Liu Jicong 已提交
65
      .triggerType = triggerType,
L
Liu Jicong 已提交
66 67
      .watermark = watermark,
  };
L
Liu Jicong 已提交
68

L
Liu Jicong 已提交
69 70 71 72 73 74 75 76 77 78
  if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }

  int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
  if (levelNum != 1) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }
79
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
L
Liu Jicong 已提交
80 81 82 83 84 85 86

  int32_t opNum = LIST_LENGTH(inner->pNodeList);
  if (opNum != 1) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }

87 88
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
  if (qSubPlanToString(plan, pDst, pDstLen) < 0) {
L
Liu Jicong 已提交
89 90 91 92 93 94
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }

END:
  if (pAst) nodesDestroyNode(pAst);
95
  if (pPlan) nodesDestroyNode((SNode*)pPlan);
L
Liu Jicong 已提交
96 97 98
  return terrno;
}

99
/*
 * Configure pTask to write its results directly into the stream's target.
 *
 * When the stream has a non-zero smaId the task becomes an SMA sink;
 * otherwise it becomes a table sink that carries the target super table uid,
 * its full name and a private clone of the stream's output schema.
 *
 * pMnode is not used by this function.
 *
 * Returns 0 on success. Returns -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY
 * when the schema clone comes back NULL (previously the NULL pointer was
 * stored silently and success was reported).
 */
int32_t mndAddSinkToTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) {
  if (pStream->smaId != 0) {
    pTask->outputType = TASK_OUTPUT__SMA;
    pTask->smaSink.smaId = pStream->smaId;
    return 0;
  }

  pTask->outputType = TASK_OUTPUT__TABLE;
  pTask->tbSink.stbUid = pStream->targetStbUid;
  memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
  pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
  if (pTask->tbSink.pSchemaWrapper == NULL) {
    // NOTE(review): a NULL clone is treated as an allocation failure - confirm
    // that tCloneSSchemaWrapper has no other NULL-returning case.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

112
/*
 * Configure how pTask forwards its output to the sink level, which is level 0
 * of pStream->tasks.
 *
 * - Shuffle dispatch: chosen when the stream has no fixed sink vgroup and the
 *   target database has more than one vgroup. The database's vgroup list is
 *   extracted into the task, and every vgroup entry is linked to the sink
 *   task that runs on that vgroup.
 * - Fixed dispatch: chosen otherwise. Exactly one sink task is expected, and
 *   its task id, node id and epset are copied into the dispatcher.
 *
 * Returns 0 on success and -1 on failure. The reference taken on the target
 * database is released on every path, including the mndExtractDbInfo failure
 * path that used to leak it.
 */
int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) {
  bool isShuffle = false;

  if (pStream->fixedSinkVgId == 0) {
    SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
    ASSERT(pDb);
    if (pDb == NULL) {
      // With assertions compiled out, fail instead of dereferencing NULL.
      // NOTE(review): terrno is presumably set by mndAcquireDb - confirm.
      return -1;
    }

    if (pDb->cfg.numOfVgroups > 1) {
      isShuffle = true;
      pTask->outputType = TASK_OUTPUT__SHUFFLE_DISPATCH;
      pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
      if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
        ASSERT(0);
        sdbRelease(pMnode->pSdb, pDb);  // do not leak the db reference on failure
        return -1;
      }
    }

    sdbRelease(pMnode->pSdb, pDb);
  }

  if (isShuffle) {
    memcpy(pTask->shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
    SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
    int32_t sz = taosArrayGetSize(pVgs);
    SArray* sinkLv = taosArrayGetP(pStream->tasks, 0);
    int32_t sinkLvSize = taosArrayGetSize(sinkLv);

    // For each target vgroup, find the sink task placed on it and remember
    // that task's id so dispatched data can be addressed to it.
    for (int32_t i = 0; i < sz; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
      for (int32_t j = 0; j < sinkLvSize; j++) {
        SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j);
        if (pLastLevelTask->nodeId == pVgInfo->vgId) {
          ASSERT(pVgInfo->vgId > 0);
          pVgInfo->taskId = pLastLevelTask->taskId;
          ASSERT(pVgInfo->taskId != 0);
          break;
        }
      }
    }
  } else {
    pTask->outputType = TASK_OUTPUT__FIXED_DISPATCH;
    pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
    SArray* pArray = taosArrayGetP(pStream->tasks, 0);
    // one sink only
    ASSERT(taosArrayGetSize(pArray) == 1);
    SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
    pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
    pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
    pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
  }
  return 0;
}

163
/*
 * Place pTask on the given vgroup and attach the string form of its plan.
 *
 * The vgroup id and the epset returned by mndGetVgroupEpset are written both
 * to the task and to the sub-plan's execNode; the string produced by
 * qSubPlanToString is then stored in pTask->exec.qmsg.
 *
 * Returns 0 on success. If qSubPlanToString fails, asserts, sets terrno to
 * TSDB_CODE_QRY_INVALID_INPUT and returns -1.
 */
int32_t mndAssignTaskToVg(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
  int32_t planStrLen;  // filled by qSubPlanToString, not used afterwards

  // Task side of the binding.
  pTask->nodeId = pVgroup->vgId;
  pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);

  // Mirror the same location into the plan before it is turned into a string.
  plan->execNode.epSet = pTask->epSet;
  plan->execNode.nodeId = pVgroup->vgId;

  if (qSubPlanToString(plan, &pTask->exec.qmsg, &planStrLen) >= 0) {
    return 0;
  }

  ASSERT(0);
  terrno = TSDB_CODE_QRY_INVALID_INPUT;
  return -1;
}

L
Liu Jicong 已提交
179
/*
 * Fetch the first snode object from the sdb, or NULL when sdbFetch yields
 * none.
 *
 * The returned object comes straight from sdbFetch; the caller is expected to
 * hand it back with sdbRelease when done (mndScheduleStream does so).
 *
 * Only one row is consumed, so the iterator is cancelled before returning
 * instead of being abandoned half-way through the iteration.
 */
SSnodeObj* mndSchedFetchOneSnode(SMnode* pMnode) {
  SSnodeObj* pObj = NULL;
  // TODO random fetch
  void* pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, NULL, (void**)&pObj);
  if (pIter != NULL) {
    sdbCancelFetch(pMnode->pSdb, pIter);
  }
  return pObj;
}

187
/*
 * Place pTask on the given snode and attach the string form of its plan.
 *
 * SNODE_HANDLE is used as the node id, and the epset returned by
 * mndAcquireEpFromSnode is written both to the task and to the sub-plan's
 * execNode; the string produced by qSubPlanToString is then stored in
 * pTask->exec.qmsg.
 *
 * Returns 0 on success. If qSubPlanToString fails, asserts, sets terrno to
 * TSDB_CODE_QRY_INVALID_INPUT and returns -1.
 */
int32_t mndAssignTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SSnodeObj* pSnode) {
  int32_t planStrLen;  // filled by qSubPlanToString, not used afterwards

  // Task side of the binding.
  pTask->nodeId = SNODE_HANDLE;
  pTask->epSet = mndAcquireEpFromSnode(pMnode, pSnode);

  // Mirror the same location into the plan before it is turned into a string.
  plan->execNode.epSet = pTask->epSet;
  plan->execNode.nodeId = SNODE_HANDLE;

  if (qSubPlanToString(plan, &pTask->exec.qmsg, &planStrLen) >= 0) {
    return 0;
  }

  ASSERT(0);
  terrno = TSDB_CODE_QRY_INVALID_INPUT;
  return -1;
}

L
Liu Jicong 已提交
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
/*
 * Return the first vgroup whose dbUid equals the given one, or NULL when no
 * vgroup matches.
 *
 * Vgroups that do not match are released as the scan passes over them. The
 * matching vgroup is returned still held; the caller is expected to hand it
 * back with sdbRelease (mndScheduleStream does so).
 *
 * The scan stops at the first match, so the iterator is cancelled before
 * returning. The not-found path returns an explicit NULL rather than whatever
 * the last sdbFetch call left in the local pointer.
 */
SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
  void*   pIter = NULL;
  SVgObj* pVgroup = NULL;

  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) break;

    if (pVgroup->dbUid == dbUid) {
      sdbCancelFetch(pMnode->pSdb, pIter);
      return pVgroup;
    }
    sdbRelease(pMnode->pSdb, pVgroup);
  }
  return NULL;
}

219
/*
 * Create one sink task per vgroup of the stream's target database and append
 * them to level 0 of pStream->tasks (which must be the only level present).
 *
 * Each task is bound to its vgroup (node id and epset), marked as sink level,
 * and configured either as an SMA sink (non-zero smaId) or as a table sink
 * with a private clone of the output schema.
 *
 * Returns 0 on success. Returns -1 with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY when a task or its schema clone cannot be
 * allocated. On such an early exit the current vgroup is released and the
 * iterator is cancelled; tasks appended before the failure stay in
 * pStream->tasks.
 */
int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) {
  SSdb*   pSdb = pMnode->pSdb;
  void*   pIter = NULL;
  SArray* tasks = taosArrayGetP(pStream->tasks, 0);

  ASSERT(taosArrayGetSize(pStream->tasks) == 1);

  while (1) {
    SVgObj* pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) break;
    if (!mndVgroupInDb(pVgroup, pStream->targetDbUid)) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    SStreamTask* pTask = tNewSStreamTask(pStream->uid);
    if (pTask == NULL) {
      sdbRelease(pSdb, pVgroup);
      sdbCancelFetch(pSdb, pIter);  // leaving the loop early: end the iteration
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
    pTask->fillHistory = pStream->fillHistory;
    mndAddTaskToTaskSet(tasks, pTask);

    pTask->nodeId = pVgroup->vgId;
    pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);

    // type
    pTask->taskLevel = TASK_LEVEL__SINK;

    // sink
    if (pStream->smaId != 0) {
      pTask->outputType = TASK_OUTPUT__SMA;
      pTask->smaSink.smaId = pStream->smaId;
    } else {
      pTask->outputType = TASK_OUTPUT__TABLE;
      pTask->tbSink.stbUid = pStream->targetStbUid;
      memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
      pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
      ASSERT(pTask->tbSink.pSchemaWrapper);
      if (pTask->tbSink.pSchemaWrapper == NULL) {
        // With assertions compiled out, report the failure instead of
        // leaving a sink without a schema.
        sdbRelease(pSdb, pVgroup);
        sdbCancelFetch(pSdb, pIter);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return -1;
      }
    }
    sdbRelease(pSdb, pVgroup);
  }
  return 0;
}

266
/*
 * Create the single sink task of a stream that writes to one fixed vgroup
 * (pStream->fixedSinkVgId, which must be non-zero) and append it to level 0
 * of pStream->tasks.
 *
 * The task's node id is the fixed vgroup id and its epset is derived from the
 * vgroup object embedded in the stream (pStream->fixedSinkVg). The task is
 * configured either as an SMA sink (non-zero smaId) or as a table sink with a
 * private clone of the output schema.
 *
 * Returns 0 on success. Returns -1 with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY when the task or its schema clone cannot be
 * allocated; in the latter case the task has already been appended to the
 * level array.
 */
int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) {
  ASSERT(pStream->fixedSinkVgId != 0);
  SArray*      tasks = taosArrayGetP(pStream->tasks, 0);
  SStreamTask* pTask = tNewSStreamTask(pStream->uid);
  if (pTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pTask->fillHistory = pStream->fillHistory;
  mndAddTaskToTaskSet(tasks, pTask);

  ASSERT(pStream->fixedSinkVg.vgId == pStream->fixedSinkVgId);

  pTask->nodeId = pStream->fixedSinkVgId;
  // The epset comes from the vgroup copy stored in the stream object.
  pTask->epSet = mndGetVgroupEpset(pMnode, &pStream->fixedSinkVg);

  pTask->taskLevel = TASK_LEVEL__SINK;

  // sink
  if (pStream->smaId != 0) {
    pTask->outputType = TASK_OUTPUT__SMA;
    pTask->smaSink.smaId = pStream->smaId;
  } else {
    pTask->outputType = TASK_OUTPUT__TABLE;
    pTask->tbSink.stbUid = pStream->targetStbUid;
    memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
    pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
    if (pTask->tbSink.pSchemaWrapper == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  return 0;
}

305
/*
 * Build the task topology of a stream from its physical plan and store it in
 * pStream->tasks (an array of per-level arrays of SStreamTask*).
 *
 * Levels are appended in this order:
 *   1. An extra sink level, added when the plan has two levels, the target
 *      database differs from the source database, the target database has
 *      more than one vgroup, or a fixed sink vgroup is configured.
 *   2. For a two-level plan: one aggregation task (placed on an snode when
 *      tsDeployOnSnode is set and an snode can be fetched, otherwise on a
 *      vgroup of the source database), followed by one source task per
 *      vgroup of the source database. Each source task dispatches to the
 *      aggregation task and is registered in its childEpInfo.
 *   3. For a one-level plan: one source task per vgroup of the source
 *      database, dispatching to the extra sink level when there is one and
 *      sinking directly otherwise.
 *
 * Returns 0 on success and -1 on failure. The parsed plan is destroyed on
 * every path. Tasks already appended to pStream->tasks when a failure occurs
 * are left in place (TODO: free them here or document the caller's duty).
 *
 * Fixes relative to the previous revision:
 *   - the target db's vgroup count is read before the db object is released;
 *   - the plan is no longer leaked when adding the extra sink level fails;
 *   - the snode reference is released after a successful assignment;
 *   - a missing source vgroup is reported instead of being dereferenced;
 *   - sdb iterators are cancelled when a fetch loop is left early;
 *   - terrno is set on every allocation failure;
 *   - dispatcher/sink setup failures of single-level tasks are propagated.
 */
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
  SSdb*       pSdb = pMnode->pSdb;
  int32_t     code = -1;
  SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
  if (pPlan == NULL) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }
  int32_t planTotLevel = LIST_LENGTH(pPlan->pSubplans);
  ASSERT(planTotLevel <= 2);
  pStream->tasks = taosArrayInit(planTotLevel, sizeof(void*));

  bool    hasExtraSink = false;
  bool    externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
  SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb);
  ASSERT(pDbObj != NULL);
  if (pDbObj == NULL) {
    // NOTE(review): terrno is presumably set by mndAcquireDb - confirm.
    goto _OVER;
  }
  // Read everything needed from the db object before giving the reference back.
  bool multiTarget = pDbObj->cfg.numOfVgroups > 1;
  sdbRelease(pSdb, pDbObj);

  if (planTotLevel == 2 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) {
    SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
    taosArrayPush(pStream->tasks, &taskOneLevel);
    // add extra sink
    hasExtraSink = true;
    if (pStream->fixedSinkVgId == 0) {
      if (mndAddShuffleSinkTasksToStream(pMnode, pStream) < 0) {
        // TODO free tasks
        goto _OVER;
      }
    } else {
      if (mndAddFixedSinkTaskToStream(pMnode, pStream) < 0) {
        // TODO free tasks
        goto _OVER;
      }
    }
  }
  pStream->totalLevel = planTotLevel + hasExtraSink;

  if (planTotLevel > 1) {
    SStreamTask* pInnerTask;
    // inner level
    {
      SArray* taskInnerLevel = taosArrayInit(0, sizeof(void*));
      taosArrayPush(pStream->tasks, &taskInnerLevel);

      SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
      SSubplan*      plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
      ASSERT(plan->subplanType == SUBPLAN_TYPE_MERGE);

      pInnerTask = tNewSStreamTask(pStream->uid);
      if (pInnerTask == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        goto _OVER;
      }
      pInnerTask->fillHistory = pStream->fillHistory;
      mndAddTaskToTaskSet(taskInnerLevel, pInnerTask);

      pInnerTask->childEpInfo = taosArrayInit(0, sizeof(void*));

      pInnerTask->taskLevel = TASK_LEVEL__AGG;

      // trigger
      pInnerTask->triggerParam = pStream->triggerParam;

      // dispatch
      if (mndAddDispatcherToInnerTask(pMnode, pStream, pInnerTask) < 0) {
        goto _OVER;
      }

      // placement: prefer an snode when configured, otherwise use a source vgroup
      SSnodeObj* pSnode = tsDeployOnSnode ? mndSchedFetchOneSnode(pMnode) : NULL;
      if (pSnode != NULL) {
        int32_t ret = mndAssignTaskToSnode(pMnode, pInnerTask, plan, pSnode);
        sdbRelease(pSdb, pSnode);  // released on success as well as on failure
        if (ret < 0) goto _OVER;
      } else {
        SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
        if (pVgroup == NULL) {
          // No vgroup belongs to the source db, so there is nowhere to run the task.
          terrno = TSDB_CODE_QRY_INVALID_INPUT;
          goto _OVER;
        }
        int32_t ret = mndAssignTaskToVg(pMnode, pInnerTask, plan, pVgroup);
        sdbRelease(pSdb, pVgroup);
        if (ret < 0) goto _OVER;
      }
    }

    // source level
    SArray* taskSourceLevel = taosArrayInit(0, sizeof(void*));
    taosArrayPush(pStream->tasks, &taskSourceLevel);

    SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 1);
    SSubplan*      plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
    ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN);

    void* pIter = NULL;
    while (1) {
      SVgObj* pVgroup;
      pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
      if (pIter == NULL) break;
      if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) {
        sdbRelease(pSdb, pVgroup);
        continue;
      }

      SStreamTask* pTask = tNewSStreamTask(pStream->uid);
      if (pTask == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        sdbRelease(pSdb, pVgroup);
        sdbCancelFetch(pSdb, pIter);
        goto _OVER;
      }
      pTask->fillHistory = pStream->fillHistory;
      mndAddTaskToTaskSet(taskSourceLevel, pTask);

      pTask->triggerParam = 0;

      // source
      pTask->taskLevel = TASK_LEVEL__SOURCE;

      // add fixed vg dispatch
      pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
      pTask->outputType = TASK_OUTPUT__FIXED_DISPATCH;

      pTask->fixedEpDispatcher.taskId = pInnerTask->taskId;
      pTask->fixedEpDispatcher.nodeId = pInnerTask->nodeId;
      pTask->fixedEpDispatcher.epSet = pInnerTask->epSet;

      if (mndAssignTaskToVg(pMnode, pTask, plan, pVgroup) < 0) {
        sdbRelease(pSdb, pVgroup);
        sdbCancelFetch(pSdb, pIter);
        goto _OVER;
      }

      // Register this source task as an upstream child of the aggregation task.
      SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo));
      if (pEpInfo == NULL) {
        ASSERT(0);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        sdbRelease(pSdb, pVgroup);
        sdbCancelFetch(pSdb, pIter);
        goto _OVER;
      }
      pEpInfo->childId = pTask->selfChildId;
      pEpInfo->epSet = pTask->epSet;
      pEpInfo->nodeId = pTask->nodeId;
      pEpInfo->taskId = pTask->taskId;
      taosArrayPush(pInnerTask->childEpInfo, &pEpInfo);
      sdbRelease(pSdb, pVgroup);
    }
  }

  if (planTotLevel == 1) {
    SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
    taosArrayPush(pStream->tasks, &taskOneLevel);

    SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
    ASSERT(LIST_LENGTH(inner->pNodeList) == 1);
    SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
    ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN);

    void* pIter = NULL;
    while (1) {
      SVgObj* pVgroup;
      pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
      if (pIter == NULL) break;
      if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) {
        sdbRelease(pSdb, pVgroup);
        continue;
      }

      SStreamTask* pTask = tNewSStreamTask(pStream->uid);
      if (pTask == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        sdbRelease(pSdb, pVgroup);
        sdbCancelFetch(pSdb, pIter);
        goto _OVER;
      }
      pTask->fillHistory = pStream->fillHistory;
      mndAddTaskToTaskSet(taskOneLevel, pTask);

      // source
      pTask->taskLevel = TASK_LEVEL__SOURCE;

      // trigger
      pTask->triggerParam = pStream->triggerParam;

      // sink or dispatch
      int32_t outRet = hasExtraSink ? mndAddDispatcherToInnerTask(pMnode, pStream, pTask)
                                    : mndAddSinkToTask(pMnode, pStream, pTask);
      if (outRet < 0) {
        sdbRelease(pSdb, pVgroup);
        sdbCancelFetch(pSdb, pIter);
        goto _OVER;
      }

      if (mndAssignTaskToVg(pMnode, pTask, plan, pVgroup) < 0) {
        sdbRelease(pSdb, pVgroup);
        sdbCancelFetch(pSdb, pIter);
        goto _OVER;
      }
      sdbRelease(pSdb, pVgroup);
    }
  }

  code = 0;

_OVER:
  qDestroyQueryPlan(pPlan);
  return code;
}
L
Liu Jicong 已提交
521 522

/*
 * Fill pSub->unassignedVgs with one SMqVgEp per vgroup of the topic's
 * database.
 *
 * For column-type topics the topic's physical plan must consist of exactly
 * one level holding exactly one sub-plan; that sub-plan is bound to each
 * vgroup in turn and its string form is stored in the entry's qmsg. For all
 * other topic types qmsg is an empty string.
 *
 * pSub->unassignedVgs must already exist and pSub->consumerHash must be
 * empty. pSub->vgNum is incremented once per matching vgroup.
 *
 * Returns 0 on success, -1 with terrno set on failure. The parsed plan is
 * destroyed on every path.
 *
 * Fixes relative to the previous revision:
 *   - a matching vgroup is released at the end of its iteration, not only
 *     on the error path;
 *   - allocation failures (entry, empty qmsg) are reported instead of being
 *     dereferenced or stored;
 *   - qmsg is initialised before the entry is pushed, so an entry left in
 *     the array after a failure never carries an indeterminate pointer;
 *   - the sdb iterator is cancelled when the loop is left early.
 */
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
  SSdb*       pSdb = pMnode->pSdb;
  SVgObj*     pVgroup = NULL;
  SQueryPlan* pPlan = NULL;
  SSubplan*   plan = NULL;

  if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
    pPlan = qStringToQueryPlan(pTopic->physicalPlan);
    if (pPlan == NULL) {
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      return -1;
    }

    int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
    if (levelNum != 1) {
      qDestroyQueryPlan(pPlan);
      terrno = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
      return -1;
    }

    SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);

    int32_t opNum = LIST_LENGTH(inner->pNodeList);
    if (opNum != 1) {
      qDestroyQueryPlan(pPlan);
      terrno = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
      return -1;
    }
    plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
  }

  ASSERT(pSub->unassignedVgs);
  ASSERT(taosHashGetSize(pSub->consumerHash) == 0);

  void* pIter = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) break;
    if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    pSub->vgNum++;

    SMqVgEp* pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
    if (pVgEp == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _FAIL;
    }
    pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
    pVgEp->vgId = pVgroup->vgId;
    pVgEp->qmsg = NULL;  // defined value in case a later step fails
    taosArrayPush(pSub->unassignedVgs, &pVgEp);

    mDebug("init subscription %s, assign vg: %d", pSub->key, pVgEp->vgId);

    if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
      int32_t msgLen;

      plan->execNode.epSet = pVgEp->epSet;
      plan->execNode.nodeId = pVgEp->vgId;

      if (qSubPlanToString(plan, &pVgEp->qmsg, &msgLen) < 0) {
        terrno = TSDB_CODE_QRY_INVALID_INPUT;
        goto _FAIL;
      }
    } else {
      pVgEp->qmsg = strdup("");
      if (pVgEp->qmsg == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        goto _FAIL;
      }
    }

    // Done with this vgroup: give back the reference taken by sdbFetch.
    sdbRelease(pSdb, pVgroup);
  }

  ASSERT(pSub->unassignedVgs->size > 0);

  ASSERT(taosHashGetSize(pSub->consumerHash) == 0);

  qDestroyQueryPlan(pPlan);

  return 0;

_FAIL:
  // Reached only from inside the loop, with pVgroup held and pIter active.
  sdbRelease(pSdb, pVgroup);
  sdbCancelFetch(pSdb, pIter);
  qDestroyQueryPlan(pPlan);
  return -1;
}