mndConsumer.c 42.5 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "mndConsumer.h"
L
Liu Jicong 已提交
18
#include "mndPrivilege.h"
L
Liu Jicong 已提交
19
#include "mndShow.h"
20
#include "mndSubscribe.h"
L
Liu Jicong 已提交
21 22
#include "mndTopic.h"
#include "mndTrans.h"
L
Liu Jicong 已提交
23
#include "tcompare.h"
L
Liu Jicong 已提交
24 25
#include "tname.h"

26
#define MND_CONSUMER_VER_NUMBER   2
L
Liu Jicong 已提交
27 28
#define MND_CONSUMER_RESERVE_SIZE 64

wmmhello's avatar
wmmhello 已提交
29
#define MND_MAX_GROUP_PER_TOPIC           100
H
Haojun Liao 已提交
30
#define MND_CONSUMER_LOST_HB_CNT          6
L
Liu Jicong 已提交
31
#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200
32

H
Haojun Liao 已提交
33
static int32_t mqRebInExecCnt = 0;
34

L
Liu Jicong 已提交
35 36
static const char *mndConsumerStatusName(int status);

L
Liu Jicong 已提交
37 38
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
39
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer);
S
Shengliang Guan 已提交
40
static int32_t mndProcessConsumerMetaMsg(SRpcMsg *pMsg);
41
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
L
Liu Jicong 已提交
42
static void    mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
43

S
Shengliang Guan 已提交
44 45
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
46
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
S
Shengliang Guan 已提交
47 48
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg);
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg);
L
Liu Jicong 已提交
49
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg);
S
Shengliang Guan 已提交
50
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg);
51

L
Liu Jicong 已提交
52
int32_t mndInitConsumer(SMnode *pMnode) {
S
Shengliang Guan 已提交
53 54 55 56 57 58 59 60 61
  SSdbTable table = {
      .sdbType = SDB_CONSUMER,
      .keyType = SDB_KEY_INT64,
      .encodeFp = (SdbEncodeFp)mndConsumerActionEncode,
      .decodeFp = (SdbDecodeFp)mndConsumerActionDecode,
      .insertFp = (SdbInsertFp)mndConsumerActionInsert,
      .updateFp = (SdbUpdateFp)mndConsumerActionUpdate,
      .deleteFp = (SdbDeleteFp)mndConsumerActionDelete,
  };
L
Liu Jicong 已提交
62

L
Liu Jicong 已提交
63 64 65
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
L
Liu Jicong 已提交
66
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);
wmmhello's avatar
wmmhello 已提交
67
//  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_LOST, mndProcessConsumerLostMsg);
L
Liu Jicong 已提交
68
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg);
L
Liu Jicong 已提交
69
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);
L
Liu Jicong 已提交
70 71 72 73

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndCancelGetNextConsumer);

L
Liu Jicong 已提交
74 75 76 77 78
  return sdbSetTable(pMnode->pSdb, table);
}

void mndCleanupConsumer(SMnode *pMnode) {}

79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId){
  SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg));
  if (pClearMsg == NULL) {
    mError("consumer:0x%"PRIx64" failed to clear consumer due to out of memory. alloc size:%d", consumerId, (int32_t)sizeof(SMqConsumerClearMsg));
    return;
  }

  pClearMsg->consumerId = consumerId;
  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, .pCont = pClearMsg, .contLen = sizeof(SMqConsumerClearMsg)};

  mInfo("consumer:0x%" PRIx64 " drop from sdb", consumerId);
  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  return;
}

L
Liu Jicong 已提交
95
bool mndRebTryStart() {
H
Haojun Liao 已提交
96
  int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
wmmhello's avatar
wmmhello 已提交
97
  mDebug("tq timer, rebalance counter old val:%d", old);
L
Liu Jicong 已提交
98 99 100
  return old == 0;
}

X
Xiaoyu Wang 已提交
101
void mndRebEnd() { mndRebCntDec(); }
L
Liu Jicong 已提交
102

S
Shengliang Guan 已提交
103 104
void mndRebCntInc() {
  int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1);
105
  mInfo("rebalance trans start, rebalance counter:%d", val);
S
Shengliang Guan 已提交
106
}
L
Liu Jicong 已提交
107

S
Shengliang Guan 已提交
108
void mndRebCntDec() {
109 110 111 112 113 114 115 116 117 118
  while (1) {
    int32_t val = atomic_load_32(&mqRebInExecCnt);
    if (val <= 0) {
      mError("rebalance trans end, rebalance counter:%d should not be less equalled than 0, ignore counter desc", val);
      break;
    }

    int32_t newVal = val - 1;
    int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal);
    if (oldVal == val) {
wmmhello's avatar
wmmhello 已提交
119
      mDebug("rebalance trans end, rebalance counter:%d", newVal);
120 121 122
      break;
    }
  }
S
Shengliang Guan 已提交
123
}
L
Liu Jicong 已提交
124

wmmhello's avatar
wmmhello 已提交
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
//static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
//  SMnode             *pMnode = pMsg->info.node;
//  SMqConsumerLostMsg *pLostMsg = pMsg->pCont;
//  SMqConsumerObj     *pConsumer = mndAcquireConsumer(pMnode, pLostMsg->consumerId);
//  if (pConsumer == NULL) {
//    return 0;
//  }
//
//  mInfo("process consumer lost msg, consumer:0x%" PRIx64 " status:%d(%s)", pLostMsg->consumerId, pConsumer->status,
//        mndConsumerStatusName(pConsumer->status));
//
//  if (pConsumer->status != MQ_CONSUMER_STATUS_READY) {
//    mndReleaseConsumer(pMnode, pConsumer);
//    return -1;
//  }
//
//  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
//  pConsumerNew->updateType = CONSUMER_UPDATE_TIMER_LOST;
//
//  mndReleaseConsumer(pMnode, pConsumer);
//
//  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "lost-csm");
//  if (pTrans == NULL) {
//    goto FAIL;
//  }
//
//  if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
//    goto FAIL;
//  }
//
//  if (mndTransPrepare(pMnode, pTrans) != 0) {
//    goto FAIL;
//  }
//
//  tDeleteSMqConsumerObj(pConsumerNew, true);
//  mndTransDrop(pTrans);
//  return 0;
//FAIL:
//  tDeleteSMqConsumerObj(pConsumerNew, true);
//  mndTransDrop(pTrans);
//  return -1;
//}
167

S
Shengliang Guan 已提交
168 169 170
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
  SMnode                *pMnode = pMsg->info.node;
  SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont;
L
Liu Jicong 已提交
171
  SMqConsumerObj        *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId);
L
Liu Jicong 已提交
172 173 174 175
  if (pConsumer == NULL) {
    mError("cannot find consumer %" PRId64 " when processing consumer recover msg", pRecoverMsg->consumerId);
    return -1;
  }
L
Liu Jicong 已提交
176

H
Haojun Liao 已提交
177 178
  mInfo("receive consumer recover msg, consumer:0x%" PRIx64 " status:%d(%s)", pRecoverMsg->consumerId,
        pConsumer->status, mndConsumerStatusName(pConsumer->status));
L
Liu Jicong 已提交
179

wmmhello's avatar
wmmhello 已提交
180
  if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) {
L
Liu Jicong 已提交
181
    mndReleaseConsumer(pMnode, pConsumer);
L
Liu Jicong 已提交
182
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
L
Liu Jicong 已提交
183 184 185
    return -1;
  }

L
Liu Jicong 已提交
186
  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
187
  pConsumerNew->updateType = CONSUMER_UPDATE_REC;
L
Liu Jicong 已提交
188 189 190

  mndReleaseConsumer(pMnode, pConsumer);

191
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm");
H
Haojun Liao 已提交
192 193 194 195
  if (pTrans == NULL) {
    goto FAIL;
  }

L
Liu Jicong 已提交
196 197 198
  if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
  if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;

wmmhello's avatar
wmmhello 已提交
199 200
  tDeleteSMqConsumerObj(pConsumerNew, true);

L
Liu Jicong 已提交
201 202 203
  mndTransDrop(pTrans);
  return 0;
FAIL:
wmmhello's avatar
wmmhello 已提交
204 205
  tDeleteSMqConsumerObj(pConsumerNew, true);

L
Liu Jicong 已提交
206 207 208 209
  mndTransDrop(pTrans);
  return -1;
}

210
// todo check the clear process
L
Liu Jicong 已提交
211 212 213
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
  SMnode              *pMnode = pMsg->info.node;
  SMqConsumerClearMsg *pClearMsg = pMsg->pCont;
H
Haojun Liao 已提交
214 215

  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId);
L
Liu Jicong 已提交
216
  if (pConsumer == NULL) {
H
Haojun Liao 已提交
217
    mError("consumer:0x%"PRIx64" failed to be found to clear it", pClearMsg->consumerId);
L
Liu Jicong 已提交
218 219 220
    return 0;
  }

H
Haojun Liao 已提交
221
  mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId,
L
Liu Jicong 已提交
222 223
        mndConsumerStatusName(pConsumer->status));

wmmhello's avatar
wmmhello 已提交
224 225 226 227
//  if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) {
//    mndReleaseConsumer(pMnode, pConsumer);
//    return -1;
//  }
L
Liu Jicong 已提交
228 229

  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
wmmhello's avatar
wmmhello 已提交
230
//  pConsumerNew->updateType = CONSUMER_UPDATE_TIMER_LOST;
L
Liu Jicong 已提交
231 232 233 234 235

  mndReleaseConsumer(pMnode, pConsumer);

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
  if (pTrans == NULL) goto FAIL;
H
Haojun Liao 已提交
236 237

  // this is the drop action, not the update action
L
Liu Jicong 已提交
238 239 240
  if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
  if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;

wmmhello's avatar
wmmhello 已提交
241 242
  tDeleteSMqConsumerObj(pConsumerNew, true);

L
Liu Jicong 已提交
243 244
  mndTransDrop(pTrans);
  return 0;
245

L
Liu Jicong 已提交
246
FAIL:
wmmhello's avatar
wmmhello 已提交
247 248
  tDeleteSMqConsumerObj(pConsumerNew, true);

L
Liu Jicong 已提交
249 250 251 252
  mndTransDrop(pTrans);
  return -1;
}

L
Liu Jicong 已提交
253
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
L
Liu Jicong 已提交
254 255 256 257
  SMqRebInfo *pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
  if (pRebInfo == NULL) {
    pRebInfo = tNewSMqRebSubscribe(key);
    if (pRebInfo == NULL) {
258 259 260
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
L
Liu Jicong 已提交
261 262 263
    taosHashPut(pHash, key, strlen(key) + 1, pRebInfo, sizeof(SMqRebInfo));
    taosMemoryFree(pRebInfo);
    pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
264
  }
L
Liu Jicong 已提交
265
  return pRebInfo;
266 267
}

X
Xiaoyu Wang 已提交
268 269
static void freeRebalanceItem(void *param) {
  SMqRebInfo *pInfo = param;
H
Haojun Liao 已提交
270 271 272 273
  taosArrayDestroy(pInfo->newConsumers);
  taosArrayDestroy(pInfo->removedConsumers);
}

S
Shengliang Guan 已提交
274 275
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
276 277 278 279
  SSdb           *pSdb = pMnode->pSdb;
  SMqConsumerObj *pConsumer;
  void           *pIter = NULL;

D
dapan1121 已提交
280
  mDebug("start to process mq timer");
281

282
  // rebalance cannot be parallel
L
Liu Jicong 已提交
283
  if (!mndRebTryStart()) {
284
    mInfo("mq rebalance already in progress, do nothing");
285 286 287 288
    return 0;
  }

  SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
H
Haojun Liao 已提交
289
  if (pRebMsg == NULL) {
X
Xiaoyu Wang 已提交
290
    mError("failed to create the rebalance msg, size:%d, quit mq timer", (int32_t)sizeof(SMqDoRebalanceMsg));
H
Haojun Liao 已提交
291 292 293 294
    mndRebEnd();
    return TSDB_CODE_OUT_OF_MEMORY;
  }

295
  pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
H
Haojun Liao 已提交
296 297 298 299 300 301 302 303
  if (pRebMsg->rebSubHash == NULL) {
    mError("failed to create rebalance hashmap");
    rpcFreeCont(pRebMsg);
    mndRebEnd();
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  taosHashSetFreeFp(pRebMsg->rebSubHash, freeRebalanceItem);
304 305 306 307

  // iterate all consumers, find all modification
  while (1) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
308 309 310
    if (pIter == NULL) {
      break;
    }
311 312 313

    int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
    int32_t status = atomic_load_32(&pConsumer->status);
H
Haojun Liao 已提交
314

315
    mInfo("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d",
wmmhello's avatar
wmmhello 已提交
316
           pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime,
X
Xiaoyu Wang 已提交
317
           hbStatus);
L
Liu Jicong 已提交
318

wmmhello's avatar
wmmhello 已提交
319
    if (status == MQ_CONSUMER_STATUS_READY) {
wmmhello's avatar
wmmhello 已提交
320 321 322
      if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {   // unsubscribe or close
        mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
      } else if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
wmmhello's avatar
wmmhello 已提交
323 324 325 326 327 328 329 330
        taosRLockLatch(&pConsumer->lock);
        int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
        for (int32_t i = 0; i < topicNum; i++) {
          char  key[TSDB_SUBSCRIBE_KEY_LEN];
          char *removedTopic = taosArrayGetP(pConsumer->currentTopics, i);
          mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
          SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
          taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
H
Haojun Liao 已提交
331
        }
wmmhello's avatar
wmmhello 已提交
332
        taosRUnLockLatch(&pConsumer->lock);
L
Liu Jicong 已提交
333
      }
wmmhello's avatar
wmmhello 已提交
334
    } else if (status == MQ_CONSUMER_STATUS_LOST) {
wmmhello's avatar
wmmhello 已提交
335
      if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) {   // clear consumer if lost a day
336
        mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
337
      }
338
    } else {  // MQ_CONSUMER_STATUS_REBALANCE
L
Liu Jicong 已提交
339
      taosRLockLatch(&pConsumer->lock);
340

341 342 343 344 345
      int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
      for (int32_t i = 0; i < newTopicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *newTopic = taosArrayGetP(pConsumer->rebNewTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, newTopic);
L
Liu Jicong 已提交
346
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
347 348 349 350 351 352 353 354
        taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
      }

      int32_t removedTopicNum = taosArrayGetSize(pConsumer->rebRemovedTopics);
      for (int32_t i = 0; i < removedTopicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *removedTopic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
L
Liu Jicong 已提交
355
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
356 357
        taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
      }
L
Liu Jicong 已提交
358
      taosRUnLockLatch(&pConsumer->lock);
359 360 361 362 363 364
    }

    mndReleaseConsumer(pMnode, pConsumer);
  }

  if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
365
      mInfo("mq rebalance will be triggered");
366
    SRpcMsg rpcMsg = {
L
Liu Jicong 已提交
367
        .msgType = TDMT_MND_TMQ_DO_REBALANCE,
368 369 370 371 372 373 374
        .pCont = pRebMsg,
        .contLen = sizeof(SMqDoRebalanceMsg),
    };
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  } else {
    taosHashCleanup(pRebMsg->rebSubHash);
    rpcFreeCont(pRebMsg);
H
Haojun Liao 已提交
375
    mDebug("mq timer finished, no need to re-balance");
L
Liu Jicong 已提交
376
    mndRebEnd();
377 378 379 380
  }
  return 0;
}

381
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
382
  int32_t code = 0;
L
Liu Jicong 已提交
383 384
  SMnode  *pMnode = pMsg->info.node;
  SMqHbReq req = {0};
385

386
  if ((code = tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req)) < 0) {
D
dapan1121 已提交
387
    terrno = TSDB_CODE_OUT_OF_MEMORY;
388
    goto end;
D
dapan1121 已提交
389 390
  }

L
Liu Jicong 已提交
391
  int64_t         consumerId = req.consumerId;
392
  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
L
Liu Jicong 已提交
393
  if (pConsumer == NULL) {
X
Xiaoyu Wang 已提交
394
    mError("consumer:0x%" PRIx64 " not exist", consumerId);
L
Liu Jicong 已提交
395
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
396 397
    code = -1;
    goto end;
L
Liu Jicong 已提交
398
  }
399 400 401 402 403

  atomic_store_32(&pConsumer->hbStatus, 0);

  int32_t status = atomic_load_32(&pConsumer->status);

wmmhello's avatar
wmmhello 已提交
404
  if (status == MQ_CONSUMER_STATUS_LOST) {
X
Xiaoyu Wang 已提交
405
    mInfo("try to recover consumer:0x%" PRIx64 "", consumerId);
406 407 408
    SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));

    pRecoverMsg->consumerId = consumerId;
409
    SRpcMsg pRpcMsg = {
L
Liu Jicong 已提交
410
        .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
411 412 413 414
        .pCont = pRecoverMsg,
        .contLen = sizeof(SMqConsumerRecoverMsg),
    };
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
415 416
  }

417 418
  for(int i = 0; i < taosArrayGetSize(req.topics); i++){
    TopicOffsetRows* data = taosArrayGet(req.topics, i);
419
    mInfo("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName);
420 421

    SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName);
422 423 424
    if(pSub == NULL){
      continue;
    }
425
    taosWLockLatch(&pSub->lock);
426
    SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
427
    if(pConsumerEp){
428
      taosArrayDestroy(pConsumerEp->offsetRows);
429 430 431
      pConsumerEp->offsetRows = data->offsetRows;
      data->offsetRows = NULL;
    }
432
    taosWUnLockLatch(&pSub->lock);
433 434 435 436

    mndReleaseSubscribe(pMnode, pSub);
  }

437 438
  mndReleaseConsumer(pMnode, pConsumer);

439 440 441
end:
  tDeatroySMqHbReq(&req);
  return code;
442 443
}

S
Shengliang Guan 已提交
444
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
L
Liu Jicong 已提交
445 446 447
  SMnode     *pMnode = pMsg->info.node;
  SMqAskEpReq req = {0};
  SMqAskEpRsp rsp = {0};
D
dapan1121 已提交
448 449 450 451 452 453

  if (tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

L
Liu Jicong 已提交
454 455
  int64_t consumerId = req.consumerId;
  int32_t epoch = req.epoch;
456

457
  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
L
Liu Jicong 已提交
458
  if (pConsumer == NULL) {
H
Haojun Liao 已提交
459
    mError("consumer:0x%" PRIx64 " group:%s not exists in sdb", consumerId, req.cgroup);
460 461 462 463
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }

H
Haojun Liao 已提交
464 465 466 467 468
  int32_t ret = strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup));
  if (ret != 0) {
    mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup,
           pConsumer->cgroup);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
wmmhello's avatar
wmmhello 已提交
469
    goto FAIL;
H
Haojun Liao 已提交
470
  }
471

472 473 474 475
  atomic_store_32(&pConsumer->hbStatus, 0);

  // 1. check consumer status
  int32_t status = atomic_load_32(&pConsumer->status);
L
Liu Jicong 已提交
476

wmmhello's avatar
wmmhello 已提交
477
  if (status == MQ_CONSUMER_STATUS_LOST) {
X
Xiaoyu Wang 已提交
478
    mInfo("try to recover consumer:0x%" PRIx64, consumerId);
L
Liu Jicong 已提交
479 480 481
    SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));

    pRecoverMsg->consumerId = consumerId;
482
    SRpcMsg pRpcMsg = {
L
Liu Jicong 已提交
483
        .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
484 485 486
        .pCont = pRecoverMsg,
        .contLen = sizeof(SMqConsumerRecoverMsg),
    };
H
Haojun Liao 已提交
487

488
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
489 490
  }

wmmhello's avatar
wmmhello 已提交
491
  if (status != MQ_CONSUMER_STATUS_READY) {
X
Xiaoyu Wang 已提交
492
    mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
493
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
wmmhello's avatar
wmmhello 已提交
494
    goto FAIL;
495 496 497 498
  }

  int32_t serverEpoch = atomic_load_32(&pConsumer->epoch);

499
  // 2. check epoch, only send ep info when epochs do not match
500 501
  if (epoch != serverEpoch) {
    taosRLockLatch(&pConsumer->lock);
X
Xiaoyu Wang 已提交
502 503
    mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d", consumerId, epoch,
          serverEpoch);
504 505 506 507 508
    int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics);

    rsp.topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp));
    if (rsp.topics == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
509
      taosRUnLockLatch(&pConsumer->lock);
510 511 512
      goto FAIL;
    }

H
Haojun Liao 已提交
513
    // handle all topics subscribed by this consumer
514 515 516 517
    for (int32_t i = 0; i < numOfTopics; i++) {
      char            *topic = taosArrayGetP(pConsumer->currentTopics, i);
      SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
      // txn guarantees pSub is created
wmmhello's avatar
wmmhello 已提交
518
      if(pSub == NULL) continue;
519 520 521 522 523 524 525
      taosRLockLatch(&pSub->lock);

      SMqSubTopicEp topicEp = {0};
      strcpy(topicEp.topic, topic);

      // 2.1 fetch topic schema
      SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
wmmhello's avatar
wmmhello 已提交
526 527 528 529 530
      if(pTopic == NULL) {
        taosRUnLockLatch(&pSub->lock);
        mndReleaseSubscribe(pMnode, pSub);
        continue;
      }
531
      taosRLockLatch(&pTopic->lock);
L
Liu Jicong 已提交
532
      tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN);
533
      topicEp.schema.nCols = pTopic->schema.nCols;
L
Liu Jicong 已提交
534 535 536 537
      if (topicEp.schema.nCols) {
        topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema));
        memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema));
      }
538 539 540 541
      taosRUnLockLatch(&pTopic->lock);
      mndReleaseTopic(pMnode, pTopic);

      // 2.2 iterate all vg assigned to the consumer of that topic
542 543
      SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
      int32_t        vgNum = taosArrayGetSize(pConsumerEp->vgs);
544

H
Haojun Liao 已提交
545
      // this customer assigned vgroups
546 547 548
      topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
      if (topicEp.vgs == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
549
        taosRUnLockLatch(&pConsumer->lock);
L
Liu Jicong 已提交
550 551
        taosRUnLockLatch(&pSub->lock);
        mndReleaseSubscribe(pMnode, pSub);
552 553 554 555
        goto FAIL;
      }

      for (int32_t j = 0; j < vgNum; j++) {
556
        SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574
        char     offsetKey[TSDB_PARTITION_KEY_LEN];
        mndMakePartitionKey(offsetKey, pConsumer->cgroup, topic, pVgEp->vgId);
        // 2.2.1 build vg ep
        SMqSubVgEp vgEp = {
            .epSet = pVgEp->epSet,
            .vgId = pVgEp->vgId,
            .offset = -1,
        };

        taosArrayPush(topicEp.vgs, &vgEp);
      }
      taosArrayPush(rsp.topics, &topicEp);

      taosRUnLockLatch(&pSub->lock);
      mndReleaseSubscribe(pMnode, pSub);
    }
    taosRUnLockLatch(&pConsumer->lock);
  }
H
Haojun Liao 已提交
575

576
  // encode rsp
L
Liu Jicong 已提交
577
  int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, &rsp);
578 579
  void   *buf = rpcMallocCont(tlen);
  if (buf == NULL) {
L
Liu Jicong 已提交
580
    terrno = TSDB_CODE_OUT_OF_MEMORY;
wmmhello's avatar
wmmhello 已提交
581
    goto FAIL;
L
Liu Jicong 已提交
582
  }
H
Haojun Liao 已提交
583

584 585 586 587 588 589 590 591
  SMqRspHead* pHead = buf;

  pHead->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
  pHead->epoch = serverEpoch;
  pHead->consumerId = pConsumer->consumerId;
  pHead->walsver = 0;
  pHead->walever = 0;

S
Shengliang Guan 已提交
592

593
  void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
L
Liu Jicong 已提交
594
  tEncodeSMqAskEpRsp(&abuf, &rsp);
595 596

  // release consumer and free memory
L
Liu Jicong 已提交
597
  tDeleteSMqAskEpRsp(&rsp);
598 599 600
  mndReleaseConsumer(pMnode, pConsumer);

  // send rsp
S
Shengliang Guan 已提交
601 602
  pMsg->info.rsp = buf;
  pMsg->info.rspLen = tlen;
603
  return 0;
H
Haojun Liao 已提交
604

605
FAIL:
L
Liu Jicong 已提交
606
  tDeleteSMqAskEpRsp(&rsp);
607 608 609 610
  mndReleaseConsumer(pMnode, pConsumer);
  return -1;
}

L
Liu Jicong 已提交
611 612 613 614 615 616 617 618
int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
  SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
  if (pCommitRaw == NULL) return -1;
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
  return 0;
}

619 620 621 622 623 624 625 626
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
  SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
  if (pCommitRaw == NULL) return -1;
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
  return 0;
}

X
Xiaoyu Wang 已提交
627
static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const char *pUser) {
628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647
  int32_t numOfTopics = taosArrayGetSize(pTopicList);

  for (int32_t i = 0; i < numOfTopics; i++) {
    char        *pOneTopic = taosArrayGetP(pTopicList, i);
    SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pOneTopic);
    if (pTopic == NULL) {  // terrno has been set by callee function
      return -1;
    }

    if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) {
      mndReleaseTopic(pMnode, pTopic);
      return -1;
    }

    mndReleaseTopic(pMnode, pTopic);
  }

  return 0;
}

X
Xiaoyu Wang 已提交
648
static void *topicNameDup(void *p) { return taosStrdup((char *)p); }
649

X
Xiaoyu Wang 已提交
650 651
static void freeItem(void *param) {
  void *pItem = *(void **)param;
652 653 654
  if (pItem != NULL) {
    taosMemoryFree(pItem);
  }
655 656
}

657 658 659 660
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
  SMnode *pMnode = pMsg->info.node;
  char   *msgStr = pMsg->pCont;

661 662
  SCMSubscribeReq subscribe = {0};
  tDeserializeSCMSubscribeReq(msgStr, &subscribe);
663

664
  int64_t        consumerId = subscribe.consumerId;
665
  char           *cgroup = subscribe.cgroup;
H
Haojun Liao 已提交
666
  SMqConsumerObj *pExistedConsumer = NULL;
667
  SMqConsumerObj *pConsumerNew = NULL;
wmmhello's avatar
wmmhello 已提交
668
  STrans         *pTrans       = NULL;
669 670

  int32_t code = -1;
671 672
  SArray *pTopicList = subscribe.topicNames;
  taosArraySort(pTopicList, taosArrayCompareString);
673
  taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem);
674

675
  int32_t newTopicNum = taosArrayGetSize(pTopicList);
wmmhello's avatar
wmmhello 已提交
676
  for(int i = 0; i < newTopicNum; i++){
wmmhello's avatar
wmmhello 已提交
677
    int32_t gNum = mndGetGroupNumByTopic(pMnode, (const char*)taosArrayGetP(pTopicList, i));
wmmhello's avatar
wmmhello 已提交
678
    if(gNum >= MND_MAX_GROUP_PER_TOPIC){
wmmhello's avatar
wmmhello 已提交
679 680 681 682 683
      terrno = TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE;
      code = terrno;
      goto _over;
    }
  }
684

H
Haojun Liao 已提交
685
  // check topic existence
wmmhello's avatar
wmmhello 已提交
686
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
687 688 689
  if (pTrans == NULL) {
    goto _over;
  }
L
Liu Jicong 已提交
690

691 692 693
  code = validateTopics(pTopicList, pMnode, pMsg->info.conn.user);
  if (code != TSDB_CODE_SUCCESS) {
    goto _over;
694 695
  }

H
Haojun Liao 已提交
696 697
  pExistedConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pExistedConsumer == NULL) {
X
Xiaoyu Wang 已提交
698 699
    mInfo("receive subscribe request from new consumer:0x%" PRIx64 " cgroup:%s, numOfTopics:%d", consumerId,
          subscribe.cgroup, (int32_t)taosArrayGetSize(pTopicList));
L
Liu Jicong 已提交
700

701
    pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
702
    tstrncpy(pConsumerNew->clientId, subscribe.clientId, tListLen(pConsumerNew->clientId));
703

704 705 706 707 708
    pConsumerNew->withTbName = subscribe.withTbName;
    pConsumerNew->autoCommit = subscribe.autoCommit;
    pConsumerNew->autoCommitInterval = subscribe.autoCommitInterval;
    pConsumerNew->resetOffsetCfg = subscribe.resetOffsetCfg;

709
//  pConsumerNew->updateType = CONSUMER_UPDATE_SUB;   // use insert logic
H
Haojun Liao 已提交
710
    taosArrayDestroy(pConsumerNew->assignedTopics);
711 712 713
    pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);

    // all subscribed topics should re-balance.
L
Liu Jicong 已提交
714
    taosArrayDestroy(pConsumerNew->rebNewTopics);
715
    pConsumerNew->rebNewTopics = pTopicList;
716 717
    subscribe.topicNames = NULL;

718 719
    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
    if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
720 721

  } else {
H
Haojun Liao 已提交
722
    int32_t status = atomic_load_32(&pExistedConsumer->status);
L
Liu Jicong 已提交
723

X
Xiaoyu Wang 已提交
724 725 726
    mInfo("receive subscribe request from existed consumer:0x%" PRIx64
          " cgroup:%s, current status:%d(%s), subscribe topic num: %d",
          consumerId, subscribe.cgroup, status, mndConsumerStatusName(status), newTopicNum);
L
Liu Jicong 已提交
727

wmmhello's avatar
wmmhello 已提交
728
    if (status != MQ_CONSUMER_STATUS_READY) {
729
      terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
730
      goto _over;
731 732 733 734
    }

    pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
    if (pConsumerNew == NULL) {
735
      goto _over;
736
    }
H
Haojun Liao 已提交
737

738
    // set the update type
739
    pConsumerNew->updateType = CONSUMER_UPDATE_SUB;
H
Haojun Liao 已提交
740
    taosArrayDestroy(pConsumerNew->assignedTopics);
741
    pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);
742

wmmhello's avatar
wmmhello 已提交
743
    int32_t oldTopicNum = taosArrayGetSize(pExistedConsumer->currentTopics);
744 745 746 747

    int32_t i = 0, j = 0;
    while (i < oldTopicNum || j < newTopicNum) {
      if (i >= oldTopicNum) {
H
Haojun Liao 已提交
748
        char *newTopicCopy = taosStrdup(taosArrayGetP(pTopicList, j));
749 750 751 752
        taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
        j++;
        continue;
      } else if (j >= newTopicNum) {
H
Haojun Liao 已提交
753
        char *oldTopicCopy = taosStrdup(taosArrayGetP(pExistedConsumer->currentTopics, i));
754 755 756 757
        taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
        i++;
        continue;
      } else {
H
Haojun Liao 已提交
758
        char *oldTopic = taosArrayGetP(pExistedConsumer->currentTopics, i);
759
        char *newTopic = taosArrayGetP(pTopicList, j);
760
        int   comp = strcmp(oldTopic, newTopic);
761 762 763 764 765
        if (comp == 0) {
          i++;
          j++;
          continue;
        } else if (comp < 0) {
766
          char *oldTopicCopy = taosStrdup(oldTopic);
767 768 769 770
          taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
          i++;
          continue;
        } else {
771
          char *newTopicCopy = taosStrdup(newTopic);
772 773 774 775 776 777 778
          taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
          j++;
          continue;
        }
      }
    }

779 780
    // no topics need to be rebalanced
    if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
781
      goto _over;
782 783
    }

784 785
    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
    if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
786 787
  }

S
Shengliang Guan 已提交
788
  code = TSDB_CODE_ACTION_IN_PROGRESS;
789

790
_over:
L
Liu Jicong 已提交
791 792
  mndTransDrop(pTrans);

H
Haojun Liao 已提交
793 794 795
  if (pExistedConsumer) {
    /*taosRUnLockLatch(&pExistedConsumer->lock);*/
    mndReleaseConsumer(pMnode, pExistedConsumer);
796
  }
H
Haojun Liao 已提交
797

wmmhello's avatar
wmmhello 已提交
798
  tDeleteSMqConsumerObj(pConsumerNew, true);
799

800
  // TODO: replace with destroy subscribe msg
801
  taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
802
  return code;
L
Liu Jicong 已提交
803 804
}

L
Liu Jicong 已提交
805 806
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
807 808

  void   *buf = NULL;
L
Liu Jicong 已提交
809
  int32_t tlen = tEncodeSMqConsumerObj(NULL, pConsumer);
L
Liu Jicong 已提交
810 811 812
  int32_t size = sizeof(int32_t) + tlen + MND_CONSUMER_RESERVE_SIZE;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size);
813
  if (pRaw == NULL) goto CM_ENCODE_OVER;
L
Liu Jicong 已提交
814

wafwerar's avatar
wafwerar 已提交
815
  buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
816 817
  if (buf == NULL) goto CM_ENCODE_OVER;

L
Liu Jicong 已提交
818
  void *abuf = buf;
L
Liu Jicong 已提交
819 820
  tEncodeSMqConsumerObj(&abuf, pConsumer);

L
Liu Jicong 已提交
821
  int32_t dataPos = 0;
L
Liu Jicong 已提交
822 823
  SDB_SET_INT32(pRaw, dataPos, tlen, CM_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, CM_ENCODE_OVER);
L
Liu Jicong 已提交
824 825
  SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER);
L
Liu Jicong 已提交
826

L
Liu Jicong 已提交
827 828
  terrno = TSDB_CODE_SUCCESS;

829
CM_ENCODE_OVER:
wafwerar's avatar
wafwerar 已提交
830
  taosMemoryFreeClear(buf);
831
  if (terrno != 0) {
832
    mError("consumer:0x%" PRIx64 " failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
833 834 835 836
    sdbFreeRaw(pRaw);
    return NULL;
  }

837
  mTrace("consumer:0x%" PRIx64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer);
L
Liu Jicong 已提交
838 839 840
  return pRaw;
}

L
Liu Jicong 已提交
841
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
842 843 844
  SSdbRow        *pRow = NULL;
  SMqConsumerObj *pConsumer = NULL;
  void           *buf = NULL;
845

H
Haojun Liao 已提交
846
  terrno = 0;
L
Liu Jicong 已提交
847
  int8_t sver = 0;
H
Haojun Liao 已提交
848 849 850
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    goto CM_DECODE_OVER;
  }
L
Liu Jicong 已提交
851

852
  if (sver < 1 || sver > MND_CONSUMER_VER_NUMBER) {
L
Liu Jicong 已提交
853
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
L
Liu Jicong 已提交
854
    goto CM_DECODE_OVER;
L
Liu Jicong 已提交
855 856
  }

857
  pRow = sdbAllocRow(sizeof(SMqConsumerObj));
H
Haojun Liao 已提交
858 859 860
  if (pRow == NULL) {
    goto CM_DECODE_OVER;
  }
861

862
  pConsumer = sdbGetRowObj(pRow);
H
Haojun Liao 已提交
863 864 865
  if (pConsumer == NULL) {
    goto CM_DECODE_OVER;
  }
L
Liu Jicong 已提交
866 867

  int32_t dataPos = 0;
L
Liu Jicong 已提交
868
  int32_t len;
L
Liu Jicong 已提交
869
  SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER);
wafwerar's avatar
wafwerar 已提交
870
  buf = taosMemoryMalloc(len);
H
Haojun Liao 已提交
871 872 873 874 875
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto CM_DECODE_OVER;
  }

L
Liu Jicong 已提交
876 877
  SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);
L
Liu Jicong 已提交
878

879
  if (tDecodeSMqConsumerObj(buf, pConsumer, sver) == NULL) {
H
Haojun Liao 已提交
880
    terrno = TSDB_CODE_OUT_OF_MEMORY;  // TODO set correct error code
L
Liu Jicong 已提交
881 882
    goto CM_DECODE_OVER;
  }
L
Liu Jicong 已提交
883

H
Haojun Liao 已提交
884
  tmsgUpdateDnodeEpSet(&pConsumer->ep);
L
Liu Jicong 已提交
885

L
Liu Jicong 已提交
886
CM_DECODE_OVER:
wafwerar's avatar
wafwerar 已提交
887
  taosMemoryFreeClear(buf);
L
Liu Jicong 已提交
888
  if (terrno != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
889 890
    mError("consumer:0x%" PRIx64 " failed to decode from raw:%p since %s",
           pConsumer == NULL ? 0 : pConsumer->consumerId, pRaw, terrstr());
wafwerar's avatar
wafwerar 已提交
891
    taosMemoryFreeClear(pRow);
L
Liu Jicong 已提交
892
  }
L
Liu Jicong 已提交
893 894 895 896

  return pRow;
}

L
Liu Jicong 已提交
897
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
wmmhello's avatar
wmmhello 已提交
898
  mInfo("consumer:0x%" PRIx64 " sub insert, cgroup:%s status:%d(%s) epoch:%d",
H
Haojun Liao 已提交
899 900
         pConsumer->consumerId, pConsumer->cgroup, pConsumer->status, mndConsumerStatusName(pConsumer->status),
         pConsumer->epoch);
901
  pConsumer->subscribeTime = pConsumer->createTime;
L
Liu Jicong 已提交
902 903 904
  return 0;
}

L
Liu Jicong 已提交
905
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
wmmhello's avatar
wmmhello 已提交
906
  mInfo("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId, pConsumer->status,
X
Xiaoyu Wang 已提交
907
         mndConsumerStatusName(pConsumer->status));
wmmhello's avatar
wmmhello 已提交
908
  tDeleteSMqConsumerObj(pConsumer, false);
L
Liu Jicong 已提交
909 910 911
  return 0;
}

X
Xiaoyu Wang 已提交
912
static void updateConsumerStatus(SMqConsumerObj *pConsumer) {
913 914 915
  int32_t status = pConsumer->status;

  if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) {
916
    if (status == MQ_CONSUMER_STATUS_REBALANCE) {
wmmhello's avatar
wmmhello 已提交
917 918 919
      pConsumer->status = MQ_CONSUMER_STATUS_READY;
    } else if (status == MQ_CONSUMER_STATUS_READY) {
      pConsumer->status = MQ_CONSUMER_STATUS_LOST;
920 921 922 923 924
    }
  }
}

// remove from new topic
X
Xiaoyu Wang 已提交
925
static void removeFromNewTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
926
  int32_t size = taosArrayGetSize(pConsumer->rebNewTopics);
927
  for (int32_t i = 0; i < size; i++) {
928 929 930 931 932
    char *p = taosArrayGetP(pConsumer->rebNewTopics, i);
    if (strcmp(pTopic, p) == 0) {
      taosArrayRemove(pConsumer->rebNewTopics, i);
      taosMemoryFree(p);

wmmhello's avatar
wmmhello 已提交
933
      mInfo("consumer:0x%" PRIx64 " remove new topic:%s in the topic list, remain newTopics:%d", pConsumer->consumerId,
X
Xiaoyu Wang 已提交
934
             pTopic, (int)taosArrayGetSize(pConsumer->rebNewTopics));
935 936 937 938 939 940
      break;
    }
  }
}

// remove from removed topic
X
Xiaoyu Wang 已提交
941
static void removeFromRemoveTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
942 943 944 945 946 947
  int32_t size = taosArrayGetSize(pConsumer->rebRemovedTopics);
  for (int32_t i = 0; i < size; i++) {
    char *p = taosArrayGetP(pConsumer->rebRemovedTopics, i);
    if (strcmp(pTopic, p) == 0) {
      taosArrayRemove(pConsumer->rebRemovedTopics, i);
      taosMemoryFree(p);
948

wmmhello's avatar
wmmhello 已提交
949
      mInfo("consumer:0x%" PRIx64 " remove topic:%s in the removed topic list, remain removedTopics:%d",
950
             pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->rebRemovedTopics));
951 952 953 954 955
      break;
    }
  }
}

956 957 958 959 960 961 962 963
static void removeFromCurrentTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
  int32_t sz = taosArrayGetSize(pConsumer->currentTopics);
  for (int32_t i = 0; i < sz; i++) {
    char *topic = taosArrayGetP(pConsumer->currentTopics, i);
    if (strcmp(pTopic, topic) == 0) {
      taosArrayRemove(pConsumer->currentTopics, i);
      taosMemoryFree(topic);

wmmhello's avatar
wmmhello 已提交
964
      mInfo("consumer:0x%" PRIx64 " remove topic:%s in the current topic list, remain currentTopics:%d",
965 966 967 968 969 970 971 972 973 974 975 976 977 978
             pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->currentTopics));
      break;
    }
  }
}

static bool existInCurrentTopicList(const SMqConsumerObj* pConsumer, const char* pTopic) {
  bool    existing = false;
  int32_t size = taosArrayGetSize(pConsumer->currentTopics);
  for (int32_t i = 0; i < size; i++) {
    char *topic = taosArrayGetP(pConsumer->currentTopics, i);

    if (strcmp(topic, pTopic) == 0) {
      existing = true;
979 980 981
      break;
    }
  }
982 983

  return existing;
984 985
}

L
Liu Jicong 已提交
986
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
wmmhello's avatar
wmmhello 已提交
987 988
  mInfo("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", createTime:%" PRId64,
         pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->createTime);
L
Liu Jicong 已提交
989

990 991
  taosWLockLatch(&pOldConsumer->lock);

992
  if (pNewConsumer->updateType == CONSUMER_UPDATE_SUB) {
993 994 995
    TSWAP(pOldConsumer->rebNewTopics, pNewConsumer->rebNewTopics);
    TSWAP(pOldConsumer->rebRemovedTopics, pNewConsumer->rebRemovedTopics);
    TSWAP(pOldConsumer->assignedTopics, pNewConsumer->assignedTopics);
996

wmmhello's avatar
wmmhello 已提交
997
    pOldConsumer->subscribeTime = taosGetTimestampMs();
998
    pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
wmmhello's avatar
wmmhello 已提交
999
    mInfo("consumer:0x%" PRIx64 " sub update, modify existed consumer",pOldConsumer->consumerId);
wmmhello's avatar
wmmhello 已提交
1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011
//  } else if (pNewConsumer->updateType == CONSUMER_UPDATE_TIMER_LOST) {
//    int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
//    for (int32_t i = 0; i < sz; i++) {
//      char *topic = taosStrdup(taosArrayGetP(pOldConsumer->currentTopics, i));
//      taosArrayPush(pOldConsumer->rebRemovedTopics, &topic);
//    }
//
//    int32_t prevStatus = pOldConsumer->status;
//    pOldConsumer->status = MQ_CONSUMER_STATUS_LOST;
//    mInfo("consumer:0x%" PRIx64 " timer update, timer lost. state %s -> %s, reb-time:%" PRId64 ", reb-removed-topics:%d",
//           pOldConsumer->consumerId, mndConsumerStatusName(prevStatus), mndConsumerStatusName(pOldConsumer->status),
//           pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
1012
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REC) {
L
Liu Jicong 已提交
1013 1014
    int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics);
    for (int32_t i = 0; i < sz; i++) {
1015
      char *topic = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, i));
L
Liu Jicong 已提交
1016 1017 1018
      taosArrayPush(pOldConsumer->rebNewTopics, &topic);
    }

1019
    pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
wmmhello's avatar
wmmhello 已提交
1020
    mInfo("consumer:0x%" PRIx64 " timer update, timer recover",pOldConsumer->consumerId);
1021
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB) {
1022
    atomic_add_fetch_32(&pOldConsumer->epoch, 1);
L
Liu Jicong 已提交
1023

wmmhello's avatar
wmmhello 已提交
1024 1025
    pOldConsumer->rebalanceTime = taosGetTimestampMs();
    mInfo("consumer:0x%" PRIx64 " reb update, only rebalance time", pOldConsumer->consumerId);
1026
  } else if (pNewConsumer->updateType == CONSUMER_ADD_REB) {
1027
    char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));
1028

1029
    // check if exist in current topic
1030
    removeFromNewTopicList(pOldConsumer, pNewTopic);
1031 1032

    // add to current topic
1033 1034
    bool existing = existInCurrentTopicList(pOldConsumer, pNewTopic);
    if (existing) {
wmmhello's avatar
wmmhello 已提交
1035
      mError("consumer:0x%" PRIx64 "new topic:%s should not in currentTopics", pOldConsumer->consumerId, pNewTopic);
1036 1037
      taosMemoryFree(pNewTopic);
    } else {  // added into current topic list
1038
      taosArrayPush(pOldConsumer->currentTopics, &pNewTopic);
L
Liu Jicong 已提交
1039 1040
      taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
    }
H
Haojun Liao 已提交
1041

1042
    // set status
H
Haojun Liao 已提交
1043
    int32_t status = pOldConsumer->status;
1044
    updateConsumerStatus(pOldConsumer);
L
Liu Jicong 已提交
1045

H
Haojun Liao 已提交
1046
    // the re-balance is triggered when the new consumer is launched.
wmmhello's avatar
wmmhello 已提交
1047
    pOldConsumer->rebalanceTime = taosGetTimestampMs();
L
Liu Jicong 已提交
1048

1049
    atomic_add_fetch_32(&pOldConsumer->epoch, 1);
wmmhello's avatar
wmmhello 已提交
1050
    mInfo("consumer:0x%" PRIx64 " reb update add, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
1051
           ", current topics:%d, newTopics:%d, removeTopics:%d",
H
Haojun Liao 已提交
1052
           pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
1053 1054 1055 1056
           mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
           (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
           (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));

1057
  } else if (pNewConsumer->updateType == CONSUMER_REMOVE_REB) {
1058 1059 1060
    char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);

    // remove from removed topic
1061
    removeFromRemoveTopicList(pOldConsumer, removedTopic);
1062 1063

    // remove from current topic
1064
    removeFromCurrentTopicList(pOldConsumer, removedTopic);
1065 1066

    // set status
H
Haojun Liao 已提交
1067
    int32_t status = pOldConsumer->status;
1068
    updateConsumerStatus(pOldConsumer);
L
Liu Jicong 已提交
1069

wmmhello's avatar
wmmhello 已提交
1070
    pOldConsumer->rebalanceTime = taosGetTimestampMs();
1071
    atomic_add_fetch_32(&pOldConsumer->epoch, 1);
H
Haojun Liao 已提交
1072

wmmhello's avatar
wmmhello 已提交
1073
    mInfo("consumer:0x%" PRIx64 " reb update remove, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
X
Xiaoyu Wang 已提交
1074
           ", current topics:%d, newTopics:%d, removeTopics:%d",
H
Haojun Liao 已提交
1075
           pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
1076 1077 1078
           mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
           (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
           (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
1079 1080 1081
  }

  taosWUnLockLatch(&pOldConsumer->lock);
L
Liu Jicong 已提交
1082 1083 1084
  return 0;
}

L
Liu Jicong 已提交
1085
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId) {
L
Liu Jicong 已提交
1086 1087
  SSdb           *pSdb = pMnode->pSdb;
  SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &consumerId);
L
Liu Jicong 已提交
1088
  if (pConsumer == NULL) {
1089
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
L
Liu Jicong 已提交
1090 1091 1092 1093
  }
  return pConsumer;
}

L
Liu Jicong 已提交
1094
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
L
Liu Jicong 已提交
1095 1096 1097
  SSdb *pSdb = pMnode->pSdb;
  sdbRelease(pSdb, pConsumer);
}
L
Liu Jicong 已提交
1098

S
Shengliang Guan 已提交
1099 1100
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode         *pMnode = pReq->info.node;
L
Liu Jicong 已提交
1101 1102 1103 1104 1105 1106
  SSdb           *pSdb = pMnode->pSdb;
  int32_t         numOfRows = 0;
  SMqConsumerObj *pConsumer = NULL;

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
1107 1108 1109 1110
    if (pShow->pIter == NULL) {
      break;
    }

L
Liu Jicong 已提交
1111
    if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {
1112
      mInfo("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId);
L
Liu Jicong 已提交
1113 1114 1115
      sdbRelease(pSdb, pConsumer);
      continue;
    }
L
Liu Jicong 已提交
1116 1117

    taosRLockLatch(&pConsumer->lock);
1118
    mInfo("showing consumer:0x%" PRIx64, pConsumer->consumerId);
L
Liu Jicong 已提交
1119

L
Liu Jicong 已提交
1120 1121 1122 1123 1124 1125 1126
    int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
    bool    hasTopic = true;
    if (topicSz == 0) {
      hasTopic = false;
      topicSz = 1;
    }

L
Liu Jicong 已提交
1127 1128 1129 1130
    if (numOfRows + topicSz > rowsCapacity) {
      blockDataEnsureCapacity(pBlock, numOfRows + topicSz);
    }

L
Liu Jicong 已提交
1131 1132 1133 1134 1135
    for (int32_t i = 0; i < topicSz; i++) {
      SColumnInfoData *pColInfo;
      int32_t          cols = 0;

      // consumer id
wmmhello's avatar
wmmhello 已提交
1136 1137 1138 1139
      char        consumerIdHex[32] = {0};
      sprintf(varDataVal(consumerIdHex), "0x%"PRIx64, pConsumer->consumerId);
      varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));

L
Liu Jicong 已提交
1140
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
wmmhello's avatar
wmmhello 已提交
1141
      colDataSetVal(pColInfo, numOfRows, (const char *)consumerIdHex, false);
L
Liu Jicong 已提交
1142

L
Liu Jicong 已提交
1143 1144
      // consumer group
      char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
1145 1146
      STR_TO_VARSTR(cgroup, pConsumer->cgroup);

L
Liu Jicong 已提交
1147
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1148
      colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false);
L
Liu Jicong 已提交
1149

1150
      // client id
L
Liu Jicong 已提交
1151
      char clientId[256 + VARSTR_HEADER_SIZE] = {0};
1152 1153
      STR_TO_VARSTR(clientId, pConsumer->clientId);

L
Liu Jicong 已提交
1154
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1155
      colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false);
L
Liu Jicong 已提交
1156 1157

      // status
X
Xiaoyu Wang 已提交
1158 1159
      char        status[20 + VARSTR_HEADER_SIZE] = {0};
      const char *pStatusName = mndConsumerStatusName(pConsumer->status);
1160 1161
      STR_TO_VARSTR(status, pStatusName);

L
Liu Jicong 已提交
1162
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1163
      colDataSetVal(pColInfo, numOfRows, (const char *)status, false);
L
Liu Jicong 已提交
1164 1165 1166 1167 1168 1169

      // one subscribed topic
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      if (hasTopic) {
        char        topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
        const char *topicName = mndTopicGetShowName(taosArrayGetP(pConsumer->assignedTopics, i));
H
Haojun Liao 已提交
1170
        STR_TO_VARSTR(topic, topicName);
1171
        colDataSetVal(pColInfo, numOfRows, (const char *)topic, false);
L
Liu Jicong 已提交
1172
      } else {
1173
        colDataSetVal(pColInfo, numOfRows, NULL, true);
L
Liu Jicong 已提交
1174 1175 1176
      }

      // end point
L
Liu Jicong 已提交
1177
      /*pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);*/
1178
      /*colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->ep, true);*/
L
Liu Jicong 已提交
1179 1180 1181

      // up time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
wmmhello's avatar
wmmhello 已提交
1182
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->createTime, false);
L
Liu Jicong 已提交
1183 1184 1185

      // subscribe time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1186
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false);
L
Liu Jicong 已提交
1187 1188 1189

      // rebalance time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1190
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0);
L
Liu Jicong 已提交
1191

1192
      char buf[TSDB_OFFSET_LEN] = {0};
1193
      STqOffsetVal pVal = {.type = pConsumer->resetOffsetCfg};
1194 1195 1196
      tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal);

      char parasStr[64 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0};
wmmhello's avatar
wmmhello 已提交
1197
      sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName, pConsumer->autoCommit, pConsumer->autoCommitInterval, buf);
1198
      varDataSetLen(parasStr, strlen(varDataVal(parasStr)));
1199

1200
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1201
      colDataSetVal(pColInfo, numOfRows, (const char *)parasStr, false);
1202

L
Liu Jicong 已提交
1203 1204
      numOfRows++;
    }
1205

L
Liu Jicong 已提交
1206 1207
    taosRUnLockLatch(&pConsumer->lock);
    sdbRelease(pSdb, pConsumer);
1208 1209

    pBlock->info.rows = numOfRows;
L
Liu Jicong 已提交
1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}

static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
  SSdb *pSdb = pMnode->pSdb;
  sdbCancelFetch(pSdb, pIter);
}

static const char *mndConsumerStatusName(int status) {
  switch (status) {
wmmhello's avatar
wmmhello 已提交
1223
    case MQ_CONSUMER_STATUS_READY:
L
Liu Jicong 已提交
1224
      return "ready";
wmmhello's avatar
wmmhello 已提交
1225
    case MQ_CONSUMER_STATUS_LOST:
L
Liu Jicong 已提交
1226
      return "lost";
1227
    case MQ_CONSUMER_STATUS_REBALANCE:
L
Liu Jicong 已提交
1228 1229 1230 1231 1232
      return "rebalancing";
    default:
      return "unknown";
  }
}