mndConsumer.c 42.5 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "mndConsumer.h"
L
Liu Jicong 已提交
18
#include "mndPrivilege.h"
L
Liu Jicong 已提交
19
#include "mndShow.h"
20
#include "mndSubscribe.h"
L
Liu Jicong 已提交
21 22
#include "mndTopic.h"
#include "mndTrans.h"
L
Liu Jicong 已提交
23
#include "tcompare.h"
L
Liu Jicong 已提交
24 25
#include "tname.h"

26
#define MND_CONSUMER_VER_NUMBER   2
L
Liu Jicong 已提交
27 28
#define MND_CONSUMER_RESERVE_SIZE 64

wmmhello's avatar
wmmhello 已提交
29
#define MND_MAX_GROUP_PER_TOPIC           100
H
Haojun Liao 已提交
30
#define MND_CONSUMER_LOST_HB_CNT          6
L
Liu Jicong 已提交
31
#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200
32

H
Haojun Liao 已提交
33
static int32_t mqRebInExecCnt = 0;
34

L
Liu Jicong 已提交
35 36
static const char *mndConsumerStatusName(int status);

L
Liu Jicong 已提交
37 38
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
39
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer);
S
Shengliang Guan 已提交
40
static int32_t mndProcessConsumerMetaMsg(SRpcMsg *pMsg);
41
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
L
Liu Jicong 已提交
42
static void    mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
43

S
Shengliang Guan 已提交
44 45
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
46
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
S
Shengliang Guan 已提交
47 48
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg);
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg);
L
Liu Jicong 已提交
49
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg);
S
Shengliang Guan 已提交
50
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg);
51

L
Liu Jicong 已提交
52
int32_t mndInitConsumer(SMnode *pMnode) {
S
Shengliang Guan 已提交
53 54 55 56 57 58 59 60 61
  SSdbTable table = {
      .sdbType = SDB_CONSUMER,
      .keyType = SDB_KEY_INT64,
      .encodeFp = (SdbEncodeFp)mndConsumerActionEncode,
      .decodeFp = (SdbDecodeFp)mndConsumerActionDecode,
      .insertFp = (SdbInsertFp)mndConsumerActionInsert,
      .updateFp = (SdbUpdateFp)mndConsumerActionUpdate,
      .deleteFp = (SdbDeleteFp)mndConsumerActionDelete,
  };
L
Liu Jicong 已提交
62

L
Liu Jicong 已提交
63 64 65
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
L
Liu Jicong 已提交
66
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);
wmmhello's avatar
wmmhello 已提交
67
//  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_LOST, mndProcessConsumerLostMsg);
L
Liu Jicong 已提交
68
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg);
L
Liu Jicong 已提交
69
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);
L
Liu Jicong 已提交
70 71 72 73

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndCancelGetNextConsumer);

L
Liu Jicong 已提交
74 75 76 77 78
  return sdbSetTable(pMnode->pSdb, table);
}

void mndCleanupConsumer(SMnode *pMnode) {}

79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId){
  SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg));
  if (pClearMsg == NULL) {
    mError("consumer:0x%"PRIx64" failed to clear consumer due to out of memory. alloc size:%d", consumerId, (int32_t)sizeof(SMqConsumerClearMsg));
    return;
  }

  pClearMsg->consumerId = consumerId;
  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, .pCont = pClearMsg, .contLen = sizeof(SMqConsumerClearMsg)};

  mInfo("consumer:0x%" PRIx64 " drop from sdb", consumerId);
  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  return;
}

L
Liu Jicong 已提交
95
bool mndRebTryStart() {
H
Haojun Liao 已提交
96
  int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
C
cadem 已提交
97
  mDebug("tq timer, rebalance counter old val:%d", old);
L
Liu Jicong 已提交
98 99 100
  return old == 0;
}

X
Xiaoyu Wang 已提交
101
void mndRebEnd() { mndRebCntDec(); }
L
Liu Jicong 已提交
102

S
Shengliang Guan 已提交
103 104
void mndRebCntInc() {
  int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1);
105
  mInfo("rebalance trans start, rebalance counter:%d", val);
S
Shengliang Guan 已提交
106
}
L
Liu Jicong 已提交
107

S
Shengliang Guan 已提交
108
void mndRebCntDec() {
109 110 111 112 113 114 115 116 117 118
  while (1) {
    int32_t val = atomic_load_32(&mqRebInExecCnt);
    if (val <= 0) {
      mError("rebalance trans end, rebalance counter:%d should not be less equalled than 0, ignore counter desc", val);
      break;
    }

    int32_t newVal = val - 1;
    int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal);
    if (oldVal == val) {
C
cadem 已提交
119
      mDebug("rebalance trans end, rebalance counter:%d", newVal);
120 121 122
      break;
    }
  }
S
Shengliang Guan 已提交
123
}
L
Liu Jicong 已提交
124

wmmhello's avatar
wmmhello 已提交
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
//static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
//  SMnode             *pMnode = pMsg->info.node;
//  SMqConsumerLostMsg *pLostMsg = pMsg->pCont;
//  SMqConsumerObj     *pConsumer = mndAcquireConsumer(pMnode, pLostMsg->consumerId);
//  if (pConsumer == NULL) {
//    return 0;
//  }
//
//  mInfo("process consumer lost msg, consumer:0x%" PRIx64 " status:%d(%s)", pLostMsg->consumerId, pConsumer->status,
//        mndConsumerStatusName(pConsumer->status));
//
//  if (pConsumer->status != MQ_CONSUMER_STATUS_READY) {
//    mndReleaseConsumer(pMnode, pConsumer);
//    return -1;
//  }
//
//  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
//  pConsumerNew->updateType = CONSUMER_UPDATE_TIMER_LOST;
//
//  mndReleaseConsumer(pMnode, pConsumer);
//
//  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "lost-csm");
//  if (pTrans == NULL) {
//    goto FAIL;
//  }
//
//  if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
//    goto FAIL;
//  }
//
//  if (mndTransPrepare(pMnode, pTrans) != 0) {
//    goto FAIL;
//  }
//
//  tDeleteSMqConsumerObj(pConsumerNew, true);
//  mndTransDrop(pTrans);
//  return 0;
//FAIL:
//  tDeleteSMqConsumerObj(pConsumerNew, true);
//  mndTransDrop(pTrans);
//  return -1;
//}
167

S
Shengliang Guan 已提交
168 169 170
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
  SMnode                *pMnode = pMsg->info.node;
  SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont;
L
Liu Jicong 已提交
171
  SMqConsumerObj        *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId);
L
Liu Jicong 已提交
172 173 174 175
  if (pConsumer == NULL) {
    mError("cannot find consumer %" PRId64 " when processing consumer recover msg", pRecoverMsg->consumerId);
    return -1;
  }
L
Liu Jicong 已提交
176

H
Haojun Liao 已提交
177 178
  mInfo("receive consumer recover msg, consumer:0x%" PRIx64 " status:%d(%s)", pRecoverMsg->consumerId,
        pConsumer->status, mndConsumerStatusName(pConsumer->status));
L
Liu Jicong 已提交
179

wmmhello's avatar
wmmhello 已提交
180
  if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) {
L
Liu Jicong 已提交
181
    mndReleaseConsumer(pMnode, pConsumer);
L
Liu Jicong 已提交
182
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
L
Liu Jicong 已提交
183 184 185
    return -1;
  }

L
Liu Jicong 已提交
186
  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
wmmhello's avatar
wmmhello 已提交
187
  pConsumerNew->updateType = CONSUMER_UPDATE_RECOVER;
L
Liu Jicong 已提交
188 189 190

  mndReleaseConsumer(pMnode, pConsumer);

191
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm");
H
Haojun Liao 已提交
192 193 194 195
  if (pTrans == NULL) {
    goto FAIL;
  }

L
Liu Jicong 已提交
196 197 198
  if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
  if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;

wmmhello's avatar
wmmhello 已提交
199 200
  tDeleteSMqConsumerObj(pConsumerNew, true);

L
Liu Jicong 已提交
201 202 203
  mndTransDrop(pTrans);
  return 0;
FAIL:
wmmhello's avatar
wmmhello 已提交
204 205
  tDeleteSMqConsumerObj(pConsumerNew, true);

L
Liu Jicong 已提交
206 207 208 209
  mndTransDrop(pTrans);
  return -1;
}

210
// todo check the clear process
L
Liu Jicong 已提交
211 212 213
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
  SMnode              *pMnode = pMsg->info.node;
  SMqConsumerClearMsg *pClearMsg = pMsg->pCont;
H
Haojun Liao 已提交
214 215

  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId);
L
Liu Jicong 已提交
216
  if (pConsumer == NULL) {
H
Haojun Liao 已提交
217
    mError("consumer:0x%"PRIx64" failed to be found to clear it", pClearMsg->consumerId);
L
Liu Jicong 已提交
218 219 220
    return 0;
  }

H
Haojun Liao 已提交
221
  mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId,
L
Liu Jicong 已提交
222 223
        mndConsumerStatusName(pConsumer->status));

wmmhello's avatar
wmmhello 已提交
224 225 226 227
//  if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) {
//    mndReleaseConsumer(pMnode, pConsumer);
//    return -1;
//  }
L
Liu Jicong 已提交
228 229

  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
wmmhello's avatar
wmmhello 已提交
230
//  pConsumerNew->updateType = CONSUMER_UPDATE_TIMER_LOST;
L
Liu Jicong 已提交
231 232 233 234 235

  mndReleaseConsumer(pMnode, pConsumer);

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
  if (pTrans == NULL) goto FAIL;
H
Haojun Liao 已提交
236 237

  // this is the drop action, not the update action
L
Liu Jicong 已提交
238 239 240
  if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
  if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;

wmmhello's avatar
wmmhello 已提交
241 242
  tDeleteSMqConsumerObj(pConsumerNew, true);

L
Liu Jicong 已提交
243 244
  mndTransDrop(pTrans);
  return 0;
245

L
Liu Jicong 已提交
246
FAIL:
wmmhello's avatar
wmmhello 已提交
247 248
  tDeleteSMqConsumerObj(pConsumerNew, true);

L
Liu Jicong 已提交
249 250 251 252
  mndTransDrop(pTrans);
  return -1;
}

L
Liu Jicong 已提交
253
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
L
Liu Jicong 已提交
254 255 256 257
  SMqRebInfo *pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
  if (pRebInfo == NULL) {
    pRebInfo = tNewSMqRebSubscribe(key);
    if (pRebInfo == NULL) {
258 259 260
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
L
Liu Jicong 已提交
261 262 263
    taosHashPut(pHash, key, strlen(key) + 1, pRebInfo, sizeof(SMqRebInfo));
    taosMemoryFree(pRebInfo);
    pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
264
  }
L
Liu Jicong 已提交
265
  return pRebInfo;
266 267
}

X
Xiaoyu Wang 已提交
268 269
static void freeRebalanceItem(void *param) {
  SMqRebInfo *pInfo = param;
H
Haojun Liao 已提交
270 271 272 273
  taosArrayDestroy(pInfo->newConsumers);
  taosArrayDestroy(pInfo->removedConsumers);
}

S
Shengliang Guan 已提交
274 275
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
276 277 278 279
  SSdb           *pSdb = pMnode->pSdb;
  SMqConsumerObj *pConsumer;
  void           *pIter = NULL;

D
dapan1121 已提交
280
  mDebug("start to process mq timer");
281

282
  // rebalance cannot be parallel
L
Liu Jicong 已提交
283
  if (!mndRebTryStart()) {
D
dapan1121 已提交
284
    mDebug("mq rebalance already in progress, do nothing");
285 286 287 288
    return 0;
  }

  SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
H
Haojun Liao 已提交
289
  if (pRebMsg == NULL) {
X
Xiaoyu Wang 已提交
290
    mError("failed to create the rebalance msg, size:%d, quit mq timer", (int32_t)sizeof(SMqDoRebalanceMsg));
H
Haojun Liao 已提交
291 292 293 294
    mndRebEnd();
    return TSDB_CODE_OUT_OF_MEMORY;
  }

295
  pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
H
Haojun Liao 已提交
296 297 298 299 300 301 302 303
  if (pRebMsg->rebSubHash == NULL) {
    mError("failed to create rebalance hashmap");
    rpcFreeCont(pRebMsg);
    mndRebEnd();
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  taosHashSetFreeFp(pRebMsg->rebSubHash, freeRebalanceItem);
304 305 306 307

  // iterate all consumers, find all modification
  while (1) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
308 309 310
    if (pIter == NULL) {
      break;
    }
311 312 313

    int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
    int32_t status = atomic_load_32(&pConsumer->status);
H
Haojun Liao 已提交
314

wmmhello's avatar
wmmhello 已提交
315 316
    mDebug("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d",
           pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime,
X
Xiaoyu Wang 已提交
317
           hbStatus);
L
Liu Jicong 已提交
318

wmmhello's avatar
wmmhello 已提交
319
    if (status == MQ_CONSUMER_STATUS_READY) {
wmmhello's avatar
wmmhello 已提交
320 321 322
      if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {   // unsubscribe or close
        mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
      } else if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
wmmhello's avatar
wmmhello 已提交
323 324 325 326 327 328 329 330
        taosRLockLatch(&pConsumer->lock);
        int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
        for (int32_t i = 0; i < topicNum; i++) {
          char  key[TSDB_SUBSCRIBE_KEY_LEN];
          char *removedTopic = taosArrayGetP(pConsumer->currentTopics, i);
          mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
          SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
          taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
H
Haojun Liao 已提交
331
        }
wmmhello's avatar
wmmhello 已提交
332
        taosRUnLockLatch(&pConsumer->lock);
L
Liu Jicong 已提交
333
      }
wmmhello's avatar
wmmhello 已提交
334
    } else if (status == MQ_CONSUMER_STATUS_LOST) {
wmmhello's avatar
wmmhello 已提交
335
      if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) {   // clear consumer if lost a day
336
        mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
337
      }
338
    } else {  // MQ_CONSUMER_STATUS_REBALANCE
L
Liu Jicong 已提交
339
      taosRLockLatch(&pConsumer->lock);
340

341 342 343 344 345
      int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
      for (int32_t i = 0; i < newTopicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *newTopic = taosArrayGetP(pConsumer->rebNewTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, newTopic);
L
Liu Jicong 已提交
346
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
347 348 349 350 351 352 353 354
        taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
      }

      int32_t removedTopicNum = taosArrayGetSize(pConsumer->rebRemovedTopics);
      for (int32_t i = 0; i < removedTopicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *removedTopic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
L
Liu Jicong 已提交
355
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
356 357
        taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
      }
L
Liu Jicong 已提交
358
      taosRUnLockLatch(&pConsumer->lock);
359 360 361 362 363 364 365 366
    }

    mndReleaseConsumer(pMnode, pConsumer);
  }

  if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
    mInfo("mq rebalance will be triggered");
    SRpcMsg rpcMsg = {
L
Liu Jicong 已提交
367
        .msgType = TDMT_MND_TMQ_DO_REBALANCE,
368 369 370 371 372 373 374
        .pCont = pRebMsg,
        .contLen = sizeof(SMqDoRebalanceMsg),
    };
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  } else {
    taosHashCleanup(pRebMsg->rebSubHash);
    rpcFreeCont(pRebMsg);
H
Haojun Liao 已提交
375
    mDebug("mq timer finished, no need to re-balance");
L
Liu Jicong 已提交
376
    mndRebEnd();
377 378 379 380
  }
  return 0;
}

381
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
382
  int32_t code = 0;
L
Liu Jicong 已提交
383 384
  SMnode  *pMnode = pMsg->info.node;
  SMqHbReq req = {0};
385

386
  if ((code = tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req)) < 0) {
D
dapan1121 已提交
387
    terrno = TSDB_CODE_OUT_OF_MEMORY;
388
    goto end;
D
dapan1121 已提交
389 390
  }

L
Liu Jicong 已提交
391
  int64_t         consumerId = req.consumerId;
392
  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
L
Liu Jicong 已提交
393
  if (pConsumer == NULL) {
X
Xiaoyu Wang 已提交
394
    mError("consumer:0x%" PRIx64 " not exist", consumerId);
L
Liu Jicong 已提交
395
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
396 397
    code = -1;
    goto end;
L
Liu Jicong 已提交
398
  }
399 400 401 402 403

  atomic_store_32(&pConsumer->hbStatus, 0);

  int32_t status = atomic_load_32(&pConsumer->status);

wmmhello's avatar
wmmhello 已提交
404
  if (status == MQ_CONSUMER_STATUS_LOST) {
X
Xiaoyu Wang 已提交
405
    mInfo("try to recover consumer:0x%" PRIx64 "", consumerId);
406 407 408
    SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));

    pRecoverMsg->consumerId = consumerId;
409
    SRpcMsg pRpcMsg = {
L
Liu Jicong 已提交
410
        .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
411 412 413 414
        .pCont = pRecoverMsg,
        .contLen = sizeof(SMqConsumerRecoverMsg),
    };
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
415 416
  }

417 418 419 420 421
  for(int i = 0; i < taosArrayGetSize(req.topics); i++){
    TopicOffsetRows* data = taosArrayGet(req.topics, i);
    mDebug("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName);

    SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName);
422 423 424
    if(pSub == NULL){
      continue;
    }
425
    taosWLockLatch(&pSub->lock);
426
    SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
427
    if(pConsumerEp){
428
      taosArrayDestroy(pConsumerEp->offsetRows);
429 430 431
      pConsumerEp->offsetRows = data->offsetRows;
      data->offsetRows = NULL;
    }
432
    taosWUnLockLatch(&pSub->lock);
433 434 435 436

    mndReleaseSubscribe(pMnode, pSub);
  }

437 438
  mndReleaseConsumer(pMnode, pConsumer);

439 440 441
end:
  tDeatroySMqHbReq(&req);
  return code;
442 443
}

S
Shengliang Guan 已提交
444
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
L
Liu Jicong 已提交
445 446 447
  SMnode     *pMnode = pMsg->info.node;
  SMqAskEpReq req = {0};
  SMqAskEpRsp rsp = {0};
D
dapan1121 已提交
448 449 450 451 452 453

  if (tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

L
Liu Jicong 已提交
454 455
  int64_t consumerId = req.consumerId;
  int32_t epoch = req.epoch;
456

457
  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
L
Liu Jicong 已提交
458
  if (pConsumer == NULL) {
H
Haojun Liao 已提交
459
    mError("consumer:0x%" PRIx64 " group:%s not exists in sdb", consumerId, req.cgroup);
460 461 462 463
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }

H
Haojun Liao 已提交
464 465 466 467 468
  int32_t ret = strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup));
  if (ret != 0) {
    mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup,
           pConsumer->cgroup);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
wmmhello's avatar
wmmhello 已提交
469
    goto FAIL;
H
Haojun Liao 已提交
470
  }
471

472 473 474 475
  atomic_store_32(&pConsumer->hbStatus, 0);

  // 1. check consumer status
  int32_t status = atomic_load_32(&pConsumer->status);
L
Liu Jicong 已提交
476

wmmhello's avatar
wmmhello 已提交
477
  if (status == MQ_CONSUMER_STATUS_LOST) {
X
Xiaoyu Wang 已提交
478
    mInfo("try to recover consumer:0x%" PRIx64, consumerId);
L
Liu Jicong 已提交
479 480 481
    SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));

    pRecoverMsg->consumerId = consumerId;
482
    SRpcMsg pRpcMsg = {
L
Liu Jicong 已提交
483
        .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
484 485 486
        .pCont = pRecoverMsg,
        .contLen = sizeof(SMqConsumerRecoverMsg),
    };
H
Haojun Liao 已提交
487

488
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
489 490
  }

wmmhello's avatar
wmmhello 已提交
491
  if (status != MQ_CONSUMER_STATUS_READY) {
X
Xiaoyu Wang 已提交
492
    mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
493
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
wmmhello's avatar
wmmhello 已提交
494
    goto FAIL;
495 496 497 498
  }

  int32_t serverEpoch = atomic_load_32(&pConsumer->epoch);

499
  // 2. check epoch, only send ep info when epochs do not match
500 501
  if (epoch != serverEpoch) {
    taosRLockLatch(&pConsumer->lock);
X
Xiaoyu Wang 已提交
502 503
    mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d", consumerId, epoch,
          serverEpoch);
504 505 506 507 508
    int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics);

    rsp.topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp));
    if (rsp.topics == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
509
      taosRUnLockLatch(&pConsumer->lock);
510 511 512
      goto FAIL;
    }

H
Haojun Liao 已提交
513
    // handle all topics subscribed by this consumer
514 515 516 517
    for (int32_t i = 0; i < numOfTopics; i++) {
      char            *topic = taosArrayGetP(pConsumer->currentTopics, i);
      SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
      // txn guarantees pSub is created
L
Liu Jicong 已提交
518

519 520 521 522 523 524 525 526
      taosRLockLatch(&pSub->lock);

      SMqSubTopicEp topicEp = {0};
      strcpy(topicEp.topic, topic);

      // 2.1 fetch topic schema
      SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
      taosRLockLatch(&pTopic->lock);
L
Liu Jicong 已提交
527
      tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN);
528
      topicEp.schema.nCols = pTopic->schema.nCols;
L
Liu Jicong 已提交
529 530 531 532
      if (topicEp.schema.nCols) {
        topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema));
        memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema));
      }
533 534 535 536
      taosRUnLockLatch(&pTopic->lock);
      mndReleaseTopic(pMnode, pTopic);

      // 2.2 iterate all vg assigned to the consumer of that topic
537 538
      SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
      int32_t        vgNum = taosArrayGetSize(pConsumerEp->vgs);
539

H
Haojun Liao 已提交
540
      // this customer assigned vgroups
541 542 543
      topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
      if (topicEp.vgs == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
544
        taosRUnLockLatch(&pConsumer->lock);
L
Liu Jicong 已提交
545 546
        taosRUnLockLatch(&pSub->lock);
        mndReleaseSubscribe(pMnode, pSub);
547 548 549 550
        goto FAIL;
      }

      for (int32_t j = 0; j < vgNum; j++) {
551
        SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569
        char     offsetKey[TSDB_PARTITION_KEY_LEN];
        mndMakePartitionKey(offsetKey, pConsumer->cgroup, topic, pVgEp->vgId);
        // 2.2.1 build vg ep
        SMqSubVgEp vgEp = {
            .epSet = pVgEp->epSet,
            .vgId = pVgEp->vgId,
            .offset = -1,
        };

        taosArrayPush(topicEp.vgs, &vgEp);
      }
      taosArrayPush(rsp.topics, &topicEp);

      taosRUnLockLatch(&pSub->lock);
      mndReleaseSubscribe(pMnode, pSub);
    }
    taosRUnLockLatch(&pConsumer->lock);
  }
H
Haojun Liao 已提交
570

571
  // encode rsp
L
Liu Jicong 已提交
572
  int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, &rsp);
573 574
  void   *buf = rpcMallocCont(tlen);
  if (buf == NULL) {
L
Liu Jicong 已提交
575
    terrno = TSDB_CODE_OUT_OF_MEMORY;
wmmhello's avatar
wmmhello 已提交
576
    goto FAIL;
L
Liu Jicong 已提交
577
  }
H
Haojun Liao 已提交
578

579 580 581 582 583 584 585 586
  SMqRspHead* pHead = buf;

  pHead->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
  pHead->epoch = serverEpoch;
  pHead->consumerId = pConsumer->consumerId;
  pHead->walsver = 0;
  pHead->walever = 0;

S
Shengliang Guan 已提交
587

588
  void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
L
Liu Jicong 已提交
589
  tEncodeSMqAskEpRsp(&abuf, &rsp);
590 591

  // release consumer and free memory
L
Liu Jicong 已提交
592
  tDeleteSMqAskEpRsp(&rsp);
593 594 595
  mndReleaseConsumer(pMnode, pConsumer);

  // send rsp
S
Shengliang Guan 已提交
596 597
  pMsg->info.rsp = buf;
  pMsg->info.rspLen = tlen;
598
  return 0;
H
Haojun Liao 已提交
599

600
FAIL:
L
Liu Jicong 已提交
601
  tDeleteSMqAskEpRsp(&rsp);
602 603 604 605
  mndReleaseConsumer(pMnode, pConsumer);
  return -1;
}

L
Liu Jicong 已提交
606 607 608 609 610 611 612 613
int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
  SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
  if (pCommitRaw == NULL) return -1;
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
  return 0;
}

614 615 616 617 618 619 620 621
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
  SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
  if (pCommitRaw == NULL) return -1;
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
  return 0;
}

X
Xiaoyu Wang 已提交
622
static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const char *pUser) {
623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642
  int32_t numOfTopics = taosArrayGetSize(pTopicList);

  for (int32_t i = 0; i < numOfTopics; i++) {
    char        *pOneTopic = taosArrayGetP(pTopicList, i);
    SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pOneTopic);
    if (pTopic == NULL) {  // terrno has been set by callee function
      return -1;
    }

    if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) {
      mndReleaseTopic(pMnode, pTopic);
      return -1;
    }

    mndReleaseTopic(pMnode, pTopic);
  }

  return 0;
}

X
Xiaoyu Wang 已提交
643
static void *topicNameDup(void *p) { return taosStrdup((char *)p); }
644

X
Xiaoyu Wang 已提交
645 646
static void freeItem(void *param) {
  void *pItem = *(void **)param;
647 648 649
  if (pItem != NULL) {
    taosMemoryFree(pItem);
  }
650 651
}

652 653 654 655
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
  SMnode *pMnode = pMsg->info.node;
  char   *msgStr = pMsg->pCont;

656 657
  SCMSubscribeReq subscribe = {0};
  tDeserializeSCMSubscribeReq(msgStr, &subscribe);
658

659
  int64_t        consumerId = subscribe.consumerId;
660
  char           *cgroup = subscribe.cgroup;
H
Haojun Liao 已提交
661
  SMqConsumerObj *pExistedConsumer = NULL;
662
  SMqConsumerObj *pConsumerNew = NULL;
wmmhello's avatar
wmmhello 已提交
663
  STrans         *pTrans       = NULL;
664 665

  int32_t code = -1;
666 667
  SArray *pTopicList = subscribe.topicNames;
  taosArraySort(pTopicList, taosArrayCompareString);
668
  taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem);
669

670
  int32_t newTopicNum = taosArrayGetSize(pTopicList);
wmmhello's avatar
wmmhello 已提交
671
  for(int i = 0; i < newTopicNum; i++){
wmmhello's avatar
wmmhello 已提交
672
    int32_t gNum = mndGetGroupNumByTopic(pMnode, (const char*)taosArrayGetP(pTopicList, i));
wmmhello's avatar
wmmhello 已提交
673
    if(gNum >= MND_MAX_GROUP_PER_TOPIC){
wmmhello's avatar
wmmhello 已提交
674 675 676 677 678
      terrno = TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE;
      code = terrno;
      goto _over;
    }
  }
679

H
Haojun Liao 已提交
680
  // check topic existence
wmmhello's avatar
wmmhello 已提交
681
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
682 683 684
  if (pTrans == NULL) {
    goto _over;
  }
L
Liu Jicong 已提交
685

686 687 688
  code = validateTopics(pTopicList, pMnode, pMsg->info.conn.user);
  if (code != TSDB_CODE_SUCCESS) {
    goto _over;
689 690
  }

H
Haojun Liao 已提交
691 692
  pExistedConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pExistedConsumer == NULL) {
X
Xiaoyu Wang 已提交
693 694
    mInfo("receive subscribe request from new consumer:0x%" PRIx64 " cgroup:%s, numOfTopics:%d", consumerId,
          subscribe.cgroup, (int32_t)taosArrayGetSize(pTopicList));
L
Liu Jicong 已提交
695

696
    pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
697
    tstrncpy(pConsumerNew->clientId, subscribe.clientId, tListLen(pConsumerNew->clientId));
698

699 700 701 702 703
    pConsumerNew->withTbName = subscribe.withTbName;
    pConsumerNew->autoCommit = subscribe.autoCommit;
    pConsumerNew->autoCommitInterval = subscribe.autoCommitInterval;
    pConsumerNew->resetOffsetCfg = subscribe.resetOffsetCfg;

wmmhello's avatar
wmmhello 已提交
704
//  pConsumerNew->updateType = CONSUMER_UPDATE_SUB_MODIFY;   // use insert logic
H
Haojun Liao 已提交
705
    taosArrayDestroy(pConsumerNew->assignedTopics);
706 707 708
    pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);

    // all subscribed topics should re-balance.
L
Liu Jicong 已提交
709
    taosArrayDestroy(pConsumerNew->rebNewTopics);
710
    pConsumerNew->rebNewTopics = pTopicList;
711 712
    subscribe.topicNames = NULL;

713 714
    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
    if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
715 716

  } else {
H
Haojun Liao 已提交
717
    int32_t status = atomic_load_32(&pExistedConsumer->status);
L
Liu Jicong 已提交
718

X
Xiaoyu Wang 已提交
719 720 721
    mInfo("receive subscribe request from existed consumer:0x%" PRIx64
          " cgroup:%s, current status:%d(%s), subscribe topic num: %d",
          consumerId, subscribe.cgroup, status, mndConsumerStatusName(status), newTopicNum);
L
Liu Jicong 已提交
722

wmmhello's avatar
wmmhello 已提交
723
    if (status != MQ_CONSUMER_STATUS_READY) {
724
      terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
725
      goto _over;
726 727 728 729
    }

    pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
    if (pConsumerNew == NULL) {
730
      goto _over;
731
    }
H
Haojun Liao 已提交
732

733
    // set the update type
wmmhello's avatar
wmmhello 已提交
734
    pConsumerNew->updateType = CONSUMER_UPDATE_SUB_MODIFY;
H
Haojun Liao 已提交
735
    taosArrayDestroy(pConsumerNew->assignedTopics);
736
    pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);
737

wmmhello's avatar
wmmhello 已提交
738
    int32_t oldTopicNum = taosArrayGetSize(pExistedConsumer->currentTopics);
739 740 741 742

    int32_t i = 0, j = 0;
    while (i < oldTopicNum || j < newTopicNum) {
      if (i >= oldTopicNum) {
H
Haojun Liao 已提交
743
        char *newTopicCopy = taosStrdup(taosArrayGetP(pTopicList, j));
744 745 746 747
        taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
        j++;
        continue;
      } else if (j >= newTopicNum) {
H
Haojun Liao 已提交
748
        char *oldTopicCopy = taosStrdup(taosArrayGetP(pExistedConsumer->currentTopics, i));
749 750 751 752
        taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
        i++;
        continue;
      } else {
H
Haojun Liao 已提交
753
        char *oldTopic = taosArrayGetP(pExistedConsumer->currentTopics, i);
754
        char *newTopic = taosArrayGetP(pTopicList, j);
755
        int   comp = strcmp(oldTopic, newTopic);
756 757 758 759 760
        if (comp == 0) {
          i++;
          j++;
          continue;
        } else if (comp < 0) {
761
          char *oldTopicCopy = taosStrdup(oldTopic);
762 763 764 765
          taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
          i++;
          continue;
        } else {
766
          char *newTopicCopy = taosStrdup(newTopic);
767 768 769 770 771 772 773
          taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
          j++;
          continue;
        }
      }
    }

774 775
    // no topics need to be rebalanced
    if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
776
      goto _over;
777 778
    }

779 780
    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
    if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
781 782
  }

S
Shengliang Guan 已提交
783
  code = TSDB_CODE_ACTION_IN_PROGRESS;
784

785
_over:
L
Liu Jicong 已提交
786 787
  mndTransDrop(pTrans);

H
Haojun Liao 已提交
788 789 790
  if (pExistedConsumer) {
    /*taosRUnLockLatch(&pExistedConsumer->lock);*/
    mndReleaseConsumer(pMnode, pExistedConsumer);
791
  }
H
Haojun Liao 已提交
792

wmmhello's avatar
wmmhello 已提交
793
  tDeleteSMqConsumerObj(pConsumerNew, true);
794

795
  // TODO: replace with destroy subscribe msg
796
  taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
797
  return code;
L
Liu Jicong 已提交
798 799
}

L
Liu Jicong 已提交
800 801
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
802 803

  void   *buf = NULL;
L
Liu Jicong 已提交
804
  int32_t tlen = tEncodeSMqConsumerObj(NULL, pConsumer);
L
Liu Jicong 已提交
805 806 807
  int32_t size = sizeof(int32_t) + tlen + MND_CONSUMER_RESERVE_SIZE;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size);
808
  if (pRaw == NULL) goto CM_ENCODE_OVER;
L
Liu Jicong 已提交
809

wafwerar's avatar
wafwerar 已提交
810
  buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
811 812
  if (buf == NULL) goto CM_ENCODE_OVER;

L
Liu Jicong 已提交
813
  void *abuf = buf;
L
Liu Jicong 已提交
814 815
  tEncodeSMqConsumerObj(&abuf, pConsumer);

L
Liu Jicong 已提交
816
  int32_t dataPos = 0;
L
Liu Jicong 已提交
817 818
  SDB_SET_INT32(pRaw, dataPos, tlen, CM_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, CM_ENCODE_OVER);
L
Liu Jicong 已提交
819 820
  SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER);
L
Liu Jicong 已提交
821

L
Liu Jicong 已提交
822 823
  terrno = TSDB_CODE_SUCCESS;

824
CM_ENCODE_OVER:
wafwerar's avatar
wafwerar 已提交
825
  taosMemoryFreeClear(buf);
826
  if (terrno != 0) {
827
    mError("consumer:0x%" PRIx64 " failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
828 829 830 831
    sdbFreeRaw(pRaw);
    return NULL;
  }

832
  mTrace("consumer:0x%" PRIx64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer);
L
Liu Jicong 已提交
833 834 835
  return pRaw;
}

L
Liu Jicong 已提交
836
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
837 838 839
  SSdbRow        *pRow = NULL;
  SMqConsumerObj *pConsumer = NULL;
  void           *buf = NULL;
840

H
Haojun Liao 已提交
841
  terrno = 0;
L
Liu Jicong 已提交
842
  int8_t sver = 0;
H
Haojun Liao 已提交
843 844 845
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    goto CM_DECODE_OVER;
  }
L
Liu Jicong 已提交
846

847
  if (sver < 1 || sver > MND_CONSUMER_VER_NUMBER) {
L
Liu Jicong 已提交
848
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
L
Liu Jicong 已提交
849
    goto CM_DECODE_OVER;
L
Liu Jicong 已提交
850 851
  }

852
  pRow = sdbAllocRow(sizeof(SMqConsumerObj));
H
Haojun Liao 已提交
853 854 855
  if (pRow == NULL) {
    goto CM_DECODE_OVER;
  }
856

857
  pConsumer = sdbGetRowObj(pRow);
H
Haojun Liao 已提交
858 859 860
  if (pConsumer == NULL) {
    goto CM_DECODE_OVER;
  }
L
Liu Jicong 已提交
861 862

  int32_t dataPos = 0;
L
Liu Jicong 已提交
863
  int32_t len;
L
Liu Jicong 已提交
864
  SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER);
wafwerar's avatar
wafwerar 已提交
865
  buf = taosMemoryMalloc(len);
H
Haojun Liao 已提交
866 867 868 869 870
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto CM_DECODE_OVER;
  }

L
Liu Jicong 已提交
871 872
  SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);
L
Liu Jicong 已提交
873

874
  if (tDecodeSMqConsumerObj(buf, pConsumer, sver) == NULL) {
H
Haojun Liao 已提交
875
    terrno = TSDB_CODE_OUT_OF_MEMORY;  // TODO set correct error code
L
Liu Jicong 已提交
876 877
    goto CM_DECODE_OVER;
  }
L
Liu Jicong 已提交
878

H
Haojun Liao 已提交
879
  tmsgUpdateDnodeEpSet(&pConsumer->ep);
L
Liu Jicong 已提交
880

L
Liu Jicong 已提交
881
CM_DECODE_OVER:
wafwerar's avatar
wafwerar 已提交
882
  taosMemoryFreeClear(buf);
L
Liu Jicong 已提交
883
  if (terrno != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
884 885
    mError("consumer:0x%" PRIx64 " failed to decode from raw:%p since %s",
           pConsumer == NULL ? 0 : pConsumer->consumerId, pRaw, terrstr());
wafwerar's avatar
wafwerar 已提交
886
    taosMemoryFreeClear(pRow);
L
Liu Jicong 已提交
887
  }
L
Liu Jicong 已提交
888 889 890 891

  return pRow;
}

L
Liu Jicong 已提交
892
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
wmmhello's avatar
wmmhello 已提交
893
  mInfo("consumer:0x%" PRIx64 " sub insert, cgroup:%s status:%d(%s) epoch:%d",
H
Haojun Liao 已提交
894 895
         pConsumer->consumerId, pConsumer->cgroup, pConsumer->status, mndConsumerStatusName(pConsumer->status),
         pConsumer->epoch);
wmmhello's avatar
wmmhello 已提交
896
  pConsumer->subscribeTime = taosGetTimestampMs();
L
Liu Jicong 已提交
897 898 899
  return 0;
}

L
Liu Jicong 已提交
900
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
wmmhello's avatar
wmmhello 已提交
901
  mInfo("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId, pConsumer->status,
X
Xiaoyu Wang 已提交
902
         mndConsumerStatusName(pConsumer->status));
wmmhello's avatar
wmmhello 已提交
903
  tDeleteSMqConsumerObj(pConsumer, false);
L
Liu Jicong 已提交
904 905 906
  return 0;
}

X
Xiaoyu Wang 已提交
907
static void updateConsumerStatus(SMqConsumerObj *pConsumer) {
908 909 910
  int32_t status = pConsumer->status;

  if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) {
911
    if (status == MQ_CONSUMER_STATUS_REBALANCE) {
wmmhello's avatar
wmmhello 已提交
912 913 914
      pConsumer->status = MQ_CONSUMER_STATUS_READY;
    } else if (status == MQ_CONSUMER_STATUS_READY) {
      pConsumer->status = MQ_CONSUMER_STATUS_LOST;
915 916 917 918 919
    }
  }
}

// remove from new topic
X
Xiaoyu Wang 已提交
920
static void removeFromNewTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
921
  int32_t size = taosArrayGetSize(pConsumer->rebNewTopics);
922
  for (int32_t i = 0; i < size; i++) {
923 924 925 926 927
    char *p = taosArrayGetP(pConsumer->rebNewTopics, i);
    if (strcmp(pTopic, p) == 0) {
      taosArrayRemove(pConsumer->rebNewTopics, i);
      taosMemoryFree(p);

wmmhello's avatar
wmmhello 已提交
928
      mInfo("consumer:0x%" PRIx64 " remove new topic:%s in the topic list, remain newTopics:%d", pConsumer->consumerId,
X
Xiaoyu Wang 已提交
929
             pTopic, (int)taosArrayGetSize(pConsumer->rebNewTopics));
930 931 932 933 934 935
      break;
    }
  }
}

// remove from removed topic
X
Xiaoyu Wang 已提交
936
static void removeFromRemoveTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
937 938 939 940 941 942
  int32_t size = taosArrayGetSize(pConsumer->rebRemovedTopics);
  for (int32_t i = 0; i < size; i++) {
    char *p = taosArrayGetP(pConsumer->rebRemovedTopics, i);
    if (strcmp(pTopic, p) == 0) {
      taosArrayRemove(pConsumer->rebRemovedTopics, i);
      taosMemoryFree(p);
943

wmmhello's avatar
wmmhello 已提交
944
      mInfo("consumer:0x%" PRIx64 " remove topic:%s in the removed topic list, remain removedTopics:%d",
945
             pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->rebRemovedTopics));
946 947 948 949 950
      break;
    }
  }
}

951 952 953 954 955 956 957 958
static void removeFromCurrentTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
  int32_t sz = taosArrayGetSize(pConsumer->currentTopics);
  for (int32_t i = 0; i < sz; i++) {
    char *topic = taosArrayGetP(pConsumer->currentTopics, i);
    if (strcmp(pTopic, topic) == 0) {
      taosArrayRemove(pConsumer->currentTopics, i);
      taosMemoryFree(topic);

wmmhello's avatar
wmmhello 已提交
959
      mInfo("consumer:0x%" PRIx64 " remove topic:%s in the current topic list, remain currentTopics:%d",
960 961 962 963 964 965 966 967 968 969 970 971 972 973
             pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->currentTopics));
      break;
    }
  }
}

static bool existInCurrentTopicList(const SMqConsumerObj* pConsumer, const char* pTopic) {
  bool    existing = false;
  int32_t size = taosArrayGetSize(pConsumer->currentTopics);
  for (int32_t i = 0; i < size; i++) {
    char *topic = taosArrayGetP(pConsumer->currentTopics, i);

    if (strcmp(topic, pTopic) == 0) {
      existing = true;
974 975 976
      break;
    }
  }
977 978

  return existing;
979 980
}

L
Liu Jicong 已提交
981
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
wmmhello's avatar
wmmhello 已提交
982 983
  mInfo("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", createTime:%" PRId64,
         pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->createTime);
L
Liu Jicong 已提交
984

985 986
  taosWLockLatch(&pOldConsumer->lock);

wmmhello's avatar
wmmhello 已提交
987
  if (pNewConsumer->updateType == CONSUMER_UPDATE_SUB_MODIFY) {
988 989 990
    TSWAP(pOldConsumer->rebNewTopics, pNewConsumer->rebNewTopics);
    TSWAP(pOldConsumer->rebRemovedTopics, pNewConsumer->rebRemovedTopics);
    TSWAP(pOldConsumer->assignedTopics, pNewConsumer->assignedTopics);
991

wmmhello's avatar
wmmhello 已提交
992
    pOldConsumer->subscribeTime = taosGetTimestampMs();
993
    pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
wmmhello's avatar
wmmhello 已提交
994
    mInfo("consumer:0x%" PRIx64 " sub update, modify existed consumer",pOldConsumer->consumerId);
wmmhello's avatar
wmmhello 已提交
995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006
//  } else if (pNewConsumer->updateType == CONSUMER_UPDATE_TIMER_LOST) {
//    int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
//    for (int32_t i = 0; i < sz; i++) {
//      char *topic = taosStrdup(taosArrayGetP(pOldConsumer->currentTopics, i));
//      taosArrayPush(pOldConsumer->rebRemovedTopics, &topic);
//    }
//
//    int32_t prevStatus = pOldConsumer->status;
//    pOldConsumer->status = MQ_CONSUMER_STATUS_LOST;
//    mInfo("consumer:0x%" PRIx64 " timer update, timer lost. state %s -> %s, reb-time:%" PRId64 ", reb-removed-topics:%d",
//           pOldConsumer->consumerId, mndConsumerStatusName(prevStatus), mndConsumerStatusName(pOldConsumer->status),
//           pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
wmmhello's avatar
wmmhello 已提交
1007
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE_RECOVER) {
L
Liu Jicong 已提交
1008 1009
    int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics);
    for (int32_t i = 0; i < sz; i++) {
1010
      char *topic = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, i));
L
Liu Jicong 已提交
1011 1012 1013
      taosArrayPush(pOldConsumer->rebNewTopics, &topic);
    }

1014
    pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
wmmhello's avatar
wmmhello 已提交
1015 1016
    mInfo("consumer:0x%" PRIx64 " timer update, timer recover",pOldConsumer->consumerId);
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB_MODIFY_NOTOPIC) {
1017
    atomic_add_fetch_32(&pOldConsumer->epoch, 1);
L
Liu Jicong 已提交
1018

wmmhello's avatar
wmmhello 已提交
1019 1020 1021
    pOldConsumer->rebalanceTime = taosGetTimestampMs();
    mInfo("consumer:0x%" PRIx64 " reb update, only rebalance time", pOldConsumer->consumerId);
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB_MODIFY_TOPIC) {
1022
    char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));
1023

1024
    // check if exist in current topic
1025
    removeFromNewTopicList(pOldConsumer, pNewTopic);
1026 1027

    // add to current topic
1028 1029
    bool existing = existInCurrentTopicList(pOldConsumer, pNewTopic);
    if (existing) {
wmmhello's avatar
wmmhello 已提交
1030
      mError("consumer:0x%" PRIx64 "new topic:%s should not in currentTopics", pOldConsumer->consumerId, pNewTopic);
1031 1032
      taosMemoryFree(pNewTopic);
    } else {  // added into current topic list
1033
      taosArrayPush(pOldConsumer->currentTopics, &pNewTopic);
L
Liu Jicong 已提交
1034 1035
      taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
    }
H
Haojun Liao 已提交
1036

1037
    // set status
H
Haojun Liao 已提交
1038
    int32_t status = pOldConsumer->status;
1039
    updateConsumerStatus(pOldConsumer);
L
Liu Jicong 已提交
1040

H
Haojun Liao 已提交
1041
    // the re-balance is triggered when the new consumer is launched.
wmmhello's avatar
wmmhello 已提交
1042
    pOldConsumer->rebalanceTime = taosGetTimestampMs();
L
Liu Jicong 已提交
1043

1044
    atomic_add_fetch_32(&pOldConsumer->epoch, 1);
wmmhello's avatar
wmmhello 已提交
1045
    mInfo("consumer:0x%" PRIx64 " reb update add, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
1046
           ", current topics:%d, newTopics:%d, removeTopics:%d",
H
Haojun Liao 已提交
1047
           pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
1048 1049 1050 1051
           mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
           (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
           (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));

wmmhello's avatar
wmmhello 已提交
1052
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB_MODIFY_REMOVE) {
1053 1054 1055
    char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);

    // remove from removed topic
1056
    removeFromRemoveTopicList(pOldConsumer, removedTopic);
1057 1058

    // remove from current topic
1059
    removeFromCurrentTopicList(pOldConsumer, removedTopic);
1060 1061

    // set status
H
Haojun Liao 已提交
1062
    int32_t status = pOldConsumer->status;
1063
    updateConsumerStatus(pOldConsumer);
L
Liu Jicong 已提交
1064

wmmhello's avatar
wmmhello 已提交
1065
    pOldConsumer->rebalanceTime = taosGetTimestampMs();
1066
    atomic_add_fetch_32(&pOldConsumer->epoch, 1);
H
Haojun Liao 已提交
1067

wmmhello's avatar
wmmhello 已提交
1068
    mInfo("consumer:0x%" PRIx64 " reb update remove, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
X
Xiaoyu Wang 已提交
1069
           ", current topics:%d, newTopics:%d, removeTopics:%d",
H
Haojun Liao 已提交
1070
           pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
1071 1072 1073
           mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
           (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
           (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
1074 1075 1076
  }

  taosWUnLockLatch(&pOldConsumer->lock);
L
Liu Jicong 已提交
1077 1078 1079
  return 0;
}

L
Liu Jicong 已提交
1080
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId) {
L
Liu Jicong 已提交
1081 1082
  SSdb           *pSdb = pMnode->pSdb;
  SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &consumerId);
L
Liu Jicong 已提交
1083
  if (pConsumer == NULL) {
1084
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
L
Liu Jicong 已提交
1085 1086 1087 1088
  }
  return pConsumer;
}

L
Liu Jicong 已提交
1089
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
L
Liu Jicong 已提交
1090 1091 1092
  SSdb *pSdb = pMnode->pSdb;
  sdbRelease(pSdb, pConsumer);
}
L
Liu Jicong 已提交
1093

S
Shengliang Guan 已提交
1094 1095
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode         *pMnode = pReq->info.node;
L
Liu Jicong 已提交
1096 1097 1098 1099 1100 1101
  SSdb           *pSdb = pMnode->pSdb;
  int32_t         numOfRows = 0;
  SMqConsumerObj *pConsumer = NULL;

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
1102 1103 1104 1105
    if (pShow->pIter == NULL) {
      break;
    }

L
Liu Jicong 已提交
1106
    if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {
X
Xiaoyu Wang 已提交
1107
      mDebug("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId);
L
Liu Jicong 已提交
1108 1109 1110
      sdbRelease(pSdb, pConsumer);
      continue;
    }
L
Liu Jicong 已提交
1111 1112

    taosRLockLatch(&pConsumer->lock);
X
Xiaoyu Wang 已提交
1113
    mDebug("showing consumer:0x%" PRIx64, pConsumer->consumerId);
L
Liu Jicong 已提交
1114

L
Liu Jicong 已提交
1115 1116 1117 1118 1119 1120 1121
    int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
    bool    hasTopic = true;
    if (topicSz == 0) {
      hasTopic = false;
      topicSz = 1;
    }

L
Liu Jicong 已提交
1122 1123 1124 1125
    if (numOfRows + topicSz > rowsCapacity) {
      blockDataEnsureCapacity(pBlock, numOfRows + topicSz);
    }

L
Liu Jicong 已提交
1126 1127 1128 1129 1130
    for (int32_t i = 0; i < topicSz; i++) {
      SColumnInfoData *pColInfo;
      int32_t          cols = 0;

      // consumer id
wmmhello's avatar
wmmhello 已提交
1131 1132 1133 1134
      char        consumerIdHex[32] = {0};
      sprintf(varDataVal(consumerIdHex), "0x%"PRIx64, pConsumer->consumerId);
      varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));

L
Liu Jicong 已提交
1135
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
wmmhello's avatar
wmmhello 已提交
1136
      colDataSetVal(pColInfo, numOfRows, (const char *)consumerIdHex, false);
L
Liu Jicong 已提交
1137

L
Liu Jicong 已提交
1138 1139
      // consumer group
      char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
1140 1141
      STR_TO_VARSTR(cgroup, pConsumer->cgroup);

L
Liu Jicong 已提交
1142
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1143
      colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false);
L
Liu Jicong 已提交
1144

1145
      // client id
L
Liu Jicong 已提交
1146
      char clientId[256 + VARSTR_HEADER_SIZE] = {0};
1147 1148
      STR_TO_VARSTR(clientId, pConsumer->clientId);

L
Liu Jicong 已提交
1149
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1150
      colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false);
L
Liu Jicong 已提交
1151 1152

      // status
X
Xiaoyu Wang 已提交
1153 1154
      char        status[20 + VARSTR_HEADER_SIZE] = {0};
      const char *pStatusName = mndConsumerStatusName(pConsumer->status);
1155 1156
      STR_TO_VARSTR(status, pStatusName);

L
Liu Jicong 已提交
1157
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1158
      colDataSetVal(pColInfo, numOfRows, (const char *)status, false);
L
Liu Jicong 已提交
1159 1160 1161 1162 1163 1164

      // one subscribed topic
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      if (hasTopic) {
        char        topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
        const char *topicName = mndTopicGetShowName(taosArrayGetP(pConsumer->assignedTopics, i));
H
Haojun Liao 已提交
1165
        STR_TO_VARSTR(topic, topicName);
1166
        colDataSetVal(pColInfo, numOfRows, (const char *)topic, false);
L
Liu Jicong 已提交
1167
      } else {
1168
        colDataSetVal(pColInfo, numOfRows, NULL, true);
L
Liu Jicong 已提交
1169 1170 1171
      }

      // end point
L
Liu Jicong 已提交
1172
      /*pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);*/
1173
      /*colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->ep, true);*/
L
Liu Jicong 已提交
1174 1175 1176

      // up time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
wmmhello's avatar
wmmhello 已提交
1177
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->createTime, false);
L
Liu Jicong 已提交
1178 1179 1180

      // subscribe time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1181
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false);
L
Liu Jicong 已提交
1182 1183 1184

      // rebalance time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1185
      colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0);
L
Liu Jicong 已提交
1186

1187
      char buf[TSDB_OFFSET_LEN] = {0};
1188
      STqOffsetVal pVal = {.type = pConsumer->resetOffsetCfg};
1189 1190 1191
      tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal);

      char parasStr[64 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0};
wmmhello's avatar
wmmhello 已提交
1192
      sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName, pConsumer->autoCommit, pConsumer->autoCommitInterval, buf);
1193
      varDataSetLen(parasStr, strlen(varDataVal(parasStr)));
1194

1195
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1196
      colDataSetVal(pColInfo, numOfRows, (const char *)parasStr, false);
1197

L
Liu Jicong 已提交
1198 1199
      numOfRows++;
    }
1200

L
Liu Jicong 已提交
1201 1202
    taosRUnLockLatch(&pConsumer->lock);
    sdbRelease(pSdb, pConsumer);
1203 1204

    pBlock->info.rows = numOfRows;
L
Liu Jicong 已提交
1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}

static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
  SSdb *pSdb = pMnode->pSdb;
  sdbCancelFetch(pSdb, pIter);
}

static const char *mndConsumerStatusName(int status) {
  switch (status) {
wmmhello's avatar
wmmhello 已提交
1218
    case MQ_CONSUMER_STATUS_READY:
L
Liu Jicong 已提交
1219
      return "ready";
wmmhello's avatar
wmmhello 已提交
1220
    case MQ_CONSUMER_STATUS_LOST:
L
Liu Jicong 已提交
1221
      return "lost";
1222
    case MQ_CONSUMER_STATUS_REBALANCE:
L
Liu Jicong 已提交
1223 1224 1225 1226 1227
      return "rebalancing";
    default:
      return "unknown";
  }
}