mndConsumer.c 34.0 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "mndConsumer.h"
18
#include "mndPrivilege.h"
L
Liu Jicong 已提交
19 20 21
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
22
#include "mndOffset.h"
L
Liu Jicong 已提交
23 24
#include "mndShow.h"
#include "mndStb.h"
25
#include "mndSubscribe.h"
L
Liu Jicong 已提交
26 27 28 29
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
L
Liu Jicong 已提交
30
#include "tcompare.h"
L
Liu Jicong 已提交
31 32
#include "tname.h"

33
#define MND_CONSUMER_VER_NUMBER   1
L
Liu Jicong 已提交
34 35
#define MND_CONSUMER_RESERVE_SIZE 64

36 37
#define MND_CONSUMER_LOST_HB_CNT 3

L
Liu Jicong 已提交
38
static int8_t mqRebInExecCnt = 0;
39

L
Liu Jicong 已提交
40 41
static const char *mndConsumerStatusName(int status);

L
Liu Jicong 已提交
42 43 44
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer);
S
Shengliang Guan 已提交
45 46
static int32_t mndProcessConsumerMetaMsg(SRpcMsg *pMsg);
static int32_t mndRetrieveConsumer(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
L
Liu Jicong 已提交
47
static void    mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
48

S
Shengliang Guan 已提交
49 50 51 52 53
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg);
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg);
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg);
54

L
Liu Jicong 已提交
55 56
int32_t mndInitConsumer(SMnode *pMnode) {
  SSdbTable table = {.sdbType = SDB_CONSUMER,
L
Liu Jicong 已提交
57
                     .keyType = SDB_KEY_INT64,
L
Liu Jicong 已提交
58 59 60 61 62 63
                     .encodeFp = (SdbEncodeFp)mndConsumerActionEncode,
                     .decodeFp = (SdbDecodeFp)mndConsumerActionDecode,
                     .insertFp = (SdbInsertFp)mndConsumerActionInsert,
                     .updateFp = (SdbUpdateFp)mndConsumerActionUpdate,
                     .deleteFp = (SdbDeleteFp)mndConsumerActionDelete};

64
  mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);
L
Liu Jicong 已提交
65
  mndSetMsgHandle(pMnode, TDMT_MND_MQ_ASK_EP, mndProcessAskEpReq);
66 67
  mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_MQ_CONSUMER_LOST, mndProcessConsumerLostMsg);
L
Liu Jicong 已提交
68
  mndSetMsgHandle(pMnode, TDMT_MND_MQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg);
L
Liu Jicong 已提交
69 70 71 72

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndCancelGetNextConsumer);

L
Liu Jicong 已提交
73 74 75 76 77
  return sdbSetTable(pMnode->pSdb, table);
}

void mndCleanupConsumer(SMnode *pMnode) {}

L
Liu Jicong 已提交
78
bool mndRebTryStart() {
L
Liu Jicong 已提交
79
  int8_t old = atomic_val_compare_exchange_8(&mqRebInExecCnt, 0, 1);
L
Liu Jicong 已提交
80 81 82
  return old == 0;
}

L
Liu Jicong 已提交
83
void mndRebEnd() { atomic_sub_fetch_8(&mqRebInExecCnt, 1); }
L
Liu Jicong 已提交
84

L
Liu Jicong 已提交
85
void mndRebCntInc() { atomic_add_fetch_8(&mqRebInExecCnt, 1); }
L
Liu Jicong 已提交
86

L
Liu Jicong 已提交
87
void mndRebCntDec() { atomic_sub_fetch_8(&mqRebInExecCnt, 1); }
L
Liu Jicong 已提交
88

S
Shengliang Guan 已提交
89 90 91
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
  SMnode             *pMnode = pMsg->info.node;
  SMqConsumerLostMsg *pLostMsg = pMsg->pCont;
92 93 94
  SMqConsumerObj     *pConsumer = mndAcquireConsumer(pMnode, pLostMsg->consumerId);
  ASSERT(pConsumer);

L
Liu Jicong 已提交
95 96 97
  mInfo("receive consumer lost msg, consumer id %ld, status %s", pLostMsg->consumerId,
        mndConsumerStatusName(pConsumer->status));

D
dapan1121 已提交
98 99 100 101 102
  if (pConsumer->status != MQ_CONSUMER_STATUS__READY) {
    mndReleaseConsumer(pMnode, pConsumer);
    return -1;
  }

103 104 105 106 107
  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
  pConsumerNew->updateType = CONSUMER_UPDATE__LOST;

  mndReleaseConsumer(pMnode, pConsumer);

L
Liu Jicong 已提交
108
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg);
109 110 111 112 113 114 115
  if (pTrans == NULL) goto FAIL;
  if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
  if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;

  mndTransDrop(pTrans);
  return 0;
FAIL:
L
Liu Jicong 已提交
116
  tDeleteSMqConsumerObj(pConsumerNew);
117 118 119 120
  mndTransDrop(pTrans);
  return -1;
}

S
Shengliang Guan 已提交
121 122 123
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
  SMnode                *pMnode = pMsg->info.node;
  SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont;
L
Liu Jicong 已提交
124 125 126
  SMqConsumerObj        *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId);
  ASSERT(pConsumer);

L
Liu Jicong 已提交
127 128 129 130 131 132 133 134
  mInfo("receive consumer recover msg, consumer id %ld, status %s", pRecoverMsg->consumerId,
        mndConsumerStatusName(pConsumer->status));

  if (pConsumer->status != MQ_CONSUMER_STATUS__READY) {
    mndReleaseConsumer(pMnode, pConsumer);
    return -1;
  }

L
Liu Jicong 已提交
135 136 137 138 139
  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
  pConsumerNew->updateType = CONSUMER_UPDATE__RECOVER;

  mndReleaseConsumer(pMnode, pConsumer);

140
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg);
L
Liu Jicong 已提交
141 142 143 144 145 146 147 148 149 150 151 152
  if (pTrans == NULL) goto FAIL;
  if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
  if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;

  mndTransDrop(pTrans);
  return 0;
FAIL:
  tDeleteSMqConsumerObj(pConsumerNew);
  mndTransDrop(pTrans);
  return -1;
}

L
Liu Jicong 已提交
153
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
L
Liu Jicong 已提交
154 155 156 157
  SMqRebInfo *pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
  if (pRebInfo == NULL) {
    pRebInfo = tNewSMqRebSubscribe(key);
    if (pRebInfo == NULL) {
158 159 160
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
L
Liu Jicong 已提交
161 162 163
    taosHashPut(pHash, key, strlen(key) + 1, pRebInfo, sizeof(SMqRebInfo));
    taosMemoryFree(pRebInfo);
    pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
164
  }
L
Liu Jicong 已提交
165
  return pRebInfo;
166 167
}

S
Shengliang Guan 已提交
168 169
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
170 171 172 173 174
  SSdb           *pSdb = pMnode->pSdb;
  SMqConsumerObj *pConsumer;
  void           *pIter = NULL;

  // rebalance cannot be parallel
L
Liu Jicong 已提交
175
  if (!mndRebTryStart()) {
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203
    mInfo("mq rebalance already in progress, do nothing");
    return 0;
  }

  SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
  pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
  // TODO set cleanfp

  // iterate all consumers, find all modification
  while (1) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) break;

    int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
    int32_t status = atomic_load_32(&pConsumer->status);
    if (status == MQ_CONSUMER_STATUS__READY && hbStatus > MND_CONSUMER_LOST_HB_CNT) {
      SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg));

      pLostMsg->consumerId = pConsumer->consumerId;
      SRpcMsg *pRpcMsg = taosMemoryCalloc(1, sizeof(SRpcMsg));
      pRpcMsg->msgType = TDMT_MND_MQ_CONSUMER_LOST;
      pRpcMsg->pCont = pLostMsg;
      pRpcMsg->contLen = sizeof(SMqConsumerLostMsg);
      tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, pRpcMsg);
    }
    if (status == MQ_CONSUMER_STATUS__LOST_REBD || status == MQ_CONSUMER_STATUS__READY) {
      // do nothing
    } else if (status == MQ_CONSUMER_STATUS__LOST) {
L
Liu Jicong 已提交
204
      taosRLockLatch(&pConsumer->lock);
205 206 207 208 209
      int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
      for (int32_t i = 0; i < topicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *removedTopic = taosArrayGetP(pConsumer->currentTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
L
Liu Jicong 已提交
210
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
211 212
        taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
      }
L
Liu Jicong 已提交
213
      taosRUnLockLatch(&pConsumer->lock);
214
    } else if (status == MQ_CONSUMER_STATUS__MODIFY) {
L
Liu Jicong 已提交
215
      taosRLockLatch(&pConsumer->lock);
216 217 218 219 220
      int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
      for (int32_t i = 0; i < newTopicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *newTopic = taosArrayGetP(pConsumer->rebNewTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, newTopic);
L
Liu Jicong 已提交
221
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
222 223 224 225 226 227 228 229
        taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
      }

      int32_t removedTopicNum = taosArrayGetSize(pConsumer->rebRemovedTopics);
      for (int32_t i = 0; i < removedTopicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *removedTopic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
L
Liu Jicong 已提交
230
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
231 232
        taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
      }
L
Liu Jicong 已提交
233
      taosRUnLockLatch(&pConsumer->lock);
234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251
    } else {
      // do nothing
    }

    mndReleaseConsumer(pMnode, pConsumer);
  }

  if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
    mInfo("mq rebalance will be triggered");
    SRpcMsg rpcMsg = {
        .msgType = TDMT_MND_MQ_DO_REBALANCE,
        .pCont = pRebMsg,
        .contLen = sizeof(SMqDoRebalanceMsg),
    };
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  } else {
    taosHashCleanup(pRebMsg->rebSubHash);
    rpcFreeCont(pRebMsg);
L
Liu Jicong 已提交
252
    mTrace("mq rebalance finished, no modification");
L
Liu Jicong 已提交
253
    mndRebEnd();
254 255 256 257
  }
  return 0;
}

S
Shengliang Guan 已提交
258 259 260
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
  SMnode      *pMnode = pMsg->info.node;
  SMqAskEpReq *pReq = (SMqAskEpReq *)pMsg->pCont;
L
Liu Jicong 已提交
261 262 263
  SMqAskEpRsp  rsp = {0};
  int64_t      consumerId = be64toh(pReq->consumerId);
  int32_t      epoch = ntohl(pReq->epoch);
264

S
Shengliang Guan 已提交
265
  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMsg->info.node, consumerId);
L
Liu Jicong 已提交
266
  if (pConsumer == NULL) {
267 268 269 270 271 272 273 274 275 276
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }

  ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0);
  /*int32_t hbStatus = atomic_load_32(&pConsumer->hbStatus);*/
  atomic_store_32(&pConsumer->hbStatus, 0);

  // 1. check consumer status
  int32_t status = atomic_load_32(&pConsumer->status);
L
Liu Jicong 已提交
277

L
Liu Jicong 已提交
278 279 280 281 282 283 284 285 286
  if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
    SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));

    pRecoverMsg->consumerId = consumerId;
    SRpcMsg *pRpcMsg = taosMemoryCalloc(1, sizeof(SRpcMsg));
    pRpcMsg->msgType = TDMT_MND_MQ_CONSUMER_RECOVER;
    pRpcMsg->pCont = pRecoverMsg;
    pRpcMsg->contLen = sizeof(SMqConsumerRecoverMsg);
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, pRpcMsg);
287 288 289 290 291 292 293 294 295 296 297 298
  }

  if (status != MQ_CONSUMER_STATUS__READY) {
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
    return -1;
  }

  int32_t serverEpoch = atomic_load_32(&pConsumer->epoch);

  // 2. check epoch, only send ep info when epoches do not match
  if (epoch != serverEpoch) {
    taosRLockLatch(&pConsumer->lock);
L
Liu Jicong 已提交
299
    mInfo("process ask ep, consumer %ld(epoch %d), server epoch %d", consumerId, epoch, serverEpoch);
300 301 302 303 304
    int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics);

    rsp.topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp));
    if (rsp.topics == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
305
      taosRUnLockLatch(&pConsumer->lock);
306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324
      goto FAIL;
    }

    // handle all topic subscribed by the consumer
    for (int32_t i = 0; i < numOfTopics; i++) {
      char            *topic = taosArrayGetP(pConsumer->currentTopics, i);
      SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);

      // txn guarantees pSub is created
      ASSERT(pSub);
      taosRLockLatch(&pSub->lock);

      SMqSubTopicEp topicEp = {0};
      strcpy(topicEp.topic, topic);

      // 2.1 fetch topic schema
      SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
      ASSERT(pTopic);
      taosRLockLatch(&pTopic->lock);
L
Liu Jicong 已提交
325
      tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN);
326
      topicEp.schema.nCols = pTopic->schema.nCols;
L
Liu Jicong 已提交
327 328 329 330
      if (topicEp.schema.nCols) {
        topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema));
        memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema));
      }
331 332 333 334
      taosRUnLockLatch(&pTopic->lock);
      mndReleaseTopic(pMnode, pTopic);

      // 2.2 iterate all vg assigned to the consumer of that topic
335 336
      SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
      int32_t        vgNum = taosArrayGetSize(pConsumerEp->vgs);
337 338 339 340

      topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
      if (topicEp.vgs == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
341
        taosRUnLockLatch(&pConsumer->lock);
342 343 344 345
        goto FAIL;
      }

      for (int32_t j = 0; j < vgNum; j++) {
346
        SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371
        char     offsetKey[TSDB_PARTITION_KEY_LEN];
        mndMakePartitionKey(offsetKey, pConsumer->cgroup, topic, pVgEp->vgId);
        // 2.2.1 build vg ep
        SMqSubVgEp vgEp = {
            .epSet = pVgEp->epSet,
            .vgId = pVgEp->vgId,
            .offset = -1,
        };

        // 2.2.2 fetch vg offset
        SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, offsetKey);
        if (pOffsetObj != NULL) {
          vgEp.offset = atomic_load_64(&pOffsetObj->offset);
          mndReleaseOffset(pMnode, pOffsetObj);
        }
        taosArrayPush(topicEp.vgs, &vgEp);
      }
      taosArrayPush(rsp.topics, &topicEp);

      taosRUnLockLatch(&pSub->lock);
      mndReleaseSubscribe(pMnode, pSub);
    }
    taosRUnLockLatch(&pConsumer->lock);
  }
  // encode rsp
L
Liu Jicong 已提交
372
  int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, &rsp);
373 374
  void   *buf = rpcMallocCont(tlen);
  if (buf == NULL) {
L
Liu Jicong 已提交
375
    terrno = TSDB_CODE_OUT_OF_MEMORY;
376
    return -1;
L
Liu Jicong 已提交
377
  }
378 379 380
  ((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
  ((SMqRspHead *)buf)->epoch = serverEpoch;
  ((SMqRspHead *)buf)->consumerId = pConsumer->consumerId;
S
Shengliang Guan 已提交
381

382
  void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
L
Liu Jicong 已提交
383
  tEncodeSMqAskEpRsp(&abuf, &rsp);
384 385

  // release consumer and free memory
L
Liu Jicong 已提交
386
  tDeleteSMqAskEpRsp(&rsp);
387 388 389
  mndReleaseConsumer(pMnode, pConsumer);

  // send rsp
S
Shengliang Guan 已提交
390 391
  pMsg->info.rsp = buf;
  pMsg->info.rspLen = tlen;
392 393
  return 0;
FAIL:
L
Liu Jicong 已提交
394
  tDeleteSMqAskEpRsp(&rsp);
395 396 397 398 399 400 401 402 403 404 405 406
  mndReleaseConsumer(pMnode, pConsumer);
  return -1;
}

int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
  SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
  if (pCommitRaw == NULL) return -1;
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
  return 0;
}

S
Shengliang Guan 已提交
407 408 409
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
  char           *msgStr = pMsg->pCont;
410 411 412 413 414 415 416 417 418 419 420 421 422
  SCMSubscribeReq subscribe = {0};
  tDeserializeSCMSubscribeReq(msgStr, &subscribe);
  int64_t         consumerId = subscribe.consumerId;
  char           *cgroup = subscribe.cgroup;
  SMqConsumerObj *pConsumerOld = NULL;
  SMqConsumerObj *pConsumerNew = NULL;

  int32_t code = -1;
  SArray *newSub = subscribe.topicNames;
  taosArraySortString(newSub, taosArrayCompareString);

  int32_t newTopicNum = taosArrayGetSize(newSub);
  // check topic existance
423
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg);
L
Liu Jicong 已提交
424 425
  if (pTrans == NULL) goto SUBSCRIBE_OVER;

426 427 428 429 430 431 432
  for (int32_t i = 0; i < newTopicNum; i++) {
    char        *topic = taosArrayGetP(newSub, i);
    SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
    if (pTopic == NULL) {
      terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
      goto SUBSCRIBE_OVER;
    }
L
Liu Jicong 已提交
433

434 435 436 437
    if (mndCheckDbPrivilege(pMnode, pMsg->info.conn.user, MND_OPER_READ_DB, pTopic->db) != 0) {
      goto SUBSCRIBE_OVER;
    }

L
Liu Jicong 已提交
438
#if 0
L
Liu Jicong 已提交
439 440 441 442 443
    // ref topic to prevent drop
    // TODO make topic complete
    SMqTopicObj topicObj = {0};
    memcpy(&topicObj, pTopic, sizeof(SMqTopicObj));
    topicObj.refConsumerCnt = pTopic->refConsumerCnt + 1;
L
Liu Jicong 已提交
444 445
    mInfo("subscribe topic %s by consumer %ld cgroup %s, refcnt %d", pTopic->name, consumerId, cgroup,
          topicObj.refConsumerCnt);
446
    if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) goto SUBSCRIBE_OVER;
L
Liu Jicong 已提交
447
#endif
L
Liu Jicong 已提交
448

449 450 451 452 453
    mndReleaseTopic(pMnode, pTopic);
  }

  pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
  if (pConsumerOld == NULL) {
L
Liu Jicong 已提交
454 455
    mInfo("receive subscribe request from new consumer: %ld", consumerId);

456
    pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
L
Liu Jicong 已提交
457
    tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256);
458 459 460 461
    pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
    pConsumerNew->rebNewTopics = newSub;
    subscribe.topicNames = NULL;

L
Liu Jicong 已提交
462 463 464 465 466
    for (int32_t i = 0; i < newTopicNum; i++) {
      char *newTopicCopy = strdup(taosArrayGetP(newSub, i));
      taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy);
    }

467 468 469 470
    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto SUBSCRIBE_OVER;
    if (mndTransPrepare(pMnode, pTrans) != 0) goto SUBSCRIBE_OVER;

  } else {
L
fix  
Liu Jicong 已提交
471
    /*taosRLockLatch(&pConsumerOld->lock);*/
L
Liu Jicong 已提交
472

473
    int32_t status = atomic_load_32(&pConsumerOld->status);
L
Liu Jicong 已提交
474 475 476 477

    mInfo("receive subscribe request from old consumer: %ld, current status: %s", consumerId,
          mndConsumerStatusName(status));

478 479 480 481 482 483 484 485 486 487 488 489
    if (status != MQ_CONSUMER_STATUS__READY) {
      terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto SUBSCRIBE_OVER;
    }

    pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
    if (pConsumerNew == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto SUBSCRIBE_OVER;
    }
    pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;

L
Liu Jicong 已提交
490 491 492 493 494
    for (int32_t i = 0; i < newTopicNum; i++) {
      char *newTopicCopy = strdup(taosArrayGetP(newSub, i));
      taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy);
    }

495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533
    int32_t oldTopicNum = 0;
    if (pConsumerOld->currentTopics) {
      oldTopicNum = taosArrayGetSize(pConsumerOld->currentTopics);
    }

    int32_t i = 0, j = 0;
    while (i < oldTopicNum || j < newTopicNum) {
      if (i >= oldTopicNum) {
        char *newTopicCopy = strdup(taosArrayGetP(newSub, j));
        taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
        j++;
        continue;
      } else if (j >= newTopicNum) {
        char *oldTopicCopy = strdup(taosArrayGetP(pConsumerOld->currentTopics, i));
        taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
        i++;
        continue;
      } else {
        char *oldTopic = taosArrayGetP(pConsumerOld->currentTopics, i);
        char *newTopic = taosArrayGetP(newSub, j);
        int   comp = compareLenPrefixedStr(oldTopic, newTopic);
        if (comp == 0) {
          i++;
          j++;
          continue;
        } else if (comp < 0) {
          char *oldTopicCopy = strdup(oldTopic);
          taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
          i++;
          continue;
        } else {
          char *newTopicCopy = strdup(newTopic);
          taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
          j++;
          continue;
        }
      }
    }

534 535 536 537 538 539 540 541
    if (pConsumerOld && taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 &&
        taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
      /*if (taosArrayGetSize(pConsumerNew->assignedTopics) == 0) {*/
      /*pConsumerNew->updateType = */
      /*}*/
      goto SUBSCRIBE_OVER;
    }

542 543 544 545
    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto SUBSCRIBE_OVER;
    if (mndTransPrepare(pMnode, pTrans) != 0) goto SUBSCRIBE_OVER;
  }

S
Shengliang Guan 已提交
546
  code = TSDB_CODE_ACTION_IN_PROGRESS;
547 548

SUBSCRIBE_OVER:
L
Liu Jicong 已提交
549 550
  mndTransDrop(pTrans);

551
  if (pConsumerOld) {
L
fix  
Liu Jicong 已提交
552
    /*taosRUnLockLatch(&pConsumerOld->lock);*/
553 554 555 556
    mndReleaseConsumer(pMnode, pConsumerOld);
  }
  if (pConsumerNew) {
    tDeleteSMqConsumerObj(pConsumerNew);
L
Liu Jicong 已提交
557
    taosMemoryFree(pConsumerNew);
558 559 560 561
  }
  // TODO: replace with destroy subscribe msg
  if (subscribe.topicNames) taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
  return code;
L
Liu Jicong 已提交
562 563
}

L
Liu Jicong 已提交
564 565
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
566 567

  void   *buf = NULL;
L
Liu Jicong 已提交
568
  int32_t tlen = tEncodeSMqConsumerObj(NULL, pConsumer);
L
Liu Jicong 已提交
569 570 571
  int32_t size = sizeof(int32_t) + tlen + MND_CONSUMER_RESERVE_SIZE;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size);
572
  if (pRaw == NULL) goto CM_ENCODE_OVER;
L
Liu Jicong 已提交
573

wafwerar's avatar
wafwerar 已提交
574
  buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
575 576
  if (buf == NULL) goto CM_ENCODE_OVER;

L
Liu Jicong 已提交
577
  void *abuf = buf;
L
Liu Jicong 已提交
578 579
  tEncodeSMqConsumerObj(&abuf, pConsumer);

L
Liu Jicong 已提交
580
  int32_t dataPos = 0;
L
Liu Jicong 已提交
581 582
  SDB_SET_INT32(pRaw, dataPos, tlen, CM_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, CM_ENCODE_OVER);
L
Liu Jicong 已提交
583 584
  SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER);
L
Liu Jicong 已提交
585

L
Liu Jicong 已提交
586 587
  terrno = TSDB_CODE_SUCCESS;

588
CM_ENCODE_OVER:
wafwerar's avatar
wafwerar 已提交
589
  taosMemoryFreeClear(buf);
590
  if (terrno != 0) {
591
    mError("consumer:%" PRId64 ", failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
592 593 594 595
    sdbFreeRaw(pRaw);
    return NULL;
  }

596
  mTrace("consumer:%" PRId64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer);
L
Liu Jicong 已提交
597 598 599
  return pRaw;
}

L
Liu Jicong 已提交
600
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
601
  terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
602
  void *buf = NULL;
603

L
Liu Jicong 已提交
604
  int8_t sver = 0;
L
Liu Jicong 已提交
605
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CM_DECODE_OVER;
L
Liu Jicong 已提交
606 607 608

  if (sver != MND_CONSUMER_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
L
Liu Jicong 已提交
609
    goto CM_DECODE_OVER;
L
Liu Jicong 已提交
610 611
  }

L
Liu Jicong 已提交
612
  SSdbRow *pRow = sdbAllocRow(sizeof(SMqConsumerObj));
L
Liu Jicong 已提交
613
  if (pRow == NULL) goto CM_DECODE_OVER;
614

L
Liu Jicong 已提交
615
  SMqConsumerObj *pConsumer = sdbGetRowObj(pRow);
L
Liu Jicong 已提交
616
  if (pConsumer == NULL) goto CM_DECODE_OVER;
L
Liu Jicong 已提交
617 618

  int32_t dataPos = 0;
L
Liu Jicong 已提交
619
  int32_t len;
L
Liu Jicong 已提交
620
  SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER);
wafwerar's avatar
wafwerar 已提交
621
  buf = taosMemoryMalloc(len);
L
Liu Jicong 已提交
622 623 624
  if (buf == NULL) goto CM_DECODE_OVER;
  SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);
L
Liu Jicong 已提交
625

L
Liu Jicong 已提交
626 627 628
  if (tDecodeSMqConsumerObj(buf, pConsumer) == NULL) {
    goto CM_DECODE_OVER;
  }
L
Liu Jicong 已提交
629 630 631

  terrno = TSDB_CODE_SUCCESS;

L
Liu Jicong 已提交
632
CM_DECODE_OVER:
wafwerar's avatar
wafwerar 已提交
633
  taosMemoryFreeClear(buf);
L
Liu Jicong 已提交
634
  if (terrno != TSDB_CODE_SUCCESS) {
635
    mError("consumer:%" PRId64 ", failed to decode from raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
wafwerar's avatar
wafwerar 已提交
636
    taosMemoryFreeClear(pRow);
637
    return NULL;
L
Liu Jicong 已提交
638
  }
L
Liu Jicong 已提交
639 640 641 642

  return pRow;
}

L
Liu Jicong 已提交
643
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
644
  mTrace("consumer:%" PRId64 ", perform insert action", pConsumer->consumerId);
L
Liu Jicong 已提交
645
  pConsumer->subscribeTime = pConsumer->upTime;
L
Liu Jicong 已提交
646 647 648
  return 0;
}

L
Liu Jicong 已提交
649
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
650
  mTrace("consumer:%" PRId64 ", perform delete action", pConsumer->consumerId);
651
  tDeleteSMqConsumerObj(pConsumer);
L
Liu Jicong 已提交
652 653 654
  return 0;
}

L
Liu Jicong 已提交
655
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
656
  mTrace("consumer:%" PRId64 ", perform update action", pOldConsumer->consumerId);
L
Liu Jicong 已提交
657

658 659 660 661 662 663
  taosWLockLatch(&pOldConsumer->lock);

  if (pNewConsumer->updateType == CONSUMER_UPDATE__MODIFY) {
    ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);
    ASSERT(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);

664 665 666 667 668 669 670 671 672 673
    if (taosArrayGetSize(pNewConsumer->rebNewTopics) == 0 && taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0) {
      pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
    } else {
      SArray *tmp = pOldConsumer->rebNewTopics;
      pOldConsumer->rebNewTopics = pNewConsumer->rebNewTopics;
      pNewConsumer->rebNewTopics = tmp;

      tmp = pOldConsumer->rebRemovedTopics;
      pOldConsumer->rebRemovedTopics = pNewConsumer->rebRemovedTopics;
      pNewConsumer->rebRemovedTopics = tmp;
674

675 676 677
      tmp = pOldConsumer->assignedTopics;
      pOldConsumer->assignedTopics = pNewConsumer->assignedTopics;
      pNewConsumer->assignedTopics = tmp;
L
Liu Jicong 已提交
678

679
      pOldConsumer->subscribeTime = pNewConsumer->upTime;
L
Liu Jicong 已提交
680

681 682
      pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
    }
683
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) {
L
Liu Jicong 已提交
684 685 686
    ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);
    ASSERT(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);

687
    int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
L
Liu Jicong 已提交
688
    /*pOldConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));*/
689 690
    for (int32_t i = 0; i < sz; i++) {
      char *topic = strdup(taosArrayGetP(pOldConsumer->currentTopics, i));
L
Liu Jicong 已提交
691
      taosArrayPush(pOldConsumer->rebRemovedTopics, &topic);
692
    }
L
Liu Jicong 已提交
693 694 695

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

696
    pOldConsumer->status = MQ_CONSUMER_STATUS__LOST;
L
Liu Jicong 已提交
697 698 699 700 701 702 703 704 705 706 707 708 709
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__RECOVER) {
    ASSERT(taosArrayGetSize(pOldConsumer->currentTopics) == 0);
    ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);

    int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics);
    for (int32_t i = 0; i < sz; i++) {
      char *topic = strdup(taosArrayGetP(pOldConsumer->assignedTopics, i));
      taosArrayPush(pOldConsumer->rebNewTopics, &topic);
    }

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

    pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
710 711
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__TOUCH) {
    atomic_add_fetch_32(&pOldConsumer->epoch, 1);
L
Liu Jicong 已提交
712 713 714

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) {
    ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 1);
    ASSERT(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0);

    char *addedTopic = strdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));
    // not exist in current topic
#if 1
    for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->currentTopics); i++) {
      char *topic = taosArrayGetP(pOldConsumer->currentTopics, i);
      ASSERT(strcmp(topic, addedTopic) != 0);
    }
#endif

    // remove from new topic
    for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebNewTopics); i++) {
      char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i);
      if (strcmp(addedTopic, topic) == 0) {
        taosArrayRemove(pOldConsumer->rebNewTopics, i);
        taosMemoryFree(topic);
        break;
      }
    }

    // add to current topic
    taosArrayPush(pOldConsumer->currentTopics, &addedTopic);
    taosArraySortString(pOldConsumer->currentTopics, taosArrayCompareString);
    // set status
    if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
      if (pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY ||
          pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
        pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
      } else if (pOldConsumer->status == MQ_CONSUMER_STATUS__LOST_IN_REB ||
                 pOldConsumer->status == MQ_CONSUMER_STATUS__LOST) {
        pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
      }
    } else {
      if (pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY ||
          pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
        pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY_IN_REB;
      } else if (pOldConsumer->status == MQ_CONSUMER_STATUS__LOST ||
                 pOldConsumer->status == MQ_CONSUMER_STATUS__LOST_IN_REB) {
        pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_IN_REB;
      }
    }
L
Liu Jicong 已提交
759 760 761

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777
    atomic_add_fetch_32(&pOldConsumer->epoch, 1);
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) {
    ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 0);
    ASSERT(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 1);
    char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);

    // not exist in new topic
#if 1
    for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebNewTopics); i++) {
      char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i);
      ASSERT(strcmp(topic, removedTopic) != 0);
    }
#endif

    // remove from removed topic
    for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebRemovedTopics); i++) {
L
fix  
Liu Jicong 已提交
778
      char *topic = taosArrayGetP(pOldConsumer->rebRemovedTopics, i);
779
      if (strcmp(removedTopic, topic) == 0) {
L
fix  
Liu Jicong 已提交
780
        taosArrayRemove(pOldConsumer->rebRemovedTopics, i);
781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817
        taosMemoryFree(topic);
        break;
      }
    }

    // remove from current topic
    int32_t i = 0;
    int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
    for (i = 0; i < sz; i++) {
      char *topic = taosArrayGetP(pOldConsumer->currentTopics, i);
      if (strcmp(removedTopic, topic) == 0) {
        taosArrayRemove(pOldConsumer->currentTopics, i);
        taosMemoryFree(topic);
        break;
      }
    }
    // must find the topic
    ASSERT(i < sz);

    // set status
    if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
      if (pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY ||
          pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
        pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
      } else if (pOldConsumer->status == MQ_CONSUMER_STATUS__LOST_IN_REB ||
                 pOldConsumer->status == MQ_CONSUMER_STATUS__LOST) {
        pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
      }
    } else {
      if (pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY ||
          pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
        pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY_IN_REB;
      } else if (pOldConsumer->status == MQ_CONSUMER_STATUS__LOST ||
                 pOldConsumer->status == MQ_CONSUMER_STATUS__LOST_IN_REB) {
        pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_IN_REB;
      }
    }
L
Liu Jicong 已提交
818 819 820

    pOldConsumer->rebalanceTime = pNewConsumer->upTime;

821 822 823 824
    atomic_add_fetch_32(&pOldConsumer->epoch, 1);
  }

  taosWUnLockLatch(&pOldConsumer->lock);
L
Liu Jicong 已提交
825 826 827
  return 0;
}

L
Liu Jicong 已提交
828
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId) {
L
Liu Jicong 已提交
829 830
  SSdb           *pSdb = pMnode->pSdb;
  SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &consumerId);
L
Liu Jicong 已提交
831
  if (pConsumer == NULL) {
832
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
L
Liu Jicong 已提交
833 834 835 836
  }
  return pConsumer;
}

L
Liu Jicong 已提交
837
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
L
Liu Jicong 已提交
838 839 840
  SSdb *pSdb = pMnode->pSdb;
  sdbRelease(pSdb, pConsumer);
}
L
Liu Jicong 已提交
841

S
Shengliang Guan 已提交
842 843
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode         *pMnode = pReq->info.node;
L
Liu Jicong 已提交
844 845 846 847 848 849 850
  SSdb           *pSdb = pMnode->pSdb;
  int32_t         numOfRows = 0;
  SMqConsumerObj *pConsumer = NULL;

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
    if (pShow->pIter == NULL) break;
L
Liu Jicong 已提交
851 852 853 854
    if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {
      sdbRelease(pSdb, pConsumer);
      continue;
    }
L
Liu Jicong 已提交
855 856 857

    taosRLockLatch(&pConsumer->lock);

L
Liu Jicong 已提交
858 859 860 861 862 863 864
    int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
    bool    hasTopic = true;
    if (topicSz == 0) {
      hasTopic = false;
      topicSz = 1;
    }

L
Liu Jicong 已提交
865 866 867 868
    if (numOfRows + topicSz > rowsCapacity) {
      blockDataEnsureCapacity(pBlock, numOfRows + topicSz);
    }

L
Liu Jicong 已提交
869 870 871 872 873 874 875 876
    for (int32_t i = 0; i < topicSz; i++) {
      SColumnInfoData *pColInfo;
      int32_t          cols = 0;

      // consumer id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->consumerId, false);

L
Liu Jicong 已提交
877 878 879 880
      // consumer group
      char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
      tstrncpy(varDataVal(cgroup), pConsumer->cgroup, TSDB_CGROUP_LEN);
      varDataSetLen(cgroup, strlen(varDataVal(cgroup)));
L
Liu Jicong 已提交
881
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
L
Liu Jicong 已提交
882
      colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false);
L
Liu Jicong 已提交
883

884
      // client id
L
Liu Jicong 已提交
885 886 887
      char clientId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
      tstrncpy(varDataVal(clientId), pConsumer->clientId, TSDB_CGROUP_LEN);
      varDataSetLen(clientId, strlen(varDataVal(clientId)));
L
Liu Jicong 已提交
888
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
L
Liu Jicong 已提交
889
      colDataAppend(pColInfo, numOfRows, (const char *)clientId, false);
L
Liu Jicong 已提交
890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912

      // status
      char status[20 + VARSTR_HEADER_SIZE] = {0};
      tstrncpy(varDataVal(status), mndConsumerStatusName(pConsumer->status), 20);
      varDataSetLen(status, strlen(varDataVal(status)));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)status, false);

      // one subscribed topic
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      if (hasTopic) {
        char        topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
        const char *topicName = mndTopicGetShowName(taosArrayGetP(pConsumer->assignedTopics, i));
        tstrncpy(varDataVal(topic), topicName, TSDB_TOPIC_FNAME_LEN);
        varDataSetLen(topic, strlen(varDataVal(topic)));
        colDataAppend(pColInfo, numOfRows, (const char *)topic, false);
      } else {
        colDataAppend(pColInfo, numOfRows, NULL, true);
      }

      // pid
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->pid, true);
L
Liu Jicong 已提交
913

L
Liu Jicong 已提交
914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931
      // end point
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->ep, true);

      // up time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->upTime, false);

      // subscribe time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false);

      // rebalance time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0);

      numOfRows++;
    }
L
Liu Jicong 已提交
932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959
    taosRUnLockLatch(&pConsumer->lock);
    sdbRelease(pSdb, pConsumer);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}

static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
  SSdb *pSdb = pMnode->pSdb;
  sdbCancelFetch(pSdb, pIter);
}

static const char *mndConsumerStatusName(int status) {
  switch (status) {
    case MQ_CONSUMER_STATUS__READY:
      return "ready";
    case MQ_CONSUMER_STATUS__LOST:
    case MQ_CONSUMER_STATUS__LOST_REBD:
    case MQ_CONSUMER_STATUS__LOST_IN_REB:
      return "lost";
    case MQ_CONSUMER_STATUS__MODIFY:
    case MQ_CONSUMER_STATUS__MODIFY_IN_REB:
      return "rebalancing";
    default:
      return "unknown";
  }
}