/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "mndScheduler.h"
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndOffset.h"
#include "mndShow.h"
#include "mndStb.h"
#include "mndStream.h"
#include "mndSubscribe.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "tcompare.h"
#include "tname.h"
#include "tuuid.h"

L
Liu Jicong 已提交
34
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
L
Liu Jicong 已提交
35 36 37 38 39 40 41 42 43
  SSdb*       pSdb = pMnode->pSdb;
  SVgObj*     pVgroup = NULL;
  SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
  if (pPlan == NULL) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }
  ASSERT(pStream->vgNum == 0);

L
Liu Jicong 已提交
44 45
  int32_t totLevel = LIST_LENGTH(pPlan->pSubplans);
  pStream->tasks = taosArrayInit(totLevel, sizeof(SArray));
L
Liu Jicong 已提交
46

L
Liu Jicong 已提交
47 48 49 50
  int32_t msgLen;
  for (int32_t level = 0; level < totLevel; level++) {
    SArray*        taskOneLevel = taosArrayInit(0, sizeof(SStreamTask));
    SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, level);
L
Liu Jicong 已提交
51 52 53
    int32_t        opNum = LIST_LENGTH(inner->pNodeList);
    ASSERT(opNum == 1);

L
Liu Jicong 已提交
54 55
    SSubplan* plan = nodesListGetNode(inner->pNodeList, level);
    if (level == 0) {
L
Liu Jicong 已提交
56 57 58 59 60 61 62 63 64 65 66
      ASSERT(plan->type == SUBPLAN_TYPE_SCAN);
      void* pIter = NULL;
      while (1) {
        pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
        if (pIter == NULL) break;
        if (pVgroup->dbUid != pStream->dbUid) {
          sdbRelease(pSdb, pVgroup);
          continue;
        }

        pStream->vgNum++;
L
Liu Jicong 已提交
67 68 69 70
        // send to vnode

        SStreamTask* pTask = streamTaskNew(pStream->uid, level);

L
Liu Jicong 已提交
71 72
        plan->execNode.nodeId = pVgroup->vgId;
        plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup);
L
Liu Jicong 已提交
73 74 75 76 77 78 79
        if (qSubPlanToString(plan, &pTask->qmsg, &msgLen) < 0) {
          sdbRelease(pSdb, pVgroup);
          qDestroyQueryPlan(pPlan);
          terrno = TSDB_CODE_QRY_INVALID_INPUT;
          return -1;
        }
        taosArrayPush(taskOneLevel, pTask);
L
Liu Jicong 已提交
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105

        SCoder encoder;
        tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
        tEncodeSStreamTask(&encoder, pTask);
        int32_t tlen = sizeof(SMsgHead) + encoder.pos;
        tCoderClear(&encoder);
        void* buf = rpcMallocCont(tlen);
        if (buf == NULL) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          return -1;
        }
        ((SMsgHead*)buf)->streamTaskId = pTask->taskId;
        void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
        tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER);
        tEncodeSStreamTask(&encoder, pTask);
        tCoderClear(&encoder);

        STransAction action = {0};
        action.epSet = plan->execNode.epSet;
        action.pCont = buf;
        action.contLen = tlen;
        action.msgType = TDMT_VND_TASK_DEPLOY;
        if (mndTransAppendRedoAction(pTrans, &action) != 0) {
          rpcFreeCont(buf);
          return -1;
        }
L
Liu Jicong 已提交
106 107 108 109 110 111 112 113 114 115
      }
    } else if (plan->subplanType == SUBPLAN_TYPE_SCAN) {
      // duplicatable

      int32_t parallel = 0;
      // if no snode, parallel set to fetch thread num in vnode

      // if has snode, set to shared thread num in snode
      parallel = SND_SHARED_THREAD_NUM;

L
Liu Jicong 已提交
116 117 118 119 120 121 122 123 124 125 126 127 128 129
      for (int32_t i = 0; i < parallel; i++) {
        SStreamTask* pTask = streamTaskNew(pStream->uid, level);

        // TODO:get snode id and ep
        plan->execNode.nodeId = pVgroup->vgId;
        plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup);

        if (qSubPlanToString(plan, &pTask->qmsg, &msgLen) < 0) {
          qDestroyQueryPlan(pPlan);
          terrno = TSDB_CODE_QRY_INVALID_INPUT;
          return -1;
        }

        taosArrayPush(taskOneLevel, pTask);
L
Liu Jicong 已提交
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155

        SCoder encoder;
        tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
        tEncodeSStreamTask(&encoder, pTask);
        int32_t tlen = sizeof(SMsgHead) + encoder.pos;
        tCoderClear(&encoder);
        void* buf = rpcMallocCont(tlen);
        if (buf == NULL) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          return -1;
        }
        ((SMsgHead*)buf)->streamTaskId = pTask->taskId;
        void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
        tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER);
        tEncodeSStreamTask(&encoder, pTask);
        tCoderClear(&encoder);

        STransAction action = {0};
        action.epSet = plan->execNode.epSet;
        action.pCont = buf;
        action.contLen = tlen;
        action.msgType = TDMT_SND_TASK_DEPLOY;
        if (mndTransAppendRedoAction(pTrans, &action) != 0) {
          rpcFreeCont(buf);
          return -1;
        }
L
Liu Jicong 已提交
156 157 158
      }
    } else {
      // not duplicatable
L
Liu Jicong 已提交
159 160 161 162 163 164 165 166 167 168 169 170 171
      SStreamTask* pTask = streamTaskNew(pStream->uid, level);

      // TODO:get snode id and ep
      plan->execNode.nodeId = pVgroup->vgId;
      plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup);

      if (qSubPlanToString(plan, &pTask->qmsg, &msgLen) < 0) {
        sdbRelease(pSdb, pVgroup);
        qDestroyQueryPlan(pPlan);
        terrno = TSDB_CODE_QRY_INVALID_INPUT;
        return -1;
      }
      taosArrayPush(taskOneLevel, pTask);
L
Liu Jicong 已提交
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197

      SCoder encoder;
      tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
      tEncodeSStreamTask(&encoder, pTask);
      int32_t tlen = sizeof(SMsgHead) + encoder.pos;
      tCoderClear(&encoder);
      void* buf = rpcMallocCont(tlen);
      if (buf == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return -1;
      }
      ((SMsgHead*)buf)->streamTaskId = pTask->taskId;
      void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
      tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER);
      tEncodeSStreamTask(&encoder, pTask);
      tCoderClear(&encoder);

      STransAction action = {0};
      action.epSet = plan->execNode.epSet;
      action.pCont = buf;
      action.contLen = tlen;
      action.msgType = TDMT_SND_TASK_DEPLOY;
      if (mndTransAppendRedoAction(pTrans, &action) != 0) {
        rpcFreeCont(buf);
        return -1;
      }
L
Liu Jicong 已提交
198 199 200 201 202
    }
    taosArrayPush(pStream->tasks, taskOneLevel);
  }
  return 0;
}
L
Liu Jicong 已提交
203 204

/*
 * Build the initial, unassigned endpoint list of a subscription.
 *
 * Parses pTopic->physicalPlan, which must consist of exactly one level holding
 * exactly one subplan; otherwise the call fails with
 * TSDB_CODE_MND_UNSUPPORTED_TOPIC. For every vgroup whose dbUid matches
 * pTopic->dbUid, the subplan is bound to that vgroup, serialized, and pushed
 * to pSub->unassignedVg as an SMqConsumerEp with consumerId -1. pSub->vgNum is
 * incremented once per matching vgroup and must be 0 on entry (asserted).
 *
 * Returns 0 on success, -1 on failure with terrno set. The parsed plan is
 * destroyed on every exit path after it has been created.
 */
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
  SSdb*       pSdb = pMnode->pSdb;
  SVgObj*     pVgroup = NULL;
  SQueryPlan* pPlan = qStringToQueryPlan(pTopic->physicalPlan);
  if (pPlan == NULL) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }

  ASSERT(pSub->vgNum == 0);

  // Only single-level, single-subplan topics are supported.
  int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
  if (levelNum != 1) {
    qDestroyQueryPlan(pPlan);
    terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
    return -1;
  }

  SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 0);

  int32_t opNum = LIST_LENGTH(inner->pNodeList);
  if (opNum != 1) {
    qDestroyQueryPlan(pPlan);
    terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
    return -1;
  }
  SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);

  void* pIter = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) break;
    if (pVgroup->dbUid != pTopic->dbUid) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    pSub->vgNum++;

    // Bind the subplan to this vgroup before serializing it.
    plan->execNode.nodeId = pVgroup->vgId;
    plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup);

    SMqConsumerEp consumerEp = {0};
    consumerEp.status = 0;
    consumerEp.consumerId = -1;  // not yet assigned to any consumer
    consumerEp.epSet = plan->execNode.epSet;
    consumerEp.vgId = plan->execNode.nodeId;

    int32_t msgLen;
    if (qSubPlanToString(plan, &consumerEp.qmsg, &msgLen) < 0) {
      // NOTE(review): the sdb iterator is abandoned mid-scan here; confirm
      // whether it needs an explicit cancel.
      sdbRelease(pSdb, pVgroup);
      qDestroyQueryPlan(pPlan);
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      return -1;
    }
    taosArrayPush(pSub->unassignedVg, &consumerEp);

    // Everything needed from the vgroup has been copied into consumerEp, so
    // drop the reference taken by sdbFetch (previously leaked on this path).
    sdbRelease(pSdb, pVgroup);
  }

  qDestroyQueryPlan(pPlan);

  return 0;
}