mndConsumer.c 35.0 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
L
Liu Jicong 已提交
21
#include "mndPrivilege.h"
L
Liu Jicong 已提交
22 23
#include "mndShow.h"
#include "mndStb.h"
24
#include "mndSubscribe.h"
L
Liu Jicong 已提交
25 26 27 28
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
L
Liu Jicong 已提交
29
#include "tcompare.h"
L
Liu Jicong 已提交
30 31
#include "tname.h"

32
#define MND_CONSUMER_VER_NUMBER   1
L
Liu Jicong 已提交
33 34
#define MND_CONSUMER_RESERVE_SIZE 64

35 36
#define MND_CONSUMER_LOST_HB_CNT 3

L
Liu Jicong 已提交
37
static int8_t mqRebInExecCnt = 0;
38

L
Liu Jicong 已提交
39 40
static const char *mndConsumerStatusName(int status);

L
Liu Jicong 已提交
41 42 43
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer);
S
Shengliang Guan 已提交
44 45
static int32_t mndProcessConsumerMetaMsg(SRpcMsg *pMsg);
static int32_t mndRetrieveConsumer(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
L
Liu Jicong 已提交
46
static void    mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
47

S
Shengliang Guan 已提交
48 49
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
50
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
S
Shengliang Guan 已提交
51 52 53
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg);
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg);
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg);
54

L
Liu Jicong 已提交
55
/*
 * Register the TMQ consumer message handlers and show-table callbacks, then install the
 * SDB_CONSUMER sdb table (rows keyed by an int64 consumer id).
 * Returns whatever sdbSetTable() returns.
 */
int32_t mndInitConsumer(SMnode *pMnode) {
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_LOST, mndProcessConsumerLostMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg);

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndCancelGetNextConsumer);

  SSdbTable consumerTable = {
      .sdbType = SDB_CONSUMER,
      .keyType = SDB_KEY_INT64,
      .encodeFp = (SdbEncodeFp)mndConsumerActionEncode,
      .decodeFp = (SdbDecodeFp)mndConsumerActionDecode,
      .insertFp = (SdbInsertFp)mndConsumerActionInsert,
      .updateFp = (SdbUpdateFp)mndConsumerActionUpdate,
      .deleteFp = (SdbDeleteFp)mndConsumerActionDelete,
  };

  return sdbSetTable(pMnode->pSdb, consumerTable);
}

/* Module teardown hook; mndInitConsumer() allocates nothing that needs freeing here. */
void mndCleanupConsumer(SMnode *pMnode) {
  (void)pMnode;
}

L
Liu Jicong 已提交
81
bool mndRebTryStart() {
L
Liu Jicong 已提交
82
  int8_t old = atomic_val_compare_exchange_8(&mqRebInExecCnt, 0, 1);
L
Liu Jicong 已提交
83 84 85
  return old == 0;
}

L
Liu Jicong 已提交
86
/* Release the rebalance slot claimed by mndRebTryStart(). */
void mndRebEnd() {
  atomic_sub_fetch_8(&mqRebInExecCnt, 1);
}
L
Liu Jicong 已提交
87

L
Liu Jicong 已提交
88
/* Atomically increment the rebalance-in-progress counter. */
void mndRebCntInc() {
  atomic_add_fetch_8(&mqRebInExecCnt, 1);
}
L
Liu Jicong 已提交
89

L
Liu Jicong 已提交
90
/* Atomically decrement the rebalance-in-progress counter. */
void mndRebCntDec() {
  atomic_sub_fetch_8(&mqRebInExecCnt, 1);
}
L
Liu Jicong 已提交
91

S
Shengliang Guan 已提交
92 93 94
/*
 * Handle TDMT_MND_TMQ_CONSUMER_LOST, queued by mndProcessMqTimerMsg() for a READY consumer
 * that missed too many heartbeats.
 *
 * Commits a CONSUMER_UPDATE__LOST update for the consumer through a transaction.
 * Returns 0 if the consumer no longer exists or the transaction was prepared; -1 with terrno
 * set if the consumer is not READY, memory runs out, or the transaction cannot be prepared.
 */
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
  SMnode             *pMnode = pMsg->info.node;
  SMqConsumerLostMsg *pLostMsg = pMsg->pCont;
  SMqConsumerObj     *pConsumerNew = NULL;
  STrans             *pTrans = NULL;
  int32_t             code = -1;

  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pLostMsg->consumerId);
  if (pConsumer == NULL) {
    // the consumer is gone; nothing left to mark as lost
    return 0;
  }

  mInfo("receive consumer lost msg, consumer id %" PRId64 ", status %s", pLostMsg->consumerId,
        mndConsumerStatusName(pConsumer->status));

  if (pConsumer->status != MQ_CONSUMER_STATUS__READY) {
    mndReleaseConsumer(pMnode, pConsumer);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
    return -1;
  }

  pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
  mndReleaseConsumer(pMnode, pConsumer);
  if (pConsumerNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pConsumerNew->updateType = CONSUMER_UPDATE__LOST;

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "lost-csm");
  if (pTrans == NULL) goto LOST_OVER;
  if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto LOST_OVER;
  if (mndTransPrepare(pMnode, pTrans) != 0) goto LOST_OVER;

  code = 0;

LOST_OVER:
  // shared exit: the update object and the transaction handle are released on every path
  tDeleteSMqConsumerObj(pConsumerNew);
  taosMemoryFree(pConsumerNew);
  mndTransDrop(pTrans);
  return code;
}

S
Shengliang Guan 已提交
129 130 131
/*
 * Handle TDMT_MND_TMQ_CONSUMER_RECOVER, queued by the heartbeat / ask-ep handlers for a
 * consumer in MQ_CONSUMER_STATUS__LOST_REBD.
 *
 * Commits a CONSUMER_UPDATE__RECOVER update for the consumer through a transaction.
 * Returns 0 once the transaction is prepared, -1 with terrno set otherwise.
 */
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
  SMnode                *pMnode = pMsg->info.node;
  SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont;
  SMqConsumerObj        *pConsumerNew = NULL;
  STrans                *pTrans = NULL;
  int32_t                code = -1;

  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId);
  if (pConsumer == NULL) {
    // the consumer may have been dropped after the recover message was queued;
    // report instead of asserting so release builds do not dereference NULL
    mError("consumer %" PRId64 " not exist, cannot recover", pRecoverMsg->consumerId);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }

  mInfo("receive consumer recover msg, consumer id %" PRId64 ", status %s", pRecoverMsg->consumerId,
        mndConsumerStatusName(pConsumer->status));

  if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) {
    mndReleaseConsumer(pMnode, pConsumer);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
    return -1;
  }

  pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
  mndReleaseConsumer(pMnode, pConsumer);
  if (pConsumerNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pConsumerNew->updateType = CONSUMER_UPDATE__RECOVER;

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm");
  if (pTrans == NULL) goto RECOVER_OVER;
  if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto RECOVER_OVER;
  if (mndTransPrepare(pMnode, pTrans) != 0) goto RECOVER_OVER;

  code = 0;

RECOVER_OVER:
  tDeleteSMqConsumerObj(pConsumerNew);
  taosMemoryFree(pConsumerNew);
  mndTransDrop(pTrans);
  return code;
}

L
Liu Jicong 已提交
165
/*
 * Look up the rebalance entry for subscribe key `key` in pHash, inserting a fresh one from
 * tNewSMqRebSubscribe() if absent.
 *
 * The hash stores SMqRebInfo by value: the new struct is copied into the hash and the
 * temporary is freed, so its member arrays are then reachable only through the hash entry.
 * Returns a pointer into the hash, or NULL with terrno set on failure.
 */
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
  size_t      keyLen = strlen(key) + 1;
  SMqRebInfo *pRebInfo = taosHashGet(pHash, key, keyLen);
  if (pRebInfo != NULL) {
    return pRebInfo;
  }

  SMqRebInfo *pNewInfo = tNewSMqRebSubscribe(key);
  if (pNewInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (taosHashPut(pHash, key, keyLen, pNewInfo, sizeof(SMqRebInfo)) != 0) {
    // the hash did not take the copy, so the member arrays are still ours to release
    // NOTE(review): only the arrays used in this file are freed; confirm SMqRebInfo has no others
    taosArrayDestroy(pNewInfo->newConsumers);
    taosArrayDestroy(pNewInfo->removedConsumers);
    taosMemoryFree(pNewInfo);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  taosMemoryFree(pNewInfo);

  return taosHashGet(pHash, key, keyLen);
}

S
Shengliang Guan 已提交
180 181
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
182 183 184 185
  SSdb           *pSdb = pMnode->pSdb;
  SMqConsumerObj *pConsumer;
  void           *pIter = NULL;

186 187
  mTrace("start to process mq timer");

188
  // rebalance cannot be parallel
L
Liu Jicong 已提交
189
  if (!mndRebTryStart()) {
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
    mInfo("mq rebalance already in progress, do nothing");
    return 0;
  }

  SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
  pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
  // TODO set cleanfp

  // iterate all consumers, find all modification
  while (1) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) break;

    int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
    int32_t status = atomic_load_32(&pConsumer->status);
    if (status == MQ_CONSUMER_STATUS__READY && hbStatus > MND_CONSUMER_LOST_HB_CNT) {
      SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg));

      pLostMsg->consumerId = pConsumer->consumerId;
209
      SRpcMsg pRpcMsg = {
L
Liu Jicong 已提交
210
          .msgType = TDMT_MND_TMQ_CONSUMER_LOST,
211 212 213 214
          .pCont = pLostMsg,
          .contLen = sizeof(SMqConsumerLostMsg),
      };
      tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
215 216 217 218
    }
    if (status == MQ_CONSUMER_STATUS__LOST_REBD || status == MQ_CONSUMER_STATUS__READY) {
      // do nothing
    } else if (status == MQ_CONSUMER_STATUS__LOST) {
L
Liu Jicong 已提交
219
      taosRLockLatch(&pConsumer->lock);
220 221 222 223 224
      int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
      for (int32_t i = 0; i < topicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *removedTopic = taosArrayGetP(pConsumer->currentTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
L
Liu Jicong 已提交
225
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
226 227
        taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
      }
L
Liu Jicong 已提交
228
      taosRUnLockLatch(&pConsumer->lock);
229
    } else if (status == MQ_CONSUMER_STATUS__MODIFY) {
L
Liu Jicong 已提交
230
      taosRLockLatch(&pConsumer->lock);
231 232 233 234 235
      int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
      for (int32_t i = 0; i < newTopicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *newTopic = taosArrayGetP(pConsumer->rebNewTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, newTopic);
L
Liu Jicong 已提交
236
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
237 238 239 240 241 242 243 244
        taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
      }

      int32_t removedTopicNum = taosArrayGetSize(pConsumer->rebRemovedTopics);
      for (int32_t i = 0; i < removedTopicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *removedTopic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
L
Liu Jicong 已提交
245
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
246 247
        taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
      }
L
Liu Jicong 已提交
248
      taosRUnLockLatch(&pConsumer->lock);
249 250 251 252 253 254 255 256 257 258
    } else {
      // do nothing
    }

    mndReleaseConsumer(pMnode, pConsumer);
  }

  if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
    mInfo("mq rebalance will be triggered");
    SRpcMsg rpcMsg = {
L
Liu Jicong 已提交
259
        .msgType = TDMT_MND_TMQ_DO_REBALANCE,
260 261 262 263 264 265 266
        .pCont = pRebMsg,
        .contLen = sizeof(SMqDoRebalanceMsg),
    };
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  } else {
    taosHashCleanup(pRebMsg->rebSubHash);
    rpcFreeCont(pRebMsg);
L
Liu Jicong 已提交
267
    mTrace("mq rebalance finished, no modification");
L
Liu Jicong 已提交
268
    mndRebEnd();
269 270 271 272
  }
  return 0;
}

273 274 275 276 277 278
/*
 * Handle TDMT_MND_TMQ_HB: reset the consumer's missed-heartbeat counter and, if the consumer
 * is in MQ_CONSUMER_STATUS__LOST_REBD, queue a TDMT_MND_TMQ_CONSUMER_RECOVER message for it.
 * Returns 0, or -1 with terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST for an unknown consumer.
 */
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
  SMnode   *pMnode = pMsg->info.node;
  SMqHbReq *pReq = (SMqHbReq *)pMsg->pCont;
  int64_t   consumerId = be64toh(pReq->consumerId);

  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pConsumer == NULL) {
    mError("consumer %" PRId64 " not exist", consumerId);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }

  atomic_store_32(&pConsumer->hbStatus, 0);

  int32_t status = atomic_load_32(&pConsumer->status);

  if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
    mInfo("try to recover consumer %" PRId64 "", consumerId);
    SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
    if (pRecoverMsg == NULL) {
      // best effort: the next heartbeat will try to recover again
      mError("consumer:%" PRId64 ", failed to alloc recover msg", consumerId);
    } else {
      pRecoverMsg->consumerId = consumerId;
      SRpcMsg pRpcMsg = {
          .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
          .pCont = pRecoverMsg,
          .contLen = sizeof(SMqConsumerRecoverMsg),
      };
      tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
    }
  }

  mndReleaseConsumer(pMnode, pConsumer);

  return 0;
}

S
Shengliang Guan 已提交
307 308 309
/*
 * Handle TDMT_MND_TMQ_ASK_EP: reply with the vgroup endpoints assigned to a consumer.
 *
 * Resets the consumer's missed-heartbeat counter, and if the consumer is in LOST_REBD queues a
 * recover message. Only a READY consumer gets a reply. The per-topic schema / vgroup list is
 * filled in only when the client's epoch differs from the server's; otherwise the reply
 * carries an empty topic list plus the server epoch.
 *
 * Returns 0 with pMsg->info.rsp / rspLen set, or -1 with terrno set. The consumer reference
 * acquired here is released on every path.
 */
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
  SMnode      *pMnode = pMsg->info.node;
  SMqAskEpReq *pReq = (SMqAskEpReq *)pMsg->pCont;
  SMqAskEpRsp  rsp = {0};
  int64_t      consumerId = be64toh(pReq->consumerId);
  int32_t      epoch = ntohl(pReq->epoch);

  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pConsumer == NULL) {
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }

  // NOTE(review): cgroup comes from the client request; a mismatch aborts the process when
  // ASSERT is active. Consider rejecting the request with an error code instead.
  ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0);

  atomic_store_32(&pConsumer->hbStatus, 0);

  // 1. check consumer status
  int32_t status = atomic_load_32(&pConsumer->status);

  if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
    mInfo("try to recover consumer %" PRId64 "", consumerId);
    SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
    if (pRecoverMsg == NULL) {
      // best effort: a later heartbeat / ask-ep will try to recover again
      mError("consumer:%" PRId64 ", failed to alloc recover msg", consumerId);
    } else {
      pRecoverMsg->consumerId = consumerId;
      SRpcMsg pRpcMsg = {
          .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
          .pCont = pRecoverMsg,
          .contLen = sizeof(SMqConsumerRecoverMsg),
      };
      tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
    }
  }

  if (status != MQ_CONSUMER_STATUS__READY) {
    mInfo("consumer %" PRId64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
    goto FAIL;  // releases the consumer reference acquired above
  }

  int32_t serverEpoch = atomic_load_32(&pConsumer->epoch);

  // 2. check epoch, only send ep info when epoches do not match
  if (epoch != serverEpoch) {
    taosRLockLatch(&pConsumer->lock);
    mInfo("process ask ep, consumer:%" PRId64 "(epoch %d), server epoch %d", consumerId, epoch, serverEpoch);
    int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics);

    rsp.topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp));
    if (rsp.topics == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      taosRUnLockLatch(&pConsumer->lock);
      goto FAIL;
    }

    // handle all topic subscribed by the consumer
    for (int32_t i = 0; i < numOfTopics; i++) {
      char            *topic = taosArrayGetP(pConsumer->currentTopics, i);
      SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);

      // txn guarantees pSub is created
      ASSERT(pSub);
      taosRLockLatch(&pSub->lock);

      SMqSubTopicEp topicEp = {0};
      strcpy(topicEp.topic, topic);

      // 2.1 fetch topic schema
      SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
      ASSERT(pTopic);
      taosRLockLatch(&pTopic->lock);
      tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN);
      topicEp.schema.nCols = pTopic->schema.nCols;
      if (topicEp.schema.nCols) {
        topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema));
        if (topicEp.schema.pSchema != NULL) {
          memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema));
        }
      }
      taosRUnLockLatch(&pTopic->lock);
      mndReleaseTopic(pMnode, pTopic);

      if (topicEp.schema.nCols && topicEp.schema.pSchema == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        taosRUnLockLatch(&pSub->lock);
        mndReleaseSubscribe(pMnode, pSub);
        taosRUnLockLatch(&pConsumer->lock);
        goto FAIL;
      }

      // 2.2 iterate all vg assigned to the consumer of that topic
      SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
      // a consumer missing from the subscription's hash simply has no vgroups assigned
      int32_t vgNum = (pConsumerEp == NULL) ? 0 : taosArrayGetSize(pConsumerEp->vgs);

      topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
      if (topicEp.vgs == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        // topicEp was not pushed into rsp, so its schema is not freed by tDeleteSMqAskEpRsp
        taosMemoryFree(topicEp.schema.pSchema);
        // drop the subscription latch/reference too, otherwise later writers would block forever
        taosRUnLockLatch(&pSub->lock);
        mndReleaseSubscribe(pMnode, pSub);
        taosRUnLockLatch(&pConsumer->lock);
        goto FAIL;
      }

      for (int32_t j = 0; j < vgNum; j++) {
        SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
        // 2.2.1 build vg ep
        SMqSubVgEp vgEp = {
            .epSet = pVgEp->epSet,
            .vgId = pVgEp->vgId,
            .offset = -1,
        };

        taosArrayPush(topicEp.vgs, &vgEp);
      }
      taosArrayPush(rsp.topics, &topicEp);

      taosRUnLockLatch(&pSub->lock);
      mndReleaseSubscribe(pMnode, pSub);
    }
    taosRUnLockLatch(&pConsumer->lock);
  }

  // encode rsp
  int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, &rsp);
  void   *buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  ((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
  ((SMqRspHead *)buf)->epoch = serverEpoch;
  ((SMqRspHead *)buf)->consumerId = pConsumer->consumerId;

  void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
  tEncodeSMqAskEpRsp(&abuf, &rsp);

  // release consumer and free memory
  tDeleteSMqAskEpRsp(&rsp);
  mndReleaseConsumer(pMnode, pConsumer);

  // send rsp
  pMsg->info.rsp = buf;
  pMsg->info.rspLen = tlen;
  return 0;

FAIL:
  tDeleteSMqAskEpRsp(&rsp);
  mndReleaseConsumer(pMnode, pConsumer);
  return -1;
}

/*
 * Encode pConsumer into an sdb raw row, append it to pTrans as a commit log and mark the row
 * SDB_STATUS_READY. Returns 0 on success, -1 as soon as any of the three steps fails.
 */
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
  SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
  if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0 ||
      sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) {
    return -1;
  }
  return 0;
}

S
Shengliang Guan 已提交
455 456 457
/*
 * Handle TDMT_MND_TMQ_SUBSCRIBE.
 *
 * The requested topic list is sorted (taosArrayCompareString) and de-duplicated. Every topic
 * must exist, and the user must have read privilege on its database.
 * - A new consumer is created with all requested topics as rebNewTopics / assignedTopics.
 * - For an existing READY consumer, the request is merged against its sorted currentTopics to
 *   produce rebNewTopics (requested but not current) and rebRemovedTopics (current but not
 *   requested).
 * The resulting CONSUMER_UPDATE__MODIFY update is committed through a transaction.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction is prepared, -1 otherwise.
 * An existing consumer whose subscription is unchanged also gets -1.
 */
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
  char           *msgStr = pMsg->pCont;
  SCMSubscribeReq subscribe = {0};
  tDeserializeSCMSubscribeReq(msgStr, &subscribe);
  int64_t         consumerId = subscribe.consumerId;
  char           *cgroup = subscribe.cgroup;
  SMqConsumerObj *pConsumerOld = NULL;
  SMqConsumerObj *pConsumerNew = NULL;

  int32_t code = -1;
  SArray *newSub = subscribe.topicNames;
  taosArraySortString(newSub, taosArrayCompareString);
  taosArrayRemoveDuplicateP(newSub, taosArrayCompareString, taosMemoryFree);

  int32_t newTopicNum = taosArrayGetSize(newSub);

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
  if (pTrans == NULL) goto SUBSCRIBE_OVER;

  // check topic existence and read privilege on each topic's database
  for (int32_t i = 0; i < newTopicNum; i++) {
    char        *topic = taosArrayGetP(newSub, i);
    SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
    if (pTopic == NULL) {
      terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
      goto SUBSCRIBE_OVER;
    }

    int32_t privCode = mndCheckDbPrivilegeByName(pMnode, pMsg->info.conn.user, MND_OPER_READ_DB, pTopic->db);
    // release before bailing out so a privilege failure does not leak the topic reference
    mndReleaseTopic(pMnode, pTopic);
    if (privCode != 0) {
      goto SUBSCRIBE_OVER;
    }
  }

  pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
  if (pConsumerOld == NULL) {
    mInfo("receive subscribe request from new consumer:%" PRId64, consumerId);

    pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
    if (pConsumerNew == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto SUBSCRIBE_OVER;
    }
    tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256);
    pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;

    // hand the request's topic array over to the new consumer object
    taosArrayDestroy(pConsumerNew->rebNewTopics);
    pConsumerNew->rebNewTopics = newSub;
    subscribe.topicNames = NULL;

    for (int32_t i = 0; i < newTopicNum; i++) {
      char *newTopicCopy = strdup(taosArrayGetP(newSub, i));
      taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy);
    }

    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto SUBSCRIBE_OVER;
    if (mndTransPrepare(pMnode, pTrans) != 0) goto SUBSCRIBE_OVER;

  } else {
    /*taosRLockLatch(&pConsumerOld->lock);*/

    int32_t status = atomic_load_32(&pConsumerOld->status);

    mInfo("receive subscribe request from existing consumer:%" PRId64 ", current status: %s, subscribe topic num: %d",
          consumerId, mndConsumerStatusName(status), newTopicNum);

    if (status != MQ_CONSUMER_STATUS__READY) {
      terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto SUBSCRIBE_OVER;
    }

    pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
    if (pConsumerNew == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto SUBSCRIBE_OVER;
    }
    pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;

    for (int32_t i = 0; i < newTopicNum; i++) {
      char *newTopicCopy = strdup(taosArrayGetP(newSub, i));
      taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy);
    }

    int32_t oldTopicNum = 0;
    if (pConsumerOld->currentTopics) {
      oldTopicNum = taosArrayGetSize(pConsumerOld->currentTopics);
    }

    // Merge-walk two sorted lists of plain C strings. Both are ordered by
    // taosArrayCompareString (strcmp order), so the walk must compare with strcmp as well;
    // a length-prefixed comparison would misread the first bytes of each name as a length.
    int32_t i = 0, j = 0;
    while (i < oldTopicNum || j < newTopicNum) {
      if (i >= oldTopicNum) {
        char *newTopicCopy = strdup(taosArrayGetP(newSub, j));
        taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
        j++;
      } else if (j >= newTopicNum) {
        char *oldTopicCopy = strdup(taosArrayGetP(pConsumerOld->currentTopics, i));
        taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
        i++;
      } else {
        char *oldTopic = taosArrayGetP(pConsumerOld->currentTopics, i);
        char *newTopic = taosArrayGetP(newSub, j);
        int   comp = strcmp(oldTopic, newTopic);
        if (comp == 0) {
          i++;
          j++;
        } else if (comp < 0) {
          char *oldTopicCopy = strdup(oldTopic);
          taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
          i++;
        } else {
          char *newTopicCopy = strdup(newTopic);
          taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
          j++;
        }
      }
    }

    // nothing to add or remove: no transaction is needed
    if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
      goto SUBSCRIBE_OVER;
    }

    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto SUBSCRIBE_OVER;
    if (mndTransPrepare(pMnode, pTrans) != 0) goto SUBSCRIBE_OVER;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

SUBSCRIBE_OVER:
  mndTransDrop(pTrans);

  if (pConsumerOld) {
    /*taosRUnLockLatch(&pConsumerOld->lock);*/
    mndReleaseConsumer(pMnode, pConsumerOld);
  }
  if (pConsumerNew) {
    tDeleteSMqConsumerObj(pConsumerNew);
    taosMemoryFree(pConsumerNew);
  }
  // TODO: replace with destroy subscribe msg
  if (subscribe.topicNames) taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
  return code;
}

L
Liu Jicong 已提交
603 604
/*
 * Serialize pConsumer into an SDB_CONSUMER raw row.
 * Row layout: int32 payload length, tEncodeSMqConsumerObj() payload,
 * MND_CONSUMER_RESERVE_SIZE reserved bytes.
 * Returns the raw row, or NULL with terrno set (OOM unless a SDB_SET_* step reports otherwise).
 */
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  void    *pPayload = NULL;
  void    *pCursor = NULL;
  int32_t  dataPos = 0;
  int32_t  payloadLen = tEncodeSMqConsumerObj(NULL, pConsumer);  // size-only pass
  SSdbRaw *pRaw =
      sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, sizeof(int32_t) + payloadLen + MND_CONSUMER_RESERVE_SIZE);
  if (pRaw == NULL) goto CM_ENCODE_OVER;

  pPayload = taosMemoryMalloc(payloadLen);
  if (pPayload == NULL) goto CM_ENCODE_OVER;

  pCursor = pPayload;
  tEncodeSMqConsumerObj(&pCursor, pConsumer);

  SDB_SET_INT32(pRaw, dataPos, payloadLen, CM_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, pPayload, payloadLen, CM_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

CM_ENCODE_OVER:
  taosMemoryFreeClear(pPayload);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("consumer:%" PRId64 ", failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("consumer:%" PRId64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer);
  return pRaw;
}

L
Liu Jicong 已提交
639
/*
 * Deserialize an SDB_CONSUMER raw row (layout written by mndConsumerActionEncode) into a new
 * sdb row holding an SMqConsumerObj.
 * Returns the row, or NULL with terrno set:
 * - TSDB_CODE_SDB_INVALID_DATA_VER for a version mismatch;
 * - otherwise OOM unless an SDB_GET_* step reports a different code.
 */
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  // Initialized up front: the early gotos below jump past their assignments, and the exit
  // path reads both pointers (previously an indeterminate read / free).
  void           *buf = NULL;
  SSdbRow        *pRow = NULL;
  SMqConsumerObj *pConsumer = NULL;
  int32_t         dataPos = 0;
  int32_t         len = 0;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CM_DECODE_OVER;

  if (sver != MND_CONSUMER_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto CM_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqConsumerObj));
  if (pRow == NULL) goto CM_DECODE_OVER;

  pConsumer = sdbGetRowObj(pRow);
  if (pConsumer == NULL) goto CM_DECODE_OVER;

  SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER);
  buf = taosMemoryMalloc(len);
  if (buf == NULL) goto CM_DECODE_OVER;
  SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);

  if (tDecodeSMqConsumerObj(buf, pConsumer) == NULL) {
    goto CM_DECODE_OVER;
  }

  terrno = TSDB_CODE_SUCCESS;

CM_DECODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("consumer:%" PRId64 ", failed to decode from raw:%p since %s",
           pConsumer == NULL ? (int64_t)0 : pConsumer->consumerId, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  return pRow;
}

L
Liu Jicong 已提交
682
/* sdb insert callback: initialize subscribeTime from the row's upTime. Always returns 0. */
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  int64_t consumerId = pConsumer->consumerId;
  mTrace("consumer:%" PRId64 ", perform insert action", consumerId);

  pConsumer->subscribeTime = pConsumer->upTime;
  return 0;
}

L
Liu Jicong 已提交
688
/* sdb delete callback: release the consumer's owned members via tDeleteSMqConsumerObj(). Always returns 0. */
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  int64_t consumerId = pConsumer->consumerId;
  mTrace("consumer:%" PRId64 ", perform delete action", consumerId);

  tDeleteSMqConsumerObj(pConsumer);
  return 0;
}

L
Liu Jicong 已提交
694
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
695
  mTrace("consumer:%" PRId64 ", perform update action", pOldConsumer->consumerId);
L
Liu Jicong 已提交
696

697 698 699 700 701 702
  taosWLockLatch(&pOldConsumer->lock);

  if (pNewConsumer->updateType == CONSUMER_UPDATE__MODIFY) {
    ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);
    ASSERT(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);

703 704 705 706 707 708 709 710 711 712
    if (taosArrayGetSize(pNewConsumer->rebNewTopics) == 0 && taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0) {
      pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
    } else {
      SArray *tmp = pOldConsumer->rebNewTopics;
      pOldConsumer->rebNewTopics = pNewConsumer->rebNewTopics;
      pNewConsumer->rebNewTopics = tmp;

      tmp = pOldConsumer->rebRemovedTopics;
      pOldConsumer->rebRemovedTopics = pNewConsumer->rebRemovedTopics;
      pNewConsumer->rebRemovedTopics = tmp;
713

714 715 716
      tmp = pOldConsumer->assignedTopics;
      pOldConsumer->assignedTopics = pNewConsumer->assignedTopics;
      pNewConsumer->assignedTopics = tmp;
L
Liu Jicong 已提交
717

718
      pOldConsumer->subscribeTime = pNewConsumer->upTime;
L
Liu Jicong 已提交
719

720 721
      pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
    }
722
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) {
L
Liu Jicong 已提交
723 724 725
    ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);
    ASSERT(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);

726
    int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
L
Liu Jicong 已提交
727
    /*pOldConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));*/
728 729
    for (int32_t i = 0; i < sz; i++) {
      char *topic = strdup(taosArrayGetP(pOldConsumer->currentTopics, i));
L
Liu Jicong 已提交
730
      taosArrayPush(pOldConsumer->rebRemovedTopics, &topic);
731
    }
L
Liu Jicong 已提交
732 733 734

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

735
    pOldConsumer->status = MQ_CONSUMER_STATUS__LOST;
L
Liu Jicong 已提交
736 737 738 739 740 741 742 743 744 745 746 747 748
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__RECOVER) {
    ASSERT(taosArrayGetSize(pOldConsumer->currentTopics) == 0);
    ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);

    int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics);
    for (int32_t i = 0; i < sz; i++) {
      char *topic = strdup(taosArrayGetP(pOldConsumer->assignedTopics, i));
      taosArrayPush(pOldConsumer->rebNewTopics, &topic);
    }

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

    pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
749 750
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__TOUCH) {
    atomic_add_fetch_32(&pOldConsumer->epoch, 1);
L
Liu Jicong 已提交
751 752 753

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) {
    ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 1);
    ASSERT(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0);

    char *addedTopic = strdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));
    // not exist in current topic
#if 1
    for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->currentTopics); i++) {
      char *topic = taosArrayGetP(pOldConsumer->currentTopics, i);
      ASSERT(strcmp(topic, addedTopic) != 0);
    }
#endif

    // remove from new topic
    for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebNewTopics); i++) {
      char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i);
      if (strcmp(addedTopic, topic) == 0) {
        taosArrayRemove(pOldConsumer->rebNewTopics, i);
        taosMemoryFree(topic);
        break;
      }
    }

    // add to current topic
    taosArrayPush(pOldConsumer->currentTopics, &addedTopic);
    taosArraySortString(pOldConsumer->currentTopics, taosArrayCompareString);
    // set status
    if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
      if (pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY ||
          pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
        pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
      } else if (pOldConsumer->status == MQ_CONSUMER_STATUS__LOST_IN_REB ||
                 pOldConsumer->status == MQ_CONSUMER_STATUS__LOST) {
        pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
      }
    } else {
      if (pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY ||
          pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
        pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY_IN_REB;
      } else if (pOldConsumer->status == MQ_CONSUMER_STATUS__LOST ||
                 pOldConsumer->status == MQ_CONSUMER_STATUS__LOST_IN_REB) {
        pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_IN_REB;
      }
    }
L
Liu Jicong 已提交
798 799 800

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816
    atomic_add_fetch_32(&pOldConsumer->epoch, 1);
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) {
    ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 0);
    ASSERT(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 1);
    char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);

    // not exist in new topic
#if 1
    for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebNewTopics); i++) {
      char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i);
      ASSERT(strcmp(topic, removedTopic) != 0);
    }
#endif

    // remove from removed topic
    for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebRemovedTopics); i++) {
L
fix  
Liu Jicong 已提交
817
      char *topic = taosArrayGetP(pOldConsumer->rebRemovedTopics, i);
818
      if (strcmp(removedTopic, topic) == 0) {
L
fix  
Liu Jicong 已提交
819
        taosArrayRemove(pOldConsumer->rebRemovedTopics, i);
820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856
        taosMemoryFree(topic);
        break;
      }
    }

    // remove from current topic
    int32_t i = 0;
    int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
    for (i = 0; i < sz; i++) {
      char *topic = taosArrayGetP(pOldConsumer->currentTopics, i);
      if (strcmp(removedTopic, topic) == 0) {
        taosArrayRemove(pOldConsumer->currentTopics, i);
        taosMemoryFree(topic);
        break;
      }
    }
    // must find the topic
    ASSERT(i < sz);

    // set status
    if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
      if (pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY ||
          pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
        pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
      } else if (pOldConsumer->status == MQ_CONSUMER_STATUS__LOST_IN_REB ||
                 pOldConsumer->status == MQ_CONSUMER_STATUS__LOST) {
        pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
      }
    } else {
      if (pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY ||
          pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
        pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY_IN_REB;
      } else if (pOldConsumer->status == MQ_CONSUMER_STATUS__LOST ||
                 pOldConsumer->status == MQ_CONSUMER_STATUS__LOST_IN_REB) {
        pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_IN_REB;
      }
    }
L
Liu Jicong 已提交
857 858 859

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

860 861 862 863
    atomic_add_fetch_32(&pOldConsumer->epoch, 1);
  }

  taosWUnLockLatch(&pOldConsumer->lock);
L
Liu Jicong 已提交
864 865 866
  return 0;
}

L
Liu Jicong 已提交
867
/*
 * Look up the consumer with the given id in the mnode SDB (SDB_CONSUMER table)
 * and acquire a reference to it.
 *
 * @param pMnode      owning mnode; its pSdb is searched
 * @param consumerId  key of the consumer to find
 * @return the consumer object, or NULL when it is not present; in the NULL
 *         case terrno is set to TSDB_CODE_MND_CONSUMER_NOT_EXIST.
 *         Pair a non-NULL result with mndReleaseConsumer().
 */
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId) {
  SMqConsumerObj *pFound = sdbAcquire(pMnode->pSdb, SDB_CONSUMER, &consumerId);
  if (pFound != NULL) {
    return pFound;
  }

  terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
  return NULL;
}

L
Liu Jicong 已提交
876
/*
 * Give back a consumer reference obtained from mndAcquireConsumer() by
 * forwarding it to sdbRelease() on the mnode's SDB.
 */
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
  sdbRelease(pMnode->pSdb, pConsumer);
}
L
Liu Jicong 已提交
880

S
Shengliang Guan 已提交
881 882
/*
 * Fill pBlock with rows for the consumer show table.
 *
 * One row is produced per (consumer, assigned topic) pair. Consumers with no
 * assigned topic are skipped. The SDB iterator is kept in pShow->pIter between
 * calls.
 *
 * Column order: consumer id, consumer group, client id, status, topic,
 * up time, subscribe time, rebalance time (NULL when rebalanceTime == 0).
 *
 * @param pReq          request; pReq->info.node is the owning SMnode
 * @param pShow         show context; pIter is advanced, numOfRows is increased
 * @param pBlock        output block; grown if one consumer's rows do not fit
 * @param rowsCapacity  number of rows requested by the caller
 * @return number of rows appended to pBlock by this call
 */
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode         *pMnode = pReq->info.node;
  SSdb           *pSdb = pMnode->pSdb;
  int32_t         numOfRows = 0;
  SMqConsumerObj *pConsumer = NULL;

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
    if (pShow->pIter == NULL) break;

    // Consumer state is modified under the write latch of pConsumer->lock
    // (see mndConsumerActionUpdate), so assignedTopics is only inspected while
    // holding the read latch -- including for the "skip" check below.
    taosRLockLatch(&pConsumer->lock);

    int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
    if (topicSz == 0) {
      mDebug("showing consumer %" PRId64 " no assigned topic, skip", pConsumer->consumerId);
      taosRUnLockLatch(&pConsumer->lock);
      sdbRelease(pSdb, pConsumer);
      continue;
    }

    mDebug("showing consumer %" PRId64, pConsumer->consumerId);

    // All rows of one consumer are emitted together, so the block may need to
    // grow past rowsCapacity. If it cannot grow, stop here instead of
    // appending past the end of the column buffers.
    if (numOfRows + topicSz > rowsCapacity) {
      if (blockDataEnsureCapacity(pBlock, numOfRows + topicSz) != 0) {
        taosRUnLockLatch(&pConsumer->lock);
        sdbRelease(pSdb, pConsumer);
        break;
      }
    }

    // These columns are identical for every topic row of this consumer.
    char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
    tstrncpy(varDataVal(cgroup), pConsumer->cgroup, TSDB_CGROUP_LEN);
    varDataSetLen(cgroup, strlen(varDataVal(cgroup)));

    char clientId[256 + VARSTR_HEADER_SIZE] = {0};
    tstrncpy(varDataVal(clientId), pConsumer->clientId, 256);
    varDataSetLen(clientId, strlen(varDataVal(clientId)));

    char status[20 + VARSTR_HEADER_SIZE] = {0};
    tstrncpy(varDataVal(status), mndConsumerStatusName(pConsumer->status), 20);
    varDataSetLen(status, strlen(varDataVal(status)));

    for (int32_t i = 0; i < topicSz; i++) {
      SColumnInfoData *pColInfo;
      int32_t          cols = 0;

      // consumer id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->consumerId, false);

      // consumer group
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false);

      // client id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)clientId, false);

      // status
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)status, false);

      // one subscribed topic
      char        topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
      const char *topicName = mndTopicGetShowName(taosArrayGetP(pConsumer->assignedTopics, i));
      tstrncpy(varDataVal(topic), topicName, TSDB_TOPIC_FNAME_LEN);
      varDataSetLen(topic, strlen(varDataVal(topic)));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)topic, false);

      // up time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->upTime, false);

      // subscribe time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false);

      // rebalance time (NULL until the consumer has been rebalanced)
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0);

      numOfRows++;
    }

    taosRUnLockLatch(&pConsumer->lock);
    sdbRelease(pSdb, pConsumer);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}

/*
 * Abandon an SDB iteration over consumers that was started with sdbFetch(),
 * handing the iterator to sdbCancelFetch() on the mnode's SDB.
 */
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}

/*
 * Map a consumer status code to the label shown to users.
 *
 * The LOST variants (LOST, LOST_REBD, LOST_IN_REB) all display as "lost", and
 * the MODIFY variants (MODIFY, MODIFY_IN_REB) as "rebalancing". Any other
 * value yields "unknown".
 */
static const char *mndConsumerStatusName(int status) {
  const char *label = "unknown";

  switch (status) {
    case MQ_CONSUMER_STATUS__READY:
      label = "ready";
      break;
    case MQ_CONSUMER_STATUS__LOST:
    case MQ_CONSUMER_STATUS__LOST_REBD:
    case MQ_CONSUMER_STATUS__LOST_IN_REB:
      label = "lost";
      break;
    case MQ_CONSUMER_STATUS__MODIFY:
    case MQ_CONSUMER_STATUS__MODIFY_IN_REB:
      label = "rebalancing";
      break;
    default:
      break;
  }

  return label;
}