mndScheduler.c 9.7 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "mndScheduler.h"
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndOffset.h"
#include "mndShow.h"
L
Liu Jicong 已提交
23
#include "mndSnode.h"
L
Liu Jicong 已提交
24
#include "mndStb.h"
L
Liu Jicong 已提交
25
#include "mndStream.h"
L
Liu Jicong 已提交
26 27 28 29 30 31 32
#include "mndSubscribe.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "tcompare.h"
#include "tname.h"
L
Liu Jicong 已提交
33 34
#include "tuuid.h"

L
Liu Jicong 已提交
35 36
extern bool tsStreamSchedV;

L
Liu Jicong 已提交
37
/*
 * Serialize a stream task into a deploy message and append it to the
 * transaction as a redo action.
 *
 * pTrans  transaction that receives the redo action
 * pTask   task to encode with tEncodeSStreamTask
 * pEpSet  endpoint set copied into the action
 * type    message type stored in the action
 * nodeId  written (network byte order) into the message head's vgId field
 *
 * Returns 0 on success. Returns -1 if the buffer cannot be allocated (terrno is
 * set to TSDB_CODE_OUT_OF_MEMORY) or if appending the redo action fails; in the
 * latter case the buffer is freed here.
 */
int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type, int32_t nodeId) {
  SCoder coder;

  // Sizing pass: run the encoder without an output buffer and take the
  // resulting position as the payload length.
  tCoderInit(&coder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
  tEncodeSStreamTask(&coder, pTask);
  int32_t payloadLen = coder.pos;
  int32_t totalLen = sizeof(SMsgHead) + payloadLen;
  tCoderClear(&coder);

  void* pMsg = taosMemoryMalloc(totalLen);
  if (pMsg == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // Message layout: SMsgHead followed directly by the encoded task.
  SMsgHead* pHead = pMsg;
  pHead->vgId = htonl(nodeId);
  void* pPayload = POINTER_SHIFT(pMsg, sizeof(SMsgHead));

  // Encoding pass: same task, now written into the payload area.
  tCoderInit(&coder, TD_LITTLE_ENDIAN, pPayload, payloadLen, TD_ENCODER);
  tEncodeSStreamTask(&coder, pTask);
  tCoderClear(&coder);

  STransAction action = {0};
  memcpy(&action.epSet, pEpSet, sizeof(SEpSet));
  action.pCont = pMsg;
  action.contLen = totalLen;
  action.msgType = type;

  if (mndTransAppendRedoAction(pTrans, &action) == 0) {
    // NOTE(review): presumably the transaction now owns pMsg -- confirm.
    return 0;
  }

  taosMemoryFree(pMsg);
  return -1;
}

/*
 * Bind a stream task and its subplan to a vgroup, serialize the subplan into
 * the task, and queue a TDMT_VND_TASK_DEPLOY redo action on the transaction.
 *
 * pMnode   mnode used to look up the vgroup's endpoint set
 * pTrans   transaction that receives the deploy action
 * pTask    task to fill in (nodeId, epSet, exec.qmsg)
 * plan     subplan whose execNode is pointed at the vgroup
 * pVgroup  target vgroup
 *
 * Returns 0 on success. Returns -1 if the subplan cannot be serialized (terrno
 * is set to TSDB_CODE_QRY_INVALID_INPUT) or if the deploy request cannot be
 * persisted. The latter failure used to be ignored and reported as success.
 */
int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
  int32_t msgLen;  // required by qSubPlanToString; the value is not used here

  pTask->nodeId = pVgroup->vgId;
  pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);

  plan->execNode.nodeId = pVgroup->vgId;
  plan->execNode.epSet = pTask->epSet;

  if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }

  // Propagate the failure so the caller does not proceed with a task that
  // never made it into the transaction.
  if (mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId) < 0) {
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
83 84 85 86 87 88
/*
 * Fetch the first snode object from the sdb.
 *
 * sdbFetch hands the object back through its out-parameter and returns an
 * iterator handle (the other loops in this file use it that way). The previous
 * code assigned that return value to pObj, overwriting the fetched object with
 * the iterator.
 *
 * Returns the fetched snode, or NULL when the sdb yields none.
 * NOTE(review): the object is presumably acquired and should be released by
 * the caller, and the iterator may need to be cancelled -- confirm against the
 * sdb API.
 */
SSnodeObj* mndSchedFetchSnode(SMnode* pMnode) {
  SSnodeObj* pObj = NULL;
  void*      pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, NULL, (void**)&pObj);
  if (pIter == NULL) {
    return NULL;
  }
  return pObj;
}

L
Liu Jicong 已提交
89 90
/*
 * Bind a stream task and its subplan to an snode, serialize the subplan into
 * the task, and queue a TDMT_SND_TASK_DEPLOY redo action on the transaction.
 * The node id is fixed to 0 for snode-hosted tasks.
 *
 * pMnode  mnode used to acquire the snode's endpoint set
 * pTrans  transaction that receives the deploy action
 * pTask   task to fill in (nodeId, epSet, exec.qmsg)
 * plan    subplan whose execNode is pointed at the snode
 * pSnode  target snode
 *
 * Returns 0 on success. Returns -1 if the subplan cannot be serialized (terrno
 * is set to TSDB_CODE_QRY_INVALID_INPUT) or if the deploy request cannot be
 * persisted. The latter failure used to be ignored and reported as success.
 */
int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan,
                             const SSnodeObj* pSnode) {
  int32_t msgLen;  // required by qSubPlanToString; the value is not used here

  pTask->nodeId = 0;
  pTask->epSet = mndAcquireEpFromSnode(pMnode, pSnode);

  plan->execNode.nodeId = 0;
  plan->execNode.epSet = pTask->epSet;

  if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }

  // Propagate the failure instead of silently reporting success.
  if (mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_SND_TASK_DEPLOY, 0) < 0) {
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
/*
 * Return the first vgroup in the sdb whose dbUid equals the given one.
 *
 * Non-matching vgroups are released as the iteration moves on. The matching
 * vgroup is returned without being released, so the caller must release it
 * with sdbRelease.
 *
 * Returns NULL when no vgroup matches. The previous code returned whatever was
 * left in the local pointer after the loop, which could be the last
 * non-matching vgroup that had already been released, unless sdbFetch happens
 * to reset its out-parameter.
 * NOTE(review): the iterator is not cancelled on early return -- confirm
 * whether the sdb API requires that.
 */
SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
  void* pIter = NULL;
  while (1) {
    SVgObj* pVgroup = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) {
      return NULL;
    }
    if (pVgroup->dbUid == dbUid) {
      return pVgroup;
    }
    sdbRelease(pMnode->pSdb, pVgroup);
  }
}

L
Liu Jicong 已提交
122
/*
 * Build the per-level task lists of a stream from its physical plan and queue a
 * deploy action for every task on the transaction.
 *
 * The plan may hold at most two levels (asserted), each with exactly one
 * subplan (asserted):
 *   - the last level must be a scan subplan; one task is created for every
 *     vgroup whose dbUid matches the stream's and is assigned to that vgroup;
 *   - with two levels, level 0 must be a merge subplan; a single task is
 *     created and assigned to the first matching vgroup. The scan tasks of
 *     level 1 then dispatch to that merge task through a fixed endpoint.
 * Each level's tasks are collected in an array that is pushed to pStream->tasks.
 *
 * Returns 0 on success, -1 on failure. terrno is set here when the plan string
 * cannot be parsed or a task cannot be allocated.
 *
 * NOTE(review): on the error paths the current task, the current level array
 * and the partially filled pStream->tasks are not freed here, and the sdb
 * iterator is not cancelled -- confirm who cleans these up.
 */
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
  SSdb*       pSdb = pMnode->pSdb;
  SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
  if (pPlan == NULL) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }
  ASSERT(pStream->vgNum == 0);

  int32_t totLevel = LIST_LENGTH(pPlan->pSubplans);
  ASSERT(totLevel <= 2);
  pStream->tasks = taosArrayInit(totLevel, sizeof(void*));

  for (int32_t level = 0; level < totLevel; level++) {
    SArray*        taskOneLevel = taosArrayInit(0, sizeof(void*));
    SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, level);
    ASSERT(LIST_LENGTH(inner->pNodeList) == 1);

    SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);

    if (level == totLevel - 1) {
      // last level, source, must assign to vnode
      // must be scan type
      ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN);

      // replicate task to each vnode
      void* pIter = NULL;
      while (1) {
        SVgObj* pVgroup = NULL;
        pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
        if (pIter == NULL) break;
        if (pVgroup->dbUid != pStream->dbUid) {
          sdbRelease(pSdb, pVgroup);
          continue;
        }

        SStreamTask* pTask = tNewSStreamTask(pStream->uid);
        if (pTask == NULL) {
          // Do not dereference a failed allocation.
          sdbRelease(pSdb, pVgroup);
          qDestroyQueryPlan(pPlan);
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          return -1;
        }

        // source part
        pTask->sourceType = TASK_SOURCE__SCAN;

        // sink part
        if (level == 0) {
          // only for inplace
          pTask->sinkType = TASK_SINK__SHOW;
          pTask->showSink.reserved = 0;
        } else {
          pTask->sinkType = TASK_SINK__NONE;
        }

        // dispatch part
        if (level == 0) {
          pTask->dispatchType = TASK_DISPATCH__NONE;
          // if inplace sink, no dispatcher
          // if fixed ep, add fixed ep dispatcher
          // if shuffle, add shuffle dispatcher
        } else {
          // add fixed ep dispatcher pointing at the single merge task
          int32_t lastLevel = level - 1;
          ASSERT(lastLevel == 0);
          SArray* pArray = taosArrayGetP(pStream->tasks, lastLevel);
          // one merge only
          ASSERT(taosArrayGetSize(pArray) == 1);
          // The merge task is the only element, so it is element 0. The old
          // code indexed with lastLevel, which only worked because it is 0.
          SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
          pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC;
          pTask->dispatchType = TASK_DISPATCH__FIXED;

          pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
          pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
        }

        // exec part
        pTask->execType = TASK_EXEC__PIPE;
        pTask->exec.parallelizable = 1;
        if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
          sdbRelease(pSdb, pVgroup);
          qDestroyQueryPlan(pPlan);
          return -1;
        }
        sdbRelease(pSdb, pVgroup);
        taosArrayPush(taskOneLevel, &pTask);
      }
    } else {
      // merge plan

      // TODO if has snode, assign to snode

      // else, assign to vnode
      ASSERT(plan->subplanType == SUBPLAN_TYPE_MERGE);
      SStreamTask* pTask = tNewSStreamTask(pStream->uid);
      if (pTask == NULL) {
        qDestroyQueryPlan(pPlan);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return -1;
      }

      // source part, currently only support multi source
      pTask->sourceType = TASK_SOURCE__PIPE;

      // sink part
      pTask->sinkType = TASK_SINK__SHOW;

      // dispatch part
      pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
      pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;

      // exec part
      pTask->execType = TASK_EXEC__MERGE;
      pTask->exec.parallelizable = 0;
      SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->dbUid);
      ASSERT(pVgroup);
      if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
        sdbRelease(pSdb, pVgroup);
        qDestroyQueryPlan(pPlan);
        return -1;
      }
      sdbRelease(pSdb, pVgroup);
      taosArrayPush(taskOneLevel, &pTask);
    }

    taosArrayPush(pStream->tasks, &taskOneLevel);
  }

  if (totLevel == 2) {
    void* pIter = NULL;
    while (1) {
      SVgObj* pVgroup = NULL;
      pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
      if (pIter == NULL) break;
      if (pVgroup->dbUid != pStream->dbUid) {
        sdbRelease(pSdb, pVgroup);
        continue;
      }

      SStreamTask* pTask = tNewSStreamTask(pStream->uid);
      if (pTask == NULL) {
        sdbRelease(pSdb, pVgroup);
        qDestroyQueryPlan(pPlan);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return -1;
      }

      // source part
      pTask->sourceType = TASK_SOURCE__MERGE;

      // sink part
      pTask->sinkType = TASK_SINK__SHOW;

      // dispatch part
      pTask->dispatchType = TASK_DISPATCH__NONE;

      // exec part
      pTask->execType = TASK_EXEC__NONE;
      pTask->exec.parallelizable = 0;

      // NOTE(review): this task is neither stored in pStream->tasks nor
      // deployed nor freed here; it looks like unfinished work and leaks the
      // task -- confirm the intended handling.

      // Matching vgroups were previously never released in this loop.
      sdbRelease(pSdb, pVgroup);
    }
  }

  // free memory
  qDestroyQueryPlan(pPlan);

  return 0;
}
L
Liu Jicong 已提交
272 273

/*
 * Populate a subscription's unassigned-vgroup list from a topic's physical
 * plan: for every vgroup whose dbUid matches the topic's, the single subplan is
 * pointed at that vgroup, serialized, and pushed to pSub->unassignedVg as a
 * consumer endpoint with consumerId -1.
 *
 * pMnode  mnode whose sdb is iterated for vgroups
 * pTopic  topic providing the physical plan string and the dbUid
 * pSub    subscription to fill; its vgNum must be 0 on entry (asserted)
 *
 * Returns 0 on success, -1 on failure with terrno set to:
 *   TSDB_CODE_QRY_INVALID_INPUT     plan string cannot be parsed or the
 *                                   subplan cannot be serialized
 *   TSDB_CODE_MND_UNSUPPORTED_TOPIC plan does not have exactly one level with
 *                                   exactly one subplan
 *
 * NOTE(review): on the serialization error path the sdb iterator is not
 * cancelled -- confirm whether the sdb API requires that.
 */
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
  SSdb*       pSdb = pMnode->pSdb;
  SVgObj*     pVgroup = NULL;
  SQueryPlan* pPlan = qStringToQueryPlan(pTopic->physicalPlan);
  if (pPlan == NULL) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }

  ASSERT(pSub->vgNum == 0);

  int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
  if (levelNum != 1) {
    qDestroyQueryPlan(pPlan);
    terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
    return -1;
  }

  SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 0);

  int32_t opNum = LIST_LENGTH(inner->pNodeList);
  if (opNum != 1) {
    qDestroyQueryPlan(pPlan);
    terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
    return -1;
  }
  SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);

  void* pIter = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) break;
    if (pVgroup->dbUid != pTopic->dbUid) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    plan->execNode.nodeId = pVgroup->vgId;
    plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup);

    SMqConsumerEp consumerEp = {0};
    consumerEp.status = 0;
    consumerEp.consumerId = -1;
    consumerEp.epSet = plan->execNode.epSet;
    consumerEp.vgId = plan->execNode.nodeId;

    int32_t msgLen;  // required by qSubPlanToString; the value is not used here
    if (qSubPlanToString(plan, &consumerEp.qmsg, &msgLen) < 0) {
      sdbRelease(pSdb, pVgroup);
      qDestroyQueryPlan(pPlan);
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      return -1;
    }
    taosArrayPush(pSub->unassignedVg, &consumerEp);

    // Count the vgroup only once its endpoint has been recorded, so vgNum
    // stays in step with unassignedVg even when serialization fails.
    pSub->vgNum++;

    // The matching vgroup was previously never released on this path.
    sdbRelease(pSdb, pVgroup);
  }

  qDestroyQueryPlan(pPlan);

  return 0;
}