mndSubscribe.c 36.9 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
L
Liu Jicong 已提交
17
#include "mndSubscribe.h"
L
Liu Jicong 已提交
18 19 20 21
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
L
Liu Jicong 已提交
22
#include "mndScheduler.h"
L
Liu Jicong 已提交
23 24 25 26 27 28 29 30 31
#include "mndShow.h"
#include "mndStb.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "tcompare.h"
#include "tname.h"

L
Liu Jicong 已提交
32
// Version number written into every encoded subscription raw (passed to sdbAllocRaw in
// mndSubActionEncode); mndSubActionDecode rejects raws carrying a different version.
#define MND_SUBSCRIBE_VER_NUMBER   1
L
Liu Jicong 已提交
33 34
// Number of reserved bytes placed after the encoded payload in each subscription raw.
#define MND_SUBSCRIBE_RESERVE_SIZE 64

L
Liu Jicong 已提交
35
// NOTE(review): not referenced in the visible part of this file — confirm its purpose against its users.
#define MND_SUBSCRIBE_REBALANCE_CNT 3
L
Liu Jicong 已提交
36

L
Liu Jicong 已提交
37 38 39 40 41
// SDB table callbacks for subscription objects; all five are installed in mndInitSubscribe().
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
static int32_t  mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
static int32_t  mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
static int32_t  mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);
42 43 44 45
// Message handlers and SHOW callbacks; registered with the mnode in mndInitSubscribe().
static int32_t  mndProcessRebalanceReq(SRpcMsg *pMsg);
static int32_t  mndProcessDropCgroupReq(SRpcMsg *pMsg);
static int32_t  mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void     mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
46

47 48 49 50 51 52 53
/**
 * Encode pSub and attach the result to pTrans as a redo log entry, then mark
 * the raw as SDB_STATUS_READY.
 *
 * @param pMnode  not used by this function
 * @param pTrans  transaction that receives the redo log
 * @param pSub    subscription object to encode
 * @return 0 on success, -1 when encoding, appending or the status update fails
 */
static int32_t mndSetSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  SSdbRaw *pRaw = mndSubActionEncode(pSub);
  if (pRaw == NULL) {
    return -1;
  }

  if (mndTransAppendRedolog(pTrans, pRaw) != 0) {
    return -1;
  }

  // The status is set after the raw has been handed to the transaction.
  return (sdbSetRawStatus(pRaw, SDB_STATUS_READY) != 0) ? -1 : 0;
}
L
Liu Jicong 已提交
54

55 56 57 58 59 60 61
/**
 * Encode pSub and attach the result to pTrans as a commit log entry, then mark
 * the raw as SDB_STATUS_READY.
 *
 * @param pMnode  not used by this function
 * @param pTrans  transaction that receives the commit log
 * @param pSub    subscription object to encode
 * @return 0 on success, -1 when encoding, appending or the status update fails
 */
static int32_t mndSetSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  SSdbRaw *pRaw = mndSubActionEncode(pSub);

  if (pRaw != NULL && mndTransAppendCommitlog(pTrans, pRaw) == 0 &&
      sdbSetRawStatus(pRaw, SDB_STATUS_READY) == 0) {
    return 0;
  }
  return -1;
}
L
Liu Jicong 已提交
62

L
Liu Jicong 已提交
63
/**
 * Register the subscribe module with the mnode: message handlers, SHOW
 * callbacks and the SDB table description for SDB_SUBSCRIBE.
 *
 * @param pMnode  mnode instance the handlers and the table are attached to
 * @return the result of sdbSetTable()
 */
int32_t mndInitSubscribe(SMnode *pMnode) {
  SSdbTable table = {
      .sdbType = SDB_SUBSCRIBE,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndSubActionEncode,
      .decodeFp = (SdbDecodeFp)mndSubActionDecode,
      .insertFp = (SdbInsertFp)mndSubActionInsert,
      .updateFp = (SdbUpdateFp)mndSubActionUpdate,
      .deleteFp = (SdbDeleteFp)mndSubActionDelete,
  };

  // Vnode responses and the drop-cgroup response are routed to the generic transaction handler.
  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_SUBSCRIBE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_DELETE_SUB_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DO_REBALANCE, mndProcessRebalanceReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP, mndProcessDropCgroupReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP_RSP, mndTransProcessRsp);

  // Both SHOW callbacks must be registered for the same table. The free-iterator
  // handle was previously registered under TSDB_MGMT_TABLE_TOPICS, i.e. in the
  // topic table's slot instead of the subscriptions slot.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndCancelGetNextSubscribe);

  return sdbSetTable(pMnode->pSdb, table);
}

86
/**
 * Allocate a new subscription object for subKey, copy the identifying fields
 * from pTopic and let the scheduler fill in its endpoints.
 *
 * @param pMnode  mnode instance, forwarded to mndSchedInitSubEp()
 * @param pTopic  topic the subscription is created for
 * @param subKey  key of the new subscription
 * @return the new object (owned by the caller), or NULL on failure; terrno is
 *         set to TSDB_CODE_OUT_OF_MEMORY here when the allocation fails
 */
static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey) {
  SMqSubscribeObj *pNewSub = tNewSubscribeObj(subKey);
  if (pNewSub == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // Inherit the topic's identity and delivery options.
  pNewSub->dbUid = pTopic->dbUid;
  pNewSub->stbUid = pTopic->stbUid;
  pNewSub->subType = pTopic->subType;
  pNewSub->withMeta = pTopic->withMeta;

  if (mndSchedInitSubEp(pMnode, pTopic, pNewSub) >= 0) {
    return pNewSub;
  }

  // Endpoint setup failed: release the half-built object.
  tDeleteSubscribeObj(pNewSub);
  taosMemoryFree(pNewSub);
  return NULL;
}

L
Liu Jicong 已提交
107 108
static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscribeObj *pSub,
                                    const SMqRebOutputVg *pRebVg) {
109 110 111 112
  SMqRebVgReq req = {0};
  req.oldConsumerId = pRebVg->oldConsumerId;
  req.newConsumerId = pRebVg->newConsumerId;
  req.vgId = pRebVg->pVgEp->vgId;
L
Liu Jicong 已提交
113
  req.qmsg = pRebVg->pVgEp->qmsg;
L
Liu Jicong 已提交
114
  req.subType = pSub->subType;
L
Liu Jicong 已提交
115
  req.withMeta = pSub->withMeta;
L
Liu Jicong 已提交
116
  req.suid = pSub->stbUid;
S
Shengliang Guan 已提交
117
  tstrncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137

  int32_t tlen = sizeof(SMsgHead) + tEncodeSMqRebVgReq(NULL, &req);
  void   *buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  SMsgHead *pMsgHead = (SMsgHead *)buf;

  pMsgHead->contLen = htonl(tlen);
  pMsgHead->vgId = htonl(pRebVg->pVgEp->vgId);

  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncodeSMqRebVgReq(&abuf, &req);
  *pBuf = buf;
  *pLen = tlen;

  return 0;
}

L
Liu Jicong 已提交
138
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub,
139
                                        const SMqRebOutputVg *pRebVg) {
140 141 142 143
  if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
    terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
    return -1;
  }
144 145 146

  void   *buf;
  int32_t tlen;
L
Liu Jicong 已提交
147
  if (mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg) < 0) {
148 149 150 151 152
    return -1;
  }

  int32_t vgId = pRebVg->pVgEp->vgId;
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
L
Liu Jicong 已提交
153 154
  if (pVgObj == NULL) {
    taosMemoryFree(buf);
155
    terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
156 157
    return -1;
  }
158 159 160 161 162

  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
  action.pCont = buf;
  action.contLen = tlen;
L
Liu Jicong 已提交
163
  action.msgType = TDMT_VND_TMQ_SUBSCRIBE;
164 165 166 167 168 169 170 171

  mndReleaseVgroup(pMnode, pVgObj);
  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    taosMemoryFree(buf);
    return -1;
  }
  return 0;
}
L
Liu Jicong 已提交
172

L
Liu Jicong 已提交
173
static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, bool fullName) {
S
Shengliang Guan 已提交
174
  int32_t i = 0;
L
Liu Jicong 已提交
175
  while (key[i] != TMQ_SEPARATOR) {
L
Liu Jicong 已提交
176 177
    i++;
  }
L
Liu Jicong 已提交
178 179
  memcpy(cgroup, key, i);
  cgroup[i] = 0;
L
Liu Jicong 已提交
180 181 182 183 184 185 186 187
  if (fullName) {
    strcpy(topic, &key[i + 1]);
  } else {
    while (key[i] != '.') {
      i++;
    }
    strcpy(topic, &key[i + 1]);
  }
L
Liu Jicong 已提交
188 189 190
  return 0;
}

L
Liu Jicong 已提交
191 192
/**
 * Return the rebalance info stored in pHash for key, creating and inserting a
 * fresh one when no entry exists yet.
 *
 * taosHashPut() stores a copy of the value (this file puts stack structs into
 * hashes the same way), so the object returned by tNewSMqRebSubscribe() is only
 * a temporary: its shell is released after the insert and the pointer handed
 * back is the entry stored in the hash. Previously the temporary was returned
 * and never freed.
 *
 * @param pHash  hash keyed by subscribe key (the key length includes the NUL)
 * @param key    NUL-terminated subscribe key
 * @return pointer to the entry inside pHash, or NULL with terrno =
 *         TSDB_CODE_OUT_OF_MEMORY if it could not be created or stored
 */
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
  size_t keyLen = strlen(key) + 1;

  SMqRebInfo *pRebSub = taosHashGet(pHash, key, keyLen);
  if (pRebSub != NULL) {
    return pRebSub;
  }

  SMqRebInfo *pTmp = tNewSMqRebSubscribe(key);
  if (pTmp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  taosHashPut(pHash, key, keyLen, pTmp, sizeof(SMqRebInfo));
  // Only the shell is freed; the members were copied by value into the hash entry.
  taosMemoryFree(pTmp);

  pRebSub = taosHashGet(pHash, key, keyLen);
  if (pRebSub == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
  }
  return pRebSub;
}

204
/**
 * Compute a new vgroup-to-consumer assignment for one subscription.
 *
 * pOutput->pSub is modified in place. Every vgroup whose owner changes is
 * appended to pOutput->rebVgs; consumer ids are appended to
 * pOutput->removedConsumers, pOutput->touchedConsumers and
 * pOutput->newConsumers.
 *
 * Fixes compared with the previous version:
 *  - a failed taosHashInit() is reported instead of being used;
 *  - when the pool of free vgroups runs dry during the second scan the
 *    distribution stops (the old `continue` restarted the pool iteration, which
 *    either looped forever or handed out the same vgroup twice);
 *  - when no consumer with exactly minVgCnt vgroups is left in step 7, the
 *    vgroup is parked in unassignedVgs instead of dereferencing a NULL consumer;
 *  - the "final cfg" logs print the consumer id in hex, matching their "0x" prefix.
 *
 * @param pMnode   not used by this function
 * @param pInput   old consumer count plus the lists of new/removed consumers
 * @param pOutput  subscription to rebalance and the result lists to fill
 * @return 0 on success, -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if the
 *         temporary hash cannot be created
 */
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
  int32_t     totalVgNum = pOutput->pSub->vgNum;
  const char *sub = pOutput->pSub->key;
  mInfo("sub:%s mq re-balance %d vgroups", sub, pOutput->pSub->vgNum);

  // 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg
  SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pHash == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // 2. check and get actual removed consumers, put their vg into hash
  int32_t removedNum = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
  int32_t actualRemoved = 0;
  for (int32_t i = 0; i < removedNum; i++) {
    int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i);

    SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
    if (pConsumerEp == NULL) {
      continue;
    }

    actualRemoved++;
    int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
    for (int32_t j = 0; j < consumerVgNum; j++) {
      SMqVgEp       *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
      SMqRebOutputVg outputVg = {
          .oldConsumerId = consumerId,
          .newConsumerId = -1,
          .pVgEp = pVgEp,
      };
      taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
      mInfo("sub:%s mq re-balance remove vgId:%d from consumer:%" PRId64, sub, pVgEp->vgId, consumerId);
    }
    taosArrayDestroy(pConsumerEp->vgs);
    taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
    // put into removed
    taosArrayPush(pOutput->removedConsumers, &consumerId);
  }

  if (removedNum != actualRemoved) {
    mError("sub:%s mq re-balance removedNum:%d not matched with actual:%d", sub, removedNum, actualRemoved);
  }

  // if previously no consumer, there are vgs not assigned
  {
    int32_t unassignedNum = taosArrayGetSize(pOutput->pSub->unassignedVgs);
    for (int32_t i = 0; i < unassignedNum; i++) {
      SMqVgEp       *pVgEp = *(SMqVgEp **)taosArrayPop(pOutput->pSub->unassignedVgs);
      SMqRebOutputVg rebOutput = {
          .oldConsumerId = -1,
          .newConsumerId = -1,
          .pVgEp = pVgEp,
      };
      taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg));
      mInfo("sub:%s mq re-balance remove vgId:%d from unassigned", sub, pVgEp->vgId);
    }
  }

  // 3. calc vg number of each consumer
  int32_t afterRebConsumerNum = pInput->oldConsumerNum + taosArrayGetSize(pInput->pRebInfo->newConsumers) -
                                taosArrayGetSize(pInput->pRebInfo->removedConsumers);
  int32_t minVgCnt = 0;
  int32_t imbConsumerNum = 0;
  if (afterRebConsumerNum) {
    minVgCnt = totalVgNum / afterRebConsumerNum;
    imbConsumerNum = totalVgNum % afterRebConsumerNum;
  }

  mInfo("sub:%s mq re-balance %d consumers: at least %d vg each, %d consumer has more vg", sub,
        afterRebConsumerNum, minVgCnt, imbConsumerNum);

  // 4. first scan: remove consumer more than wanted, put to remove hash
  int32_t imbCnt = 0;
  void   *pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) {
      break;
    }

    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    int32_t        consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);

    // all old consumers still existing are touched
    // TODO optimize: touch only consumer whose vgs changed
    taosArrayPush(pOutput->touchedConsumers, &pConsumerEp->consumerId);

    if (consumerVgNum <= minVgCnt) {
      continue;
    }

    // The first imbConsumerNum over-full consumers may keep one extra vg;
    // every other consumer is trimmed down to minVgCnt.
    int32_t keepVgNum = minVgCnt;
    if (imbCnt < imbConsumerNum) {
      keepVgNum = minVgCnt + 1;
      imbCnt++;
    }

    while (taosArrayGetSize(pConsumerEp->vgs) > keepVgNum) {
      SMqVgEp       *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs);
      SMqRebOutputVg outputVg = {
          .oldConsumerId = pConsumerEp->consumerId,
          .newConsumerId = -1,
          .pVgEp = pVgEp,
      };
      taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
      mInfo("sub:%s mq rebalance remove vgId:%d from consumer:%" PRId64 ",(first scan)", sub, pVgEp->vgId,
            pConsumerEp->consumerId);
    }
  }

  // 5. add new consumer into sub
  {
    int32_t consumerNum = taosArrayGetSize(pInput->pRebInfo->newConsumers);
    for (int32_t i = 0; i < consumerNum; i++) {
      int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i);

      SMqConsumerEp newConsumerEp = {0};
      newConsumerEp.consumerId = consumerId;
      newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
      taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
      taosArrayPush(pOutput->newConsumers, &consumerId);
      mInfo("sub:%s mq rebalance add new consumer:%" PRId64, sub, consumerId);
    }
  }

  // 6. second scan: find consumer do not have enough vg, extract from temporary hash and assign to new consumer.
  // All related vg should be put into rebVgs
  SMqRebOutputVg *pRebVg = NULL;
  void           *pRemovedIter = NULL;
  bool            poolDrained = false;  // set once the temporary hash has been fully handed out
  pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) break;
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;

    // push until equal minVg
    while (!poolDrained && taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) {
      // iter hash and find one vg
      pRemovedIter = taosHashIterate(pHash, pRemovedIter);
      if (pRemovedIter == NULL) {
        // Nothing left to hand out. Iterating again would restart from the
        // first pool entry and assign vgs that already have a new owner.
        mError("sub:%s removed iter is null", sub);
        poolDrained = true;
        break;
      }

      pRebVg = (SMqRebOutputVg *)pRemovedIter;
      // push
      taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
      pRebVg->newConsumerId = pConsumerEp->consumerId;
      taosArrayPush(pOutput->rebVgs, pRebVg);
      mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 " (second scan) (not enough)", pRebVg->pVgEp->vgId,
            pConsumerEp->consumerId);
    }
  }

  // 7. handle unassigned vg
  if (taosHashGetSize(pOutput->pSub->consumerHash) != 0) {
    // if has consumer, assign all left vg
    while (!poolDrained) {
      pRemovedIter = taosHashIterate(pHash, pRemovedIter);
      if (pRemovedIter == NULL) {
        if (pIter != NULL) {
          taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
          pIter = NULL;
        }
        break;
      }
      pRebVg = (SMqRebOutputVg *)pRemovedIter;

      // Advance to the next consumer that holds exactly minVgCnt vgs.
      SMqConsumerEp *pConsumerEp = NULL;
      do {
        pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
        pConsumerEp = (SMqConsumerEp *)pIter;
      } while (pConsumerEp != NULL && taosArrayGetSize(pConsumerEp->vgs) != minVgCnt);

      if (pConsumerEp == NULL) {
        // No consumer can take this vg: keep it owned by the subscription as
        // unassigned (same handling as the no-consumer branch below), and only
        // record a change when the owner actually changes.
        mError("sub:%s mq re-balance found no consumer for vgId:%d, keep it unassigned", sub, pRebVg->pVgEp->vgId);
        taosArrayPush(pOutput->pSub->unassignedVgs, &pRebVg->pVgEp);
        if (pRebVg->newConsumerId != pRebVg->oldConsumerId) {
          taosArrayPush(pOutput->rebVgs, pRebVg);
        }
        continue;
      }

      taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
      pRebVg->newConsumerId = pConsumerEp->consumerId;
      if (pRebVg->newConsumerId == pRebVg->oldConsumerId) {
        mInfo("mq rebalance: skip vg %d for same consumer:%" PRId64 " (second scan)", pRebVg->pVgEp->vgId,
              pConsumerEp->consumerId);
        continue;
      }
      taosArrayPush(pOutput->rebVgs, pRebVg);
      mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 " (second scan) (unassigned)", pRebVg->pVgEp->vgId,
            pConsumerEp->consumerId);
    }
  } else {
    // if all consumer is removed, put all vg into unassigned
    pIter = NULL;
    SMqRebOutputVg *pRebOutput = NULL;
    while (1) {
      pIter = taosHashIterate(pHash, pIter);
      if (pIter == NULL) {
        break;
      }

      pRebOutput = (SMqRebOutputVg *)pIter;

      taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp);
      taosArrayPush(pOutput->rebVgs, pRebOutput);
      mInfo("sub:%s mq re-balance unassign vgId:%d (second scan)", sub, pRebOutput->pVgEp->vgId);
    }
  }

  // 8. generate logs
  mInfo("sub:%s mq re-balance calculation completed, re-balanced vg", sub);
  for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) {
    SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i);
    mInfo("sub:%s mq re-balance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, sub,
          pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId);
  }
  {
    pIter = NULL;
    while (1) {
      pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
      if (pIter == NULL) break;
      SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
      int32_t        sz = taosArrayGetSize(pConsumerEp->vgs);
      mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", sub, pConsumerEp->consumerId, sz);
      for (int32_t i = 0; i < sz; i++) {
        SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
        mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, sub, pVgEp->vgId,
              pConsumerEp->consumerId);
      }
    }
  }

  // 9. clear
  taosHashCleanup(pHash);

  return 0;
}

S
Shengliang Guan 已提交
449
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
450
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
451
  if (pTrans == NULL) return -1;
452

453
  mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL);
454 455 456 457
  if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
    mndTransDrop(pTrans);
    return -1;
  }
458

459 460 461 462 463 464
  // make txn:
  // 1. redo action: action to all vg
  const SArray *rebVgs = pOutput->rebVgs;
  int32_t       vgNum = taosArrayGetSize(rebVgs);
  for (int32_t i = 0; i < vgNum; i++) {
    SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i);
L
Liu Jicong 已提交
465
    if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg) < 0) {
466 467 468 469 470 471
      goto REB_FAIL;
    }
  }

  // 2. redo log: subscribe and vg assignment
  // subscribe
L
Liu Jicong 已提交
472
  if (mndSetSubCommitLogs(pMnode, pTrans, pOutput->pSub) != 0) {
473 474 475 476 477 478 479 480 481 482 483 484 485
    goto REB_FAIL;
  }

  // 3. commit log: consumer to update status and epoch
  // 3.1 set touched consumer
  int32_t consumerNum = taosArrayGetSize(pOutput->touchedConsumers);
  for (int32_t i = 0; i < consumerNum; i++) {
    int64_t         consumerId = *(int64_t *)taosArrayGet(pOutput->touchedConsumers, i);
    SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
    SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup);
    pConsumerNew->updateType = CONSUMER_UPDATE__TOUCH;
    mndReleaseConsumer(pMnode, pConsumerOld);
    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
L
Liu Jicong 已提交
486 487
      tDeleteSMqConsumerObj(pConsumerNew);
      taosMemoryFree(pConsumerNew);
488 489
      goto REB_FAIL;
    }
L
Liu Jicong 已提交
490 491
    tDeleteSMqConsumerObj(pConsumerNew);
    taosMemoryFree(pConsumerNew);
492 493 494 495
  }
  // 3.2 set new consumer
  consumerNum = taosArrayGetSize(pOutput->newConsumers);
  for (int32_t i = 0; i < consumerNum; i++) {
L
Liu Jicong 已提交
496
    int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->newConsumers, i);
497

498 499 500 501 502
    SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
    SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup);
    pConsumerNew->updateType = CONSUMER_UPDATE__ADD;
    char *topic = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    char  cgroup[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
503
    mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
504 505 506
    taosArrayPush(pConsumerNew->rebNewTopics, &topic);
    mndReleaseConsumer(pMnode, pConsumerOld);
    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
L
Liu Jicong 已提交
507 508
      tDeleteSMqConsumerObj(pConsumerNew);
      taosMemoryFree(pConsumerNew);
509 510
      goto REB_FAIL;
    }
L
Liu Jicong 已提交
511 512
    tDeleteSMqConsumerObj(pConsumerNew);
    taosMemoryFree(pConsumerNew);
513 514 515 516 517 518
  }

  // 3.3 set removed consumer
  consumerNum = taosArrayGetSize(pOutput->removedConsumers);
  for (int32_t i = 0; i < consumerNum; i++) {
    int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->removedConsumers, i);
519

520 521 522 523 524
    SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
    SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup);
    pConsumerNew->updateType = CONSUMER_UPDATE__REMOVE;
    char *topic = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    char  cgroup[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
525
    mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
526 527 528
    taosArrayPush(pConsumerNew->rebRemovedTopics, &topic);
    mndReleaseConsumer(pMnode, pConsumerOld);
    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
L
Liu Jicong 已提交
529 530
      tDeleteSMqConsumerObj(pConsumerNew);
      taosMemoryFree(pConsumerNew);
531 532
      goto REB_FAIL;
    }
L
Liu Jicong 已提交
533 534
    tDeleteSMqConsumerObj(pConsumerNew);
    taosMemoryFree(pConsumerNew);
535
  }
L
Liu Jicong 已提交
536

L
Liu Jicong 已提交
537 538
  // 4. TODO commit log: modification log

L
Liu Jicong 已提交
539
  // 5. set cb
540
  mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_MQ_REB, NULL, 0);
L
Liu Jicong 已提交
541 542

  // 6. execution
L
Liu Jicong 已提交
543
  if (mndTransPrepare(pMnode, pTrans) != 0) {
L
Liu Jicong 已提交
544
    mError("failed to prepare trans rebalance since %s", terrstr());
L
Liu Jicong 已提交
545 546
    goto REB_FAIL;
  }
547 548 549 550 551 552 553 554 555

  mndTransDrop(pTrans);
  return 0;

REB_FAIL:
  mndTransDrop(pTrans);
  return -1;
}

S
Shengliang Guan 已提交
556 557 558
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
  SMnode            *pMnode = pMsg->info.node;
  SMqDoRebalanceMsg *pReq = pMsg->pCont;
559 560
  void              *pIter = NULL;

561
  mInfo("mq re-balance start");
562 563 564

  while (1) {
    pIter = taosHashIterate(pReq->rebSubHash, pIter);
565 566 567 568
    if (pIter == NULL) {
      break;
    }

L
Liu Jicong 已提交
569 570 571
    SMqRebInputObj rebInput = {0};

    SMqRebOutputObj rebOutput = {0};
L
Liu Jicong 已提交
572 573 574
    rebOutput.newConsumers = taosArrayInit(0, sizeof(int64_t));
    rebOutput.removedConsumers = taosArrayInit(0, sizeof(int64_t));
    rebOutput.touchedConsumers = taosArrayInit(0, sizeof(int64_t));
L
Liu Jicong 已提交
575 576
    rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));

L
Liu Jicong 已提交
577 578 579 580
    SMqRebInfo      *pRebInfo = (SMqRebInfo *)pIter;
    SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebInfo->key);

    rebInput.pRebInfo = pRebInfo;
581 582 583 584 585

    if (pSub == NULL) {
      // split sub key and extract topic
      char topic[TSDB_TOPIC_FNAME_LEN];
      char cgroup[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
586
      mndSplitSubscribeKey(pRebInfo->key, topic, cgroup, true);
587
      SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
L
Liu Jicong 已提交
588
      if (pTopic == NULL) {
589
        mError("mq re-balance %s ignored since topic %s not exist", pRebInfo->key, topic);
L
Liu Jicong 已提交
590 591
        continue;
      }
592

593 594
      taosRLockLatch(&pTopic->lock);

595
      rebOutput.pSub = mndCreateSubscription(pMnode, pTopic, pRebInfo->key);
L
Liu Jicong 已提交
596 597 598 599 600 601 602

      if (rebOutput.pSub == NULL) {
        mError("mq rebalance %s failed create sub since %s, abort", pRebInfo->key, terrstr());
        taosRUnLockLatch(&pTopic->lock);
        mndReleaseTopic(pMnode, pTopic);
        continue;
      }
L
Liu Jicong 已提交
603
      memcpy(rebOutput.pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN);
L
Liu Jicong 已提交
604 605 606 607 608 609 610 611 612 613 614 615

      taosRUnLockLatch(&pTopic->lock);
      mndReleaseTopic(pMnode, pTopic);

      rebInput.oldConsumerNum = 0;
    } else {
      taosRLockLatch(&pSub->lock);
      rebInput.oldConsumerNum = taosHashGetSize(pSub->consumerHash);
      rebOutput.pSub = tCloneSubscribeObj(pSub);
      taosRUnLockLatch(&pSub->lock);
      mndReleaseSubscribe(pMnode, pSub);
    }
616

617
    if (mndDoRebalance(pMnode, &rebInput, &rebOutput) < 0) {
618
      mError("mq re-balance internal error");
619
    }
620

L
Liu Jicong 已提交
621 622
    // if add more consumer to balanced subscribe,
    // possibly no vg is changed
623

L
Liu Jicong 已提交
624
    if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) {
625
      mError("mq re-balance persist re-balance output error, possibly vnode splitted or dropped");
L
Liu Jicong 已提交
626
    }
627

L
Liu Jicong 已提交
628 629 630 631 632 633 634 635 636 637
    taosArrayDestroy(pRebInfo->lostConsumers);
    taosArrayDestroy(pRebInfo->newConsumers);
    taosArrayDestroy(pRebInfo->removedConsumers);

    taosArrayDestroy(rebOutput.newConsumers);
    taosArrayDestroy(rebOutput.touchedConsumers);
    taosArrayDestroy(rebOutput.removedConsumers);
    taosArrayDestroy(rebOutput.rebVgs);
    tDeleteSubscribeObj(rebOutput.pSub);
    taosMemoryFree(rebOutput.pSub);
638 639 640
  }

  // reset flag
641
  mInfo("mq re-balance completed successfully");
642
  taosHashCleanup(pReq->rebSubHash);
L
Liu Jicong 已提交
643
  mndRebEnd();
644 645 646

  return 0;
}
L
Liu Jicong 已提交
647

648 649
/**
 * Handle TDMT_MND_TMQ_DROP_CGROUP: drop the subscription of a consumer group
 * on a topic, provided no consumer is attached to it any more.
 *
 * All exits after the subscription has been acquired go through one cleanup
 * tail. This fixes the success path, which used to return without dropping the
 * local transaction object, and removes the mndTransDrop() call on a NULL
 * transaction.
 *
 * @param pMsg  RPC message carrying a serialized SMDropCgroupReq
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the transaction is prepared;
 *         0 if the subscription does not exist and igNotExists is set;
 *         -1 on failure with terrno describing the reason
 */
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
  SMDropCgroupReq dropReq = {0};
  STrans         *pTrans = NULL;
  int32_t         code = -1;

  if (tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, dropReq.cgroup, dropReq.topic);
  if (pSub == NULL) {
    if (dropReq.igNotExists) {
      mInfo("cgroup:%s on topic:%s, not exist, ignore not exist is set", dropReq.cgroup, dropReq.topic);
      return 0;
    } else {
      terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
      mError("topic:%s, cgroup:%s, failed to drop since %s", dropReq.topic, dropReq.cgroup, terrstr());
      return -1;
    }
  }

  // A consumer group that still has consumers cannot be dropped.
  if (taosHashGetSize(pSub->consumerHash) != 0) {
    terrno = TSDB_CODE_MND_CGROUP_USED;
    mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
    goto DROP_CGROUP_OVER;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "drop-cgroup");
  if (pTrans == NULL) {
    mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
    goto DROP_CGROUP_OVER;
  }

  mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);

  if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
    mError("cgroup %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
    goto DROP_CGROUP_OVER;
  }

  if (mndTransPrepare(pMnode, pTrans) < 0) {
    goto DROP_CGROUP_OVER;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

DROP_CGROUP_OVER:
  mndReleaseSubscribe(pMnode, pSub);
  // As in mndPersistRebResult(), the local transaction object is dropped once
  // it has been prepared; it is dropped on failure as well.
  if (pTrans != NULL) {
    mndTransDrop(pTrans);
  }
  return code;
}

L
Liu Jicong 已提交
703 704 705
// Cleanup hook of the subscribe module; the body is empty, so it currently does nothing.
void mndCleanupSubscribe(SMnode *pMnode) {}

/**
 * Serialize a subscription object into an SDB raw record.
 *
 * Layout written: int32 payload length, the payload produced by
 * tEncodeSubscribeObj(), then MND_SUBSCRIBE_RESERVE_SIZE reserved bytes.
 *
 * pRaw is now initialised before the first `goto SUB_ENCODE_OVER`. Previously
 * a non-positive encoded length jumped over its initialisation, so the error
 * path logged and freed an indeterminate pointer.
 *
 * @param pSub  subscription to encode
 * @return the new raw on success (terrno = TSDB_CODE_SUCCESS); NULL on failure
 *         with terrno left at TSDB_CODE_OUT_OF_MEMORY unless one of the
 *         SDB_SET_* steps changed it
 */
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  void    *buf = NULL;
  SSdbRaw *pRaw = NULL;
  int32_t  dataPos = 0;

  // First encoder pass (NULL buffer) yields the payload length.
  int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
  if (tlen <= 0) goto SUB_ENCODE_OVER;

  int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE;
  pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
  if (pRaw == NULL) goto SUB_ENCODE_OVER;

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto SUB_ENCODE_OVER;

  // Second encoder pass writes the payload into the scratch buffer.
  void *abuf = buf;
  tEncodeSubscribeObj(&abuf, pSub);

  SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, SUB_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

SUB_ENCODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRaw;
}

/*
 * Rebuild a subscription row from an SDB raw record produced by mndSubActionEncode.
 *
 * The record must carry soft version MND_SUBSCRIBE_VER_NUMBER. After decoding, the
 * endpoint set of every vgroup referenced by the subscription (unassigned ones and
 * those held by consumers) is passed through tmsgUpdateDnodeEpSet.
 *
 * Returns the new row on success. On failure returns NULL and leaves terrno at a
 * non-success code.
 */
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  SSdbRow         *pRow = NULL;
  SMqSubscribeObj *pSub = NULL;
  void            *buf = NULL;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;

  if (sver != MND_SUBSCRIBE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto SUB_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqSubscribeObj));
  if (pRow == NULL) goto SUB_DECODE_OVER;

  pSub = sdbGetRowObj(pRow);
  if (pSub == NULL) goto SUB_DECODE_OVER;

  int32_t dataPos = 0;
  int32_t tlen = 0;
  SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto SUB_DECODE_OVER;
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);

  if (tDecodeSubscribeObj(buf, pSub) == NULL) {
    goto SUB_DECODE_OVER;
  }

  // update epset saved in mnode
  // The vg arrays are read with taosArrayGetP everywhere else in this file, i.e. they
  // store SMqVgEp pointers; the element itself (not the slot address) must be used here.
  if (pSub->unassignedVgs != NULL) {
    int32_t size = (int32_t)taosArrayGetSize(pSub->unassignedVgs);
    for (int32_t i = 0; i < size; ++i) {
      SMqVgEp *pMqVgEp = taosArrayGetP(pSub->unassignedVgs, i);
      tmsgUpdateDnodeEpSet(&pMqVgEp->epSet);
    }
  }
  if (pSub->consumerHash != NULL) {
    void *pIter = taosHashIterate(pSub->consumerHash, NULL);
    while (pIter) {
      SMqConsumerEp *pConsumerEp = pIter;
      int32_t        size = (int32_t)taosArrayGetSize(pConsumerEp->vgs);
      for (int32_t i = 0; i < size; ++i) {
        SMqVgEp *pMqVgEp = taosArrayGetP(pConsumerEp->vgs, i);
        tmsgUpdateDnodeEpSet(&pMqVgEp->epSet);
      }
      pIter = taosHashIterate(pSub->consumerHash, pIter);
    }
  }

  terrno = TSDB_CODE_SUCCESS;

SUB_DECODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to decode from raw:%p since %s", pSub == NULL ? "null" : pSub->key, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("subscribe:%s, decode from raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRow;
}

// SDB insert hook for subscriptions: traces the event and always reports success.
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
  (void)pSdb;
  mTrace("subscribe:%s, perform insert action", pSub->key);
  return 0;
}

// SDB delete hook for subscriptions: traces the event, then hands the object to
// tDeleteSubscribeObj. Always returns 0.
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
  (void)pSdb;
  mTrace("subscribe:%s, perform delete action", pSub->key);

  tDeleteSubscribeObj(pSub);

  return 0;
}

// SDB update hook for subscriptions: under the old object's write latch, exchanges the
// consumer hash and the unassigned-vgroup array between the old and the new object.
// Always returns 0.
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
  (void)pSdb;
  mTrace("subscribe:%s, perform update action", pOldSub->key);

  taosWLockLatch(&pOldSub->lock);

  // swap the consumer hash
  SHashObj *pPrevConsumers = pOldSub->consumerHash;
  pOldSub->consumerHash = pNewSub->consumerHash;
  pNewSub->consumerHash = pPrevConsumers;

  // swap the unassigned vgroup list
  SArray *pPrevUnassigned = pOldSub->unassignedVgs;
  pOldSub->unassignedVgs = pNewSub->unassignedVgs;
  pNewSub->unassignedVgs = pPrevUnassigned;

  taosWUnLockLatch(&pOldSub->lock);

  return 0;
}

835
int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName) {
S
Shengliang Guan 已提交
836
  int32_t tlen = strlen(cgroup);
L
Liu Jicong 已提交
837
  memcpy(key, cgroup, tlen);
L
Liu Jicong 已提交
838
  key[tlen] = TMQ_SEPARATOR;
L
Liu Jicong 已提交
839
  strcpy(key + tlen + 1, topicName);
L
Liu Jicong 已提交
840
  return 0;
L
Liu Jicong 已提交
841 842
}

L
Liu Jicong 已提交
843
// Looks up the subscription of (cgroup, topicName) in the SDB.
// Returns the acquired object, or NULL with terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST.
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *cgroup, const char *topicName) {
  char subKey[TSDB_SUBSCRIBE_KEY_LEN];
  mndMakeSubscribeKey(subKey, cgroup, topicName);

  SMqSubscribeObj *pFound = sdbAcquire(pMnode->pSdb, SDB_SUBSCRIBE, subKey);
  if (pFound != NULL) {
    return pFound;
  }

  terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
  return NULL;
}

L
Liu Jicong 已提交
854 855 856 857
// Looks up a subscription in the SDB by its already-built key.
// Returns the acquired object, or NULL with terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST.
SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key) {
  SMqSubscribeObj *pFound = sdbAcquire(pMnode->pSdb, SDB_SUBSCRIBE, key);
  if (pFound != NULL) {
    return pFound;
  }

  terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
  return NULL;
}

L
Liu Jicong 已提交
863 864 865 866 867
// Hands a subscription object back to the SDB via sdbRelease.
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) { sdbRelease(pMnode->pSdb, pSub); }

L
Liu Jicong 已提交
868 869 870 871 872 873 874 875
/*
 * Encode pSub and append it to the transaction's redo log, marked SDB_STATUS_DROPPED.
 *
 * Returns 0 on success, -1 on failure.
 */
static int32_t mndSetDropSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  SSdbRaw *pRedoRaw = mndSubActionEncode(pSub);
  if (pRedoRaw == NULL) return -1;

  if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
    // The raw was never attached to the transaction, so release it here instead of leaking it.
    // NOTE(review): assumes mndTransAppendRedolog does not free the raw on failure - confirm.
    sdbFreeRaw(pRedoRaw);
    return -1;
  }

  // from here on the raw is referenced by the transaction and must not be freed locally
  if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED) != 0) return -1;
  return 0;
}

L
Liu Jicong 已提交
876
/*
 * Encode pSub and append it to the transaction's commit log, marked SDB_STATUS_DROPPED.
 *
 * Returns 0 on success, -1 on failure.
 */
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
  if (pCommitRaw == NULL) return -1;

  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
    // The raw was never attached to the transaction, so release it here instead of leaking it.
    // NOTE(review): assumes mndTransAppendCommitlog does not free the raw on failure - confirm.
    sdbFreeRaw(pCommitRaw);
    return -1;
  }

  // from here on the raw is referenced by the transaction and must not be freed locally
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
  return 0;
}

// Appends a drop commit log to pTrans for every subscription whose dbUid matches pDb.
// Returns 0 when all matching subscriptions were handled, -1 as soon as one append fails
// (the SDB iteration is cancelled in that case).
int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  SSdb            *pSdb = pMnode->pSdb;
  SMqSubscribeObj *pSub = NULL;
  void            *pIter = NULL;

  while ((pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub)) != NULL) {
    int32_t rc = 0;
    if (pSub->dbUid == pDb->uid) {
      rc = mndSetDropSubCommitLogs(pMnode, pTrans, pSub);
    }

    // the fetched object is released on every path before deciding how to continue
    sdbRelease(pSdb, pSub);

    if (rc < 0) {
      sdbCancelFetch(pSdb, pIter);
      return -1;
    }
  }

  return 0;
}

/*
 * Schedule removal of every subscription that belongs to topicName.
 *
 * For each matching subscription a TDMT_VND_TMQ_DELETE_SUB redo action is appended to
 * pTrans for every unassigned vgroup, followed by a drop redo log for the subscription.
 * A subscription that still has consumers aborts the operation with
 * terrno = TSDB_CODE_MND_IN_REBALANCE.
 *
 * Returns 0 on success, -1 on failure. On every early exit the fetched object is
 * released and the SDB iteration is cancelled.
 */
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) {
  SSdb *pSdb = pMnode->pSdb;

  void            *pIter = NULL;
  SMqSubscribeObj *pSub = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
    if (pIter == NULL) break;

    char topic[TSDB_TOPIC_FNAME_LEN];
    char cgroup[TSDB_CGROUP_LEN];
    mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
    if (strcmp(topic, topicName) != 0) {
      sdbRelease(pSdb, pSub);
      continue;
    }

    // iter all vnode to delete handle
    if (taosHashGetSize(pSub->consumerHash) != 0) {
      sdbRelease(pSdb, pSub);
      sdbCancelFetch(pSdb, pIter);
      terrno = TSDB_CODE_MND_IN_REBALANCE;
      return -1;
    }

    int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
    for (int32_t i = 0; i < sz; i++) {
      SMqVgEp       *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
      SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
      if (pReq == NULL) {
        sdbRelease(pSdb, pSub);
        sdbCancelFetch(pSdb, pIter);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return -1;
      }

      pReq->head.vgId = htonl(pVgEp->vgId);
      pReq->vgId = pVgEp->vgId;
      pReq->consumerId = -1;
      memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);

      STransAction action = {0};
      action.epSet = pVgEp->epSet;
      action.pCont = pReq;
      action.contLen = sizeof(SMqVDeleteReq);
      action.msgType = TDMT_VND_TMQ_DELETE_SUB;
      if (mndTransAppendRedoAction(pTrans, &action) != 0) {
        // the request was not taken over by the transaction, so it is freed here
        taosMemoryFree(pReq);
        sdbRelease(pSdb, pSub);
        sdbCancelFetch(pSdb, pIter);
        return -1;
      }
    }

    if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) {
      sdbRelease(pSdb, pSub);
      sdbCancelFetch(pSdb, pIter);
      return -1;
    }

    sdbRelease(pSdb, pSub);
  }

  return 0;
}
L
Liu Jicong 已提交
968

969
int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
S
Shengliang Guan 已提交
970
  SMnode          *pMnode = pReq->info.node;
L
Liu Jicong 已提交
971 972 973 974
  SSdb            *pSdb = pMnode->pSdb;
  int32_t          numOfRows = 0;
  SMqSubscribeObj *pSub = NULL;

L
Liu Jicong 已提交
975 976
  mDebug("mnd show subscriptions begin");

L
Liu Jicong 已提交
977 978
  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub);
979 980 981
    if (pShow->pIter == NULL) {
      break;
    }
L
Liu Jicong 已提交
982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005

    taosRLockLatch(&pSub->lock);

    if (numOfRows + pSub->vgNum > rowsCapacity) {
      blockDataEnsureCapacity(pBlock, numOfRows + pSub->vgNum);
    }

    SMqConsumerEp *pConsumerEp = NULL;
    void          *pIter = NULL;
    while (1) {
      pIter = taosHashIterate(pSub->consumerHash, pIter);
      if (pIter == NULL) break;
      pConsumerEp = (SMqConsumerEp *)pIter;

      int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
      for (int32_t j = 0; j < sz; j++) {
        SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);

        SColumnInfoData *pColInfo;
        int32_t          cols = 0;

        // topic and cgroup
        char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
        char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
L
Liu Jicong 已提交
1006
        mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
L
Liu Jicong 已提交
1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023
        varDataSetLen(topic, strlen(varDataVal(topic)));
        varDataSetLen(cgroup, strlen(varDataVal(cgroup)));

        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        colDataAppend(pColInfo, numOfRows, (const char *)topic, false);

        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false);

        // vg id
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        colDataAppend(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false);

        // consumer id
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        colDataAppend(pColInfo, numOfRows, (const char *)&pConsumerEp->consumerId, false);

1024
        mDebug("mnd show subscriptions: topic %s, consumer %" PRId64 " cgroup %s vgid %d", varDataVal(topic),
L
Liu Jicong 已提交
1025 1026
               pConsumerEp->consumerId, varDataVal(cgroup), pVgEp->vgId);

L
Liu Jicong 已提交
1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041
        // offset
#if 0
      // subscribe time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false);

      // rebalance time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0);
#endif

        numOfRows++;
      }
    }

L
Liu Jicong 已提交
1042
    // do not show for cleared subscription
1043
#if 1
L
Liu Jicong 已提交
1044 1045 1046 1047 1048 1049 1050 1051 1052 1053
    int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
    for (int32_t i = 0; i < sz; i++) {
      SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);

      SColumnInfoData *pColInfo;
      int32_t          cols = 0;

      // topic and cgroup
      char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
      char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
L
Liu Jicong 已提交
1054
      mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
L
Liu Jicong 已提交
1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071
      varDataSetLen(topic, strlen(varDataVal(topic)));
      varDataSetLen(cgroup, strlen(varDataVal(cgroup)));

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)topic, false);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false);

      // vg id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false);

      // consumer id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, NULL, true);

1072
      mDebug("mnd show subscriptions(unassigned): topic %s, cgroup %s vgid %d", varDataVal(topic), varDataVal(cgroup),
L
Liu Jicong 已提交
1073 1074
             pVgEp->vgId);

L
Liu Jicong 已提交
1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088
      // offset
#if 0
      // subscribe time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false);

      // rebalance time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0);
#endif

      numOfRows++;
    }

L
Liu Jicong 已提交
1089
#endif
1090 1091 1092

    pBlock->info.rows = numOfRows;

L
Liu Jicong 已提交
1093 1094 1095 1096
    taosRUnLockLatch(&pSub->lock);
    sdbRelease(pSdb, pSub);
  }

L
Liu Jicong 已提交
1097 1098
  mDebug("mnd end show subscriptions");

L
Liu Jicong 已提交
1099 1100 1101 1102
  pShow->numOfRows += numOfRows;
  return numOfRows;
}

1103
// Cancels an in-progress SDB iteration that was started for the subscription show handler.
void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) { sdbCancelFetch(pMnode->pSdb, pIter); }