mndScheduler.c 16.4 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "mndScheduler.h"
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndOffset.h"
#include "mndShow.h"
L
Liu Jicong 已提交
23
#include "mndSnode.h"
L
Liu Jicong 已提交
24
#include "mndStb.h"
L
Liu Jicong 已提交
25
#include "mndStream.h"
L
Liu Jicong 已提交
26 27 28 29 30 31 32
#include "mndSubscribe.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "tcompare.h"
#include "tname.h"
L
Liu Jicong 已提交
33 34
#include "tuuid.h"

L
Liu Jicong 已提交
35 36
extern bool tsStreamSchedV;

L
Liu Jicong 已提交
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
int32_t mndConvertRSmaTask(const char* ast, int8_t triggerType, int64_t watermark, char** pStr, int32_t* pLen) {
  SNode*      pAst = NULL;
  SQueryPlan* pPlan = NULL;
  terrno = TSDB_CODE_SUCCESS;

  if (nodesStringToNode(ast, &pAst) < 0) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }

  SPlanContext cxt = {
      .pAstRoot = pAst,
      .topicQuery = false,
      .streamQuery = true,
      .rSmaQuery = true,
      .triggerType = triggerType,
      .watermark = watermark,
  };
  if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }

  int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
  if (levelNum != 1) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }
  SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 0);

  int32_t opNum = LIST_LENGTH(inner->pNodeList);
  if (opNum != 1) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }

  SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);
  if (qSubPlanToString(plan, pStr, pLen) < 0) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    goto END;
  }

END:
  if (pAst) nodesDestroyNode(pAst);
  if (pPlan) nodesDestroyNode(pPlan);
  return terrno;
}

L
Liu Jicong 已提交
85
int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type, int32_t nodeId) {
H
Hongze Cheng 已提交
86 87
  SEncoder encoder;
  tEncoderInit(&encoder, NULL, 0);
L
Liu Jicong 已提交
88
  tEncodeSStreamTask(&encoder, pTask);
L
Liu Jicong 已提交
89 90
  int32_t size = encoder.pos;
  int32_t tlen = sizeof(SMsgHead) + size;
H
Hongze Cheng 已提交
91
  tEncoderClear(&encoder);
wafwerar's avatar
wafwerar 已提交
92
  void* buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
93 94 95 96
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
L
Liu Jicong 已提交
97
  ((SMsgHead*)buf)->vgId = htonl(nodeId);
L
Liu Jicong 已提交
98
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
H
Hongze Cheng 已提交
99
  tEncoderInit(&encoder, abuf, size);
L
Liu Jicong 已提交
100
  tEncodeSStreamTask(&encoder, pTask);
H
Hongze Cheng 已提交
101
  tEncoderClear(&encoder);
L
Liu Jicong 已提交
102 103 104 105 106

  STransAction action = {0};
  memcpy(&action.epSet, pEpSet, sizeof(SEpSet));
  action.pCont = buf;
  action.contLen = tlen;
L
Liu Jicong 已提交
107
  action.msgType = type;
L
Liu Jicong 已提交
108
  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
wafwerar's avatar
wafwerar 已提交
109
    taosMemoryFree(buf);
L
Liu Jicong 已提交
110 111 112 113 114 115 116
    return -1;
  }
  return 0;
}

int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
  int32_t msgLen;
L
Liu Jicong 已提交
117 118 119
  pTask->nodeId = pVgroup->vgId;
  pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);

L
Liu Jicong 已提交
120
  plan->execNode.nodeId = pVgroup->vgId;
L
Liu Jicong 已提交
121
  plan->execNode.epSet = pTask->epSet;
L
Liu Jicong 已提交
122

L
Liu Jicong 已提交
123
  if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
L
Liu Jicong 已提交
124 125 126
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }
L
Liu Jicong 已提交
127
  mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId);
L
Liu Jicong 已提交
128 129 130
  return 0;
}

L
Liu Jicong 已提交
131 132 133 134 135 136
SSnodeObj* mndSchedFetchSnode(SMnode* pMnode) {
  SSnodeObj* pObj = NULL;
  pObj = sdbFetch(pMnode->pSdb, SDB_SNODE, NULL, (void**)&pObj);
  return pObj;
}

L
Liu Jicong 已提交
137 138
int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan,
                             const SSnodeObj* pSnode) {
L
Liu Jicong 已提交
139
  int32_t msgLen;
L
Liu Jicong 已提交
140 141 142 143 144 145

  pTask->nodeId = 0;
  pTask->epSet = mndAcquireEpFromSnode(pMnode, pSnode);

  plan->execNode.nodeId = 0;
  plan->execNode.epSet = pTask->epSet;
L
Liu Jicong 已提交
146

L
Liu Jicong 已提交
147
  if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
L
Liu Jicong 已提交
148 149 150
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }
L
Liu Jicong 已提交
151
  mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_SND_TASK_DEPLOY, 0);
L
Liu Jicong 已提交
152 153 154
  return 0;
}

L
Liu Jicong 已提交
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
  void*   pIter = NULL;
  SVgObj* pVgroup = NULL;
  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) break;
    if (pVgroup->dbUid != dbUid) {
      sdbRelease(pMnode->pSdb, pVgroup);
      continue;
    }
    return pVgroup;
  }
  return pVgroup;
}

L
Liu Jicong 已提交
170
int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
L
Liu Jicong 已提交
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
  SSdb*   pSdb = pMnode->pSdb;
  void*   pIter = NULL;
  SArray* tasks = taosArrayGetP(pStream->tasks, 0);

  ASSERT(taosArrayGetSize(pStream->tasks) == 1);

  while (1) {
    SVgObj* pVgroup;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) break;
    if (pVgroup->dbUid != pStream->dbUid) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }
    SStreamTask* pTask = tNewSStreamTask(pStream->uid);
    if (pTask == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
    taosArrayPush(tasks, &pTask);

    pTask->nodeId = pVgroup->vgId;
    pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);

    // source
    pTask->sourceType = TASK_SOURCE__MERGE;

    // exec
    pTask->execType = TASK_EXEC__NONE;

    // sink
L
Liu Jicong 已提交
202
    if (pStream->createdBy == STREAM_CREATED_BY__SMA) {
L
Liu Jicong 已提交
203
      pTask->sinkType = TASK_SINK__SMA;
L
Liu Jicong 已提交
204
      pTask->smaSink.smaId = pStream->smaId;
L
Liu Jicong 已提交
205 206
    } else {
      pTask->sinkType = TASK_SINK__TABLE;
L
Liu Jicong 已提交
207
      pTask->tbSink.stbUid = pStream->targetStbUid;
L
Liu Jicong 已提交
208 209
      pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
      ASSERT(pTask->tbSink.pSchemaWrapper);
L
Liu Jicong 已提交
210 211 212 213 214 215 216 217 218 219
    }

    // dispatch
    pTask->dispatchType = TASK_DISPATCH__NONE;

    mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId);
  }
  return 0;
}

L
Liu Jicong 已提交
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247
int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
  ASSERT(pStream->fixedSinkVgId != 0);
  SArray*      tasks = taosArrayGetP(pStream->tasks, 0);
  SStreamTask* pTask = tNewSStreamTask(pStream->uid);
  if (pTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  taosArrayPush(tasks, &pTask);

  pTask->nodeId = pStream->fixedSinkVgId;
  SVgObj* pVgroup = mndAcquireVgroup(pMnode, pStream->fixedSinkVgId);
  if (pVgroup == NULL) {
    return -1;
  }
  pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
  // source
  pTask->sourceType = TASK_SOURCE__MERGE;

  // exec
  pTask->execType = TASK_EXEC__NONE;

  // sink
  if (pStream->createdBy == STREAM_CREATED_BY__SMA) {
    pTask->sinkType = TASK_SINK__SMA;
    pTask->smaSink.smaId = pStream->smaId;
  } else {
    pTask->sinkType = TASK_SINK__TABLE;
L
Liu Jicong 已提交
248
    pTask->tbSink.stbUid = pStream->targetStbUid;
L
Liu Jicong 已提交
249
    pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
L
Liu Jicong 已提交
250
  }
L
Liu Jicong 已提交
251

L
Liu Jicong 已提交
252 253 254 255 256 257 258 259 260
  // dispatch
  pTask->dispatchType = TASK_DISPATCH__NONE;

  mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId);

  return 0;
}

int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
L
Liu Jicong 已提交
261 262 263 264 265 266 267 268
  SSdb*       pSdb = pMnode->pSdb;
  SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
  if (pPlan == NULL) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }
  ASSERT(pStream->vgNum == 0);

L
Liu Jicong 已提交
269
  int32_t totLevel = LIST_LENGTH(pPlan->pSubplans);
L
Liu Jicong 已提交
270 271
  ASSERT(totLevel <= 2);
  pStream->tasks = taosArrayInit(totLevel, sizeof(void*));
L
Liu Jicong 已提交
272

L
Liu Jicong 已提交
273 274 275 276 277 278
  bool hasExtraSink = false;
  if (totLevel == 2) {
    SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
    taosArrayPush(pStream->tasks, &taskOneLevel);
    // add extra sink
    hasExtraSink = true;
L
Liu Jicong 已提交
279 280 281 282 283
    if (pStream->fixedSinkVgId == 0) {
      mndAddShuffledSinkToStream(pMnode, pTrans, pStream);
    } else {
      mndAddFixedSinkToStream(pMnode, pTrans, pStream);
    }
L
Liu Jicong 已提交
284 285
  }

L
Liu Jicong 已提交
286 287 288 289
  for (int32_t level = 0; level < totLevel; level++) {
    SArray*        taskOneLevel = taosArrayInit(0, sizeof(void*));
    SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, level);
    ASSERT(LIST_LENGTH(inner->pNodeList) == 1);
L
Liu Jicong 已提交
290

L
Liu Jicong 已提交
291
    SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);
L
Liu Jicong 已提交
292 293 294 295 296

    // if (level == totLevel - 1 /* or no snode */) {
    if (level == totLevel - 1) {
      // last level, source, must assign to vnode
      // must be scan type
L
Liu Jicong 已提交
297
      ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN);
L
Liu Jicong 已提交
298 299

      // replicate task to each vnode
L
Liu Jicong 已提交
300 301
      void* pIter = NULL;
      while (1) {
L
Liu Jicong 已提交
302
        SVgObj* pVgroup;
L
Liu Jicong 已提交
303 304 305 306 307 308
        pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
        if (pIter == NULL) break;
        if (pVgroup->dbUid != pStream->dbUid) {
          sdbRelease(pSdb, pVgroup);
          continue;
        }
L
Liu Jicong 已提交
309 310 311
        SStreamTask* pTask = tNewSStreamTask(pStream->uid);
        // source part
        pTask->sourceType = TASK_SOURCE__SCAN;
L
Liu Jicong 已提交
312

L
Liu Jicong 已提交
313 314 315
        // sink part
        if (level == 0) {
          // only for inplace
L
Liu Jicong 已提交
316
          pTask->sinkType = TASK_SINK__NONE;
L
Liu Jicong 已提交
317
          if (!hasExtraSink) {
L
Liu Jicong 已提交
318 319
#if 1
            if (pStream->createdBy == STREAM_CREATED_BY__SMA) {
L
Liu Jicong 已提交
320
              pTask->sinkType = TASK_SINK__SMA;
L
Liu Jicong 已提交
321
              pTask->smaSink.smaId = pStream->smaId;
L
Liu Jicong 已提交
322 323
            } else {
              pTask->sinkType = TASK_SINK__TABLE;
L
Liu Jicong 已提交
324
              pTask->tbSink.stbUid = pStream->targetStbUid;
L
Liu Jicong 已提交
325
              pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
L
Liu Jicong 已提交
326
            }
L
Liu Jicong 已提交
327
#endif
L
Liu Jicong 已提交
328
          }
L
Liu Jicong 已提交
329 330 331
        } else {
          pTask->sinkType = TASK_SINK__NONE;
        }
L
Liu Jicong 已提交
332

L
Liu Jicong 已提交
333 334 335 336 337 338 339
        // dispatch part
        if (level == 0) {
          pTask->dispatchType = TASK_DISPATCH__NONE;
        } else {
          // add fixed ep dispatcher
          int32_t lastLevel = level - 1;
          ASSERT(lastLevel == 0);
L
Liu Jicong 已提交
340
          if (hasExtraSink) lastLevel++;
L
Liu Jicong 已提交
341 342 343
          SArray* pArray = taosArrayGetP(pStream->tasks, lastLevel);
          // one merge only
          ASSERT(taosArrayGetSize(pArray) == 1);
L
Liu Jicong 已提交
344
          SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
L
Liu Jicong 已提交
345 346 347
          pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC;
          pTask->dispatchType = TASK_DISPATCH__FIXED;

L
Liu Jicong 已提交
348
          pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
L
Liu Jicong 已提交
349 350 351 352 353 354
          pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
          pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
        }

        // exec part
        pTask->execType = TASK_EXEC__PIPE;
L
Liu Jicong 已提交
355
        pTask->exec.parallelizable = 1;
L
Liu Jicong 已提交
356
        if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
L
Liu Jicong 已提交
357 358 359 360
          sdbRelease(pSdb, pVgroup);
          qDestroyQueryPlan(pPlan);
          return -1;
        }
L
Liu Jicong 已提交
361 362
        sdbRelease(pSdb, pVgroup);
        taosArrayPush(taskOneLevel, &pTask);
L
Liu Jicong 已提交
363
      }
L
Liu Jicong 已提交
364
    } else {
L
Liu Jicong 已提交
365 366 367 368 369 370
      // merge plan

      // TODO if has snode, assign to snode

      // else, assign to vnode
      ASSERT(plan->subplanType == SUBPLAN_TYPE_MERGE);
L
Liu Jicong 已提交
371
      SStreamTask* pTask = tNewSStreamTask(pStream->uid);
L
Liu Jicong 已提交
372 373 374 375 376

      // source part, currently only support multi source
      pTask->sourceType = TASK_SOURCE__PIPE;

      // sink part
L
Liu Jicong 已提交
377
      pTask->sinkType = TASK_SINK__NONE;
L
Liu Jicong 已提交
378 379

      // dispatch part
L
Liu Jicong 已提交
380 381 382 383 384 385
      ASSERT(hasExtraSink);
      /*pTask->dispatchType = TASK_DISPATCH__NONE;*/
#if 1

      if (hasExtraSink) {
        // add dispatcher
L
Liu Jicong 已提交
386 387 388 389
        if (pStream->fixedSinkVgId == 0) {
          pTask->dispatchType = TASK_DISPATCH__SHUFFLE;

          pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;
L
Liu Jicong 已提交
390
          SDbObj* pDb = mndAcquireDb(pMnode, pStream->sourceDb);
L
Liu Jicong 已提交
391 392 393 394 395 396
          ASSERT(pDb);
          if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
            sdbRelease(pSdb, pDb);
            qDestroyQueryPlan(pPlan);
            return -1;
          }
L
Liu Jicong 已提交
397
          sdbRelease(pSdb, pDb);
L
Liu Jicong 已提交
398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414

          // put taskId to useDbRsp
          // TODO: optimize
          SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
          int32_t sz = taosArrayGetSize(pVgs);
          SArray* sinkLv = taosArrayGetP(pStream->tasks, 0);
          int32_t sinkLvSize = taosArrayGetSize(sinkLv);
          for (int32_t i = 0; i < sz; i++) {
            SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
            for (int32_t j = 0; j < sinkLvSize; j++) {
              SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j);
              /*printf("vgid %d node id %d\n", pVgInfo->vgId, pTask->nodeId);*/
              if (pLastLevelTask->nodeId == pVgInfo->vgId) {
                pVgInfo->taskId = pLastLevelTask->taskId;
                /*printf("taskid %d set to %d\n", pVgInfo->taskId, pTask->taskId);*/
                break;
              }
L
Liu Jicong 已提交
415 416
            }
          }
L
Liu Jicong 已提交
417 418 419 420 421 422 423 424 425 426
        } else {
          pTask->dispatchType = TASK_DISPATCH__FIXED;
          pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;
          SArray* pArray = taosArrayGetP(pStream->tasks, 0);
          // one sink only
          ASSERT(taosArrayGetSize(pArray) == 1);
          SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
          pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
          pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
          pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
L
Liu Jicong 已提交
427
        }
L
Liu Jicong 已提交
428 429
      }
#endif
L
Liu Jicong 已提交
430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456

      // exec part
      pTask->execType = TASK_EXEC__MERGE;
      pTask->exec.parallelizable = 0;
      SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->dbUid);
      ASSERT(pVgroup);
      if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
        sdbRelease(pSdb, pVgroup);
        qDestroyQueryPlan(pPlan);
        return -1;
      }
      sdbRelease(pSdb, pVgroup);
      taosArrayPush(taskOneLevel, &pTask);
    }

    taosArrayPush(pStream->tasks, &taskOneLevel);
  }

  if (totLevel == 2) {
    void* pIter = NULL;
    while (1) {
      SVgObj* pVgroup;
      pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
      if (pIter == NULL) break;
      if (pVgroup->dbUid != pStream->dbUid) {
        sdbRelease(pSdb, pVgroup);
        continue;
L
Liu Jicong 已提交
457
      }
L
Liu Jicong 已提交
458
      SStreamTask* pTask = tNewSStreamTask(pStream->uid);
L
Liu Jicong 已提交
459

L
Liu Jicong 已提交
460 461 462 463
      // source part
      pTask->sourceType = TASK_SOURCE__MERGE;

      // sink part
L
Liu Jicong 已提交
464
      pTask->sinkType = TASK_SINK__NONE;
L
Liu Jicong 已提交
465 466 467 468 469 470 471

      // dispatch part
      pTask->dispatchType = TASK_DISPATCH__NONE;

      // exec part
      pTask->execType = TASK_EXEC__NONE;
      pTask->exec.parallelizable = 0;
L
Liu Jicong 已提交
472 473
    }
  }
L
Liu Jicong 已提交
474 475

  // free memory
L
Liu Jicong 已提交
476
  qDestroyQueryPlan(pPlan);
L
Liu Jicong 已提交
477

L
Liu Jicong 已提交
478 479
  return 0;
}
L
Liu Jicong 已提交
480 481

int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
L
Liu Jicong 已提交
482 483
  SSdb*       pSdb = pMnode->pSdb;
  SVgObj*     pVgroup = NULL;
L
Liu Jicong 已提交
484 485
  SQueryPlan* pPlan = NULL;
  SSubplan*   plan = NULL;
L
Liu Jicong 已提交
486

L
Liu Jicong 已提交
487 488 489 490 491 492
  if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) {
    pPlan = qStringToQueryPlan(pTopic->physicalPlan);
    if (pPlan == NULL) {
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      return -1;
    }
L
Liu Jicong 已提交
493

L
Liu Jicong 已提交
494 495 496
    int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
    if (levelNum != 1) {
      qDestroyQueryPlan(pPlan);
L
Liu Jicong 已提交
497
      terrno = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
L
Liu Jicong 已提交
498 499
      return -1;
    }
L
Liu Jicong 已提交
500

L
Liu Jicong 已提交
501
    SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 0);
L
Liu Jicong 已提交
502

L
Liu Jicong 已提交
503 504 505
    int32_t opNum = LIST_LENGTH(inner->pNodeList);
    if (opNum != 1) {
      qDestroyQueryPlan(pPlan);
L
Liu Jicong 已提交
506
      terrno = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
L
Liu Jicong 已提交
507 508 509
      return -1;
    }
    plan = nodesListGetNode(inner->pNodeList, 0);
L
Liu Jicong 已提交
510
  }
L
Liu Jicong 已提交
511

512 513
  ASSERT(pSub->unassignedVgs);
  ASSERT(taosHashGetSize(pSub->consumerHash) == 0);
514

L
Liu Jicong 已提交
515 516 517 518 519 520 521 522 523 524 525
  void* pIter = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) break;
    if (pVgroup->dbUid != pTopic->dbUid) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    pSub->vgNum++;

526
    SMqVgEp* pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
L
Liu Jicong 已提交
527 528
    pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
    pVgEp->vgId = pVgroup->vgId;
529
    taosArrayPush(pSub->unassignedVgs, &pVgEp);
530

L
Liu Jicong 已提交
531
    mDebug("init subscription %s, assign vg: %d", pSub->key, pVgEp->vgId);
L
Liu Jicong 已提交
532

L
Liu Jicong 已提交
533 534 535 536 537 538 539 540 541 542 543 544 545 546
    if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) {
      int32_t msgLen;

      plan->execNode.epSet = pVgEp->epSet;
      plan->execNode.nodeId = pVgEp->vgId;

      if (qSubPlanToString(plan, &pVgEp->qmsg, &msgLen) < 0) {
        sdbRelease(pSdb, pVgroup);
        qDestroyQueryPlan(pPlan);
        terrno = TSDB_CODE_QRY_INVALID_INPUT;
        return -1;
      }
    } else {
      pVgEp->qmsg = strdup("");
L
Liu Jicong 已提交
547
    }
L
Liu Jicong 已提交
548 549
  }

550
  ASSERT(pSub->unassignedVgs->size > 0);
L
Liu Jicong 已提交
551

552
  ASSERT(taosHashGetSize(pSub->consumerHash) == 0);
L
Liu Jicong 已提交
553

X
Xiaoyu Wang 已提交
554
  qDestroyQueryPlan(pPlan);
L
Liu Jicong 已提交
555

L
Liu Jicong 已提交
556 557
  return 0;
}