mndScheduler.c 10.2 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "mndScheduler.h"
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndOffset.h"
#include "mndShow.h"
L
Liu Jicong 已提交
23
#include "mndSnode.h"
L
Liu Jicong 已提交
24
#include "mndStb.h"
L
Liu Jicong 已提交
25
#include "mndStream.h"
L
Liu Jicong 已提交
26 27 28 29 30 31 32
#include "mndSubscribe.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "tcompare.h"
#include "tname.h"
L
Liu Jicong 已提交
33 34
#include "tuuid.h"

L
Liu Jicong 已提交
35 36
extern bool tsStreamSchedV;

L
Liu Jicong 已提交
37
/*
 * Serialize pTask into a deploy message and append it to pTrans as a redo
 * action addressed to pEpSet.
 *
 * The message is an SMsgHead followed by the encoded task. Only the vgId field
 * of the head is filled in here (nodeId in network byte order).
 *
 * Returns 0 on success. Returns -1 if the message buffer cannot be allocated
 * (terrno is set to TSDB_CODE_OUT_OF_MEMORY) or if appending the redo action
 * fails; in the latter case the buffer is freed before returning.
 * NOTE(review): on success the buffer is not freed here, presumably because the
 * transaction takes ownership of action.pCont -- confirm against mndTrans.
 */
int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type, int32_t nodeId) {
  SCoder coder;

  // First pass: encode without a buffer and read the coder position as the payload size.
  tCoderInit(&coder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
  tEncodeSStreamTask(&coder, pTask);
  int32_t bodyLen = coder.pos;
  int32_t msgLen = sizeof(SMsgHead) + bodyLen;
  tCoderClear(&coder);

  void* pMsg = taosMemoryMalloc(msgLen);
  if (pMsg == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SMsgHead* pHead = (SMsgHead*)pMsg;
  pHead->vgId = htonl(nodeId);

  // Second pass: encode for real, directly behind the message head.
  void* pBody = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
  tCoderInit(&coder, TD_LITTLE_ENDIAN, pBody, bodyLen, TD_ENCODER);
  tEncodeSStreamTask(&coder, pTask);
  tCoderClear(&coder);

  STransAction redo = {0};
  memcpy(&redo.epSet, pEpSet, sizeof(SEpSet));
  redo.pCont = pMsg;
  redo.contLen = msgLen;
  redo.msgType = type;

  if (mndTransAppendRedoAction(pTrans, &redo) == 0) {
    return 0;
  }

  taosMemoryFree(pMsg);
  return -1;
}

/*
 * Bind pTask and its subplan to the vnode group pVgroup, serialize the subplan
 * into pTask->exec.qmsg, and queue a TDMT_VND_TASK_DEPLOY redo action on pTrans.
 *
 * Returns 0 on success. Returns -1 if the subplan cannot be serialized
 * (terrno = TSDB_CODE_QRY_INVALID_INPUT) or if the deploy request cannot be
 * persisted into the transaction.
 */
int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
  int32_t msgLen = 0;

  pTask->nodeId = pVgroup->vgId;
  pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);

  // The plan carries the same execution target as the task.
  plan->execNode.nodeId = pVgroup->vgId;
  plan->execNode.epSet = pTask->epSet;

  if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }

  // Propagate persistence failures instead of reporting success for a task
  // that was never added to the transaction.
  if (mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId) != 0) {
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
83 84 85 86 87 88
/*
 * Fetch the first snode object from the sdb, or NULL if there is none.
 *
 * sdbFetch hands the object back through its last argument and returns an
 * iterator (that is how every other caller in this file uses it), so the two
 * must be kept in separate variables; assigning the return value to the object
 * pointer would return the iterator instead of the snode.
 * NOTE(review): the caller presumably has to sdbRelease() the returned object,
 * and the iterator may need to be cancelled -- confirm against the sdb API.
 */
SSnodeObj* mndSchedFetchSnode(SMnode* pMnode) {
  SSnodeObj* pObj = NULL;
  void*      pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, NULL, (void**)&pObj);
  if (pIter == NULL) {
    return NULL;
  }
  return pObj;
}

L
Liu Jicong 已提交
89 90
/*
 * Bind pTask and its subplan to the snode pSnode (node id 0), serialize the
 * subplan into pTask->exec.qmsg, and queue a TDMT_SND_TASK_DEPLOY redo action
 * on pTrans.
 *
 * Returns 0 on success. Returns -1 if the subplan cannot be serialized
 * (terrno = TSDB_CODE_QRY_INVALID_INPUT) or if the deploy request cannot be
 * persisted into the transaction.
 */
int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan,
                             const SSnodeObj* pSnode) {
  int32_t msgLen = 0;

  pTask->nodeId = 0;
  pTask->epSet = mndAcquireEpFromSnode(pMnode, pSnode);

  // The plan carries the same execution target as the task.
  plan->execNode.nodeId = 0;
  plan->execNode.epSet = pTask->epSet;

  if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }

  // Propagate persistence failures instead of reporting success for a task
  // that was never added to the transaction.
  if (mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_SND_TASK_DEPLOY, 0) != 0) {
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
/*
 * Return the first vgroup whose dbUid equals dbUid, or NULL if none matches.
 *
 * Non-matching vgroups are released as the scan goes. The matching vgroup is
 * returned without being released, so the caller is expected to sdbRelease() it.
 * NOTE(review): the iterator is abandoned on an early match; confirm whether
 * the sdb API requires cancelling it.
 */
SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
  void* pIter = NULL;
  while (1) {
    SVgObj* pVgroup = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) break;
    if (pVgroup->dbUid == dbUid) {
      return pVgroup;
    }
    sdbRelease(pMnode->pSdb, pVgroup);
  }
  // Nothing matched: return NULL explicitly instead of whatever the last fetch
  // left behind, which could be an already-released vgroup.
  return NULL;
}

L
Liu Jicong 已提交
122
/*
 * Build the tasks of a stream from its physical plan and queue their deploy
 * requests on pTrans.
 *
 * The plan may have at most two levels, each holding exactly one subplan:
 *   - the last level is the scan source; one task is created per vgroup of the
 *     stream's database (pStream->dbUid);
 *   - when there are two levels, level 0 is the merge plan; a single task is
 *     created and assigned to one vgroup of that database.
 * Tasks are collected per level and the per-level arrays are pushed into
 * pStream->tasks. smaId selects the sink of a single-level stream: -1 keeps the
 * show sink, any other value switches to the SMA sink with that id.
 *
 * Returns 0 on success, -1 on failure (plan parse error, allocation failure, or
 * a task that could not be assigned).
 * NOTE(review): on the failure paths the tasks and per-level arrays created so
 * far are not freed here; confirm that the caller tears down pStream.
 */
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId) {
  SSdb*       pSdb = pMnode->pSdb;
  SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
  if (pPlan == NULL) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }
  ASSERT(pStream->vgNum == 0);

  int32_t totLevel = LIST_LENGTH(pPlan->pSubplans);
  ASSERT(totLevel <= 2);
  pStream->tasks = taosArrayInit(totLevel, sizeof(void*));
  if (pStream->tasks == NULL) {
    qDestroyQueryPlan(pPlan);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  for (int32_t level = 0; level < totLevel; level++) {
    SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
    if (taskOneLevel == NULL) {
      qDestroyQueryPlan(pPlan);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
    SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, level);
    ASSERT(LIST_LENGTH(inner->pNodeList) == 1);

    SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);

    // if (level == totLevel - 1 /* or no snode */) {
    if (level == totLevel - 1) {
      // last level, source, must assign to vnode
      // must be scan type
      ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN);

      // replicate task to each vnode
      void* pIter = NULL;
      while (1) {
        SVgObj* pVgroup = NULL;
        pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
        if (pIter == NULL) break;
        if (pVgroup->dbUid != pStream->dbUid) {
          sdbRelease(pSdb, pVgroup);
          continue;
        }

        SStreamTask* pTask = tNewSStreamTask(pStream->uid);
        if (pTask == NULL) {
          sdbRelease(pSdb, pVgroup);
          qDestroyQueryPlan(pPlan);
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          return -1;
        }

        // source part
        pTask->sourceType = TASK_SOURCE__SCAN;

        // sink part
        if (level == 0) {
          // only for inplace
          pTask->sinkType = TASK_SINK__SHOW;
          pTask->showSink.reserved = 0;
          if (smaId != -1) {
            pTask->sinkType = TASK_SINK__SMA;
            pTask->smaSink.smaId = smaId;
          }
        } else {
          pTask->sinkType = TASK_SINK__NONE;
        }

        // dispatch part
        if (level == 0) {
          pTask->dispatchType = TASK_DISPATCH__NONE;
          // if inplace sink, no dispatcher
          // if fixed ep, add fixed ep dispatcher
          // if shuffle, add shuffle dispatcher
        } else {
          // add fixed ep dispatcher pointing at the single task of the previous level
          int32_t lastLevel = level - 1;
          ASSERT(lastLevel == 0);
          SArray* pArray = taosArrayGetP(pStream->tasks, lastLevel);
          // one merge only
          ASSERT(taosArrayGetSize(pArray) == 1);
          // The array holds exactly one task, so take element 0 rather than
          // indexing it by the level number.
          SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
          pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC;
          pTask->dispatchType = TASK_DISPATCH__FIXED;

          pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
          pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
          pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
        }

        // exec part
        pTask->execType = TASK_EXEC__PIPE;
        pTask->exec.parallelizable = 1;
        if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
          sdbRelease(pSdb, pVgroup);
          qDestroyQueryPlan(pPlan);
          return -1;
        }
        sdbRelease(pSdb, pVgroup);
        taosArrayPush(taskOneLevel, &pTask);
      }
    } else {
      // merge plan

      // TODO if has snode, assign to snode

      // else, assign to vnode
      ASSERT(plan->subplanType == SUBPLAN_TYPE_MERGE);
      SStreamTask* pTask = tNewSStreamTask(pStream->uid);
      if (pTask == NULL) {
        qDestroyQueryPlan(pPlan);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return -1;
      }

      // source part, currently only support multi source
      pTask->sourceType = TASK_SOURCE__PIPE;

      // sink part
      pTask->sinkType = TASK_SINK__SHOW;
      /*pTask->sinkType = TASK_SINK__NONE;*/

      // dispatch part
      pTask->dispatchType = TASK_DISPATCH__NONE;
#if 0
      pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
      pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;
      SDbObj* pDb = mndAcquireDb(pMnode, pStream->db);
      ASSERT(pDb);
      if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
        sdbRelease(pSdb, pDb);
        qDestroyQueryPlan(pPlan);
        return -1;
      }
      sdbRelease(pSdb, pDb);
#endif

      // exec part
      pTask->execType = TASK_EXEC__MERGE;
      pTask->exec.parallelizable = 0;
      SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->dbUid);
      ASSERT(pVgroup);
      if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
        sdbRelease(pSdb, pVgroup);
        qDestroyQueryPlan(pPlan);
        return -1;
      }
      sdbRelease(pSdb, pVgroup);
      taosArrayPush(taskOneLevel, &pTask);
    }

    taosArrayPush(pStream->tasks, &taskOneLevel);
  }

  if (totLevel == 2) {
    void* pIter = NULL;
    while (1) {
      SVgObj* pVgroup = NULL;
      pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
      if (pIter == NULL) break;

      // The vgroup is only needed for the database filter, so release it right
      // away on both the matching and the non-matching path.
      bool sameDb = (pVgroup->dbUid == pStream->dbUid);
      sdbRelease(pSdb, pVgroup);
      if (!sameDb) continue;

      SStreamTask* pTask = tNewSStreamTask(pStream->uid);
      if (pTask == NULL) {
        qDestroyQueryPlan(pPlan);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return -1;
      }

      // source part
      pTask->sourceType = TASK_SOURCE__MERGE;

      // sink part
      pTask->sinkType = TASK_SINK__SHOW;

      // dispatch part
      pTask->dispatchType = TASK_DISPATCH__NONE;

      // exec part
      pTask->execType = TASK_EXEC__NONE;
      pTask->exec.parallelizable = 0;

      // NOTE(review): this task is configured but never assigned to a node or
      // stored in pStream->tasks, so it is currently leaked. Either finish the
      // wiring or free it with the matching task destructor.
    }
  }

  // free memory
  qDestroyQueryPlan(pPlan);

  return 0;
}
L
Liu Jicong 已提交
288 289

/*
 * Populate pSub->unassignedVg with one consumer endpoint per vgroup of the
 * topic's database (pTopic->dbUid).
 *
 * The topic's physical plan must consist of exactly one level containing
 * exactly one subplan; otherwise terrno is set to
 * TSDB_CODE_MND_UNSUPPORTED_TOPIC. For every matching vgroup the subplan is
 * retargeted to that vgroup and serialized into the endpoint's qmsg, and
 * pSub->vgNum is incremented.
 *
 * Returns 0 on success, -1 on failure with terrno set.
 */
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
  SSdb*       pSdb = pMnode->pSdb;
  SVgObj*     pVgroup = NULL;
  SQueryPlan* pPlan = qStringToQueryPlan(pTopic->physicalPlan);
  if (pPlan == NULL) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }

  ASSERT(pSub->vgNum == 0);

  int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
  if (levelNum != 1) {
    qDestroyQueryPlan(pPlan);
    terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
    return -1;
  }

  SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 0);

  int32_t opNum = LIST_LENGTH(inner->pNodeList);
  if (opNum != 1) {
    qDestroyQueryPlan(pPlan);
    terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
    return -1;
  }
  SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);

  void* pIter = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) break;
    if (pVgroup->dbUid != pTopic->dbUid) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    pSub->vgNum++;
    plan->execNode.nodeId = pVgroup->vgId;
    plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup);

    SMqConsumerEp consumerEp = {0};
    consumerEp.status = 0;
    consumerEp.consumerId = -1;  // not yet assigned to any consumer
    consumerEp.epSet = plan->execNode.epSet;
    consumerEp.vgId = plan->execNode.nodeId;

    int32_t msgLen = 0;
    if (qSubPlanToString(plan, &consumerEp.qmsg, &msgLen) < 0) {
      sdbRelease(pSdb, pVgroup);
      qDestroyQueryPlan(pPlan);
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      return -1;
    }
    taosArrayPush(pSub->unassignedVg, &consumerEp);

    // Everything needed from the vgroup has been copied into consumerEp, so
    // release it here too; previously only the error and mismatch paths did.
    sdbRelease(pSdb, pVgroup);
  }

  qDestroyQueryPlan(pPlan);

  return 0;
}