mndSubscribe.c 34.7 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
L
Liu Jicong 已提交
17
#include "mndSubscribe.h"
L
Liu Jicong 已提交
18 19 20 21
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
L
Liu Jicong 已提交
22
#include "mndOffset.h"
L
Liu Jicong 已提交
23
#include "mndScheduler.h"
L
Liu Jicong 已提交
24 25 26 27 28 29 30 31 32
#include "mndShow.h"
#include "mndStb.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "tcompare.h"
#include "tname.h"

L
Liu Jicong 已提交
33
#define MND_SUBSCRIBE_VER_NUMBER   1
L
Liu Jicong 已提交
34 35
#define MND_SUBSCRIBE_RESERVE_SIZE 64

L
Liu Jicong 已提交
36
#define MND_SUBSCRIBE_REBALANCE_CNT 3
L
Liu Jicong 已提交
37

L
Liu Jicong 已提交
38 39 40 41 42 43
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
static int32_t  mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
static int32_t  mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
static int32_t  mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);

S
Shengliang Guan 已提交
44
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg);
L
Liu Jicong 已提交
45
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg);
46

S
Shengliang Guan 已提交
47
static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
L
Liu Jicong 已提交
48 49
static void    mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);

50 51 52 53 54 55 56
static int32_t mndSetSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  SSdbRaw *pRedoRaw = mndSubActionEncode(pSub);
  if (pRedoRaw == NULL) return -1;
  if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
  if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY) != 0) return -1;
  return 0;
}
L
Liu Jicong 已提交
57

58 59 60 61 62 63 64
static int32_t mndSetSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
  if (pCommitRaw == NULL) return -1;
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
  return 0;
}
L
Liu Jicong 已提交
65

L
Liu Jicong 已提交
66
int32_t mndInitSubscribe(SMnode *pMnode) {
67 68 69 70 71 72 73 74 75 76 77 78
  SSdbTable table = {
      .sdbType = SDB_SUBSCRIBE,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndSubActionEncode,
      .decodeFp = (SdbDecodeFp)mndSubActionDecode,
      .insertFp = (SdbInsertFp)mndSubActionInsert,
      .updateFp = (SdbUpdateFp)mndSubActionUpdate,
      .deleteFp = (SdbDeleteFp)mndSubActionDelete,
  };

  mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_CHANGE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_DELETE_RSP, mndTransProcessRsp);
79
  mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq);
L
Liu Jicong 已提交
80 81
  mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq);
  mndSetMsgHandle(pMnode, TDMT_MND_MQ_DROP_CGROUP, mndProcessDropCgroupReq);
82
  mndSetMsgHandle(pMnode, TDMT_MND_MQ_DROP_CGROUP_RSP, mndTransProcessRsp);
L
Liu Jicong 已提交
83 84 85 86

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextSubscribe);

L
Liu Jicong 已提交
87 88 89
  return sdbSetTable(pMnode->pSdb, table);
}

90 91 92 93 94 95
static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey) {
  SMqSubscribeObj *pSub = tNewSubscribeObj(subKey);
  if (pSub == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
L
Liu Jicong 已提交
96
  pSub->dbUid = pTopic->dbUid;
L
Liu Jicong 已提交
97
  pSub->stbUid = pTopic->stbUid;
L
Liu Jicong 已提交
98
  pSub->subType = pTopic->subType;
L
Liu Jicong 已提交
99
  pSub->withMeta = pTopic->withMeta;
L
Liu Jicong 已提交
100

101 102
  ASSERT(pSub->unassignedVgs->size == 0);
  ASSERT(taosHashGetSize(pSub->consumerHash) == 0);
103 104 105 106 107 108 109

  if (mndSchedInitSubEp(pMnode, pTopic, pSub) < 0) {
    tDeleteSubscribeObj(pSub);
    taosMemoryFree(pSub);
    return NULL;
  }

110 111
  ASSERT(pSub->unassignedVgs->size > 0);
  ASSERT(taosHashGetSize(pSub->consumerHash) == 0);
L
Liu Jicong 已提交
112

113 114 115
  return pSub;
}

L
Liu Jicong 已提交
116 117
static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscribeObj *pSub,
                                    const SMqRebOutputVg *pRebVg) {
118 119 120 121
  SMqRebVgReq req = {0};
  req.oldConsumerId = pRebVg->oldConsumerId;
  req.newConsumerId = pRebVg->newConsumerId;
  req.vgId = pRebVg->pVgEp->vgId;
L
Liu Jicong 已提交
122
  req.qmsg = pRebVg->pVgEp->qmsg;
L
Liu Jicong 已提交
123
  req.subType = pSub->subType;
L
Liu Jicong 已提交
124
  req.withMeta = pSub->withMeta;
L
Liu Jicong 已提交
125
  req.suid = pSub->stbUid;
L
Liu Jicong 已提交
126
  strncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146

  int32_t tlen = sizeof(SMsgHead) + tEncodeSMqRebVgReq(NULL, &req);
  void   *buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  SMsgHead *pMsgHead = (SMsgHead *)buf;

  pMsgHead->contLen = htonl(tlen);
  pMsgHead->vgId = htonl(pRebVg->pVgEp->vgId);

  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncodeSMqRebVgReq(&abuf, &req);
  *pBuf = buf;
  *pLen = tlen;

  return 0;
}

L
Liu Jicong 已提交
147
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub,
148 149 150 151 152
                                        const SMqRebOutputVg *pRebVg) {
  ASSERT(pRebVg->oldConsumerId != pRebVg->newConsumerId);

  void   *buf;
  int32_t tlen;
L
Liu Jicong 已提交
153
  if (mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg) < 0) {
154 155 156 157 158
    return -1;
  }

  int32_t vgId = pRebVg->pVgEp->vgId;
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
L
Liu Jicong 已提交
159
  if (pVgObj == NULL) {
L
Liu Jicong 已提交
160
    ASSERT(0);
L
Liu Jicong 已提交
161 162 163
    taosMemoryFree(buf);
    return -1;
  }
164 165 166 167 168 169 170 171 172 173 174 175 176 177

  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
  action.pCont = buf;
  action.contLen = tlen;
  action.msgType = TDMT_VND_MQ_VG_CHANGE;

  mndReleaseVgroup(pMnode, pVgObj);
  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    taosMemoryFree(buf);
    return -1;
  }
  return 0;
}
L
Liu Jicong 已提交
178

L
Liu Jicong 已提交
179
static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, bool fullName) {
S
Shengliang Guan 已提交
180
  int32_t i = 0;
L
Liu Jicong 已提交
181
  while (key[i] != TMQ_SEPARATOR) {
L
Liu Jicong 已提交
182 183
    i++;
  }
L
Liu Jicong 已提交
184 185
  memcpy(cgroup, key, i);
  cgroup[i] = 0;
L
Liu Jicong 已提交
186 187 188 189 190 191 192 193
  if (fullName) {
    strcpy(topic, &key[i + 1]);
  } else {
    while (key[i] != '.') {
      i++;
    }
    strcpy(topic, &key[i + 1]);
  }
L
Liu Jicong 已提交
194 195 196
  return 0;
}

L
Liu Jicong 已提交
197 198
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
  SMqRebInfo *pRebSub = taosHashGet(pHash, key, strlen(key) + 1);
L
Liu Jicong 已提交
199 200 201
  if (pRebSub == NULL) {
    pRebSub = tNewSMqRebSubscribe(key);
    if (pRebSub == NULL) {
L
Liu Jicong 已提交
202
      terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
203 204
      return NULL;
    }
L
Liu Jicong 已提交
205
    taosHashPut(pHash, key, strlen(key) + 1, pRebSub, sizeof(SMqRebInfo));
L
Liu Jicong 已提交
206 207 208 209
  }
  return pRebSub;
}

210 211 212
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
  int32_t totalVgNum = pOutput->pSub->vgNum;

213
  mInfo("mq rebalance: subscription: %s, vgNum: %d", pOutput->pSub->key, pOutput->pSub->vgNum);
L
Liu Jicong 已提交
214

215 216 217 218 219 220 221 222 223
  // 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg
  SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);

  // 2. check and get actual removed consumers, put their vg into hash
  int32_t removedNum = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
  int32_t actualRemoved = 0;
  for (int32_t i = 0; i < removedNum; i++) {
    int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i);
    ASSERT(consumerId > 0);
224 225 226 227
    SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
    ASSERT(pConsumerEp);
    if (pConsumerEp) {
      ASSERT(consumerId == pConsumerEp->consumerId);
228
      actualRemoved++;
229
      int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
230
      for (int32_t j = 0; j < consumerVgNum; j++) {
231
        SMqVgEp       *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
232 233 234 235 236 237
        SMqRebOutputVg outputVg = {
            .oldConsumerId = consumerId,
            .newConsumerId = -1,
            .pVgEp = pVgEp,
        };
        taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
S
Shengliang Guan 已提交
238
        mInfo("mq rebalance: remove vgId:%d from consumer:%" PRId64, pVgEp->vgId, consumerId);
239 240 241 242 243 244 245 246 247 248
      }
      taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
      // put into removed
      taosArrayPush(pOutput->removedConsumers, &consumerId);
    }
  }
  ASSERT(removedNum == actualRemoved);

  // if previously no consumer, there are vgs not assigned
  {
249
    int32_t consumerVgNum = taosArrayGetSize(pOutput->pSub->unassignedVgs);
250
    for (int32_t i = 0; i < consumerVgNum; i++) {
251
      SMqVgEp       *pVgEp = *(SMqVgEp **)taosArrayPop(pOutput->pSub->unassignedVgs);
252 253 254 255 256 257
      SMqRebOutputVg rebOutput = {
          .oldConsumerId = -1,
          .newConsumerId = -1,
          .pVgEp = pVgEp,
      };
      taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg));
S
Shengliang Guan 已提交
258
      mInfo("mq rebalance: remove vgId:%d from unassigned", pVgEp->vgId);
259 260 261 262
    }
  }

  // 3. calc vg number of each consumer
L
Liu Jicong 已提交
263 264
  int32_t afterRebConsumerNum = pInput->oldConsumerNum + taosArrayGetSize(pInput->pRebInfo->newConsumers) -
                                taosArrayGetSize(pInput->pRebInfo->removedConsumers);
L
Liu Jicong 已提交
265 266
  int32_t minVgCnt = 0;
  int32_t imbConsumerNum = 0;
267
  // calc num
L
Liu Jicong 已提交
268 269 270 271
  if (afterRebConsumerNum) {
    minVgCnt = totalVgNum / afterRebConsumerNum;
    imbConsumerNum = totalVgNum % afterRebConsumerNum;
  }
272 273
  mInfo("mq rebalance: %d consumer after rebalance, at least %d vg each, %d consumer has more vg", afterRebConsumerNum,
        minVgCnt, imbConsumerNum);
274 275 276 277 278 279 280

  // 4. first scan: remove consumer more than wanted, put to remove hash
  int32_t imbCnt = 0;
  void   *pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) break;
281 282 283
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    ASSERT(pConsumerEp->consumerId > 0);
    int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
284 285
    // all old consumers still existing are touched
    // TODO optimize: touch only consumer whose vgs changed
286
    taosArrayPush(pOutput->touchedConsumers, &pConsumerEp->consumerId);
287 288 289 290 291 292
    if (consumerVgNum > minVgCnt) {
      if (imbCnt < imbConsumerNum) {
        if (consumerVgNum == minVgCnt + 1) {
          continue;
        } else {
          // pop until equal minVg + 1
293 294
          while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) {
            SMqVgEp       *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs);
295
            SMqRebOutputVg outputVg = {
296
                .oldConsumerId = pConsumerEp->consumerId,
297 298 299 300
                .newConsumerId = -1,
                .pVgEp = pVgEp,
            };
            taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
L
Liu Jicong 已提交
301 302
            mInfo("mq rebalance: remove vgId:%d from consumer:%" PRId64 ",(first scan)", pVgEp->vgId,
                  pConsumerEp->consumerId);
303 304 305 306 307
          }
          imbCnt++;
        }
      } else {
        // pop until equal minVg
308 309
        while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt) {
          SMqVgEp       *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs);
310
          SMqRebOutputVg outputVg = {
311
              .oldConsumerId = pConsumerEp->consumerId,
312 313 314 315
              .newConsumerId = -1,
              .pVgEp = pVgEp,
          };
          taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
L
Liu Jicong 已提交
316 317
          mInfo("mq rebalance: remove vgId:%d from consumer:%" PRId64 ",(first scan)", pVgEp->vgId,
                pConsumerEp->consumerId);
318 319 320 321 322 323 324 325 326 327 328
        }
      }
    }
  }

  // 5. add new consumer into sub
  {
    int32_t consumerNum = taosArrayGetSize(pInput->pRebInfo->newConsumers);
    for (int32_t i = 0; i < consumerNum; i++) {
      int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i);
      ASSERT(consumerId > 0);
329
      SMqConsumerEp newConsumerEp;
330 331
      newConsumerEp.consumerId = consumerId;
      newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
332
      taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
333
      taosArrayPush(pOutput->newConsumers, &consumerId);
S
Shengliang Guan 已提交
334
      mInfo("mq rebalance: add new consumer:%" PRId64, consumerId);
335 336 337 338 339 340 341 342 343 344 345
    }
  }

  // 6. second scan: find consumer do not have enough vg, extract from temporary hash and assign to new consumer.
  // All related vg should be put into rebVgs
  SMqRebOutputVg *pRebVg = NULL;
  void           *pRemovedIter = NULL;
  pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) break;
346 347
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    ASSERT(pConsumerEp->consumerId > 0);
L
Liu Jicong 已提交
348 349

    // push until equal minVg
350
    while (taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) {
L
Liu Jicong 已提交
351 352 353 354 355
      // iter hash and find one vg
      pRemovedIter = taosHashIterate(pHash, pRemovedIter);
      ASSERT(pRemovedIter);
      pRebVg = (SMqRebOutputVg *)pRemovedIter;
      // push
356 357
      taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
      pRebVg->newConsumerId = pConsumerEp->consumerId;
L
Liu Jicong 已提交
358
      taosArrayPush(pOutput->rebVgs, pRebVg);
L
Liu Jicong 已提交
359 360
      mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId,
            pConsumerEp->consumerId);
L
Liu Jicong 已提交
361
    }
362 363 364
  }

  // 7. handle unassigned vg
365
  if (taosHashGetSize(pOutput->pSub->consumerHash) != 0) {
L
Liu Jicong 已提交
366 367 368 369 370 371 372
    // if has consumer, assign all left vg
    while (1) {
      pRemovedIter = taosHashIterate(pHash, pRemovedIter);
      if (pRemovedIter == NULL) break;
      pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
      ASSERT(pIter);
      pRebVg = (SMqRebOutputVg *)pRemovedIter;
373 374 375 376
      SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
      ASSERT(pConsumerEp->consumerId > 0);
      taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
      pRebVg->newConsumerId = pConsumerEp->consumerId;
L
Liu Jicong 已提交
377 378 379 380 381
      if (pRebVg->newConsumerId == pRebVg->oldConsumerId) {
        mInfo("mq rebalance: skip vg %d for same consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId,
              pConsumerEp->consumerId);
        continue;
      }
L
Liu Jicong 已提交
382
      taosArrayPush(pOutput->rebVgs, pRebVg);
L
Liu Jicong 已提交
383 384
      mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId,
            pConsumerEp->consumerId);
L
Liu Jicong 已提交
385
    }
386 387 388 389 390 391 392 393 394
  } else {
    // if all consumer is removed, put all vg into unassigned
    pIter = NULL;
    SMqRebOutputVg *pRebOutput = NULL;
    while (1) {
      pIter = taosHashIterate(pHash, pIter);
      if (pIter == NULL) break;
      pRebOutput = (SMqRebOutputVg *)pIter;
      ASSERT(pRebOutput->newConsumerId == -1);
395
      taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp);
L
Liu Jicong 已提交
396
      taosArrayPush(pOutput->rebVgs, pRebOutput);
S
Shengliang Guan 已提交
397
      mInfo("mq rebalance: unassign vgId:%d (second scan)", pRebOutput->pVgEp->vgId);
398 399 400
    }
  }

L
Liu Jicong 已提交
401 402 403 404
  // 8. TODO generate logs
  mInfo("rebalance calculation completed, rebalanced vg:");
  for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) {
    SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i);
S
Shengliang Guan 已提交
405
    mInfo("vgId:%d, moved from consumer:%" PRId64 ", to consumer:%" PRId64, pOutputRebVg->pVgEp->vgId,
406
          pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId);
L
Liu Jicong 已提交
407
  }
408 409 410 411 412 413 414

  // 9. clear
  taosHashCleanup(pHash);

  return 0;
}

S
Shengliang Guan 已提交
415
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
L
Liu Jicong 已提交
416
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg);
417
  mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL);
418 419
  if (pTrans == NULL) return -1;

420 421 422 423 424 425
  // make txn:
  // 1. redo action: action to all vg
  const SArray *rebVgs = pOutput->rebVgs;
  int32_t       vgNum = taosArrayGetSize(rebVgs);
  for (int32_t i = 0; i < vgNum; i++) {
    SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i);
L
Liu Jicong 已提交
426
    if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg) < 0) {
427 428 429 430 431 432
      goto REB_FAIL;
    }
  }

  // 2. redo log: subscribe and vg assignment
  // subscribe
L
Liu Jicong 已提交
433
  if (mndSetSubCommitLogs(pMnode, pTrans, pOutput->pSub) != 0) {
434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452
    goto REB_FAIL;
  }

  // 3. commit log: consumer to update status and epoch
  // 3.1 set touched consumer
  int32_t consumerNum = taosArrayGetSize(pOutput->touchedConsumers);
  for (int32_t i = 0; i < consumerNum; i++) {
    int64_t         consumerId = *(int64_t *)taosArrayGet(pOutput->touchedConsumers, i);
    SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
    SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup);
    pConsumerNew->updateType = CONSUMER_UPDATE__TOUCH;
    mndReleaseConsumer(pMnode, pConsumerOld);
    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
      goto REB_FAIL;
    }
  }
  // 3.2 set new consumer
  consumerNum = taosArrayGetSize(pOutput->newConsumers);
  for (int32_t i = 0; i < consumerNum; i++) {
L
Liu Jicong 已提交
453
    int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->newConsumers, i);
454 455 456 457 458 459
    ASSERT(consumerId > 0);
    SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
    SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup);
    pConsumerNew->updateType = CONSUMER_UPDATE__ADD;
    char *topic = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    char  cgroup[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
460
    mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
461 462 463
    taosArrayPush(pConsumerNew->rebNewTopics, &topic);
    mndReleaseConsumer(pMnode, pConsumerOld);
    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
L
Liu Jicong 已提交
464
      ASSERT(0);
465 466 467 468 469 470 471 472 473 474 475 476 477 478
      goto REB_FAIL;
    }
  }

  // 3.3 set removed consumer
  consumerNum = taosArrayGetSize(pOutput->removedConsumers);
  for (int32_t i = 0; i < consumerNum; i++) {
    int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->removedConsumers, i);
    ASSERT(consumerId > 0);
    SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
    SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup);
    pConsumerNew->updateType = CONSUMER_UPDATE__REMOVE;
    char *topic = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    char  cgroup[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
479
    mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
480 481 482
    taosArrayPush(pConsumerNew->rebRemovedTopics, &topic);
    mndReleaseConsumer(pMnode, pConsumerOld);
    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
L
Liu Jicong 已提交
483
      ASSERT(0);
484 485 486
      goto REB_FAIL;
    }
  }
L
Liu Jicong 已提交
487
#if 0
L
Liu Jicong 已提交
488 489 490 491 492 493 494 495 496 497
  if (consumerNum) {
    char topic[TSDB_TOPIC_FNAME_LEN];
    char cgroup[TSDB_CGROUP_LEN];
    mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
    SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
    if (pTopic) {
      // TODO make topic complete
      SMqTopicObj topicObj = {0};
      memcpy(&topicObj, pTopic, sizeof(SMqTopicObj));
      topicObj.refConsumerCnt = pTopic->refConsumerCnt - consumerNum;
L
Liu Jicong 已提交
498 499 500 501
      // TODO is that correct?
      pTopic->refConsumerCnt = topicObj.refConsumerCnt;
      mInfo("subscribe topic %s unref %d consumer cgroup %s, refcnt %d", pTopic->name, consumerNum, cgroup,
            topicObj.refConsumerCnt);
L
Liu Jicong 已提交
502 503 504 505
      if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) {
        ASSERT(0);
        goto REB_FAIL;
      }
L
Liu Jicong 已提交
506 507
    }
  }
L
Liu Jicong 已提交
508
#endif
L
Liu Jicong 已提交
509

L
Liu Jicong 已提交
510 511
  // 4. TODO commit log: modification log

L
Liu Jicong 已提交
512
  // 5. set cb
513
  mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_MQ_REB, NULL, 0);
L
Liu Jicong 已提交
514 515

  // 6. execution
L
Liu Jicong 已提交
516 517 518 519
  if (mndTransPrepare(pMnode, pTrans) != 0) {
    ASSERT(0);
    goto REB_FAIL;
  }
520 521 522 523 524 525 526 527 528

  mndTransDrop(pTrans);
  return 0;

REB_FAIL:
  mndTransDrop(pTrans);
  return -1;
}

S
Shengliang Guan 已提交
529 530 531
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
  SMnode            *pMnode = pMsg->info.node;
  SMqDoRebalanceMsg *pReq = pMsg->pCont;
532 533 534 535 536 537 538
  void              *pIter = NULL;

  mInfo("mq rebalance start");

  while (1) {
    pIter = taosHashIterate(pReq->rebSubHash, pIter);
    if (pIter == NULL) break;
L
Liu Jicong 已提交
539 540 541
    SMqRebInputObj rebInput = {0};

    SMqRebOutputObj rebOutput = {0};
L
Liu Jicong 已提交
542 543 544
    rebOutput.newConsumers = taosArrayInit(0, sizeof(int64_t));
    rebOutput.removedConsumers = taosArrayInit(0, sizeof(int64_t));
    rebOutput.touchedConsumers = taosArrayInit(0, sizeof(int64_t));
L
Liu Jicong 已提交
545 546
    rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));

L
Liu Jicong 已提交
547 548 549 550
    SMqRebInfo      *pRebInfo = (SMqRebInfo *)pIter;
    SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebInfo->key);

    rebInput.pRebInfo = pRebInfo;
551 552 553 554 555

    if (pSub == NULL) {
      // split sub key and extract topic
      char topic[TSDB_TOPIC_FNAME_LEN];
      char cgroup[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
556
      mndSplitSubscribeKey(pRebInfo->key, topic, cgroup, true);
557
      SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
L
Liu Jicong 已提交
558 559 560 561 562
      /*ASSERT(pTopic);*/
      if (pTopic == NULL) {
        mError("rebalance %s failed since topic %s was dropped, abort", pRebInfo->key, topic);
        continue;
      }
563 564
      taosRLockLatch(&pTopic->lock);

L
Liu Jicong 已提交
565
      rebOutput.pSub = mndCreateSub(pMnode, pTopic, pRebInfo->key);
L
Liu Jicong 已提交
566
      memcpy(rebOutput.pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN);
L
Liu Jicong 已提交
567 568 569 570 571 572 573 574 575 576 577 578 579
      ASSERT(taosHashGetSize(rebOutput.pSub->consumerHash) == 0);

      taosRUnLockLatch(&pTopic->lock);
      mndReleaseTopic(pMnode, pTopic);

      rebInput.oldConsumerNum = 0;
    } else {
      taosRLockLatch(&pSub->lock);
      rebInput.oldConsumerNum = taosHashGetSize(pSub->consumerHash);
      rebOutput.pSub = tCloneSubscribeObj(pSub);
      taosRUnLockLatch(&pSub->lock);
      mndReleaseSubscribe(pMnode, pSub);
    }
580 581 582

    // TODO replace assert with error check
    ASSERT(mndDoRebalance(pMnode, &rebInput, &rebOutput) == 0);
583

L
Liu Jicong 已提交
584 585 586
    // if add more consumer to balanced subscribe,
    // possibly no vg is changed
    /*ASSERT(taosArrayGetSize(rebOutput.rebVgs) != 0);*/
587

588
    // TODO replace assert with error check
L
Liu Jicong 已提交
589 590 591
    if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) {
      mError("persist rebalance output error, possibly vnode splitted or dropped");
    }
L
Liu Jicong 已提交
592 593 594 595 596 597 598 599 600 601
    taosArrayDestroy(pRebInfo->lostConsumers);
    taosArrayDestroy(pRebInfo->newConsumers);
    taosArrayDestroy(pRebInfo->removedConsumers);

    taosArrayDestroy(rebOutput.newConsumers);
    taosArrayDestroy(rebOutput.touchedConsumers);
    taosArrayDestroy(rebOutput.removedConsumers);
    taosArrayDestroy(rebOutput.rebVgs);
    tDeleteSubscribeObj(rebOutput.pSub);
    taosMemoryFree(rebOutput.pSub);
602 603 604 605 606
  }

  // reset flag
  mInfo("mq rebalance completed successfully");
  taosHashCleanup(pReq->rebSubHash);
L
Liu Jicong 已提交
607
  mndRebEnd();
608 609 610

  return 0;
}
L
Liu Jicong 已提交
611

L
Liu Jicong 已提交
612
static int32_t mndProcessDropCgroupReq(SRpcMsg *pReq) {
L
Liu Jicong 已提交
613 614
  SMnode         *pMnode = pReq->info.node;
  SSdb           *pSdb = pMnode->pSdb;
L
Liu Jicong 已提交
615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633
  SMDropCgroupReq dropReq = {0};

  if (tDeserializeSMDropCgroupReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, dropReq.cgroup, dropReq.topic);
  if (pSub == NULL) {
    if (dropReq.igNotExists) {
      mDebug("cgroup:%s on topic:%s, not exist, ignore not exist is set", dropReq.cgroup, dropReq.topic);
      return 0;
    } else {
      terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
      mError("topic:%s, cgroup:%s, failed to drop since %s", dropReq.topic, dropReq.cgroup, terrstr());
      return -1;
    }
  }

L
Liu Jicong 已提交
634
  if (taosHashGetSize(pSub->consumerHash) != 0) {
L
Liu Jicong 已提交
635 636
    terrno = TSDB_CODE_MND_CGROUP_USED;
    mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
L
Liu Jicong 已提交
637
    mndReleaseSubscribe(pMnode, pSub);
L
Liu Jicong 已提交
638 639 640
    return -1;
  }

641
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
L
Liu Jicong 已提交
642 643
  if (pTrans == NULL) {
    mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
L
Liu Jicong 已提交
644
    mndReleaseSubscribe(pMnode, pSub);
L
Liu Jicong 已提交
645 646 647 648 649 650 651
    return -1;
  }

  mDebug("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);

  if (mndDropOffsetBySubKey(pMnode, pTrans, pSub->key) < 0) {
    ASSERT(0);
L
Liu Jicong 已提交
652
    mndReleaseSubscribe(pMnode, pSub);
L
Liu Jicong 已提交
653 654 655 656 657
    return -1;
  }

  if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
    mError("cgroup %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
L
Liu Jicong 已提交
658
    mndReleaseSubscribe(pMnode, pSub);
L
Liu Jicong 已提交
659 660 661
    return -1;
  }

L
Liu Jicong 已提交
662 663
  mndTransPrepare(pMnode, pTrans);

L
Liu Jicong 已提交
664 665 666 667 668
  mndReleaseSubscribe(pMnode, pSub);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}

L
Liu Jicong 已提交
669 670 671
void mndCleanupSubscribe(SMnode *pMnode) {}

static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
L
Liu Jicong 已提交
672
  terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
673
  void   *buf = NULL;
L
Liu Jicong 已提交
674
  int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
L
Liu Jicong 已提交
675
  int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE;
L
Liu Jicong 已提交
676 677 678 679

  SSdbRaw *pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
  if (pRaw == NULL) goto SUB_ENCODE_OVER;

wafwerar's avatar
wafwerar 已提交
680
  buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
681
  if (buf == NULL) goto SUB_ENCODE_OVER;
L
Liu Jicong 已提交
682

L
Liu Jicong 已提交
683 684
  void *abuf = buf;
  tEncodeSubscribeObj(&abuf, pSub);
L
Liu Jicong 已提交
685 686 687 688 689 690 691

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, SUB_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER);

L
Liu Jicong 已提交
692 693
  terrno = TSDB_CODE_SUCCESS;

L
Liu Jicong 已提交
694
SUB_ENCODE_OVER:
wafwerar's avatar
wafwerar 已提交
695
  taosMemoryFreeClear(buf);
L
Liu Jicong 已提交
696
  if (terrno != TSDB_CODE_SUCCESS) {
L
Liu Jicong 已提交
697 698 699 700 701 702 703 704 705 706 707
    mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRaw;
}

static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
708
  void *buf = NULL;
L
Liu Jicong 已提交
709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;

  if (sver != MND_SUBSCRIBE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto SUB_DECODE_OVER;
  }

  int32_t  size = sizeof(SMqSubscribeObj);
  SSdbRow *pRow = sdbAllocRow(size);
  if (pRow == NULL) goto SUB_DECODE_OVER;

  SMqSubscribeObj *pSub = sdbGetRowObj(pRow);
  if (pSub == NULL) goto SUB_DECODE_OVER;

  int32_t dataPos = 0;
  int32_t tlen;
L
Liu Jicong 已提交
727
  SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
L
Liu Jicong 已提交
728
  buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
729 730 731 732 733 734 735 736
  if (buf == NULL) goto SUB_DECODE_OVER;
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);

  if (tDecodeSubscribeObj(buf, pSub) == NULL) {
    goto SUB_DECODE_OVER;
  }

L
Liu Jicong 已提交
737 738
  terrno = TSDB_CODE_SUCCESS;

L
Liu Jicong 已提交
739
SUB_DECODE_OVER:
wafwerar's avatar
wafwerar 已提交
740
  taosMemoryFreeClear(buf);
L
Liu Jicong 已提交
741
  if (terrno != TSDB_CODE_SUCCESS) {
L
Liu Jicong 已提交
742
    mError("subscribe:%s, failed to decode from raw:%p since %s", pSub->key, pRaw, terrstr());
wafwerar's avatar
wafwerar 已提交
743
    taosMemoryFreeClear(pRow);
L
Liu Jicong 已提交
744 745 746 747 748 749 750 751 752 753 754 755 756
    return NULL;
  }

  return pRow;
}

static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
  mTrace("subscribe:%s, perform insert action", pSub->key);
  return 0;
}

static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
  mTrace("subscribe:%s, perform delete action", pSub->key);
757
  tDeleteSubscribeObj(pSub);
L
Liu Jicong 已提交
758 759 760 761 762
  return 0;
}

static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
  mTrace("subscribe:%s, perform update action", pOldSub->key);
763 764 765 766 767 768
  taosWLockLatch(&pOldSub->lock);

  SHashObj *tmp = pOldSub->consumerHash;
  pOldSub->consumerHash = pNewSub->consumerHash;
  pNewSub->consumerHash = tmp;

769 770 771 772
  SArray *tmp1 = pOldSub->unassignedVgs;
  pOldSub->unassignedVgs = pNewSub->unassignedVgs;
  pNewSub->unassignedVgs = tmp1;

773
  taosWUnLockLatch(&pOldSub->lock);
L
Liu Jicong 已提交
774 775 776
  return 0;
}

777
int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName) {
S
Shengliang Guan 已提交
778
  int32_t tlen = strlen(cgroup);
L
Liu Jicong 已提交
779
  memcpy(key, cgroup, tlen);
L
Liu Jicong 已提交
780
  key[tlen] = TMQ_SEPARATOR;
L
Liu Jicong 已提交
781
  strcpy(key + tlen + 1, topicName);
L
Liu Jicong 已提交
782
  return 0;
L
Liu Jicong 已提交
783 784
}

L
Liu Jicong 已提交
785
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *cgroup, const char *topicName) {
L
Liu Jicong 已提交
786 787 788
  SSdb *pSdb = pMnode->pSdb;
  char  key[TSDB_SUBSCRIBE_KEY_LEN];
  mndMakeSubscribeKey(key, cgroup, topicName);
L
Liu Jicong 已提交
789 790
  SMqSubscribeObj *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key);
  if (pSub == NULL) {
791
    terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
L
Liu Jicong 已提交
792 793 794 795
  }
  return pSub;
}

L
Liu Jicong 已提交
796 797 798 799
SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key) {
  SSdb            *pSdb = pMnode->pSdb;
  SMqSubscribeObj *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key);
  if (pSub == NULL) {
800
    terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
L
Liu Jicong 已提交
801 802 803 804
  }
  return pSub;
}

L
Liu Jicong 已提交
805 806 807 808 809
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
  SSdb *pSdb = pMnode->pSdb;
  sdbRelease(pSdb, pSub);
}

L
Liu Jicong 已提交
810 811 812 813 814 815 816 817
static int32_t mndSetDropSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  SSdbRaw *pRedoRaw = mndSubActionEncode(pSub);
  if (pRedoRaw == NULL) return -1;
  if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
  if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED) != 0) return -1;
  return 0;
}

L
Liu Jicong 已提交
818
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
L
Liu Jicong 已提交
819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841
  SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
  if (pCommitRaw == NULL) return -1;
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
  return 0;
}

int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  int32_t code = -1;
  SSdb   *pSdb = pMnode->pSdb;

  void            *pIter = NULL;
  SMqSubscribeObj *pSub = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
    if (pIter == NULL) break;

    if (pSub->dbUid != pDb->uid) {
      sdbRelease(pSdb, pSub);
      continue;
    }

    if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
L
Liu Jicong 已提交
842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870
      sdbRelease(pSdb, pSub);
      goto END;
    }
  }

  code = 0;
END:
  return code;
}

int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) {
  int32_t code = -1;
  SSdb   *pSdb = pMnode->pSdb;

  void            *pIter = NULL;
  SMqSubscribeObj *pSub = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
    if (pIter == NULL) break;

    char topic[TSDB_TOPIC_FNAME_LEN];
    char cgroup[TSDB_CGROUP_LEN];
    mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
    if (strcmp(topic, topicName) != 0) {
      sdbRelease(pSdb, pSub);
      continue;
    }

    // iter all vnode to delete handle
L
Liu Jicong 已提交
871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889
    ASSERT(taosHashGetSize(pSub->consumerHash) == 0);
    int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
    for (int32_t i = 0; i < sz; i++) {
      SMqVgEp       *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
      SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
      pReq->head.vgId = htonl(pVgEp->vgId);
      pReq->vgId = pVgEp->vgId;
      pReq->consumerId = -1;
      memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
      STransAction action = {0};
      action.epSet = pVgEp->epSet;
      action.pCont = pReq;
      action.contLen = sizeof(SMqVDeleteReq);
      action.msgType = TDMT_VND_MQ_VG_DELETE;
      if (mndTransAppendRedoAction(pTrans, &action) != 0) {
        taosMemoryFree(pReq);
        return -1;
      }
    }
L
Liu Jicong 已提交
890 891 892

    if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) {
      sdbRelease(pSdb, pSub);
L
Liu Jicong 已提交
893 894 895 896 897 898 899 900
      goto END;
    }
  }

  code = 0;
END:
  return code;
}
L
Liu Jicong 已提交
901

S
Shengliang Guan 已提交
902 903
static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode          *pMnode = pReq->info.node;
L
Liu Jicong 已提交
904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934
  SSdb            *pSdb = pMnode->pSdb;
  int32_t          numOfRows = 0;
  SMqSubscribeObj *pSub = NULL;

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub);
    if (pShow->pIter == NULL) break;

    taosRLockLatch(&pSub->lock);

    if (numOfRows + pSub->vgNum > rowsCapacity) {
      blockDataEnsureCapacity(pBlock, numOfRows + pSub->vgNum);
    }

    SMqConsumerEp *pConsumerEp = NULL;
    void          *pIter = NULL;
    while (1) {
      pIter = taosHashIterate(pSub->consumerHash, pIter);
      if (pIter == NULL) break;
      pConsumerEp = (SMqConsumerEp *)pIter;

      int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
      for (int32_t j = 0; j < sz; j++) {
        SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);

        SColumnInfoData *pColInfo;
        int32_t          cols = 0;

        // topic and cgroup
        char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
        char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
L
Liu Jicong 已提交
935
        mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
L
Liu Jicong 已提交
936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967
        varDataSetLen(topic, strlen(varDataVal(topic)));
        varDataSetLen(cgroup, strlen(varDataVal(cgroup)));

        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        colDataAppend(pColInfo, numOfRows, (const char *)topic, false);

        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false);

        // vg id
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        colDataAppend(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false);

        // consumer id
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        colDataAppend(pColInfo, numOfRows, (const char *)&pConsumerEp->consumerId, false);

        // offset
#if 0
      // subscribe time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false);

      // rebalance time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0);
#endif

        numOfRows++;
      }
    }

L
Liu Jicong 已提交
968
    // do not show for cleared subscription
969
#if 1
L
Liu Jicong 已提交
970 971 972 973 974 975 976 977 978 979
    int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
    for (int32_t i = 0; i < sz; i++) {
      SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);

      SColumnInfoData *pColInfo;
      int32_t          cols = 0;

      // topic and cgroup
      char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
      char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
L
Liu Jicong 已提交
980
      mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
L
Liu Jicong 已提交
981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011
      varDataSetLen(topic, strlen(varDataVal(topic)));
      varDataSetLen(cgroup, strlen(varDataVal(cgroup)));

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)topic, false);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false);

      // vg id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false);

      // consumer id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, NULL, true);

      // offset
#if 0
      // subscribe time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false);

      // rebalance time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0);
#endif

      numOfRows++;
    }

L
Liu Jicong 已提交
1012
#endif
L
Liu Jicong 已提交
1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024
    taosRUnLockLatch(&pSub->lock);
    sdbRelease(pSdb, pSub);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}

static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) {
  SSdb *pSdb = pMnode->pSdb;
  sdbCancelFetch(pSdb, pIter);
}