mndSubscribe.c 39.9 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
L
Liu Jicong 已提交
17
#include "mndSubscribe.h"
L
Liu Jicong 已提交
18
#include "mndConsumer.h"
L
Liu Jicong 已提交
19
#include "mndScheduler.h"
L
Liu Jicong 已提交
20 21 22 23 24 25 26
#include "mndShow.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndVgroup.h"
#include "tcompare.h"
#include "tname.h"

27
// Encoding version and reserved trailing bytes for serialized subscription rows
// (presumably consumed by mndSubActionEncode/mndSubActionDecode defined later in
// this file — confirm there).
#define MND_SUBSCRIBE_VER_NUMBER   2
#define MND_SUBSCRIBE_RESERVE_SIZE 64

// NOTE(review): not referenced in the visible part of this file; confirm it is
// still used elsewhere before relying on its meaning.
#define MND_SUBSCRIBE_REBALANCE_CNT 3

// SDB table actions for SDB_SUBSCRIBE rows (registered in mndInitSubscribe).
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub);
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
static int32_t  mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub);
static int32_t  mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub);
static int32_t  mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);

// Message and show-command handlers (registered in mndInitSubscribe).
static int32_t  mndProcessRebalanceReq(SRpcMsg *pMsg);
static int32_t  mndProcessDropCgroupReq(SRpcMsg *pMsg);
static int32_t  mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void     mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
41

42 43 44 45 46 47 48
/*
 * Encode pSub and append it to pTrans as a redo log, then mark the raw row as
 * SDB_STATUS_READY. The steps run in that order and stop at the first failure.
 * pMnode is not used.
 *
 * Returns 0 on success, -1 if encoding, appending or setting the status fails.
 */
static int32_t mndSetSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  SSdbRaw *pRaw = mndSubActionEncode(pSub);
  bool     ok = (pRaw != NULL) && (mndTransAppendRedolog(pTrans, pRaw) == 0) &&
            (sdbSetRawStatus(pRaw, SDB_STATUS_READY) == 0);
  return ok ? 0 : -1;
}
L
Liu Jicong 已提交
49

50 51 52 53 54 55 56
/*
 * Encode pSub and append it to pTrans as a commit log, then mark the raw row as
 * SDB_STATUS_READY. The steps run in that order and stop at the first failure.
 * pMnode is not used.
 *
 * Returns 0 on success, -1 if encoding, appending or setting the status fails.
 */
static int32_t mndSetSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  SSdbRaw *pRaw = mndSubActionEncode(pSub);
  bool     ok = (pRaw != NULL) && (mndTransAppendCommitlog(pTrans, pRaw) == 0) &&
            (sdbSetRawStatus(pRaw, SDB_STATUS_READY) == 0);
  return ok ? 0 : -1;
}
L
Liu Jicong 已提交
57

L
Liu Jicong 已提交
58
/*
 * Register the SDB_SUBSCRIBE table (binary keys, encode/decode/insert/update/
 * delete actions) and the subscription-related message and show handlers.
 *
 * Returns the result of sdbSetTable().
 */
int32_t mndInitSubscribe(SMnode *pMnode) {
  SSdbTable table = {
      .sdbType = SDB_SUBSCRIBE,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndSubActionEncode,
      .decodeFp = (SdbDecodeFp)mndSubActionDecode,
      .insertFp = (SdbInsertFp)mndSubActionInsert,
      .updateFp = (SdbUpdateFp)mndSubActionUpdate,
      .deleteFp = (SdbDeleteFp)mndSubActionDelete,
  };

  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_SUBSCRIBE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_DELETE_SUB_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DO_REBALANCE, mndProcessRebalanceReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP, mndProcessDropCgroupReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP_RSP, mndTransProcessRsp);

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
  // The free-iterator hook must be registered for the same show type as the
  // retrieve hook. It was previously registered under TSDB_MGMT_TABLE_TOPICS,
  // which left subscription iterators without a cancel hook and could replace
  // the handler registered for the topics show.
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndCancelGetNextSubscribe);

  return sdbSetTable(pMnode->pSdb, table);
}

81
/*
 * Create a new subscription object for subKey. It inherits dbUid, stbUid,
 * subType and withMeta from pTopic, and mndSchedInitSubEp() sets up its vgroup
 * endpoints.
 *
 * Returns the new object, or NULL on failure. The caller frees the object with
 * tDeleteSubscribeObj() + taosMemoryFree(). If the allocation itself fails,
 * terrno is set to TSDB_CODE_OUT_OF_MEMORY.
 */
static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey) {
  SMqSubscribeObj *pNewSub = tNewSubscribeObj(subKey);
  if (pNewSub == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // attributes copied verbatim from the topic
  pNewSub->dbUid = pTopic->dbUid;
  pNewSub->stbUid = pTopic->stbUid;
  pNewSub->subType = pTopic->subType;
  pNewSub->withMeta = pTopic->withMeta;

  if (mndSchedInitSubEp(pMnode, pTopic, pNewSub) >= 0) {
    return pNewSub;
  }

  // endpoint initialisation failed: release the partially built object
  tDeleteSubscribeObj(pNewSub);
  taosMemoryFree(pNewSub);
  return NULL;
}

L
Liu Jicong 已提交
102 103
static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscribeObj *pSub,
                                    const SMqRebOutputVg *pRebVg) {
104 105 106 107
  SMqRebVgReq req = {0};
  req.oldConsumerId = pRebVg->oldConsumerId;
  req.newConsumerId = pRebVg->newConsumerId;
  req.vgId = pRebVg->pVgEp->vgId;
L
Liu Jicong 已提交
108
  req.qmsg = pRebVg->pVgEp->qmsg;
L
Liu Jicong 已提交
109
  req.subType = pSub->subType;
L
Liu Jicong 已提交
110
  req.withMeta = pSub->withMeta;
L
Liu Jicong 已提交
111
  req.suid = pSub->stbUid;
S
Shengliang Guan 已提交
112
  tstrncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
113

114 115 116 117 118 119 120 121
  int32_t tlen = 0;
  int32_t ret = 0;
  tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, ret);
  if (ret < 0) {
    return -1;
  }

  tlen += sizeof(SMsgHead);
122 123 124 125 126
  void   *buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
127

128 129 130 131 132
  SMsgHead *pMsgHead = (SMsgHead *)buf;

  pMsgHead->contLen = htonl(tlen);
  pMsgHead->vgId = htonl(pRebVg->pVgEp->vgId);

133 134 135 136 137 138 139 140
  SEncoder encoder = {0};
  tEncoderInit(&encoder, POINTER_SHIFT(buf, sizeof(SMsgHead)), tlen);
  if (tEncodeSMqRebVgReq(&encoder, &req) < 0) {
    taosMemoryFreeClear(buf);
    tEncoderClear(&encoder);
    return -1;
  }
  tEncoderClear(&encoder);
141 142 143 144 145 146
  *pBuf = buf;
  *pLen = tlen;

  return 0;
}

L
Liu Jicong 已提交
147
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub,
148
                                        const SMqRebOutputVg *pRebVg) {
149 150 151 152
//  if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
//    terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
//    return -1;
//  }
153 154 155

  void   *buf;
  int32_t tlen;
L
Liu Jicong 已提交
156
  if (mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg) < 0) {
157 158 159 160 161
    return -1;
  }

  int32_t vgId = pRebVg->pVgEp->vgId;
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
L
Liu Jicong 已提交
162 163
  if (pVgObj == NULL) {
    taosMemoryFree(buf);
164
    terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
165 166
    return -1;
  }
167 168 169 170 171

  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
  action.pCont = buf;
  action.contLen = tlen;
L
Liu Jicong 已提交
172
  action.msgType = TDMT_VND_TMQ_SUBSCRIBE;
173 174 175 176 177 178 179 180

  mndReleaseVgroup(pMnode, pVgObj);
  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    taosMemoryFree(buf);
    return -1;
  }
  return 0;
}
L
Liu Jicong 已提交
181

L
Liu Jicong 已提交
182
static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, bool fullName) {
S
Shengliang Guan 已提交
183
  int32_t i = 0;
L
Liu Jicong 已提交
184
  while (key[i] != TMQ_SEPARATOR) {
L
Liu Jicong 已提交
185 186
    i++;
  }
L
Liu Jicong 已提交
187 188
  memcpy(cgroup, key, i);
  cgroup[i] = 0;
L
Liu Jicong 已提交
189 190 191 192 193 194 195 196
  if (fullName) {
    strcpy(topic, &key[i + 1]);
  } else {
    while (key[i] != '.') {
      i++;
    }
    strcpy(topic, &key[i + 1]);
  }
L
Liu Jicong 已提交
197 198 199
  return 0;
}

L
Liu Jicong 已提交
200 201
/*
 * Return the rebalance info stored in pHash under subscription `key`. If there
 * is none, create an empty one and insert it first.
 *
 * The returned pointer refers to the entry held inside pHash, not to a separate
 * heap object. Returns NULL with terrno set on failure.
 */
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
  size_t      keyLen = strlen(key) + 1;
  SMqRebInfo *pRebSub = taosHashGet(pHash, key, keyLen);
  if (pRebSub != NULL) {
    return pRebSub;
  }

  SMqRebInfo *pNew = tNewSMqRebSubscribe(key);
  if (pNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // taosHashPut copies sizeof(SMqRebInfo) bytes, so the hash entry now owns the
  // member pointers and the heap shell can be released. Previously the shell
  // leaked and callers received the shell instead of the stored entry.
  int32_t code = taosHashPut(pHash, key, keyLen, pNew, sizeof(SMqRebInfo));
  taosMemoryFree(pNew);
  if (code != 0) {
    // NOTE(review): members allocated by tNewSMqRebSubscribe (presumably the
    // consumer id arrays) are not released on this path; call the matching
    // destructor here if one exists.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  return taosHashGet(pHash, key, keyLen);
}

213
/*
 * Detach the consumers listed in pInput->pRebInfo->removedConsumers from the
 * working subscription. This only applies to consumers still present in its
 * consumerHash. For each such consumer:
 *   - every vgroup it held is queued in pHash (vgId -> SMqRebOutputVg, with
 *     newConsumerId = -1);
 *   - its vgroup array is destroyed and it is removed from consumerHash;
 *   - its id is appended to pOutput->removedConsumers.
 * An error is logged when fewer consumers were found than requested.
 */
static void doRemoveLostConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
  SArray     *pRemovedIds = pInput->pRebInfo->removedConsumers;
  SHashObj   *pConsumers = pOutput->pSub->consumerHash;
  const char *pSubKey = pOutput->pSub->key;
  int32_t     numOfRemoved = taosArrayGetSize(pRemovedIds);
  int32_t     actualRemoved = 0;

  for (int32_t idx = 0; idx < numOfRemoved; idx++) {
    uint64_t       consumerId = *(uint64_t *)taosArrayGet(pRemovedIds, idx);
    SMqConsumerEp *pConsumerEp = taosHashGet(pConsumers, &consumerId, sizeof(int64_t));
    if (pConsumerEp == NULL) {
      continue;  // no longer part of this subscription
    }

    actualRemoved++;

    int32_t vgCnt = taosArrayGetSize(pConsumerEp->vgs);
    for (int32_t k = 0; k < vgCnt; k++) {
      SMqVgEp       *pVgEp = taosArrayGetP(pConsumerEp->vgs, k);
      SMqRebOutputVg outputVg = {.oldConsumerId = consumerId, .newConsumerId = -1, .pVgEp = pVgEp};
      taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
      mInfo("sub:%s mq re-balance remove vgId:%d from consumer:%" PRIx64, pSubKey, pVgEp->vgId, consumerId);
    }

    taosArrayDestroy(pConsumerEp->vgs);
    taosHashRemove(pConsumers, &consumerId, sizeof(int64_t));
    taosArrayPush(pOutput->removedConsumers, &consumerId);
  }

  if (numOfRemoved == actualRemoved) {
    mInfo("sub:%s removed %d consumers", pSubKey, numOfRemoved);
    return;
  }
  mError("sub:%s mq re-balance removedNum:%d not matched with actual:%d", pSubKey, numOfRemoved, actualRemoved);
}
250

251
/*
 * Add each consumer in pInput->pRebInfo->newConsumers to the working
 * subscription's consumerHash with an empty vgroup array. Each id is also
 * appended to pOutput->newConsumers.
 */
static void doAddNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pInput) {
  SArray     *pNewIds = pInput->pRebInfo->newConsumers;
  const char *pSubKey = pOutput->pSub->key;
  int32_t     numOfNewConsumers = taosArrayGetSize(pNewIds);

  for (int32_t idx = 0; idx < numOfNewConsumers; idx++) {
    int64_t       consumerId = *(int64_t *)taosArrayGet(pNewIds, idx);
    SMqConsumerEp newConsumerEp = {
        .consumerId = consumerId,
        .vgs = taosArrayInit(0, sizeof(void *)),
    };

    taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
    taosArrayPush(pOutput->newConsumers, &consumerId);
    mInfo("sub:%s mq rebalance add new consumer:%" PRIx64, pSubKey, consumerId);
  }
}
267

X
Xiaoyu Wang 已提交
268
/*
 * Drain pOutput->pSub->unassignedVgs (popping from the back). Each vgroup is
 * queued in pHash with both old and new consumer ids set to -1, so that it gets
 * an owner during the rebalance.
 */
static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
  SArray     *pUnassigned = pOutput->pSub->unassignedVgs;
  const char *pSubKey = pOutput->pSub->key;

  while (taosArrayGetSize(pUnassigned) > 0) {
    SMqVgEp       *pVgEp = *(SMqVgEp **)taosArrayPop(pUnassigned);
    SMqRebOutputVg rebOutput = {.oldConsumerId = -1, .newConsumerId = -1, .pVgEp = pVgEp};

    taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg));
    mInfo("sub:%s mq re-balance addUnassignedVgroups vgId:%d from unassigned", pSubKey, pVgEp->vgId);
  }
}
284

285 286 287 288 289 290 291 292 293 294 295 296
/*
 * Record every vgroup that pConsumerEp currently keeps as an unchanged
 * assignment in pOutput->rebVgs (old consumer == new consumer).
 */
static void putNoTransferToOutput(SMqRebOutputObj *pOutput, SMqConsumerEp *pConsumerEp) {
  int32_t vgCnt = taosArrayGetSize(pConsumerEp->vgs);

  for (int32_t idx = 0; idx < vgCnt; idx++) {
    SMqRebOutputVg keptVg = {
        .oldConsumerId = pConsumerEp->consumerId,
        .newConsumerId = pConsumerEp->consumerId,
        .pVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, idx),
    };
    taosArrayPush(pOutput->rebVgs, &keptVg);
  }
}

X
Xiaoyu Wang 已提交
297 298
/*
 * Trim existing consumers down to their fair share. For every consumer in the
 * working subscription's consumerHash:
 *   - append its id to pOutput->modifyConsumers;
 *   - if it holds more than minVgCnt vgroups, pop vgroups from the back until it
 *     holds minVgCnt + 1 (while fewer than imbConsumerNum consumers have been
 *     granted the extra one) or minVgCnt (afterwards); each popped vgroup is
 *     queued in pHash with newConsumerId = -1;
 *   - record the vgroups it keeps via putNoTransferToOutput().
 */
static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt,
                                        int32_t imbConsumerNum) {
  const char *pSubKey = pOutput->pSub->key;
  int32_t     imbCnt = 0;

  for (void *pIter = taosHashIterate(pOutput->pSub->consumerHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter)) {
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    int32_t        consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);

    // all old consumers still existing need to be modified
    // TODO optimize: modify only consumer whose vgs changed
    taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId);

    if (consumerVgNum > minVgCnt) {
      // the first imbConsumerNum over-full consumers keep one vgroup above average
      int32_t keepCnt = minVgCnt;
      if (imbCnt < imbConsumerNum) {
        keepCnt = minVgCnt + 1;
        imbCnt++;
      }

      while (taosArrayGetSize(pConsumerEp->vgs) > keepCnt) {
        SMqVgEp       *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs);
        SMqRebOutputVg outputVg = {
            .oldConsumerId = pConsumerEp->consumerId,
            .newConsumerId = -1,
            .pVgEp = pVgEp,
        };
        taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
        mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", pSubKey, pVgEp->vgId,
              pConsumerEp->consumerId);
      }
    }

    putNoTransferToOutput(pOutput, pConsumerEp);
  }
}
349

350 351 352 353 354 355 356 357 358
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
  int32_t     totalVgNum = pOutput->pSub->vgNum;
  const char *pSubKey = pOutput->pSub->key;

  int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
  int32_t numOfAdded = taosArrayGetSize(pInput->pRebInfo->newConsumers);
  mInfo("sub:%s mq re-balance %d vgroups, existed consumers:%d, added:%d, removed:%d", pSubKey, totalVgNum,
        pInput->oldConsumerNum, numOfAdded, numOfRemoved);

359
  // 1. build temporary hash(vgId -> SMqRebOutputVg) to store vg that need to be assigned
360 361
  SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);

362
  // 2. check and get actual removed consumers, put their vg into pHash
363
  doRemoveLostConsumers(pOutput, pHash, pInput);
364

365
  // 3. if previously no consumer, there are vgs not assigned, put these vg into pHash
366 367 368 369
  addUnassignedVgroups(pOutput, pHash);

  // 4. calc vg number of each consumer
  int32_t numOfFinal = pInput->oldConsumerNum + numOfAdded - numOfRemoved;
H
Haojun Liao 已提交
370

371 372
  int32_t minVgCnt = 0;
  int32_t imbConsumerNum = 0;
H
Haojun Liao 已提交
373

374 375 376 377
  // calc num
  if (numOfFinal) {
    minVgCnt = totalVgNum / numOfFinal;
    imbConsumerNum = totalVgNum % numOfFinal;
378 379 380 381
    mInfo("sub:%s mq re-balance %d consumers: at least %d vgs each, %d consumers has 1 more vgroups than avg value",
          pSubKey, numOfFinal, minVgCnt, imbConsumerNum);
  } else {
    mInfo("sub:%s no consumer subscribe this topic", pSubKey);
382 383
  }

384
  // 5. remove vgroups from consumers who have more vgroups than the threshold value(minVgCnt or minVgCnt + 1), and then another vg into pHash
385 386 387 388 389
  transferVgroupsForConsumers(pOutput, pHash, minVgCnt, imbConsumerNum);

  // 6. add new consumer into sub
  doAddNewConsumers(pOutput, pInput);

390 391
  SMqRebOutputVg *pRebVg = NULL;
  void           *pRemovedIter = NULL;
392 393
  void           *pIter = NULL;

394
  // 7. extract bgroups from pHash and assign to consumers that do not have enough vgroups
395 396
  while (1) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
397 398 399 400
    if (pIter == NULL) {
      break;
    }

401
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
L
Liu Jicong 已提交
402 403

    // push until equal minVg
404
    while (taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) {
L
Liu Jicong 已提交
405 406
      // iter hash and find one vg
      pRemovedIter = taosHashIterate(pHash, pRemovedIter);
S
Shengliang Guan 已提交
407
      if (pRemovedIter == NULL) {
408
        mError("sub:%s removed iter is null, never can reach hear", pSubKey);
409
        break;
S
Shengliang Guan 已提交
410
      }
411

L
Liu Jicong 已提交
412
      pRebVg = (SMqRebOutputVg *)pRemovedIter;
413
      pRebVg->newConsumerId = pConsumerEp->consumerId;
414 415
      taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
      mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", pRebVg->pVgEp->vgId, pConsumerEp->consumerId);
L
Liu Jicong 已提交
416
    }
417 418
  }

419 420 421 422 423 424 425 426 427 428
  while (1) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) {
      break;
    }
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;

    if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) {
      pRemovedIter = taosHashIterate(pHash, pRemovedIter);
      if (pRemovedIter == NULL) {
429
        mInfo("sub:%s removed iter is null", pSubKey);
L
Liu Jicong 已提交
430 431
        break;
      }
432

433 434 435 436
      pRebVg = (SMqRebOutputVg *)pRemovedIter;
      pRebVg->newConsumerId = pConsumerEp->consumerId;
      taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
      mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->pVgEp->vgId, pConsumerEp->consumerId);
L
Liu Jicong 已提交
437
    }
438 439 440 441
  }

  // All assigned vg should be put into pOutput->rebVgs
  if(pRemovedIter != NULL){
442
    mError("sub:%s error pRemovedIter should be NULL", pSubKey);
443 444 445 446 447 448
  }
  while (1) {
    pRemovedIter = taosHashIterate(pHash, pRemovedIter);
    if (pRemovedIter == NULL) {
      break;
    }
449

450 451
    SMqRebOutputVg* pRebOutput = (SMqRebOutputVg *)pRemovedIter;
    taosArrayPush(pOutput->rebVgs, pRebOutput);
452 453
    if(taosHashGetSize(pOutput->pSub->consumerHash) == 0){    // if all consumer is removed
      taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp);  // put all vg into unassigned
454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471
    }
  }

  if(taosHashGetSize(pOutput->pSub->consumerHash) == 0) {                            // if all consumer is removed
    SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key);  // put all offset rows
    if (pSub) {
      taosRLockLatch(&pSub->lock);
      bool init = false;
      if (pOutput->pSub->offsetRows == NULL) {
        pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows));
        init = true;
      }
      pIter = NULL;
      while (1) {
        pIter = taosHashIterate(pSub->consumerHash, pIter);
        if (pIter == NULL) break;
        SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
        if (init) {
472
          taosArrayAddAll(pOutput->pSub->offsetRows, pConsumerEp->offsetRows);
473 474 475 476 477 478 479 480 481 482 483 484 485
          mDebug("pSub->offsetRows is init");
        } else {
          for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) {
            OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j);
            for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) {
              OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i);
              if (d1->vgId == d2->vgId) {
                d2->rows += d1->rows;
                d2->offset = d1->offset;
                mDebug("pSub->offsetRows add vgId:%d, after:%lld, before:%lld", d2->vgId, d2->rows, d1->rows);
              }
            }
          }
486 487
        }
      }
488 489
      taosRUnLockLatch(&pSub->lock);
      mndReleaseSubscribe(pMnode, pSub);
490 491 492
    }
  }

L
Liu Jicong 已提交
493
  // 8. generate logs
494
  mInfo("sub:%s mq re-balance calculation completed, re-balanced vg", pSubKey);
L
Liu Jicong 已提交
495 496
  for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) {
    SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i);
497
    mInfo("sub:%s mq re-balance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pSubKey,
498
          pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId);
L
Liu Jicong 已提交
499
  }
500 501 502 503 504 505 506 507 508 509 510 511

  pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) break;
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    int32_t        sz = taosArrayGetSize(pConsumerEp->vgs);
    mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", pSubKey, pConsumerEp->consumerId, sz);
    for (int32_t i = 0; i < sz; i++) {
      SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
      mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, pSubKey, pVgEp->vgId,
            pConsumerEp->consumerId);
L
Liu Jicong 已提交
512 513
    }
  }
514 515 516 517 518 519 520

  // 9. clear
  taosHashCleanup(pHash);

  return 0;
}

S
Shengliang Guan 已提交
521
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
522
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
523 524 525
  if (pTrans == NULL) {
    return -1;
  }
526

527
  mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL);
528 529 530 531
  if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
    mndTransDrop(pTrans);
    return -1;
  }
532

533 534 535 536 537 538
  // make txn:
  // 1. redo action: action to all vg
  const SArray *rebVgs = pOutput->rebVgs;
  int32_t       vgNum = taosArrayGetSize(rebVgs);
  for (int32_t i = 0; i < vgNum; i++) {
    SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i);
L
Liu Jicong 已提交
539
    if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg) < 0) {
H
Haojun Liao 已提交
540 541
      mndTransDrop(pTrans);
      return -1;
542 543 544 545 546
    }
  }

  // 2. redo log: subscribe and vg assignment
  // subscribe
L
Liu Jicong 已提交
547
  if (mndSetSubCommitLogs(pMnode, pTrans, pOutput->pSub) != 0) {
H
Haojun Liao 已提交
548 549
    mndTransDrop(pTrans);
    return -1;
550 551 552 553
  }

  // 3. commit log: consumer to update status and epoch
  // 3.1 set touched consumer
554
  int32_t consumerNum = taosArrayGetSize(pOutput->modifyConsumers);
555
  for (int32_t i = 0; i < consumerNum; i++) {
556
    int64_t         consumerId = *(int64_t *)taosArrayGet(pOutput->modifyConsumers, i);
557 558 559 560 561
    SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
    SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup);
    pConsumerNew->updateType = CONSUMER_UPDATE__TOUCH;
    mndReleaseConsumer(pMnode, pConsumerOld);
    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
L
Liu Jicong 已提交
562 563
      tDeleteSMqConsumerObj(pConsumerNew);
      taosMemoryFree(pConsumerNew);
H
Haojun Liao 已提交
564 565 566

      mndTransDrop(pTrans);
      return -1;
567
    }
H
Haojun Liao 已提交
568

L
Liu Jicong 已提交
569 570
    tDeleteSMqConsumerObj(pConsumerNew);
    taosMemoryFree(pConsumerNew);
571
  }
H
Haojun Liao 已提交
572

573 574 575
  // 3.2 set new consumer
  consumerNum = taosArrayGetSize(pOutput->newConsumers);
  for (int32_t i = 0; i < consumerNum; i++) {
L
Liu Jicong 已提交
576
    int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->newConsumers, i);
577

578 579 580 581 582
    SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
    SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup);
    pConsumerNew->updateType = CONSUMER_UPDATE__ADD;
    char *topic = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    char  cgroup[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
583
    mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
584 585 586
    taosArrayPush(pConsumerNew->rebNewTopics, &topic);
    mndReleaseConsumer(pMnode, pConsumerOld);
    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
L
Liu Jicong 已提交
587 588
      tDeleteSMqConsumerObj(pConsumerNew);
      taosMemoryFree(pConsumerNew);
H
Haojun Liao 已提交
589 590 591

      mndTransDrop(pTrans);
      return -1;
592
    }
H
Haojun Liao 已提交
593

L
Liu Jicong 已提交
594 595
    tDeleteSMqConsumerObj(pConsumerNew);
    taosMemoryFree(pConsumerNew);
596 597 598 599 600 601
  }

  // 3.3 set removed consumer
  consumerNum = taosArrayGetSize(pOutput->removedConsumers);
  for (int32_t i = 0; i < consumerNum; i++) {
    int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->removedConsumers, i);
602

603 604 605 606 607
    SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
    SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup);
    pConsumerNew->updateType = CONSUMER_UPDATE__REMOVE;
    char *topic = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    char  cgroup[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
608
    mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
609 610 611
    taosArrayPush(pConsumerNew->rebRemovedTopics, &topic);
    mndReleaseConsumer(pMnode, pConsumerOld);
    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
L
Liu Jicong 已提交
612 613
      tDeleteSMqConsumerObj(pConsumerNew);
      taosMemoryFree(pConsumerNew);
H
Haojun Liao 已提交
614 615 616

      mndTransDrop(pTrans);
      return -1;
617
    }
H
Haojun Liao 已提交
618

L
Liu Jicong 已提交
619 620
    tDeleteSMqConsumerObj(pConsumerNew);
    taosMemoryFree(pConsumerNew);
621
  }
L
Liu Jicong 已提交
622

L
Liu Jicong 已提交
623 624
  // 4. TODO commit log: modification log

L
Liu Jicong 已提交
625
  // 5. set cb
626
  mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_MQ_REB, NULL, 0);
L
Liu Jicong 已提交
627 628

  // 6. execution
L
Liu Jicong 已提交
629
  if (mndTransPrepare(pMnode, pTrans) != 0) {
L
Liu Jicong 已提交
630
    mError("failed to prepare trans rebalance since %s", terrstr());
H
Haojun Liao 已提交
631 632
    mndTransDrop(pTrans);
    return -1;
L
Liu Jicong 已提交
633
  }
634 635 636 637 638

  mndTransDrop(pTrans);
  return 0;
}

S
Shengliang Guan 已提交
639 640 641
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
  SMnode            *pMnode = pMsg->info.node;
  SMqDoRebalanceMsg *pReq = pMsg->pCont;
642
  void              *pIter = NULL;
643
//  bool               rebalanceOnce = false;  // to ensure only once.
644

645
  mInfo("mq re-balance start, total required re-balanced trans:%d", taosHashGetSize(pReq->rebSubHash));
646

647
  // here we only handle one topic rebalance requirement to ensure the atomic execution of this transaction.
648 649
  while (1) {
    pIter = taosHashIterate(pReq->rebSubHash, pIter);
650 651 652 653
    if (pIter == NULL) {
      break;
    }

X
Xiaoyu Wang 已提交
654
    SMqRebInputObj  rebInput = {0};
L
Liu Jicong 已提交
655
    SMqRebOutputObj rebOutput = {0};
L
Liu Jicong 已提交
656 657
    rebOutput.newConsumers = taosArrayInit(0, sizeof(int64_t));
    rebOutput.removedConsumers = taosArrayInit(0, sizeof(int64_t));
658
    rebOutput.modifyConsumers = taosArrayInit(0, sizeof(int64_t));
L
Liu Jicong 已提交
659 660
    rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));

661 662 663 664 665 666 667 668 669 670 671 672 673 674
    if (rebOutput.newConsumers == NULL || rebOutput.removedConsumers == NULL || rebOutput.modifyConsumers == NULL ||
        rebOutput.rebVgs == NULL) {
      taosArrayDestroy(rebOutput.newConsumers);
      taosArrayDestroy(rebOutput.removedConsumers);
      taosArrayDestroy(rebOutput.modifyConsumers);
      taosArrayDestroy(rebOutput.rebVgs);

      terrno = TSDB_CODE_OUT_OF_MEMORY;
      mInfo("mq re-balance failed, due to out of memory");
      taosHashCleanup(pReq->rebSubHash);
      mndRebEnd();
      return -1;
    }

L
Liu Jicong 已提交
675 676 677 678
    SMqRebInfo      *pRebInfo = (SMqRebInfo *)pIter;
    SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebInfo->key);

    rebInput.pRebInfo = pRebInfo;
679 680 681 682 683

    if (pSub == NULL) {
      // split sub key and extract topic
      char topic[TSDB_TOPIC_FNAME_LEN];
      char cgroup[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
684
      mndSplitSubscribeKey(pRebInfo->key, topic, cgroup, true);
685

686
      SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
L
Liu Jicong 已提交
687
      if (pTopic == NULL) {
688
        mError("mq re-balance %s ignored since topic %s doesn't exist", pRebInfo->key, topic);
L
Liu Jicong 已提交
689 690
        continue;
      }
691

692 693
      taosRLockLatch(&pTopic->lock);

694
      rebOutput.pSub = mndCreateSubscription(pMnode, pTopic, pRebInfo->key);
L
Liu Jicong 已提交
695 696

      if (rebOutput.pSub == NULL) {
697
        mError("mq rebalance %s failed create sub since %s, ignore", pRebInfo->key, terrstr());
L
Liu Jicong 已提交
698 699 700 701
        taosRUnLockLatch(&pTopic->lock);
        mndReleaseTopic(pMnode, pTopic);
        continue;
      }
L
Liu Jicong 已提交
702

703
      memcpy(rebOutput.pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN);
L
Liu Jicong 已提交
704 705 706 707
      taosRUnLockLatch(&pTopic->lock);
      mndReleaseTopic(pMnode, pTopic);

      rebInput.oldConsumerNum = 0;
708
      mInfo("sub topic:%s has no consumers sub yet", pRebInfo->key);
L
Liu Jicong 已提交
709 710 711 712 713
    } else {
      taosRLockLatch(&pSub->lock);
      rebInput.oldConsumerNum = taosHashGetSize(pSub->consumerHash);
      rebOutput.pSub = tCloneSubscribeObj(pSub);
      taosRUnLockLatch(&pSub->lock);
714

715
      mInfo("sub topic:%s has %d consumers sub till now", pRebInfo->key, rebInput.oldConsumerNum);
L
Liu Jicong 已提交
716 717
      mndReleaseSubscribe(pMnode, pSub);
    }
718

719
    if (mndDoRebalance(pMnode, &rebInput, &rebOutput) < 0) {
720
      mError("mq re-balance internal error");
721
    }
722

L
Liu Jicong 已提交
723 724
    // if add more consumer to balanced subscribe,
    // possibly no vg is changed
725
    // when each topic is re-balanced, issue an trans to save the results in sdb.
L
Liu Jicong 已提交
726
    if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) {
727
      mError("mq re-balance persist output error, possibly vnode splitted or dropped,msg:%s", terrstr());
L
Liu Jicong 已提交
728
    }
729

L
Liu Jicong 已提交
730
    taosArrayDestroy(rebOutput.newConsumers);
731
    taosArrayDestroy(rebOutput.modifyConsumers);
L
Liu Jicong 已提交
732 733 734 735
    taosArrayDestroy(rebOutput.removedConsumers);
    taosArrayDestroy(rebOutput.rebVgs);
    tDeleteSubscribeObj(rebOutput.pSub);
    taosMemoryFree(rebOutput.pSub);
736 737 738
  }

  // reset flag
739
  mInfo("mq re-balance completed successfully");
740
  taosHashCleanup(pReq->rebSubHash);
L
Liu Jicong 已提交
741
  mndRebEnd();
742 743 744

  return 0;
}
L
Liu Jicong 已提交
745

746 747
/*
 * Handle a DROP CONSUMER GROUP request for one (cgroup, topic) subscription.
 *
 * The request body is deserialized into an SMDropCgroupReq. The request is rejected when the
 * subscription still has consumers in its consumerHash. Otherwise a rollback transaction is
 * created whose commit log marks the subscription as dropped.
 *
 * Returns:
 *   TSDB_CODE_ACTION_IN_PROGRESS  the drop transaction was prepared;
 *   0                             the subscription does not exist and igNotExists is set;
 *   -1                            on failure, with terrno set.
 */
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
  SMDropCgroupReq dropReq = {0};
  STrans         *pTrans = NULL;
  int32_t         code = -1;

  if (tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, dropReq.cgroup, dropReq.topic);
  if (pSub == NULL) {
    if (dropReq.igNotExists) {
      mInfo("cgroup:%s on topic:%s, not exist, ignore not exist is set", dropReq.cgroup, dropReq.topic);
      return 0;
    }
    terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
    mError("topic:%s, cgroup:%s, failed to drop since %s", dropReq.topic, dropReq.cgroup, terrstr());
    return -1;
  }

  // mndSubActionUpdate swaps consumerHash under pSub->lock, so read its size under the read latch.
  taosRLockLatch(&pSub->lock);
  int32_t numOfConsumers = taosHashGetSize(pSub->consumerHash);
  taosRUnLockLatch(&pSub->lock);

  if (numOfConsumers != 0) {
    terrno = TSDB_CODE_MND_CGROUP_USED;
    mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
    goto DROP_CGROUP_OVER;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "drop-cgroup");
  if (pTrans == NULL) {
    mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
    goto DROP_CGROUP_OVER;
  }

  mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);

  if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
    mError("cgroup %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
    goto DROP_CGROUP_OVER;
  }

  if (mndTransPrepare(pMnode, pTrans) < 0) {
    mError("cgroup:%s on topic:%s, failed to prepare drop trans since %s", dropReq.cgroup, dropReq.topic,
           terrstr());
    goto DROP_CGROUP_OVER;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

DROP_CGROUP_OVER:
  // NOTE(review): pTrans is treated as caller-owned after mndTransPrepare. The failure path already
  // dropped it, and not dropping it on success leaked it. Confirm against mndTrans.c.
  if (pTrans != NULL) {
    mndTransDrop(pTrans);
  }
  mndReleaseSubscribe(pMnode, pSub);
  return code;
}

L
Liu Jicong 已提交
801 802 803
/* Teardown hook for the subscribe module; this implementation intentionally releases nothing. */
void mndCleanupSubscribe(SMnode *pMnode) {
  (void)pMnode;
}

/*
 * Serialize a subscription object into an SDB raw record tagged with MND_SUBSCRIBE_VER_NUMBER.
 *
 * Record layout:
 *   int32    tlen                              length of the encoded object
 *   binary   tEncodeSubscribeObj output        tlen bytes
 *   reserve  MND_SUBSCRIBE_RESERVE_SIZE bytes
 *
 * Returns the new raw record, or NULL with terrno set on failure.
 */
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  // Both pointers must be valid before the first goto. The cleanup label frees them
  // unconditionally, and the tlen check below can reach it before pRaw is allocated.
  // Previously pRaw was declared after that goto, so it was read uninitialized there.
  SSdbRaw *pRaw = NULL;
  void    *buf = NULL;
  int32_t  dataPos = 0;

  // A NULL output buffer makes tEncodeSubscribeObj only compute the encoded length.
  int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
  if (tlen <= 0) goto SUB_ENCODE_OVER;

  int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE;
  pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
  if (pRaw == NULL) goto SUB_ENCODE_OVER;

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto SUB_ENCODE_OVER;

  // Encode through a separate cursor (the encoder presumably advances it) so buf stays freeable.
  void *abuf = buf;
  tEncodeSubscribeObj(&abuf, pSub);

  SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, SUB_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

SUB_ENCODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRaw;
}

/*
 * Rebuild a subscription object from an SDB raw record written by mndSubActionEncode.
 *
 * Record soft versions 1..MND_SUBSCRIBE_VER_NUMBER are accepted; the version is forwarded to
 * tDecodeSubscribeObj. After decoding, every stored vgroup endpoint set is passed through
 * tmsgUpdateDnodeEpSet. That covers both the unassigned vgroups and each consumer's vgroups.
 *
 * Returns the allocated row, or NULL with terrno set on failure.
 */
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
  SSdbRow         *pRow = NULL;
  SMqSubscribeObj *pSub = NULL;
  void            *buf = NULL;
  int8_t           sver = 0;
  int32_t          dataPos = 0;
  int32_t          tlen = 0;

  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;
  if (sver < 1 || sver > MND_SUBSCRIBE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto SUB_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqSubscribeObj));
  if (pRow == NULL) goto SUB_DECODE_OVER;
  pSub = sdbGetRowObj(pRow);
  if (pSub == NULL) goto SUB_DECODE_OVER;

  SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto SUB_DECODE_OVER;
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);

  if (tDecodeSubscribeObj(buf, pSub, sver) == NULL) goto SUB_DECODE_OVER;

  // Refresh the endpoint sets held by the decoded object: unassigned vgroups first...
  int32_t numOfUnassigned = (pSub->unassignedVgs == NULL) ? 0 : (int32_t)taosArrayGetSize(pSub->unassignedVgs);
  for (int32_t idx = 0; idx < numOfUnassigned; ++idx) {
    SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGetP(pSub->unassignedVgs, idx);
    tmsgUpdateDnodeEpSet(&pVgEp->epSet);
  }

  // ...then the vgroups currently owned by each consumer.
  if (pSub->consumerHash != NULL) {
    for (void *pIter = taosHashIterate(pSub->consumerHash, NULL); pIter != NULL;
         pIter = taosHashIterate(pSub->consumerHash, pIter)) {
      SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
      int32_t        numOfVgs = (int32_t)taosArrayGetSize(pConsumerEp->vgs);
      for (int32_t idx = 0; idx < numOfVgs; ++idx) {
        SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, idx);
        tmsgUpdateDnodeEpSet(&pVgEp->epSet);
      }
    }
  }

  terrno = TSDB_CODE_SUCCESS;

SUB_DECODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to decode from raw:%p since %s", pSub == NULL ? "null" : pSub->key, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("subscribe:%s, decode from raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRow;
}

/* SDB insert hook for subscriptions: emits a trace log and reports success; no other work is done. */
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
  (void)pSdb;
  mTrace("subscribe:%s, perform insert action", pSub->key);
  return TSDB_CODE_SUCCESS;
}

/*
 * SDB delete hook for subscriptions: traces, then hands the object to tDeleteSubscribeObj.
 * tDeleteSubscribeObj presumably releases the object's owned members; the row itself is not
 * freed here.
 */
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
  (void)pSdb;
  mTrace("subscribe:%s, perform delete action", pSub->key);
  tDeleteSubscribeObj(pSub);
  return TSDB_CODE_SUCCESS;
}

/*
 * SDB update hook for subscriptions.
 *
 * Exchanges consumerHash, unassignedVgs and offsetRows between the stored object (pOldSub) and
 * the incoming one (pNewSub). The fields are swapped rather than copied, so afterwards pNewSub
 * holds the previous containers; presumably they are released when pNewSub is disposed of.
 * TODO confirm.
 * The exchange runs under pOldSub's write latch, because readers such as the show/retrieve path
 * take the read latch on the same object.
 */
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
  (void)pSdb;
  mTrace("subscribe:%s, perform update action", pOldSub->key);

  taosWLockLatch(&pOldSub->lock);

  SHashObj *pIncomingHash = pNewSub->consumerHash;
  pNewSub->consumerHash = pOldSub->consumerHash;
  pOldSub->consumerHash = pIncomingHash;

  SArray *pIncomingVgs = pNewSub->unassignedVgs;
  pNewSub->unassignedVgs = pOldSub->unassignedVgs;
  pOldSub->unassignedVgs = pIncomingVgs;

  SArray *pIncomingOffsets = pNewSub->offsetRows;
  pNewSub->offsetRows = pOldSub->offsetRows;
  pOldSub->offsetRows = pIncomingOffsets;

  taosWUnLockLatch(&pOldSub->lock);
  return TSDB_CODE_SUCCESS;
}

937
int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName) {
S
Shengliang Guan 已提交
938
  int32_t tlen = strlen(cgroup);
L
Liu Jicong 已提交
939
  memcpy(key, cgroup, tlen);
L
Liu Jicong 已提交
940
  key[tlen] = TMQ_SEPARATOR;
L
Liu Jicong 已提交
941
  strcpy(key + tlen + 1, topicName);
L
Liu Jicong 已提交
942
  return 0;
L
Liu Jicong 已提交
943 944
}

L
Liu Jicong 已提交
945
/*
 * Look up the subscription for (cgroup, topicName) in SDB via sdbAcquire.
 * Returns NULL and sets terrno to TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST when it is absent.
 * A non-NULL result must be released with mndReleaseSubscribe.
 */
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *cgroup, const char *topicName) {
  char key[TSDB_SUBSCRIBE_KEY_LEN];
  mndMakeSubscribeKey(key, cgroup, topicName);

  SMqSubscribeObj *pSub = sdbAcquire(pMnode->pSdb, SDB_SUBSCRIBE, key);
  if (pSub != NULL) {
    return pSub;
  }

  terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
  return NULL;
}

L
Liu Jicong 已提交
956 957 958 959
/*
 * Look up a subscription by its prebuilt key (see mndMakeSubscribeKey) via sdbAcquire.
 * Returns NULL and sets terrno to TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST when it is absent.
 * A non-NULL result must be released with mndReleaseSubscribe.
 */
SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key) {
  SMqSubscribeObj *pSub = sdbAcquire(pMnode->pSdb, SDB_SUBSCRIBE, key);
  if (pSub != NULL) {
    return pSub;
  }

  terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
  return NULL;
}

L
Liu Jicong 已提交
965 966 967 968 969
/* Release a subscription obtained from mndAcquireSubscribe / mndAcquireSubscribeByKey (via sdbRelease). */
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
  sdbRelease(pMnode->pSdb, pSub);
}

L
Liu Jicong 已提交
970 971 972 973 974 975 976 977
/*
 * Encode pSub, append it to pTrans as a redo log, and mark that raw record SDB_STATUS_DROPPED.
 * Returns 0 on success, -1 as soon as any step fails.
 */
static int32_t mndSetDropSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  (void)pMnode;
  SSdbRaw *pRedoRaw = mndSubActionEncode(pSub);
  // Short-circuit evaluation keeps the original step order and stops at the first failure.
  if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0 ||
      sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED) != 0) {
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
978
/*
 * Encode pSub, append it to pTrans as a commit log, and mark that raw record SDB_STATUS_DROPPED.
 * Returns 0 on success, -1 as soon as any step fails.
 */
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  (void)pMnode;
  SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
  // Short-circuit evaluation keeps the original step order and stops at the first failure.
  if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0 ||
      sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) {
    return -1;
  }
  return 0;
}

/*
 * Append a drop commit log to pTrans for every subscription whose dbUid equals pDb->uid.
 * Returns 0 on success. On the first failure it releases the current row, cancels the SDB
 * iteration and returns -1.
 */
int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  SSdb            *pSdb = pMnode->pSdb;
  void            *pIter = NULL;
  SMqSubscribeObj *pSub = NULL;

  while ((pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub)) != NULL) {
    if (pSub->dbUid == pDb->uid && mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
      sdbRelease(pSdb, pSub);
      sdbCancelFetch(pSdb, pIter);
      return -1;
    }
    sdbRelease(pSdb, pSub);
  }

  return 0;
}

/*
 * Add to pTrans the work needed to drop every subscription on topicName.
 *
 * For each subscription whose key names topicName, this appends:
 *   - one TDMT_VND_TMQ_DELETE_SUB redo action per unassigned vgroup (consumerId -1), and
 *   - a redo log marking the subscription SDB_STATUS_DROPPED.
 *
 * Fails with terrno = TSDB_CODE_MND_IN_REBALANCE if a matching subscription still has consumers.
 * Returns 0 on success, -1 on failure. Every early exit releases the current row and cancels the
 * SDB iteration, as mndDropSubByDB does. Previously three exits leaked the fetch iterator.
 */
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) {
  SSdb            *pSdb = pMnode->pSdb;
  void            *pIter = NULL;
  SMqSubscribeObj *pSub = NULL;

  while (1) {
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
    if (pIter == NULL) break;

    char topic[TSDB_TOPIC_FNAME_LEN];
    char cgroup[TSDB_CGROUP_LEN];
    mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
    if (strcmp(topic, topicName) != 0) {
      sdbRelease(pSdb, pSub);
      continue;
    }

    // A subscription that still has consumers attached cannot be dropped yet.
    if (taosHashGetSize(pSub->consumerHash) != 0) {
      terrno = TSDB_CODE_MND_IN_REBALANCE;
      goto DROP_SUB_BY_TOPIC_FAIL;
    }

    // Ask each vnode of an unassigned vgroup to delete its handle for this subscription.
    int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
    for (int32_t i = 0; i < sz; i++) {
      SMqVgEp       *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
      SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
      if (pReq == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        goto DROP_SUB_BY_TOPIC_FAIL;
      }
      pReq->head.vgId = htonl(pVgEp->vgId);
      pReq->vgId = pVgEp->vgId;
      pReq->consumerId = -1;
      memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);

      STransAction action = {0};
      action.epSet = pVgEp->epSet;
      action.pCont = pReq;
      action.contLen = sizeof(SMqVDeleteReq);
      action.msgType = TDMT_VND_TMQ_DELETE_SUB;
      if (mndTransAppendRedoAction(pTrans, &action) != 0) {
        // The request was not taken over by the transaction, so free it here.
        taosMemoryFree(pReq);
        goto DROP_SUB_BY_TOPIC_FAIL;
      }
    }

    if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) {
      goto DROP_SUB_BY_TOPIC_FAIL;
    }

    sdbRelease(pSdb, pSub);
  }

  return 0;

DROP_SUB_BY_TOPIC_FAIL:
  // Only reached from inside the loop, while pSub is acquired and pIter is a live iterator.
  sdbRelease(pSdb, pSub);
  sdbCancelFetch(pSdb, pIter);
  return -1;
}
L
Liu Jicong 已提交
1070

1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125
/*
 * Append one SHOW SUBSCRIPTIONS row to pBlock for each vgroup in vgs.
 *
 * Column order:
 *   0 topic (var-string), 1 consumer group (var-string), 2 vgroup id,
 *   3 consumer id (NULL when consumerId == -1),
 *   4 committed offset rendered by tFormatOffset, 5 row count.
 * Columns 4 and 5 come from the offsetRows entry with the same vgId (the last such entry if
 * several match). They are NULL, and an error is logged, when no entry exists.
 *
 * *numOfRows is the index of the next row to write and advances once per vgroup. The caller
 * (mndRetrieveSubscribe) grows pBlock beforehand. Always returns 0.
 */
static int32_t buildResult(SSDataBlock *pBlock, int32_t* numOfRows, int64_t consumerId, const char* topic, const char* cgroup, SArray* vgs, SArray *offsetRows){
  int32_t numOfVgs = taosArrayGetSize(vgs);

  for (int32_t idx = 0; idx < numOfVgs; ++idx) {
    SMqVgEp *pVgEp = taosArrayGetP(vgs, idx);
    int32_t  row = *numOfRows;

    colDataSetVal(taosArrayGet(pBlock->pDataBlock, 0), row, (const char *)topic, false);
    colDataSetVal(taosArrayGet(pBlock->pDataBlock, 1), row, (const char *)cgroup, false);
    colDataSetVal(taosArrayGet(pBlock->pDataBlock, 2), row, (const char *)&pVgEp->vgId, false);
    colDataSetVal(taosArrayGet(pBlock->pDataBlock, 3), row, (const char *)&consumerId, consumerId == -1);

    mDebug("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic),
           consumerId, varDataVal(cgroup), pVgEp->vgId);

    // Scan backwards and stop at the first hit, which is the last matching entry in array order.
    OffsetRows *pOffset = NULL;
    for (int32_t k = (int32_t)taosArrayGetSize(offsetRows) - 1; k >= 0; --k) {
      OffsetRows *pCandidate = taosArrayGet(offsetRows, k);
      if (pCandidate->vgId == pVgEp->vgId) {
        pOffset = pCandidate;
        break;
      }
    }

    if (pOffset == NULL) {
      colDataSetNULL(taosArrayGet(pBlock->pDataBlock, 4), row);
      colDataSetNULL(taosArrayGet(pBlock->pDataBlock, 5), row);
      mError("mnd show subscriptions: do not find vgId:%d in offsetRows", pVgEp->vgId);
    } else {
      // Offset column: format the committed offset into a var-string.
      char offsetStr[TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0};
      tFormatOffset(varDataVal(offsetStr), TSDB_OFFSET_LEN, &pOffset->offset);
      varDataSetLen(offsetStr, strlen(varDataVal(offsetStr)));
      colDataSetVal(taosArrayGet(pBlock->pDataBlock, 4), row, (const char *)offsetStr, false);
      colDataSetVal(taosArrayGet(pBlock->pDataBlock, 5), row, (const char *)&pOffset->rows, false);
    }

    *numOfRows = row + 1;
  }

  return 0;
}

1126
/*
 * Fill pBlock with SHOW SUBSCRIPTIONS rows, resuming the SDB iteration kept in pShow->pIter.
 *
 * For each subscription, the function first emits one row per vgroup owned by each consumer. It
 * then emits one row per vgroup not assigned to any consumer, with consumer id -1 shown as NULL.
 * A subscription is always emitted whole, so more than rowsCapacity rows may be written. pBlock is
 * grown with blockDataEnsureCapacity when the subscription's vgNum would overflow rowsCapacity.
 * Each subscription is read under its read latch.
 *
 * Returns the number of rows written by this call and adds it to pShow->numOfRows.
 */
int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode          *pMnode = pReq->info.node;
  SSdb            *pSdb = pMnode->pSdb;
  SMqSubscribeObj *pSub = NULL;
  int32_t          numOfRows = 0;

  mDebug("mnd show subscriptions begin");

  while (numOfRows < rowsCapacity &&
         (pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub)) != NULL) {
    taosRLockLatch(&pSub->lock);

    if (numOfRows + pSub->vgNum > rowsCapacity) {
      blockDataEnsureCapacity(pBlock, numOfRows + pSub->vgNum);
    }

    // Split "<cgroup><sep><topic>" into var-strings for the topic and cgroup columns.
    char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
    mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
    varDataSetLen(topic, strlen(varDataVal(topic)));
    varDataSetLen(cgroup, strlen(varDataVal(cgroup)));

    // Rows for vgroups held by each consumer.
    for (void *pIter = taosHashIterate(pSub->consumerHash, NULL); pIter != NULL;
         pIter = taosHashIterate(pSub->consumerHash, pIter)) {
      SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
      buildResult(pBlock, &numOfRows, pConsumerEp->consumerId, topic, cgroup, pConsumerEp->vgs,
                  pConsumerEp->offsetRows);
    }

    // Rows for vgroups not assigned to any consumer (consumer id -1 is rendered as NULL).
    buildResult(pBlock, &numOfRows, -1, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows);

    pBlock->info.rows = numOfRows;

    taosRUnLockLatch(&pSub->lock);
    sdbRelease(pSdb, pSub);
  }

  mDebug("mnd end show subscriptions");

  pShow->numOfRows += numOfRows;
  return numOfRows;
}

1178
/* Abandon an in-progress SHOW SUBSCRIPTIONS iteration by cancelling its SDB fetch iterator. */
void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}