mndSubscribe.c 39.2 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
L
Liu Jicong 已提交
17
#include "mndSubscribe.h"
L
Liu Jicong 已提交
18
#include "mndConsumer.h"
L
Liu Jicong 已提交
19
#include "mndScheduler.h"
L
Liu Jicong 已提交
20 21 22 23 24 25 26
#include "mndShow.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndVgroup.h"
#include "tcompare.h"
#include "tname.h"

27
// NOTE(review): presumably the version tag and reserved byte count of the encoded subscribe row;
// the encode/decode routines that would use them are outside this view -- confirm.
#define MND_SUBSCRIBE_VER_NUMBER    2
#define MND_SUBSCRIBE_RESERVE_SIZE  64

// NOTE(review): not referenced in the visible part of this file -- confirm where it is used.
#define MND_SUBSCRIBE_REBALANCE_CNT 3
L
Liu Jicong 已提交
31

L
Liu Jicong 已提交
32 33 34 35 36
// SDB table callbacks for subscribe rows; installed by mndInitSubscribe().
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub);
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
static int32_t  mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub);
static int32_t  mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub);
static int32_t  mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);

// Message and show handlers; installed by mndInitSubscribe().
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg);
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg);
static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void    mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
41

42 43 44 45 46 47 48
/*
 * Encode pSub and append the result to pTrans as a redo log, then mark the raw
 * record SDB_STATUS_READY.
 *
 * pMnode is not used by this function.
 * Returns 0 on success, -1 when encoding, appending or status marking fails.
 */
static int32_t mndSetSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  SSdbRaw *pRaw = mndSubActionEncode(pSub);
  if (pRaw == NULL) {
    return -1;
  }

  if (mndTransAppendRedolog(pTrans, pRaw) != 0) {
    return -1;
  }

  return (sdbSetRawStatus(pRaw, SDB_STATUS_READY) != 0) ? -1 : 0;
}
L
Liu Jicong 已提交
49

50 51 52 53 54 55 56
/*
 * Encode pSub and append the result to pTrans as a commit log, then mark the raw
 * record SDB_STATUS_READY.
 *
 * pMnode is not used by this function.
 * Returns 0 on success, -1 when encoding, appending or status marking fails.
 */
static int32_t mndSetSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  SSdbRaw *pRaw = mndSubActionEncode(pSub);
  if (pRaw == NULL) {
    return -1;
  }

  if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
    return -1;
  }

  return (sdbSetRawStatus(pRaw, SDB_STATUS_READY) != 0) ? -1 : 0;
}
L
Liu Jicong 已提交
57

L
Liu Jicong 已提交
58
/*
 * Register everything the subscribe module needs with the mnode:
 *   - the SDB table description for SDB_SUBSCRIBE rows (binary key),
 *   - the message handlers for rebalance / drop-cgroup and their vnode responses,
 *   - the show handlers used to list subscriptions.
 *
 * Returns the result of sdbSetTable().
 */
int32_t mndInitSubscribe(SMnode *pMnode) {
  SSdbTable table = {
      .sdbType = SDB_SUBSCRIBE,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndSubActionEncode,
      .decodeFp = (SdbDecodeFp)mndSubActionDecode,
      .insertFp = (SdbInsertFp)mndSubActionInsert,
      .updateFp = (SdbUpdateFp)mndSubActionUpdate,
      .deleteFp = (SdbDeleteFp)mndSubActionDelete,
  };

  // responses from vnodes are routed back into the transaction engine
  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_SUBSCRIBE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_DELETE_SUB_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DO_REBALANCE, mndProcessRebalanceReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP, mndProcessDropCgroupReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP_RSP, mndTransProcessRsp);

  // The retrieve handler and its iterator-cancel handler must be registered for the
  // same show table; the cancel handler used to be registered under
  // TSDB_MGMT_TABLE_TOPICS, leaving the subscriptions table without one.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndCancelGetNextSubscribe);

  return sdbSetTable(pMnode->pSdb, table);
}

81
/*
 * Build a new subscription object for subKey, copying dbUid, stbUid, subType and
 * withMeta from pTopic and letting mndSchedInitSubEp() fill in the endpoints.
 *
 * Returns the new object, or NULL on failure. terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY when the object itself cannot be created; when
 * mndSchedInitSubEp() fails the object is destroyed and terrno is left untouched here.
 */
static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey) {
  SMqSubscribeObj *pNewSub = tNewSubscribeObj(subKey);
  if (pNewSub == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // inherit the topic properties
  pNewSub->dbUid = pTopic->dbUid;
  pNewSub->stbUid = pTopic->stbUid;
  pNewSub->subType = pTopic->subType;
  pNewSub->withMeta = pTopic->withMeta;

  if (mndSchedInitSubEp(pMnode, pTopic, pNewSub) >= 0) {
    return pNewSub;
  }

  // endpoint initialisation failed: release the half-built object
  tDeleteSubscribeObj(pNewSub);
  taosMemoryFree(pNewSub);
  return NULL;
}

L
Liu Jicong 已提交
102 103
static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscribeObj *pSub,
                                    const SMqRebOutputVg *pRebVg) {
104 105 106 107
  SMqRebVgReq req = {0};
  req.oldConsumerId = pRebVg->oldConsumerId;
  req.newConsumerId = pRebVg->newConsumerId;
  req.vgId = pRebVg->pVgEp->vgId;
L
Liu Jicong 已提交
108
  req.qmsg = pRebVg->pVgEp->qmsg;
L
Liu Jicong 已提交
109
  req.subType = pSub->subType;
L
Liu Jicong 已提交
110
  req.withMeta = pSub->withMeta;
L
Liu Jicong 已提交
111
  req.suid = pSub->stbUid;
S
Shengliang Guan 已提交
112
  tstrncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
113

114 115 116 117 118 119 120 121
  int32_t tlen = 0;
  int32_t ret = 0;
  tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, ret);
  if (ret < 0) {
    return -1;
  }

  tlen += sizeof(SMsgHead);
122 123 124 125 126
  void   *buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
127

128 129 130 131 132
  SMsgHead *pMsgHead = (SMsgHead *)buf;

  pMsgHead->contLen = htonl(tlen);
  pMsgHead->vgId = htonl(pRebVg->pVgEp->vgId);

133 134 135 136 137 138 139 140
  SEncoder encoder = {0};
  tEncoderInit(&encoder, POINTER_SHIFT(buf, sizeof(SMsgHead)), tlen);
  if (tEncodeSMqRebVgReq(&encoder, &req) < 0) {
    taosMemoryFreeClear(buf);
    tEncoderClear(&encoder);
    return -1;
  }
  tEncoderClear(&encoder);
141 142 143 144 145 146
  *pBuf = buf;
  *pLen = tlen;

  return 0;
}

L
Liu Jicong 已提交
147
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub,
148
                                        const SMqRebOutputVg *pRebVg) {
149 150 151 152
//  if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
//    terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
//    return -1;
//  }
153 154 155

  void   *buf;
  int32_t tlen;
L
Liu Jicong 已提交
156
  if (mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg) < 0) {
157 158 159 160 161
    return -1;
  }

  int32_t vgId = pRebVg->pVgEp->vgId;
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
L
Liu Jicong 已提交
162 163
  if (pVgObj == NULL) {
    taosMemoryFree(buf);
164
    terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
165 166
    return -1;
  }
167 168 169 170 171

  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
  action.pCont = buf;
  action.contLen = tlen;
L
Liu Jicong 已提交
172
  action.msgType = TDMT_VND_TMQ_SUBSCRIBE;
173 174 175 176 177 178 179 180

  mndReleaseVgroup(pMnode, pVgObj);
  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    taosMemoryFree(buf);
    return -1;
  }
  return 0;
}
L
Liu Jicong 已提交
181

L
Liu Jicong 已提交
182
static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, bool fullName) {
S
Shengliang Guan 已提交
183
  int32_t i = 0;
L
Liu Jicong 已提交
184
  while (key[i] != TMQ_SEPARATOR) {
L
Liu Jicong 已提交
185 186
    i++;
  }
L
Liu Jicong 已提交
187 188
  memcpy(cgroup, key, i);
  cgroup[i] = 0;
L
Liu Jicong 已提交
189 190 191 192 193 194 195 196
  if (fullName) {
    strcpy(topic, &key[i + 1]);
  } else {
    while (key[i] != '.') {
      i++;
    }
    strcpy(topic, &key[i + 1]);
  }
L
Liu Jicong 已提交
197 198 199
  return 0;
}

L
Liu Jicong 已提交
200 201
/*
 * Look up the rebalance info stored under key in pHash, creating and inserting a
 * fresh one when none exists yet. The key is hashed including its terminator.
 *
 * Returns a pointer to the entry stored inside pHash, or NULL with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY when the entry cannot be created or inserted.
 *
 * taosHashPut() is given sizeof(SMqRebInfo) bytes to store, so the temporary object
 * is released once inserted and the stored entry is returned. Previously the
 * temporary was returned and never freed.
 * NOTE(review): assumes tNewSMqRebSubscribe() returns memory that taosMemoryFree()
 * may release, and that the hash keeps a shallow copy -- confirm.
 */
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
  size_t keyLen = strlen(key) + 1;

  SMqRebInfo *pRebSub = taosHashGet(pHash, key, keyLen);
  if (pRebSub != NULL) {
    return pRebSub;
  }

  SMqRebInfo *pNewInfo = tNewSMqRebSubscribe(key);
  if (pNewInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  taosHashPut(pHash, key, keyLen, pNewInfo, sizeof(SMqRebInfo));
  taosMemoryFree(pNewInfo);

  // hand out the entry owned by the hash, not the temporary
  pRebSub = taosHashGet(pHash, key, keyLen);
  if (pRebSub == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
  }
  return pRebSub;
}

213
/*
 * Drop every consumer listed in pInput->pRebInfo->removedConsumers from the
 * subscription in pOutput.
 *
 * For each listed consumer that is still present in the consumer hash, all of its
 * vgroups are recorded in pHash (keyed by vgId, newConsumerId set to -1), its vgs
 * array is destroyed, the hash entry is removed and the id is appended to
 * pOutput->removedConsumers. A mismatch between listed and actually removed
 * consumers is only logged.
 */
static void doRemoveLostConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
  const char *pSubKey = pOutput->pSub->key;
  int32_t     numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
  int32_t     actualRemoved = 0;

  for (int32_t idx = 0; idx < numOfRemoved; idx++) {
    uint64_t       consumerId = *(uint64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, idx);
    SMqConsumerEp *pLostEp = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
    if (pLostEp == NULL) {
      // already gone from the subscription
      continue;
    }
    actualRemoved++;

    // hand every vgroup of this consumer over to the pending set
    int32_t vgCnt = taosArrayGetSize(pLostEp->vgs);
    for (int32_t v = 0; v < vgCnt; v++) {
      SMqVgEp       *pVgEp = taosArrayGetP(pLostEp->vgs, v);
      SMqRebOutputVg pending = {.oldConsumerId = consumerId, .newConsumerId = -1, .pVgEp = pVgEp};
      taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &pending, sizeof(SMqRebOutputVg));
      mInfo("sub:%s mq re-balance remove vgId:%d from consumer:%" PRIx64, pSubKey, pVgEp->vgId, consumerId);
    }

    taosArrayDestroy(pLostEp->vgs);
    taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));

    // remember the consumer as removed
    taosArrayPush(pOutput->removedConsumers, &consumerId);
  }

  if (numOfRemoved == actualRemoved) {
    mInfo("sub:%s removed %d consumers", pSubKey, numOfRemoved);
  } else {
    mError("sub:%s mq re-balance removedNum:%d not matched with actual:%d", pSubKey, numOfRemoved, actualRemoved);
  }
}
250

251
/*
 * Insert every consumer listed in pInput->pRebInfo->newConsumers into the
 * subscription's consumer hash with an empty vgroup list, and append its id to
 * pOutput->newConsumers.
 */
static void doAddNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pInput) {
  const char *pSubKey = pOutput->pSub->key;
  int32_t     numOfNewConsumers = taosArrayGetSize(pInput->pRebInfo->newConsumers);

  for (int32_t idx = 0; idx < numOfNewConsumers; idx++) {
    int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, idx);

    // all other fields of the new endpoint start out zeroed
    SMqConsumerEp freshEp = {
        .consumerId = consumerId,
        .vgs = taosArrayInit(0, sizeof(void *)),
    };

    taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &freshEp, sizeof(SMqConsumerEp));
    taosArrayPush(pOutput->newConsumers, &consumerId);
    mInfo("sub:%s mq rebalance add new consumer:%" PRIx64, pSubKey, consumerId);
  }
}
267

X
Xiaoyu Wang 已提交
268
/*
 * Move every vgroup out of the subscription's unassignedVgs array into pHash
 * (keyed by vgId) with both old and new consumer id set to -1.
 */
static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
  const char *pSubKey = pOutput->pSub->key;

  // the count is taken once; each round pops one entry off the array
  for (int32_t remaining = taosArrayGetSize(pOutput->pSub->unassignedVgs); remaining > 0; remaining--) {
    SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pOutput->pSub->unassignedVgs);

    SMqRebOutputVg pending = {.oldConsumerId = -1, .newConsumerId = -1, .pVgEp = pVgEp};
    taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &pending, sizeof(SMqRebOutputVg));
    mInfo("sub:%s mq re-balance addUnassignedVgroups vgId:%d from unassigned", pSubKey, pVgEp->vgId);
  }
}
284

285 286 287 288 289 290 291 292 293 294 295 296
/*
 * Append one entry to pOutput->rebVgs for every vgroup currently held by
 * pConsumerEp, with old and new consumer id both set to that consumer.
 */
static void putNoTransferToOutput(SMqRebOutputObj *pOutput, SMqConsumerEp *pConsumerEp) {
  // the vgs array is only read here, so its size can be taken once
  int32_t keptCnt = taosArrayGetSize(pConsumerEp->vgs);

  for (int32_t idx = 0; idx < keptCnt; idx++) {
    SMqRebOutputVg kept = {
        .oldConsumerId = pConsumerEp->consumerId,
        .newConsumerId = pConsumerEp->consumerId,
        .pVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, idx),
    };
    taosArrayPush(pOutput->rebVgs, &kept);
  }
}

X
Xiaoyu Wang 已提交
297 298
/*
 * Trim consumers that hold more vgroups than they are entitled to.
 *
 * Every consumer in the subscription is appended to pOutput->modifyConsumers.
 * Among consumers holding more than minVgCnt vgroups, the first imbConsumerNum
 * visited keep minVgCnt + 1 and the rest keep minVgCnt. Each vgroup popped off a
 * consumer is recorded in pHash (keyed by vgId, newConsumerId -1). The vgroups
 * that stay with the consumer are then reported through putNoTransferToOutput().
 */
static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt,
                                        int32_t imbConsumerNum) {
  const char *pSubKey = pOutput->pSub->key;
  int32_t     imbCnt = 0;

  for (void *pIter = taosHashIterate(pOutput->pSub->consumerHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter)) {
    SMqConsumerEp *pEp = (SMqConsumerEp *)pIter;
    int32_t        heldBefore = taosArrayGetSize(pEp->vgs);

    // all old consumers still existing need to be modified
    // TODO optimize: modify only consumer whose vgs changed
    taosArrayPush(pOutput->modifyConsumers, &pEp->consumerId);

    if (heldBefore > minVgCnt) {
      // the first imbConsumerNum over-full consumers may keep one extra vgroup
      int32_t keepCnt = minVgCnt;
      if (imbCnt < imbConsumerNum) {
        keepCnt = minVgCnt + 1;
        imbCnt++;
      }

      while (taosArrayGetSize(pEp->vgs) > keepCnt) {
        SMqVgEp       *pVgEp = *(SMqVgEp **)taosArrayPop(pEp->vgs);
        SMqRebOutputVg pending = {.oldConsumerId = pEp->consumerId, .newConsumerId = -1, .pVgEp = pVgEp};
        taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &pending, sizeof(SMqRebOutputVg));
        mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", pSubKey, pVgEp->vgId,
              pEp->consumerId);
      }
    }

    putNoTransferToOutput(pOutput, pEp);
  }
}
349

350 351 352 353 354 355 356 357 358
/*
 * Compute the new vgroup-to-consumer assignment for one subscription.
 *
 * pInput  rebalance request: removed/new consumer ids and the previous consumer count.
 * pOutput holds the working copy of the subscription (pSub), which is modified in
 *         place, plus the result arrays (newConsumers, removedConsumers,
 *         modifyConsumers, rebVgs) that are filled here.
 *
 * Steps: collect the vgroups that need a (new) owner in a temporary hash, trim
 * over-full consumers, add the new consumers, then hand the pending vgroups out so
 * that each consumer ends up with minVgCnt or minVgCnt + 1 vgroups. When no consumer
 * is left, the pending vgroups go back to pSub->unassignedVgs and the offset rows
 * of the currently stored subscription are copied into pSub->offsetRows.
 *
 * Returns 0 on success, -1 (terrno = TSDB_CODE_OUT_OF_MEMORY) when the temporary
 * hash cannot be created; in that case pOutput is left unmodified.
 */
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
  int32_t     totalVgNum = pOutput->pSub->vgNum;
  const char *pSubKey = pOutput->pSub->key;

  int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
  int32_t numOfAdded = taosArrayGetSize(pInput->pRebInfo->newConsumers);
  mInfo("sub:%s mq re-balance %d vgroups, existed consumers:%d, added:%d, removed:%d", pSubKey, totalVgNum,
        pInput->oldConsumerNum, numOfAdded, numOfRemoved);

  // 1. build temporary hash(vgId -> SMqRebOutputVg) to store vg that need to be assigned
  SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pHash == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // 2. check and get actual removed consumers, put their vg into pHash
  doRemoveLostConsumers(pOutput, pHash, pInput);

  // 3. if previously no consumer, there are vgs not assigned, put these vg into pHash
  addUnassignedVgroups(pOutput, pHash);

  // 4. calc vg number of each consumer
  int32_t numOfFinal = pInput->oldConsumerNum + numOfAdded - numOfRemoved;
  int32_t minVgCnt = 0;
  int32_t imbConsumerNum = 0;

  if (numOfFinal) {
    minVgCnt = totalVgNum / numOfFinal;
    imbConsumerNum = totalVgNum % numOfFinal;
    mInfo("sub:%s mq re-balance %d consumers: at least %d vgs each, %d consumers has 1 more vgroups than avg value",
          pSubKey, numOfFinal, minVgCnt, imbConsumerNum);
  } else {
    mInfo("sub:%s no consumer subscribe this topic", pSubKey);
  }

  // 5. remove vgroups from consumers who have more vgroups than the threshold value
  //    (minVgCnt or minVgCnt + 1) and put them into pHash
  transferVgroupsForConsumers(pOutput, pHash, minVgCnt, imbConsumerNum);

  // 6. add new consumer into sub
  doAddNewConsumers(pOutput, pInput);

  SMqRebOutputVg *pRebVg = NULL;
  void           *pRemovedIter = NULL;
  void           *pIter = NULL;

  // 7. extract vgroups from pHash and assign to consumers that do not have enough vgroups
  // 7.1 first round: bring every consumer up to minVgCnt
  while (1) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) {
      break;
    }

    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;

    while (taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) {
      pRemovedIter = taosHashIterate(pHash, pRemovedIter);
      if (pRemovedIter == NULL) {
        mError("sub:%s removed iter is null, never can reach hear", pSubKey);
        break;
      }

      pRebVg = (SMqRebOutputVg *)pRemovedIter;
      pRebVg->newConsumerId = pConsumerEp->consumerId;
      taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
      mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", pRebVg->pVgEp->vgId, pConsumerEp->consumerId);
    }
  }

  // 7.2 second round: hand the remaining vgroups out, one extra per consumer
  while (1) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) {
      break;
    }
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;

    if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) {
      pRemovedIter = taosHashIterate(pHash, pRemovedIter);
      if (pRemovedIter == NULL) {
        mInfo("sub:%s removed iter is null", pSubKey);
        // the consumer iteration is abandoned here, so the iterator must be
        // cancelled explicitly instead of being left dangling
        taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
        pIter = NULL;
        break;
      }

      pRebVg = (SMqRebOutputVg *)pRemovedIter;
      pRebVg->newConsumerId = pConsumerEp->consumerId;
      taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
      mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->pVgEp->vgId, pConsumerEp->consumerId);
    }
  }

  // All assigned vg should be put into pOutput->rebVgs
  if (pRemovedIter != NULL) {
    mError("sub:%s error pRemovedIter should be NULL", pSubKey);
  }

  // the consumer hash is not modified below, so this is loop invariant
  bool noConsumerLeft = (taosHashGetSize(pOutput->pSub->consumerHash) == 0);
  bool hasUnassigned = false;
  while (1) {
    pRemovedIter = taosHashIterate(pHash, pRemovedIter);
    if (pRemovedIter == NULL) {
      break;
    }

    SMqRebOutputVg *pRebOutput = (SMqRebOutputVg *)pRemovedIter;
    taosArrayPush(pOutput->rebVgs, pRebOutput);
    if (noConsumerLeft) {  // if all consumer is removed, put all vg into unassigned
      taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp);
      hasUnassigned = true;
    }
  }

  // With no consumer left, keep the offset rows of the consumers of the stored
  // subscription. One snapshot is enough; it used to be rebuilt for every vgroup.
  if (hasUnassigned) {
    SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key);
    if (pSub) {
      taosRLockLatch(&pSub->lock);
      if (pOutput->pSub->offsetRows == NULL) {
        pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows));
      } else {
        taosArrayClear(pOutput->pSub->offsetRows);
      }
      pIter = NULL;
      while (1) {
        pIter = taosHashIterate(pSub->consumerHash, pIter);
        if (pIter == NULL) break;
        SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
        taosArrayAddAll(pOutput->pSub->offsetRows, pConsumerEp->offsetRows);
      }
      taosRUnLockLatch(&pSub->lock);
      mndReleaseSubscribe(pMnode, pSub);
    }
  }

  // 8. generate logs
  mInfo("sub:%s mq re-balance calculation completed, re-balanced vg", pSubKey);
  for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) {
    SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i);
    mInfo("sub:%s mq re-balance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pSubKey,
          pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId);
  }

  pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) break;
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    int32_t        sz = taosArrayGetSize(pConsumerEp->vgs);
    mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", pSubKey, pConsumerEp->consumerId, sz);
    for (int32_t i = 0; i < sz; i++) {
      SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
      mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, pSubKey, pVgEp->vgId,
            pConsumerEp->consumerId);
    }
  }

  // 9. clear
  taosHashCleanup(pHash);

  return 0;
}

S
Shengliang Guan 已提交
503
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
504
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
505 506 507
  if (pTrans == NULL) {
    return -1;
  }
508

509
  mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL);
510 511 512 513
  if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
    mndTransDrop(pTrans);
    return -1;
  }
514

515 516 517 518 519 520
  // make txn:
  // 1. redo action: action to all vg
  const SArray *rebVgs = pOutput->rebVgs;
  int32_t       vgNum = taosArrayGetSize(rebVgs);
  for (int32_t i = 0; i < vgNum; i++) {
    SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i);
L
Liu Jicong 已提交
521
    if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg) < 0) {
H
Haojun Liao 已提交
522 523
      mndTransDrop(pTrans);
      return -1;
524 525 526 527 528
    }
  }

  // 2. redo log: subscribe and vg assignment
  // subscribe
L
Liu Jicong 已提交
529
  if (mndSetSubCommitLogs(pMnode, pTrans, pOutput->pSub) != 0) {
H
Haojun Liao 已提交
530 531
    mndTransDrop(pTrans);
    return -1;
532 533 534 535
  }

  // 3. commit log: consumer to update status and epoch
  // 3.1 set touched consumer
536
  int32_t consumerNum = taosArrayGetSize(pOutput->modifyConsumers);
537
  for (int32_t i = 0; i < consumerNum; i++) {
538
    int64_t         consumerId = *(int64_t *)taosArrayGet(pOutput->modifyConsumers, i);
539 540 541 542 543
    SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
    SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup);
    pConsumerNew->updateType = CONSUMER_UPDATE__TOUCH;
    mndReleaseConsumer(pMnode, pConsumerOld);
    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
L
Liu Jicong 已提交
544 545
      tDeleteSMqConsumerObj(pConsumerNew);
      taosMemoryFree(pConsumerNew);
H
Haojun Liao 已提交
546 547 548

      mndTransDrop(pTrans);
      return -1;
549
    }
H
Haojun Liao 已提交
550

L
Liu Jicong 已提交
551 552
    tDeleteSMqConsumerObj(pConsumerNew);
    taosMemoryFree(pConsumerNew);
553
  }
H
Haojun Liao 已提交
554

555 556 557
  // 3.2 set new consumer
  consumerNum = taosArrayGetSize(pOutput->newConsumers);
  for (int32_t i = 0; i < consumerNum; i++) {
L
Liu Jicong 已提交
558
    int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->newConsumers, i);
559

560 561 562 563 564
    SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
    SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup);
    pConsumerNew->updateType = CONSUMER_UPDATE__ADD;
    char *topic = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    char  cgroup[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
565
    mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
566 567 568
    taosArrayPush(pConsumerNew->rebNewTopics, &topic);
    mndReleaseConsumer(pMnode, pConsumerOld);
    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
L
Liu Jicong 已提交
569 570
      tDeleteSMqConsumerObj(pConsumerNew);
      taosMemoryFree(pConsumerNew);
H
Haojun Liao 已提交
571 572 573

      mndTransDrop(pTrans);
      return -1;
574
    }
H
Haojun Liao 已提交
575

L
Liu Jicong 已提交
576 577
    tDeleteSMqConsumerObj(pConsumerNew);
    taosMemoryFree(pConsumerNew);
578 579 580 581 582 583
  }

  // 3.3 set removed consumer
  consumerNum = taosArrayGetSize(pOutput->removedConsumers);
  for (int32_t i = 0; i < consumerNum; i++) {
    int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->removedConsumers, i);
584

585 586 587 588 589
    SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
    SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup);
    pConsumerNew->updateType = CONSUMER_UPDATE__REMOVE;
    char *topic = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    char  cgroup[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
590
    mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
591 592 593
    taosArrayPush(pConsumerNew->rebRemovedTopics, &topic);
    mndReleaseConsumer(pMnode, pConsumerOld);
    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
L
Liu Jicong 已提交
594 595
      tDeleteSMqConsumerObj(pConsumerNew);
      taosMemoryFree(pConsumerNew);
H
Haojun Liao 已提交
596 597 598

      mndTransDrop(pTrans);
      return -1;
599
    }
H
Haojun Liao 已提交
600

L
Liu Jicong 已提交
601 602
    tDeleteSMqConsumerObj(pConsumerNew);
    taosMemoryFree(pConsumerNew);
603
  }
L
Liu Jicong 已提交
604

L
Liu Jicong 已提交
605 606
  // 4. TODO commit log: modification log

L
Liu Jicong 已提交
607
  // 5. set cb
608
  mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_MQ_REB, NULL, 0);
L
Liu Jicong 已提交
609 610

  // 6. execution
L
Liu Jicong 已提交
611
  if (mndTransPrepare(pMnode, pTrans) != 0) {
L
Liu Jicong 已提交
612
    mError("failed to prepare trans rebalance since %s", terrstr());
H
Haojun Liao 已提交
613 614
    mndTransDrop(pTrans);
    return -1;
L
Liu Jicong 已提交
615
  }
616 617 618 619 620

  mndTransDrop(pTrans);
  return 0;
}

S
Shengliang Guan 已提交
621 622 623
/*
 * Handle TDMT_MND_TMQ_DO_REBALANCE.
 *
 * pMsg->pCont is a SMqDoRebalanceMsg whose rebSubHash holds one SMqRebInfo per
 * subscription key. For every entry the working subscription is obtained (a clone
 * of the stored one, or a new one built from the topic when none exists yet), the
 * new assignment is computed by mndDoRebalance() and saved by mndPersistRebResult()
 * in its own transaction. A failure for one key is logged and does not stop the
 * remaining keys.
 *
 * rebSubHash is destroyed and mndRebEnd() is called on every return path.
 * Returns 0, or -1 (terrno = TSDB_CODE_OUT_OF_MEMORY) when the per-key result
 * arrays cannot be allocated.
 */
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
  SMnode            *pMnode = pMsg->info.node;
  SMqDoRebalanceMsg *pReq = pMsg->pCont;
  void              *pIter = NULL;

  mInfo("mq re-balance start, total required re-balanced trans:%d", taosHashGetSize(pReq->rebSubHash));

  while (1) {
    pIter = taosHashIterate(pReq->rebSubHash, pIter);
    if (pIter == NULL) {
      break;
    }

    SMqRebInputObj  rebInput = {0};
    SMqRebOutputObj rebOutput = {0};
    rebOutput.newConsumers = taosArrayInit(0, sizeof(int64_t));
    rebOutput.removedConsumers = taosArrayInit(0, sizeof(int64_t));
    rebOutput.modifyConsumers = taosArrayInit(0, sizeof(int64_t));
    rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));

    if (rebOutput.newConsumers == NULL || rebOutput.removedConsumers == NULL || rebOutput.modifyConsumers == NULL ||
        rebOutput.rebVgs == NULL) {
      taosArrayDestroy(rebOutput.newConsumers);
      taosArrayDestroy(rebOutput.removedConsumers);
      taosArrayDestroy(rebOutput.modifyConsumers);
      taosArrayDestroy(rebOutput.rebVgs);

      terrno = TSDB_CODE_OUT_OF_MEMORY;
      mInfo("mq re-balance failed, due to out of memory");
      taosHashCleanup(pReq->rebSubHash);
      mndRebEnd();
      return -1;
    }

    SMqRebInfo *pRebInfo = (SMqRebInfo *)pIter;
    rebInput.pRebInfo = pRebInfo;

    // Obtain the working subscription; rebOutput.pSub stays NULL when this key
    // has to be skipped, so the arrays above are still released below.
    SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebInfo->key);
    if (pSub == NULL) {
      // no stored subscription yet: split sub key, extract topic and build a new one
      char topic[TSDB_TOPIC_FNAME_LEN];
      char cgroup[TSDB_CGROUP_LEN];
      mndSplitSubscribeKey(pRebInfo->key, topic, cgroup, true);

      SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
      if (pTopic == NULL) {
        mError("mq re-balance %s ignored since topic %s doesn't exist", pRebInfo->key, topic);
      } else {
        taosRLockLatch(&pTopic->lock);
        rebOutput.pSub = mndCreateSubscription(pMnode, pTopic, pRebInfo->key);
        if (rebOutput.pSub == NULL) {
          mError("mq rebalance %s failed create sub since %s, ignore", pRebInfo->key, terrstr());
        } else {
          memcpy(rebOutput.pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN);
        }
        taosRUnLockLatch(&pTopic->lock);
        mndReleaseTopic(pMnode, pTopic);

        if (rebOutput.pSub != NULL) {
          rebInput.oldConsumerNum = 0;
          mInfo("sub topic:%s has no consumers sub yet", pRebInfo->key);
        }
      }
    } else {
      taosRLockLatch(&pSub->lock);
      rebInput.oldConsumerNum = taosHashGetSize(pSub->consumerHash);
      rebOutput.pSub = tCloneSubscribeObj(pSub);
      taosRUnLockLatch(&pSub->lock);

      if (rebOutput.pSub == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        mError("mq rebalance %s failed to clone sub since %s, ignore", pRebInfo->key, terrstr());
      } else {
        mInfo("sub topic:%s has %d consumers sub till now", pRebInfo->key, rebInput.oldConsumerNum);
      }
      mndReleaseSubscribe(pMnode, pSub);
    }

    if (rebOutput.pSub != NULL) {
      if (mndDoRebalance(pMnode, &rebInput, &rebOutput) < 0) {
        // nothing valid to persist when the calculation itself failed
        mError("mq re-balance internal error");
      } else if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) {
        // if add more consumer to balanced subscribe, possibly no vg is changed
        // when each topic is re-balanced, issue an trans to save the results in sdb.
        mError("mq re-balance persist output error, possibly vnode splitted or dropped,msg:%s", terrstr());
      }

      tDeleteSubscribeObj(rebOutput.pSub);
      taosMemoryFree(rebOutput.pSub);
    }

    // released on every path, including the skipped keys
    taosArrayDestroy(rebOutput.newConsumers);
    taosArrayDestroy(rebOutput.modifyConsumers);
    taosArrayDestroy(rebOutput.removedConsumers);
    taosArrayDestroy(rebOutput.rebVgs);
  }

  // reset flag
  mInfo("mq re-balance completed successfully");
  taosHashCleanup(pReq->rebSubHash);
  mndRebEnd();

  return 0;
}
L
Liu Jicong 已提交
727

728 729
// Handles a "drop consumer group" request for one (cgroup, topic) subscription.
//
// The request is deserialized from pMsg->pCont. A missing subscription is treated as
// success only when dropReq.igNotExists is set. A subscription whose consumerHash is not
// empty is rejected with TSDB_CODE_MND_CGROUP_USED. Otherwise a "drop-cgroup" transaction
// carrying the subscription's drop commit log is created and prepared.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been prepared, 0 when a
// missing subscription is ignored, and -1 on failure (terrno is set here or by the
// failing callee).
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
  SMnode          *pMnode = pMsg->info.node;
  SMDropCgroupReq  dropReq = {0};
  SMqSubscribeObj *pSub = NULL;
  STrans          *pTrans = NULL;
  int32_t          code = -1;

  if (tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  pSub = mndAcquireSubscribe(pMnode, dropReq.cgroup, dropReq.topic);
  if (pSub == NULL) {
    if (dropReq.igNotExists) {
      mInfo("cgroup:%s on topic:%s, not exist, ignore not exist is set", dropReq.cgroup, dropReq.topic);
      return 0;
    } else {
      terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
      mError("topic:%s, cgroup:%s, failed to drop since %s", dropReq.topic, dropReq.cgroup, terrstr());
      return -1;
    }
  }

  // a consumer group that still has consumers attached must not be dropped
  if (taosHashGetSize(pSub->consumerHash) != 0) {
    terrno = TSDB_CODE_MND_CGROUP_USED;
    mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
    goto DROP_CGROUP_OVER;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "drop-cgroup");
  if (pTrans == NULL) {
    mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
    goto DROP_CGROUP_OVER;
  }

  mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);

  if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
    mError("cgroup %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
    goto DROP_CGROUP_OVER;
  }

  if (mndTransPrepare(pMnode, pTrans) < 0) {
    mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
    goto DROP_CGROUP_OVER;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

DROP_CGROUP_OVER:
  // single exit: the subscription reference is always handed back
  mndReleaseSubscribe(pMnode, pSub);
  // NOTE(review): assumes mndTransPrepare does not take ownership of pTrans, so the
  // object is dropped after a successful prepare as well - confirm against mndTrans.
  if (pTrans != NULL) {
    mndTransDrop(pTrans);
  }
  return code;
}

L
Liu Jicong 已提交
783 784 785
// Cleanup entry point of the subscribe module; it currently has nothing to release.
void mndCleanupSubscribe(SMnode *pMnode) {
  (void)pMnode;
}

// Serializes a subscribe object into a newly allocated sdb raw record.
//
// Record layout, in write order: int32 payload length, the payload produced by
// tEncodeSubscribeObj, then MND_SUBSCRIBE_RESERVE_SIZE reserved bytes.
//
// Returns the raw record on success, or NULL on failure with terrno left non-zero
// (TSDB_CODE_OUT_OF_MEMORY unless one of the SDB_SET_* steps replaced it).
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  // Everything the SUB_ENCODE_OVER label reads is initialized before the first goto;
  // previously pRaw was declared after it and reached the label with an indeterminate value.
  void    *buf = NULL;
  SSdbRaw *pRaw = NULL;
  int32_t  dataPos = 0;

  // first pass with a NULL buffer only measures the encoded length
  int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
  if (tlen <= 0) goto SUB_ENCODE_OVER;

  pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE);
  if (pRaw == NULL) goto SUB_ENCODE_OVER;

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto SUB_ENCODE_OVER;

  // second pass writes the payload; abuf is a scratch cursor so buf keeps pointing at the start
  void *abuf = buf;
  tEncodeSubscribeObj(&abuf, pSub);

  SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, SUB_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

SUB_ENCODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRaw;
}

// Rebuilds a subscribe row from an sdb raw record.
//
// The soft version must lie in [1, MND_SUBSCRIBE_VER_NUMBER]. The record is read as an
// int32 payload length, the payload itself and the reserved area; the payload is then
// decoded into the row object. After decoding, every vgroup endpoint set held by the
// object (unassigned ones and those assigned to consumers) is passed to
// tmsgUpdateDnodeEpSet.
//
// Returns the row on success, or NULL with terrno left non-zero on failure.
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
  SSdbRow         *pRow = NULL;
  SMqSubscribeObj *pSub = NULL;
  void            *buf = NULL;
  int8_t           softVer = 0;
  int32_t          dataPos = 0;
  int32_t          payloadLen = 0;

  // pessimistic default; only cleared once every step below has succeeded
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &softVer) != 0) goto SUB_DECODE_OVER;

  if (softVer < 1 || softVer > MND_SUBSCRIBE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto SUB_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqSubscribeObj));
  if (pRow == NULL) goto SUB_DECODE_OVER;

  pSub = sdbGetRowObj(pRow);
  if (pSub == NULL) goto SUB_DECODE_OVER;

  // the SDB_GET_* macros receive SUB_DECODE_OVER as their bail-out label
  SDB_GET_INT32(pRaw, dataPos, &payloadLen, SUB_DECODE_OVER);
  buf = taosMemoryMalloc(payloadLen);
  if (buf == NULL) goto SUB_DECODE_OVER;
  SDB_GET_BINARY(pRaw, dataPos, buf, payloadLen, SUB_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);

  if (tDecodeSubscribeObj(buf, pSub, softVer) == NULL) {
    goto SUB_DECODE_OVER;
  }

  // update epset saved in mnode: unassigned vgroups first ...
  if (pSub->unassignedVgs != NULL) {
    int32_t numUnassigned = (int32_t)taosArrayGetSize(pSub->unassignedVgs);
    for (int32_t idx = 0; idx < numUnassigned; ++idx) {
      SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGetP(pSub->unassignedVgs, idx);
      tmsgUpdateDnodeEpSet(&pVgEp->epSet);
    }
  }

  // ... then the vgroups held by each consumer
  if (pSub->consumerHash != NULL) {
    for (void *pIter = taosHashIterate(pSub->consumerHash, NULL); pIter != NULL;
         pIter = taosHashIterate(pSub->consumerHash, pIter)) {
      SMqConsumerEp *pConsumerEp = pIter;
      int32_t        numAssigned = (int32_t)taosArrayGetSize(pConsumerEp->vgs);
      for (int32_t idx = 0; idx < numAssigned; ++idx) {
        SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, idx);
        tmsgUpdateDnodeEpSet(&pVgEp->epSet);
      }
    }
  }

  terrno = TSDB_CODE_SUCCESS;

SUB_DECODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to decode from raw:%p since %s", pSub == NULL ? "null" : pSub->key, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("subscribe:%s, decode from raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRow;
}

// Insert action for subscribe objects: traces the key and reports success.
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
  (void)pSdb;
  mTrace("subscribe:%s, perform insert action", pSub->key);
  return 0;
}

// Delete action for subscribe objects: traces the key, then hands the object to
// tDeleteSubscribeObj. Always returns 0.
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
  (void)pSdb;
  mTrace("subscribe:%s, perform delete action", pSub->key);
  tDeleteSubscribeObj(pSub);
  return 0;
}

// Update action for subscribe objects.
//
// While holding the old object's write latch, the consumer hash, the unassigned-vgroup
// array and the offset-row array are exchanged between the old and the new object: the
// old object ends up with the new containers and the new object with the old ones.
// Always returns 0.
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
  (void)pSdb;
  mTrace("subscribe:%s, perform update action", pOldSub->key);

  taosWLockLatch(&pOldSub->lock);

  // remember what the old object currently holds
  SHashObj *prevConsumers = pOldSub->consumerHash;
  SArray   *prevUnassigned = pOldSub->unassignedVgs;
  SArray   *prevOffsets = pOldSub->offsetRows;

  // the old object adopts the new containers
  pOldSub->consumerHash = pNewSub->consumerHash;
  pOldSub->unassignedVgs = pNewSub->unassignedVgs;
  pOldSub->offsetRows = pNewSub->offsetRows;

  // the new object receives the previous containers
  pNewSub->consumerHash = prevConsumers;
  pNewSub->unassignedVgs = prevUnassigned;
  pNewSub->offsetRows = prevOffsets;

  taosWUnLockLatch(&pOldSub->lock);
  return 0;
}

919
int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName) {
S
Shengliang Guan 已提交
920
  int32_t tlen = strlen(cgroup);
L
Liu Jicong 已提交
921
  memcpy(key, cgroup, tlen);
L
Liu Jicong 已提交
922
  key[tlen] = TMQ_SEPARATOR;
L
Liu Jicong 已提交
923
  strcpy(key + tlen + 1, topicName);
L
Liu Jicong 已提交
924
  return 0;
L
Liu Jicong 已提交
925 926
}

L
Liu Jicong 已提交
927
// Looks up the subscribe object for (cgroup, topicName) in sdb.
//
// The lookup key is built with mndMakeSubscribeKey. Returns the acquired object, or NULL
// with terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST when sdbAcquire finds nothing.
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *cgroup, const char *topicName) {
  char subKey[TSDB_SUBSCRIBE_KEY_LEN];
  mndMakeSubscribeKey(subKey, cgroup, topicName);

  SMqSubscribeObj *pSub = sdbAcquire(pMnode->pSdb, SDB_SUBSCRIBE, subKey);
  if (pSub != NULL) {
    return pSub;
  }

  terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
  return NULL;
}

L
Liu Jicong 已提交
938 939 940 941
// Looks up a subscribe object in sdb by its already-built key.
//
// Returns the acquired object, or NULL with terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST
// when sdbAcquire finds nothing.
SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key) {
  SMqSubscribeObj *pSub = sdbAcquire(pMnode->pSdb, SDB_SUBSCRIBE, key);
  if (pSub != NULL) {
    return pSub;
  }

  terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
  return NULL;
}

L
Liu Jicong 已提交
947 948 949 950 951
// Hands a subscribe object back to the mnode's sdb via sdbRelease.
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
  sdbRelease(pMnode->pSdb, pSub);
}

L
Liu Jicong 已提交
952 953 954 955 956 957 958 959
// Encodes pSub, appends the raw record to the transaction's redo log and marks the
// record as SDB_STATUS_DROPPED.
//
// Returns 0 on success and -1 if encoding, appending or the status change fails.
static int32_t mndSetDropSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  SSdbRaw *pRedoRaw = mndSubActionEncode(pSub);
  if (pRedoRaw == NULL) return -1;

  if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
    // NOTE(review): assumes a failed append leaves the raw record owned by the caller;
    // freeing it here avoids leaking the encoded record - confirm against mndTrans.
    sdbFreeRaw(pRedoRaw);
    return -1;
  }

  // from here on the record is referenced by the transaction, so it is not freed on error
  if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED) != 0) return -1;
  return 0;
}

L
Liu Jicong 已提交
960
// Encodes pSub, appends the raw record to the transaction's commit log and marks the
// record as SDB_STATUS_DROPPED.
//
// Returns 0 on success and -1 if encoding, appending or the status change fails.
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
  if (pCommitRaw == NULL) return -1;

  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
    // NOTE(review): assumes a failed append leaves the raw record owned by the caller;
    // freeing it here avoids leaking the encoded record - confirm against mndTrans.
    sdbFreeRaw(pCommitRaw);
    return -1;
  }

  // from here on the record is referenced by the transaction, so it is not freed on error
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
  return 0;
}

// Adds a drop commit log to pTrans for every subscribe object whose dbUid equals
// pDb->uid.
//
// Returns 0 after the whole table has been scanned, or -1 as soon as one commit log
// cannot be added (the sdb iteration is cancelled first).
int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  SSdb            *pSdb = pMnode->pSdb;
  SMqSubscribeObj *pSub = NULL;

  void *pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, NULL, (void **)&pSub);
  while (pIter != NULL) {
    // subscriptions of other databases are only released, never touched
    if (pSub->dbUid == pDb->uid && mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
      sdbRelease(pSdb, pSub);
      sdbCancelFetch(pSdb, pIter);
      return -1;
    }

    sdbRelease(pSdb, pSub);
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
  }

  return 0;
}

// Schedules the removal of every subscription on topicName inside pTrans.
//
// For each subscribe object whose key splits to the given topic:
//   - if it still has consumers, the call fails with TSDB_CODE_MND_IN_REBALANCE;
//   - one TDMT_VND_TMQ_DELETE_SUB redo action is appended per unassigned vgroup;
//   - a drop redo log for the subscribe object is appended.
//
// Returns 0 when all matching subscriptions were processed, -1 on the first failure.
// On failure the current object is released and the sdb iteration is cancelled, the
// same way mndDropSubByDB abandons its scan.
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) {
  SSdb            *pSdb = pMnode->pSdb;
  void            *pIter = NULL;
  SMqSubscribeObj *pSub = NULL;
  int32_t          failCode = 0;  // terrno to publish after cleanup; 0 keeps the callee's terrno

  while (1) {
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
    if (pIter == NULL) break;

    char topic[TSDB_TOPIC_FNAME_LEN];
    char cgroup[TSDB_CGROUP_LEN];
    mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
    if (strcmp(topic, topicName) != 0) {
      sdbRelease(pSdb, pSub);
      continue;
    }

    // a subscription that still has consumers cannot be dropped
    if (taosHashGetSize(pSub->consumerHash) != 0) {
      failCode = TSDB_CODE_MND_IN_REBALANCE;
      goto DROP_BY_TOPIC_FAILED;
    }

    // iter all vnode to delete handle
    int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
    for (int32_t i = 0; i < sz; i++) {
      SMqVgEp       *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
      SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
      if (pReq == NULL) {
        failCode = TSDB_CODE_OUT_OF_MEMORY;
        goto DROP_BY_TOPIC_FAILED;
      }
      pReq->head.vgId = htonl(pVgEp->vgId);
      pReq->vgId = pVgEp->vgId;
      pReq->consumerId = -1;
      memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);

      STransAction action = {0};
      action.epSet = pVgEp->epSet;
      action.pCont = pReq;
      action.contLen = sizeof(SMqVDeleteReq);
      action.msgType = TDMT_VND_TMQ_DELETE_SUB;
      if (mndTransAppendRedoAction(pTrans, &action) != 0) {
        // the request was not taken over by the transaction, so it is freed here
        taosMemoryFree(pReq);
        goto DROP_BY_TOPIC_FAILED;
      }
    }

    if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) {
      goto DROP_BY_TOPIC_FAILED;
    }

    sdbRelease(pSdb, pSub);
  }

  return 0;

DROP_BY_TOPIC_FAILED:
  sdbRelease(pSdb, pSub);
  // the scan is abandoned mid-way, so the iterator has to be cancelled explicitly
  sdbCancelFetch(pSdb, pIter);
  if (failCode != 0) {
    terrno = failCode;
  }
  return -1;
}
L
Liu Jicong 已提交
1052

1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107
// Appends one result row per vgroup in vgs to pBlock, starting at row *numOfRows.
//
// Columns are written in this order: topic, consumer group, vgroup id, consumer id
// (NULL when consumerId == -1), offset string, row count. The last two come from the
// entry in offsetRows whose vgId matches the vgroup (the last match wins); when no
// entry matches both are set to NULL and an error is logged.
//
// topic and cgroup are passed to colDataSetVal as-is and read through varDataVal for
// logging. *numOfRows is advanced by one per vgroup. Always returns 0.
static int32_t buildResult(SSDataBlock *pBlock, int32_t* numOfRows, int64_t consumerId, const char* topic, const char* cgroup, SArray* vgs, SArray *offsetRows){
  int32_t numVgs = taosArrayGetSize(vgs);

  for (int32_t vgIdx = 0; vgIdx < numVgs; vgIdx++) {
    SMqVgEp         *pVgEp = taosArrayGetP(vgs, vgIdx);
    SColumnInfoData *pCol = NULL;
    int32_t          colIdx = 0;

    // topic
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    colDataSetVal(pCol, *numOfRows, (const char *)topic, false);

    // consumer group
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    colDataSetVal(pCol, *numOfRows, (const char *)cgroup, false);

    // vg id
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    colDataSetVal(pCol, *numOfRows, (const char *)&pVgEp->vgId, false);

    // consumer id
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    colDataSetVal(pCol, *numOfRows, (const char *)&consumerId, consumerId == -1);

    mDebug("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic),
           consumerId, varDataVal(cgroup), pVgEp->vgId);

    // find the offset entry recorded for this vgroup; a later match replaces an earlier one
    OffsetRows *pMatch = NULL;
    for (int k = 0; k < taosArrayGetSize(offsetRows); k++) {
      OffsetRows *pCandidate = taosArrayGet(offsetRows, k);
      if (pCandidate->vgId == pVgEp->vgId) {
        pMatch = pCandidate;
      }
    }

    if (pMatch == NULL) {
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      colDataSetNULL(pCol, *numOfRows);
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      colDataSetNULL(pCol, *numOfRows);
      mError("mnd show subscriptions: do not find vgId:%d in offsetRows", pVgEp->vgId);
    } else {
      // offset, formatted into a var-string buffer
      char offsetBuf[TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0};
      tFormatOffset(varDataVal(offsetBuf), TSDB_OFFSET_LEN, &pMatch->offset);
      varDataSetLen(offsetBuf, strlen(varDataVal(offsetBuf)));
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      colDataSetVal(pCol, *numOfRows, (const char *)offsetBuf, false);

      // rows
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      colDataSetVal(pCol, *numOfRows, (const char *)&pMatch->rows, false);
    }

    (*numOfRows)++;
  }

  return 0;
}

1108
// Fills pBlock with rows describing subscriptions.
//
// Subscribe objects are fetched from sdb through pShow->pIter until at least
// rowsCapacity rows have been produced or the iteration ends. Each object is read under
// its read latch and contributes one row per vgroup: first the vgroups assigned to each
// consumer, then the unassigned vgroups (reported with consumer id -1). When an object
// would push the row count past rowsCapacity, the block capacity is grown first.
//
// Returns the number of rows written by this call; pShow->numOfRows is increased by the
// same amount.
int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode          *pMnode = pReq->info.node;
  SSdb            *pSdb = pMnode->pSdb;
  SMqSubscribeObj *pSub = NULL;
  int32_t          rowsFilled = 0;

  mDebug("mnd show subscriptions begin");

  while (rowsFilled < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub);
    if (pShow->pIter == NULL) {
      break;
    }

    taosRLockLatch(&pSub->lock);

    if (rowsFilled + pSub->vgNum > rowsCapacity) {
      blockDataEnsureCapacity(pBlock, rowsFilled + pSub->vgNum);
    }

    // topic and cgroup, split out of the subscribe key into var-string buffers
    char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
    mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
    varDataSetLen(topic, strlen(varDataVal(topic)));
    varDataSetLen(cgroup, strlen(varDataVal(cgroup)));

    // rows for the vgroups currently assigned to a consumer
    for (void *pIter = taosHashIterate(pSub->consumerHash, NULL); pIter != NULL;
         pIter = taosHashIterate(pSub->consumerHash, pIter)) {
      SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
      buildResult(pBlock, &rowsFilled, pConsumerEp->consumerId, topic, cgroup, pConsumerEp->vgs, pConsumerEp->offsetRows);
    }

    // do not show for cleared subscription
    buildResult(pBlock, &rowsFilled, -1, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows);

    pBlock->info.rows = rowsFilled;

    taosRUnLockLatch(&pSub->lock);
    sdbRelease(pSdb, pSub);
  }

  mDebug("mnd end show subscriptions");

  pShow->numOfRows += rowsFilled;
  return rowsFilled;
}

1160
// Cancels an in-progress sdb iteration by passing pIter to sdbCancelFetch.
void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}