mndScheduler.c 9.9 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "mndScheduler.h"
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndOffset.h"
#include "mndShow.h"
L
Liu Jicong 已提交
23
#include "mndSnode.h"
L
Liu Jicong 已提交
24
#include "mndStb.h"
L
Liu Jicong 已提交
25
#include "mndStream.h"
L
Liu Jicong 已提交
26 27 28 29 30 31 32
#include "mndSubscribe.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "tcompare.h"
#include "tname.h"
L
Liu Jicong 已提交
33 34
#include "tuuid.h"

L
Liu Jicong 已提交
35 36
extern bool tsStreamSchedV;

L
Liu Jicong 已提交
37
/*
 * Serialize a stream task and append a deploy request for it as a redo action
 * of the given transaction.
 *
 * The task is encoded twice: first with no backing buffer to measure the
 * encoded size, then into a freshly allocated buffer that starts with an
 * SMsgHead whose vgId holds nodeId in network byte order.
 *
 * pTrans - transaction that receives the redo action
 * pTask  - task to serialize
 * pEpSet - endpoint set the request is sent to
 * type   - message type of the request
 * nodeId - target node id written into SMsgHead.vgId
 *
 * Returns 0 on success and -1 on failure. terrno is set here only for the
 * allocation failure; encode/append failures leave it to the callee. On
 * success the buffer belongs to the transaction action; on failure it is
 * freed here.
 */
int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type, int32_t nodeId) {
  SCoder encoder;

  // First pass: measure the encoded size without a backing buffer.
  tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
  if (tEncodeSStreamTask(&encoder, pTask) < 0) {
    tCoderClear(&encoder);
    return -1;
  }
  int32_t size = encoder.pos;
  tCoderClear(&encoder);

  int32_t tlen = sizeof(SMsgHead) + size;
  void*   buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  ((SMsgHead*)buf)->vgId = htonl(nodeId);

  // Second pass: encode the task into the space right after the message head.
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, size, TD_ENCODER);
  int32_t code = tEncodeSStreamTask(&encoder, pTask);
  tCoderClear(&encoder);
  if (code < 0) {
    // Never queue a partially encoded message.
    taosMemoryFree(buf);
    return -1;
  }

  STransAction action = {0};
  memcpy(&action.epSet, pEpSet, sizeof(SEpSet));
  action.pCont = buf;
  action.contLen = tlen;
  action.msgType = type;
  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    taosMemoryFree(buf);
    return -1;
  }
  return 0;
}

/*
 * Place a stream task on a vgroup and append its deploy request to pTrans.
 *
 * Both the task and the subplan are retargeted to pVgroup (its vgId and the
 * epset from mndGetVgroupEpset). The subplan is serialized into
 * pTask->exec.qmsg, and a TDMT_VND_TASK_DEPLOY request addressed to the
 * vgroup is persisted.
 *
 * Returns 0 on success and -1 on failure. terrno is
 * TSDB_CODE_QRY_INVALID_INPUT when the subplan cannot be serialized;
 * otherwise it is whatever mndPersistTaskDeployReq left.
 */
int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
  int32_t msgLen;

  pTask->nodeId = pVgroup->vgId;
  pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);

  plan->execNode.nodeId = pVgroup->vgId;
  plan->execNode.epSet = pTask->epSet;

  if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }

  // Propagate persist failures (e.g. OOM) instead of reporting success.
  if (mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId) < 0) {
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
83 84 85 86 87 88
/*
 * Return the first snode row in the sdb, or NULL if there is none.
 *
 * The row comes from sdbFetch, and rows fetched that way are released with
 * sdbRelease elsewhere in this file. The caller presumably owns that
 * reference and must sdbRelease the result — confirm against callers.
 */
SSnodeObj* mndSchedFetchSnode(SMnode* pMnode) {
  SSdb*      pSdb = pMnode->pSdb;
  SSnodeObj* pObj = NULL;

  // sdbFetch returns the iterator and stores the row through its last argument.
  void* pIter = sdbFetch(pSdb, SDB_SNODE, NULL, (void**)&pObj);
  if (pIter == NULL) {
    return NULL;
  }

  // Only the first snode is wanted; close the iteration instead of leaving it open.
  sdbCancelFetch(pSdb, pIter);
  return pObj;
}

L
Liu Jicong 已提交
89 90
/*
 * Place a stream task on an snode and append its deploy request to pTrans.
 *
 * Both the task and the subplan get nodeId 0 and the endpoint returned by
 * mndAcquireEpFromSnode. The subplan is serialized into pTask->exec.qmsg,
 * and a TDMT_SND_TASK_DEPLOY request is persisted.
 *
 * Returns 0 on success and -1 on failure. terrno is
 * TSDB_CODE_QRY_INVALID_INPUT when the subplan cannot be serialized;
 * otherwise it is whatever mndPersistTaskDeployReq left.
 */
int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan,
                             const SSnodeObj* pSnode) {
  int32_t msgLen;

  pTask->nodeId = 0;
  pTask->epSet = mndAcquireEpFromSnode(pMnode, pSnode);

  plan->execNode.nodeId = 0;
  plan->execNode.epSet = pTask->epSet;

  if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }

  // Propagate persist failures (e.g. OOM) instead of reporting success.
  if (mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_SND_TASK_DEPLOY, 0) < 0) {
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
/*
 * Return the first vgroup belonging to database dbUid, or NULL if none.
 *
 * Non-matching vgroups are released as they are skipped. The matching row is
 * returned with the reference taken by sdbFetch still held, so the caller must
 * sdbRelease it (as mndScheduleStream does).
 */
SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
  SSdb*   pSdb = pMnode->pSdb;
  void*   pIter = NULL;
  SVgObj* pVgroup = NULL;

  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) {
      // Exhausted without a match; do not rely on sdbFetch having cleared pVgroup.
      return NULL;
    }
    if (pVgroup->dbUid == dbUid) {
      // Stopping early: close the iterator, keep the row reference for the caller.
      sdbCancelFetch(pSdb, pIter);
      return pVgroup;
    }
    sdbRelease(pSdb, pVgroup);
  }
}

L
Liu Jicong 已提交
122
/*
 * Build the task graph of a stream from its physical plan and append a deploy
 * request for every task to pTrans.
 *
 * The plan has at most two levels (asserted). Subplan level totLevel - 1 is the
 * scan (source) level and is replicated onto every vgroup of the stream's
 * database. When totLevel == 2, level 0 is a single merge task placed on one
 * vgroup of that database, and every source task dispatches to it with
 * TDMT_VND_TASK_MERGE_EXEC.
 *
 * When the plan has a single level, the source tasks sink to TASK_SINK__SHOW,
 * or to TASK_SINK__SMA with smaId when smaId != -1.
 *
 * On success pStream->tasks holds one SArray of SStreamTask* per level, in
 * level order.
 *
 * Returns 0 on success and -1 on failure. terrno is set by the failing step,
 * or is TSDB_CODE_OUT_OF_MEMORY when a task cannot be allocated.
 */
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId) {
  SSdb*       pSdb = pMnode->pSdb;
  SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
  if (pPlan == NULL) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }
  ASSERT(pStream->vgNum == 0);

  int32_t totLevel = LIST_LENGTH(pPlan->pSubplans);
  ASSERT(totLevel <= 2);
  pStream->tasks = taosArrayInit(totLevel, sizeof(void*));

  for (int32_t level = 0; level < totLevel; level++) {
    SArray*        taskOneLevel = taosArrayInit(0, sizeof(void*));
    SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, level);
    ASSERT(LIST_LENGTH(inner->pNodeList) == 1);

    SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);

    if (level == totLevel - 1) {
      // last level, source, must assign to vnode
      // must be scan type
      ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN);

      // replicate task to each vnode
      void* pIter = NULL;
      while (1) {
        SVgObj* pVgroup;
        pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
        if (pIter == NULL) break;
        if (pVgroup->dbUid != pStream->dbUid) {
          sdbRelease(pSdb, pVgroup);
          continue;
        }

        SStreamTask* pTask = tNewSStreamTask(pStream->uid);
        if (pTask == NULL) {
          sdbRelease(pSdb, pVgroup);
          sdbCancelFetch(pSdb, pIter);
          taosArrayDestroy(taskOneLevel);
          qDestroyQueryPlan(pPlan);
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          return -1;
        }

        // source part
        pTask->sourceType = TASK_SOURCE__SCAN;

        // sink part
        if (level == 0) {
          // only for inplace
          pTask->sinkType = TASK_SINK__SHOW;
          pTask->showSink.reserved = 0;
          if (smaId != -1) {
            pTask->sinkType = TASK_SINK__SMA;
            pTask->smaSink.smaId = smaId;
          }
        } else {
          pTask->sinkType = TASK_SINK__NONE;
        }

        // dispatch part
        if (level == 0) {
          pTask->dispatchType = TASK_DISPATCH__NONE;
          // if inplace sink, no dispatcher
          // if fixed ep, add fixed ep dispatcher
          // if shuffle, add shuffle dispatcher
        } else {
          // add fixed ep dispatcher towards the single merge task of the previous level
          int32_t lastLevel = level - 1;
          ASSERT(lastLevel == 0);
          SArray* pArray = taosArrayGetP(pStream->tasks, lastLevel);
          // one merge only
          ASSERT(taosArrayGetSize(pArray) == 1);
          SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
          pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC;
          pTask->dispatchType = TASK_DISPATCH__FIXED;

          pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
          pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
          pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
        }

        // exec part
        pTask->execType = TASK_EXEC__PIPE;
        pTask->exec.parallelizable = 1;

        int32_t code = mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup);
        sdbRelease(pSdb, pVgroup);
        if (code < 0) {
          // Leaving the loop early: the sdb iterator must be closed as well.
          sdbCancelFetch(pSdb, pIter);
          taosArrayDestroy(taskOneLevel);
          qDestroyQueryPlan(pPlan);
          return -1;
        }
        taosArrayPush(taskOneLevel, &pTask);
      }
    } else {
      // merge plan

      // TODO if has snode, assign to snode

      // else, assign to vnode
      ASSERT(plan->subplanType == SUBPLAN_TYPE_MERGE);
      SStreamTask* pTask = tNewSStreamTask(pStream->uid);
      if (pTask == NULL) {
        taosArrayDestroy(taskOneLevel);
        qDestroyQueryPlan(pPlan);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return -1;
      }

      // source part, currently only support multi source
      pTask->sourceType = TASK_SOURCE__PIPE;

      // sink part
      pTask->sinkType = TASK_SINK__SHOW;

      // dispatch part
      pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
      pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;

      // exec part
      pTask->execType = TASK_EXEC__MERGE;
      pTask->exec.parallelizable = 0;

      SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->dbUid);
      ASSERT(pVgroup);
      int32_t code = mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup);
      sdbRelease(pSdb, pVgroup);
      if (code < 0) {
        taosArrayDestroy(taskOneLevel);
        qDestroyQueryPlan(pPlan);
        return -1;
      }
      taosArrayPush(taskOneLevel, &pTask);
    }

    taosArrayPush(pStream->tasks, &taskOneLevel);
  }

  if (totLevel == 2) {
    // Presumably intended as per-vgroup tasks receiving the merge task's shuffle
    // output (TDMT_VND_TASK_WRITE_EXEC) — TODO confirm.
    // NOTE(review): these tasks are neither stored in pStream->tasks nor deployed,
    // so each allocation is currently leaked.
    void* pIter = NULL;
    while (1) {
      SVgObj* pVgroup;
      pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
      if (pIter == NULL) break;
      if (pVgroup->dbUid != pStream->dbUid) {
        sdbRelease(pSdb, pVgroup);
        continue;
      }

      SStreamTask* pTask = tNewSStreamTask(pStream->uid);
      // Release the vgroup reference taken by sdbFetch; nothing below uses the row.
      sdbRelease(pSdb, pVgroup);
      if (pTask == NULL) continue;

      // source part
      pTask->sourceType = TASK_SOURCE__MERGE;

      // sink part
      pTask->sinkType = TASK_SINK__SHOW;

      // dispatch part
      pTask->dispatchType = TASK_DISPATCH__NONE;

      // exec part
      pTask->execType = TASK_EXEC__NONE;
      pTask->exec.parallelizable = 0;
    }
  }

  // free memory
  qDestroyQueryPlan(pPlan);

  return 0;
}
L
Liu Jicong 已提交
277 278

/*
 * Initialize the per-vgroup consumer endpoints of a new subscription.
 *
 * The topic's physical plan must have exactly one level containing exactly one
 * subplan; otherwise terrno is TSDB_CODE_MND_UNSUPPORTED_TOPIC.
 *
 * For every vgroup of the topic's database:
 *   - the subplan is retargeted to that vgroup and serialized;
 *   - the result is stored in a new SMqConsumerEp (status 0, consumerId -1);
 *   - that endpoint is pushed onto pSub->unassignedVg, and pSub->vgNum counts it.
 *
 * Returns 0 on success and -1 on failure with terrno set.
 */
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
  SSdb*       pSdb = pMnode->pSdb;
  SVgObj*     pVgroup = NULL;
  SQueryPlan* pPlan = qStringToQueryPlan(pTopic->physicalPlan);
  if (pPlan == NULL) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }

  ASSERT(pSub->vgNum == 0);

  int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
  if (levelNum != 1) {
    qDestroyQueryPlan(pPlan);
    terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
    return -1;
  }

  SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 0);

  int32_t opNum = LIST_LENGTH(inner->pNodeList);
  if (opNum != 1) {
    qDestroyQueryPlan(pPlan);
    terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
    return -1;
  }
  SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);

  void* pIter = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) break;
    if (pVgroup->dbUid != pTopic->dbUid) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    pSub->vgNum++;
    plan->execNode.nodeId = pVgroup->vgId;
    plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup);
    // Everything needed from the vgroup row has been copied; drop the reference
    // taken by sdbFetch so it is not leaked on the success path.
    sdbRelease(pSdb, pVgroup);

    SMqConsumerEp consumerEp = {0};
    consumerEp.status = 0;
    consumerEp.consumerId = -1;
    consumerEp.epSet = plan->execNode.epSet;
    consumerEp.vgId = plan->execNode.nodeId;
    int32_t msgLen;
    if (qSubPlanToString(plan, &consumerEp.qmsg, &msgLen) < 0) {
      // Leaving the loop early: close the sdb iterator.
      sdbCancelFetch(pSdb, pIter);
      qDestroyQueryPlan(pPlan);
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      return -1;
    }
    taosArrayPush(pSub->unassignedVg, &consumerEp);
  }

  qDestroyQueryPlan(pPlan);

  return 0;
}