mndConsumer.c 41.3 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "mndConsumer.h"
L
Liu Jicong 已提交
18
#include "mndPrivilege.h"
L
Liu Jicong 已提交
19
#include "mndShow.h"
20
#include "mndSubscribe.h"
L
Liu Jicong 已提交
21 22
#include "mndTopic.h"
#include "mndTrans.h"
L
Liu Jicong 已提交
23
#include "tcompare.h"
L
Liu Jicong 已提交
24 25
#include "tname.h"

26
#define MND_CONSUMER_VER_NUMBER   1
L
Liu Jicong 已提交
27 28
#define MND_CONSUMER_RESERVE_SIZE 64

H
Haojun Liao 已提交
29
#define MND_CONSUMER_LOST_HB_CNT          6
L
Liu Jicong 已提交
30
#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200
31

H
Haojun Liao 已提交
32
static int32_t mqRebInExecCnt = 0;
33

L
Liu Jicong 已提交
34 35
static const char *mndConsumerStatusName(int status);

L
Liu Jicong 已提交
36 37
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
38
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer);
S
Shengliang Guan 已提交
39
static int32_t mndProcessConsumerMetaMsg(SRpcMsg *pMsg);
40
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
L
Liu Jicong 已提交
41
static void    mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
42

S
Shengliang Guan 已提交
43 44
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
45
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
S
Shengliang Guan 已提交
46 47
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg);
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg);
L
Liu Jicong 已提交
48
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg);
S
Shengliang Guan 已提交
49
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg);
50

L
Liu Jicong 已提交
51
// Registers the consumer sdb table, the TMQ message handlers and the
// "show consumers" retrieve/free-iterator callbacks on this mnode.
// Returns whatever sdbSetTable() returns.
int32_t mndInitConsumer(SMnode *pMnode) {
  SSdbTable consumerTable = {0};
  consumerTable.sdbType = SDB_CONSUMER;
  consumerTable.keyType = SDB_KEY_INT64;
  consumerTable.encodeFp = (SdbEncodeFp)mndConsumerActionEncode;
  consumerTable.decodeFp = (SdbDecodeFp)mndConsumerActionDecode;
  consumerTable.insertFp = (SdbInsertFp)mndConsumerActionInsert;
  consumerTable.updateFp = (SdbUpdateFp)mndConsumerActionUpdate;
  consumerTable.deleteFp = (SdbDeleteFp)mndConsumerActionDelete;

  // Client-facing requests.
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);

  // Internal messages. No handler is registered for TDMT_MND_TMQ_CONSUMER_LOST:
  // its handler (mndProcessConsumerLostMsg) is commented out in this file.
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);

  // "show consumers" support.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndCancelGetNextConsumer);

  return sdbSetTable(pMnode->pSdb, consumerTable);
}

// Counterpart of mndInitConsumer(); currently there is nothing to tear down.
void mndCleanupConsumer(SMnode *pMnode) {
  (void)pMnode;
}

L
Liu Jicong 已提交
78
bool mndRebTryStart() {
H
Haojun Liao 已提交
79
  int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
C
cadem 已提交
80
  mDebug("tq timer, rebalance counter old val:%d", old);
L
Liu Jicong 已提交
81 82 83
  return old == 0;
}

X
Xiaoyu Wang 已提交
84
// Marks the end of a rebalance round by decrementing the rebalance counter.
void mndRebEnd() {
  mndRebCntDec();
}
L
Liu Jicong 已提交
85

S
Shengliang Guan 已提交
86 87
// Atomically increments the rebalance counter and logs the new value.
void mndRebCntInc() {
  // Keep the atomic op outside the log call so it does not depend on how the
  // logging macro treats its arguments.
  const int32_t counter = atomic_add_fetch_32(&mqRebInExecCnt, 1);
  mInfo("rebalance trans start, rebalance counter:%d", counter);
}
L
Liu Jicong 已提交
90

S
Shengliang Guan 已提交
91
void mndRebCntDec() {
92 93 94 95 96 97 98 99 100 101
  while (1) {
    int32_t val = atomic_load_32(&mqRebInExecCnt);
    if (val <= 0) {
      mError("rebalance trans end, rebalance counter:%d should not be less equalled than 0, ignore counter desc", val);
      break;
    }

    int32_t newVal = val - 1;
    int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal);
    if (oldVal == val) {
C
cadem 已提交
102
      mDebug("rebalance trans end, rebalance counter:%d", newVal);
103 104 105
      break;
    }
  }
S
Shengliang Guan 已提交
106
}
L
Liu Jicong 已提交
107

wmmhello's avatar
wmmhello 已提交
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
//static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
//  SMnode             *pMnode = pMsg->info.node;
//  SMqConsumerLostMsg *pLostMsg = pMsg->pCont;
//  SMqConsumerObj     *pConsumer = mndAcquireConsumer(pMnode, pLostMsg->consumerId);
//  if (pConsumer == NULL) {
//    return 0;
//  }
//
//  mInfo("process consumer lost msg, consumer:0x%" PRIx64 " status:%d(%s)", pLostMsg->consumerId, pConsumer->status,
//        mndConsumerStatusName(pConsumer->status));
//
//  if (pConsumer->status != MQ_CONSUMER_STATUS_READY) {
//    mndReleaseConsumer(pMnode, pConsumer);
//    return -1;
//  }
//
//  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
//  pConsumerNew->updateType = CONSUMER_UPDATE_TIMER_LOST;
//
//  mndReleaseConsumer(pMnode, pConsumer);
//
//  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "lost-csm");
//  if (pTrans == NULL) {
//    goto FAIL;
//  }
//
//  if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
//    goto FAIL;
//  }
//
//  if (mndTransPrepare(pMnode, pTrans) != 0) {
//    goto FAIL;
//  }
//
//  tDeleteSMqConsumerObj(pConsumerNew, true);
//  mndTransDrop(pTrans);
//  return 0;
//FAIL:
//  tDeleteSMqConsumerObj(pConsumerNew, true);
//  mndTransDrop(pTrans);
//  return -1;
//}
150

S
Shengliang Guan 已提交
151 152 153
// Handles TDMT_MND_TMQ_CONSUMER_RECOVER: for a consumer currently in LOST
// status, commits a consumer object with updateType CONSUMER_UPDATE_RECOVER
// through a "recover-csm" transaction.
//
// Returns 0 when the transaction was prepared, -1 otherwise. terrno is set to
// TSDB_CODE_MND_CONSUMER_NOT_READY when the consumer is not LOST and to
// TSDB_CODE_OUT_OF_MEMORY when the update object cannot be allocated.
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
  SMnode                *pMnode = pMsg->info.node;
  SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont;
  int32_t                code = -1;

  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId);
  if (pConsumer == NULL) {
    mError("cannot find consumer %" PRId64 " when processing consumer recover msg", pRecoverMsg->consumerId);
    return -1;
  }

  mInfo("receive consumer recover msg, consumer:0x%" PRIx64 " status:%d(%s)", pRecoverMsg->consumerId,
        pConsumer->status, mndConsumerStatusName(pConsumer->status));

  if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) {
    mndReleaseConsumer(pMnode, pConsumer);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
    return -1;
  }

  // Build the update object from the acquired consumer, then drop the reference.
  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
  mndReleaseConsumer(pMnode, pConsumer);
  if (pConsumerNew == NULL) {
    // tNewSMqConsumerObj can fail (other callers in this file check for NULL).
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pConsumerNew->updateType = CONSUMER_UPDATE_RECOVER;

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm");
  if (pTrans == NULL) {
    goto _OVER;
  }

  if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _OVER;
  if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;

  code = 0;

_OVER:
  // Success and failure share the same cleanup.
  tDeleteSMqConsumerObj(pConsumerNew, true);
  mndTransDrop(pTrans);
  return code;
}

193
// todo check the clear process
L
Liu Jicong 已提交
194 195 196
// Handles TDMT_MND_TMQ_LOST_CONSUMER_CLEAR: drops a consumer that is in LOST
// status from sdb through a "clear-csm" transaction.
//
// Returns 0 when the consumer no longer exists or the drop transaction was
// prepared; -1 when the consumer is not LOST or any step fails.
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
  SMnode              *pMnode = pMsg->info.node;
  SMqConsumerClearMsg *pClearMsg = pMsg->pCont;
  int32_t              code = -1;

  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId);
  if (pConsumer == NULL) {
    // Nothing to clear; treated as success.
    mError("consumer:0x%"PRIx64" failed to be found to clear it", pClearMsg->consumerId);
    return 0;
  }

  mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId,
        mndConsumerStatusName(pConsumer->status));

  if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) {
    mndReleaseConsumer(pMnode, pConsumer);
    return -1;
  }

  // Build the object to drop from the acquired consumer, then drop the reference.
  SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
  mndReleaseConsumer(pMnode, pConsumer);
  if (pConsumerNew == NULL) {
    // tNewSMqConsumerObj can fail (other callers in this file check for NULL).
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
  if (pTrans == NULL) goto _OVER;

  // this is the drop action, not the update action
  if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) goto _OVER;
  if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;

  code = 0;

_OVER:
  // Success and failure share the same cleanup.
  tDeleteSMqConsumerObj(pConsumerNew, true);
  mndTransDrop(pTrans);
  return code;
}

L
Liu Jicong 已提交
236
// Returns the SMqRebInfo stored in pHash under `key` (the key length includes
// the terminating NUL), inserting a freshly created entry when none exists.
//
// The returned pointer refers to the copy held by the hash table. Returns NULL
// with terrno = TSDB_CODE_OUT_OF_MEMORY when the entry cannot be created or
// inserted.
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
  const size_t keyLen = strlen(key) + 1;

  SMqRebInfo *pRebInfo = taosHashGet(pHash, key, keyLen);
  if (pRebInfo != NULL) {
    return pRebInfo;
  }

  SMqRebInfo *pNew = tNewSMqRebSubscribe(key);
  if (pNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (taosHashPut(pHash, key, keyLen, pNew, sizeof(SMqRebInfo)) != 0) {
    // The hash did not take the entry, so the arrays it would have owned
    // (see freeRebalanceItem) must be released here.
    taosArrayDestroy(pNew->newConsumers);
    taosArrayDestroy(pNew->removedConsumers);
    taosMemoryFree(pNew);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // The hash now holds a by-value copy of the struct (including its array
  // pointers); only the temporary shell is freed.
  taosMemoryFree(pNew);
  return taosHashGet(pHash, key, keyLen);
}

X
Xiaoyu Wang 已提交
251 252
static void freeRebalanceItem(void *param) {
  SMqRebInfo *pInfo = param;
H
Haojun Liao 已提交
253 254 255 256
  taosArrayDestroy(pInfo->newConsumers);
  taosArrayDestroy(pInfo->removedConsumers);
}

S
Shengliang Guan 已提交
257 258
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
259 260 261 262
  SSdb           *pSdb = pMnode->pSdb;
  SMqConsumerObj *pConsumer;
  void           *pIter = NULL;

D
dapan1121 已提交
263
  mDebug("start to process mq timer");
264

265
  // rebalance cannot be parallel
L
Liu Jicong 已提交
266
  if (!mndRebTryStart()) {
D
dapan1121 已提交
267
    mDebug("mq rebalance already in progress, do nothing");
268 269 270 271
    return 0;
  }

  SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
H
Haojun Liao 已提交
272
  if (pRebMsg == NULL) {
X
Xiaoyu Wang 已提交
273
    mError("failed to create the rebalance msg, size:%d, quit mq timer", (int32_t)sizeof(SMqDoRebalanceMsg));
H
Haojun Liao 已提交
274 275 276 277
    mndRebEnd();
    return TSDB_CODE_OUT_OF_MEMORY;
  }

278
  pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
H
Haojun Liao 已提交
279 280 281 282 283 284 285 286
  if (pRebMsg->rebSubHash == NULL) {
    mError("failed to create rebalance hashmap");
    rpcFreeCont(pRebMsg);
    mndRebEnd();
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  taosHashSetFreeFp(pRebMsg->rebSubHash, freeRebalanceItem);
287 288 289 290

  // iterate all consumers, find all modification
  while (1) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
291 292 293
    if (pIter == NULL) {
      break;
    }
294 295 296

    int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
    int32_t status = atomic_load_32(&pConsumer->status);
H
Haojun Liao 已提交
297

wmmhello's avatar
wmmhello 已提交
298 299
    mDebug("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d",
           pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime,
X
Xiaoyu Wang 已提交
300
           hbStatus);
L
Liu Jicong 已提交
301

wmmhello's avatar
wmmhello 已提交
302
    if (status == MQ_CONSUMER_STATUS_READY) {
H
Haojun Liao 已提交
303
      if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
wmmhello's avatar
wmmhello 已提交
304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326
//        SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg));
//        if (pLostMsg == NULL) {
//          mError("consumer:0x%"PRIx64" failed to transfer consumer status to lost due to out of memory. alloc size:%d",
//              pConsumer->consumerId, (int32_t)sizeof(SMqConsumerLostMsg));
//          continue;
//        }
//
//        pLostMsg->consumerId = pConsumer->consumerId;
//        SRpcMsg rpcMsg = {
//            .msgType = TDMT_MND_TMQ_CONSUMER_LOST, .pCont = pLostMsg, .contLen = sizeof(SMqConsumerLostMsg)};
//
//        mDebug("consumer:0x%"PRIx64" hb not received beyond threshold %d, set to lost", pConsumer->consumerId,
//            MND_CONSUMER_LOST_HB_CNT);
//        tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);

        taosRLockLatch(&pConsumer->lock);
        int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
        for (int32_t i = 0; i < topicNum; i++) {
          char  key[TSDB_SUBSCRIBE_KEY_LEN];
          char *removedTopic = taosArrayGetP(pConsumer->currentTopics, i);
          mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
          SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
          taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
H
Haojun Liao 已提交
327
        }
wmmhello's avatar
wmmhello 已提交
328
        taosRUnLockLatch(&pConsumer->lock);
H
Haojun Liao 已提交
329
      }
wmmhello's avatar
wmmhello 已提交
330
    } else if (status == MQ_CONSUMER_STATUS_LOST) {
H
Haojun Liao 已提交
331
      // if the client is lost longer than one day, clear it. Otherwise, do nothing about the lost consumers.
wmmhello's avatar
wmmhello 已提交
332
      if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD || taosArrayGetSize(pConsumer->assignedTopics) == 0) {   // clear consumer if lost a day or unsubscribe/close
L
Liu Jicong 已提交
333
        SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg));
H
Haojun Liao 已提交
334 335
        if (pClearMsg == NULL) {
          mError("consumer:0x%"PRIx64" failed to clear consumer due to out of memory. alloc size:%d",
H
Haojun Liao 已提交
336
                 pConsumer->consumerId, (int32_t)sizeof(SMqConsumerClearMsg));
H
Haojun Liao 已提交
337 338
          continue;
        }
L
Liu Jicong 已提交
339 340 341

        pClearMsg->consumerId = pConsumer->consumerId;
        SRpcMsg rpcMsg = {
342
            .msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, .pCont = pClearMsg, .contLen = sizeof(SMqConsumerClearMsg)};
343

344 345
        mDebug("consumer:0x%" PRIx64 " lost beyond threshold %d, clear it", pConsumer->consumerId,
               MND_CONSUMER_LOST_CLEAR_THRESHOLD);
L
Liu Jicong 已提交
346 347
        tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
      }
348
    } else {  // MQ_CONSUMER_STATUS_REBALANCE
L
Liu Jicong 已提交
349
      taosRLockLatch(&pConsumer->lock);
350

351 352 353 354 355
      int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
      for (int32_t i = 0; i < newTopicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *newTopic = taosArrayGetP(pConsumer->rebNewTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, newTopic);
L
Liu Jicong 已提交
356
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
357 358 359 360 361 362 363 364
        taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
      }

      int32_t removedTopicNum = taosArrayGetSize(pConsumer->rebRemovedTopics);
      for (int32_t i = 0; i < removedTopicNum; i++) {
        char  key[TSDB_SUBSCRIBE_KEY_LEN];
        char *removedTopic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
        mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
L
Liu Jicong 已提交
365
        SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
366 367
        taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
      }
L
Liu Jicong 已提交
368
      taosRUnLockLatch(&pConsumer->lock);
369 370 371 372 373 374 375 376
    }

    mndReleaseConsumer(pMnode, pConsumer);
  }

  if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
    mInfo("mq rebalance will be triggered");
    SRpcMsg rpcMsg = {
L
Liu Jicong 已提交
377
        .msgType = TDMT_MND_TMQ_DO_REBALANCE,
378 379 380 381 382 383 384
        .pCont = pRebMsg,
        .contLen = sizeof(SMqDoRebalanceMsg),
    };
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  } else {
    taosHashCleanup(pRebMsg->rebSubHash);
    rpcFreeCont(pRebMsg);
H
Haojun Liao 已提交
385
    mDebug("mq timer finished, no need to re-balance");
L
Liu Jicong 已提交
386
    mndRebEnd();
387 388 389 390
  }
  return 0;
}

391
// Handles TDMT_MND_TMQ_HB: resets the consumer's heartbeat miss counter and,
// when the consumer is in LOST status, posts a TDMT_MND_TMQ_CONSUMER_RECOVER
// message to the write queue.
//
// Returns 0 on success; -1 with terrno set when the request cannot be
// deserialized, the consumer does not exist, or the recover message cannot be
// allocated.
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
  SMnode  *pMnode = pMsg->info.node;
  SMqHbReq req = {0};

  if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int64_t         consumerId = req.consumerId;
  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pConsumer == NULL) {
    mError("consumer:0x%" PRIx64 " not exist", consumerId);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }

  atomic_store_32(&pConsumer->hbStatus, 0);

  int32_t status = atomic_load_32(&pConsumer->status);

  if (status == MQ_CONSUMER_STATUS_LOST) {
    mInfo("try to recover consumer:0x%" PRIx64 "", consumerId);
    SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
    if (pRecoverMsg == NULL) {
      // Do not dereference a failed allocation; report it and give the
      // consumer reference back.
      mndReleaseConsumer(pMnode, pConsumer);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    pRecoverMsg->consumerId = consumerId;
    SRpcMsg pRpcMsg = {
        .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
        .pCont = pRecoverMsg,
        .contLen = sizeof(SMqConsumerRecoverMsg),
    };
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
  }

  mndReleaseConsumer(pMnode, pConsumer);

  return 0;
}

S
Shengliang Guan 已提交
430
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
L
Liu Jicong 已提交
431 432 433
  SMnode     *pMnode = pMsg->info.node;
  SMqAskEpReq req = {0};
  SMqAskEpRsp rsp = {0};
D
dapan1121 已提交
434 435 436 437 438 439

  if (tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

L
Liu Jicong 已提交
440 441
  int64_t consumerId = req.consumerId;
  int32_t epoch = req.epoch;
442

443
  SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
L
Liu Jicong 已提交
444
  if (pConsumer == NULL) {
H
Haojun Liao 已提交
445
    mError("consumer:0x%" PRIx64 " group:%s not exists in sdb", consumerId, req.cgroup);
446 447 448 449
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }

H
Haojun Liao 已提交
450 451 452 453 454 455 456
  int32_t ret = strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup));
  if (ret != 0) {
    mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup,
           pConsumer->cgroup);
    terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    return -1;
  }
457

458 459 460 461
  atomic_store_32(&pConsumer->hbStatus, 0);

  // 1. check consumer status
  int32_t status = atomic_load_32(&pConsumer->status);
L
Liu Jicong 已提交
462

wmmhello's avatar
wmmhello 已提交
463
  if (status == MQ_CONSUMER_STATUS_LOST) {
X
Xiaoyu Wang 已提交
464
    mInfo("try to recover consumer:0x%" PRIx64, consumerId);
L
Liu Jicong 已提交
465 466 467
    SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));

    pRecoverMsg->consumerId = consumerId;
468
    SRpcMsg pRpcMsg = {
L
Liu Jicong 已提交
469
        .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
470 471 472
        .pCont = pRecoverMsg,
        .contLen = sizeof(SMqConsumerRecoverMsg),
    };
H
Haojun Liao 已提交
473

474
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
475 476
  }

wmmhello's avatar
wmmhello 已提交
477
  if (status != MQ_CONSUMER_STATUS_READY) {
X
Xiaoyu Wang 已提交
478
    mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
479 480 481 482 483 484
    terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
    return -1;
  }

  int32_t serverEpoch = atomic_load_32(&pConsumer->epoch);

485
  // 2. check epoch, only send ep info when epochs do not match
486 487
  if (epoch != serverEpoch) {
    taosRLockLatch(&pConsumer->lock);
X
Xiaoyu Wang 已提交
488 489
    mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d", consumerId, epoch,
          serverEpoch);
490 491 492 493 494
    int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics);

    rsp.topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp));
    if (rsp.topics == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
495
      taosRUnLockLatch(&pConsumer->lock);
496 497 498
      goto FAIL;
    }

H
Haojun Liao 已提交
499
    // handle all topics subscribed by this consumer
500 501 502 503
    for (int32_t i = 0; i < numOfTopics; i++) {
      char            *topic = taosArrayGetP(pConsumer->currentTopics, i);
      SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
      // txn guarantees pSub is created
L
Liu Jicong 已提交
504

505 506 507 508 509 510 511 512
      taosRLockLatch(&pSub->lock);

      SMqSubTopicEp topicEp = {0};
      strcpy(topicEp.topic, topic);

      // 2.1 fetch topic schema
      SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
      taosRLockLatch(&pTopic->lock);
L
Liu Jicong 已提交
513
      tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN);
514
      topicEp.schema.nCols = pTopic->schema.nCols;
L
Liu Jicong 已提交
515 516 517 518
      if (topicEp.schema.nCols) {
        topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema));
        memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema));
      }
519 520 521 522
      taosRUnLockLatch(&pTopic->lock);
      mndReleaseTopic(pMnode, pTopic);

      // 2.2 iterate all vg assigned to the consumer of that topic
523 524
      SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
      int32_t        vgNum = taosArrayGetSize(pConsumerEp->vgs);
525

H
Haojun Liao 已提交
526
      // this customer assigned vgroups
527 528 529
      topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
      if (topicEp.vgs == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
530
        taosRUnLockLatch(&pConsumer->lock);
L
Liu Jicong 已提交
531 532
        taosRUnLockLatch(&pSub->lock);
        mndReleaseSubscribe(pMnode, pSub);
533 534 535 536
        goto FAIL;
      }

      for (int32_t j = 0; j < vgNum; j++) {
537
        SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555
        char     offsetKey[TSDB_PARTITION_KEY_LEN];
        mndMakePartitionKey(offsetKey, pConsumer->cgroup, topic, pVgEp->vgId);
        // 2.2.1 build vg ep
        SMqSubVgEp vgEp = {
            .epSet = pVgEp->epSet,
            .vgId = pVgEp->vgId,
            .offset = -1,
        };

        taosArrayPush(topicEp.vgs, &vgEp);
      }
      taosArrayPush(rsp.topics, &topicEp);

      taosRUnLockLatch(&pSub->lock);
      mndReleaseSubscribe(pMnode, pSub);
    }
    taosRUnLockLatch(&pConsumer->lock);
  }
H
Haojun Liao 已提交
556

557
  // encode rsp
L
Liu Jicong 已提交
558
  int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, &rsp);
559 560
  void   *buf = rpcMallocCont(tlen);
  if (buf == NULL) {
L
Liu Jicong 已提交
561
    terrno = TSDB_CODE_OUT_OF_MEMORY;
562
    return -1;
L
Liu Jicong 已提交
563
  }
H
Haojun Liao 已提交
564

565 566 567 568 569 570 571 572
  SMqRspHead* pHead = buf;

  pHead->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
  pHead->epoch = serverEpoch;
  pHead->consumerId = pConsumer->consumerId;
  pHead->walsver = 0;
  pHead->walever = 0;

S
Shengliang Guan 已提交
573

574
  void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
L
Liu Jicong 已提交
575
  tEncodeSMqAskEpRsp(&abuf, &rsp);
576 577

  // release consumer and free memory
L
Liu Jicong 已提交
578
  tDeleteSMqAskEpRsp(&rsp);
579 580 581
  mndReleaseConsumer(pMnode, pConsumer);

  // send rsp
S
Shengliang Guan 已提交
582 583
  pMsg->info.rsp = buf;
  pMsg->info.rspLen = tlen;
584
  return 0;
H
Haojun Liao 已提交
585

586
FAIL:
L
Liu Jicong 已提交
587
  tDeleteSMqAskEpRsp(&rsp);
588 589 590 591
  mndReleaseConsumer(pMnode, pConsumer);
  return -1;
}

L
Liu Jicong 已提交
592 593 594 595 596 597 598 599
// Encodes pConsumer, appends it to pTrans as a commit log and marks the raw
// record SDB_STATUS_DROPPED. Returns 0 on success, -1 if any step fails.
// NOTE(review): on append failure the encoded raw is not freed here; whether
// the caller or the transaction owns it is not visible in this file -- confirm.
int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
  SSdbRaw *pRaw = mndConsumerActionEncode(pConsumer);
  if (pRaw == NULL) {
    return -1;
  }

  // Short-circuit keeps the original order: append first, then set status.
  if (mndTransAppendCommitlog(pTrans, pRaw) != 0 || sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED) != 0) {
    return -1;
  }
  return 0;
}

600 601 602 603 604 605 606 607
// Encodes pConsumer, appends it to pTrans as a commit log and marks the raw
// record SDB_STATUS_READY. Returns 0 on success, -1 if any step fails.
// NOTE(review): on append failure the encoded raw is not freed here; whether
// the caller or the transaction owns it is not visible in this file -- confirm.
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
  SSdbRaw *pRaw = mndConsumerActionEncode(pConsumer);
  if (pRaw == NULL) {
    return -1;
  }

  // Short-circuit keeps the original order: append first, then set status.
  if (mndTransAppendCommitlog(pTrans, pRaw) != 0 || sdbSetRawStatus(pRaw, SDB_STATUS_READY) != 0) {
    return -1;
  }
  return 0;
}

X
Xiaoyu Wang 已提交
608
// Checks that every topic name in pTopicList refers to an existing topic and
// that pUser passes the MND_OPER_SUBSCRIBE privilege check on it.
// Returns 0 when all topics pass, -1 at the first topic that is missing or
// not permitted.
static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const char *pUser) {
  const int32_t total = taosArrayGetSize(pTopicList);
  int32_t       idx = 0;

  while (idx < total) {
    char        *name = taosArrayGetP(pTopicList, idx);
    SMqTopicObj *pTopic = mndAcquireTopic(pMnode, name);
    if (pTopic == NULL) {  // terrno has been set by callee function
      return -1;
    }

    // The topic is released whether or not the privilege check passes.
    int32_t denied = mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic);
    mndReleaseTopic(pMnode, pTopic);
    if (denied != 0) {
      return -1;
    }

    idx++;
  }

  return 0;
}

X
Xiaoyu Wang 已提交
629
// Element-duplication callback for taosArrayDup(): returns a taosStrdup() copy
// of the topic name string p.
static void *topicNameDup(void *p) {
  char *name = (char *)p;
  return taosStrdup(name);
}
630

X
Xiaoyu Wang 已提交
631 632
static void freeItem(void *param) {
  void *pItem = *(void **)param;
633 634 635
  if (pItem != NULL) {
    taosMemoryFree(pItem);
  }
636 637
}

638 639 640 641
// Process a subscribe request sent by a consumer.
//
// The requested topic list is sorted, de-duplicated and validated with validateTopics(); the result is
// turned into a consumer object that is committed through a "subscribe" transaction:
//   - unknown consumer id: a new consumer object is built whose assignedTopics and rebNewTopics both
//     hold the full requested list;
//   - known consumer id: the consumer must be in MQ_CONSUMER_STATUS_READY; the requested list is merged
//     against its currentTopics to derive rebNewTopics (to add) and rebRemovedTopics (to drop).
//
// Returns:
//   TSDB_CODE_ACTION_IN_PROGRESS  the transaction was prepared successfully
//   TSDB_CODE_SUCCESS             an existing consumer asked for exactly its current topics (nothing to do)
//   validateTopics() result       topic validation failed
//   -1                            any other failure (terrno is set explicitly for the not-ready case)
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
  SMnode *pMnode = pMsg->info.node;
  char   *msgStr = pMsg->pCont;

  SCMSubscribeReq subscribe = {0};
  tDeserializeSCMSubscribeReq(msgStr, &subscribe);

  int64_t         consumerId = subscribe.consumerId;
  char           *cgroup = subscribe.cgroup;
  SMqConsumerObj *pExistedConsumer = NULL;
  SMqConsumerObj *pConsumerNew = NULL;

  int32_t code = -1;

  // sort first so duplicates become adjacent and so the merge below can walk the list in order
  SArray *pTopicList = subscribe.topicNames;
  taosArraySort(pTopicList, taosArrayCompareString);
  taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem);

  int32_t newTopicNum = taosArrayGetSize(pTopicList);

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
  if (pTrans == NULL) {
    goto _over;
  }

  // check topic existence
  code = validateTopics(pTopicList, pMnode, pMsg->info.conn.user);
  if (code != TSDB_CODE_SUCCESS) {
    goto _over;
  }

  // validateTopics() left code == TSDB_CODE_SUCCESS. Restore the failure default so that every
  // `goto _over` below reports an error instead of silently returning success.
  code = -1;

  pExistedConsumer = mndAcquireConsumer(pMnode, consumerId);
  if (pExistedConsumer == NULL) {
    mInfo("receive subscribe request from new consumer:0x%" PRIx64 " cgroup:%s, numOfTopics:%d", consumerId,
          subscribe.cgroup, (int32_t)taosArrayGetSize(pTopicList));

    pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
    if (pConsumerNew == NULL) {
      // same guard as the existing-consumer branch; without it the tstrncpy below dereferences NULL
      goto _over;
    }
    tstrncpy(pConsumerNew->clientId, subscribe.clientId, tListLen(pConsumerNew->clientId));

    // updateType is deliberately not set to CONSUMER_UPDATE_SUB_MODIFY here: the insert logic is used
    taosArrayDestroy(pConsumerNew->assignedTopics);
    pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);

    // all subscribed topics should re-balance.
    // Ownership of the request's topic array moves to the consumer object, so the request must
    // no longer reference it when it is cleaned up at _over.
    taosArrayDestroy(pConsumerNew->rebNewTopics);
    pConsumerNew->rebNewTopics = pTopicList;
    subscribe.topicNames = NULL;

    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
    if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;

  } else {
    int32_t status = atomic_load_32(&pExistedConsumer->status);

    mInfo("receive subscribe request from existed consumer:0x%" PRIx64
          " cgroup:%s, current status:%d(%s), subscribe topic num: %d",
          consumerId, subscribe.cgroup, status, mndConsumerStatusName(status), newTopicNum);

    if (status != MQ_CONSUMER_STATUS_READY) {
      terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto _over;
    }

    pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
    if (pConsumerNew == NULL) {
      goto _over;
    }

    // set the update type
    pConsumerNew->updateType = CONSUMER_UPDATE_SUB_MODIFY;
    taosArrayDestroy(pConsumerNew->assignedTopics);
    pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);

    int32_t oldTopicNum = taosArrayGetSize(pExistedConsumer->currentTopics);

    // Two-pointer merge of the old (currentTopics) and new (requested) lists.
    // NOTE(review): this assumes currentTopics is kept sorted with the same string ordering — confirm.
    int32_t i = 0, j = 0;
    while (i < oldTopicNum || j < newTopicNum) {
      if (i >= oldTopicNum) {
        // old list exhausted: everything left in the request is new
        char *newTopicCopy = taosStrdup(taosArrayGetP(pTopicList, j));
        taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
        j++;
        continue;
      } else if (j >= newTopicNum) {
        // request exhausted: everything left in the old list is removed
        char *oldTopicCopy = taosStrdup(taosArrayGetP(pExistedConsumer->currentTopics, i));
        taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
        i++;
        continue;
      } else {
        char *oldTopic = taosArrayGetP(pExistedConsumer->currentTopics, i);
        char *newTopic = taosArrayGetP(pTopicList, j);
        int   comp = strcmp(oldTopic, newTopic);
        if (comp == 0) {
          // present in both lists: unchanged
          i++;
          j++;
          continue;
        } else if (comp < 0) {
          // old topic sorts before the next requested one, so it is not requested any more
          char *oldTopicCopy = taosStrdup(oldTopic);
          taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
          i++;
          continue;
        } else {
          // requested topic sorts before the next old one, so it is newly subscribed
          char *newTopicCopy = taosStrdup(newTopic);
          taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
          j++;
          continue;
        }
      }
    }

    // no topics need to be rebalanced
    if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
      code = TSDB_CODE_SUCCESS;
      goto _over;
    }

    if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
    if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_over:
  mndTransDrop(pTrans);

  if (pExistedConsumer) {
    mndReleaseConsumer(pMnode, pExistedConsumer);
  }

  tDeleteSMqConsumerObj(pConsumerNew, true);

  // TODO: replace with destroy subscribe msg
  taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
  return code;
}

L
Liu Jicong 已提交
772 773
// Serialize a consumer object into an SDB raw record.
//
// Record layout, in write order: int32 length of the encoded consumer, the encoded consumer bytes,
// MND_CONSUMER_RESERVE_SIZE reserved bytes.
//
// Returns the raw record on success. On failure the record is freed, NULL is returned and terrno is
// left non-zero.
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
  // pessimistic default: only cleared after the whole record has been written
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  void   *pEncoded = NULL;
  int32_t dataPos = 0;

  // called with a NULL buffer to obtain the encoded length
  int32_t encodedLen = tEncodeSMqConsumerObj(NULL, pConsumer);
  int32_t rawSize = sizeof(int32_t) + encodedLen + MND_CONSUMER_RESERVE_SIZE;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, rawSize);
  if (pRaw == NULL) {
    goto CM_ENCODE_OVER;
  }

  pEncoded = taosMemoryMalloc(encodedLen);
  if (pEncoded == NULL) {
    goto CM_ENCODE_OVER;
  }

  // the encoder receives the address of a separate cursor, so pEncoded keeps pointing at the buffer start
  void *pCursor = pEncoded;
  tEncodeSMqConsumerObj(&pCursor, pConsumer);

  SDB_SET_INT32(pRaw, dataPos, encodedLen, CM_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, pEncoded, encodedLen, CM_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

CM_ENCODE_OVER:
  taosMemoryFreeClear(pEncoded);

  if (terrno == 0) {
    mTrace("consumer:0x%" PRIx64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer);
    return pRaw;
  }

  mError("consumer:0x%" PRIx64 " failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}

L
Liu Jicong 已提交
808
// Rebuild a consumer row from an SDB raw record written by mndConsumerActionEncode().
//
// Steps: check the record's soft version against MND_CONSUMER_VER_NUMBER, allocate a row sized for
// SMqConsumerObj, read the int32 payload length, the payload and the reserved area, decode the payload
// into the row object, then call tmsgUpdateDnodeEpSet() on the consumer's ep.
//
// Returns the row. When terrno is non-zero at exit, the failure is logged and the row is released with
// taosMemoryFreeClear() before being returned.
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
  SSdbRow        *pRow = NULL;
  SMqConsumerObj *pConsumer = NULL;
  void           *pPayload = NULL;
  int32_t         dataPos = 0;
  int32_t         payloadLen = 0;
  int8_t          sver = 0;

  terrno = 0;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    goto CM_DECODE_OVER;
  }

  if (sver != MND_CONSUMER_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto CM_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqConsumerObj));
  if (pRow == NULL) {
    goto CM_DECODE_OVER;
  }

  pConsumer = sdbGetRowObj(pRow);
  if (pConsumer == NULL) {
    goto CM_DECODE_OVER;
  }

  SDB_GET_INT32(pRaw, dataPos, &payloadLen, CM_DECODE_OVER);

  pPayload = taosMemoryMalloc(payloadLen);
  if (pPayload == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto CM_DECODE_OVER;
  }

  SDB_GET_BINARY(pRaw, dataPos, pPayload, payloadLen, CM_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);

  if (tDecodeSMqConsumerObj(pPayload, pConsumer) == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;  // TODO set correct error code
    goto CM_DECODE_OVER;
  }

  tmsgUpdateDnodeEpSet(&pConsumer->ep);

CM_DECODE_OVER:
  taosMemoryFreeClear(pPayload);

  if (terrno == TSDB_CODE_SUCCESS) {
    return pRow;
  }

  // pConsumer is still NULL when the failure happened before the row object was obtained
  mError("consumer:0x%" PRIx64 " failed to decode from raw:%p since %s",
         pConsumer == NULL ? 0 : pConsumer->consumerId, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return pRow;
}

L
Liu Jicong 已提交
864
// SDB insert action for a consumer row: logs the consumer's id, cgroup, status and epoch, then stamps
// subscribeTime with the current time in milliseconds. pSdb is not used. Always returns 0.
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  mInfo("consumer:0x%" PRIx64 " sub insert, cgroup:%s status:%d(%s) epoch:%d",
        pConsumer->consumerId,
        pConsumer->cgroup,
        pConsumer->status,
        mndConsumerStatusName(pConsumer->status),
        pConsumer->epoch);

  pConsumer->subscribeTime = taosGetTimestampMs();

  return 0;
}

L
Liu Jicong 已提交
872
// SDB delete action for a consumer row: logs the consumer's id and status, then hands the object to
// tDeleteSMqConsumerObj() with its second argument set to false. pSdb is not used. Always returns 0.
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  mInfo("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s",
        pConsumer->consumerId,
        pConsumer->status,
        mndConsumerStatusName(pConsumer->status));

  tDeleteSMqConsumerObj(pConsumer, false);

  return 0;
}

X
Xiaoyu Wang 已提交
879
// Advance the consumer's status once it has no pending rebalance work, i.e. both rebNewTopics and
// rebRemovedTopics are empty:
//   MQ_CONSUMER_STATUS_REBALANCE -> MQ_CONSUMER_STATUS_READY
//   MQ_CONSUMER_STATUS_READY     -> MQ_CONSUMER_STATUS_LOST
// Any other status, or any pending topic, leaves the status untouched.
static void updateConsumerStatus(SMqConsumerObj *pConsumer) {
  int32_t prevStatus = pConsumer->status;

  bool hasPending =
      taosArrayGetSize(pConsumer->rebNewTopics) != 0 || taosArrayGetSize(pConsumer->rebRemovedTopics) != 0;
  if (hasPending) {
    return;
  }

  switch (prevStatus) {
    case MQ_CONSUMER_STATUS_REBALANCE:
      pConsumer->status = MQ_CONSUMER_STATUS_READY;
      break;
    case MQ_CONSUMER_STATUS_READY:
      pConsumer->status = MQ_CONSUMER_STATUS_LOST;
      break;
    default:
      break;
  }
}

// remove from new topic
X
Xiaoyu Wang 已提交
892
// Remove the first entry of pConsumer->rebNewTopics whose name equals pTopic, free that entry's string
// and log the number of entries left. Does nothing when no entry matches.
static void removeFromNewTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
  SArray *pList = pConsumer->rebNewTopics;
  int32_t num = taosArrayGetSize(pList);

  for (int32_t idx = 0; idx < num; ++idx) {
    char *pName = taosArrayGetP(pList, idx);
    if (strcmp(pTopic, pName) != 0) {
      continue;
    }

    taosArrayRemove(pList, idx);
    taosMemoryFree(pName);

    mInfo("consumer:0x%" PRIx64 " remove new topic:%s in the topic list, remain newTopics:%d", pConsumer->consumerId,
          pTopic, (int)taosArrayGetSize(pList));
    return;
  }
}

// remove from removed topic
X
Xiaoyu Wang 已提交
908
// Remove the first entry of pConsumer->rebRemovedTopics whose name equals pTopic, free that entry's
// string and log the number of entries left. Does nothing when no entry matches.
static void removeFromRemoveTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
  SArray *pList = pConsumer->rebRemovedTopics;
  int32_t num = taosArrayGetSize(pList);

  for (int32_t idx = 0; idx < num; ++idx) {
    char *pName = taosArrayGetP(pList, idx);
    if (strcmp(pTopic, pName) != 0) {
      continue;
    }

    taosArrayRemove(pList, idx);
    taosMemoryFree(pName);

    mInfo("consumer:0x%" PRIx64 " remove topic:%s in the removed topic list, remain removedTopics:%d",
          pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pList));
    return;
  }
}

923 924 925 926 927 928 929 930
// Remove the first entry of pConsumer->currentTopics whose name equals pTopic, free that entry's string
// and log the number of entries left. Does nothing when no entry matches.
static void removeFromCurrentTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
  SArray *pList = pConsumer->currentTopics;
  int32_t num = taosArrayGetSize(pList);

  for (int32_t idx = 0; idx < num; ++idx) {
    char *pName = taosArrayGetP(pList, idx);
    if (strcmp(pTopic, pName) != 0) {
      continue;
    }

    taosArrayRemove(pList, idx);
    taosMemoryFree(pName);

    mInfo("consumer:0x%" PRIx64 " remove topic:%s in the current topic list, remain currentTopics:%d",
          pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pList));
    return;
  }
}

// Linear search of pConsumer->currentTopics: returns true if any entry equals pTopic, false otherwise.
static bool existInCurrentTopicList(const SMqConsumerObj* pConsumer, const char* pTopic) {
  int32_t num = taosArrayGetSize(pConsumer->currentTopics);

  for (int32_t idx = 0; idx < num; ++idx) {
    char *pName = taosArrayGetP(pConsumer->currentTopics, idx);
    if (strcmp(pName, pTopic) == 0) {
      return true;
    }
  }

  return false;
}

L
Liu Jicong 已提交
953
// SDB update action for a consumer row. Applies pNewConsumer->updateType to pOldConsumer while holding
// pOldConsumer's write latch:
//   CONSUMER_UPDATE_SUB_MODIFY         swap in the new rebNewTopics / rebRemovedTopics / assignedTopics,
//                                      refresh subscribeTime, status -> REBALANCE
//   CONSUMER_UPDATE_RECOVER            queue a copy of every assigned topic into rebNewTopics,
//                                      status -> REBALANCE
//   CONSUMER_UPDATE_REB_MODIFY_NOTOPIC bump epoch and refresh rebalanceTime only
//   CONSUMER_UPDATE_REB_MODIFY_TOPIC   move pNewConsumer's first rebNewTopics entry from the old
//                                      consumer's pending-new list into currentTopics
//   CONSUMER_UPDATE_REB_MODIFY_REMOVE  drop pNewConsumer's first rebRemovedTopics entry from the old
//                                      consumer's pending-removed list and from currentTopics
// Other update types are ignored (handling of CONSUMER_UPDATE_TIMER_LOST is disabled in this function).
// pSdb is not used. Always returns 0.
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
  mInfo("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", createTime:%" PRId64,
        pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->createTime);

  taosWLockLatch(&pOldConsumer->lock);

  switch (pNewConsumer->updateType) {
    case CONSUMER_UPDATE_SUB_MODIFY: {
      // the old lists end up in pNewConsumer after the swap
      TSWAP(pOldConsumer->rebNewTopics, pNewConsumer->rebNewTopics);
      TSWAP(pOldConsumer->rebRemovedTopics, pNewConsumer->rebRemovedTopics);
      TSWAP(pOldConsumer->assignedTopics, pNewConsumer->assignedTopics);

      pOldConsumer->subscribeTime = taosGetTimestampMs();
      pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
      mInfo("consumer:0x%" PRIx64 " sub update, modify existed consumer",pOldConsumer->consumerId);
      break;
    }

    case CONSUMER_UPDATE_RECOVER: {
      int32_t numAssigned = taosArrayGetSize(pOldConsumer->assignedTopics);
      for (int32_t idx = 0; idx < numAssigned; ++idx) {
        char *topicCopy = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, idx));
        taosArrayPush(pOldConsumer->rebNewTopics, &topicCopy);
      }

      pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
      mInfo("consumer:0x%" PRIx64 " timer update, timer recover",pOldConsumer->consumerId);
      break;
    }

    case CONSUMER_UPDATE_REB_MODIFY_NOTOPIC: {
      atomic_add_fetch_32(&pOldConsumer->epoch, 1);

      pOldConsumer->rebalanceTime = taosGetTimestampMs();
      mInfo("consumer:0x%" PRIx64 " reb update, only rebalance time", pOldConsumer->consumerId);
      break;
    }

    case CONSUMER_UPDATE_REB_MODIFY_TOPIC: {
      char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));

      // the topic is no longer pending once it has been rebalanced
      removeFromNewTopicList(pOldConsumer, pNewTopic);

      // add to current topic; the list is re-sorted after every insertion
      if (!existInCurrentTopicList(pOldConsumer, pNewTopic)) {
        taosArrayPush(pOldConsumer->currentTopics, &pNewTopic);
        taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
      } else {
        mError("consumer:0x%" PRIx64 "new topic:%s should not in currentTopics", pOldConsumer->consumerId, pNewTopic);
        taosMemoryFree(pNewTopic);
      }

      // remember the status before it is advanced, for the log line below
      int32_t prevStatus = pOldConsumer->status;
      updateConsumerStatus(pOldConsumer);

      // the re-balance is triggered when the new consumer is launched.
      pOldConsumer->rebalanceTime = taosGetTimestampMs();

      atomic_add_fetch_32(&pOldConsumer->epoch, 1);
      mInfo("consumer:0x%" PRIx64 " reb update add, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
            ", current topics:%d, newTopics:%d, removeTopics:%d",
            pOldConsumer->consumerId, prevStatus, mndConsumerStatusName(prevStatus), pOldConsumer->status,
            mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
            (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
            (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
      break;
    }

    case CONSUMER_UPDATE_REB_MODIFY_REMOVE: {
      char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);

      // remove from removed topic
      removeFromRemoveTopicList(pOldConsumer, removedTopic);

      // remove from current topic
      removeFromCurrentTopicList(pOldConsumer, removedTopic);

      // remember the status before it is advanced, for the log line below
      int32_t prevStatus = pOldConsumer->status;
      updateConsumerStatus(pOldConsumer);

      pOldConsumer->rebalanceTime = taosGetTimestampMs();
      atomic_add_fetch_32(&pOldConsumer->epoch, 1);

      mInfo("consumer:0x%" PRIx64 " reb update remove, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
            ", current topics:%d, newTopics:%d, removeTopics:%d",
            pOldConsumer->consumerId, prevStatus, mndConsumerStatusName(prevStatus), pOldConsumer->status,
            mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
            (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
            (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
      break;
    }

    default:
      break;
  }

  taosWUnLockLatch(&pOldConsumer->lock);
  return 0;
}

L
Liu Jicong 已提交
1052
// Look up a consumer by id with sdbAcquire() on the mnode's SDB.
// Returns the acquired object, or NULL with terrno set to TSDB_CODE_MND_CONSUMER_NOT_EXIST when the
// lookup yields nothing. A non-NULL result is handed back with mndReleaseConsumer().
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId) {
  SMqConsumerObj *pFound = sdbAcquire(pMnode->pSdb, SDB_CONSUMER, &consumerId);
  if (pFound != NULL) {
    return pFound;
  }

  terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
  return NULL;
}

L
Liu Jicong 已提交
1061
// Hand a consumer object back to the mnode's SDB via sdbRelease().
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
  sdbRelease(pMnode->pSdb, pConsumer);
}
L
Liu Jicong 已提交
1065

S
Shengliang Guan 已提交
1066 1067
// Fill pBlock with rows describing consumers, one row per (consumer, assigned topic) pair.
//
// Consumers are fetched from SDB starting at pShow->pIter until the iterator is exhausted or at least
// rowsCapacity rows have been produced. Consumers without assigned topics are skipped. A consumer's
// rows are always written together, so the block is grown with blockDataEnsureCapacity() when they
// would not fit in rowsCapacity.
//
// Columns, in order: consumer id (hex string), consumer group, client id, status name, topic,
// createTime, subscribeTime, rebalanceTime (NULL when 0).
//
// Returns the number of rows written; the same amount is added to pShow->numOfRows.
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode         *pMnode = pReq->info.node;
  SSdb           *pSdb = pMnode->pSdb;
  SMqConsumerObj *pConsumer = NULL;
  int32_t         rowsFilled = 0;

  while (rowsFilled < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
    if (pShow->pIter == NULL) {
      break;
    }

    if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {
      mDebug("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId);
      sdbRelease(pSdb, pConsumer);
      continue;
    }

    taosRLockLatch(&pConsumer->lock);
    mDebug("showing consumer:0x%" PRIx64, pConsumer->consumerId);

    // a consumer whose topic list is empty at this point still gets one row, with a NULL topic
    int32_t topicCnt = taosArrayGetSize(pConsumer->assignedTopics);
    bool    hasTopic = (topicCnt != 0);
    if (!hasTopic) {
      topicCnt = 1;
    }

    if (rowsFilled + topicCnt > rowsCapacity) {
      blockDataEnsureCapacity(pBlock, rowsFilled + topicCnt);
    }

    // values shared by every row of this consumer, prepared once as var-length strings

    // consumer id
    char consumerIdHex[32] = {0};
    sprintf(varDataVal(consumerIdHex), "0x%"PRIx64, pConsumer->consumerId);
    varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));

    // consumer group
    char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_TO_VARSTR(cgroup, pConsumer->cgroup);

    // client id
    char clientId[256 + VARSTR_HEADER_SIZE] = {0};
    STR_TO_VARSTR(clientId, pConsumer->clientId);

    // status
    char        status[20 + VARSTR_HEADER_SIZE] = {0};
    const char *pStatusName = mndConsumerStatusName(pConsumer->status);
    STR_TO_VARSTR(status, pStatusName);

    for (int32_t t = 0; t < topicCnt; ++t) {
      SColumnInfoData *pCol = NULL;
      int32_t          col = 0;

      pCol = taosArrayGet(pBlock->pDataBlock, col++);
      colDataSetVal(pCol, rowsFilled, (const char *)consumerIdHex, false);

      pCol = taosArrayGet(pBlock->pDataBlock, col++);
      colDataSetVal(pCol, rowsFilled, (const char *)cgroup, false);

      pCol = taosArrayGet(pBlock->pDataBlock, col++);
      colDataSetVal(pCol, rowsFilled, (const char *)clientId, false);

      pCol = taosArrayGet(pBlock->pDataBlock, col++);
      colDataSetVal(pCol, rowsFilled, (const char *)status, false);

      // one subscribed topic
      pCol = taosArrayGet(pBlock->pDataBlock, col++);
      if (hasTopic) {
        char        topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
        const char *topicName = mndTopicGetShowName(taosArrayGetP(pConsumer->assignedTopics, t));
        STR_TO_VARSTR(topic, topicName);
        colDataSetVal(pCol, rowsFilled, (const char *)topic, false);
      } else {
        colDataSetVal(pCol, rowsFilled, NULL, true);
      }

      // the end point column is currently not emitted

      // up time
      pCol = taosArrayGet(pBlock->pDataBlock, col++);
      colDataSetVal(pCol, rowsFilled, (const char *)&pConsumer->createTime, false);

      // subscribe time
      pCol = taosArrayGet(pBlock->pDataBlock, col++);
      colDataSetVal(pCol, rowsFilled, (const char *)&pConsumer->subscribeTime, false);

      // rebalance time
      pCol = taosArrayGet(pBlock->pDataBlock, col++);
      colDataSetVal(pCol, rowsFilled, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0);

      rowsFilled++;
    }

    taosRUnLockLatch(&pConsumer->lock);
    sdbRelease(pSdb, pConsumer);

    pBlock->info.rows = rowsFilled;
  }

  pShow->numOfRows += rowsFilled;
  return rowsFilled;
}

// Cancel an in-progress consumer iteration by passing the iterator to sdbCancelFetch().
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}

static const char *mndConsumerStatusName(int status) {
  switch (status) {
wmmhello's avatar
wmmhello 已提交
1179
    case MQ_CONSUMER_STATUS_READY:
L
Liu Jicong 已提交
1180
      return "ready";
wmmhello's avatar
wmmhello 已提交
1181
    case MQ_CONSUMER_STATUS_LOST:
L
Liu Jicong 已提交
1182
      return "lost";
1183
    case MQ_CONSUMER_STATUS_REBALANCE:
L
Liu Jicong 已提交
1184 1185 1186 1187 1188
      return "rebalancing";
    default:
      return "unknown";
  }
}