mndSubscribe.c 40.4 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
L
Liu Jicong 已提交
17
#include "mndSubscribe.h"
L
Liu Jicong 已提交
18
#include "mndConsumer.h"
L
Liu Jicong 已提交
19
#include "mndScheduler.h"
L
Liu Jicong 已提交
20 21 22 23 24 25 26
#include "mndShow.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndVgroup.h"
#include "tcompare.h"
#include "tname.h"

27
#define MND_SUBSCRIBE_VER_NUMBER   2
L
Liu Jicong 已提交
28 29
#define MND_SUBSCRIBE_RESERVE_SIZE 64

L
Liu Jicong 已提交
30
#define MND_SUBSCRIBE_REBALANCE_CNT 3
L
Liu Jicong 已提交
31

L
Liu Jicong 已提交
32 33 34 35 36
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
static int32_t  mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
static int32_t  mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
static int32_t  mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);
37 38 39 40
static int32_t  mndProcessRebalanceReq(SRpcMsg *pMsg);
static int32_t  mndProcessDropCgroupReq(SRpcMsg *pMsg);
static int32_t  mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void     mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
41

42 43 44 45 46 47 48
/**
 * Encode pSub into an SDB raw record, append it to pTrans as a redo log and mark
 * the record READY.
 *
 * @param pMnode unused.
 * @param pTrans transaction receiving the redo log.
 * @param pSub   subscription to encode.
 * @return 0 on success, -1 if encoding, appending or the status update fails.
 */
static int32_t mndSetSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  SSdbRaw *pRaw = mndSubActionEncode(pSub);
  if (pRaw != NULL && mndTransAppendRedolog(pTrans, pRaw) == 0 &&
      sdbSetRawStatus(pRaw, SDB_STATUS_READY) == 0) {
    return 0;
  }
  return -1;
}
L
Liu Jicong 已提交
49

50 51 52 53 54 55 56
/**
 * Encode pSub into an SDB raw record, append it to pTrans as a commit log and
 * mark the record READY.
 *
 * @param pMnode unused.
 * @param pTrans transaction receiving the commit log.
 * @param pSub   subscription to encode.
 * @return 0 on success, -1 if encoding, appending or the status update fails.
 */
static int32_t mndSetSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  int32_t  result = -1;
  SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);

  if (pCommitRaw != NULL && mndTransAppendCommitlog(pTrans, pCommitRaw) == 0) {
    result = (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) == 0) ? 0 : -1;
  }
  return result;
}
L
Liu Jicong 已提交
57

L
Liu Jicong 已提交
58
/**
 * Register the subscribe SDB table together with the mnode message handlers and
 * the show (retrieve / free-iterator) handlers for subscriptions.
 *
 * @param pMnode mnode instance being initialised.
 * @return the result of sdbSetTable().
 */
int32_t mndInitSubscribe(SMnode *pMnode) {
  SSdbTable table = {
      .sdbType = SDB_SUBSCRIBE,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndSubActionEncode,
      .decodeFp = (SdbDecodeFp)mndSubActionDecode,
      .insertFp = (SdbInsertFp)mndSubActionInsert,
      .updateFp = (SdbUpdateFp)mndSubActionUpdate,
      .deleteFp = (SdbDeleteFp)mndSubActionDelete,
  };

  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_SUBSCRIBE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_DELETE_SUB_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DO_REBALANCE, mndProcessRebalanceReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP, mndProcessDropCgroupReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP_RSP, mndTransProcessRsp);

  // Both show handlers must be keyed on the SUBSCRIPTIONS table so that cancelling a
  // "show subscriptions" scan releases the subscription iterator. Keying the free
  // handler on TSDB_MGMT_TABLE_TOPICS (as before) mismatched the retrieve handler and
  // could replace whatever free handler is registered for the topics table.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndCancelGetNextSubscribe);

  return sdbSetTable(pMnode->pSdb, table);
}

81
/**
 * Create a subscription object for subKey, copy the identifying fields from
 * pTopic and schedule its initial vgroup endpoints.
 *
 * @param pMnode mnode instance, forwarded to mndSchedInitSubEp().
 * @param pTopic topic being subscribed.
 * @param subKey subscription key.
 * @return a new object owned by the caller, or NULL on failure (terrno is set to
 *         TSDB_CODE_OUT_OF_MEMORY when the object itself cannot be allocated).
 */
static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey) {
  SMqSubscribeObj *pNewSub = tNewSubscribeObj(subKey);
  if (pNewSub == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // inherit database / super-table identity and subscription flavour from the topic
  pNewSub->dbUid = pTopic->dbUid;
  pNewSub->stbUid = pTopic->stbUid;
  pNewSub->subType = pTopic->subType;
  pNewSub->withMeta = pTopic->withMeta;

  if (mndSchedInitSubEp(pMnode, pTopic, pNewSub) >= 0) {
    return pNewSub;
  }

  tDeleteSubscribeObj(pNewSub);
  taosMemoryFree(pNewSub);
  return NULL;
}

102 103
/**
 * Serialize an SMqRebVgReq describing one vgroup re-assignment into a newly
 * allocated message laid out as SMsgHead followed by the encoded request.
 *
 * @param pBuf   out: message buffer; ownership passes to the caller on success.
 * @param pLen   out: total message length, SMsgHead included.
 * @param pSub   subscription supplying key, sub type, meta flag and stable uid.
 * @param pRebVg re-assignment (old/new consumer and target vgroup endpoint).
 * @param pPlan  optional subplan. When non-NULL it is retargeted at the vgroup and
 *               serialized into req.qmsg; otherwise qmsg is an empty string.
 * @return 0 on success, -1 on failure. terrno is set for allocation failures and
 *         plan-serialization failures.
 */
static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj *pSub,
                                    const SMqRebOutputVg *pRebVg, SSubplan* pPlan) {
  SMqRebVgReq req = {0};
  SEncoder    encoder = {0};
  void       *buf = NULL;
  int32_t     bodyLen = 0;
  int32_t     tlen = 0;
  int32_t     ret = 0;
  int32_t     code = -1;

  req.oldConsumerId = pRebVg->oldConsumerId;
  req.newConsumerId = pRebVg->newConsumerId;
  req.vgId = pRebVg->pVgEp->vgId;

  if (pPlan) {
    // point the plan at the vgroup that will execute it
    pPlan->execNode.epSet = pRebVg->pVgEp->epSet;
    pPlan->execNode.nodeId = pRebVg->pVgEp->vgId;
    int32_t msgLen = 0;
    if (qSubPlanToString(pPlan, &req.qmsg, &msgLen) < 0) {
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      return -1;
    }
  } else {
    req.qmsg = taosStrdup("");
    if (req.qmsg == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  req.subType = pSub->subType;
  req.withMeta = pSub->withMeta;
  req.suid = pSub->stbUid;
  tstrncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);

  tEncodeSize(tEncodeSMqRebVgReq, &req, bodyLen, ret);
  if (ret < 0) {
    goto _END;
  }

  tlen = bodyLen + (int32_t)sizeof(SMsgHead);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _END;
  }

  ((SMsgHead *)buf)->contLen = htonl(tlen);
  ((SMsgHead *)buf)->vgId = htonl(pRebVg->pVgEp->vgId);

  // The encoder only owns the bytes after the header, so its capacity is bodyLen
  // rather than the full message length.
  tEncoderInit(&encoder, POINTER_SHIFT(buf, sizeof(SMsgHead)), bodyLen);
  ret = tEncodeSMqRebVgReq(&encoder, &req);
  tEncoderClear(&encoder);
  if (ret < 0) {
    taosMemoryFreeClear(buf);
    goto _END;
  }

  *pBuf = buf;
  *pLen = tlen;
  code = 0;

_END:
  // req.qmsg is a private copy in every path that reaches here
  taosMemoryFree(req.qmsg);
  return code;
}

161 162
/**
 * Append to pTrans a redo action that sends the TDMT_VND_TMQ_SUBSCRIBE request for
 * one vgroup re-assignment to that vgroup's endpoints.
 *
 * @param pMnode mnode instance.
 * @param pTrans transaction receiving the redo action.
 * @param pSub   subscription being rebalanced.
 * @param pRebVg re-assignment; old and new consumer must differ.
 * @param pPlan  optional subplan forwarded to mndBuildSubChangeReq().
 * @return 0 on success; -1 when old and new consumer are equal
 *         (TSDB_CODE_MND_INVALID_SUB_OPTION), the request cannot be built, the vgroup
 *         is gone (TSDB_CODE_MND_VGROUP_NOT_EXIST) or the action cannot be appended.
 */
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub,
                                        const SMqRebOutputVg *pRebVg, SSubplan* pPlan) {
  // a "move" to the same consumer is rejected
  if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
    terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
    return -1;
  }

  void   *pReqBuf = NULL;
  int32_t reqLen = 0;
  if (mndBuildSubChangeReq(&pReqBuf, &reqLen, pSub, pRebVg, pPlan) < 0) {
    return -1;
  }

  SVgObj *pVgroup = mndAcquireVgroup(pMnode, pRebVg->pVgEp->vgId);
  if (pVgroup == NULL) {
    taosMemoryFree(pReqBuf);
    terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST;
    return -1;
  }

  STransAction action = {
      .epSet = mndGetVgroupEpset(pMnode, pVgroup),
      .pCont = pReqBuf,
      .contLen = reqLen,
      .msgType = TDMT_VND_TMQ_SUBSCRIBE,
  };
  mndReleaseVgroup(pMnode, pVgroup);

  if (mndTransAppendRedoAction(pTrans, &action) == 0) {
    return 0;
  }
  taosMemoryFree(pReqBuf);
  return -1;
}
L
Liu Jicong 已提交
195

L
Liu Jicong 已提交
196
static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, bool fullName) {
S
Shengliang Guan 已提交
197
  int32_t i = 0;
L
Liu Jicong 已提交
198
  while (key[i] != TMQ_SEPARATOR) {
L
Liu Jicong 已提交
199 200
    i++;
  }
L
Liu Jicong 已提交
201 202
  memcpy(cgroup, key, i);
  cgroup[i] = 0;
L
Liu Jicong 已提交
203 204 205 206 207 208 209 210
  if (fullName) {
    strcpy(topic, &key[i + 1]);
  } else {
    while (key[i] != '.') {
      i++;
    }
    strcpy(topic, &key[i + 1]);
  }
L
Liu Jicong 已提交
211 212 213
  return 0;
}

L
Liu Jicong 已提交
214 215
/**
 * Return the rebalance entry stored in pHash under key, inserting a freshly
 * created one when the key is not present yet.
 *
 * @param pHash hash of key -> SMqRebInfo (stored by value).
 * @param key   NUL-terminated subscription key (the terminator is part of the hash key).
 * @return pointer to the entry held inside pHash, or NULL on failure
 *         (terrno = TSDB_CODE_OUT_OF_MEMORY when the entry cannot be created).
 */
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
  size_t      keyLen = strlen(key) + 1;
  SMqRebInfo *pRebSub = taosHashGet(pHash, key, keyLen);
  if (pRebSub != NULL) {
    return pRebSub;
  }

  SMqRebInfo *pNew = tNewSMqRebSubscribe(key);
  if (pNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // taosHashPut copies the struct by value, so the hash copy now shares the member
  // pointers of pNew. Only the heap shell is released here; it used to be returned and
  // leaked. Callers get the stored copy, so writes to the struct's own fields are
  // visible in pHash as well.
  // NOTE(review): if the put fails, the members allocated by tNewSMqRebSubscribe are
  // not released here.
  int32_t code = taosHashPut(pHash, key, keyLen, pNew, sizeof(SMqRebInfo));
  taosMemoryFree(pNew);
  if (code != 0) {
    return NULL;
  }
  return taosHashGet(pHash, key, keyLen);
}

227
/**
 * For every consumer listed in pInput->pRebInfo->removedConsumers that still exists
 * in the subscription:
 *  - queue each of its vgroups in pHash (vgId -> SMqRebOutputVg with newConsumerId = -1);
 *  - drop the consumer from pOutput->pSub->consumerHash;
 *  - record its id in pOutput->removedConsumers.
 *
 * A mismatch between the requested and the actually removed count is logged as an error.
 */
static void doRemoveLostConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
  SHashObj   *pConsumerHash = pOutput->pSub->consumerHash;
  const char *pSubKey = pOutput->pSub->key;
  int32_t     numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
  int32_t     actualRemoved = 0;

  for (int32_t idx = 0; idx < numOfRemoved; ++idx) {
    uint64_t       consumerId = *(uint64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, idx);
    SMqConsumerEp *pConsumerEp = taosHashGet(pConsumerHash, &consumerId, sizeof(int64_t));
    if (pConsumerEp == NULL) {
      continue;  // consumer already gone, nothing to hand back
    }
    ++actualRemoved;

    int32_t vgCnt = taosArrayGetSize(pConsumerEp->vgs);
    for (int32_t k = 0; k < vgCnt; ++k) {
      SMqVgEp       *pVgEp = taosArrayGetP(pConsumerEp->vgs, k);
      SMqRebOutputVg outputVg = {.oldConsumerId = consumerId, .newConsumerId = -1, .pVgEp = pVgEp};
      taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
      mInfo("sub:%s mq re-balance remove vgId:%d from consumer:%" PRIx64, pSubKey, pVgEp->vgId, consumerId);
    }

    taosArrayDestroy(pConsumerEp->vgs);
    taosHashRemove(pConsumerHash, &consumerId, sizeof(int64_t));
    taosArrayPush(pOutput->removedConsumers, &consumerId);
  }

  if (numOfRemoved == actualRemoved) {
    mInfo("sub:%s removed %d consumers", pSubKey, numOfRemoved);
  } else {
    mError("sub:%s mq re-balance removedNum:%d not matched with actual:%d", pSubKey, numOfRemoved, actualRemoved);
  }
}
264

265
/**
 * Insert every consumer from pInput->pRebInfo->newConsumers into the subscription's
 * consumer hash with an empty vgroup list, and record its id in pOutput->newConsumers.
 */
static void doAddNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pInput) {
  const char *pSubKey = pOutput->pSub->key;
  int32_t     numOfNewConsumers = taosArrayGetSize(pInput->pRebInfo->newConsumers);

  for (int32_t idx = 0; idx < numOfNewConsumers; ++idx) {
    int64_t       consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, idx);
    SMqConsumerEp newConsumerEp = {.consumerId = consumerId, .vgs = taosArrayInit(0, sizeof(void *))};

    taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
    taosArrayPush(pOutput->newConsumers, &consumerId);
    mInfo("sub:%s mq rebalance add new consumer:%" PRIx64, pSubKey, consumerId);
  }
}
281

X
Xiaoyu Wang 已提交
282
/**
 * Drain pOutput->pSub->unassignedVgs, queueing each vgroup in pHash as an entry with
 * no previous and no new owner (oldConsumerId = newConsumerId = -1).
 */
static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
  const char *pSubKey = pOutput->pSub->key;
  int32_t     remaining = taosArrayGetSize(pOutput->pSub->unassignedVgs);

  while (remaining-- > 0) {
    SMqVgEp       *pVgEp = *(SMqVgEp **)taosArrayPop(pOutput->pSub->unassignedVgs);
    SMqRebOutputVg rebOutput = {.oldConsumerId = -1, .newConsumerId = -1, .pVgEp = pVgEp};

    taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg));
    mInfo("sub:%s mq re-balance addUnassignedVgroups vgId:%d from unassigned", pSubKey, pVgEp->vgId);
  }
}
298

wmmhello's avatar
wmmhello 已提交
299 300 301 302 303 304 305 306 307 308 309
//static void putNoTransferToOutput(SMqRebOutputObj *pOutput, SMqConsumerEp *pConsumerEp){
//  for(int i = 0; i < taosArrayGetSize(pConsumerEp->vgs); i++){
//    SMqVgEp       *pVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, i);
//    SMqRebOutputVg outputVg = {
//        .oldConsumerId = pConsumerEp->consumerId,
//        .newConsumerId = pConsumerEp->consumerId,
//        .pVgEp = pVgEp,
//    };
//    taosArrayPush(pOutput->rebVgs, &outputVg);
//  }
//}
310

X
Xiaoyu Wang 已提交
311 312
static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt,
                                        int32_t imbConsumerNum) {
313
  const char *pSubKey = pOutput->pSub->key;
314 315 316

  int32_t imbCnt = 0;
  void   *pIter = NULL;
317

318 319
  while (1) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
320 321 322 323
    if (pIter == NULL) {
      break;
    }

324
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
325
    int32_t        consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
326

327 328 329
    // all old consumers still existing need to be modified
    // TODO optimize: modify only consumer whose vgs changed
    taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId);
330 331
    if (consumerVgNum > minVgCnt) {
      if (imbCnt < imbConsumerNum) {
332 333 334 335 336 337 338 339 340 341 342
        // pop until equal minVg + 1
        while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) {
          SMqVgEp       *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs);
          SMqRebOutputVg outputVg = {
              .oldConsumerId = pConsumerEp->consumerId,
              .newConsumerId = -1,
              .pVgEp = pVgEp,
          };
          taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
          mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", pSubKey, pVgEp->vgId,
                pConsumerEp->consumerId);
343
        }
344
        imbCnt++;
345
      } else {
346
        // all the remain consumers should only have the number of vgroups, which is equalled to the value of minVg
347 348
        while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt) {
          SMqVgEp       *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs);
349
          SMqRebOutputVg outputVg = {
350
              .oldConsumerId = pConsumerEp->consumerId,
351 352 353 354
              .newConsumerId = -1,
              .pVgEp = pVgEp,
          };
          taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
355
          mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", pSubKey, pVgEp->vgId,
L
Liu Jicong 已提交
356
                pConsumerEp->consumerId);
357 358 359
        }
      }
    }
wmmhello's avatar
wmmhello 已提交
360
//    putNoTransferToOutput(pOutput, pConsumerEp);
361
  }
362
}
363

364 365 366 367 368 369 370 371 372
/**
 * Compute a new vgroup -> consumer assignment for one subscription.
 *
 * Works on pOutput->pSub, which is modified in place:
 *  1. build a temporary hash (vgId -> SMqRebOutputVg) of vgroups needing an owner;
 *  2. drop the removed consumers and queue their vgroups;
 *  3. queue the previously unassigned vgroups;
 *  4. compute the share: each consumer gets minVgCnt vgroups, imbConsumerNum of them
 *     one extra;
 *  5. strip surplus vgroups from existing consumers;
 *  6. add the new consumers;
 *  7. hand queued vgroups to consumers below their share;
 *  8. copy every queued vgroup into pOutput->rebVgs and log the final layout.
 *
 * @param pMnode  unused.
 * @param pInput  rebalance request (added / removed consumers, previous consumer count).
 * @param pOutput in/out: pSub is updated; newConsumers, removedConsumers,
 *                modifyConsumers and rebVgs are filled.
 * @return 0 on success, -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if the temporary
 *         hash cannot be created (pOutput is left untouched in that case).
 */
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
  int32_t     totalVgNum = pOutput->pSub->vgNum;
  const char *pSubKey = pOutput->pSub->key;

  int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
  int32_t numOfAdded = taosArrayGetSize(pInput->pRebInfo->newConsumers);
  mInfo("sub:%s mq re-balance %d vgroups, existed consumers:%d, added:%d, removed:%d", pSubKey, totalVgNum,
        pInput->oldConsumerNum, numOfAdded, numOfRemoved);

  // 1. build temporary hash(vgId -> SMqRebOutputVg) to store vg that need to be assigned
  SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pHash == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // 2. check and get actual removed consumers, put their vg into pHash
  doRemoveLostConsumers(pOutput, pHash, pInput);

  // 3. if previously no consumer, there are vgs not assigned, put these vg into pHash
  addUnassignedVgroups(pOutput, pHash);

  // 4. calc vg number of each consumer
  int32_t numOfFinal = pInput->oldConsumerNum + numOfAdded - numOfRemoved;
  int32_t minVgCnt = 0;
  int32_t imbConsumerNum = 0;

  if (numOfFinal) {
    minVgCnt = totalVgNum / numOfFinal;
    imbConsumerNum = totalVgNum % numOfFinal;
    mInfo("sub:%s mq re-balance %d consumers: at least %d vgs each, %d consumers has 1 more vgroups than avg value",
          pSubKey, numOfFinal, minVgCnt, imbConsumerNum);
  } else {
    mInfo("sub:%s no consumer subscribe this topic", pSubKey);
  }

  // 5. remove vgroups from consumers holding more than their threshold (minVgCnt or
  //    minVgCnt + 1) and queue them in pHash
  transferVgroupsForConsumers(pOutput, pHash, minVgCnt, imbConsumerNum);

  // 6. add new consumer into sub
  doAddNewConsumers(pOutput, pInput);

  SMqRebOutputVg *pRebVg = NULL;
  void           *pRemovedIter = NULL;
  void           *pIter = NULL;

  // 7.1 hand queued vgroups to consumers until each holds minVgCnt
  while (1) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) {
      break;
    }

    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    while (taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) {
      // take the next queued vgroup
      pRemovedIter = taosHashIterate(pHash, pRemovedIter);
      if (pRemovedIter == NULL) {
        mError("sub:%s removed iter is null, never can reach hear", pSubKey);
        break;
      }

      pRebVg = (SMqRebOutputVg *)pRemovedIter;
      pRebVg->newConsumerId = pConsumerEp->consumerId;
      taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
      mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", pRebVg->pVgEp->vgId, pConsumerEp->consumerId);
    }
  }

  // 7.2 give the remaining queued vgroups (at most one each) to consumers at minVgCnt
  pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) {
      break;
    }
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;

    if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) {
      pRemovedIter = taosHashIterate(pHash, pRemovedIter);
      if (pRemovedIter == NULL) {
        mInfo("sub:%s removed iter is null", pSubKey);
        // leaving the consumer-hash walk early: release the iterator's reference
        taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
        break;
      }

      pRebVg = (SMqRebOutputVg *)pRemovedIter;
      pRebVg->newConsumerId = pConsumerEp->consumerId;
      taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
      mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->pVgEp->vgId, pConsumerEp->consumerId);
    }
  }

  // 7.3 All assigned vg should be put into pOutput->rebVgs
  if (pRemovedIter != NULL) {
    mError("sub:%s error pRemovedIter should be NULL", pSubKey);
    // Restart from the first entry. Continuing from the current position would drop
    // every earlier queued vgroup from rebVgs, so no vnode message would be sent for it.
    taosHashCancelIterate(pHash, pRemovedIter);
    pRemovedIter = NULL;
  }
  while (1) {
    pRemovedIter = taosHashIterate(pHash, pRemovedIter);
    if (pRemovedIter == NULL) {
      break;
    }

    SMqRebOutputVg *pRebOutput = (SMqRebOutputVg *)pRemovedIter;
    taosArrayPush(pOutput->rebVgs, pRebOutput);
    if (taosHashGetSize(pOutput->pSub->consumerHash) == 0) {  // if all consumer is removed, put all vg into unassigned
      taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp);
    }
  }

  // 8. generate logs
  mInfo("sub:%s mq re-balance calculation completed, re-balanced vg", pSubKey);
  for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) {
    SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i);
    mInfo("sub:%s mq re-balance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pSubKey,
          pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId);
  }

  pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) break;
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    int32_t        sz = taosArrayGetSize(pConsumerEp->vgs);
    mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", pSubKey, pConsumerEp->consumerId, sz);
    for (int32_t i = 0; i < sz; i++) {
      SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
      mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, pSubKey, pVgEp->vgId,
            pConsumerEp->consumerId);
    }
  }

  // 9. clear
  taosHashCleanup(pHash);

  return 0;
}

S
Shengliang Guan 已提交
499
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
500 501 502 503 504 505 506 507 508
  struct SSubplan* pPlan = NULL;
  if(strcmp(pOutput->pSub->qmsg, "") != 0){
    int32_t code = qStringToSubplan(pOutput->pSub->qmsg, &pPlan);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return -1;
    }
  }

509
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
510
  if (pTrans == NULL) {
511
    nodesDestroyNode((SNode*)pPlan);
512 513
    return -1;
  }
514

515
  mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL);
516 517
  if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
    mndTransDrop(pTrans);
518
    nodesDestroyNode((SNode*)pPlan);
519 520
    return -1;
  }
521

522 523 524 525 526 527
  // make txn:
  // 1. redo action: action to all vg
  const SArray *rebVgs = pOutput->rebVgs;
  int32_t       vgNum = taosArrayGetSize(rebVgs);
  for (int32_t i = 0; i < vgNum; i++) {
    SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i);
528
    if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg, pPlan) < 0) {
H
Haojun Liao 已提交
529
      mndTransDrop(pTrans);
530
      nodesDestroyNode((SNode*)pPlan);
H
Haojun Liao 已提交
531
      return -1;
532 533
    }
  }
534
  nodesDestroyNode((SNode*)pPlan);
535 536 537

  // 2. redo log: subscribe and vg assignment
  // subscribe
L
Liu Jicong 已提交
538
  if (mndSetSubCommitLogs(pMnode, pTrans, pOutput->pSub) != 0) {
H
Haojun Liao 已提交
539 540
    mndTransDrop(pTrans);
    return -1;
541 542
  }

wmmhello's avatar
wmmhello 已提交
543 544 545 546
  char topic[TSDB_TOPIC_FNAME_LEN] = {0};
  char cgroup[TSDB_CGROUP_LEN] = {0};
  mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);

547 548
  // 3. commit log: consumer to update status and epoch
  // 3.1 set touched consumer
549
  int32_t consumerNum = taosArrayGetSize(pOutput->modifyConsumers);
550
  for (int32_t i = 0; i < consumerNum; i++) {
551
    int64_t         consumerId = *(int64_t *)taosArrayGet(pOutput->modifyConsumers, i);
wmmhello's avatar
wmmhello 已提交
552 553
    SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
    pConsumerNew->updateType = CONSUMER_UPDATE_REB_MODIFY_NOTOPIC;
554
    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
wmmhello's avatar
wmmhello 已提交
555
      tDeleteSMqConsumerObj(pConsumerNew, true);
H
Haojun Liao 已提交
556 557 558

      mndTransDrop(pTrans);
      return -1;
559
    }
H
Haojun Liao 已提交
560

wmmhello's avatar
wmmhello 已提交
561
    tDeleteSMqConsumerObj(pConsumerNew, true);
562
  }
H
Haojun Liao 已提交
563

564 565 566
  // 3.2 set new consumer
  consumerNum = taosArrayGetSize(pOutput->newConsumers);
  for (int32_t i = 0; i < consumerNum; i++) {
L
Liu Jicong 已提交
567
    int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->newConsumers, i);
wmmhello's avatar
wmmhello 已提交
568 569
    SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
    pConsumerNew->updateType = CONSUMER_UPDATE_REB_MODIFY_TOPIC;
570

wmmhello's avatar
wmmhello 已提交
571 572
    char* topicTmp = taosStrdup(topic);
    taosArrayPush(pConsumerNew->rebNewTopics, &topicTmp);
573
    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
wmmhello's avatar
wmmhello 已提交
574
      tDeleteSMqConsumerObj(pConsumerNew, true);
H
Haojun Liao 已提交
575 576 577

      mndTransDrop(pTrans);
      return -1;
578
    }
H
Haojun Liao 已提交
579

wmmhello's avatar
wmmhello 已提交
580
    tDeleteSMqConsumerObj(pConsumerNew, true);
581 582 583 584 585 586
  }

  // 3.3 set removed consumer
  consumerNum = taosArrayGetSize(pOutput->removedConsumers);
  for (int32_t i = 0; i < consumerNum; i++) {
    int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->removedConsumers, i);
587

wmmhello's avatar
wmmhello 已提交
588 589 590 591 592
    SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
    pConsumerNew->updateType = CONSUMER_UPDATE_REB_MODIFY_REMOVE;

    char* topicTmp = taosStrdup(topic);
    taosArrayPush(pConsumerNew->rebRemovedTopics, &topicTmp);
593
    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
wmmhello's avatar
wmmhello 已提交
594
      tDeleteSMqConsumerObj(pConsumerNew, true);
H
Haojun Liao 已提交
595 596 597

      mndTransDrop(pTrans);
      return -1;
598
    }
H
Haojun Liao 已提交
599

wmmhello's avatar
wmmhello 已提交
600
    tDeleteSMqConsumerObj(pConsumerNew, true);
601
  }
L
Liu Jicong 已提交
602

L
Liu Jicong 已提交
603 604
  // 4. TODO commit log: modification log

L
Liu Jicong 已提交
605
  // 5. set cb
606
  mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_MQ_REB, NULL, 0);
L
Liu Jicong 已提交
607 608

  // 6. execution
L
Liu Jicong 已提交
609
  if (mndTransPrepare(pMnode, pTrans) != 0) {
L
Liu Jicong 已提交
610
    mError("failed to prepare trans rebalance since %s", terrstr());
H
Haojun Liao 已提交
611 612
    mndTransDrop(pTrans);
    return -1;
L
Liu Jicong 已提交
613
  }
614 615 616 617 618

  mndTransDrop(pTrans);
  return 0;
}

S
Shengliang Guan 已提交
619 620 621
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
  SMnode            *pMnode = pMsg->info.node;
  SMqDoRebalanceMsg *pReq = pMsg->pCont;
622
  void              *pIter = NULL;
623
//  bool               rebalanceOnce = false;  // to ensure only once.
624

625
  mInfo("mq re-balance start, total required re-balanced trans:%d", taosHashGetSize(pReq->rebSubHash));
626

627
  // here we only handle one topic rebalance requirement to ensure the atomic execution of this transaction.
628 629
  while (1) {
    pIter = taosHashIterate(pReq->rebSubHash, pIter);
630 631 632 633
    if (pIter == NULL) {
      break;
    }

X
Xiaoyu Wang 已提交
634
    SMqRebInputObj  rebInput = {0};
L
Liu Jicong 已提交
635
    SMqRebOutputObj rebOutput = {0};
L
Liu Jicong 已提交
636 637
    rebOutput.newConsumers = taosArrayInit(0, sizeof(int64_t));
    rebOutput.removedConsumers = taosArrayInit(0, sizeof(int64_t));
638
    rebOutput.modifyConsumers = taosArrayInit(0, sizeof(int64_t));
L
Liu Jicong 已提交
639 640
    rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));

641 642 643 644 645 646 647 648 649 650 651 652 653 654
    if (rebOutput.newConsumers == NULL || rebOutput.removedConsumers == NULL || rebOutput.modifyConsumers == NULL ||
        rebOutput.rebVgs == NULL) {
      taosArrayDestroy(rebOutput.newConsumers);
      taosArrayDestroy(rebOutput.removedConsumers);
      taosArrayDestroy(rebOutput.modifyConsumers);
      taosArrayDestroy(rebOutput.rebVgs);

      terrno = TSDB_CODE_OUT_OF_MEMORY;
      mInfo("mq re-balance failed, due to out of memory");
      taosHashCleanup(pReq->rebSubHash);
      mndRebEnd();
      return -1;
    }

L
Liu Jicong 已提交
655 656 657 658
    SMqRebInfo      *pRebInfo = (SMqRebInfo *)pIter;
    SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebInfo->key);

    rebInput.pRebInfo = pRebInfo;
659 660 661 662 663

    if (pSub == NULL) {
      // split sub key and extract topic
      char topic[TSDB_TOPIC_FNAME_LEN];
      char cgroup[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
664
      mndSplitSubscribeKey(pRebInfo->key, topic, cgroup, true);
665

666
      SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
L
Liu Jicong 已提交
667
      if (pTopic == NULL) {
668
        mError("mq re-balance %s ignored since topic %s doesn't exist", pRebInfo->key, topic);
L
Liu Jicong 已提交
669 670
        continue;
      }
671

672 673
      taosRLockLatch(&pTopic->lock);

674
      rebOutput.pSub = mndCreateSubscription(pMnode, pTopic, pRebInfo->key);
L
Liu Jicong 已提交
675 676

      if (rebOutput.pSub == NULL) {
677
        mError("mq rebalance %s failed create sub since %s, ignore", pRebInfo->key, terrstr());
L
Liu Jicong 已提交
678 679 680 681
        taosRUnLockLatch(&pTopic->lock);
        mndReleaseTopic(pMnode, pTopic);
        continue;
      }
L
Liu Jicong 已提交
682

683
      memcpy(rebOutput.pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN);
L
Liu Jicong 已提交
684 685 686 687
      taosRUnLockLatch(&pTopic->lock);
      mndReleaseTopic(pMnode, pTopic);

      rebInput.oldConsumerNum = 0;
688
      mInfo("sub topic:%s has no consumers sub yet", pRebInfo->key);
L
Liu Jicong 已提交
689 690 691 692 693
    } else {
      taosRLockLatch(&pSub->lock);
      rebInput.oldConsumerNum = taosHashGetSize(pSub->consumerHash);
      rebOutput.pSub = tCloneSubscribeObj(pSub);
      taosRUnLockLatch(&pSub->lock);
694

695
      mInfo("sub topic:%s has %d consumers sub till now", pRebInfo->key, rebInput.oldConsumerNum);
L
Liu Jicong 已提交
696 697
      mndReleaseSubscribe(pMnode, pSub);
    }
698

699
    if (mndDoRebalance(pMnode, &rebInput, &rebOutput) < 0) {
700
      mError("mq re-balance internal error");
701
    }
702

L
Liu Jicong 已提交
703 704
    // if add more consumer to balanced subscribe,
    // possibly no vg is changed
705
    // when each topic is re-balanced, issue an trans to save the results in sdb.
L
Liu Jicong 已提交
706
    if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) {
707
      mError("mq re-balance persist output error, possibly vnode splitted or dropped,msg:%s", terrstr());
L
Liu Jicong 已提交
708
    }
709

L
Liu Jicong 已提交
710
    taosArrayDestroy(rebOutput.newConsumers);
711
    taosArrayDestroy(rebOutput.modifyConsumers);
L
Liu Jicong 已提交
712 713 714 715
    taosArrayDestroy(rebOutput.removedConsumers);
    taosArrayDestroy(rebOutput.rebVgs);
    tDeleteSubscribeObj(rebOutput.pSub);
    taosMemoryFree(rebOutput.pSub);
716 717 718
  }

  // reset flag
719
  mInfo("mq re-balance completed successfully");
720
  taosHashCleanup(pReq->rebSubHash);
L
Liu Jicong 已提交
721
  mndRebEnd();
722 723 724

  return 0;
}
L
Liu Jicong 已提交
725

726 727
/*
 * Handle a request to drop consumer group dropReq.cgroup on topic dropReq.topic.
 *
 * Steps:
 *   1. refuse with TSDB_CODE_MND_CGROUP_USED while the subscription still has consumers;
 *   2. call mndDropConsumerFromSdb for every consumer whose cgroup matches;
 *   3. create a "drop-cgroup" transaction whose commit log marks the subscribe record dropped.
 *
 * Returns:
 *   0                             the subscription does not exist and igNotExists is set;
 *   TSDB_CODE_ACTION_IN_PROGRESS  the drop transaction was prepared;
 *   -1                            on failure.
 */
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
  SMnode          *pMnode = pMsg->info.node;
  SMDropCgroupReq  dropReq = {0};
  SMqSubscribeObj *pSub = NULL;
  STrans          *pTrans = NULL;
  void            *pIter = NULL;
  SMqConsumerObj  *pConsumer = NULL;
  int32_t          inUse = 0;
  int32_t          code = -1;

  if (tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  pSub = mndAcquireSubscribe(pMnode, dropReq.cgroup, dropReq.topic);
  if (pSub == NULL) {
    if (dropReq.igNotExists) {
      mInfo("cgroup:%s on topic:%s, not exist, ignore not exist is set", dropReq.cgroup, dropReq.topic);
      return 0;
    }
    terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
    mError("topic:%s, cgroup:%s, failed to drop since %s", dropReq.topic, dropReq.cgroup, terrstr());
    return -1;
  }

  // mndSubActionUpdate swaps consumerHash while holding the write latch, so read it under the read latch.
  taosRLockLatch(&pSub->lock);
  inUse = (taosHashGetSize(pSub->consumerHash) != 0);
  taosRUnLockLatch(&pSub->lock);

  if (inUse) {
    terrno = TSDB_CODE_MND_CGROUP_USED;
    mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
    goto _OVER;
  }

  // NOTE(review): consumers are matched on cgroup only (not topic), and they are dropped before the
  // transaction below is created, so a later failure does not undo these drops — confirm intended.
  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) {
      break;
    }

    if (strcmp(dropReq.cgroup, pConsumer->cgroup) == 0) {
      mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
    }
    sdbRelease(pMnode->pSdb, pConsumer);
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "drop-cgroup");
  if (pTrans == NULL) {
    mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
    goto _OVER;
  }

  mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);

  if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
    mError("cgroup %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
    goto _OVER;
  }

  if (mndTransPrepare(pMnode, pTrans) < 0) {
    goto _OVER;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  // Single exit: release the subscribe reference and the local transaction object on every path,
  // success included. Assumes mndTransPrepare does not take ownership of pTrans (the usual mnode
  // convention — confirm) and that mndTransDrop tolerates NULL.
  mndReleaseSubscribe(pMnode, pSub);
  mndTransDrop(pTrans);
  return code;
}

L
Liu Jicong 已提交
795 796 797
/* Module cleanup hook for the subscribe table: there is nothing to release here. */
void mndCleanupSubscribe(SMnode *pMnode) {
  (void)pMnode;
}

/*
 * Serialize a subscribe object into an sdb raw record.
 *
 * Record layout: int32 payload length | payload written by tEncodeSubscribeObj |
 * MND_SUBSCRIBE_RESERVE_SIZE reserved bytes. The raw is tagged with soft version
 * MND_SUBSCRIBE_VER_NUMBER.
 *
 * Returns the new raw on success, or NULL on failure with terrno left non-success.
 */
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  // Everything the cleanup label reads is declared and initialised before the first goto;
  // previously an early exit (tlen <= 0) reached sdbFreeRaw() with an uninitialised pRaw.
  SSdbRaw *pRaw = NULL;
  void    *buf = NULL;
  int32_t  dataPos = 0;

  // First pass with a NULL buffer only measures the encoded size.
  int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
  if (tlen <= 0) goto SUB_ENCODE_OVER;

  int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE;
  pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
  if (pRaw == NULL) goto SUB_ENCODE_OVER;

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto SUB_ENCODE_OVER;

  // The encoder advances the cursor it is given, so hand it a copy and keep buf for freeing.
  void *abuf = buf;
  tEncodeSubscribeObj(&abuf, pSub);

  SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, SUB_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

SUB_ENCODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRaw;
}

/*
 * Rebuild a subscribe row from an sdb raw record written by mndSubActionEncode.
 *
 * Only soft versions 1..MND_SUBSCRIBE_VER_NUMBER are accepted; the version is forwarded
 * to tDecodeSubscribeObj. After decoding, every vgroup endpoint (the unassigned ones and
 * those held by each consumer) has its epset refreshed through tmsgUpdateDnodeEpSet.
 *
 * Returns the new row, or NULL on failure with terrno left non-success.
 */
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRow         *pRow = NULL;
  SMqSubscribeObj *pSub = NULL;
  void            *buf = NULL;
  int8_t           sver = 0;
  int32_t          dataPos = 0;
  int32_t          tlen = 0;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;

  if (sver < 1 || sver > MND_SUBSCRIBE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto SUB_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqSubscribeObj));
  if (pRow == NULL) goto SUB_DECODE_OVER;

  pSub = sdbGetRowObj(pRow);
  if (pSub == NULL) goto SUB_DECODE_OVER;

  // payload = int32 length, encoded object, reserved tail
  SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto SUB_DECODE_OVER;
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);

  if (tDecodeSubscribeObj(buf, pSub, sver) == NULL) goto SUB_DECODE_OVER;

  // refresh the epsets persisted in the mnode: first the unassigned vgroups ...
  if (pSub->unassignedVgs != NULL) {
    int32_t numOfVgs = (int32_t)taosArrayGetSize(pSub->unassignedVgs);
    for (int32_t idx = 0; idx < numOfVgs; ++idx) {
      SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGetP(pSub->unassignedVgs, idx);
      tmsgUpdateDnodeEpSet(&pVgEp->epSet);
    }
  }

  // ... then the vgroups owned by each consumer
  if (pSub->consumerHash != NULL) {
    for (void *pIter = taosHashIterate(pSub->consumerHash, NULL); pIter != NULL;
         pIter = taosHashIterate(pSub->consumerHash, pIter)) {
      SMqConsumerEp *pConsumerEp = pIter;
      int32_t        numOfVgs = (int32_t)taosArrayGetSize(pConsumerEp->vgs);
      for (int32_t idx = 0; idx < numOfVgs; ++idx) {
        SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, idx);
        tmsgUpdateDnodeEpSet(&pVgEp->epSet);
      }
    }
  }

  terrno = TSDB_CODE_SUCCESS;

SUB_DECODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to decode from raw:%p since %s", pSub == NULL ? "null" : pSub->key, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("subscribe:%s, decode from raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRow;
}

/* sdb insert callback for subscribe rows: only traces the event and reports success. */
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
  (void)pSdb;
  mTrace("subscribe:%s, perform insert action", pSub->key);
  return 0;
}

/*
 * sdb delete callback for subscribe rows: traces the event and hands the object to
 * tDeleteSubscribeObj (presumably freeing its consumer hash / vgroup arrays — confirm).
 */
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
  (void)pSdb;
  mTrace("subscribe:%s, perform delete action", pSub->key);
  tDeleteSubscribeObj(pSub);
  return 0;
}

/*
 * sdb update callback: exchange consumerHash and unassignedVgs between the live object
 * (pOldSub) and the incoming one (pNewSub) while holding pOldSub's write latch.
 * After the exchange pNewSub carries the previous containers (presumably released by the
 * sdb along with pNewSub — confirm).
 */
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
  mTrace("subscribe:%s, perform update action", pOldSub->key);

  taosWLockLatch(&pOldSub->lock);

  SHashObj *pIncomingHash = pNewSub->consumerHash;
  SArray   *pIncomingVgs = pNewSub->unassignedVgs;

  pNewSub->consumerHash = pOldSub->consumerHash;
  pNewSub->unassignedVgs = pOldSub->unassignedVgs;

  pOldSub->consumerHash = pIncomingHash;
  pOldSub->unassignedVgs = pIncomingVgs;

  taosWUnLockLatch(&pOldSub->lock);

  return 0;
}

927
int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName) {
S
Shengliang Guan 已提交
928
  int32_t tlen = strlen(cgroup);
L
Liu Jicong 已提交
929
  memcpy(key, cgroup, tlen);
L
Liu Jicong 已提交
930
  key[tlen] = TMQ_SEPARATOR;
L
Liu Jicong 已提交
931
  strcpy(key + tlen + 1, topicName);
L
Liu Jicong 已提交
932
  return 0;
L
Liu Jicong 已提交
933 934
}

L
Liu Jicong 已提交
935
/*
 * Acquire the subscribe object for (cgroup, topicName) from the sdb.
 * Returns NULL and sets terrno to TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST when absent;
 * a non-NULL result must be released with mndReleaseSubscribe.
 */
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *cgroup, const char *topicName) {
  char key[TSDB_SUBSCRIBE_KEY_LEN];
  mndMakeSubscribeKey(key, cgroup, topicName);

  SMqSubscribeObj *pSub = sdbAcquire(pMnode->pSdb, SDB_SUBSCRIBE, key);
  if (pSub != NULL) {
    return pSub;
  }

  terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
  return NULL;
}

L
Liu Jicong 已提交
946 947 948 949
/*
 * Acquire a subscribe object by its full key. Returns NULL and sets terrno to
 * TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST when absent; release non-NULL results with
 * mndReleaseSubscribe.
 */
SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key) {
  SMqSubscribeObj *pSub = sdbAcquire(pMnode->pSdb, SDB_SUBSCRIBE, key);
  if (pSub != NULL) {
    return pSub;
  }

  terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
  return NULL;
}

wmmhello's avatar
wmmhello 已提交
955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980
/*
 * Count the subscribe objects whose key names topicName, i.e. the number of consumer
 * groups subscribed to that topic. Scans the whole SDB_SUBSCRIBE table.
 */
int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName) {
  SSdb            *pSdb = pMnode->pSdb;
  void            *pIter = NULL;
  SMqSubscribeObj *pSub = NULL;
  int32_t          groupCount = 0;

  while ((pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub)) != NULL) {
    char topic[TSDB_TOPIC_FNAME_LEN];
    char cgroup[TSDB_CGROUP_LEN];
    mndSplitSubscribeKey(pSub->key, topic, cgroup, true);

    if (strcmp(topic, topicName) == 0) {
      groupCount++;
    }
    sdbRelease(pSdb, pSub);
  }

  return groupCount;
}

L
Liu Jicong 已提交
981 982 983 984 985
/* Release a subscribe object previously obtained from one of the acquire helpers. */
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
  sdbRelease(pMnode->pSdb, pSub);
}

L
Liu Jicong 已提交
986 987 988 989 990 991 992 993
/*
 * Encode pSub and append it to pTrans's redo log with status SDB_STATUS_DROPPED.
 * Returns 0 on success, -1 if encoding, appending or setting the status fails.
 */
static int32_t mndSetDropSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  SSdbRaw *pRedoRaw = mndSubActionEncode(pSub);
  if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
    return -1;
  }
  return (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED) != 0) ? -1 : 0;
}

L
Liu Jicong 已提交
994
/*
 * Encode pSub and append it to pTrans's commit log with status SDB_STATUS_DROPPED.
 * Returns 0 on success, -1 if encoding, appending or setting the status fails.
 */
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
  if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
    return -1;
  }
  return (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) ? -1 : 0;
}

/*
 * Add a "dropped" commit log to pTrans for every subscribe object whose dbUid equals pDb->uid.
 * Returns 0 when all matching objects were handled, -1 on the first failure (the sdb
 * iteration is cancelled before returning).
 */
int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  SSdb            *pSdb = pMnode->pSdb;
  void            *pIter = NULL;
  SMqSubscribeObj *pSub = NULL;

  for (;;) {
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
    if (pIter == NULL) {
      return 0;
    }

    if (pSub->dbUid == pDb->uid && mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
      sdbRelease(pSdb, pSub);
      sdbCancelFetch(pSdb, pIter);
      return -1;
    }

    sdbRelease(pSdb, pSub);
  }
}

/*
 * Append to pTrans everything needed to drop every subscription of topic topicName.
 *
 * For each matching subscribe object:
 *   - one TDMT_VND_TMQ_DELETE_SUB redo action per unassigned vgroup, carrying an
 *     SMqVDeleteReq with consumerId -1 and the subscribe key;
 *   - a redo log entry marking the subscribe record SDB_STATUS_DROPPED.
 *
 * Fails with terrno = TSDB_CODE_MND_IN_REBALANCE if a matching subscription still has consumers.
 * Returns 0 on success and -1 on failure. Every failure path now cancels the sdb iteration,
 * matching mndDropSubByDB.
 */
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) {
  SSdb            *pSdb = pMnode->pSdb;
  void            *pIter = NULL;
  SMqSubscribeObj *pSub = NULL;

  while (1) {
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
    if (pIter == NULL) break;

    char topic[TSDB_TOPIC_FNAME_LEN];
    char cgroup[TSDB_CGROUP_LEN];
    mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
    if (strcmp(topic, topicName) != 0) {
      sdbRelease(pSdb, pSub);
      continue;
    }

    // a subscription that still has consumers attached cannot be dropped
    if (taosHashGetSize(pSub->consumerHash) != 0) {
      terrno = TSDB_CODE_MND_IN_REBALANCE;
      goto _ERROR;
    }

    // ask every vnode holding an unassigned vgroup of this subscription to delete its handle
    int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
    for (int32_t i = 0; i < sz; i++) {
      SMqVgEp       *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
      SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
      if (pReq == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        goto _ERROR;
      }
      pReq->head.vgId = htonl(pVgEp->vgId);
      pReq->vgId = pVgEp->vgId;
      pReq->consumerId = -1;
      memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);

      STransAction action = {0};
      action.epSet = pVgEp->epSet;
      action.pCont = pReq;
      action.contLen = sizeof(SMqVDeleteReq);
      action.msgType = TDMT_VND_TMQ_DELETE_SUB;
      // on success the transaction keeps pReq (it is only freed here on failure)
      if (mndTransAppendRedoAction(pTrans, &action) != 0) {
        taosMemoryFree(pReq);
        goto _ERROR;
      }
    }

    if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) {
      goto _ERROR;
    }

    sdbRelease(pSdb, pSub);
  }

  return 0;

_ERROR:
  // Abort mid-iteration: release the current row and cancel the fetch so the iterator is not leaked.
  sdbRelease(pSdb, pSub);
  sdbCancelFetch(pSdb, pIter);
  return -1;
}
L
Liu Jicong 已提交
1086

1087
int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
S
Shengliang Guan 已提交
1088
  SMnode          *pMnode = pReq->info.node;
L
Liu Jicong 已提交
1089 1090 1091 1092
  SSdb            *pSdb = pMnode->pSdb;
  int32_t          numOfRows = 0;
  SMqSubscribeObj *pSub = NULL;

L
Liu Jicong 已提交
1093 1094
  mDebug("mnd show subscriptions begin");

L
Liu Jicong 已提交
1095 1096
  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub);
1097 1098 1099
    if (pShow->pIter == NULL) {
      break;
    }
L
Liu Jicong 已提交
1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123

    taosRLockLatch(&pSub->lock);

    if (numOfRows + pSub->vgNum > rowsCapacity) {
      blockDataEnsureCapacity(pBlock, numOfRows + pSub->vgNum);
    }

    SMqConsumerEp *pConsumerEp = NULL;
    void          *pIter = NULL;
    while (1) {
      pIter = taosHashIterate(pSub->consumerHash, pIter);
      if (pIter == NULL) break;
      pConsumerEp = (SMqConsumerEp *)pIter;

      int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
      for (int32_t j = 0; j < sz; j++) {
        SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);

        SColumnInfoData *pColInfo;
        int32_t          cols = 0;

        // topic and cgroup
        char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
        char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
L
Liu Jicong 已提交
1124
        mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
L
Liu Jicong 已提交
1125 1126 1127 1128
        varDataSetLen(topic, strlen(varDataVal(topic)));
        varDataSetLen(cgroup, strlen(varDataVal(cgroup)));

        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1129
        colDataSetVal(pColInfo, numOfRows, (const char *)topic, false);
L
Liu Jicong 已提交
1130 1131

        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1132
        colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false);
L
Liu Jicong 已提交
1133 1134 1135

        // vg id
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1136
        colDataSetVal(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false);
L
Liu Jicong 已提交
1137 1138

        // consumer id
wmmhello's avatar
wmmhello 已提交
1139 1140 1141 1142
        char        consumerIdHex[32] = {0};
        sprintf(varDataVal(consumerIdHex), "0x%"PRIx64, pConsumerEp->consumerId);
        varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));

L
Liu Jicong 已提交
1143
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
wmmhello's avatar
wmmhello 已提交
1144
        colDataSetVal(pColInfo, numOfRows, (const char *)consumerIdHex, false);
L
Liu Jicong 已提交
1145

D
dapan1121 已提交
1146
        mDebug("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic),
L
Liu Jicong 已提交
1147 1148
               pConsumerEp->consumerId, varDataVal(cgroup), pVgEp->vgId);

L
Liu Jicong 已提交
1149 1150 1151 1152
        // offset
#if 0
      // subscribe time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1153
      colDataSetVal(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false);
L
Liu Jicong 已提交
1154 1155 1156

      // rebalance time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1157
      colDataSetVal(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0);
L
Liu Jicong 已提交
1158 1159 1160 1161 1162 1163
#endif

        numOfRows++;
      }
    }

L
Liu Jicong 已提交
1164
    // do not show for cleared subscription
L
Liu Jicong 已提交
1165 1166 1167 1168 1169 1170 1171 1172 1173 1174
    int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
    for (int32_t i = 0; i < sz; i++) {
      SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);

      SColumnInfoData *pColInfo;
      int32_t          cols = 0;

      // topic and cgroup
      char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
      char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
L
Liu Jicong 已提交
1175
      mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
L
Liu Jicong 已提交
1176 1177 1178 1179
      varDataSetLen(topic, strlen(varDataVal(topic)));
      varDataSetLen(cgroup, strlen(varDataVal(cgroup)));

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1180
      colDataSetVal(pColInfo, numOfRows, (const char *)topic, false);
L
Liu Jicong 已提交
1181 1182

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1183
      colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false);
L
Liu Jicong 已提交
1184 1185 1186

      // vg id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1187
      colDataSetVal(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false);
L
Liu Jicong 已提交
1188 1189 1190

      // consumer id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1191
      colDataSetVal(pColInfo, numOfRows, NULL, true);
L
Liu Jicong 已提交
1192

1193
      mDebug("mnd show subscriptions(unassigned): topic %s, cgroup %s vgid %d", varDataVal(topic), varDataVal(cgroup),
L
Liu Jicong 已提交
1194 1195
             pVgEp->vgId);

L
Liu Jicong 已提交
1196 1197 1198 1199
      // offset
#if 0
      // subscribe time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1200
      colDataSetVal(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false);
L
Liu Jicong 已提交
1201 1202 1203

      // rebalance time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1204
      colDataSetVal(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0);
L
Liu Jicong 已提交
1205 1206 1207 1208 1209
#endif

      numOfRows++;
    }

1210 1211
    pBlock->info.rows = numOfRows;

L
Liu Jicong 已提交
1212 1213 1214 1215
    taosRUnLockLatch(&pSub->lock);
    sdbRelease(pSdb, pSub);
  }

L
Liu Jicong 已提交
1216 1217
  mDebug("mnd end show subscriptions");

L
Liu Jicong 已提交
1218 1219 1220 1221
  pShow->numOfRows += numOfRows;
  return numOfRows;
}

1222
/* Abort an in-progress show-subscriptions iteration, releasing the sdb iterator pIter. */
void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}