mndScheduler.c 15.2 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "mndScheduler.h"
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndOffset.h"
#include "mndShow.h"
L
Liu Jicong 已提交
23
#include "mndSnode.h"
L
Liu Jicong 已提交
24
#include "mndStb.h"
L
Liu Jicong 已提交
25
#include "mndStream.h"
L
Liu Jicong 已提交
26 27 28 29 30 31 32
#include "mndSubscribe.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "tcompare.h"
#include "tname.h"
L
Liu Jicong 已提交
33 34
#include "tuuid.h"

L
Liu Jicong 已提交
35 36
extern bool tsStreamSchedV;

L
Liu Jicong 已提交
37
int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type, int32_t nodeId) {
L
Liu Jicong 已提交
38 39 40
  SCoder encoder;
  tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
  tEncodeSStreamTask(&encoder, pTask);
L
Liu Jicong 已提交
41 42
  int32_t size = encoder.pos;
  int32_t tlen = sizeof(SMsgHead) + size;
L
Liu Jicong 已提交
43
  tCoderClear(&encoder);
wafwerar's avatar
wafwerar 已提交
44
  void* buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
45 46 47 48
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
L
Liu Jicong 已提交
49
  ((SMsgHead*)buf)->vgId = htonl(nodeId);
L
Liu Jicong 已提交
50
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
L
Liu Jicong 已提交
51
  tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, size, TD_ENCODER);
L
Liu Jicong 已提交
52 53 54 55 56 57 58
  tEncodeSStreamTask(&encoder, pTask);
  tCoderClear(&encoder);

  STransAction action = {0};
  memcpy(&action.epSet, pEpSet, sizeof(SEpSet));
  action.pCont = buf;
  action.contLen = tlen;
L
Liu Jicong 已提交
59
  action.msgType = type;
L
Liu Jicong 已提交
60
  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
wafwerar's avatar
wafwerar 已提交
61
    taosMemoryFree(buf);
L
Liu Jicong 已提交
62 63 64 65 66 67 68
    return -1;
  }
  return 0;
}

int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
  int32_t msgLen;
L
Liu Jicong 已提交
69 70 71
  pTask->nodeId = pVgroup->vgId;
  pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);

L
Liu Jicong 已提交
72
  plan->execNode.nodeId = pVgroup->vgId;
L
Liu Jicong 已提交
73
  plan->execNode.epSet = pTask->epSet;
L
Liu Jicong 已提交
74

L
Liu Jicong 已提交
75
  if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
L
Liu Jicong 已提交
76 77 78
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }
L
Liu Jicong 已提交
79
  mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId);
L
Liu Jicong 已提交
80 81 82
  return 0;
}

L
Liu Jicong 已提交
83 84 85 86 87 88
SSnodeObj* mndSchedFetchSnode(SMnode* pMnode) {
  SSnodeObj* pObj = NULL;
  pObj = sdbFetch(pMnode->pSdb, SDB_SNODE, NULL, (void**)&pObj);
  return pObj;
}

L
Liu Jicong 已提交
89 90
int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan,
                             const SSnodeObj* pSnode) {
L
Liu Jicong 已提交
91
  int32_t msgLen;
L
Liu Jicong 已提交
92 93 94 95 96 97

  pTask->nodeId = 0;
  pTask->epSet = mndAcquireEpFromSnode(pMnode, pSnode);

  plan->execNode.nodeId = 0;
  plan->execNode.epSet = pTask->epSet;
L
Liu Jicong 已提交
98

L
Liu Jicong 已提交
99
  if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
L
Liu Jicong 已提交
100 101 102
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }
L
Liu Jicong 已提交
103
  mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_SND_TASK_DEPLOY, 0);
L
Liu Jicong 已提交
104 105 106
  return 0;
}

L
Liu Jicong 已提交
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
  void*   pIter = NULL;
  SVgObj* pVgroup = NULL;
  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) break;
    if (pVgroup->dbUid != dbUid) {
      sdbRelease(pMnode->pSdb, pVgroup);
      continue;
    }
    return pVgroup;
  }
  return pVgroup;
}

L
Liu Jicong 已提交
122
int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
L
Liu Jicong 已提交
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
  SSdb*   pSdb = pMnode->pSdb;
  void*   pIter = NULL;
  SArray* tasks = taosArrayGetP(pStream->tasks, 0);

  ASSERT(taosArrayGetSize(pStream->tasks) == 1);

  while (1) {
    SVgObj* pVgroup;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) break;
    if (pVgroup->dbUid != pStream->dbUid) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }
    SStreamTask* pTask = tNewSStreamTask(pStream->uid);
    if (pTask == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
    taosArrayPush(tasks, &pTask);

    pTask->nodeId = pVgroup->vgId;
    pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);

    // source
    pTask->sourceType = TASK_SOURCE__MERGE;

    // exec
    pTask->execType = TASK_EXEC__NONE;

    // sink
L
Liu Jicong 已提交
154
    if (pStream->createdBy == STREAM_CREATED_BY__SMA) {
L
Liu Jicong 已提交
155
      pTask->sinkType = TASK_SINK__SMA;
L
Liu Jicong 已提交
156
      pTask->smaSink.smaId = pStream->smaId;
L
Liu Jicong 已提交
157 158 159 160 161 162 163 164 165 166 167 168
    } else {
      pTask->sinkType = TASK_SINK__TABLE;
    }

    // dispatch
    pTask->dispatchType = TASK_DISPATCH__NONE;

    mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId);
  }
  return 0;
}

L
Liu Jicong 已提交
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
  ASSERT(pStream->fixedSinkVgId != 0);
  SArray*      tasks = taosArrayGetP(pStream->tasks, 0);
  SStreamTask* pTask = tNewSStreamTask(pStream->uid);
  if (pTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  taosArrayPush(tasks, &pTask);

  pTask->nodeId = pStream->fixedSinkVgId;
  SVgObj* pVgroup = mndAcquireVgroup(pMnode, pStream->fixedSinkVgId);
  if (pVgroup == NULL) {
    return -1;
  }
  pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
  // source
  pTask->sourceType = TASK_SOURCE__MERGE;

  // exec
  pTask->execType = TASK_EXEC__NONE;

  // sink
  if (pStream->createdBy == STREAM_CREATED_BY__SMA) {
    pTask->sinkType = TASK_SINK__SMA;
    pTask->smaSink.smaId = pStream->smaId;
  } else {
    pTask->sinkType = TASK_SINK__TABLE;
  }
  //
  // dispatch
  pTask->dispatchType = TASK_DISPATCH__NONE;

  mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId);

  return 0;
}

int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
L
Liu Jicong 已提交
208 209 210 211 212 213 214 215
  SSdb*       pSdb = pMnode->pSdb;
  SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
  if (pPlan == NULL) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }
  ASSERT(pStream->vgNum == 0);

L
Liu Jicong 已提交
216
  int32_t totLevel = LIST_LENGTH(pPlan->pSubplans);
L
Liu Jicong 已提交
217 218
  ASSERT(totLevel <= 2);
  pStream->tasks = taosArrayInit(totLevel, sizeof(void*));
L
Liu Jicong 已提交
219

L
Liu Jicong 已提交
220 221 222 223 224 225
  bool hasExtraSink = false;
  if (totLevel == 2) {
    SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
    taosArrayPush(pStream->tasks, &taskOneLevel);
    // add extra sink
    hasExtraSink = true;
L
Liu Jicong 已提交
226 227 228 229 230
    if (pStream->fixedSinkVgId == 0) {
      mndAddShuffledSinkToStream(pMnode, pTrans, pStream);
    } else {
      mndAddFixedSinkToStream(pMnode, pTrans, pStream);
    }
L
Liu Jicong 已提交
231 232
  }

L
Liu Jicong 已提交
233 234 235 236
  for (int32_t level = 0; level < totLevel; level++) {
    SArray*        taskOneLevel = taosArrayInit(0, sizeof(void*));
    SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, level);
    ASSERT(LIST_LENGTH(inner->pNodeList) == 1);
L
Liu Jicong 已提交
237

L
Liu Jicong 已提交
238
    SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);
L
Liu Jicong 已提交
239 240 241 242 243

    // if (level == totLevel - 1 /* or no snode */) {
    if (level == totLevel - 1) {
      // last level, source, must assign to vnode
      // must be scan type
L
Liu Jicong 已提交
244
      ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN);
L
Liu Jicong 已提交
245 246

      // replicate task to each vnode
L
Liu Jicong 已提交
247 248
      void* pIter = NULL;
      while (1) {
L
Liu Jicong 已提交
249
        SVgObj* pVgroup;
L
Liu Jicong 已提交
250 251 252 253 254 255
        pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
        if (pIter == NULL) break;
        if (pVgroup->dbUid != pStream->dbUid) {
          sdbRelease(pSdb, pVgroup);
          continue;
        }
L
Liu Jicong 已提交
256 257 258
        SStreamTask* pTask = tNewSStreamTask(pStream->uid);
        // source part
        pTask->sourceType = TASK_SOURCE__SCAN;
L
Liu Jicong 已提交
259

L
Liu Jicong 已提交
260 261 262 263 264
        // sink part
        if (level == 0) {
          // only for inplace
          pTask->sinkType = TASK_SINK__SHOW;
          pTask->showSink.reserved = 0;
L
Liu Jicong 已提交
265
          if (!hasExtraSink) {
L
Liu Jicong 已提交
266 267
#if 1
            if (pStream->createdBy == STREAM_CREATED_BY__SMA) {
L
Liu Jicong 已提交
268
              pTask->sinkType = TASK_SINK__SMA;
L
Liu Jicong 已提交
269
              pTask->smaSink.smaId = pStream->smaId;
L
Liu Jicong 已提交
270 271 272
            } else {
              pTask->sinkType = TASK_SINK__TABLE;
            }
L
Liu Jicong 已提交
273
#endif
L
Liu Jicong 已提交
274
          }
L
Liu Jicong 已提交
275 276 277
        } else {
          pTask->sinkType = TASK_SINK__NONE;
        }
L
Liu Jicong 已提交
278

L
Liu Jicong 已提交
279 280 281 282 283 284 285
        // dispatch part
        if (level == 0) {
          pTask->dispatchType = TASK_DISPATCH__NONE;
        } else {
          // add fixed ep dispatcher
          int32_t lastLevel = level - 1;
          ASSERT(lastLevel == 0);
L
Liu Jicong 已提交
286
          if (hasExtraSink) lastLevel++;
L
Liu Jicong 已提交
287 288 289
          SArray* pArray = taosArrayGetP(pStream->tasks, lastLevel);
          // one merge only
          ASSERT(taosArrayGetSize(pArray) == 1);
L
Liu Jicong 已提交
290
          SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
L
Liu Jicong 已提交
291 292 293
          pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC;
          pTask->dispatchType = TASK_DISPATCH__FIXED;

L
Liu Jicong 已提交
294
          pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
L
Liu Jicong 已提交
295 296 297 298 299 300
          pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
          pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
        }

        // exec part
        pTask->execType = TASK_EXEC__PIPE;
L
Liu Jicong 已提交
301
        pTask->exec.parallelizable = 1;
L
Liu Jicong 已提交
302
        if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
L
Liu Jicong 已提交
303 304 305 306
          sdbRelease(pSdb, pVgroup);
          qDestroyQueryPlan(pPlan);
          return -1;
        }
L
Liu Jicong 已提交
307 308
        sdbRelease(pSdb, pVgroup);
        taosArrayPush(taskOneLevel, &pTask);
L
Liu Jicong 已提交
309
      }
L
Liu Jicong 已提交
310
    } else {
L
Liu Jicong 已提交
311 312 313 314 315 316
      // merge plan

      // TODO if has snode, assign to snode

      // else, assign to vnode
      ASSERT(plan->subplanType == SUBPLAN_TYPE_MERGE);
L
Liu Jicong 已提交
317
      SStreamTask* pTask = tNewSStreamTask(pStream->uid);
L
Liu Jicong 已提交
318 319 320 321 322 323 324 325 326

      // source part, currently only support multi source
      pTask->sourceType = TASK_SOURCE__PIPE;

      // sink part
      pTask->sinkType = TASK_SINK__SHOW;
      /*pTask->sinkType = TASK_SINK__NONE;*/

      // dispatch part
L
Liu Jicong 已提交
327 328 329 330 331 332
      ASSERT(hasExtraSink);
      /*pTask->dispatchType = TASK_DISPATCH__NONE;*/
#if 1

      if (hasExtraSink) {
        // add dispatcher
L
Liu Jicong 已提交
333 334 335 336 337 338 339 340 341 342 343
        if (pStream->fixedSinkVgId == 0) {
          pTask->dispatchType = TASK_DISPATCH__SHUFFLE;

          pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;
          SDbObj* pDb = mndAcquireDb(pMnode, pStream->db);
          ASSERT(pDb);
          if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
            sdbRelease(pSdb, pDb);
            qDestroyQueryPlan(pPlan);
            return -1;
          }
L
Liu Jicong 已提交
344
          sdbRelease(pSdb, pDb);
L
Liu Jicong 已提交
345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361

          // put taskId to useDbRsp
          // TODO: optimize
          SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
          int32_t sz = taosArrayGetSize(pVgs);
          SArray* sinkLv = taosArrayGetP(pStream->tasks, 0);
          int32_t sinkLvSize = taosArrayGetSize(sinkLv);
          for (int32_t i = 0; i < sz; i++) {
            SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
            for (int32_t j = 0; j < sinkLvSize; j++) {
              SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j);
              /*printf("vgid %d node id %d\n", pVgInfo->vgId, pTask->nodeId);*/
              if (pLastLevelTask->nodeId == pVgInfo->vgId) {
                pVgInfo->taskId = pLastLevelTask->taskId;
                /*printf("taskid %d set to %d\n", pVgInfo->taskId, pTask->taskId);*/
                break;
              }
L
Liu Jicong 已提交
362 363
            }
          }
L
Liu Jicong 已提交
364 365 366 367 368 369 370 371 372 373
        } else {
          pTask->dispatchType = TASK_DISPATCH__FIXED;
          pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;
          SArray* pArray = taosArrayGetP(pStream->tasks, 0);
          // one sink only
          ASSERT(taosArrayGetSize(pArray) == 1);
          SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
          pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
          pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
          pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
L
Liu Jicong 已提交
374
        }
L
Liu Jicong 已提交
375 376
      }
#endif
L
Liu Jicong 已提交
377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403

      // exec part
      pTask->execType = TASK_EXEC__MERGE;
      pTask->exec.parallelizable = 0;
      SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->dbUid);
      ASSERT(pVgroup);
      if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
        sdbRelease(pSdb, pVgroup);
        qDestroyQueryPlan(pPlan);
        return -1;
      }
      sdbRelease(pSdb, pVgroup);
      taosArrayPush(taskOneLevel, &pTask);
    }

    taosArrayPush(pStream->tasks, &taskOneLevel);
  }

  if (totLevel == 2) {
    void* pIter = NULL;
    while (1) {
      SVgObj* pVgroup;
      pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
      if (pIter == NULL) break;
      if (pVgroup->dbUid != pStream->dbUid) {
        sdbRelease(pSdb, pVgroup);
        continue;
L
Liu Jicong 已提交
404
      }
L
Liu Jicong 已提交
405
      SStreamTask* pTask = tNewSStreamTask(pStream->uid);
L
Liu Jicong 已提交
406

L
Liu Jicong 已提交
407 408 409 410 411 412 413 414 415 416 417 418
      // source part
      pTask->sourceType = TASK_SOURCE__MERGE;

      // sink part
      pTask->sinkType = TASK_SINK__SHOW;

      // dispatch part
      pTask->dispatchType = TASK_DISPATCH__NONE;

      // exec part
      pTask->execType = TASK_EXEC__NONE;
      pTask->exec.parallelizable = 0;
L
Liu Jicong 已提交
419 420
    }
  }
L
Liu Jicong 已提交
421 422

  // free memory
L
Liu Jicong 已提交
423
  qDestroyQueryPlan(pPlan);
L
Liu Jicong 已提交
424

L
Liu Jicong 已提交
425 426
  return 0;
}
L
Liu Jicong 已提交
427 428

int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
L
Liu Jicong 已提交
429 430
  SSdb*       pSdb = pMnode->pSdb;
  SVgObj*     pVgroup = NULL;
X
Xiaoyu Wang 已提交
431
  SQueryPlan* pPlan = qStringToQueryPlan(pTopic->physicalPlan);
X
Xiaoyu Wang 已提交
432
  if (pPlan == NULL) {
L
Liu Jicong 已提交
433 434 435
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }
L
Liu Jicong 已提交
436

L
Liu Jicong 已提交
437 438 439
  ASSERT(pSub->vgNum == -1);

  pSub->vgNum = 0;
L
Liu Jicong 已提交
440

X
Xiaoyu Wang 已提交
441
  int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
L
Liu Jicong 已提交
442
  if (levelNum != 1) {
X
Xiaoyu Wang 已提交
443
    qDestroyQueryPlan(pPlan);
L
Liu Jicong 已提交
444
    terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
L
Liu Jicong 已提交
445 446 447
    return -1;
  }

X
Xiaoyu Wang 已提交
448
  SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 0);
L
Liu Jicong 已提交
449

X
Xiaoyu Wang 已提交
450
  int32_t opNum = LIST_LENGTH(inner->pNodeList);
L
Liu Jicong 已提交
451
  if (opNum != 1) {
X
Xiaoyu Wang 已提交
452
    qDestroyQueryPlan(pPlan);
L
Liu Jicong 已提交
453
    terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
L
Liu Jicong 已提交
454 455
    return -1;
  }
X
Xiaoyu Wang 已提交
456
  SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);
L
Liu Jicong 已提交
457

458 459
  int64_t             unexistKey = -1;
  SMqConsumerEpInSub* pEpInSub = taosHashGet(pSub->consumerHash, &unexistKey, sizeof(int64_t));
L
Liu Jicong 已提交
460 461 462
  ASSERT(pEpInSub);

  ASSERT(taosHashGetSize(pSub->consumerHash) == 1);
463

L
Liu Jicong 已提交
464 465 466 467 468 469 470 471 472 473 474
  void* pIter = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) break;
    if (pVgroup->dbUid != pTopic->dbUid) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    pSub->vgNum++;
    plan->execNode.nodeId = pVgroup->vgId;
L
Liu Jicong 已提交
475
    plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup);
L
Liu Jicong 已提交
476

477 478 479 480 481
    SMqVgEp* pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
    pVgEp->epSet = plan->execNode.epSet;
    pVgEp->vgId = plan->execNode.nodeId;

#if 0
L
Liu Jicong 已提交
482 483 484
    SMqConsumerEp consumerEp = {0};
    consumerEp.status = 0;
    consumerEp.consumerId = -1;
L
Liu Jicong 已提交
485
    consumerEp.epSet = plan->execNode.epSet;
L
Liu Jicong 已提交
486
    consumerEp.vgId = plan->execNode.nodeId;
487 488 489
#endif

    mDebug("init subscribption %s, assign vg: %d", pSub->key, pVgEp->vgId);
L
Liu Jicong 已提交
490

L
Liu Jicong 已提交
491
    int32_t msgLen;
492
    if (qSubPlanToString(plan, &pVgEp->qmsg, &msgLen) < 0) {
L
Liu Jicong 已提交
493
      sdbRelease(pSdb, pVgroup);
X
Xiaoyu Wang 已提交
494
      qDestroyQueryPlan(pPlan);
L
Liu Jicong 已提交
495 496 497
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      return -1;
    }
498 499
    taosArrayPush(pEpInSub->vgs, &pVgEp);

L
Liu Jicong 已提交
500 501
    ASSERT(taosHashGetSize(pSub->consumerHash) == 1);

502
    /*taosArrayPush(pSub->unassignedVg, &consumerEp);*/
L
Liu Jicong 已提交
503 504
  }

L
Liu Jicong 已提交
505 506 507 508 509 510 511
  ASSERT(pEpInSub->vgs->size > 0);
  pEpInSub = taosHashGet(pSub->consumerHash, &unexistKey, sizeof(int64_t));

  ASSERT(pEpInSub->vgs->size > 0);

  ASSERT(taosHashGetSize(pSub->consumerHash) == 1);

X
Xiaoyu Wang 已提交
512
  qDestroyQueryPlan(pPlan);
L
Liu Jicong 已提交
513

L
Liu Jicong 已提交
514 515
  return 0;
}