mndScheduler.c 2.5 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "mndScheduler.h"
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndOffset.h"
#include "mndShow.h"
#include "mndStb.h"
#include "mndSubscribe.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "tcompare.h"
#include "tname.h"

int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
  SSdb*      pSdb = pMnode->pSdb;
  SVgObj*    pVgroup = NULL;
  SQueryDag* pDag = qStringToDag(pTopic->physicalPlan);
L
Liu Jicong 已提交
36 37 38 39
  if (pDag == NULL) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }
L
Liu Jicong 已提交
40 41 42 43 44

  ASSERT(pSub->vgNum == 0);

  int32_t levelNum = taosArrayGetSize(pDag->pSubplans);
  if (levelNum != 1) {
L
Liu Jicong 已提交
45 46
    qDestroyQueryDag(pDag);
    terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
L
Liu Jicong 已提交
47 48 49
    return -1;
  }

L
Liu Jicong 已提交
50
  SArray* plans = taosArrayGet(pDag->pSubplans, 0);
L
Liu Jicong 已提交
51

L
Liu Jicong 已提交
52
  int32_t opNum = taosArrayGetSize(plans);
L
Liu Jicong 已提交
53
  if (opNum != 1) {
L
Liu Jicong 已提交
54 55
    qDestroyQueryDag(pDag);
    terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
L
Liu Jicong 已提交
56 57
    return -1;
  }
L
Liu Jicong 已提交
58
  SSubplan* plan = taosArrayGetP(plans, 0);
L
Liu Jicong 已提交
59 60 61 62 63 64 65 66 67 68 69 70

  void* pIter = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) break;
    if (pVgroup->dbUid != pTopic->dbUid) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    pSub->vgNum++;
    plan->execNode.nodeId = pVgroup->vgId;
L
Liu Jicong 已提交
71
    plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup);
L
Liu Jicong 已提交
72 73 74 75

    SMqConsumerEp consumerEp = {0};
    consumerEp.status = 0;
    consumerEp.consumerId = -1;
L
Liu Jicong 已提交
76
    consumerEp.epSet = plan->execNode.epSet;
L
Liu Jicong 已提交
77 78
    consumerEp.vgId = plan->execNode.nodeId;
    int32_t msgLen;
L
Liu Jicong 已提交
79 80 81 82 83 84 85
    if (qSubPlanToString(plan, &consumerEp.qmsg, &msgLen) < 0) {
      sdbRelease(pSdb, pVgroup);
      qDestroyQueryDag(pDag);
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      return -1;
    }
    taosArrayPush(pSub->unassignedVg, &consumerEp);
L
Liu Jicong 已提交
86 87
  }

L
Liu Jicong 已提交
88 89
  qDestroyQueryDag(pDag);

L
Liu Jicong 已提交
90 91
  return 0;
}