mndScheduler.c 7.6 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "mndScheduler.h"
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndOffset.h"
#include "mndShow.h"
L
Liu Jicong 已提交
23
#include "mndSnode.h"
L
Liu Jicong 已提交
24
#include "mndStb.h"
L
Liu Jicong 已提交
25
#include "mndStream.h"
L
Liu Jicong 已提交
26 27 28 29 30 31 32
#include "mndSubscribe.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "tcompare.h"
#include "tname.h"
L
Liu Jicong 已提交
33 34
#include "tuuid.h"

L
Liu Jicong 已提交
35 36
extern bool tsStreamSchedV;

L
Liu Jicong 已提交
37
// Encode a stream task into a deploy message and append it to the transaction
// as a redo action.
//
// The message is an SMsgHead followed by the encoded SStreamTask. The task is
// encoded twice: once with no output buffer to learn the encoded size, then
// again into the allocated buffer.
//
//   pTrans  transaction that receives the redo action
//   pTask   task to encode
//   pEpSet  endpoint set copied into the action
//   type    message type stored in the action
//   nodeId  stored (network byte order) in the header's vgId field
//
// Returns 0 on success. Returns -1 if the buffer cannot be allocated (terrno is
// set to TSDB_CODE_OUT_OF_MEMORY) or if appending the redo action fails; the
// buffer is freed in the latter case. On success the buffer is left attached to
// the appended action and is not freed here.
int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type, int32_t nodeId) {
  // Sizing pass: no output buffer, only encoder.pos is of interest.
  SCoder encoder;
  tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
  tEncodeSStreamTask(&encoder, pTask);
  int32_t size = encoder.pos;
  int32_t tlen = sizeof(SMsgHead) + size;
  tCoderClear(&encoder);

  void* buf = malloc(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // malloc returns indeterminate bytes and only vgId is assigned below, so
  // clear the whole header to avoid handing uninitialized heap contents to the
  // transaction.
  // NOTE(review): if SMsgHead carries a length field that receivers rely on,
  // it may need to be set explicitly here - confirm against SMsgHead.
  memset(buf, 0, sizeof(SMsgHead));
  ((SMsgHead*)buf)->vgId = htonl(nodeId);

  // Encoding pass: write the task right after the header.
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, size, TD_ENCODER);
  tEncodeSStreamTask(&encoder, pTask);
  tCoderClear(&encoder);

  STransAction action = {0};
  memcpy(&action.epSet, pEpSet, sizeof(SEpSet));
  action.pCont = buf;
  action.contLen = tlen;
  action.msgType = type;
  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    free(buf);
    return -1;
  }
  return 0;
}

// Bind a subplan to a vgroup, serialize it into the task, and queue the task's
// deploy request (TDMT_VND_TASK_DEPLOY) on the transaction.
//
// Writes plan->execNode.nodeId / epSet from the vgroup and stores the
// serialized subplan in pTask->exec.qmsg.
//
// Returns 0 on success. Returns -1 when the subplan cannot be serialized
// (terrno = TSDB_CODE_QRY_INVALID_INPUT) or when the deploy request cannot be
// appended to the transaction.
int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
  int32_t msgLen;
  plan->execNode.nodeId = pVgroup->vgId;
  plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup);

  if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }

  // Propagate failures instead of reporting success for a task whose deploy
  // request never made it into the transaction.
  if (mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId) != 0) {
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
80 81 82 83 84 85
// Fetch the first snode object from the sdb, or NULL when sdbFetch yields none.
//
// sdbFetch returns the iteration cursor and delivers the object through its
// last argument, so the cursor is kept in its own variable; assigning it to
// pObj would overwrite the object pointer with the cursor.
//
// The caller releases the returned object with sdbRelease.
// NOTE(review): the cursor is dropped without being cancelled; if the sdb API
// requires cancelling an unfinished fetch, that call belongs here - confirm.
SSnodeObj* mndSchedFetchSnode(SMnode* pMnode) {
  SSnodeObj* pObj = NULL;
  void*      pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, NULL, (void**)&pObj);
  if (pIter == NULL) {
    return NULL;
  }
  return pObj;
}

L
Liu Jicong 已提交
86 87
// Bind a subplan to an snode, serialize it into the task, and queue the task's
// deploy request (TDMT_SND_TASK_DEPLOY, node id 0) on the transaction.
//
// Writes plan->execNode.nodeId / epSet from the snode and stores the
// serialized subplan in pTask->exec.qmsg.
//
// Returns 0 on success. Returns -1 when the subplan cannot be serialized
// (terrno = TSDB_CODE_QRY_INVALID_INPUT) or when the deploy request cannot be
// appended to the transaction.
int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan,
                             const SSnodeObj* pSnode) {
  int32_t msgLen;
  plan->execNode.nodeId = pSnode->id;
  plan->execNode.epSet = mndAcquireEpFromSnode(pMnode, pSnode);

  if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }

  // Propagate failures instead of reporting success for a task whose deploy
  // request never made it into the transaction.
  if (mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_SND_TASK_DEPLOY, 0) != 0) {
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
100
// Turn a stream's physical plan into tasks and queue their deploy requests on
// the transaction.
//
// The plan's subplan levels are walked from the last list entry to the first;
// the last entry is treated as level 0.
//   - Level 0 must be a scan subplan. One task is created for every vgroup
//     whose dbUid matches the stream's, and pStream->vgNum counts them.
//   - Every other level gets a single task. It is placed on an snode when one
//     is available and tsStreamSchedV is false; otherwise it goes to the last
//     vgroup used at level 0.
// Each level's tasks are collected in an array that is pushed onto
// pStream->tasks.
//
// Returns 0 on success, -1 on failure (the parsed plan is destroyed on every
// exit path).
//
// NOTE(review): on the error paths the per-level task array, the task object
// and pStream->tasks are not freed, and an unfinished vgroup fetch is not
// cancelled - left unchanged here.
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
  SSdb*       pSdb = pMnode->pSdb;
  SVgObj*     pVgroup = NULL;
  SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
  if (pPlan == NULL) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }
  ASSERT(pStream->vgNum == 0);

  int32_t totLevel = LIST_LENGTH(pPlan->pSubplans);
  pStream->tasks = taosArrayInit(totLevel, sizeof(SArray));
  int32_t lastUsedVgId = 0;

  // gather vnodes
  // gather snodes
  // iterate plan, expand source to vnodes and assign ep to each task
  // iterate tasks, assign sink type and sink ep to each task

  for (int32_t revLevel = totLevel - 1; revLevel >= 0; revLevel--) {
    int32_t        level = totLevel - 1 - revLevel;
    SArray*        taskOneLevel = taosArrayInit(0, sizeof(SStreamTask));
    SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, revLevel);
    int32_t        opNum = LIST_LENGTH(inner->pNodeList);
    ASSERT(opNum == 1);

    SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);
    if (level == 0) {
      ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN);
      void* pIter = NULL;
      while (1) {
        pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
        if (pIter == NULL) break;
        if (pVgroup->dbUid != pStream->dbUid) {
          sdbRelease(pSdb, pVgroup);
          continue;
        }

        // Remembered so that non-source levels can fall back to a vgroup.
        lastUsedVgId = pVgroup->vgId;
        pStream->vgNum++;

        SStreamTask* pTask = tNewSStreamTask(pStream->uid);
        /*pTask->level = level;*/
        // TODO
        /*pTask->sourceType = STREAM_SOURCE__SUPER;*/
        /*pTask->sinkType = level == totLevel - 1 ? 1 : 0;*/
        pTask->exec.parallelizable = 1;
        if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
          sdbRelease(pSdb, pVgroup);
          qDestroyQueryPlan(pPlan);
          return -1;
        }
        taosArrayPush(taskOneLevel, pTask);

        // The fetched vgroup is released on the skip and error paths above;
        // release it on the success path too so the reference is not leaked.
        sdbRelease(pSdb, pVgroup);
      }
    } else {
      SStreamTask* pTask = tNewSStreamTask(pStream->uid);
      /*pTask->level = level;*/
      /*pTask->sourceType = STREAM_SOURCE__NONE;*/
      /*pTask->sinkType = level == totLevel - 1 ? 1 : 0;*/
      pTask->exec.parallelizable = plan->subplanType == SUBPLAN_TYPE_SCAN;

      SSnodeObj* pSnode = mndSchedFetchSnode(pMnode);
      int32_t    code = 0;
      if (pSnode == NULL || tsStreamSchedV) {
        ASSERT(lastUsedVgId != 0);
        SVgObj* pVg = mndAcquireVgroup(pMnode, lastUsedVgId);
        if (pVg == NULL) {
          // NOTE(review): presumably mndAcquireVgroup sets terrno on failure -
          // confirm.
          code = -1;
        } else {
          code = mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVg);
          sdbRelease(pSdb, pVg);
        }
      } else {
        code = mndAssignTaskToSnode(pMnode, pTrans, pTask, plan, pSnode);
      }

      // Drop the snode reference on every path, including the vgroup fallback
      // taken when tsStreamSchedV is set while an snode exists.
      if (pSnode != NULL) {
        sdbRelease(pSdb, pSnode);
      }

      if (code < 0) {
        qDestroyQueryPlan(pPlan);
        return -1;
      }

      taosArrayPush(taskOneLevel, pTask);
    }
    taosArrayPush(pStream->tasks, taskOneLevel);
  }
  qDestroyQueryPlan(pPlan);
  return 0;
}
L
Liu Jicong 已提交
187 188

// Populate a subscription's unassigned-vgroup list from a topic's physical
// plan.
//
// The plan must consist of exactly one level holding exactly one subplan;
// anything else fails with TSDB_CODE_MND_UNSUPPORTED_TOPIC. For every vgroup
// whose dbUid matches the topic's, the subplan is pointed at that vgroup,
// serialized, and pushed onto pSub->unassignedVg as an SMqConsumerEp with
// consumerId -1 and status 0. pSub->vgNum counts the matching vgroups.
//
// Returns 0 on success, -1 on failure with terrno set (the parsed plan is
// destroyed on every exit path).
//
// NOTE(review): when serialization fails mid-iteration the vgroup fetch is left
// unfinished without being cancelled, and entries already pushed stay in
// pSub->unassignedVg - left unchanged here.
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
  SSdb*       pSdb = pMnode->pSdb;
  SVgObj*     pVgroup = NULL;
  SQueryPlan* pPlan = qStringToQueryPlan(pTopic->physicalPlan);
  if (pPlan == NULL) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }

  ASSERT(pSub->vgNum == 0);

  int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
  if (levelNum != 1) {
    qDestroyQueryPlan(pPlan);
    terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
    return -1;
  }

  SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 0);

  int32_t opNum = LIST_LENGTH(inner->pNodeList);
  if (opNum != 1) {
    qDestroyQueryPlan(pPlan);
    terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
    return -1;
  }
  SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);

  void* pIter = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) break;
    if (pVgroup->dbUid != pTopic->dbUid) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    pSub->vgNum++;
    plan->execNode.nodeId = pVgroup->vgId;
    plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup);

    SMqConsumerEp consumerEp = {0};
    consumerEp.status = 0;
    consumerEp.consumerId = -1;
    consumerEp.epSet = plan->execNode.epSet;
    consumerEp.vgId = plan->execNode.nodeId;
    int32_t msgLen;
    if (qSubPlanToString(plan, &consumerEp.qmsg, &msgLen) < 0) {
      sdbRelease(pSdb, pVgroup);
      qDestroyQueryPlan(pPlan);
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      return -1;
    }
    taosArrayPush(pSub->unassignedVg, &consumerEp);

    // The fetched vgroup is released on the skip and error paths above;
    // release it on the success path too so the reference is not leaked.
    sdbRelease(pSdb, pVgroup);
  }

  qDestroyQueryPlan(pPlan);

  return 0;
}