mndSubscribe.c 40.2 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
L
Liu Jicong 已提交
17
#include "mndSubscribe.h"
L
Liu Jicong 已提交
18
#include "mndConsumer.h"
L
Liu Jicong 已提交
19
#include "mndScheduler.h"
L
Liu Jicong 已提交
20 21 22 23 24 25 26
#include "mndShow.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndVgroup.h"
#include "tcompare.h"
#include "tname.h"

27
#define MND_SUBSCRIBE_VER_NUMBER   2
L
Liu Jicong 已提交
28 29
#define MND_SUBSCRIBE_RESERVE_SIZE 64

L
Liu Jicong 已提交
30
#define MND_SUBSCRIBE_REBALANCE_CNT 3
L
Liu Jicong 已提交
31

L
Liu Jicong 已提交
32 33 34 35 36
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
static int32_t  mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
static int32_t  mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
static int32_t  mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);
37 38 39 40
static int32_t  mndProcessRebalanceReq(SRpcMsg *pMsg);
static int32_t  mndProcessDropCgroupReq(SRpcMsg *pMsg);
static int32_t  mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void     mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
41

42 43 44 45 46 47 48
// Encode pSub into an sdb raw record and append it to pTrans as a redo log.
//
// pMnode is not used by this function; the parameter is kept so callers stay unchanged.
// Returns 0 on success and -1 on failure.
static int32_t mndSetSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  SSdbRaw *pRedoRaw = mndSubActionEncode(pSub);
  if (pRedoRaw == NULL) return -1;

  if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
    // The record was not attached to the transaction, so it has to be released here.
    // NOTE(review): assumes mndTransAppendRedolog does not take ownership on failure - confirm.
    sdbFreeRaw(pRedoRaw);
    return -1;
  }

  // From here on the record is reachable through pTrans and must not be freed locally.
  if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY) != 0) return -1;
  return 0;
}
L
Liu Jicong 已提交
49

50 51 52 53 54 55 56
// Encode pSub into an sdb raw record and append it to pTrans as a commit log.
//
// pMnode is not used by this function; the parameter is kept so callers stay unchanged.
// Returns 0 on success and -1 on failure.
static int32_t mndSetSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
  if (pCommitRaw == NULL) return -1;

  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
    // The record was not attached to the transaction, so it has to be released here.
    // NOTE(review): assumes mndTransAppendCommitlog does not take ownership on failure - confirm.
    sdbFreeRaw(pCommitRaw);
    return -1;
  }

  // From here on the record is reachable through pTrans and must not be freed locally.
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
  return 0;
}
L
Liu Jicong 已提交
57

L
Liu Jicong 已提交
58
// Register everything the subscribe module needs on pMnode:
//   - the SDB_SUBSCRIBE table with its encode/decode/insert/update/delete callbacks,
//   - the tmq message handlers,
//   - the show-retrieve and show-free-iterator handlers for the subscriptions table.
// Returns the result of sdbSetTable().
int32_t mndInitSubscribe(SMnode *pMnode) {
  SSdbTable table = {
      .sdbType = SDB_SUBSCRIBE,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndSubActionEncode,
      .decodeFp = (SdbDecodeFp)mndSubActionDecode,
      .insertFp = (SdbInsertFp)mndSubActionInsert,
      .updateFp = (SdbUpdateFp)mndSubActionUpdate,
      .deleteFp = (SdbDeleteFp)mndSubActionDelete,
  };

  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_SUBSCRIBE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_DELETE_SUB_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DO_REBALANCE, mndProcessRebalanceReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP, mndProcessDropCgroupReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP_RSP, mndTransProcessRsp);

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
  // The iterator-release hook belongs to the same show table as the retrieve handler above;
  // it was previously registered under TSDB_MGMT_TABLE_TOPICS.
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndCancelGetNextSubscribe);

  return sdbSetTable(pMnode->pSdb, table);
}

81
// Allocate a subscribe object for subKey, copy the topic attributes into it and
// let the scheduler initialise its endpoints.
//
// Returns the new object, or NULL on failure. terrno is set to
// TSDB_CODE_OUT_OF_MEMORY when the allocation itself fails.
static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey) {
  SMqSubscribeObj *pNewSub = tNewSubscribeObj(subKey);
  if (pNewSub == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // Attributes inherited from the topic.
  pNewSub->withMeta = pTopic->withMeta;
  pNewSub->subType = pTopic->subType;
  pNewSub->stbUid = pTopic->stbUid;
  pNewSub->dbUid = pTopic->dbUid;

  if (mndSchedInitSubEp(pMnode, pTopic, pNewSub) >= 0) {
    return pNewSub;
  }

  // Endpoint initialisation failed: release the contents first, then the object itself.
  tDeleteSubscribeObj(pNewSub);
  taosMemoryFree(pNewSub);
  return NULL;
}

L
Liu Jicong 已提交
102 103
static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscribeObj *pSub,
                                    const SMqRebOutputVg *pRebVg) {
104 105 106 107
  SMqRebVgReq req = {0};
  req.oldConsumerId = pRebVg->oldConsumerId;
  req.newConsumerId = pRebVg->newConsumerId;
  req.vgId = pRebVg->pVgEp->vgId;
L
Liu Jicong 已提交
108
  req.qmsg = pRebVg->pVgEp->qmsg;
L
Liu Jicong 已提交
109
  req.subType = pSub->subType;
L
Liu Jicong 已提交
110
  req.withMeta = pSub->withMeta;
L
Liu Jicong 已提交
111
  req.suid = pSub->stbUid;
S
Shengliang Guan 已提交
112
  tstrncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
113

114 115 116 117 118 119 120 121
  int32_t tlen = 0;
  int32_t ret = 0;
  tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, ret);
  if (ret < 0) {
    return -1;
  }

  tlen += sizeof(SMsgHead);
122 123 124 125 126
  void   *buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
127

128 129 130 131 132
  SMsgHead *pMsgHead = (SMsgHead *)buf;

  pMsgHead->contLen = htonl(tlen);
  pMsgHead->vgId = htonl(pRebVg->pVgEp->vgId);

133 134 135 136 137 138 139 140
  SEncoder encoder = {0};
  tEncoderInit(&encoder, POINTER_SHIFT(buf, sizeof(SMsgHead)), tlen);
  if (tEncodeSMqRebVgReq(&encoder, &req) < 0) {
    taosMemoryFreeClear(buf);
    tEncoderClear(&encoder);
    return -1;
  }
  tEncoderClear(&encoder);
141 142 143 144 145 146
  *pBuf = buf;
  *pLen = tlen;

  return 0;
}

L
Liu Jicong 已提交
147
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub,
148
                                        const SMqRebOutputVg *pRebVg) {
149 150 151 152
//  if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
//    terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
//    return -1;
//  }
153 154 155

  void   *buf;
  int32_t tlen;
L
Liu Jicong 已提交
156
  if (mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg) < 0) {
157 158 159 160 161
    return -1;
  }

  int32_t vgId = pRebVg->pVgEp->vgId;
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
L
Liu Jicong 已提交
162 163
  if (pVgObj == NULL) {
    taosMemoryFree(buf);
164
    terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
165 166
    return -1;
  }
167 168 169 170 171

  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
  action.pCont = buf;
  action.contLen = tlen;
L
Liu Jicong 已提交
172
  action.msgType = TDMT_VND_TMQ_SUBSCRIBE;
173 174 175 176 177 178 179 180

  mndReleaseVgroup(pMnode, pVgObj);
  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    taosMemoryFree(buf);
    return -1;
  }
  return 0;
}
L
Liu Jicong 已提交
181

L
Liu Jicong 已提交
182
static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, bool fullName) {
S
Shengliang Guan 已提交
183
  int32_t i = 0;
L
Liu Jicong 已提交
184
  while (key[i] != TMQ_SEPARATOR) {
L
Liu Jicong 已提交
185 186
    i++;
  }
L
Liu Jicong 已提交
187 188
  memcpy(cgroup, key, i);
  cgroup[i] = 0;
L
Liu Jicong 已提交
189 190 191 192 193 194 195 196
  if (fullName) {
    strcpy(topic, &key[i + 1]);
  } else {
    while (key[i] != '.') {
      i++;
    }
    strcpy(topic, &key[i + 1]);
  }
L
Liu Jicong 已提交
197 198 199
  return 0;
}

L
Liu Jicong 已提交
200 201
// Look up the re-balance info stored under key in pHash, creating and inserting a new
// entry when none exists yet.
//
// The hash key includes the terminating NUL of key.
// Returns a pointer to the entry stored inside pHash, or NULL on failure with terrno set
// to TSDB_CODE_OUT_OF_MEMORY.
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
  size_t      keyLen = strlen(key) + 1;
  SMqRebInfo *pRebSub = taosHashGet(pHash, key, keyLen);
  if (pRebSub != NULL) {
    return pRebSub;
  }

  SMqRebInfo *pNew = tNewSMqRebSubscribe(key);
  if (pNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // taosHashPut stores its own copy of *pNew. The temporary shell is therefore released
  // right away and the caller is handed the copy that lives in the hash, so later updates
  // through the returned pointer are seen by everyone reading pHash.
  // NOTE(review): if the insert fails, members allocated inside *pNew are not released
  // here because their destructor is not visible from this function.
  taosHashPut(pHash, key, keyLen, pNew, sizeof(SMqRebInfo));
  taosMemoryFree(pNew);

  pRebSub = taosHashGet(pHash, key, keyLen);
  if (pRebSub == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
  }
  return pRebSub;
}

213
// Remove every consumer listed in pInput->pRebInfo->removedConsumers from the subscription.
//
// For each listed consumer that is still present in the consumer hash, all of its vgroups
// are put into pHash (keyed by vgId, with newConsumerId = -1), the consumer is dropped from
// the consumer hash and its id is appended to pOutput->removedConsumers.
// Listed consumers that are no longer in the hash are skipped; a mismatch between the
// requested and the actually removed count is logged as an error.
static void doRemoveLostConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
  const char *pSubKey = pOutput->pSub->key;
  int32_t     numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
  int32_t     actualRemoved = 0;

  for (int32_t idx = 0; idx < numOfRemoved; idx++) {
    uint64_t       consumerId = *(uint64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, idx);
    SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));

    // the consumer is already gone, nothing to release
    if (pConsumerEp == NULL) {
      continue;
    }
    actualRemoved++;

    // hand every vgroup of this consumer over to the pending hash
    int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
    int32_t vgIdx = 0;
    while (vgIdx < consumerVgNum) {
      SMqVgEp       *pVgEp = taosArrayGetP(pConsumerEp->vgs, vgIdx);
      SMqRebOutputVg outputVg = {.oldConsumerId = consumerId, .newConsumerId = -1, .pVgEp = pVgEp};

      taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
      mInfo("sub:%s mq re-balance remove vgId:%d from consumer:%" PRIx64, pSubKey, pVgEp->vgId, consumerId);
      vgIdx++;
    }

    // the vg array is destroyed before the entry that owns it leaves the hash
    taosArrayDestroy(pConsumerEp->vgs);
    taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));

    // remember the consumer as removed
    taosArrayPush(pOutput->removedConsumers, &consumerId);
  }

  if (numOfRemoved == actualRemoved) {
    mInfo("sub:%s removed %d consumers", pSubKey, numOfRemoved);
  } else {
    mError("sub:%s mq re-balance removedNum:%d not matched with actual:%d", pSubKey, numOfRemoved, actualRemoved);
  }
}
250

251
// Add every consumer listed in pInput->pRebInfo->newConsumers to the subscription.
//
// Each new consumer gets an entry with an empty vgroup array in the consumer hash and its
// id is appended to pOutput->newConsumers.
static void doAddNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pInput) {
  const char *pSubKey = pOutput->pSub->key;
  int32_t     numOfNewConsumers = taosArrayGetSize(pInput->pRebInfo->newConsumers);

  int32_t idx = 0;
  while (idx < numOfNewConsumers) {
    int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, idx);

    // every field that is not named here starts out zeroed
    SMqConsumerEp newConsumerEp = {
        .consumerId = consumerId,
        .vgs = taosArrayInit(0, sizeof(void *)),
    };

    taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
    taosArrayPush(pOutput->newConsumers, &consumerId);
    mInfo("sub:%s mq rebalance add new consumer:%" PRIx64, pSubKey, consumerId);

    idx++;
  }
}
267

X
Xiaoyu Wang 已提交
268
// Move every vgroup from pOutput->pSub->unassignedVgs into pHash (keyed by vgId).
//
// The vgroups are popped from the unassigned array one by one, so the array is empty
// afterwards. Both consumer ids of the stored entry are -1.
static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
  int32_t     numOfVgroups = taosArrayGetSize(pOutput->pSub->unassignedVgs);
  const char *pSubKey = pOutput->pSub->key;

  for (int32_t remaining = numOfVgroups; remaining > 0; remaining--) {
    SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pOutput->pSub->unassignedVgs);

    SMqRebOutputVg rebOutput = {.oldConsumerId = -1, .newConsumerId = -1, .pVgEp = pVgEp};
    taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg));

    mInfo("sub:%s mq re-balance addUnassignedVgroups vgId:%d from unassigned", pSubKey, pVgEp->vgId);
  }
}
284

285 286 287 288 289 290 291 292 293 294 295 296
// Append one entry per vgroup currently held by pConsumerEp to pOutput->rebVgs.
// Old and new consumer id of each entry are both the consumer itself, i.e. the vgroup
// stays where it is.
static void putNoTransferToOutput(SMqRebOutputObj *pOutput, SMqConsumerEp *pConsumerEp) {
  int vgIdx = 0;
  while (vgIdx < taosArrayGetSize(pConsumerEp->vgs)) {
    SMqRebOutputVg outputVg = {0};
    outputVg.oldConsumerId = pConsumerEp->consumerId;
    outputVg.newConsumerId = pConsumerEp->consumerId;
    outputVg.pVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, vgIdx);

    taosArrayPush(pOutput->rebVgs, &outputVg);
    vgIdx++;
  }
}

X
Xiaoyu Wang 已提交
297 298
// Take surplus vgroups away from the consumers that currently exist in the subscription.
//
//   minVgCnt       - number of vgroups every consumer keeps at least
//   imbConsumerNum - number of consumers that may keep minVgCnt + 1 vgroups
//
// Every visited consumer is appended to pOutput->modifyConsumers. The first imbConsumerNum
// consumers holding more than minVgCnt vgroups are trimmed to minVgCnt + 1, all later ones
// to minVgCnt. Removed vgroups are put into pHash with newConsumerId = -1; the vgroups a
// consumer keeps are reported through putNoTransferToOutput().
static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt,
                                        int32_t imbConsumerNum) {
  const char *pSubKey = pOutput->pSub->key;
  int32_t     imbCnt = 0;

  for (void *pIter = taosHashIterate(pOutput->pSub->consumerHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter)) {
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    int32_t        consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);

    // all old consumers still existing need to be modified
    // TODO optimize: modify only consumer whose vgs changed
    taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId);

    if (consumerVgNum > minVgCnt) {
      // while the quota of "one more" consumers is not used up this consumer keeps
      // minVgCnt + 1 vgroups, otherwise exactly minVgCnt
      int32_t keepCnt = (imbCnt < imbConsumerNum) ? minVgCnt + 1 : minVgCnt;

      while (taosArrayGetSize(pConsumerEp->vgs) > keepCnt) {
        SMqVgEp       *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs);
        SMqRebOutputVg outputVg = {0};
        outputVg.oldConsumerId = pConsumerEp->consumerId;
        outputVg.newConsumerId = -1;
        outputVg.pVgEp = pVgEp;

        taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
        mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", pSubKey, pVgEp->vgId,
              pConsumerEp->consumerId);
      }

      // this consumer used up one slot of the "one more" quota
      if (keepCnt != minVgCnt) {
        imbCnt++;
      }
    }

    putNoTransferToOutput(pOutput, pConsumerEp);
  }
}
349

350 351 352 353 354 355 356 357 358
// Compute the new vgroup-to-consumer assignment for one subscription.
//
//   pInput  - re-balance request: removed/new consumers and the previous consumer count
//   pOutput - in: pOutput->pSub is a private copy of the subscription that is modified in place
//             out: rebVgs, newConsumers, removedConsumers and modifyConsumers are filled
//
// pMnode is not used by this function.
// Returns 0 on success, -1 (terrno = TSDB_CODE_OUT_OF_MEMORY) when the temporary hash
// cannot be created; in that case pOutput has not been touched.
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
  int32_t     totalVgNum = pOutput->pSub->vgNum;
  const char *pSubKey = pOutput->pSub->key;

  int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
  int32_t numOfAdded = taosArrayGetSize(pInput->pRebInfo->newConsumers);
  mInfo("sub:%s mq re-balance %d vgroups, existed consumers:%d, added:%d, removed:%d", pSubKey, totalVgNum,
        pInput->oldConsumerNum, numOfAdded, numOfRemoved);

  // 1. build temporary hash(vgId -> SMqRebOutputVg) to store vg that need to be assigned
  SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pHash == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // 2. check and get actual removed consumers, put their vg into pHash
  doRemoveLostConsumers(pOutput, pHash, pInput);

  // 3. if previously no consumer, there are vgs not assigned, put these vg into pHash
  addUnassignedVgroups(pOutput, pHash);

  // 4. calc vg number of each consumer
  int32_t numOfFinal = pInput->oldConsumerNum + numOfAdded - numOfRemoved;

  int32_t minVgCnt = 0;
  int32_t imbConsumerNum = 0;

  // calc num
  if (numOfFinal) {
    minVgCnt = totalVgNum / numOfFinal;
    imbConsumerNum = totalVgNum % numOfFinal;
    mInfo("sub:%s mq re-balance %d consumers: at least %d vgs each, %d consumers has 1 more vgroups than avg value",
          pSubKey, numOfFinal, minVgCnt, imbConsumerNum);
  } else {
    mInfo("sub:%s no consumer subscribe this topic", pSubKey);
  }

  // 5. remove vgroups from consumers who have more vgroups than the threshold value (minVgCnt or minVgCnt + 1)
  //    and put those vgroups into pHash
  transferVgroupsForConsumers(pOutput, pHash, minVgCnt, imbConsumerNum);

  // 6. add new consumer into sub
  doAddNewConsumers(pOutput, pInput);

  SMqRebOutputVg *pRebVg = NULL;
  void           *pRemovedIter = NULL;
  void           *pIter = NULL;

  // 7. extract vgroups from pHash and assign to consumers that do not have enough vgroups
  // 7.1 first pass: bring every consumer up to minVgCnt
  while (1) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) {
      break;
    }

    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;

    // push until equal minVg
    while (taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) {
      // iter hash and find one vg
      pRemovedIter = taosHashIterate(pHash, pRemovedIter);
      if (pRemovedIter == NULL) {
        mError("sub:%s removed iter is null, never can reach hear", pSubKey);
        break;
      }

      pRebVg = (SMqRebOutputVg *)pRemovedIter;
      pRebVg->newConsumerId = pConsumerEp->consumerId;
      taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
      mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", pRebVg->pVgEp->vgId, pConsumerEp->consumerId);
    }
  }

  // 7.2 second pass: hand the remaining vgroups, one each, to consumers holding exactly minVgCnt.
  // The first pass only ends once its iterator returned NULL, so this pass starts from the beginning.
  pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) {
      break;
    }
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;

    if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) {
      pRemovedIter = taosHashIterate(pHash, pRemovedIter);
      if (pRemovedIter == NULL) {
        mInfo("sub:%s removed iter is null", pSubKey);
        // leaving the loop early: the consumer iterator is still open and has to be cancelled
        taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
        break;
      }

      pRebVg = (SMqRebOutputVg *)pRemovedIter;
      pRebVg->newConsumerId = pConsumerEp->consumerId;
      taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
      mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->pVgEp->vgId, pConsumerEp->consumerId);
    }
  }

  // All assigned vg should be put into pOutput->rebVgs.
  // The walk below only covers the whole hash when pRemovedIter is NULL at this point.
  if (pRemovedIter != NULL) {
    mError("sub:%s error pRemovedIter should be NULL", pSubKey);
  }
  while (1) {
    pRemovedIter = taosHashIterate(pHash, pRemovedIter);
    if (pRemovedIter == NULL) {
      break;
    }

    SMqRebOutputVg *pRebOutput = (SMqRebOutputVg *)pRemovedIter;
    taosArrayPush(pOutput->rebVgs, pRebOutput);
    if (taosHashGetSize(pOutput->pSub->consumerHash) == 0) {  // if all consumer is removed, put all vg into unassigned
      taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp);
    }
  }

  // 8. generate logs
  mInfo("sub:%s mq re-balance calculation completed, re-balanced vg", pSubKey);
  for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) {
    SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i);
    mInfo("sub:%s mq re-balance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pSubKey,
          pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId);
  }

  pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) break;
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    int32_t        sz = taosArrayGetSize(pConsumerEp->vgs);
    mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", pSubKey, pConsumerEp->consumerId, sz);
    for (int32_t i = 0; i < sz; i++) {
      SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
      mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, pSubKey, pVgEp->vgId,
            pConsumerEp->consumerId);
    }
  }

  // 9. clear
  taosHashCleanup(pHash);

  return 0;
}

S
Shengliang Guan 已提交
485
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
486
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
487 488 489
  if (pTrans == NULL) {
    return -1;
  }
490

491
  mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL);
492 493 494 495
  if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
    mndTransDrop(pTrans);
    return -1;
  }
496

497 498 499 500 501 502
  // make txn:
  // 1. redo action: action to all vg
  const SArray *rebVgs = pOutput->rebVgs;
  int32_t       vgNum = taosArrayGetSize(rebVgs);
  for (int32_t i = 0; i < vgNum; i++) {
    SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i);
L
Liu Jicong 已提交
503
    if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg) < 0) {
H
Haojun Liao 已提交
504 505
      mndTransDrop(pTrans);
      return -1;
506 507 508 509 510
    }
  }

  // 2. redo log: subscribe and vg assignment
  // subscribe
L
Liu Jicong 已提交
511
  if (mndSetSubCommitLogs(pMnode, pTrans, pOutput->pSub) != 0) {
H
Haojun Liao 已提交
512 513
    mndTransDrop(pTrans);
    return -1;
514 515 516 517
  }

  // 3. commit log: consumer to update status and epoch
  // 3.1 set touched consumer
518
  int32_t consumerNum = taosArrayGetSize(pOutput->modifyConsumers);
519
  for (int32_t i = 0; i < consumerNum; i++) {
520
    int64_t         consumerId = *(int64_t *)taosArrayGet(pOutput->modifyConsumers, i);
521 522 523 524 525
    SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
    SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup);
    pConsumerNew->updateType = CONSUMER_UPDATE__TOUCH;
    mndReleaseConsumer(pMnode, pConsumerOld);
    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
L
Liu Jicong 已提交
526 527
      tDeleteSMqConsumerObj(pConsumerNew);
      taosMemoryFree(pConsumerNew);
H
Haojun Liao 已提交
528 529 530

      mndTransDrop(pTrans);
      return -1;
531
    }
H
Haojun Liao 已提交
532

L
Liu Jicong 已提交
533 534
    tDeleteSMqConsumerObj(pConsumerNew);
    taosMemoryFree(pConsumerNew);
535
  }
H
Haojun Liao 已提交
536

537 538 539
  // 3.2 set new consumer
  consumerNum = taosArrayGetSize(pOutput->newConsumers);
  for (int32_t i = 0; i < consumerNum; i++) {
L
Liu Jicong 已提交
540
    int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->newConsumers, i);
541

542 543 544 545 546
    SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
    SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup);
    pConsumerNew->updateType = CONSUMER_UPDATE__ADD;
    char *topic = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    char  cgroup[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
547
    mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
548 549 550
    taosArrayPush(pConsumerNew->rebNewTopics, &topic);
    mndReleaseConsumer(pMnode, pConsumerOld);
    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
L
Liu Jicong 已提交
551 552
      tDeleteSMqConsumerObj(pConsumerNew);
      taosMemoryFree(pConsumerNew);
H
Haojun Liao 已提交
553 554 555

      mndTransDrop(pTrans);
      return -1;
556
    }
H
Haojun Liao 已提交
557

L
Liu Jicong 已提交
558 559
    tDeleteSMqConsumerObj(pConsumerNew);
    taosMemoryFree(pConsumerNew);
560 561 562 563 564 565
  }

  // 3.3 set removed consumer
  consumerNum = taosArrayGetSize(pOutput->removedConsumers);
  for (int32_t i = 0; i < consumerNum; i++) {
    int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->removedConsumers, i);
566

567 568 569 570 571
    SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
    SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup);
    pConsumerNew->updateType = CONSUMER_UPDATE__REMOVE;
    char *topic = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    char  cgroup[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
572
    mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
573 574 575
    taosArrayPush(pConsumerNew->rebRemovedTopics, &topic);
    mndReleaseConsumer(pMnode, pConsumerOld);
    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
L
Liu Jicong 已提交
576 577
      tDeleteSMqConsumerObj(pConsumerNew);
      taosMemoryFree(pConsumerNew);
H
Haojun Liao 已提交
578 579 580

      mndTransDrop(pTrans);
      return -1;
581
    }
H
Haojun Liao 已提交
582

L
Liu Jicong 已提交
583 584
    tDeleteSMqConsumerObj(pConsumerNew);
    taosMemoryFree(pConsumerNew);
585
  }
L
Liu Jicong 已提交
586

L
Liu Jicong 已提交
587 588
  // 4. TODO commit log: modification log

L
Liu Jicong 已提交
589
  // 5. set cb
590
  mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_MQ_REB, NULL, 0);
L
Liu Jicong 已提交
591 592

  // 6. execution
L
Liu Jicong 已提交
593
  if (mndTransPrepare(pMnode, pTrans) != 0) {
L
Liu Jicong 已提交
594
    mError("failed to prepare trans rebalance since %s", terrstr());
H
Haojun Liao 已提交
595 596
    mndTransDrop(pTrans);
    return -1;
L
Liu Jicong 已提交
597
  }
598 599 600 601 602

  mndTransDrop(pTrans);
  return 0;
}

S
Shengliang Guan 已提交
603 604 605
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
  SMnode            *pMnode = pMsg->info.node;
  SMqDoRebalanceMsg *pReq = pMsg->pCont;
606
  void              *pIter = NULL;
607
//  bool               rebalanceOnce = false;  // to ensure only once.
608

609
  mInfo("mq re-balance start, total required re-balanced trans:%d", taosHashGetSize(pReq->rebSubHash));
610

611
  // here we only handle one topic rebalance requirement to ensure the atomic execution of this transaction.
612 613
  while (1) {
    pIter = taosHashIterate(pReq->rebSubHash, pIter);
614 615 616 617
    if (pIter == NULL) {
      break;
    }

X
Xiaoyu Wang 已提交
618
    SMqRebInputObj  rebInput = {0};
L
Liu Jicong 已提交
619
    SMqRebOutputObj rebOutput = {0};
L
Liu Jicong 已提交
620 621
    rebOutput.newConsumers = taosArrayInit(0, sizeof(int64_t));
    rebOutput.removedConsumers = taosArrayInit(0, sizeof(int64_t));
622
    rebOutput.modifyConsumers = taosArrayInit(0, sizeof(int64_t));
L
Liu Jicong 已提交
623 624
    rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));

625 626 627 628 629 630 631 632 633 634 635 636 637 638
    if (rebOutput.newConsumers == NULL || rebOutput.removedConsumers == NULL || rebOutput.modifyConsumers == NULL ||
        rebOutput.rebVgs == NULL) {
      taosArrayDestroy(rebOutput.newConsumers);
      taosArrayDestroy(rebOutput.removedConsumers);
      taosArrayDestroy(rebOutput.modifyConsumers);
      taosArrayDestroy(rebOutput.rebVgs);

      terrno = TSDB_CODE_OUT_OF_MEMORY;
      mInfo("mq re-balance failed, due to out of memory");
      taosHashCleanup(pReq->rebSubHash);
      mndRebEnd();
      return -1;
    }

L
Liu Jicong 已提交
639 640 641 642
    SMqRebInfo      *pRebInfo = (SMqRebInfo *)pIter;
    SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebInfo->key);

    rebInput.pRebInfo = pRebInfo;
643 644 645 646 647

    if (pSub == NULL) {
      // split sub key and extract topic
      char topic[TSDB_TOPIC_FNAME_LEN];
      char cgroup[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
648
      mndSplitSubscribeKey(pRebInfo->key, topic, cgroup, true);
649

650
      SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
L
Liu Jicong 已提交
651
      if (pTopic == NULL) {
652
        mError("mq re-balance %s ignored since topic %s doesn't exist", pRebInfo->key, topic);
L
Liu Jicong 已提交
653 654
        continue;
      }
655

656 657
      taosRLockLatch(&pTopic->lock);

658
      rebOutput.pSub = mndCreateSubscription(pMnode, pTopic, pRebInfo->key);
L
Liu Jicong 已提交
659 660

      if (rebOutput.pSub == NULL) {
661
        mError("mq rebalance %s failed create sub since %s, ignore", pRebInfo->key, terrstr());
L
Liu Jicong 已提交
662 663 664 665
        taosRUnLockLatch(&pTopic->lock);
        mndReleaseTopic(pMnode, pTopic);
        continue;
      }
L
Liu Jicong 已提交
666

667
      memcpy(rebOutput.pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN);
L
Liu Jicong 已提交
668 669 670 671
      taosRUnLockLatch(&pTopic->lock);
      mndReleaseTopic(pMnode, pTopic);

      rebInput.oldConsumerNum = 0;
672
      mInfo("sub topic:%s has no consumers sub yet", pRebInfo->key);
L
Liu Jicong 已提交
673 674 675 676 677
    } else {
      taosRLockLatch(&pSub->lock);
      rebInput.oldConsumerNum = taosHashGetSize(pSub->consumerHash);
      rebOutput.pSub = tCloneSubscribeObj(pSub);
      taosRUnLockLatch(&pSub->lock);
678

679
      mInfo("sub topic:%s has %d consumers sub till now", pRebInfo->key, rebInput.oldConsumerNum);
L
Liu Jicong 已提交
680 681
      mndReleaseSubscribe(pMnode, pSub);
    }
682

683
    if (mndDoRebalance(pMnode, &rebInput, &rebOutput) < 0) {
684
      mError("mq re-balance internal error");
685
    }
686

L
Liu Jicong 已提交
687 688
    // if add more consumer to balanced subscribe,
    // possibly no vg is changed
689
    // when each topic is re-balanced, issue an trans to save the results in sdb.
L
Liu Jicong 已提交
690
    if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) {
691
      mError("mq re-balance persist output error, possibly vnode splitted or dropped,msg:%s", terrstr());
L
Liu Jicong 已提交
692
    }
693

L
Liu Jicong 已提交
694
    taosArrayDestroy(rebOutput.newConsumers);
695
    taosArrayDestroy(rebOutput.modifyConsumers);
L
Liu Jicong 已提交
696 697 698 699
    taosArrayDestroy(rebOutput.removedConsumers);
    taosArrayDestroy(rebOutput.rebVgs);
    tDeleteSubscribeObj(rebOutput.pSub);
    taosMemoryFree(rebOutput.pSub);
700 701 702
  }

  // reset flag
703
  mInfo("mq re-balance completed successfully");
704
  taosHashCleanup(pReq->rebSubHash);
L
Liu Jicong 已提交
705
  mndRebEnd();
706 707 708

  return 0;
}
L
Liu Jicong 已提交
709

710 711
// Handle TDMT_MND_TMQ_DROP_CGROUP: drop the subscription of a consumer group on a topic.
//
// The request is rejected when it cannot be deserialized (TSDB_CODE_INVALID_MSG), when the
// subscription does not exist and igNotExists is not set (TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST),
// or when the group still has consumers (TSDB_CODE_MND_CGROUP_USED).
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS once the drop transaction has been prepared,
// 0 when the subscription does not exist and igNotExists is set, -1 on failure.
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
  SMDropCgroupReq dropReq = {0};
  STrans         *pTrans = NULL;
  int32_t         code = -1;

  if (tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, dropReq.cgroup, dropReq.topic);
  if (pSub == NULL) {
    if (dropReq.igNotExists) {
      mInfo("cgroup:%s on topic:%s, not exist, ignore not exist is set", dropReq.cgroup, dropReq.topic);
      return 0;
    } else {
      terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
      mError("topic:%s, cgroup:%s, failed to drop since %s", dropReq.topic, dropReq.cgroup, terrstr());
      return -1;
    }
  }

  // a group that still has consumers must not be dropped
  if (taosHashGetSize(pSub->consumerHash) != 0) {
    terrno = TSDB_CODE_MND_CGROUP_USED;
    mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
    goto DROP_CGROUP_OVER;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "drop-cgroup");
  if (pTrans == NULL) {
    mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
    goto DROP_CGROUP_OVER;
  }

  mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);

  if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
    mError("cgroup %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
    goto DROP_CGROUP_OVER;
  }

  if (mndTransPrepare(pMnode, pTrans) < 0) {
    goto DROP_CGROUP_OVER;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

DROP_CGROUP_OVER:
  // Single exit: the subscription reference and the local transaction reference are
  // released on every path. The transaction was previously not dropped after a
  // successful prepare.
  mndReleaseSubscribe(pMnode, pSub);
  mndTransDrop(pTrans);
  return code;
}

L
Liu Jicong 已提交
765 766 767
// Cleanup hook for the subscribe module; the body is currently empty.
void mndCleanupSubscribe(SMnode *pMnode) {
  (void)pMnode;
}

// Serialize a subscribe object into an SDB raw record.
//
// Record layout, in write order:
//   int32   length of the encoded payload (tlen)
//   binary  payload produced by tEncodeSubscribeObj()
//   reserve MND_SUBSCRIBE_RESERVE_SIZE bytes
//
// Returns the raw record on success. On failure returns NULL, frees any
// partially built record and leaves terrno at a non-success code.
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  // Both pointers are read at SUB_ENCODE_OVER, so they must hold a defined
  // value before the first goto can be taken.
  SSdbRaw *pRaw = NULL;
  void    *buf = NULL;

  // First pass with a NULL buffer only measures the encoded size.
  int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
  if (tlen <= 0) goto SUB_ENCODE_OVER;
  int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE;

  pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
  if (pRaw == NULL) goto SUB_ENCODE_OVER;

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto SUB_ENCODE_OVER;

  // Second pass writes the payload; abuf is a cursor so buf keeps the start.
  void *abuf = buf;
  tEncodeSubscribeObj(&abuf, pSub);

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, SUB_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

SUB_ENCODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRaw;
}

// Rebuild a subscribe row from an SDB raw record.
//
// Accepts soft versions 1..MND_SUBSCRIBE_VER_NUMBER. Reads the payload length,
// the payload and the reserved area, decodes the payload into the row object,
// then passes every vgroup epSet (unassigned and per-consumer) through
// tmsgUpdateDnodeEpSet().
//
// Returns the row on success; on failure frees the row and returns NULL with
// terrno left at a non-success code.
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  SSdbRow         *pRow = NULL;
  SMqSubscribeObj *pSub = NULL;
  void            *buf = NULL;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;

  if (sver < 1 || sver > MND_SUBSCRIBE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto SUB_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqSubscribeObj));
  if (pRow == NULL) goto SUB_DECODE_OVER;

  pSub = sdbGetRowObj(pRow);
  if (pSub == NULL) goto SUB_DECODE_OVER;

  int32_t dataPos = 0;
  int32_t tlen;
  SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto SUB_DECODE_OVER;
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);

  if (tDecodeSubscribeObj(buf, pSub, sver) == NULL) {
    goto SUB_DECODE_OVER;
  }

  // update epset saved in mnode: vgroups not assigned to any consumer
  if (pSub->unassignedVgs != NULL) {
    int32_t numUnassigned = (int32_t)taosArrayGetSize(pSub->unassignedVgs);
    for (int32_t idx = 0; idx < numUnassigned; ++idx) {
      SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGetP(pSub->unassignedVgs, idx);
      tmsgUpdateDnodeEpSet(&pVgEp->epSet);
    }
  }

  // update epset saved in mnode: vgroups held by each consumer
  if (pSub->consumerHash != NULL) {
    for (void *pIter = taosHashIterate(pSub->consumerHash, NULL); pIter != NULL;
         pIter = taosHashIterate(pSub->consumerHash, pIter)) {
      SMqConsumerEp *pConsumerEp = pIter;
      int32_t        numVgs = (int32_t)taosArrayGetSize(pConsumerEp->vgs);
      for (int32_t idx = 0; idx < numVgs; ++idx) {
        SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, idx);
        tmsgUpdateDnodeEpSet(&pVgEp->epSet);
      }
    }
  }

  terrno = TSDB_CODE_SUCCESS;

SUB_DECODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to decode from raw:%p since %s", pSub == NULL ? "null" : pSub->key, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("subscribe:%s, decode from raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRow;
}

// SDB insert hook for subscribe rows: traces the key and reports success.
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
  const int32_t code = 0;
  mTrace("subscribe:%s, perform insert action", pSub->key);
  return code;
}

// SDB delete hook for subscribe rows: traces the key, then hands the object
// to tDeleteSubscribeObj(). Always returns 0.
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
  const int32_t code = 0;

  mTrace("subscribe:%s, perform delete action", pSub->key);
  tDeleteSubscribeObj(pSub);

  return code;
}

// SDB update hook for subscribe rows.
//
// Exchanges consumerHash and unassignedVgs between the stored row (pOldSub)
// and the incoming row (pNewSub) while holding pOldSub's write latch, so the
// stored row ends up with the new containers and pNewSub with the old ones.
// Always returns 0.
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
  mTrace("subscribe:%s, perform update action", pOldSub->key);

  taosWLockLatch(&pOldSub->lock);

  SHashObj *prevConsumers = pOldSub->consumerHash;
  SArray   *prevUnassigned = pOldSub->unassignedVgs;

  pOldSub->consumerHash = pNewSub->consumerHash;
  pOldSub->unassignedVgs = pNewSub->unassignedVgs;

  pNewSub->consumerHash = prevConsumers;
  pNewSub->unassignedVgs = prevUnassigned;

  taosWUnLockLatch(&pOldSub->lock);

  return 0;
}

897
int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName) {
S
Shengliang Guan 已提交
898
  int32_t tlen = strlen(cgroup);
L
Liu Jicong 已提交
899
  memcpy(key, cgroup, tlen);
L
Liu Jicong 已提交
900
  key[tlen] = TMQ_SEPARATOR;
L
Liu Jicong 已提交
901
  strcpy(key + tlen + 1, topicName);
L
Liu Jicong 已提交
902
  return 0;
L
Liu Jicong 已提交
903 904
}

L
Liu Jicong 已提交
905
// Look up the subscribe object for (cgroup, topicName) in the SDB.
//
// Returns the object obtained from sdbAcquire(), or NULL with terrno set to
// TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST when the lookup yields nothing.
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *cgroup, const char *topicName) {
  SSdb *pSdb = pMnode->pSdb;

  char subKey[TSDB_SUBSCRIBE_KEY_LEN];
  mndMakeSubscribeKey(subKey, cgroup, topicName);

  SMqSubscribeObj *pFound = sdbAcquire(pSdb, SDB_SUBSCRIBE, subKey);
  if (pFound != NULL) {
    return pFound;
  }

  terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
  return NULL;
}

L
Liu Jicong 已提交
916 917 918 919
// Look up a subscribe object in the SDB by an already-built key.
//
// Returns the object obtained from sdbAcquire(), or NULL with terrno set to
// TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST when the lookup yields nothing.
SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key) {
  SMqSubscribeObj *pFound = sdbAcquire(pMnode->pSdb, SDB_SUBSCRIBE, key);
  if (pFound != NULL) {
    return pFound;
  }

  terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
  return NULL;
}

L
Liu Jicong 已提交
925 926 927 928 929
// Hand a subscribe object back to the SDB via sdbRelease().
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
  sdbRelease(pMnode->pSdb, pSub);
}

L
Liu Jicong 已提交
930 931 932 933 934 935 936 937
// Encode pSub, append it to the transaction's redo log and mark the raw
// record SDB_STATUS_DROPPED. Returns 0 on success, -1 if any step fails.
static int32_t mndSetDropSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  SSdbRaw *pRaw = mndSubActionEncode(pSub);
  if (pRaw == NULL) {
    return -1;
  }

  if (mndTransAppendRedolog(pTrans, pRaw) != 0) {
    return -1;
  }

  return (sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED) != 0) ? -1 : 0;
}

L
Liu Jicong 已提交
938
// Encode pSub, append it to the transaction's commit log and mark the raw
// record SDB_STATUS_DROPPED. Returns 0 on success, -1 if any step fails.
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  SSdbRaw *pRaw = mndSubActionEncode(pSub);
  if (pRaw == NULL) {
    return -1;
  }

  if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
    return -1;
  }

  return (sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED) != 0) ? -1 : 0;
}

// Append a drop commit log to pTrans for every subscription whose dbUid
// matches pDb->uid.
//
// Returns 0 when the whole table was scanned, or -1 as soon as one commit log
// cannot be added; in that case the SDB iterator is cancelled before returning.
int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  SSdb            *pSdb = pMnode->pSdb;
  SMqSubscribeObj *pSub = NULL;
  void            *pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, NULL, (void **)&pSub);

  while (pIter != NULL) {
    int32_t belongsToDb = (pSub->dbUid == pDb->uid);

    if (belongsToDb && mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
      sdbRelease(pSdb, pSub);
      sdbCancelFetch(pSdb, pIter);
      return -1;
    }

    sdbRelease(pSdb, pSub);
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
  }

  return 0;
}

// Queue the removal of every subscription on topicName into pTrans.
//
// For each matching subscription:
//   - fails with TSDB_CODE_MND_IN_REBALANCE if consumerHash is non-empty;
//   - appends one TDMT_VND_TMQ_DELETE_SUB redo action per unassigned vgroup;
//   - appends a drop redo log for the subscription itself.
//
// Returns 0 on success, -1 on failure. Every failure path releases the
// current row and cancels the SDB iterator before returning.
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) {
  SSdb *pSdb = pMnode->pSdb;

  void            *pIter = NULL;
  SMqSubscribeObj *pSub = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
    if (pIter == NULL) break;

    char topic[TSDB_TOPIC_FNAME_LEN];
    char cgroup[TSDB_CGROUP_LEN];
    mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
    if (strcmp(topic, topicName) != 0) {
      sdbRelease(pSdb, pSub);
      continue;
    }

    // a subscription that still has consumers cannot be dropped
    if (taosHashGetSize(pSub->consumerHash) != 0) {
      sdbRelease(pSdb, pSub);
      sdbCancelFetch(pSdb, pIter);
      terrno = TSDB_CODE_MND_IN_REBALANCE;
      return -1;
    }

    // iter all vnode to delete handle
    int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
    for (int32_t i = 0; i < sz; i++) {
      SMqVgEp       *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
      SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
      if (pReq == NULL) {
        sdbRelease(pSdb, pSub);
        sdbCancelFetch(pSdb, pIter);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return -1;
      }
      pReq->head.vgId = htonl(pVgEp->vgId);
      pReq->vgId = pVgEp->vgId;
      pReq->consumerId = -1;
      memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);

      STransAction action = {0};
      action.epSet = pVgEp->epSet;
      action.pCont = pReq;
      action.contLen = sizeof(SMqVDeleteReq);
      action.msgType = TDMT_VND_TMQ_DELETE_SUB;
      if (mndTransAppendRedoAction(pTrans, &action) != 0) {
        // the request is freed here because the append failed
        taosMemoryFree(pReq);
        sdbRelease(pSdb, pSub);
        sdbCancelFetch(pSdb, pIter);
        return -1;
      }
    }

    if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) {
      sdbRelease(pSdb, pSub);
      sdbCancelFetch(pSdb, pIter);
      return -1;
    }

    sdbRelease(pSdb, pSub);
  }

  return 0;
}
L
Liu Jicong 已提交
1030

1031
// Fill pBlock with one row per (subscription, vgroup) pair for the "show
// subscriptions" query.
//
// Iteration resumes from pShow->pIter and stops once numOfRows reaches
// rowsCapacity or the SDB table is exhausted. Each subscription is read under
// its read latch. Rows for vgroups held by a consumer carry topic, cgroup,
// vgId, consumerId, offset and rows; rows for unassigned vgroups carry topic,
// cgroup, vgId and a NULL consumerId.
//
// Returns the number of rows written by this call, which is also added to
// pShow->numOfRows.
int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode          *pMnode = pReq->info.node;
  SSdb            *pSdb = pMnode->pSdb;
  int32_t          numOfRows = 0;
  SMqSubscribeObj *pSub = NULL;

  mDebug("mnd show subscriptions begin");

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub);
    if (pShow->pIter == NULL) {
      break;
    }

    taosRLockLatch(&pSub->lock);

    // one subscription can add up to vgNum rows; grow the block if needed
    if (numOfRows + pSub->vgNum > rowsCapacity) {
      blockDataEnsureCapacity(pBlock, numOfRows + pSub->vgNum);
    }

    SMqConsumerEp *pConsumerEp = NULL;
    void          *pIter = NULL;
    while (1) {
      pIter = taosHashIterate(pSub->consumerHash, pIter);
      if (pIter == NULL) break;
      pConsumerEp = (SMqConsumerEp *)pIter;

      int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
      for (int32_t j = 0; j < sz; j++) {
        SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);

        SColumnInfoData *pColInfo;
        int32_t          cols = 0;

        // topic and cgroup
        char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
        char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
        mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
        varDataSetLen(topic, strlen(varDataVal(topic)));
        varDataSetLen(cgroup, strlen(varDataVal(cgroup)));

        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        colDataSetVal(pColInfo, numOfRows, (const char *)topic, false);

        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false);

        // vg id
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        colDataSetVal(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false);

        // consumer id
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumerEp->consumerId, false);

        mDebug("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic),
               pConsumerEp->consumerId, varDataVal(cgroup), pVgEp->vgId);

        // offset: find this vgroup's entry in the consumer's offsetRows.
        // The candidate entry (tmp) is compared, never the still-NULL result.
        // The scan runs to the end, so the last matching entry is used.
        OffsetRows *data = NULL;
        int32_t     offsetNum = (int32_t)taosArrayGetSize(pConsumerEp->offsetRows);
        for (int32_t i = 0; i < offsetNum; i++) {
          OffsetRows *tmp = taosArrayGet(pConsumerEp->offsetRows, i);
          if (tmp->vgId != pVgEp->vgId) {
            continue;
          }
          data = tmp;
        }
        if (data) {
          // formatted offset, then row count
          char buf[TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0};
          tFormatOffset(varDataVal(buf), TSDB_OFFSET_LEN, &data->offset);
          varDataSetLen(buf, strlen(varDataVal(buf)));
          pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
          colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);
          pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
          colDataSetVal(pColInfo, numOfRows, (const char *)&data->rows, false);
        } else {
          pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
          colDataSetNULL(pColInfo, numOfRows);
          pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
          colDataSetNULL(pColInfo, numOfRows);
          mError("mnd show subscriptions: do not find vgId:%d in offsetRows", pVgEp->vgId);
        }

        numOfRows++;
      }
    }

    // do not show for cleared subscription
    int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
    for (int32_t i = 0; i < sz; i++) {
      SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);

      SColumnInfoData *pColInfo;
      int32_t          cols = 0;

      // topic and cgroup
      char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
      char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
      mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
      varDataSetLen(topic, strlen(varDataVal(topic)));
      varDataSetLen(cgroup, strlen(varDataVal(cgroup)));

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)topic, false);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false);

      // vg id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false);

      // consumer id: NULL, since no consumer holds this vgroup
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataSetVal(pColInfo, numOfRows, NULL, true);

      mDebug("mnd show subscriptions(unassigned): topic %s, cgroup %s vgid %d", varDataVal(topic), varDataVal(cgroup),
             pVgEp->vgId);

      // NOTE: the offset and rows columns are not written for unassigned vgroups.

      numOfRows++;
    }

    pBlock->info.rows = numOfRows;

    taosRUnLockLatch(&pSub->lock);
    sdbRelease(pSdb, pSub);
  }

  mDebug("mnd end show subscriptions");

  pShow->numOfRows += numOfRows;
  return numOfRows;
}

1186
// Cancel an in-progress SDB iteration by forwarding pIter to sdbCancelFetch().
void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}